Files
rustfs-s3-toolkit/src/rustfs_s3_toolkit/mcp_server_elicitation.py

276 lines
9.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
"""
@file : mcp_server.py
@desc : MCP 2.0 服务端,支持多 session自动会话配置兼容 Studio/streamable-http 客户端 #!User Elicitation: 需要客户端实现 elicitation_handler 支持
@version : 0.3.0
@author : lyzeng
@email : pylyzeng@gmail.com
"""
from typing import Dict, Optional, Any
from fastmcp import FastMCP, Context
from dataclasses import dataclass
from rustfs_s3_toolkit.s3_client import S3StorageToolkit
import os
# ==== MCP 服务端实例 ====
mcp = FastMCP(
name="RustFS-MCP",
version="0.3.0",
instructions="""
RustFS 对象存储工具S3 兼容,多 session 隔离)
- 支持文件上传、下载、列举、删除、目录操作。
- 推荐所有操作在请求参数或 headers 里包含唯一 session_id或 uuid用于自动检索/创建你的 S3 连接实例。
- 首次调用同一 session_id 时会自动引导填写 S3 配置信息endpoint_url, access_key_id, secret_access_key, bucket_name
- 同一 session_id 后续调用自动复用配置,无需重复输入。
- 支持 MCP Studio、streamable-http、LangChain、LlamaIndex 等多客户端自动发现与调用。
"""
)
# ==== S3 配置结构体,每个 session 独立一份 ====
@dataclass
class RustFSMCPConfig:
endpoint_url: str
access_key_id: str
secret_access_key: str
bucket_name: str
region_name: str = "us-east-1"
# ==== 配置缓存session_id -> S3 配置 ====
client_config_cache: Dict[str, RustFSMCPConfig] = {}
def get_config_for_client(session_id: str) -> Optional[RustFSMCPConfig]:
"""根据 session_id 获取 S3 配置"""
return client_config_cache.get(session_id)
def save_config_for_client(session_id: str, config: RustFSMCPConfig):
"""保存/更新 session_id 对应的 S3 配置"""
client_config_cache[session_id] = config
# ==== 通用 session_id/uuid 提取工具,兼容 MCP/streamable-http 多客户端 ====
def get_session_id(ctx: Context) -> str:
"""
优先提取 ctx.session_id兼容 headers/arguments/message 等参数名变体
若未提供则 fallback 为 default
"""
# MCP Studio 等标准写法
if hasattr(ctx, "session_id") and ctx.session_id:
return str(ctx.session_id)
# headers/arguments 各类别名兼容
for key in ("uuid", "session_id", "sessionId"):
if hasattr(ctx, "headers") and ctx.headers and key in ctx.headers:
return str(ctx.headers[key])
if hasattr(ctx, "arguments") and ctx.arguments and key in ctx.arguments:
return str(ctx.arguments[key])
# JSON-RPC message body 参数
if hasattr(ctx, "message") and ctx.message:
for key in ("uuid", "session_id", "sessionId"):
if key in ctx.message:
return str(ctx.message[key])
# fallback: 默认会话
return "default"
# ==== MCP 工具方法 ====
@mcp.tool
async def rustfs_upload_file(file_path: str, object_key: str, ctx: Context) -> str:
"""
上传文件到 S3 存储
Args:
file_path: 本地文件路径
object_key: S3 对象键名
"""
session_id = get_session_id(ctx)
config = get_config_for_client(session_id)
# 首次使用时配置 S3 连接
if not config:
res = await ctx.elicit(
"首次使用此 session请填写 S3 连接参数",
response_type=RustFSMCPConfig
)
if res.action != "accept":
return "已取消"
config = res.data
save_config_for_client(session_id, config)
# 直接使用传入的参数,无需再次征询
try:
toolkit = S3StorageToolkit(**vars(config))
check = toolkit.test_connection()
if not check.get("success"):
return f"连接失败: {check.get('message')}"
upload_result = toolkit.upload_file(file_path, object_key)
if upload_result.get("success"):
return f"上传成功访问URL: {upload_result.get('public_url')}"
return f"上传失败: {upload_result.get('error', '')}"
except Exception as e:
return f"上传异常: {str(e)}"
@mcp.tool
async def rustfs_download_file(object_key: str, local_path: str, ctx: Context) -> str:
"""
下载文件
Args:
object_key: 远端文件路径
local_path: 本地保存路径
"""
session_id = get_session_id(ctx)
config = get_config_for_client(session_id)
if not config:
res = await ctx.elicit(
"首次使用此 session请填写 S3 连接参数",
response_type=RustFSMCPConfig
)
if res.action != "accept":
return "已取消"
config = res.data
save_config_for_client(session_id, config)
try:
toolkit = S3StorageToolkit(**vars(config))
dl = toolkit.download_file(object_key, local_path)
if dl.get("success"):
return f"下载成功,保存路径: {local_path}"
return f"下载失败: {dl.get('error', '')}"
except Exception as e:
return f"下载异常: {str(e)}"
@mcp.tool
async def rustfs_list_files(prefix: str, ctx: Context) -> Any:
"""
列举 bucket 文件
Args:
prefix: 文件前缀(使用""显示默认说有文件)
"""
session_id = get_session_id(ctx)
config = get_config_for_client(session_id)
if not config:
res = await ctx.elicit(
"首次使用此 session请填写 S3 连接参数",
response_type=RustFSMCPConfig
)
if res.action != "accept":
return "已取消"
config = res.data
save_config_for_client(session_id, config)
toolkit = S3StorageToolkit(**vars(config))
result = toolkit.list_files(prefix=prefix)
if not result.get("success"):
return f"列举失败: {result.get('error', '')}"
files = result.get("files", [])
if not files:
return "该前缀下没有文件"
# 只展示前20个文件避免输出过长
return "\n".join(
f"{f['key']} | {f['size']}B | {f['public_url']}" for f in files[:20]
) + ("\n...等" if len(files) > 20 else "")
@mcp.tool
async def rustfs_delete_file(object_key: str, ctx: Context) -> str:
"""
删除文件
Args:
object_key: 要删除的文件键名
"""
session_id = get_session_id(ctx)
config = get_config_for_client(session_id)
if not config:
res = await ctx.elicit(
"首次使用此 session请填写 S3 连接参数",
response_type=RustFSMCPConfig
)
if res.action != "accept":
return "已取消"
config = res.data
save_config_for_client(session_id, config)
toolkit = S3StorageToolkit(**vars(config))
result = toolkit.delete_file(object_key)
if result.get("success"):
return f"删除成功: {object_key}"
return f"删除失败: {result.get('error', '')}"
@mcp.tool
async def rustfs_create_folder(folder_path: str, ctx: Context) -> str:
"""
创建目录
Args:
folder_path: 要创建的文件夹路径(建议以 / 结尾)
"""
session_id = get_session_id(ctx)
config = get_config_for_client(session_id)
if not config:
res = await ctx.elicit(
"首次使用此 session请填写 S3 连接参数",
response_type=RustFSMCPConfig
)
if res.action != "accept":
return "已取消"
config = res.data
save_config_for_client(session_id, config)
toolkit = S3StorageToolkit(**vars(config))
result = toolkit.create_folder(folder_path)
if result.get("success"):
return f"目录创建成功: {folder_path}"
return f"创建失败: {result.get('error', '')}"
@mcp.tool
async def rustfs_delete_directory(remote_prefix: str, ctx: Context) -> str:
"""
删除目录
Args:
remote_prefix: 要删除的目录前缀(如 foo/bar/
"""
session_id = get_session_id(ctx)
config = get_config_for_client(session_id)
if not config:
res = await ctx.elicit(
"首次使用此 session请填写 S3 连接参数",
response_type=RustFSMCPConfig
)
if res.action != "accept":
return "已取消"
config = res.data
save_config_for_client(session_id, config)
toolkit = S3StorageToolkit(**vars(config))
result = toolkit.delete_directory(remote_prefix)
if result.get("success"):
return f"目录删除成功,共 {result.get('deleted_count', 0)}"
return f"删除失败: {result.get('error', '')}"
# ==== 推荐标准中间件(日志/限流/异常/性能)====
from fastmcp.server.middleware.logging import LoggingMiddleware
from fastmcp.server.middleware.timing import TimingMiddleware
from fastmcp.server.middleware.error_handling import ErrorHandlingMiddleware
from fastmcp.server.middleware.rate_limiting import RateLimitingMiddleware
mcp.add_middleware(ErrorHandlingMiddleware()) # 全局异常兜底
mcp.add_middleware(RateLimitingMiddleware(max_requests_per_second=30, burst_capacity=60))
mcp.add_middleware(TimingMiddleware())
mcp.add_middleware(LoggingMiddleware(include_payloads=True, max_payload_length=500))
# ==== MCP 服务端启动 ====
if __name__ == "__main__":
mcp.run(
transport=os.getenv("TRANSPORT", "streamable-http"), # 支持 stdio/http/streamable-http
host=os.getenv("HOST", "0.0.0.0"),
port=int(os.getenv("PORT", 9009))
)