elicitation版本,需要客户端实现 elicitation_handler 支持
This commit is contained in:
275
src/rustfs_s3_toolkit/mcp_server_elicitation.py
Normal file
275
src/rustfs_s3_toolkit/mcp_server_elicitation.py
Normal file
@@ -0,0 +1,275 @@
|
||||
#!/usr/bin/env python
|
||||
# -*- encoding: utf-8 -*-
|
||||
"""
|
||||
@file : mcp_server.py
|
||||
@desc : MCP 2.0 服务端,支持多 session,自动会话配置,兼容 Studio/streamable-http 客户端 #!User Elicitation: 需要客户端实现 elicitation_handler 支持
|
||||
@version : 0.3.0
|
||||
@author : lyzeng
|
||||
@email : pylyzeng@gmail.com
|
||||
"""
|
||||
|
||||
from typing import Dict, Optional, Any
|
||||
from fastmcp import FastMCP, Context
|
||||
from dataclasses import dataclass
|
||||
from rustfs_s3_toolkit.s3_client import S3StorageToolkit
|
||||
import os
|
||||
|
||||
# ==== MCP 服务端实例 ====
|
||||
mcp = FastMCP(
|
||||
name="RustFS-MCP",
|
||||
version="0.3.0",
|
||||
instructions="""
|
||||
RustFS 对象存储工具(S3 兼容,多 session 隔离)
|
||||
|
||||
- 支持文件上传、下载、列举、删除、目录操作。
|
||||
- 推荐所有操作在请求参数或 headers 里包含唯一 session_id(或 uuid),用于自动检索/创建你的 S3 连接实例。
|
||||
- 首次调用同一 session_id 时会自动引导填写 S3 配置信息(endpoint_url, access_key_id, secret_access_key, bucket_name)。
|
||||
- 同一 session_id 后续调用自动复用配置,无需重复输入。
|
||||
- 支持 MCP Studio、streamable-http、LangChain、LlamaIndex 等多客户端自动发现与调用。
|
||||
"""
|
||||
)
|
||||
|
||||
# ==== S3 配置结构体,每个 session 独立一份 ====
|
||||
@dataclass
|
||||
class RustFSMCPConfig:
|
||||
endpoint_url: str
|
||||
access_key_id: str
|
||||
secret_access_key: str
|
||||
bucket_name: str
|
||||
region_name: str = "us-east-1"
|
||||
|
||||
# ==== 配置缓存:session_id -> S3 配置 ====
|
||||
client_config_cache: Dict[str, RustFSMCPConfig] = {}
|
||||
|
||||
def get_config_for_client(session_id: str) -> Optional[RustFSMCPConfig]:
|
||||
"""根据 session_id 获取 S3 配置"""
|
||||
return client_config_cache.get(session_id)
|
||||
|
||||
def save_config_for_client(session_id: str, config: RustFSMCPConfig):
|
||||
"""保存/更新 session_id 对应的 S3 配置"""
|
||||
client_config_cache[session_id] = config
|
||||
|
||||
# ==== 通用 session_id/uuid 提取工具,兼容 MCP/streamable-http 多客户端 ====
|
||||
def get_session_id(ctx: Context) -> str:
|
||||
"""
|
||||
优先提取 ctx.session_id,兼容 headers/arguments/message 等参数名变体
|
||||
若未提供则 fallback 为 default
|
||||
"""
|
||||
# MCP Studio 等标准写法
|
||||
if hasattr(ctx, "session_id") and ctx.session_id:
|
||||
return str(ctx.session_id)
|
||||
# headers/arguments 各类别名兼容
|
||||
for key in ("uuid", "session_id", "sessionId"):
|
||||
if hasattr(ctx, "headers") and ctx.headers and key in ctx.headers:
|
||||
return str(ctx.headers[key])
|
||||
if hasattr(ctx, "arguments") and ctx.arguments and key in ctx.arguments:
|
||||
return str(ctx.arguments[key])
|
||||
# JSON-RPC message body 参数
|
||||
if hasattr(ctx, "message") and ctx.message:
|
||||
for key in ("uuid", "session_id", "sessionId"):
|
||||
if key in ctx.message:
|
||||
return str(ctx.message[key])
|
||||
# fallback: 默认会话
|
||||
return "default"
|
||||
|
||||
# ==== MCP 工具方法 ====
|
||||
|
||||
@mcp.tool
|
||||
async def rustfs_upload_file(file_path: str, object_key: str, ctx: Context) -> str:
|
||||
"""
|
||||
上传文件到 S3 存储
|
||||
|
||||
Args:
|
||||
file_path: 本地文件路径
|
||||
object_key: S3 对象键名
|
||||
"""
|
||||
session_id = get_session_id(ctx)
|
||||
config = get_config_for_client(session_id)
|
||||
|
||||
# 首次使用时配置 S3 连接
|
||||
if not config:
|
||||
res = await ctx.elicit(
|
||||
"首次使用此 session,请填写 S3 连接参数",
|
||||
response_type=RustFSMCPConfig
|
||||
)
|
||||
if res.action != "accept":
|
||||
return "已取消"
|
||||
config = res.data
|
||||
save_config_for_client(session_id, config)
|
||||
|
||||
# 直接使用传入的参数,无需再次征询
|
||||
try:
|
||||
toolkit = S3StorageToolkit(**vars(config))
|
||||
check = toolkit.test_connection()
|
||||
if not check.get("success"):
|
||||
return f"连接失败: {check.get('message')}"
|
||||
upload_result = toolkit.upload_file(file_path, object_key)
|
||||
if upload_result.get("success"):
|
||||
return f"上传成功,访问URL: {upload_result.get('public_url')}"
|
||||
return f"上传失败: {upload_result.get('error', '')}"
|
||||
except Exception as e:
|
||||
return f"上传异常: {str(e)}"
|
||||
|
||||
@mcp.tool
|
||||
async def rustfs_download_file(object_key: str, local_path: str, ctx: Context) -> str:
|
||||
"""
|
||||
下载文件
|
||||
|
||||
Args:
|
||||
object_key: 远端文件路径
|
||||
local_path: 本地保存路径
|
||||
"""
|
||||
session_id = get_session_id(ctx)
|
||||
config = get_config_for_client(session_id)
|
||||
|
||||
if not config:
|
||||
res = await ctx.elicit(
|
||||
"首次使用此 session,请填写 S3 连接参数",
|
||||
response_type=RustFSMCPConfig
|
||||
)
|
||||
if res.action != "accept":
|
||||
return "已取消"
|
||||
config = res.data
|
||||
save_config_for_client(session_id, config)
|
||||
|
||||
try:
|
||||
toolkit = S3StorageToolkit(**vars(config))
|
||||
dl = toolkit.download_file(object_key, local_path)
|
||||
if dl.get("success"):
|
||||
return f"下载成功,保存路径: {local_path}"
|
||||
return f"下载失败: {dl.get('error', '')}"
|
||||
except Exception as e:
|
||||
return f"下载异常: {str(e)}"
|
||||
|
||||
@mcp.tool
|
||||
async def rustfs_list_files(prefix: str, ctx: Context) -> Any:
|
||||
"""
|
||||
列举 bucket 文件
|
||||
|
||||
Args:
|
||||
prefix: 文件前缀(使用""显示默认说有文件)
|
||||
"""
|
||||
session_id = get_session_id(ctx)
|
||||
config = get_config_for_client(session_id)
|
||||
|
||||
if not config:
|
||||
res = await ctx.elicit(
|
||||
"首次使用此 session,请填写 S3 连接参数",
|
||||
response_type=RustFSMCPConfig
|
||||
)
|
||||
if res.action != "accept":
|
||||
return "已取消"
|
||||
config = res.data
|
||||
save_config_for_client(session_id, config)
|
||||
|
||||
toolkit = S3StorageToolkit(**vars(config))
|
||||
result = toolkit.list_files(prefix=prefix)
|
||||
if not result.get("success"):
|
||||
return f"列举失败: {result.get('error', '')}"
|
||||
files = result.get("files", [])
|
||||
if not files:
|
||||
return "该前缀下没有文件"
|
||||
# 只展示前20个文件,避免输出过长
|
||||
return "\n".join(
|
||||
f"{f['key']} | {f['size']}B | {f['public_url']}" for f in files[:20]
|
||||
) + ("\n...等" if len(files) > 20 else "")
|
||||
|
||||
@mcp.tool
|
||||
async def rustfs_delete_file(object_key: str, ctx: Context) -> str:
|
||||
"""
|
||||
删除文件
|
||||
|
||||
Args:
|
||||
object_key: 要删除的文件键名
|
||||
"""
|
||||
session_id = get_session_id(ctx)
|
||||
config = get_config_for_client(session_id)
|
||||
|
||||
if not config:
|
||||
res = await ctx.elicit(
|
||||
"首次使用此 session,请填写 S3 连接参数",
|
||||
response_type=RustFSMCPConfig
|
||||
)
|
||||
if res.action != "accept":
|
||||
return "已取消"
|
||||
config = res.data
|
||||
save_config_for_client(session_id, config)
|
||||
|
||||
toolkit = S3StorageToolkit(**vars(config))
|
||||
result = toolkit.delete_file(object_key)
|
||||
if result.get("success"):
|
||||
return f"删除成功: {object_key}"
|
||||
return f"删除失败: {result.get('error', '')}"
|
||||
|
||||
@mcp.tool
|
||||
async def rustfs_create_folder(folder_path: str, ctx: Context) -> str:
|
||||
"""
|
||||
创建目录
|
||||
|
||||
Args:
|
||||
folder_path: 要创建的文件夹路径(建议以 / 结尾)
|
||||
"""
|
||||
session_id = get_session_id(ctx)
|
||||
config = get_config_for_client(session_id)
|
||||
|
||||
if not config:
|
||||
res = await ctx.elicit(
|
||||
"首次使用此 session,请填写 S3 连接参数",
|
||||
response_type=RustFSMCPConfig
|
||||
)
|
||||
if res.action != "accept":
|
||||
return "已取消"
|
||||
config = res.data
|
||||
save_config_for_client(session_id, config)
|
||||
|
||||
toolkit = S3StorageToolkit(**vars(config))
|
||||
result = toolkit.create_folder(folder_path)
|
||||
if result.get("success"):
|
||||
return f"目录创建成功: {folder_path}"
|
||||
return f"创建失败: {result.get('error', '')}"
|
||||
|
||||
@mcp.tool
|
||||
async def rustfs_delete_directory(remote_prefix: str, ctx: Context) -> str:
|
||||
"""
|
||||
删除目录
|
||||
|
||||
Args:
|
||||
remote_prefix: 要删除的目录前缀(如 foo/bar/)
|
||||
"""
|
||||
session_id = get_session_id(ctx)
|
||||
config = get_config_for_client(session_id)
|
||||
|
||||
if not config:
|
||||
res = await ctx.elicit(
|
||||
"首次使用此 session,请填写 S3 连接参数",
|
||||
response_type=RustFSMCPConfig
|
||||
)
|
||||
if res.action != "accept":
|
||||
return "已取消"
|
||||
config = res.data
|
||||
save_config_for_client(session_id, config)
|
||||
|
||||
toolkit = S3StorageToolkit(**vars(config))
|
||||
result = toolkit.delete_directory(remote_prefix)
|
||||
if result.get("success"):
|
||||
return f"目录删除成功,共 {result.get('deleted_count', 0)} 项"
|
||||
return f"删除失败: {result.get('error', '')}"
|
||||
|
||||
# ==== 推荐标准中间件(日志/限流/异常/性能)====
|
||||
from fastmcp.server.middleware.logging import LoggingMiddleware
|
||||
from fastmcp.server.middleware.timing import TimingMiddleware
|
||||
from fastmcp.server.middleware.error_handling import ErrorHandlingMiddleware
|
||||
from fastmcp.server.middleware.rate_limiting import RateLimitingMiddleware
|
||||
|
||||
mcp.add_middleware(ErrorHandlingMiddleware()) # 全局异常兜底
|
||||
mcp.add_middleware(RateLimitingMiddleware(max_requests_per_second=30, burst_capacity=60))
|
||||
mcp.add_middleware(TimingMiddleware())
|
||||
mcp.add_middleware(LoggingMiddleware(include_payloads=True, max_payload_length=500))
|
||||
|
||||
# ==== MCP 服务端启动 ====
|
||||
if __name__ == "__main__":
|
||||
mcp.run(
|
||||
transport=os.getenv("TRANSPORT", "streamable-http"), # 支持 stdio/http/streamable-http
|
||||
host=os.getenv("HOST", "0.0.0.0"),
|
||||
port=int(os.getenv("PORT", 9009))
|
||||
)
|
||||
Reference in New Issue
Block a user