Storage Architecture

This document details the design of the Unifiles object storage system, covering MinIO configuration, storage policies, and data organization.

Storage Technology Stack

Storage Tiers

Bucket Organization
unifiles-storage/
├── raw/                      # Raw uploaded files
│   └── {user_id}/
│       └── {file_id}/
│           └── {filename}
│
├── processed/                # Processed files
│   └── {user_id}/
│       └── {file_id}/
│           ├── converted.pdf
│           ├── pages/
│           │   ├── page_001.png
│           │   └── page_002.png
│           └── ocr_result.json
│
├── cache/                    # Temporary cache
│   └── {date}/
│       └── {task_id}/
│
└── exports/                  # Exported files
    └── {user_id}/
        └── {export_id}/
Storage Key Naming Conventions
# Original file
raw/{user_id}/{file_id}/{original_filename}

# Converted PDF
processed/{user_id}/{file_id}/converted.pdf

# Page images
processed/{user_id}/{file_id}/pages/page_{page_num:03d}.png

# OCR results
processed/{user_id}/{file_id}/ocr_result.json

# Cache files (expire after 24 hours)
cache/{date}/{task_id}/{filename}

# Exported files
exports/{user_id}/{export_id}/{filename}
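Keeping key construction in one place prevents the patterns above from drifting apart from the code that uses them. A minimal sketch of helper functions (the helper names are illustrative, not part of the codebase):

def raw_key(user_id: str, file_id: str, filename: str) -> str:
    return f"raw/{user_id}/{file_id}/{filename}"

def page_key(user_id: str, file_id: str, page_num: int) -> str:
    # Zero-padded to three digits, matching page_{page_num:03d}.png above
    return f"processed/{user_id}/{file_id}/pages/page_{page_num:03d}.png"

def ocr_key(user_id: str, file_id: str) -> str:
    return f"processed/{user_id}/{file_id}/ocr_result.json"

For example, page_key("u1", "f1", 7) yields processed/u1/f1/pages/page_007.png.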
MinIO Configuration

Basic Configuration
# docker-compose.yml
services:
  minio:
    image: minio/minio:latest
    command: server /data --console-address ":9001"
    environment:
      MINIO_ROOT_USER: minioadmin
      MINIO_ROOT_PASSWORD: minioadmin
      MINIO_REGION: us-east-1
    volumes:
      - minio_data:/data
    ports:
      - "9000:9000"   # API
      - "9001:9001"   # Console
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"]
      interval: 30s
      timeout: 20s
      retries: 3

volumes:
  minio_data:
Environment Variables
# MinIO connection settings
MINIO_ENDPOINT=localhost:9000
MINIO_ACCESS_KEY=minioadmin
MINIO_SECRET_KEY=minioadmin
MINIO_SECURE=false  # Set to true in production
MINIO_REGION=us-east-1

# Bucket names
MINIO_BUCKET_RAW=unifiles-raw
MINIO_BUCKET_PROCESSED=unifiles-processed
MINIO_BUCKET_CACHE=unifiles-cache
MINIO_BUCKET_EXPORTS=unifiles-exports
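The upload path shown later creates buckets on demand, but it can be cleaner to bootstrap all four up front. A sketch using the minio SDK with the development credentials above:

from minio import Minio

client = Minio(
    "localhost:9000",
    access_key="minioadmin",
    secret_key="minioadmin",
    secure=False,
)

for bucket in ("unifiles-raw", "unifiles-processed",
               "unifiles-cache", "unifiles-exports"):
    if not client.bucket_exists(bucket):
        client.make_bucket(bucket)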
Bucket Policy
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {"AWS": ["*"]},
      "Action": ["s3:GetObject"],
      "Resource": ["arn:aws:s3:::unifiles-exports/*"],
      "Condition": {
        "StringEquals": {
          "s3:ExistingObjectTag/public": "true"
        }
      }
    }
  ]
}
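This policy grants anonymous read access to exported objects only when they carry the object tag public=true. A sketch of applying it with the minio SDK's set_bucket_policy, assuming the development client from the configuration above:

import json
from minio import Minio

policy = {
    "Version": "2012-10-17",
    "Statement": [{
        "Effect": "Allow",
        "Principal": {"AWS": ["*"]},
        "Action": ["s3:GetObject"],
        "Resource": ["arn:aws:s3:::unifiles-exports/*"],
        "Condition": {
            "StringEquals": {"s3:ExistingObjectTag/public": "true"}
        },
    }],
}

client = Minio("localhost:9000", access_key="minioadmin",
               secret_key="minioadmin", secure=False)
client.set_bucket_policy("unifiles-exports", json.dumps(policy))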
Storage Service Implementation

StorageService Interface
from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import datetime
from typing import BinaryIO, Optional


@dataclass
class StorageObject:
    key: str
    size: int
    content_type: str
    etag: str
    last_modified: datetime
    metadata: dict


class StorageService(ABC):
    """Abstract storage service interface."""

    @abstractmethod
    async def upload(
        self,
        bucket: str,
        key: str,
        data: BinaryIO,
        content_type: str,
        metadata: Optional[dict] = None
    ) -> str:
        """Upload a file and return its ETag."""
        pass

    @abstractmethod
    async def download(self, bucket: str, key: str) -> bytes:
        """Download file contents."""
        pass

    @abstractmethod
    async def get_presigned_url(
        self,
        bucket: str,
        key: str,
        expires: int = 3600,
        method: str = "GET"
    ) -> str:
        """Generate a presigned URL."""
        pass

    @abstractmethod
    async def delete(self, bucket: str, key: str) -> bool:
        """Delete a file."""
        pass

    @abstractmethod
    async def exists(self, bucket: str, key: str) -> bool:
        """Check whether a file exists."""
        pass

    @abstractmethod
    async def list_objects(
        self,
        bucket: str,
        prefix: str,
        max_keys: int = 1000
    ) -> list[StorageObject]:
        """List objects under a prefix."""
        pass
MinIO Implementation
from minio import Minio
from minio.error import S3Error
import asyncio
from functools import partial


class MinIOStorageService(StorageService):
    """MinIO-backed storage service."""

    def __init__(
        self,
        endpoint: str,
        access_key: str,
        secret_key: str,
        secure: bool = False,
        region: str = "us-east-1"
    ):
        self.client = Minio(
            endpoint,
            access_key=access_key,
            secret_key=secret_key,
            secure=secure,
            region=region
        )

    async def _run_sync(self, func, *args, **kwargs):
        """Run a blocking SDK call in the default thread pool."""
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            None,
            partial(func, *args, **kwargs)
        )

    async def upload(
        self,
        bucket: str,
        key: str,
        data: BinaryIO,
        content_type: str,
        metadata: Optional[dict] = None
    ) -> str:
        # Ensure the bucket exists
        if not await self._run_sync(self.client.bucket_exists, bucket):
            await self._run_sync(self.client.make_bucket, bucket)

        # Determine the payload size by seeking to the end
        data.seek(0, 2)
        size = data.tell()
        data.seek(0)

        # Upload the object
        result = await self._run_sync(
            self.client.put_object,
            bucket,
            key,
            data,
            size,
            content_type=content_type,
            metadata=metadata or {}
        )
        return result.etag

    async def download(self, bucket: str, key: str) -> bytes:
        # response.read() performs blocking network I/O, so the whole
        # get/read/release sequence runs inside the worker thread.
        def _get() -> bytes:
            response = self.client.get_object(bucket, key)
            try:
                return response.read()
            finally:
                response.close()
                response.release_conn()

        return await self._run_sync(_get)

    async def get_presigned_url(
        self,
        bucket: str,
        key: str,
        expires: int = 3600,
        method: str = "GET"
    ) -> str:
        from datetime import timedelta

        if method == "GET":
            return await self._run_sync(
                self.client.presigned_get_object,
                bucket,
                key,
                expires=timedelta(seconds=expires)
            )
        elif method == "PUT":
            return await self._run_sync(
                self.client.presigned_put_object,
                bucket,
                key,
                expires=timedelta(seconds=expires)
            )
        else:
            raise ValueError(f"Unsupported method: {method}")

    async def delete(self, bucket: str, key: str) -> bool:
        try:
            await self._run_sync(self.client.remove_object, bucket, key)
            return True
        except S3Error:
            return False

    async def exists(self, bucket: str, key: str) -> bool:
        try:
            await self._run_sync(self.client.stat_object, bucket, key)
            return True
        except S3Error:
            return False

    async def list_objects(
        self,
        bucket: str,
        prefix: str,
        max_keys: int = 1000
    ) -> list[StorageObject]:
        # list_objects returns a lazy generator that performs network I/O
        # while being iterated, so materialize it in the worker thread.
        result = await self._run_sync(
            lambda: list(self.client.list_objects(
                bucket, prefix=prefix, recursive=True
            ))
        )

        objects = []
        for obj in result:
            if len(objects) >= max_keys:
                break
            objects.append(StorageObject(
                key=obj.object_name,
                size=obj.size,
                content_type=obj.content_type or "application/octet-stream",
                etag=obj.etag,
                last_modified=obj.last_modified,
                metadata={}
            ))
        return objects
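A quick usage sketch against the single-node instance from the basic configuration above (the bucket, key, and payload are illustrative):

import asyncio
from io import BytesIO

async def demo():
    storage = MinIOStorageService(
        endpoint="localhost:9000",
        access_key="minioadmin",
        secret_key="minioadmin",
    )
    etag = await storage.upload(
        bucket="unifiles-cache",
        key="cache/2024-01-01/demo-task/hello.txt",
        data=BytesIO(b"hello unifiles"),
        content_type="text/plain",
    )
    assert await storage.exists("unifiles-cache",
                                "cache/2024-01-01/demo-task/hello.txt")
    print(f"uploaded, etag={etag}")

asyncio.run(demo())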
Factory Pattern
from enum import Enum


class StorageType(Enum):
    MINIO = "minio"
    S3 = "s3"
    LOCAL = "local"  # For development/testing


class StorageFactory:
    """Storage service factory."""

    _instances: dict[str, StorageService] = {}

    @classmethod
    def get_storage(
        cls,
        storage_type: StorageType = StorageType.MINIO
    ) -> StorageService:
        key = storage_type.value
        if key not in cls._instances:
            cls._instances[key] = cls._create_storage(storage_type)
        return cls._instances[key]

    @classmethod
    def _create_storage(cls, storage_type: StorageType) -> StorageService:
        from unifiles.core.config.settings import settings

        if storage_type == StorageType.MINIO:
            return MinIOStorageService(
                endpoint=settings.minio.endpoint,
                access_key=settings.minio.access_key,
                secret_key=settings.minio.secret_key,
                secure=settings.minio.secure,
                region=settings.minio.region
            )
        elif storage_type == StorageType.S3:
            return S3StorageService(
                region=settings.aws.region,
                access_key=settings.aws.access_key,
                secret_key=settings.aws.secret_key
            )
        elif storage_type == StorageType.LOCAL:
            return LocalStorageService(
                base_path=settings.storage.local_path
            )
        else:
            raise ValueError(f"Unknown storage type: {storage_type}")
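Callers obtain a backend through the factory; because instances are cached per type, repeated calls return the same object. Note that S3StorageService and LocalStorageService are referenced here but defined elsewhere. A brief usage sketch:

# Default MinIO backend (constructed and cached on first call)
storage = StorageFactory.get_storage()

# Subsequent calls for the same type return the cached singleton
assert storage is StorageFactory.get_storage(StorageType.MINIO)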
File Upload Flow

Direct Upload
import hashlib
import uuid
from io import BytesIO
from typing import Optional

from fastapi import UploadFile


async def upload_file(
    file: UploadFile,
    user_id: str,
    metadata: Optional[dict] = None
) -> File:
    storage = StorageFactory.get_storage()

    # Generate a file ID
    file_id = str(uuid.uuid4())

    # Build the storage key
    storage_key = f"raw/{user_id}/{file_id}/{file.filename}"

    # Compute the content hash
    hasher = hashlib.sha256()
    content = await file.read()
    hasher.update(content)
    content_hash = hasher.hexdigest()

    # Upload to object storage
    await storage.upload(
        bucket="unifiles-raw",
        key=storage_key,
        data=BytesIO(content),
        content_type=file.content_type,
        metadata={
            "user_id": user_id,
            "original_filename": file.filename,
            "content_hash": content_hash
        }
    )

    # Persist the database record
    file_record = await save_file_record(
        file_id=file_id,
        user_id=user_id,
        filename=file.filename,
        storage_key=storage_key,
        content_hash=content_hash,
        file_size=len(content),
        mime_type=file.content_type,
        metadata=metadata
    )

    return file_record
Presigned Upload (Large Files)
async def get_upload_url(
    user_id: str,
    filename: str,
    content_type: str
) -> dict:
    storage = StorageFactory.get_storage()

    # Generate a file ID
    file_id = str(uuid.uuid4())
    storage_key = f"raw/{user_id}/{file_id}/{filename}"

    # Generate a presigned upload URL
    upload_url = await storage.get_presigned_url(
        bucket="unifiles-raw",
        key=storage_key,
        expires=3600,
        method="PUT"
    )

    # Pre-create the file record (status: pending)
    await create_pending_file_record(
        file_id=file_id,
        user_id=user_id,
        filename=filename,
        storage_key=storage_key
    )

    return {
        "file_id": file_id,
        "upload_url": upload_url,
        "expires_in": 3600
    }
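The client then uploads directly to MinIO with a plain HTTP PUT, bypassing the API server; no auth headers are needed because the signature is embedded in the URL. A sketch using requests (the helper name is illustrative):

import requests

def put_to_presigned_url(upload_url: str, path: str, content_type: str) -> None:
    # Stream the file body straight to object storage
    with open(path, "rb") as f:
        resp = requests.put(upload_url, data=f,
                            headers={"Content-Type": content_type})
    resp.raise_for_status()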
File Download Flow

Direct Download
from fastapi.responses import StreamingResponse


async def download_file(file_id: str, user_id: str) -> StreamingResponse:
    # Fetch the file record
    file_record = await get_file_record(file_id, user_id)
    if not file_record:
        raise FileNotFoundError(f"File {file_id} not found")

    storage = StorageFactory.get_storage()

    # Fetch the file contents (buffered fully in memory; for large files,
    # prefer the presigned download below)
    content = await storage.download(
        bucket="unifiles-raw",
        key=file_record.storage_key
    )

    return StreamingResponse(
        BytesIO(content),
        media_type=file_record.mime_type,
        headers={
            "Content-Disposition": f'attachment; filename="{file_record.filename}"'
        }
    )
Presigned Download
async def get_download_url(
    file_id: str,
    user_id: str,
    expires: int = 3600
) -> str:
    # Fetch the file record
    file_record = await get_file_record(file_id, user_id)
    if not file_record:
        raise FileNotFoundError(f"File {file_id} not found")

    storage = StorageFactory.get_storage()

    # Generate a presigned download URL
    return await storage.get_presigned_url(
        bucket="unifiles-raw",
        key=file_record.storage_key,
        expires=expires,
        method="GET"
    )
Lifecycle Management

Cache Cleanup
# MinIO lifecycle rules (JSON)
lifecycle_config = {
    "Rules": [
        {
            "ID": "cleanup-cache",
            "Status": "Enabled",
            "Filter": {
                "Prefix": "cache/"
            },
            "Expiration": {
                "Days": 1
            }
        },
        {
            "ID": "cleanup-exports",
            "Status": "Enabled",
            "Filter": {
                "Prefix": "exports/"
            },
            "Expiration": {
                "Days": 7
            }
        }
    ]
}
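The minio SDK expresses these rules as typed objects rather than raw JSON. A sketch of applying them, assuming a configured client and the single-bucket prefix layout from the organization diagram (the target bucket name is an assumption):

from minio.commonconfig import ENABLED, Filter
from minio.lifecycleconfig import Expiration, LifecycleConfig, Rule

# client: a configured minio.Minio instance (see Basic Configuration)
config = LifecycleConfig([
    Rule(ENABLED, rule_filter=Filter(prefix="cache/"),
         rule_id="cleanup-cache", expiration=Expiration(days=1)),
    Rule(ENABLED, rule_filter=Filter(prefix="exports/"),
         rule_id="cleanup-exports", expiration=Expiration(days=7)),
])
client.set_bucket_lifecycle("unifiles-storage", config)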
Soft Delete Implementation
from datetime import datetime


async def soft_delete_file(file_id: str, user_id: str) -> bool:
    """Soft-delete a file (mark as deleted; storage is reclaimed later)."""
    # Update the database record
    await update_file_status(
        file_id=file_id,
        user_id=user_id,
        status="deleted",
        deleted_at=datetime.utcnow()
    )
    return True


async def hard_delete_file(file_id: str) -> bool:
    """Hard-delete a file (reclaim storage)."""
    file_record = await get_file_record_by_id(file_id)
    if not file_record:
        return False

    storage = StorageFactory.get_storage()

    # Delete the raw file
    await storage.delete(
        bucket="unifiles-raw",
        key=file_record.storage_key
    )

    # Delete processed artifacts
    processed_prefix = f"processed/{file_record.user_id}/{file_id}/"
    processed_objects = await storage.list_objects(
        bucket="unifiles-processed",
        prefix=processed_prefix
    )
    for obj in processed_objects:
        await storage.delete(
            bucket="unifiles-processed",
            key=obj.key
        )

    # Delete the database record
    await delete_file_record(file_id)
    return True
Scheduled Cleanup Task
from datetime import timedelta


async def cleanup_deleted_files():
    """Clean up files soft-deleted more than 30 days ago."""
    cutoff_date = datetime.utcnow() - timedelta(days=30)

    # Query files pending cleanup
    files_to_delete = await get_files_deleted_before(cutoff_date)

    for file_record in files_to_delete:
        try:
            await hard_delete_file(file_record.id)
            logger.info(f"Cleaned up file: {file_record.id}")
        except Exception as e:
            logger.error(f"Failed to clean up file {file_record.id}: {e}")
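How this task is scheduled is not specified here. A minimal in-process runner might look like the following sketch; a production deployment would more likely use a task-queue beat schedule or cron:

import asyncio

async def run_cleanup_forever(interval_hours: int = 24):
    # Hypothetical periodic runner around cleanup_deleted_files()
    while True:
        try:
            await cleanup_deleted_files()
        except Exception as e:
            logger.error(f"Cleanup run failed: {e}")
        await asyncio.sleep(interval_hours * 3600)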
High Availability Configuration

Distributed MinIO
# docker-compose-distributed.yml
services:
  minio1:
    image: minio/minio:latest
    command: server http://minio{1...4}/data --console-address ":9001"
    environment:
      MINIO_ROOT_USER: minioadmin
      MINIO_ROOT_PASSWORD: minioadmin
    volumes:
      - minio1_data:/data
    hostname: minio1

  minio2:
    image: minio/minio:latest
    command: server http://minio{1...4}/data --console-address ":9001"
    environment:
      MINIO_ROOT_USER: minioadmin
      MINIO_ROOT_PASSWORD: minioadmin
    volumes:
      - minio2_data:/data
    hostname: minio2

  minio3:
    image: minio/minio:latest
    command: server http://minio{1...4}/data --console-address ":9001"
    environment:
      MINIO_ROOT_USER: minioadmin
      MINIO_ROOT_PASSWORD: minioadmin
    volumes:
      - minio3_data:/data
    hostname: minio3

  minio4:
    image: minio/minio:latest
    command: server http://minio{1...4}/data --console-address ":9001"
    environment:
      MINIO_ROOT_USER: minioadmin
      MINIO_ROOT_PASSWORD: minioadmin
    volumes:
      - minio4_data:/data
    hostname: minio4

  nginx:
    image: nginx:latest
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf
    ports:
      - "9000:9000"
    depends_on:
      - minio1
      - minio2
      - minio3
      - minio4

volumes:
  minio1_data:
  minio2_data:
  minio3_data:
  minio4_data:
Load Balancer Configuration
# nginx.conf
upstream minio_servers {
    server minio1:9000;
    server minio2:9000;
    server minio3:9000;
    server minio4:9000;
}

server {
    listen 9000;
    server_name localhost;

    # Allow large file uploads
    client_max_body_size 1G;

    location / {
        proxy_pass http://minio_servers;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;

        # WebSocket support
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
    }
}
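To confirm nginx is routing to the cluster, the same unauthenticated liveness endpoint used in the container healthcheck can be probed through the load balancer:

import requests

# MinIO's liveness endpoint, reached via nginx on port 9000
resp = requests.get("http://localhost:9000/minio/health/live", timeout=5)
print("live" if resp.ok else f"down (HTTP {resp.status_code})")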
Monitoring and Alerting

Storage Metrics
# Storage usage statistics
async def get_storage_stats(user_id: str) -> dict:
    storage = StorageFactory.get_storage()

    # Tally raw files (note: list_objects caps results at max_keys,
    # so totals are approximate for very large accounts)
    raw_objects = await storage.list_objects(
        bucket="unifiles-raw",
        prefix=f"raw/{user_id}/"
    )
    raw_size = sum(obj.size for obj in raw_objects)
    raw_count = len(raw_objects)

    # Tally processed files
    processed_objects = await storage.list_objects(
        bucket="unifiles-processed",
        prefix=f"processed/{user_id}/"
    )
    processed_size = sum(obj.size for obj in processed_objects)

    return {
        "user_id": user_id,
        "raw_files": {
            "count": raw_count,
            "size_bytes": raw_size
        },
        "processed_files": {
            "size_bytes": processed_size
        },
        "total_size_bytes": raw_size + processed_size
    }
Health Check
async def check_storage_health() -> dict:
    storage = StorageFactory.get_storage()
    results = {}

    buckets = ["unifiles-raw", "unifiles-processed", "unifiles-cache"]
    for bucket in buckets:
        try:
            # Try listing a single object
            await storage.list_objects(bucket, prefix="", max_keys=1)
            results[bucket] = {"status": "healthy"}
        except Exception as e:
            results[bucket] = {
                "status": "unhealthy",
                "error": str(e)
            }

    return {
        "storage_type": "minio",
        "buckets": results,
        "overall": "healthy" if all(
            b["status"] == "healthy" for b in results.values()
        ) else "unhealthy"
    }
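A sketch of exposing this check over HTTP; the route path and router wiring are illustrative, since this document does not define the API layer:

from fastapi import APIRouter

router = APIRouter()

@router.get("/health/storage")
async def storage_health() -> dict:
    # Delegates to check_storage_health() defined above
    return await check_storage_health()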