62 lines
2.0 KiB
Python
62 lines
2.0 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
上传测试文件到RustFS S3存储
|
|
使用测试配置信息
|
|
"""
|
|
import sys
|
|
import os
|
|
|
|
# 添加项目根目录到Python路径
|
|
sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'src'))
|
|
|
|
from rustfs_s3_toolkit.s3_client import S3StorageToolkit
|
|
|
|
def upload_test_file():
|
|
"""上传测试文件"""
|
|
|
|
# 配置信息(来自测试文件)
|
|
config = {
|
|
"endpoint_url": "https://rfs.jmsu.top",
|
|
"access_key_id": "lingyuzeng",
|
|
"secret_access_key": "rustAdminlingyuzeng",
|
|
"bucket_name": "rustfs0",
|
|
"region_name": "us-east-1"
|
|
}
|
|
|
|
# 创建S3工具包实例
|
|
toolkit = S3StorageToolkit(**config)
|
|
|
|
# 测试连接
|
|
print("正在测试S3连接...")
|
|
conn_result = toolkit.test_connection()
|
|
if conn_result["success"]:
|
|
print(f"✓ 连接成功!找到 {conn_result['bucket_count']} 个存储桶")
|
|
print(f"✓ 目标存储桶 '{config['bucket_name']}' 存在: {conn_result['target_bucket_exists']}")
|
|
else:
|
|
print(f"✗ 连接失败: {conn_result['error']}")
|
|
return False
|
|
|
|
# 上传文件
|
|
local_file = "/srv/project/p2rank_2.5/p2rank_2.5/rustfs-s3-toolkit/docker/data/test-upload.txt"
|
|
remote_key = "test-upload.txt"
|
|
|
|
print(f"\n正在上传文件: {local_file}")
|
|
print(f"目标对象键: {remote_key}")
|
|
|
|
upload_result = toolkit.upload_file(local_file, remote_key)
|
|
|
|
if upload_result["success"]:
|
|
print(f"✓ 文件上传成功!")
|
|
print(f" 存储桶: {upload_result['bucket']}")
|
|
print(f" 对象键: {upload_result['key']}")
|
|
print(f" 文件大小: {upload_result['file_size']} 字节")
|
|
print(f" 公开URL: {upload_result['public_url']}")
|
|
print(f" 上传时间: {upload_result['upload_time']}")
|
|
return True
|
|
else:
|
|
print(f"✗ 文件上传失败: {upload_result['error']}")
|
|
return False
|
|
|
|
if __name__ == "__main__":
|
|
success = upload_test_file()
|
|
sys.exit(0 if success else 1) |