#!/usr/bin/env python3 """ 上传测试文件到RustFS S3存储 使用测试配置信息 """ import sys import os # 添加项目根目录到Python路径 sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'src')) from rustfs_s3_toolkit.s3_client import S3StorageToolkit def upload_test_file(): """上传测试文件""" # 配置信息(来自测试文件) config = { "endpoint_url": "https://rfs.jmsu.top", "access_key_id": "lingyuzeng", "secret_access_key": "rustAdminlingyuzeng", "bucket_name": "rustfs0", "region_name": "us-east-1" } # 创建S3工具包实例 toolkit = S3StorageToolkit(**config) # 测试连接 print("正在测试S3连接...") conn_result = toolkit.test_connection() if conn_result["success"]: print(f"✓ 连接成功!找到 {conn_result['bucket_count']} 个存储桶") print(f"✓ 目标存储桶 '{config['bucket_name']}' 存在: {conn_result['target_bucket_exists']}") else: print(f"✗ 连接失败: {conn_result['error']}") return False # 上传文件 local_file = "/srv/project/p2rank_2.5/p2rank_2.5/rustfs-s3-toolkit/docker/data/test-upload.txt" remote_key = "test-upload.txt" print(f"\n正在上传文件: {local_file}") print(f"目标对象键: {remote_key}") upload_result = toolkit.upload_file(local_file, remote_key) if upload_result["success"]: print(f"✓ 文件上传成功!") print(f" 存储桶: {upload_result['bucket']}") print(f" 对象键: {upload_result['key']}") print(f" 文件大小: {upload_result['file_size']} 字节") print(f" 公开URL: {upload_result['public_url']}") print(f" 上传时间: {upload_result['upload_time']}") return True else: print(f"✗ 文件上传失败: {upload_result['error']}") return False if __name__ == "__main__": success = upload_test_file() sys.exit(0 if success else 1)