mcp测试案例
This commit is contained in:
34
examples/mcp_adapters/server/weather_server.py
Normal file
34
examples/mcp_adapters/server/weather_server.py
Normal file
@@ -0,0 +1,34 @@
|
||||
from fastapi import FastAPI
|
||||
from mcp.server.fastmcp import FastMCP
|
||||
import asyncio
|
||||
from contextlib import asynccontextmanager
|
||||
|
||||
# 创建 FastAPI 应用
|
||||
app = FastAPI()
|
||||
|
||||
# 创建 FastMCP 服务器
|
||||
mcp = FastMCP("Weather")
|
||||
|
||||
# 定义工具:获取天气
|
||||
@mcp.tool()
|
||||
async def get_weather(location: str) -> str:
|
||||
"""返回指定位置的天气情况"""
|
||||
return f"The weather in {location} is sunny!"
|
||||
|
||||
# 使用 FastAPI 的 lifespan 来启动 MCP 服务器
|
||||
@asynccontextmanager
|
||||
async def lifespan(app: FastAPI):
|
||||
# 在应用启动时启动 MCP 服务
|
||||
loop = asyncio.get_event_loop()
|
||||
task = loop.create_task(mcp.run_streamable_http_async())
|
||||
yield
|
||||
# 在应用关闭时停止 MCP 服务
|
||||
task.cancel()
|
||||
|
||||
# 将 lifespan 事件添加到 FastAPI 应用
|
||||
app = FastAPI(lifespan=lifespan)
|
||||
|
||||
# 启动 FastAPI 应用
|
||||
if __name__ == "__main__":
|
||||
import uvicorn
|
||||
uvicorn.run(app, host="0.0.0.0", port=8000)
|
||||
Reference in New Issue
Block a user