Files
rustfs-s3-toolkit/tests/test_mcp.py
2025-07-29 16:54:02 +08:00

40 lines
1.4 KiB
Python

import asyncio
from fastmcp import Client
from fastmcp.client.elicitation import ElicitResult
async def elicitation_handler(message, response_type, params, ctx):
# Mock providing RustFS configuration
if "RustFS 配置信息" in message:
return ElicitResult(action="accept", content=response_type(
endpoint_url="https://rfs.jmsu.top",
access_key_id="lingyuzeng",
secret_access_key="rustAdminlingyuzeng",
bucket_name="rustfs0",
region_name="us-east-1"
))
# Mock providing file upload info
elif "文件路径" in message:
return ElicitResult(action="accept", content={
"file_path": "/app/data/README.md",
"object_key": "README.md"
})
else:
return ElicitResult(action="decline")
async def test_server():
async with Client(
"http://127.0.0.1:8004/mcp/",
elicitation_handler=elicitation_handler
) as client:
# Test connection
await client.ping()
# List available tools
tools = await client.list_tools()
print("可用工具:", [tool.name for tool in tools])
# Call tool (will trigger configuration validation)
result = await client.call_tool("rustfs_upload_file", {})
print(result)
asyncio.run(test_server())