feat(supabase): 整理 Storage 文档和示例代码

- 创建 docs/ 目录存放所有文档
  - QUICK_START.md: 快速入门指南
  - OPERATIONS_GUIDE.md: 完整运维指南
  - VUE_API_INTEGRATION.md: Vue 集成文档
  - DEPLOYMENT_INFO.md: 部署配置信息
  - versions.md: 版本信息

- 创建 examples/ 目录存放示例代码
  - storage_client.py: Python 完整客户端
  - storage_client.js: JavaScript 完整客户端
  - test_https_storage.py: 功能测试脚本

- 新增 README_STORAGE.md 作为 Storage 使用指南

- 修复签名 URL 生成问题(需要 /storage/v1 前缀)
- 测试脚本支持资源已存在的情况
- 所有客户端代码已验证可用

功能特性:
✓ 公网 HTTPS 访问
✓ 文件上传/下载
✓ 生成临时下载链接
✓ 完整的 REST API 客户端
✓ 支持 Python 和 JavaScript
This commit is contained in:
zly
2025-11-22 21:03:00 +08:00
parent 8b068d8171
commit 9fa602f21b
9 changed files with 3261 additions and 0 deletions

View File

@@ -0,0 +1,349 @@
#!/usr/bin/env python3
"""
Supabase Storage REST API 完整客户端
无需 SDK仅使用 requests 库
"""
import requests
from pathlib import Path
from typing import Optional, List, Dict, Any
class SupabaseStorageClient:
"""
Supabase Storage REST API 客户端
使用示例:
client = SupabaseStorageClient(
'https://amiap.hzau.edu.cn/supa',
'your-service-role-key'
)
# 创建 bucket
client.create_bucket('my-bucket')
# 上传文件
client.upload_file('my-bucket', 'photo.jpg', 'uploads/photo.jpg')
# 生成临时下载链接
url = client.create_signed_url('my-bucket', 'uploads/photo.jpg', 3600)
"""
def __init__(self, base_url: str, api_key: str):
"""
初始化客户端
Args:
base_url: Supabase Base URL (如 https://amiap.hzau.edu.cn/supa)
api_key: SERVICE_ROLE_KEY 或 ANON_KEY
"""
self.base_url = base_url.rstrip('/')
self.headers = {
'apikey': api_key,
'Authorization': f'Bearer {api_key}'
}
# ==================== Bucket 管理 ====================
def list_buckets(self) -> Optional[List[Dict]]:
"""列出所有 buckets"""
response = requests.get(
f'{self.base_url}/storage/v1/bucket',
headers=self.headers
)
return response.json() if response.ok else None
def create_bucket(self, name: str, public: bool = False,
file_size_limit: int = 52428800) -> bool:
"""
创建 bucket
Args:
name: bucket 名称
public: 是否公开
file_size_limit: 文件大小限制(字节)
"""
response = requests.post(
f'{self.base_url}/storage/v1/bucket',
headers=self.headers,
json={
'name': name,
'public': public,
'file_size_limit': file_size_limit
}
)
return response.ok or response.status_code == 409
def delete_bucket(self, name: str) -> bool:
"""删除 bucket"""
response = requests.delete(
f'{self.base_url}/storage/v1/bucket/{name}',
headers=self.headers
)
return response.ok
def empty_bucket(self, name: str) -> bool:
"""清空 bucket"""
response = requests.post(
f'{self.base_url}/storage/v1/bucket/{name}/empty',
headers=self.headers
)
return response.ok
# ==================== 文件上传 ====================
def upload_file(self, bucket: str, file_path: str,
storage_path: Optional[str] = None) -> Optional[Dict]:
"""
上传文件
Args:
bucket: bucket 名称
file_path: 本地文件路径
storage_path: 云端存储路径(默认使用文件名)
Returns:
{'Key': 'path', 'Id': 'uuid'} 或 None
"""
if not storage_path:
storage_path = Path(file_path).name
with open(file_path, 'rb') as f:
response = requests.post(
f'{self.base_url}/storage/v1/object/{bucket}/{storage_path}',
headers=self.headers,
files={'file': f}
)
return response.json() if response.ok else None
def upload_bytes(self, bucket: str, data: bytes, storage_path: str,
content_type: str = 'application/octet-stream') -> Optional[Dict]:
"""
上传字节数据
Args:
bucket: bucket 名称
data: 字节数据
storage_path: 云端存储路径
content_type: MIME 类型
"""
response = requests.post(
f'{self.base_url}/storage/v1/object/{bucket}/{storage_path}',
headers={**self.headers, 'Content-Type': content_type},
data=data
)
return response.json() if response.ok else None
def update_file(self, bucket: str, file_path: str, storage_path: str) -> Optional[Dict]:
"""更新已存在的文件"""
with open(file_path, 'rb') as f:
response = requests.put(
f'{self.base_url}/storage/v1/object/{bucket}/{storage_path}',
headers=self.headers,
files={'file': f}
)
return response.json() if response.ok else None
# ==================== 文件下载 ====================
def download_file(self, bucket: str, storage_path: str, local_path: str) -> bool:
"""
下载文件到本地
Args:
bucket: bucket 名称
storage_path: 云端文件路径
local_path: 本地保存路径
"""
response = requests.get(
f'{self.base_url}/storage/v1/object/{bucket}/{storage_path}',
headers=self.headers
)
if response.ok:
with open(local_path, 'wb') as f:
f.write(response.content)
return True
return False
def download_bytes(self, bucket: str, storage_path: str) -> Optional[bytes]:
"""下载文件为字节数据"""
response = requests.get(
f'{self.base_url}/storage/v1/object/{bucket}/{storage_path}',
headers=self.headers
)
return response.content if response.ok else None
# ==================== 文件列表 ====================
def list_files(self, bucket: str, folder: str = '',
limit: int = 100, offset: int = 0) -> Optional[List[Dict]]:
"""
列出文件
Args:
bucket: bucket 名称
folder: 文件夹路径
limit: 返回数量限制
offset: 偏移量
"""
response = requests.post(
f'{self.base_url}/storage/v1/object/list/{bucket}',
headers=self.headers,
json={
'prefix': folder,
'limit': limit,
'offset': offset,
'sortBy': {'column': 'name', 'order': 'asc'}
}
)
return response.json() if response.ok else None
def search_files(self, bucket: str, search_text: str, limit: int = 100) -> Optional[List[Dict]]:
"""搜索文件"""
response = requests.post(
f'{self.base_url}/storage/v1/object/list/{bucket}',
headers=self.headers,
json={'search': search_text, 'limit': limit}
)
return response.json() if response.ok else None
# ==================== 文件操作 ====================
def move_file(self, bucket: str, from_path: str, to_path: str) -> bool:
"""移动文件"""
response = requests.post(
f'{self.base_url}/storage/v1/object/move',
headers=self.headers,
json={
'bucketId': bucket,
'sourceKey': from_path,
'destinationKey': to_path
}
)
return response.ok
def copy_file(self, bucket: str, from_path: str, to_path: str) -> Optional[Dict]:
"""复制文件"""
response = requests.post(
f'{self.base_url}/storage/v1/object/copy',
headers=self.headers,
json={
'bucketId': bucket,
'sourceKey': from_path,
'destinationKey': to_path
}
)
return response.json() if response.ok else None
def delete_file(self, bucket: str, storage_path: str) -> bool:
"""删除文件"""
response = requests.delete(
f'{self.base_url}/storage/v1/object/{bucket}/{storage_path}',
headers=self.headers
)
return response.ok
def delete_files(self, bucket: str, file_paths: List[str]) -> bool:
"""批量删除文件"""
response = requests.delete(
f'{self.base_url}/storage/v1/object/{bucket}',
headers=self.headers,
json={'prefixes': file_paths}
)
return response.ok
# ==================== URL 生成 ====================
def create_signed_url(self, bucket: str, storage_path: str,
expires_in: int = 3600) -> Optional[str]:
"""
生成签名 URL临时下载链接
Args:
bucket: bucket 名称
storage_path: 文件路径
expires_in: 过期时间(秒),默认 1 小时
Returns:
完整的签名 URL
"""
response = requests.post(
f'{self.base_url}/storage/v1/object/sign/{bucket}/{storage_path}',
headers=self.headers,
json={'expiresIn': expires_in}
)
if response.ok:
data = response.json()
# 签名 URL 需要加上 /storage/v1 前缀
return self.base_url + '/storage/v1' + data['signedURL']
return None
def create_upload_signed_url(self, bucket: str, storage_path: str) -> Optional[Dict]:
"""
生成上传签名 URL允许前端直接上传
Returns:
{'url': upload_url, 'token': token}
"""
response = requests.post(
f'{self.base_url}/storage/v1/object/upload/sign/{bucket}/{storage_path}',
headers=self.headers
)
if response.ok:
data = response.json()
return {
'url': self.base_url + '/storage/v1' + data['url'],
'token': data['token']
}
return None
def get_public_url(self, bucket: str, storage_path: str) -> str:
"""获取公开 URL仅适用于 public bucket"""
return f'{self.base_url}/storage/v1/object/public/{bucket}/{storage_path}'
# ==================== 使用示例 ====================
if __name__ == '__main__':
# 初始化客户端
client = SupabaseStorageClient(
base_url='https://amiap.hzau.edu.cn/supa',
api_key='your-service-role-key'
)
print("Supabase Storage 客户端示例")
print("=" * 60)
# 1. 创建 bucket
print("\n1. 创建 bucket...")
client.create_bucket('test-bucket', public=False)
# 2. 列出 buckets
print("\n2. 列出所有 buckets...")
buckets = client.list_buckets()
if buckets:
for bucket in buckets:
print(f" - {bucket['name']}")
# 3. 上传文件
print("\n3. 上传文件...")
result = client.upload_bytes(
'test-bucket',
b'Hello from Supabase Storage!',
'test/hello.txt',
'text/plain'
)
if result:
print(f" ✓ 上传成功: {result['Key']}")
# 4. 生成临时下载链接
print("\n4. 生成临时下载链接...")
url = client.create_signed_url('test-bucket', 'test/hello.txt', 3600)
if url:
print(f" ✓ URL (1小时有效): {url[:80]}...")
print("\n" + "=" * 60)
print("示例完成!查看 OPERATIONS_GUIDE.md 了解更多用法")