feat:更新fastmcp2.10.7,add testfile
This commit is contained in:
62
tests/upload_test.py
Normal file
62
tests/upload_test.py
Normal file
@@ -0,0 +1,62 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
上传测试文件到RustFS S3存储
|
||||
使用测试配置信息
|
||||
"""
|
||||
import sys
|
||||
import os
|
||||
|
||||
# 添加项目根目录到Python路径
|
||||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'src'))
|
||||
|
||||
from rustfs_s3_toolkit.s3_client import S3StorageToolkit
|
||||
|
||||
def upload_test_file():
|
||||
"""上传测试文件"""
|
||||
|
||||
# 配置信息(来自测试文件)
|
||||
config = {
|
||||
"endpoint_url": "https://rfs.jmsu.top",
|
||||
"access_key_id": "lingyuzeng",
|
||||
"secret_access_key": "rustAdminlingyuzeng",
|
||||
"bucket_name": "rustfs0",
|
||||
"region_name": "us-east-1"
|
||||
}
|
||||
|
||||
# 创建S3工具包实例
|
||||
toolkit = S3StorageToolkit(**config)
|
||||
|
||||
# 测试连接
|
||||
print("正在测试S3连接...")
|
||||
conn_result = toolkit.test_connection()
|
||||
if conn_result["success"]:
|
||||
print(f"✓ 连接成功!找到 {conn_result['bucket_count']} 个存储桶")
|
||||
print(f"✓ 目标存储桶 '{config['bucket_name']}' 存在: {conn_result['target_bucket_exists']}")
|
||||
else:
|
||||
print(f"✗ 连接失败: {conn_result['error']}")
|
||||
return False
|
||||
|
||||
# 上传文件
|
||||
local_file = "/srv/project/p2rank_2.5/p2rank_2.5/rustfs-s3-toolkit/docker/data/test-upload.txt"
|
||||
remote_key = "test-upload.txt"
|
||||
|
||||
print(f"\n正在上传文件: {local_file}")
|
||||
print(f"目标对象键: {remote_key}")
|
||||
|
||||
upload_result = toolkit.upload_file(local_file, remote_key)
|
||||
|
||||
if upload_result["success"]:
|
||||
print(f"✓ 文件上传成功!")
|
||||
print(f" 存储桶: {upload_result['bucket']}")
|
||||
print(f" 对象键: {upload_result['key']}")
|
||||
print(f" 文件大小: {upload_result['file_size']} 字节")
|
||||
print(f" 公开URL: {upload_result['public_url']}")
|
||||
print(f" 上传时间: {upload_result['upload_time']}")
|
||||
return True
|
||||
else:
|
||||
print(f"✗ 文件上传失败: {upload_result['error']}")
|
||||
return False
|
||||
|
||||
if __name__ == "__main__":
|
||||
success = upload_test_file()
|
||||
sys.exit(0 if success else 1)
|
||||
Reference in New Issue
Block a user