diff --git a/src/rustfs_s3_toolkit/mcp_server_elicitation.py b/src/rustfs_s3_toolkit/mcp_server_elicitation.py new file mode 100644 index 0000000..9842582 --- /dev/null +++ b/src/rustfs_s3_toolkit/mcp_server_elicitation.py @@ -0,0 +1,275 @@ +#!/usr/bin/env python +# -*- encoding: utf-8 -*- +""" +@file : mcp_server.py +@desc : MCP 2.0 服务端,支持多 session,自动会话配置,兼容 Studio/streamable-http 客户端 #!User Elicitation: 需要客户端实现 elicitation_handler 支持 +@version : 0.3.0 +@author : lyzeng +@email : pylyzeng@gmail.com +""" + +from typing import Dict, Optional, Any +from fastmcp import FastMCP, Context +from dataclasses import dataclass +from rustfs_s3_toolkit.s3_client import S3StorageToolkit +import os + +# ==== MCP 服务端实例 ==== +mcp = FastMCP( + name="RustFS-MCP", + version="0.3.0", + instructions=""" +RustFS 对象存储工具(S3 兼容,多 session 隔离) + +- 支持文件上传、下载、列举、删除、目录操作。 +- 推荐所有操作在请求参数或 headers 里包含唯一 session_id(或 uuid),用于自动检索/创建你的 S3 连接实例。 +- 首次调用同一 session_id 时会自动引导填写 S3 配置信息(endpoint_url, access_key_id, secret_access_key, bucket_name)。 +- 同一 session_id 后续调用自动复用配置,无需重复输入。 +- 支持 MCP Studio、streamable-http、LangChain、LlamaIndex 等多客户端自动发现与调用。 +""" +) + +# ==== S3 配置结构体,每个 session 独立一份 ==== +@dataclass +class RustFSMCPConfig: + endpoint_url: str + access_key_id: str + secret_access_key: str + bucket_name: str + region_name: str = "us-east-1" + +# ==== 配置缓存:session_id -> S3 配置 ==== +client_config_cache: Dict[str, RustFSMCPConfig] = {} + +def get_config_for_client(session_id: str) -> Optional[RustFSMCPConfig]: + """根据 session_id 获取 S3 配置""" + return client_config_cache.get(session_id) + +def save_config_for_client(session_id: str, config: RustFSMCPConfig): + """保存/更新 session_id 对应的 S3 配置""" + client_config_cache[session_id] = config + +# ==== 通用 session_id/uuid 提取工具,兼容 MCP/streamable-http 多客户端 ==== +def get_session_id(ctx: Context) -> str: + """ + 优先提取 ctx.session_id,兼容 headers/arguments/message 等参数名变体 + 若未提供则 fallback 为 default + """ + # MCP Studio 等标准写法 + if hasattr(ctx, "session_id") and ctx.session_id: + return str(ctx.session_id) + # headers/arguments 各类别名兼容 + for key in ("uuid", "session_id", "sessionId"): + if hasattr(ctx, "headers") and ctx.headers and key in ctx.headers: + return str(ctx.headers[key]) + if hasattr(ctx, "arguments") and ctx.arguments and key in ctx.arguments: + return str(ctx.arguments[key]) + # JSON-RPC message body 参数 + if hasattr(ctx, "message") and ctx.message: + for key in ("uuid", "session_id", "sessionId"): + if key in ctx.message: + return str(ctx.message[key]) + # fallback: 默认会话 + return "default" + +# ==== MCP 工具方法 ==== + +@mcp.tool +async def rustfs_upload_file(file_path: str, object_key: str, ctx: Context) -> str: + """ + 上传文件到 S3 存储 + + Args: + file_path: 本地文件路径 + object_key: S3 对象键名 + """ + session_id = get_session_id(ctx) + config = get_config_for_client(session_id) + + # 首次使用时配置 S3 连接 + if not config: + res = await ctx.elicit( + "首次使用此 session,请填写 S3 连接参数", + response_type=RustFSMCPConfig + ) + if res.action != "accept": + return "已取消" + config = res.data + save_config_for_client(session_id, config) + + # 直接使用传入的参数,无需再次征询 + try: + toolkit = S3StorageToolkit(**vars(config)) + check = toolkit.test_connection() + if not check.get("success"): + return f"连接失败: {check.get('message')}" + upload_result = toolkit.upload_file(file_path, object_key) + if upload_result.get("success"): + return f"上传成功,访问URL: {upload_result.get('public_url')}" + return f"上传失败: {upload_result.get('error', '')}" + except Exception as e: + return f"上传异常: {str(e)}" + +@mcp.tool +async def rustfs_download_file(object_key: str, local_path: str, ctx: Context) -> str: + """ + 下载文件 + + Args: + object_key: 远端文件路径 + local_path: 本地保存路径 + """ + session_id = get_session_id(ctx) + config = get_config_for_client(session_id) + + if not config: + res = await ctx.elicit( + "首次使用此 session,请填写 S3 连接参数", + response_type=RustFSMCPConfig + ) + if res.action != "accept": + return "已取消" + config = res.data + save_config_for_client(session_id, config) + + try: + toolkit = S3StorageToolkit(**vars(config)) + dl = toolkit.download_file(object_key, local_path) + if dl.get("success"): + return f"下载成功,保存路径: {local_path}" + return f"下载失败: {dl.get('error', '')}" + except Exception as e: + return f"下载异常: {str(e)}" + +@mcp.tool +async def rustfs_list_files(prefix: str, ctx: Context) -> Any: + """ + 列举 bucket 文件 + + Args: + prefix: 文件前缀(使用""显示默认说有文件) + """ + session_id = get_session_id(ctx) + config = get_config_for_client(session_id) + + if not config: + res = await ctx.elicit( + "首次使用此 session,请填写 S3 连接参数", + response_type=RustFSMCPConfig + ) + if res.action != "accept": + return "已取消" + config = res.data + save_config_for_client(session_id, config) + + toolkit = S3StorageToolkit(**vars(config)) + result = toolkit.list_files(prefix=prefix) + if not result.get("success"): + return f"列举失败: {result.get('error', '')}" + files = result.get("files", []) + if not files: + return "该前缀下没有文件" + # 只展示前20个文件,避免输出过长 + return "\n".join( + f"{f['key']} | {f['size']}B | {f['public_url']}" for f in files[:20] + ) + ("\n...等" if len(files) > 20 else "") + +@mcp.tool +async def rustfs_delete_file(object_key: str, ctx: Context) -> str: + """ + 删除文件 + + Args: + object_key: 要删除的文件键名 + """ + session_id = get_session_id(ctx) + config = get_config_for_client(session_id) + + if not config: + res = await ctx.elicit( + "首次使用此 session,请填写 S3 连接参数", + response_type=RustFSMCPConfig + ) + if res.action != "accept": + return "已取消" + config = res.data + save_config_for_client(session_id, config) + + toolkit = S3StorageToolkit(**vars(config)) + result = toolkit.delete_file(object_key) + if result.get("success"): + return f"删除成功: {object_key}" + return f"删除失败: {result.get('error', '')}" + +@mcp.tool +async def rustfs_create_folder(folder_path: str, ctx: Context) -> str: + """ + 创建目录 + + Args: + folder_path: 要创建的文件夹路径(建议以 / 结尾) + """ + session_id = get_session_id(ctx) + config = get_config_for_client(session_id) + + if not config: + res = await ctx.elicit( + "首次使用此 session,请填写 S3 连接参数", + response_type=RustFSMCPConfig + ) + if res.action != "accept": + return "已取消" + config = res.data + save_config_for_client(session_id, config) + + toolkit = S3StorageToolkit(**vars(config)) + result = toolkit.create_folder(folder_path) + if result.get("success"): + return f"目录创建成功: {folder_path}" + return f"创建失败: {result.get('error', '')}" + +@mcp.tool +async def rustfs_delete_directory(remote_prefix: str, ctx: Context) -> str: + """ + 删除目录 + + Args: + remote_prefix: 要删除的目录前缀(如 foo/bar/) + """ + session_id = get_session_id(ctx) + config = get_config_for_client(session_id) + + if not config: + res = await ctx.elicit( + "首次使用此 session,请填写 S3 连接参数", + response_type=RustFSMCPConfig + ) + if res.action != "accept": + return "已取消" + config = res.data + save_config_for_client(session_id, config) + + toolkit = S3StorageToolkit(**vars(config)) + result = toolkit.delete_directory(remote_prefix) + if result.get("success"): + return f"目录删除成功,共 {result.get('deleted_count', 0)} 项" + return f"删除失败: {result.get('error', '')}" + +# ==== 推荐标准中间件(日志/限流/异常/性能)==== +from fastmcp.server.middleware.logging import LoggingMiddleware +from fastmcp.server.middleware.timing import TimingMiddleware +from fastmcp.server.middleware.error_handling import ErrorHandlingMiddleware +from fastmcp.server.middleware.rate_limiting import RateLimitingMiddleware + +mcp.add_middleware(ErrorHandlingMiddleware()) # 全局异常兜底 +mcp.add_middleware(RateLimitingMiddleware(max_requests_per_second=30, burst_capacity=60)) +mcp.add_middleware(TimingMiddleware()) +mcp.add_middleware(LoggingMiddleware(include_payloads=True, max_payload_length=500)) + +# ==== MCP 服务端启动 ==== +if __name__ == "__main__": + mcp.run( + transport=os.getenv("TRANSPORT", "streamable-http"), # 支持 stdio/http/streamable-http + host=os.getenv("HOST", "0.0.0.0"), + port=int(os.getenv("PORT", 9009)) + )