Files
labweb/supabase-stack/examples/storage_client.py
zly 9fa602f21b feat(supabase): 整理 Storage 文档和示例代码
- 创建 docs/ 目录存放所有文档
  - QUICK_START.md: 快速入门指南
  - OPERATIONS_GUIDE.md: 完整运维指南
  - VUE_API_INTEGRATION.md: Vue 集成文档
  - DEPLOYMENT_INFO.md: 部署配置信息
  - versions.md: 版本信息

- 创建 examples/ 目录存放示例代码
  - storage_client.py: Python 完整客户端
  - storage_client.js: JavaScript 完整客户端
  - test_https_storage.py: 功能测试脚本

- 新增 README_STORAGE.md 作为 Storage 使用指南

- 修复签名 URL 生成问题(需要 /storage/v1 前缀)
- 测试脚本支持资源已存在的情况
- 所有客户端代码已验证可用

功能特性:
✓ 公网 HTTPS 访问
✓ 文件上传/下载
✓ 生成临时下载链接
✓ 完整的 REST API 客户端
✓ 支持 Python 和 JavaScript
2025-11-22 21:03:00 +08:00

350 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
Complete client for the Supabase Storage REST API.
No SDK required; only the requests library is used.
"""
import requests
from pathlib import Path
from typing import Optional, List, Dict, Any
class SupabaseStorageClient:
    """
    Thin client for the Supabase Storage REST API built on ``requests``.

    Each public method issues a single HTTP request. A non-2xx response is
    reported through the return value (``None`` or ``False``) instead of an
    exception; exceptions raised by ``requests`` itself (connection errors,
    timeouts) are not caught and propagate to the caller.

    Example:
        client = SupabaseStorageClient(
            'https://amiap.hzau.edu.cn/supa',
            'your-service-role-key'
        )
        # Create a bucket
        client.create_bucket('my-bucket')
        # Upload a file
        client.upload_file('my-bucket', 'photo.jpg', 'uploads/photo.jpg')
        # Generate a temporary download link
        url = client.create_signed_url('my-bucket', 'uploads/photo.jpg', 3600)
    """

    # Seconds to wait on the server before giving up. ``requests`` never
    # times out unless told to, which would let a stalled server hang callers.
    DEFAULT_TIMEOUT = 30.0

    def __init__(self, base_url: str, api_key: str,
                 timeout: Optional[float] = DEFAULT_TIMEOUT):
        """
        Initialise the client.

        Args:
            base_url: Supabase base URL (e.g. https://amiap.hzau.edu.cn/supa);
                trailing slashes are stripped.
            api_key: SERVICE_ROLE_KEY or ANON_KEY; sent both as the ``apikey``
                header and as a Bearer token.
            timeout: Timeout in seconds passed to every request. ``None``
                disables the timeout (wait indefinitely).
        """
        self.base_url = base_url.rstrip('/')
        self.headers = {
            'apikey': api_key,
            'Authorization': f'Bearer {api_key}'
        }
        self.timeout = timeout

    # ==================== Internal helpers ====================

    def _storage_url(self, *segments: str) -> str:
        """Join ``segments`` with '/' under ``<base_url>/storage/v1``."""
        return '/'.join((f'{self.base_url}/storage/v1', *segments))

    def _request(self, method: str, url: str, **kwargs) -> 'requests.Response':
        """
        Send one HTTP request with the client's default headers and timeout.

        Callers may override ``headers`` or ``timeout`` through ``kwargs``;
        all other keyword arguments are forwarded to ``requests.request``.
        """
        kwargs.setdefault('headers', self.headers)
        kwargs.setdefault('timeout', self.timeout)
        return requests.request(method, url, **kwargs)

    # ==================== Bucket management ====================

    def list_buckets(self) -> Optional[List[Dict]]:
        """Return the decoded JSON list of buckets, or None on a non-2xx response."""
        response = self._request('GET', self._storage_url('bucket'))
        return response.json() if response.ok else None

    def create_bucket(self, name: str, public: bool = False,
                      file_size_limit: int = 52428800) -> bool:
        """
        Create a bucket.

        Args:
            name: Bucket name.
            public: Whether the bucket is public.
            file_size_limit: Per-file size limit in bytes (default 50 MiB).

        Returns:
            True on success. HTTP 409 is also reported as True so that
            creating a bucket that already exists is not treated as a failure.
        """
        response = self._request(
            'POST',
            self._storage_url('bucket'),
            json={
                'name': name,
                'public': public,
                'file_size_limit': file_size_limit
            }
        )
        return response.ok or response.status_code == 409

    def delete_bucket(self, name: str) -> bool:
        """Delete a bucket; return True on a 2xx response."""
        response = self._request('DELETE', self._storage_url('bucket', name))
        return response.ok

    def empty_bucket(self, name: str) -> bool:
        """Empty a bucket; return True on a 2xx response."""
        response = self._request(
            'POST', self._storage_url('bucket', name, 'empty')
        )
        return response.ok

    # ==================== Upload ====================

    def upload_file(self, bucket: str, file_path: str,
                    storage_path: Optional[str] = None) -> Optional[Dict]:
        """
        Upload a local file as a multipart form field named ``file``.

        Args:
            bucket: Bucket name.
            file_path: Path of the local file.
            storage_path: Destination path in the bucket (defaults to the
                local file's name).

        Returns:
            The decoded JSON response (e.g. {'Key': 'path', 'Id': 'uuid'}),
            or None on a non-2xx response.
        """
        if not storage_path:
            storage_path = Path(file_path).name
        with open(file_path, 'rb') as f:
            response = self._request(
                'POST',
                self._storage_url('object', bucket, storage_path),
                files={'file': f}
            )
        return response.json() if response.ok else None

    def upload_bytes(self, bucket: str, data: bytes, storage_path: str,
                     content_type: str = 'application/octet-stream') -> Optional[Dict]:
        """
        Upload in-memory bytes as the raw request body.

        Args:
            bucket: Bucket name.
            data: Payload bytes.
            storage_path: Destination path in the bucket.
            content_type: MIME type sent in the Content-Type header.

        Returns:
            The decoded JSON response, or None on a non-2xx response.
        """
        response = self._request(
            'POST',
            self._storage_url('object', bucket, storage_path),
            headers={**self.headers, 'Content-Type': content_type},
            data=data
        )
        return response.json() if response.ok else None

    def update_file(self, bucket: str, file_path: str, storage_path: str) -> Optional[Dict]:
        """
        Replace an existing object with the contents of a local file (PUT).

        Returns:
            The decoded JSON response, or None on a non-2xx response.
        """
        with open(file_path, 'rb') as f:
            response = self._request(
                'PUT',
                self._storage_url('object', bucket, storage_path),
                files={'file': f}
            )
        return response.json() if response.ok else None

    # ==================== Download ====================

    def download_file(self, bucket: str, storage_path: str, local_path: str) -> bool:
        """
        Download an object and write it to a local file.

        The whole body is received before the file is opened, so a failed
        request never creates or truncates ``local_path``.

        Args:
            bucket: Bucket name.
            storage_path: Object path in the bucket.
            local_path: Local path to write to.

        Returns:
            True if the object was downloaded and written, False on a
            non-2xx response.
        """
        response = self._request(
            'GET', self._storage_url('object', bucket, storage_path)
        )
        if response.ok:
            with open(local_path, 'wb') as f:
                f.write(response.content)
            return True
        return False

    def download_bytes(self, bucket: str, storage_path: str) -> Optional[bytes]:
        """Return an object's content as bytes, or None on a non-2xx response."""
        response = self._request(
            'GET', self._storage_url('object', bucket, storage_path)
        )
        return response.content if response.ok else None

    # ==================== Listing ====================

    def list_files(self, bucket: str, folder: str = '',
                   limit: int = 100, offset: int = 0) -> Optional[List[Dict]]:
        """
        List objects in a bucket, sorted by name ascending.

        Args:
            bucket: Bucket name.
            folder: Folder path, sent as the ``prefix`` field.
            limit: Maximum number of entries to return.
            offset: Number of entries to skip.

        Returns:
            The decoded JSON response, or None on a non-2xx response.
        """
        response = self._request(
            'POST',
            self._storage_url('object', 'list', bucket),
            json={
                'prefix': folder,
                'limit': limit,
                'offset': offset,
                'sortBy': {'column': 'name', 'order': 'asc'}
            }
        )
        return response.json() if response.ok else None

    def search_files(self, bucket: str, search_text: str, limit: int = 100) -> Optional[List[Dict]]:
        """
        Search objects in a bucket by sending ``search_text`` as ``search``.

        Returns:
            The decoded JSON response, or None on a non-2xx response.
        """
        response = self._request(
            'POST',
            self._storage_url('object', 'list', bucket),
            json={'search': search_text, 'limit': limit}
        )
        return response.json() if response.ok else None

    # ==================== Object operations ====================

    def move_file(self, bucket: str, from_path: str, to_path: str) -> bool:
        """Move an object within a bucket; return True on a 2xx response."""
        response = self._request(
            'POST',
            self._storage_url('object', 'move'),
            json={
                'bucketId': bucket,
                'sourceKey': from_path,
                'destinationKey': to_path
            }
        )
        return response.ok

    def copy_file(self, bucket: str, from_path: str, to_path: str) -> Optional[Dict]:
        """
        Copy an object within a bucket.

        Returns:
            The decoded JSON response, or None on a non-2xx response.
        """
        response = self._request(
            'POST',
            self._storage_url('object', 'copy'),
            json={
                'bucketId': bucket,
                'sourceKey': from_path,
                'destinationKey': to_path
            }
        )
        return response.json() if response.ok else None

    def delete_file(self, bucket: str, storage_path: str) -> bool:
        """Delete a single object; return True on a 2xx response."""
        response = self._request(
            'DELETE', self._storage_url('object', bucket, storage_path)
        )
        return response.ok

    def delete_files(self, bucket: str, file_paths: List[str]) -> bool:
        """
        Delete several objects in one request.

        The paths are sent as the ``prefixes`` field of a JSON body on a
        DELETE request. Returns True on a 2xx response.
        """
        response = self._request(
            'DELETE',
            self._storage_url('object', bucket),
            json={'prefixes': file_paths}
        )
        return response.ok

    # ==================== URL generation ====================

    def create_signed_url(self, bucket: str, storage_path: str,
                          expires_in: int = 3600) -> Optional[str]:
        """
        Create a signed URL (temporary download link).

        Args:
            bucket: Bucket name.
            storage_path: Object path in the bucket.
            expires_in: Lifetime in seconds (default: 1 hour).

        Returns:
            The full signed URL, or None on a non-2xx response.
        """
        response = self._request(
            'POST',
            self._storage_url('object', 'sign', bucket, storage_path),
            json={'expiresIn': expires_in}
        )
        if response.ok:
            data = response.json()
            # The returned path is relative to the storage API root, so the
            # /storage/v1 prefix has to be added back.
            return self.base_url + '/storage/v1' + data['signedURL']
        return None

    def create_upload_signed_url(self, bucket: str, storage_path: str) -> Optional[Dict]:
        """
        Create a signed upload URL so a front end can upload directly.

        Returns:
            {'url': upload_url, 'token': token}, or None on a non-2xx
            response.
        """
        response = self._request(
            'POST',
            self._storage_url('object', 'upload', 'sign', bucket, storage_path)
        )
        if response.ok:
            data = response.json()
            return {
                # Same /storage/v1 prefix handling as create_signed_url.
                'url': self.base_url + '/storage/v1' + data['url'],
                'token': data['token']
            }
        return None

    def get_public_url(self, bucket: str, storage_path: str) -> str:
        """Build the public URL of an object (only valid for public buckets).

        No request is made; the URL is assembled locally.
        """
        return self._storage_url('object', 'public', bucket, storage_path)
# ==================== 使用示例 ====================
if __name__ == '__main__':
    # Demo walkthrough against a live server: create a bucket, list buckets,
    # upload a small text object and print a signed download link for it.
    separator = "=" * 60

    # Build the client (replace the placeholder key before running).
    client = SupabaseStorageClient(
        base_url='https://amiap.hzau.edu.cn/supa',
        api_key='your-service-role-key'
    )
    print("Supabase Storage 客户端示例")
    print(separator)

    # Step 1: create the bucket.
    print("\n1. 创建 bucket...")
    client.create_bucket('test-bucket', public=False)

    # Step 2: list bucket names; a failed or empty listing prints nothing.
    print("\n2. 列出所有 buckets...")
    for entry in client.list_buckets() or []:
        print(f" - {entry['name']}")

    # Step 3: upload a small in-memory text payload.
    print("\n3. 上传文件...")
    uploaded = client.upload_bytes(
        'test-bucket',
        b'Hello from Supabase Storage!',
        'test/hello.txt',
        'text/plain'
    )
    if uploaded:
        print(f" ✓ 上传成功: {uploaded['Key']}")

    # Step 4: request a one-hour signed link and show its first 80 characters.
    print("\n4. 生成临时下载链接...")
    signed_url = client.create_signed_url('test-bucket', 'test/hello.txt', 3600)
    if signed_url:
        print(f" ✓ URL (1小时有效): {signed_url[:80]}...")

    print("\n" + separator)
    print("示例完成!查看 OPERATIONS_GUIDE.md 了解更多用法")