#!/usr/bin/env python3
"""
Complete client for the Supabase Storage REST API.

No SDK is required; only the ``requests`` library is used.
"""

from pathlib import Path
from typing import Any, Dict, List, Optional
from urllib.parse import parse_qs, quote, urlsplit

import requests


class SupabaseStorageClient:
    """
    Client for the Supabase Storage REST API.

    Most methods report failure through their return value (``None`` or
    ``False``) when the server answers with a non-2xx status. Transport-level
    errors raised by ``requests`` (connection errors, timeouts) are not caught
    and propagate to the caller.

    Example:
        client = SupabaseStorageClient(
            'https://amiap.hzau.edu.cn/supa',
            'your-service-role-key'
        )

        # Create a bucket
        client.create_bucket('my-bucket')

        # Upload a file
        client.upload_file('my-bucket', 'photo.jpg', 'uploads/photo.jpg')

        # Generate a temporary download link
        url = client.create_signed_url('my-bucket', 'uploads/photo.jpg', 3600)
    """

    # Chunk size (bytes) used when streaming a download to disk.
    _CHUNK_SIZE = 64 * 1024

    def __init__(self, base_url: str, api_key: str,
                 timeout: Optional[float] = 30.0):
        """
        Initialise the client.

        Args:
            base_url: Supabase base URL (e.g. https://amiap.hzau.edu.cn/supa).
                A trailing slash is stripped.
            api_key: SERVICE_ROLE_KEY or ANON_KEY; sent both as the ``apikey``
                header and as a bearer token.
            timeout: Timeout in seconds handed to every ``requests`` call.
                ``None`` disables the timeout (requests then waits forever).
        """
        self.base_url = base_url.rstrip('/')
        self.headers = {
            'apikey': api_key,
            'Authorization': f'Bearer {api_key}'
        }
        self.timeout = timeout

    # ==================== Internal helpers ====================

    def _storage_url(self, endpoint: str, bucket: Optional[str] = None,
                     storage_path: Optional[str] = None) -> str:
        """Build ``<base>/storage/v1/<endpoint>[/<bucket>[/<storage_path>]]``.

        The bucket name and the object path are percent-encoded so that
        characters such as spaces, ``#`` or ``?`` stay part of the path
        instead of being interpreted as URL syntax. ``/`` inside the object
        path is kept as a separator.
        """
        url = f'{self.base_url}/storage/v1/{endpoint}'
        if bucket is not None:
            url += '/' + quote(bucket, safe='')
        if storage_path is not None:
            url += '/' + quote(storage_path, safe='/')
        return url

    def _request(self, method: str, url: str, **kwargs: Any) -> requests.Response:
        """Send a request with the auth headers and the configured timeout.

        Extra headers passed via ``headers=`` are merged on top of the
        client's default headers.
        """
        headers = {**self.headers, **kwargs.pop('headers', {})}
        kwargs.setdefault('timeout', self.timeout)
        return requests.request(method, url, headers=headers, **kwargs)

    @staticmethod
    def _json_or_none(response: requests.Response) -> Optional[Any]:
        """Return the decoded JSON body for a 2xx response, else ``None``."""
        return response.json() if response.ok else None

    # ==================== Bucket management ====================

    def list_buckets(self) -> Optional[List[Dict]]:
        """List all buckets; returns ``None`` on a non-2xx response."""
        response = self._request('GET', self._storage_url('bucket'))
        return self._json_or_none(response)

    def create_bucket(self, name: str, public: bool = False,
                      file_size_limit: int = 52428800) -> bool:
        """
        Create a bucket.

        Args:
            name: Bucket name.
            public: Whether the bucket is public.
            file_size_limit: Per-file size limit in bytes.

        Returns:
            True on success, and also when the server answers 409, which is
            treated here as "already exists".
        """
        response = self._request(
            'POST',
            self._storage_url('bucket'),
            json={
                'name': name,
                'public': public,
                'file_size_limit': file_size_limit
            }
        )
        return response.ok or response.status_code == 409

    def delete_bucket(self, name: str) -> bool:
        """Delete a bucket; returns True on a 2xx response."""
        response = self._request('DELETE', self._storage_url('bucket', name))
        return response.ok

    def empty_bucket(self, name: str) -> bool:
        """Remove every object from a bucket; returns True on a 2xx response."""
        response = self._request(
            'POST', self._storage_url('bucket', name) + '/empty'
        )
        return response.ok

    # ==================== Upload ====================

    def upload_file(self, bucket: str, file_path: str,
                    storage_path: Optional[str] = None) -> Optional[Dict]:
        """
        Upload a local file as a multipart form.

        Args:
            bucket: Bucket name.
            file_path: Path of the local file.
            storage_path: Destination path in the bucket (defaults to the
                local file's name).

        Returns:
            The decoded JSON response (e.g. {'Key': 'path', 'Id': 'uuid'}),
            or None on a non-2xx response.
        """
        if not storage_path:
            storage_path = Path(file_path).name

        with open(file_path, 'rb') as f:
            response = self._request(
                'POST',
                self._storage_url('object', bucket, storage_path),
                files={'file': f}
            )
        return self._json_or_none(response)

    def upload_bytes(self, bucket: str, data: bytes, storage_path: str,
                     content_type: str = 'application/octet-stream') -> Optional[Dict]:
        """
        Upload in-memory bytes as the raw request body.

        Args:
            bucket: Bucket name.
            data: Bytes to upload.
            storage_path: Destination path in the bucket.
            content_type: MIME type sent in the Content-Type header.

        Returns:
            The decoded JSON response, or None on a non-2xx response.
        """
        response = self._request(
            'POST',
            self._storage_url('object', bucket, storage_path),
            headers={'Content-Type': content_type},
            data=data
        )
        return self._json_or_none(response)

    def update_file(self, bucket: str, file_path: str,
                    storage_path: str) -> Optional[Dict]:
        """Replace an existing object with the contents of a local file (PUT)."""
        with open(file_path, 'rb') as f:
            response = self._request(
                'PUT',
                self._storage_url('object', bucket, storage_path),
                files={'file': f}
            )
        return self._json_or_none(response)

    # ==================== Download ====================

    def download_file(self, bucket: str, storage_path: str,
                      local_path: str) -> bool:
        """
        Download an object to a local file.

        The body is streamed to disk in chunks so large objects are not held
        in memory. If the transfer fails part-way, ``requests`` raises and a
        partially written file may be left at ``local_path``.

        Args:
            bucket: Bucket name.
            storage_path: Object path in the bucket.
            local_path: Where to save the file locally.

        Returns:
            True if the file was written, False on a non-2xx response (in
            which case the local file is not touched).
        """
        url = self._storage_url('object', bucket, storage_path)
        with self._request('GET', url, stream=True) as response:
            if not response.ok:
                return False
            with open(local_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=self._CHUNK_SIZE):
                    f.write(chunk)
        return True

    def download_bytes(self, bucket: str, storage_path: str) -> Optional[bytes]:
        """Download an object and return its bytes, or None on a non-2xx response."""
        response = self._request(
            'GET', self._storage_url('object', bucket, storage_path)
        )
        return response.content if response.ok else None

    # ==================== Listing ====================

    def list_files(self, bucket: str, folder: str = '', limit: int = 100,
                   offset: int = 0) -> Optional[List[Dict]]:
        """
        List objects under a folder, sorted by name ascending.

        Args:
            bucket: Bucket name.
            folder: Folder path used as the listing prefix.
            limit: Maximum number of entries to return.
            offset: Number of entries to skip.

        Returns:
            The decoded JSON response, or None on a non-2xx response.
        """
        response = self._request(
            'POST',
            self._storage_url('object/list', bucket),
            json={
                'prefix': folder,
                'limit': limit,
                'offset': offset,
                'sortBy': {'column': 'name', 'order': 'asc'}
            }
        )
        return self._json_or_none(response)

    def search_files(self, bucket: str, search_text: str, limit: int = 100,
                     folder: str = '') -> Optional[List[Dict]]:
        """
        Search for objects by name.

        Args:
            bucket: Bucket name.
            search_text: Text to search for.
            limit: Maximum number of entries to return.
            folder: Folder path used as the listing prefix (default: bucket
                root).

        Returns:
            The decoded JSON response, or None on a non-2xx response.
        """
        response = self._request(
            'POST',
            self._storage_url('object/list', bucket),
            # 'prefix' is always sent: the list endpoint's request schema
            # marks it as required, so a body with only 'search' is rejected.
            json={'prefix': folder, 'search': search_text, 'limit': limit}
        )
        return self._json_or_none(response)

    # ==================== Object operations ====================

    def move_file(self, bucket: str, from_path: str, to_path: str) -> bool:
        """Move an object within a bucket; returns True on a 2xx response."""
        response = self._request(
            'POST',
            self._storage_url('object/move'),
            json={
                'bucketId': bucket,
                'sourceKey': from_path,
                'destinationKey': to_path
            }
        )
        return response.ok

    def copy_file(self, bucket: str, from_path: str,
                  to_path: str) -> Optional[Dict]:
        """Copy an object within a bucket; returns the JSON response or None."""
        response = self._request(
            'POST',
            self._storage_url('object/copy'),
            json={
                'bucketId': bucket,
                'sourceKey': from_path,
                'destinationKey': to_path
            }
        )
        return self._json_or_none(response)

    def delete_file(self, bucket: str, storage_path: str) -> bool:
        """Delete a single object; returns True on a 2xx response."""
        response = self._request(
            'DELETE', self._storage_url('object', bucket, storage_path)
        )
        return response.ok

    def delete_files(self, bucket: str, file_paths: List[str]) -> bool:
        """Delete several objects in one request; returns True on a 2xx response."""
        response = self._request(
            'DELETE',
            self._storage_url('object', bucket),
            json={'prefixes': file_paths}
        )
        return response.ok

    # ==================== URL generation ====================

    def create_signed_url(self, bucket: str, storage_path: str,
                          expires_in: int = 3600) -> Optional[str]:
        """
        Create a signed URL (temporary download link).

        Args:
            bucket: Bucket name.
            storage_path: Object path in the bucket.
            expires_in: Validity in seconds (default: one hour).

        Returns:
            The full signed URL, or None on a non-2xx response or when the
            response body carries no 'signedURL' field.
        """
        response = self._request(
            'POST',
            self._storage_url('object/sign', bucket, storage_path),
            json={'expiresIn': expires_in}
        )
        if not response.ok:
            return None
        signed_path = response.json().get('signedURL')
        if not signed_path:
            return None
        # The server returns a path relative to the storage API root, so the
        # /storage/v1 prefix has to be added back.
        return self.base_url + '/storage/v1' + signed_path

    def create_upload_signed_url(self, bucket: str,
                                 storage_path: str) -> Optional[Dict]:
        """
        Create a signed upload URL (lets a front end upload directly).

        Returns:
            {'url': upload_url, 'token': token}, or None on a non-2xx
            response or when the response body carries no 'url' field.
        """
        response = self._request(
            'POST',
            self._storage_url('object/upload/sign', bucket, storage_path)
        )
        if not response.ok:
            return None
        data = response.json()
        relative_url = data.get('url')
        if not relative_url:
            return None
        token = data.get('token')
        if token is None:
            # NOTE(review): presumably some server versions return the token
            # only in the URL's query string - fall back to parsing it there.
            query_tokens = parse_qs(urlsplit(relative_url).query).get('token')
            token = query_tokens[0] if query_tokens else None
        return {
            'url': self.base_url + '/storage/v1' + relative_url,
            'token': token
        }

    def get_public_url(self, bucket: str, storage_path: str) -> str:
        """Return the public URL of an object (only valid for public buckets)."""
        return self._storage_url('object/public', bucket, storage_path)


