Files
rustfs-s3-toolkit/tests/test_toolkit.py
2025-07-24 11:26:35 +08:00

316 lines
11 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
RustFS S3 Storage Toolkit 测试文件
基于成功的 test_s3_flexible.py 创建的测试套件
已在 RustFS 1.0.0-alpha.34 上完成测试
环境变量控制:
- RUN_INTEGRATION_TESTS=1: 运行真实的集成测试
- 默认: 运行模拟测试
"""
import os
import tempfile
import shutil
from pathlib import Path
from datetime import datetime
import pytest
from rustfs_s3_toolkit import S3StorageToolkit
# 检查是否运行集成测试
RUN_INTEGRATION_TESTS = os.getenv('RUN_INTEGRATION_TESTS', '0') == '1'
# 测试配置 - 请根据实际情况修改
TEST_CONFIG = {
"endpoint_url": os.getenv("S3_ENDPOINT_URL", "https://rfs.jmsu.top"),
"access_key_id": os.getenv("S3_ACCESS_KEY_ID", "lingyuzeng"),
"secret_access_key": os.getenv("S3_SECRET_ACCESS_KEY", "rustAdminlingyuzeng"),
"bucket_name": os.getenv("S3_BUCKET_NAME", "test"),
"region_name": os.getenv("S3_REGION_NAME", "us-east-1")
}
def create_test_files():
"""创建测试文件和目录"""
test_dir = tempfile.mkdtemp(prefix='s3_toolkit_test_')
# 创建测试文件
test_files = {
"test.txt": "这是一个测试文件\n时间: " + datetime.now().isoformat(),
"data.json": '{"name": "test", "value": 123, "timestamp": "' + datetime.now().isoformat() + '"}',
"readme.md": "# 测试文件\n\n这是一个测试用的 Markdown 文件。\n\n- 项目: S3 存储测试\n- 时间: " + datetime.now().isoformat(),
}
# 创建子目录和文件
sub_dir = Path(test_dir) / "subdir"
sub_dir.mkdir()
for filename, content in test_files.items():
file_path = Path(test_dir) / filename
with open(file_path, 'w', encoding='utf-8') as f:
f.write(content)
# 在子目录中创建文件
sub_file = sub_dir / "sub_test.txt"
with open(sub_file, 'w', encoding='utf-8') as f:
f.write("子目录中的测试文件\n时间: " + datetime.now().isoformat())
return test_dir
@pytest.mark.skipif(not RUN_INTEGRATION_TESTS, reason="集成测试需要设置 RUN_INTEGRATION_TESTS=1")
def test_all_functions():
"""测试所有 9 个核心功能(集成测试,需要真实 S3 服务)"""
print("🚀 RustFS S3 Storage Toolkit 完整功能测试")
print("🧪 测试环境: RustFS 1.0.0-alpha.34")
print("=" * 60)
# 初始化工具包
try:
toolkit = S3StorageToolkit(**TEST_CONFIG)
print("✅ 工具包初始化成功")
except Exception as e:
print(f"❌ 工具包初始化失败: {e}")
pytest.skip(f"无法连接到 S3 服务: {e}")
return False
test_results = {}
# 1. 测试连接
print("\n1. 测试连接...")
result = toolkit.test_connection()
test_results['connection'] = result['success']
if result['success']:
print(f"✅ 连接成功: {result['message']}")
else:
print(f"❌ 连接失败: {result['error']}")
return test_results
# 2. 测试上传文件
print("\n2. 测试上传文件...")
test_content = f"测试文件内容\n时间: {datetime.now().isoformat()}"
with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.txt', encoding='utf-8') as f:
f.write(test_content)
test_file = f.name
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
remote_key = f"test_files/single_file_{timestamp}.txt"
result = toolkit.upload_file(test_file, remote_key)
test_results['upload_file'] = result['success']
if result['success']:
print(f"✅ 文件上传成功: {remote_key}")
uploaded_file_key = remote_key
else:
print(f"❌ 文件上传失败: {result['error']}")
uploaded_file_key = None
os.unlink(test_file)
# 3. 测试创建文件夹
print("\n3. 测试创建文件夹...")
folder_path = f"test_folders/folder_{timestamp}/"
result = toolkit.create_folder(folder_path)
test_results['create_folder'] = result['success']
if result['success']:
print(f"✅ 文件夹创建成功: {folder_path}")
else:
print(f"❌ 文件夹创建失败: {result['error']}")
# 4. 测试上传目录
print("\n4. 测试上传目录...")
test_dir = create_test_files()
remote_prefix = f"test_directories/dir_{timestamp}/"
result = toolkit.upload_directory(test_dir, remote_prefix)
test_results['upload_directory'] = result['success']
if result['success']:
print(f"✅ 目录上传成功: 上传了 {result['file_count']} 个文件")
uploaded_dir_prefix = remote_prefix
else:
print(f"❌ 目录上传失败: {result['error']}")
uploaded_dir_prefix = None
shutil.rmtree(test_dir)
# 5. 测试下载文件
print("\n5. 测试下载文件...")
if uploaded_file_key:
download_path = tempfile.mktemp(suffix='_downloaded.txt')
result = toolkit.download_file(uploaded_file_key, download_path)
test_results['download_file'] = result['success']
if result['success']:
print(f"✅ 文件下载成功: {download_path}")
# 验证内容
with open(download_path, 'r', encoding='utf-8') as f:
downloaded_content = f.read()
if test_content == downloaded_content:
print("✅ 文件内容验证成功")
else:
print("❌ 文件内容验证失败")
os.unlink(download_path)
else:
print(f"❌ 文件下载失败: {result['error']}")
else:
test_results['download_file'] = False
print("❌ 跳过文件下载测试(没有可下载的文件)")
# 6. 测试下载目录
print("\n6. 测试下载目录...")
if uploaded_dir_prefix:
download_dir = tempfile.mkdtemp(prefix='s3_download_')
result = toolkit.download_directory(uploaded_dir_prefix, download_dir)
test_results['download_directory'] = result['success']
if result['success']:
print(f"✅ 目录下载成功: 下载了 {result['file_count']} 个文件")
else:
print(f"❌ 目录下载失败: {result['error']}")
shutil.rmtree(download_dir)
else:
test_results['download_directory'] = False
print("❌ 跳过目录下载测试(没有可下载的目录)")
# 7. 测试列出文件
print("\n7. 测试列出文件...")
result = toolkit.list_files(max_keys=20)
test_results['list_files'] = result['success']
if result['success']:
print(f"✅ 文件列表获取成功: 找到 {result['file_count']} 个文件")
if result['files']:
print("📄 前几个文件:")
for i, file_info in enumerate(result['files'][:5], 1):
size_mb = file_info['size'] / (1024 * 1024)
print(f" {i}. {file_info['key']} ({size_mb:.2f} MB)")
else:
print(f"❌ 文件列表获取失败: {result['error']}")
# 8. 测试删除文件
print("\n8. 测试删除文件...")
if uploaded_file_key:
result = toolkit.delete_file(uploaded_file_key)
test_results['delete_file'] = result['success']
if result['success']:
print(f"✅ 文件删除成功: {uploaded_file_key}")
else:
print(f"❌ 文件删除失败: {result['error']}")
else:
test_results['delete_file'] = False
print("❌ 跳过文件删除测试(没有可删除的文件)")
# 9. 测试删除目录
print("\n9. 测试删除目录...")
if uploaded_dir_prefix:
result = toolkit.delete_directory(uploaded_dir_prefix)
test_results['delete_directory'] = result['success']
if result['success']:
print(f"✅ 目录删除成功: 删除了 {result['deleted_count']} 个文件")
else:
print(f"❌ 目录删除失败: {result['error']}")
else:
test_results['delete_directory'] = False
print("❌ 跳过目录删除测试(没有可删除的目录)")
# 打印测试摘要
print("\n" + "="*60)
print("🎯 测试结果摘要")
print("="*60)
test_names = {
'connection': 'Connection',
'upload_file': 'Upload File',
'create_folder': 'Create Folder',
'upload_directory': 'Upload Directory',
'download_file': 'Download File',
'download_directory': 'Download Directory',
'list_files': 'List Files',
'delete_file': 'Delete File',
'delete_directory': 'Delete Directory'
}
passed_tests = 0
for test_key, test_name in test_names.items():
status = "✅ 通过" if test_results.get(test_key, False) else "❌ 失败"
print(f"{test_name:20} : {status}")
if test_results.get(test_key, False):
passed_tests += 1
print("-" * 60)
print(f"总计: {passed_tests}/{len(test_names)} 个测试通过")
if passed_tests == len(test_names):
print("🎉 所有测试都通过了RustFS S3 Storage Toolkit 工作正常。")
elif passed_tests > 0:
print("⚠️ 部分测试通过。请检查失败的测试项。")
else:
print("❌ 所有测试都失败了。请检查 RustFS 配置和网络连接。")
# 对于 pytest使用 assert 而不是 return
assert passed_tests == len(test_names), f"只有 {passed_tests}/{len(test_names)} 个测试通过"
return test_results
def test_toolkit_initialization():
"""测试工具包初始化(单元测试,不需要真实 S3 服务)"""
# 测试基本初始化
toolkit = S3StorageToolkit(
endpoint_url="https://test.example.com",
access_key_id="test_key",
secret_access_key="test_secret",
bucket_name="test-bucket"
)
assert toolkit.bucket_name == "test-bucket"
assert toolkit.endpoint_url == "https://test.example.com"
# 测试带区域的初始化
toolkit_with_region = S3StorageToolkit(
endpoint_url="https://test.example.com",
access_key_id="test_key",
secret_access_key="test_secret",
bucket_name="test-bucket",
region_name="us-west-2"
)
assert toolkit_with_region.bucket_name == "test-bucket"
assert toolkit_with_region.endpoint_url == "https://test.example.com"
def test_basic_functionality():
"""测试基本功能(确保代码结构正确)"""
# 这个测试确保所有方法都存在且可调用
toolkit = S3StorageToolkit(
endpoint_url="https://test.example.com",
access_key_id="test_key",
secret_access_key="test_secret",
bucket_name="test-bucket"
)
# 检查所有核心方法是否存在
assert hasattr(toolkit, 'test_connection')
assert hasattr(toolkit, 'upload_file')
assert hasattr(toolkit, 'create_folder')
assert hasattr(toolkit, 'upload_directory')
assert hasattr(toolkit, 'download_file')
assert hasattr(toolkit, 'download_directory')
assert hasattr(toolkit, 'list_files')
assert hasattr(toolkit, 'delete_file')
assert hasattr(toolkit, 'delete_directory')
# 检查方法是否可调用
assert callable(toolkit.test_connection)
assert callable(toolkit.upload_file)
assert callable(toolkit.create_folder)
assert callable(toolkit.upload_directory)
assert callable(toolkit.download_file)
assert callable(toolkit.download_directory)
assert callable(toolkit.list_files)
assert callable(toolkit.delete_file)
assert callable(toolkit.delete_directory)
if __name__ == "__main__":
test_all_functions()