Files
labweb/supabase-stack/examples/test_https_storage.py
zly 9fa602f21b feat(supabase): 整理 Storage 文档和示例代码
- 创建 docs/ 目录存放所有文档
  - QUICK_START.md: 快速入门指南
  - OPERATIONS_GUIDE.md: 完整运维指南
  - VUE_API_INTEGRATION.md: Vue 集成文档
  - DEPLOYMENT_INFO.md: 部署配置信息
  - versions.md: 版本信息

- 创建 examples/ 目录存放示例代码
  - storage_client.py: Python 完整客户端
  - storage_client.js: JavaScript 完整客户端
  - test_https_storage.py: 功能测试脚本

- 新增 README_STORAGE.md 作为 Storage 使用指南

- 修复签名 URL 生成问题(需要 /storage/v1 前缀)
- 测试脚本支持资源已存在的情况
- 所有客户端代码已验证可用

功能特性:
✓ 公网 HTTPS 访问
✓ 文件上传/下载
✓ 生成临时下载链接
✓ 完整的 REST API 客户端
✓ 支持 Python 和 JavaScript
2025-11-22 21:03:00 +08:00

195 lines
6.6 KiB
Python
Executable File
Raw Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
测试公网 HTTPS Supabase Storage API
快速验证所有功能是否正常
"""
import requests
import sys
BASE_URL = 'https://amiap.hzau.edu.cn/supa'
API_KEY = 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJyb2xlIjoic2VydmljZV9yb2xlIiwiaXNzIjoic3VwYWJhc2UiLCJpYXQiOjE3NjM4MDI2NjksImV4cCI6MjA3OTE2MjY2OX0.gQWUaTkZ6mjjlv2TED0cODp2meqqWuCGKZR1ptIbovg'
headers = {
'apikey': API_KEY,
'Authorization': f'Bearer {API_KEY}'
}
print("=" * 70)
print(" Supabase Storage 公网 HTTPS 功能测试")
print("=" * 70)
print()
print(f"端点: {BASE_URL}/storage/v1")
print()
success_count = 0
total_tests = 5
# 测试 1: 列出 Buckets
print("1⃣ 列出 Buckets")
print("-" * 70)
try:
response = requests.get(f'{BASE_URL}/storage/v1/bucket', headers=headers)
if response.status_code == 200:
buckets = response.json()
print(f"✓ 成功! 找到 {len(buckets)} 个 bucket")
for b in buckets:
print(f" - {b['name']}")
success_count += 1
else:
print(f"✗ 失败: {response.status_code}")
except Exception as e:
print(f"✗ 错误: {e}")
print()
# 测试 2: 创建 Bucket或确认存在
print("2⃣ 确认测试 Bucket 存在")
print("-" * 70)
try:
response = requests.post(
f'{BASE_URL}/storage/v1/bucket',
headers=headers,
json={'name': 'test-https', 'public': False}
)
if response.status_code in [200, 201]:
print("✓ 创建成功")
success_count += 1
elif response.status_code == 400 and 'already exists' in response.text.lower():
print("✓ Bucket 已存在(正常)")
success_count += 1
else:
print(f"⚠️ 响应: {response.status_code} - {response.text[:100]}")
# 检查是否能列出该 bucket说明存在
list_resp = requests.get(f'{BASE_URL}/storage/v1/bucket', headers=headers)
if list_resp.ok and any(b['name'] == 'test-https' for b in list_resp.json()):
print("✓ Bucket 已确认存在")
success_count += 1
except Exception as e:
print(f"✗ 错误: {e}")
print()
# 测试 3: 上传文件(如果存在则更新)
print("3⃣ 上传/更新文件")
print("-" * 70)
test_data = b"Hello from HTTPS Storage API!\nThis is a test file.\nTimestamp: " + str(__import__('time').time()).encode()
try:
# 先尝试上传
response = requests.post(
f'{BASE_URL}/storage/v1/object/test-https/test.txt',
headers={**headers, 'Content-Type': 'text/plain'},
data=test_data
)
if response.status_code in [200, 201]:
print("✓ 上传成功")
print(f" 响应: {response.json()}")
success_count += 1
elif response.status_code == 400 and 'already exists' in response.text.lower():
# 文件已存在,使用 PUT 更新
print(" 文件已存在,尝试更新...")
response = requests.put(
f'{BASE_URL}/storage/v1/object/test-https/test.txt',
headers={**headers, 'Content-Type': 'text/plain'},
data=test_data
)
if response.status_code in [200, 201]:
print("✓ 更新成功")
print(f" 响应: {response.json()}")
success_count += 1
else:
print(f"✗ 更新失败: {response.status_code} - {response.text[:100]}")
else:
print(f"✗ 失败: {response.status_code} - {response.text[:100]}")
except Exception as e:
print(f"✗ 错误: {e}")
print()
# 测试 4: 下载文件
print("4⃣ 下载文件")
print("-" * 70)
try:
response = requests.get(
f'{BASE_URL}/storage/v1/object/test-https/test.txt',
headers=headers
)
if response.status_code == 200:
content = response.content.decode('utf-8')
print("✓ 下载成功")
print(f" 内容预览: {content[:50]}...")
print(f" 文件大小: {len(response.content)} bytes")
success_count += 1
else:
print(f"✗ 失败: {response.status_code}")
if response.text:
print(f" 错误信息: {response.text[:100]}")
except Exception as e:
print(f"✗ 错误: {e}")
print()
# 测试 5: 生成签名 URL
print("5⃣ 生成签名 URL临时下载链接")
print("-" * 70)
try:
response = requests.post(
f'{BASE_URL}/storage/v1/object/sign/test-https/test.txt',
headers=headers,
json={'expiresIn': 3600}
)
if response.status_code == 200:
data = response.json()
# 签名 URL 需要加上 /storage/v1 前缀
signed_url = BASE_URL + '/storage/v1' + data['signedURL']
print("✓ 生成成功")
print(f" 完整 URL:")
print(f" {signed_url}")
print(f" 有效期: 1 小时")
success_count += 1
# 验证签名 URL 是否可以下载(不带认证 headers因为 token 在 URL 中)
print("\n 验证签名 URL 下载...")
try:
verify_response = requests.get(signed_url) # 不带 headers
if verify_response.status_code == 200:
content_preview = verify_response.content[:50].decode('utf-8', errors='ignore')
print(f" ✓ 签名 URL 可以正常下载!")
print(f" 大小: {len(verify_response.content)} bytes")
print(f" 内容: {content_preview}...")
print(f"\n 💡 这个 URL 可以直接在浏览器中打开或分享给他人")
else:
print(f" ⚠️ 签名 URL 返回: {verify_response.status_code}")
if verify_response.text:
print(f" 错误: {verify_response.text[:100]}")
except Exception as ve:
print(f" ⚠️ 验证失败: {ve}")
else:
print(f"✗ 失败: {response.status_code}")
except Exception as e:
print(f"✗ 错误: {e}")
print()
# 总结
print("=" * 70)
print(f" 测试完成: {success_count}/{total_tests} 通过")
print("=" * 70)
print()
if success_count == total_tests:
print("🎉 所有功能正常!")
print()
print("你现在可以:")
print(" ✓ 通过公网 HTTPS 上传文件")
print(" ✓ 下载文件")
print(" ✓ 生成临时下载链接")
print(" ✓ 管理 buckets")
print()
print("📚 查看完整文档:")
print(" - QUICK_START.md 快速入门")
print(" - OPERATIONS_GUIDE.md 运维指南")
print(" - storage_client.py Python 客户端")
print(" - storage_client.js JavaScript 客户端")
sys.exit(0)
else:
print(f"⚠️ {total_tests - success_count} 个测试失败")
print("请检查配置或查看文档")
sys.exit(1)