# ==================== Usage example ====================

def main() -> None:
    """Run a short demo: create a bucket, list buckets, upload, sign a URL."""
    # Initialise the client
    client = SupabaseStorageClient(
        base_url='https://amiap.hzau.edu.cn/supa',
        api_key='your-service-role-key'
    )

    print("Supabase Storage 客户端示例")
    print("=" * 60)

    # 1. Create a bucket
    print("\n1. 创建 bucket...")
    client.create_bucket('test-bucket', public=False)

    # 2. List buckets
    print("\n2. 列出所有 buckets...")
    buckets = client.list_buckets()
    if buckets:
        for bucket in buckets:
            print(f" - {bucket['name']}")

    # 3. Upload a file
    print("\n3. 上传文件...")
    result = client.upload_bytes(
        'test-bucket',
        b'Hello from Supabase Storage!',
        'test/hello.txt',
        'text/plain'
    )
    if result:
        print(f" ✓ 上传成功: {result['Key']}")

    # 4. Generate a temporary download link
    print("\n4. 生成临时下载链接...")
    url = client.create_signed_url('test-bucket', 'test/hello.txt', 3600)
    if url:
        print(f" ✓ URL (1小时有效): {url[:80]}...")

    print("\n" + "=" * 60)
    print("示例完成!查看 OPERATIONS_GUIDE.md 了解更多用法")


if __name__ == '__main__':
    main()