remove elicitation,需要客户端实现 elicitation_handler 支持。

This commit is contained in:
mm644706215
2025-07-29 16:38:04 +08:00
parent 50acba7486
commit 4243814fb1

View File

@@ -0,0 +1,270 @@
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
"""
@file : mcp_server.py
@desc : MCP 2.0 服务端,支持多 session自动会话配置兼容 Studio/streamable-http 客户端
@version : 0.3.0
@author : lyzeng
@email : pylyzeng@gmail.com
"""
from typing import Dict, Optional, Any
from fastmcp import FastMCP, Context
from dataclasses import dataclass
from rustfs_s3_toolkit.s3_client import S3StorageToolkit
import os
# ==== MCP 服务端实例 ====
mcp = FastMCP(
name="RustFS-MCP",
version="0.3.0",
instructions="""
RustFS 对象存储工具S3 兼容,多 session 隔离)
- 支持文件上传、下载、列举、删除、目录操作。
- 首次使用需要通过 rustfs_configure_s3 工具配置 S3 连接参数。
- 支持 MCP Studio、streamable-http、LangChain、LlamaIndex 等多客户端自动发现与调用。
"""
)
# ==== S3 配置结构体,每个 session 独立一份 ====
@dataclass
class RustFSMCPConfig:
endpoint_url: str
access_key_id: str
secret_access_key: str
bucket_name: str
region_name: str = "us-east-1"
# ==== 配置缓存session_id -> S3 配置 ====
client_config_cache: Dict[str, RustFSMCPConfig] = {}
def get_config_for_client(session_id: str) -> Optional[RustFSMCPConfig]:
"""根据 session_id 获取 S3 配置"""
return client_config_cache.get(session_id)
def save_config_for_client(session_id: str, config: RustFSMCPConfig):
"""保存/更新 session_id 对应的 S3 配置"""
client_config_cache[session_id] = config
# ==== 通用 session_id/uuid 提取工具,兼容 MCP/streamable-http 多客户端 ====
def get_session_id(ctx: Context) -> str:
"""
优先提取 ctx.session_id兼容 headers/arguments/message 等参数名变体
若未提供则 fallback 为 default
"""
# MCP Studio 等标准写法
if hasattr(ctx, "session_id") and ctx.session_id:
return str(ctx.session_id)
# headers/arguments 各类别名兼容
for key in ("uuid", "session_id", "sessionId"):
if hasattr(ctx, "headers") and ctx.headers and key in ctx.headers:
return str(ctx.headers[key])
if hasattr(ctx, "arguments") and ctx.arguments and key in ctx.arguments:
return str(ctx.arguments[key])
# JSON-RPC message body 参数
if hasattr(ctx, "message") and ctx.message:
for key in ("uuid", "session_id", "sessionId"):
if key in ctx.message:
return str(ctx.message[key])
# fallback: 默认会话
return "default"
# ==== MCP 工具方法 ====
@mcp.tool
async def rustfs_configure_s3(
endpoint_url: str,
access_key_id: str,
secret_access_key: str,
bucket_name: str,
region_name: str = "us-east-1",
ctx: Context = None
) -> str:
"""
配置 S3 连接参数
Args:
endpoint_url: S3 端点 URL
access_key_id: 访问密钥 ID
secret_access_key: 秘密访问密钥
bucket_name: 存储桶名称
region_name: 区域名称(默认 us-east-1
"""
session_id = get_session_id(ctx) if ctx else "default"
config = RustFSMCPConfig(
endpoint_url=endpoint_url,
access_key_id=access_key_id,
secret_access_key=secret_access_key,
bucket_name=bucket_name,
region_name=region_name
)
# 测试连接
try:
toolkit = S3StorageToolkit(**vars(config))
check = toolkit.test_connection()
if not check.get("success"):
return f"配置失败,连接测试不通过: {check.get('message')}"
save_config_for_client(session_id, config)
return f"S3 配置成功session_id: {session_id}"
except Exception as e:
return f"配置异常: {str(e)}"
@mcp.tool
async def rustfs_upload_file(file_path: str, object_key: str, ctx: Context) -> str:
"""
上传文件到 S3 存储
Args:
file_path: 本地文件路径
object_key: S3 对象键名
"""
session_id = get_session_id(ctx)
config = get_config_for_client(session_id)
if not config:
return "错误:首次使用需要先调用 rustfs_configure_s3 配置 S3 连接参数。"
try:
toolkit = S3StorageToolkit(**vars(config))
check = toolkit.test_connection()
if not check.get("success"):
return f"连接失败: {check.get('message')}"
upload_result = toolkit.upload_file(file_path, object_key)
if upload_result.get("success"):
return f"上传成功访问URL: {upload_result.get('public_url')}"
return f"上传失败: {upload_result.get('error', '')}"
except Exception as e:
return f"上传异常: {str(e)}"
@mcp.tool
async def rustfs_download_file(object_key: str, local_path: str, ctx: Context) -> str:
"""
下载文件
Args:
object_key: 远端文件路径
local_path: 本地保存路径
"""
session_id = get_session_id(ctx)
config = get_config_for_client(session_id)
if not config:
return "错误:首次使用需要先调用 rustfs_configure_s3 配置 S3 连接参数。"
try:
toolkit = S3StorageToolkit(**vars(config))
dl = toolkit.download_file(object_key, local_path)
if dl.get("success"):
return f"下载成功,保存路径: {local_path}"
return f"下载失败: {dl.get('error', '')}"
except Exception as e:
return f"下载异常: {str(e)}"
@mcp.tool
async def rustfs_list_files(prefix: str = "", ctx: Context = None) -> Any:
"""
列举 bucket 文件
Args:
prefix: 文件前缀(可选,默认为空表示列举所有文件)
"""
session_id = get_session_id(ctx) if ctx else "default"
config = get_config_for_client(session_id)
if not config:
return "错误:首次使用需要先调用 rustfs_configure_s3 配置 S3 连接参数。"
toolkit = S3StorageToolkit(**vars(config))
result = toolkit.list_files(prefix=prefix)
if not result.get("success"):
return f"列举失败: {result.get('error', '')}"
files = result.get("files", [])
if not files:
return "该前缀下没有文件"
# 只展示前20个文件避免输出过长
return "\n".join(
f"{f['key']} | {f['size']}B | {f['public_url']}" for f in files[:20]
) + ("\n...等" if len(files) > 20 else "")
@mcp.tool
async def rustfs_delete_file(object_key: str, ctx: Context) -> str:
"""
删除文件
Args:
object_key: 要删除的文件键名
"""
session_id = get_session_id(ctx)
config = get_config_for_client(session_id)
if not config:
return "错误:首次使用需要先调用 rustfs_configure_s3 配置 S3 连接参数。"
toolkit = S3StorageToolkit(**vars(config))
result = toolkit.delete_file(object_key)
if result.get("success"):
return f"删除成功: {object_key}"
return f"删除失败: {result.get('error', '')}"
@mcp.tool
async def rustfs_create_folder(folder_path: str, ctx: Context) -> str:
"""
创建目录
Args:
folder_path: 要创建的文件夹路径(建议以 / 结尾)
"""
session_id = get_session_id(ctx)
config = get_config_for_client(session_id)
if not config:
return "错误:首次使用需要先调用 rustfs_configure_s3 配置 S3 连接参数。"
toolkit = S3StorageToolkit(**vars(config))
result = toolkit.create_folder(folder_path)
if result.get("success"):
return f"目录创建成功: {folder_path}"
return f"创建失败: {result.get('error', '')}"
@mcp.tool
async def rustfs_delete_directory(remote_prefix: str, ctx: Context) -> str:
"""
删除目录
Args:
remote_prefix: 要删除的目录前缀(如 foo/bar/
"""
session_id = get_session_id(ctx)
config = get_config_for_client(session_id)
if not config:
return "错误:首次使用需要先调用 rustfs_configure_s3 配置 S3 连接参数。"
toolkit = S3StorageToolkit(**vars(config))
result = toolkit.delete_directory(remote_prefix)
if result.get("success"):
return f"目录删除成功,共 {result.get('deleted_count', 0)}"
return f"删除失败: {result.get('error', '')}"
# ==== 推荐标准中间件(日志/限流/异常/性能)====
from fastmcp.server.middleware.logging import LoggingMiddleware
from fastmcp.server.middleware.timing import TimingMiddleware
from fastmcp.server.middleware.error_handling import ErrorHandlingMiddleware
from fastmcp.server.middleware.rate_limiting import RateLimitingMiddleware
mcp.add_middleware(ErrorHandlingMiddleware()) # 全局异常兜底
mcp.add_middleware(RateLimitingMiddleware(max_requests_per_second=30, burst_capacity=60))
mcp.add_middleware(TimingMiddleware())
mcp.add_middleware(LoggingMiddleware(include_payloads=True, max_payload_length=500))
# ==== MCP 服务端启动 ====
if __name__ == "__main__":
mcp.run(
transport=os.getenv("TRANSPORT", "streamable-http"), # 支持 stdio/http/streamable-http
host=os.getenv("HOST", "0.0.0.0"),
port=int(os.getenv("PORT", 9009))
)