Files
rustfs-s3-toolkit/tests/upload_test.py
2025-07-29 16:54:02 +08:00

62 lines
2.0 KiB
Python

#!/usr/bin/env python3
"""
上传测试文件到RustFS S3存储
使用测试配置信息
"""
import sys
import os
# 添加项目根目录到Python路径
sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'src'))
from rustfs_s3_toolkit.s3_client import S3StorageToolkit
def upload_test_file():
"""上传测试文件"""
# 配置信息(来自测试文件)
config = {
"endpoint_url": "https://rfs.jmsu.top",
"access_key_id": "lingyuzeng",
"secret_access_key": "rustAdminlingyuzeng",
"bucket_name": "rustfs0",
"region_name": "us-east-1"
}
# 创建S3工具包实例
toolkit = S3StorageToolkit(**config)
# 测试连接
print("正在测试S3连接...")
conn_result = toolkit.test_connection()
if conn_result["success"]:
print(f"✓ 连接成功!找到 {conn_result['bucket_count']} 个存储桶")
print(f"✓ 目标存储桶 '{config['bucket_name']}' 存在: {conn_result['target_bucket_exists']}")
else:
print(f"✗ 连接失败: {conn_result['error']}")
return False
# 上传文件
local_file = "/srv/project/p2rank_2.5/p2rank_2.5/rustfs-s3-toolkit/docker/data/test-upload.txt"
remote_key = "test-upload.txt"
print(f"\n正在上传文件: {local_file}")
print(f"目标对象键: {remote_key}")
upload_result = toolkit.upload_file(local_file, remote_key)
if upload_result["success"]:
print(f"✓ 文件上传成功!")
print(f" 存储桶: {upload_result['bucket']}")
print(f" 对象键: {upload_result['key']}")
print(f" 文件大小: {upload_result['file_size']} 字节")
print(f" 公开URL: {upload_result['public_url']}")
print(f" 上传时间: {upload_result['upload_time']}")
return True
else:
print(f"✗ 文件上传失败: {upload_result['error']}")
return False
if __name__ == "__main__":
success = upload_test_file()
sys.exit(0 if success else 1)