Add MCP usage methods

2025-09-01 11:59:17 +08:00
parent 9435767fb1
commit 87a7f3f9df
7 changed files with 559 additions and 0 deletions


@@ -0,0 +1,214 @@
# MCP Integration: Three Usage Patterns

This section summarizes three common patterns for using MCP tools with **LangGraph + a custom Qwen model (`ChatQwenOpenAICompat`)**: the **ReAct agent**, the **direct tool loop**, and **Plan & Execute**. Every pattern can load MCP servers from **code**, a **config file (JSON/YAML)**, or **environment variables**, and can connect both to local `stdio` servers (e.g., MCP servers launched via `npx`/`python`) and to remote **HTTP streamable** servers.
---
## Configuration Sources and Precedence

MCP servers can be declared through any of these channels; internally they are merged with the following precedence (later sources override earlier ones with the same name):

1. **Config file**: `config_path` or `MCP_CONFIG_PATH` (JSON/YAML)
2. **Environment variable**: `MCP_SERVERS_JSON` (a JSON string)
3. **Code**: `servers: Dict[str, Dict[str, Any]]` (final override)
4. **Fallback** (used only when none of the above yields servers): a local `weather` HTTP server (`http://127.0.0.1:8000/mcp/`, `transport=streamable_http`)

A short sketch of the merge semantics follows this list.
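
The sketch below illustrates the merge order using `resolve_servers_config` (a real entry point in this module); the in-process env assignment and the port-9000 URL are made-up values for illustration only:

```python
import json
import os

from langgraph_qwen.mcp import resolve_servers_config

# Hypothetical env-supplied entry (illustration only).
os.environ["MCP_SERVERS_JSON"] = json.dumps(
    {"weather": {"url": "http://127.0.0.1:8000/mcp/", "transport": "streamable_http"}}
)

# A code-supplied entry with the same name overrides the env-supplied one;
# entries unique to either source survive the merge.
merged = resolve_servers_config(
    servers={"weather": {"url": "http://127.0.0.1:9000/mcp/", "transport": "streamable_http"}}
)
assert merged["weather"]["url"] == "http://127.0.0.1:9000/mcp/"
```
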
**Example JSON config (`mcp_servers.json`)**
```json
{
  "servers": {
    "weather": {
      "url": "http://127.0.0.1:8000/mcp/",
      "transport": "streamable_http"
    },
    "airbnb": {
      "command": "npx",
      "args": ["-y", "@openbnb/mcp-server-airbnb"],
      "transport": "stdio"
    }
  }
}
```
**Example YAML config (`mcp_servers.yaml`)**
```yaml
servers:
  weather:
    url: http://127.0.0.1:8000/mcp/
    transport: streamable_http
  playwright:
    command: npx
    args: ["@playwright/mcp@latest"]
    transport: stdio
    env:
      DISPLAY: ":1"
```
> Environment variables (use either):
> - `MCP_CONFIG_PATH=./mcp_servers.yaml`
> - `MCP_SERVERS_JSON='{"weather":{"url":"http://127.0.0.1:8000/mcp/","transport":"streamable_http"}}'`
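
In a plain script the same variables can also be set in-process before any loader runs; a minimal sketch, assuming `./mcp_servers.yaml` exists with the YAML content shown above:

```python
import asyncio
import os

from langgraph_qwen.mcp import load_mcp_tools

os.environ["MCP_CONFIG_PATH"] = "./mcp_servers.yaml"  # assumed path

async def show_tools():
    tools = await load_mcp_tools()  # picks up MCP_CONFIG_PATH automatically
    print([t.name for t in tools])

asyncio.run(show_tools())
```
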
---
## Pattern 1: ReAct Agent (simplest; recommended default)

**Characteristics**
- One-line creation: MCP tools are injected automatically into a LangGraph **ReAct** agent (a `ToolNode` executes the tools for you).
- Supports **multiple tool calls**; cap the step count explicitly with `config={"recursion_limit": N}`.
- Best for general-purpose agents where the model freely decides when to call which tool.

**Typical scenarios**
- Conversational retrieval, plan-then-verify flows, simple task orchestration.
- Many tools, and you don't want to hand-write the calling logic yet.

**Minimal example**
```python
from langchain_core.messages import HumanMessage
from langgraph_qwen.mcp import create_qwen_agent_with_mcp_async

SERVERS = {
    "weather": {"url": "http://127.0.0.1:8000/mcp/", "transport": "streamable_http"},
    # "airbnb": {"command": "npx", "args": ["-y", "@openbnb/mcp-server-airbnb"], "transport": "stdio"},
}

# NOTE: uses top-level await; run inside an async context (notebook or an asyncio.run wrapper).
agent = await create_qwen_agent_with_mcp_async(
    servers=SERVERS,     # or pass config_path / use MCP_SERVERS_JSON
    tool_choice="auto",  # or "none" for a two-stage flow (think first, then inject tools)
)
res = await agent.ainvoke(
    {"messages": [HumanMessage(content="List the available tools, demonstrate one call, and summarize.")]},
    config={"recursion_limit": 6},  # ★ caps the maximum number of tool-interaction steps
)
print(res["messages"][-1].content)
```
---
## Pattern 2: Direct Tool Loop (fully controllable)

**Characteristics**
- Skips LangGraph's ToolNode: you execute tools **manually** in a loop (detect `AIMessage.tool_calls`, call `tool.invoke`/`ainvoke`, then feed the result back as a `ToolMessage`).
- Gives you **100%** control over whether each step calls a tool, how results are merged, and how failures are retried.
- Suited to strictly controlled business flows that need fine-grained fault handling and auditing.

**Typical scenarios**
- Compliance, finance, or research systems with strict requirements on tool side effects and audit trails.
- Complex serial-plus-parallel orchestration across many tools, or **switching tool sets between phases**.

**Minimal example**
```python
from langchain_core.messages import HumanMessage, ToolMessage
from langgraph_qwen.chat_model import ChatQwenOpenAICompat
from langgraph_qwen.mcp import load_mcp_tools

# NOTE: uses top-level await; run inside an async context.
tools = await load_mcp_tools(servers={
    "math": {"command": "python", "args": ["/abs/path/to/math_server.py"], "transport": "stdio"},
    "weather": {"url": "http://127.0.0.1:8000/mcp/", "transport": "streamable_http"},
})
model = ChatQwenOpenAICompat(temperature=0).bind_tools(tools).bind(tool_choice="auto")
tool_map = {t.name: t for t in tools}

msgs = [HumanMessage(content="First compute 12*(3+5), then look up the weather in Beijing, and finish with a summary.")]
for _ in range(8):  # ★ maximum number of steps
    ai = await model.ainvoke(msgs)
    msgs.append(ai)
    calls = getattr(ai, "tool_calls", []) or ai.additional_kwargs.get("tool_calls", [])
    if not calls:
        break
    for call in calls:
        name, args, call_id = call["name"], call.get("args", {}), call.get("id") or ""
        tool = tool_map.get(name)
        if not tool:
            msgs.append(ToolMessage(tool_call_id=call_id, content=f"Unknown tool: {name}"))
            continue
        out = await tool.ainvoke(args) if hasattr(tool, "ainvoke") else tool.invoke(args)
        msgs.append(ToolMessage(tool_call_id=call_id, content=str(out)))

final = await model.ainvoke(msgs)
print(final.content)
```
---
## Pattern 3: Plan & Execute (dynamic tool sets)

**Characteristics**
- A first model call decomposes the task into **multiple steps** (the planner).
- Each step can **dynamically select or swap its tool set** (e.g., load a different MCP config per phase).
- The executor works like the direct tool loop, with fine-grained control over how many tool calls each step may make.

**Typical scenarios**
- **Multi-phase tasks** such as travel, procurement, or research: find resources, compare prices, plan routes, then summarize.
- Different phases attach different MCP tools (Airbnb → Google Maps → Weather).

**Minimal example (excerpt)**
```python
from langchain_core.messages import HumanMessage
from langgraph_qwen.chat_model import ChatQwenOpenAICompat
from langgraph_qwen.mcp import load_mcp_tools

# Planning
planner = ChatQwenOpenAICompat(temperature=0)
steps_ai = await planner.ainvoke([HumanMessage(content="Break the task into executable steps (one per line): ...")])
steps = [s for s in str(steps_ai.content).splitlines() if s][:8]

# Per-step execution (tool set loaded dynamically)
for i, step in enumerate(steps, 1):
    tools = await load_mcp_tools(servers=(
        {"weather": {"url": "http://127.0.0.1:8000/mcp/", "transport": "streamable_http"}}
        if i % 2 == 0 else
        {"airbnb": {"command": "npx", "args": ["-y", "@openbnb/mcp-server-airbnb"], "transport": "stdio"}}
    ))
    model = ChatQwenOpenAICompat(temperature=0).bind_tools(tools).bind(tool_choice="auto")
    # … execute as in the direct tool loop, giving each step its own max_tool_steps_per_step
    # (the full version is in examples/mcp_modes/plan_and_execute.py below)
```
---
## Choosing a Pattern

- **Start with ReAct** if you want the fastest path to an agent that decides on its own how to use tools, and automated behavior is acceptable → **Pattern 1**.
- **Strong control / auditing / fault tolerance**: you need precise control over every tool call, retry, and output format → **Pattern 2**.
- **Multi-phase tasks**: plan first, then inject different tools per phase → **Pattern 3**.
---
## FAQ and Tips

- **Are the tools async or sync?**
  Tools returned by `langchain-mcp-adapters` are usually `StructuredTool`/`BaseTool` wrappers and may implement only `ainvoke` (async).
  Check before calling: `await tool.ainvoke(args) if hasattr(tool, "ainvoke") else tool.invoke(args)`.
- **Connection failures (`ConnectError`)**
  Usually the HTTP MCP server is not running or the port is wrong. Verify that the `url` is reachable, or, for a local `stdio` server, that `command`/`args` are correct.
- **Tool-schema compatibility**
  Backends (e.g., vLLM / llama.cpp / llama-box) vary in how fully they support `tools`/`tool_choice`. If you hit 5xx or template errors:
  1) validate first with a minimal tool schema (`type=object`, simple `properties`);
  2) temporarily set `tool_choice="none"` for a two-stage flow: think and produce a plan first, then inject the target tools and allow calls (see the sketch after this list).
- **Proxies and authentication**
  - To bypass the system proxy: `QWEN_HTTP_TRUST_ENV=0` (the adapter passes it through to `httpx`).
  - Custom auth header/prefix: `QWEN_AUTH_HEADER` (default `Authorization`) and `QWEN_AUTH_SCHEME` (default `Bearer`; set it empty to send a bare key).
- **Config reuse**
  Keep several MCP server sets in a single `mcp_servers.yaml` and select one at runtime via `config_path`, or inject dynamically with `MCP_SERVERS_JSON`; both combine freely with all three patterns.
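
A minimal sketch of that two-stage flow (assuming a `servers` dict like the `SERVERS` in Pattern 1; the prompts are illustrative):

```python
from langchain_core.messages import HumanMessage
from langgraph_qwen.chat_model import ChatQwenOpenAICompat
from langgraph_qwen.mcp import load_mcp_tools

async def two_stage(question: str, servers: dict):
    tools = await load_mcp_tools(servers=servers)
    base = ChatQwenOpenAICompat(temperature=0)

    # Stage 1: tools are bound but calls are disabled; the model only plans.
    thinker = base.bind_tools(tools).bind(tool_choice="none")
    plan = await thinker.ainvoke([HumanMessage(content=f"Plan how to answer: {question}")])

    # Stage 2: rebind with tool_choice="auto" so the model may call tools;
    # any resulting tool_calls are then executed as in Pattern 2.
    actor = base.bind_tools(tools).bind(tool_choice="auto")
    return await actor.ainvoke([
        HumanMessage(content=f"Plan:\n{plan.content}\n\nNow execute: {question}")
    ])
```
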
---
## Related APIs (from `langgraph_qwen/mcp.py`)

- `resolve_servers_config(servers=None, config_path=None) -> Dict`: merge and resolve the server configuration.
- `load_mcp_tools(servers=None, config_path=None) -> List[Tool]`: load MCP tools asynchronously.
- `create_qwen_agent_with_mcp_async(..., tool_choice="auto")`: **ReAct** agent (async).
- `create_qwen_agent_with_mcp(..., tool_choice="auto")`: **ReAct** agent (sync wrapper; use the async variant inside an async environment).

> All patterns require `pip install langchain-mcp-adapters`. Tool servers may mix `streamable_http` and `stdio` (local `npx`/`python`/`node`, etc.).
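
For a synchronous quickstart script, a sketch along these lines should work (the config path and question are assumptions):

```python
import asyncio
from langchain_core.messages import HumanMessage
from langgraph_qwen.mcp import create_qwen_agent_with_mcp

# Sync factory: builds the agent on a temporary event loop
# (it raises if called from inside an already-running loop).
agent = create_qwen_agent_with_mcp(config_path="./mcp_servers.json")

# MCP tools are often async-only, so drive the agent with ainvoke.
res = asyncio.run(agent.ainvoke(
    {"messages": [HumanMessage(content="What's the weather in Beijing?")]},
    config={"recursion_limit": 6},
))
print(res["messages"][-1].content)
```
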


@@ -0,0 +1,13 @@
{
  "servers": {
    "weather": {
      "url": "http://127.0.0.1:8000/mcp/",
      "transport": "streamable_http"
    },
    "airbnb": {
      "command": "npx",
      "args": ["-y", "@openbnb/mcp-server-airbnb"],
      "transport": "stdio"
    }
  }
}


@@ -0,0 +1,10 @@
servers:
  weather:
    url: http://127.0.0.1:8000/mcp/
    transport: streamable_http
  playwright:
    command: npx
    args: ["@playwright/mcp@latest"]
    transport: stdio
    env:
      DISPLAY: ":1"


@@ -0,0 +1,52 @@
# examples/mcp_modes/direct_tool_loop.py
import asyncio
from langchain_core.messages import HumanMessage, ToolMessage
from langgraph_qwen.chat_model import ChatQwenOpenAICompat
from langgraph_qwen.mcp import load_mcp_tools

PHASE1_SERVERS = {
    "math": {"command": "python", "args": ["/abs/path/to/math_server.py"], "transport": "stdio"}
}
PHASE2_SERVERS = {
    "weather": {"url": "http://127.0.0.1:8000/mcp/", "transport": "streamable_http"}
}

async def main(max_steps: int = 8):
    # ★ Example: load a different tool set per phase
    tools_phase1 = await load_mcp_tools(servers=PHASE1_SERVERS)
    tools_phase2 = await load_mcp_tools(servers=PHASE2_SERVERS)
    model = ChatQwenOpenAICompat(temperature=0).bind_tools(
        tools_phase1 + tools_phase2
    ).bind(tool_choice="auto")
    tool_map = {t.name: t for t in (tools_phase1 + tools_phase2)}

    msgs = [HumanMessage(content="First use the math tool to compute 12*(3+5), then check the weather in Beijing, and end with a one-sentence summary.")]
    for _ in range(max_steps):
        ai = await model.ainvoke(msgs)
        msgs.append(ai)
        calls = getattr(ai, "tool_calls", []) or ai.additional_kwargs.get("tool_calls", [])
        if not calls:
            break
        for call in calls:
            name, args, call_id = call.get("name"), call.get("args", {}), call.get("id") or ""
            tool = tool_map.get(name)
            if not tool:
                msgs.append(ToolMessage(tool_call_id=call_id, content=f"Unknown tool: {name}"))
                continue
            try:
                if hasattr(tool, "ainvoke"):
                    out = await tool.ainvoke(args)
                else:
                    out = tool.invoke(args)
                msgs.append(ToolMessage(tool_call_id=call_id, content=str(out)))
            except Exception as e:
                msgs.append(ToolMessage(tool_call_id=call_id, content=f"Error: {e}"))

    final = await model.ainvoke(msgs)
    print("=== Final ===")
    print(final.content)

if __name__ == "__main__":
    asyncio.run(main())


@@ -0,0 +1,61 @@
# examples/mcp_modes/plan_and_execute.py
import os, asyncio
from typing import List
from langchain_core.messages import HumanMessage, ToolMessage
from langgraph_qwen.chat_model import ChatQwenOpenAICompat
from langgraph_qwen.mcp import load_mcp_tools

async def plan(task: str) -> List[str]:
    planner = ChatQwenOpenAICompat(temperature=0)
    ai = await planner.ainvoke([HumanMessage(content=f"Break the task into executable steps (one per line):\n{task}")])
    steps = [s.strip() for s in str(ai.content).splitlines() if s.strip()]
    return steps[:8]

async def tools_for_step(step_idx: int):
    # ★ Example: even steps use weather, odd steps use play/test tools; could also come from a file
    if step_idx % 2 == 0:
        return await load_mcp_tools(servers={
            "weather": {"url": "http://127.0.0.1:8000/mcp/", "transport": "streamable_http"}
        })
    else:
        cfg = os.getenv("MCP_CONFIG_PATH")  # e.g. ./mcp_servers.yaml
        return await load_mcp_tools(config_path=cfg)

async def execute(steps: List[str], max_tool_steps_per_step: int = 4):
    msgs = []
    for i, step in enumerate(steps, 1):
        tools = await tools_for_step(i)
        tool_map = {t.name: t for t in tools}
        model = ChatQwenOpenAICompat(temperature=0).bind_tools(tools).bind(tool_choice="auto")
        msgs.append(HumanMessage(content=f"Execute step {i}: {step}"))
        for _ in range(max_tool_steps_per_step):
            ai = await model.ainvoke(msgs)
            msgs.append(ai)
            calls = getattr(ai, "tool_calls", []) or ai.additional_kwargs.get("tool_calls", [])
            if not calls:
                break
            for call in calls:
                name, args, call_id = call.get("name"), call.get("args", {}), call.get("id") or ""
                tool = tool_map.get(name)
                if not tool:
                    msgs.append(ToolMessage(tool_call_id=call_id, content=f"Unknown tool: {name}"))
                    continue
                try:
                    out = await tool.ainvoke(args) if hasattr(tool, "ainvoke") else tool.invoke(args)
                    msgs.append(ToolMessage(tool_call_id=call_id, content=str(out)))
                except Exception as e:
                    msgs.append(ToolMessage(tool_call_id=call_id, content=f"Error: {e}"))

    final = await ChatQwenOpenAICompat(temperature=0).ainvoke(
        msgs + [HumanMessage(content="Summarize the execution results concisely.")]
    )
    print("=== Final ===")
    print(final.content)

async def main():
    steps = await plan("Find a B&B with a swimming pool in Barcelona, then search for nearby restaurants and attractions")
    await execute(steps, max_tool_steps_per_step=4)

if __name__ == "__main__":
    asyncio.run(main())


@@ -0,0 +1,33 @@
# examples/mcp_modes/react_agent.py
import os, asyncio
from langchain_core.messages import HumanMessage
from langgraph_qwen.mcp import create_qwen_agent_with_mcp_async

# Option A: declare servers directly in code (mixing HTTP and local stdio)
SERVERS = {
    "weather": {
        "url": "http://127.0.0.1:8000/mcp/",
        "transport": "streamable_http",
    },
    # "airbnb": {"command": "npx", "args": ["-y", "@openbnb/mcp-server-airbnb"], "transport": "stdio"},
}
# Option B: a central config file (JSON or YAML; either works)
CONFIG_PATH = os.getenv("MCP_CONFIG_PATH")  # e.g. ./mcp_servers.json or ./mcp_servers.yaml

async def main():
    # Pass servers, config_path, or both (servers overrides same-named entries)
    agent = await create_qwen_agent_with_mcp_async(
        servers=SERVERS,
        config_path=CONFIG_PATH,
        tool_choice="auto",
    )
    res = await agent.ainvoke(
        {"messages": [HumanMessage(content="List the available tools, pick one to demonstrate a call, and give a concise summary.")]},
        config={"recursion_limit": 6},  # ★ explicit cap on ReAct tool-interaction steps
    )
    print("=== Final ===")
    print(res["messages"][-1].content)

if __name__ == "__main__":
    asyncio.run(main())

langgraph_qwen/mcp.py Normal file

@@ -0,0 +1,176 @@
# langgraph_qwen/mcp.py
from __future__ import annotations

import os
import json
import asyncio
from pathlib import Path
from typing import Any, Dict, List, Optional

from langgraph.prebuilt import create_react_agent

from .chat_model import ChatQwenOpenAICompat

__all__ = [
    "load_mcp_tools",
    "create_qwen_agent_with_mcp_async",
    "create_qwen_agent_with_mcp",
    "resolve_servers_config",
]

def _env(name: str, default: str = "") -> str:
    return os.getenv(name) or default

# ---------- Config loading & merging ----------

def _read_config_file(path: str) -> Dict[str, Dict[str, Any]]:
    p = Path(path)
    if not p.exists():
        raise FileNotFoundError(f"MCP config file not found: {path}")
    ext = p.suffix.lower()
    if ext in {".json", ""}:  # no extension: assume JSON
        data = json.loads(p.read_text(encoding="utf-8"))
    elif ext in {".yml", ".yaml"}:
        try:
            import yaml  # type: ignore
        except Exception as e:
            raise RuntimeError("YAML parsing requires PyYAML: `pip install pyyaml`") from e
        data = yaml.safe_load(p.read_text(encoding="utf-8"))
    else:
        raise ValueError(f"Unsupported config extension: {ext}")
    # Two accepted structures:
    # 1) {"servers": { ... }}
    # 2) { ... }  (the dict itself is the servers mapping)
    if isinstance(data, dict) and "servers" in data and isinstance(data["servers"], dict):
        return data["servers"]
    if isinstance(data, dict):
        return data
    raise ValueError("Invalid config structure: expect a dict (optionally under key 'servers').")

def _servers_from_env_json() -> Optional[Dict[str, Dict[str, Any]]]:
    raw = os.getenv("MCP_SERVERS_JSON")
    if not raw:
        return None
    try:
        data = json.loads(raw)
        if isinstance(data, dict) and data:
            return data  # the dict itself is the servers mapping
    except Exception:
        pass
    return None

def _fallback_servers() -> Dict[str, Dict[str, Any]]:
    # Last resort: a single local HTTP test server (FastAPI / fastmcp-http)
    return {
        "weather": {
            "url": _env("WEATHER_MCP_URL", "http://127.0.0.1:8000/mcp/"),
            "transport": _env("WEATHER_TRANSPORT", "streamable_http"),
        }
    }

def resolve_servers_config(
    servers: Optional[Dict[str, Dict[str, Any]]] = None,
    config_path: Optional[str] = None,
) -> Dict[str, Dict[str, Any]]:
    """
    Single entry point for resolving the servers config. Precedence
    (later sources override earlier ones):
      1) file: config_path or MCP_CONFIG_PATH
      2) env var MCP_SERVERS_JSON (a JSON string)
      3) servers passed in code (final override)
      4) fallback: the local weather HTTP server
    """
    merged: Dict[str, Dict[str, Any]] = {}
    # 1) file
    path = config_path or os.getenv("MCP_CONFIG_PATH")
    if path:
        try:
            merged.update(_read_config_file(path))
        except Exception as e:
            raise RuntimeError(f"Failed to read MCP config file: {e}")
    # 2) env var (JSON string)
    env_servers = _servers_from_env_json()
    if env_servers:
        merged.update(env_servers)
    # 3) passed in code (final override)
    if servers:
        merged.update(servers)
    # 4) fallback
    if not merged:
        merged = _fallback_servers()
    return merged

# ---------- MCP tool loading ----------

async def load_mcp_tools(
    servers: Optional[Dict[str, Dict[str, Any]]] = None,
    config_path: Optional[str] = None,
) -> List[Any]:
    """
    Load MCP tools asynchronously (supports HTTP streamable and local stdio/npx).
    Requires: langchain-mcp-adapters
    """
    try:
        from langchain_mcp_adapters.client import MultiServerMCPClient  # type: ignore
    except Exception as e:
        raise RuntimeError("Please install: `uv pip install -e '.[mcp-adapters]'` or `pip install langchain-mcp-adapters`") from e
    resolved_servers = resolve_servers_config(servers=servers, config_path=config_path)
    client = MultiServerMCPClient(resolved_servers)
    tools = await client.get_tools()
    # Close sessions (compatible across adapter versions)
    try:
        if hasattr(client, "close") and callable(getattr(client, "close")):
            await client.close()  # type: ignore
        elif hasattr(client, "close_all_sessions") and callable(getattr(client, "close_all_sessions")):
            await client.close_all_sessions()  # type: ignore
    except Exception:
        pass
    return tools

# ---------- Agent factories ----------

async def create_qwen_agent_with_mcp_async(
    *,
    servers: Optional[Dict[str, Dict[str, Any]]] = None,
    config_path: Optional[str] = None,
    tool_choice: str = "auto",
    model: Optional[ChatQwenOpenAICompat] = None,
):
    """
    Async factory: load MCP tools -> bind them to the Qwen model -> create a LangGraph ReAct agent.
    """
    tools = await load_mcp_tools(servers=servers, config_path=config_path)
    model = (model or ChatQwenOpenAICompat(temperature=0)).bind_tools(tools).bind(tool_choice=tool_choice)
    return create_react_agent(model, tools)

def _run_coro_sync(coro):
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        return asyncio.run(coro)
    raise RuntimeError("An event loop is already running; use `await create_qwen_agent_with_mcp_async(...)` instead")

def create_qwen_agent_with_mcp(
    *,
    model: Optional[ChatQwenOpenAICompat] = None,
    servers: Optional[Dict[str, Dict[str, Any]]] = None,
    config_path: Optional[str] = None,
    tool_choice: str = "auto",
):
    """
    Sync factory: pulls the tools on a temporary event loop, then creates the ReAct agent.
    Intended for quickstarts/scripts; inside an async framework use create_qwen_agent_with_mcp_async.
    """
    model = model or ChatQwenOpenAICompat(temperature=0)
    return _run_coro_sync(
        create_qwen_agent_with_mcp_async(
            servers=servers, config_path=config_path, tool_choice=tool_choice, model=model
        )
    )