40 lines
1.4 KiB
Python
40 lines
1.4 KiB
Python
import asyncio
|
|
from fastmcp import Client
|
|
from fastmcp.client.elicitation import ElicitResult
|
|
|
|
async def elicitation_handler(message, response_type, params, ctx):
|
|
# Mock providing RustFS configuration
|
|
if "RustFS 配置信息" in message:
|
|
return ElicitResult(action="accept", content=response_type(
|
|
endpoint_url="https://rfs.jmsu.top",
|
|
access_key_id="lingyuzeng",
|
|
secret_access_key="rustAdminlingyuzeng",
|
|
bucket_name="rustfs0",
|
|
region_name="us-east-1"
|
|
))
|
|
# Mock providing file upload info
|
|
elif "文件路径" in message:
|
|
return ElicitResult(action="accept", content={
|
|
"file_path": "/app/data/README.md",
|
|
"object_key": "README.md"
|
|
})
|
|
else:
|
|
return ElicitResult(action="decline")
|
|
|
|
async def test_server():
|
|
async with Client(
|
|
"http://127.0.0.1:8004/mcp/",
|
|
elicitation_handler=elicitation_handler
|
|
) as client:
|
|
# Test connection
|
|
await client.ping()
|
|
|
|
# List available tools
|
|
tools = await client.list_tools()
|
|
print("可用工具:", [tool.name for tool in tools])
|
|
|
|
# Call tool (will trigger configuration validation)
|
|
result = await client.call_tool("rustfs_upload_file", {})
|
|
print(result)
|
|
|
|
asyncio.run(test_server()) |