Webhook 集成
本教程讲解如何使用 Webhook 构建事件驱动的异步处理流程,这是在生产环境中处理大量文档时推荐的方式。
为什么使用 Webhook?
轮询 vs Webhook
轮询模式(不推荐):
Client: 状态? → Server: 处理中
Client: 状态? → Server: 处理中
Client: 状态? → Server: 处理中
Client: 状态? → Server: 完成!
问题:每次查询都消耗 API 配额;而且任务完成后要等到下一次轮询才能发现,增加了延迟
Webhook 模式(推荐):
Client: 创建任务 → Server: 收到
... 无需轮询 ...
Server: 完成! → Client (Webhook)
好处:任务完成后立即收到通知,无需轮询,节省 API 配额和资源
快速开始
1. 创建 Webhook
from unifiles import UnifilesClient
client = UnifilesClient(api_key="sk_...")

# Subscribe to both the success and the failure event of each pipeline stage
# (extraction, then knowledge-base indexing).
subscribed_events = [
    "extraction.completed",
    "extraction.failed",
    "document.indexed",
    "document.index_failed",
]

# `secret` is the HMAC key the receiving handler uses to verify the
# X-Unifiles-Signature header, so it must equal the handler's WEBHOOK_SECRET.
webhook = client.webhooks.create(
    url="https://your-app.com/webhook/unifiles",
    events=subscribed_events,
    secret="your_webhook_secret_key",
)

print(f"Webhook ID: {webhook.id}")
print(f"URL: {webhook.url}")
print(f"Events: {webhook.events}")
2. 实现 Webhook 处理器
处理器应先用原始请求体(raw bytes)校验 X-Unifiles-Signature 签名,校验所用的密钥必须与创建 Webhook 时传入的 secret 一致;签名无效时返回 401。
Flask 示例
from flask import Flask, request, jsonify
import hmac
import hashlib
import os

app = Flask(__name__)
# Shared HMAC key used to verify the X-Unifiles-Signature header; it must equal
# the `secret` passed to client.webhooks.create(). Supply it through the
# environment rather than committing it to source control -- the literal is
# only a fallback for local experiments.
WEBHOOK_SECRET = os.environ.get("UNIFILES_WEBHOOK_SECRET", "your_webhook_secret_key")
def verify_signature(payload: bytes, signature: str) -> bool:
"""验证 Webhook 签名"""
expected = hmac.new(
WEBHOOK_SECRET.encode(),
payload,
hashlib.sha256
).hexdigest()
return hmac.compare_digest(signature, f"sha256={expected}")
@app.route("/webhook/unifiles", methods=["POST"])
def handle_webhook():
    """Receive a Unifiles webhook delivery, verify it, and dispatch it by event type.

    Responses:
        401 -- the X-Unifiles-Signature header does not match the body.
        400 -- the body is not a JSON object with "type" and "data".
        200 -- the event was handled; unknown event types are acknowledged
               and ignored.
    """
    # Verify against the raw bytes before trusting anything in the body.
    signature = request.headers.get("X-Unifiles-Signature", "")
    if not verify_signature(request.data, signature):
        return jsonify({"error": "Invalid signature"}), 401

    # get_json(silent=True) returns None instead of raising on a missing or
    # malformed JSON body, so a bad payload gets a clean 400 rather than a 500.
    event = request.get_json(silent=True)
    if not isinstance(event, dict) or "type" not in event or "data" not in event:
        return jsonify({"error": "Malformed event"}), 400

    # Event type -> handler; names are resolved at call time.
    handlers = {
        "extraction.completed": handle_extraction_completed,
        "extraction.failed": handle_extraction_failed,
        "document.indexed": handle_document_indexed,
        "document.index_failed": handle_document_index_failed,
    }
    handler = handlers.get(event["type"])
    if handler is not None:
        handler(event["data"])

    return jsonify({"received": True}), 200
def handle_extraction_completed(data):
    """Handle an ``extraction.completed`` event by logging the finished extraction.

    Raises:
        KeyError: if ``file_id`` or ``extraction_id`` is missing from *data*.
    """
    # file_id is required but not used yet; it identifies the file for any
    # follow-up work.
    _file_id, ext_id = data["file_id"], data["extraction_id"]
    print(f"提取完成: {ext_id}")
    # Your business logic goes here, e.g. add the file to a knowledge base.
def handle_extraction_failed(data):
    """Handle an ``extraction.failed`` event by logging the file and the failure reason.

    ``data["error"]`` may be absent, JSON ``null``, an object with a
    ``message`` field, or (defensively) a bare string; all are logged without
    raising.

    Raises:
        KeyError: if ``file_id`` is missing from *data*.
    """
    file_id = data["file_id"]
    # `or {}` also covers an explicit null, which `.get("error", {})` would
    # return as None and then crash on `.get("message")`.
    error = data.get("error") or {}
    message = error.get("message") if isinstance(error, dict) else error
    print(f"提取失败: {file_id}, 错误: {message}")
    # Your business logic goes here, e.g. send an alert.
def handle_document_indexed(data):
    """Handle a ``document.indexed`` event by logging the document id and chunk count.

    Raises:
        KeyError: if ``document_id`` or ``chunk_count`` is missing from *data*.
    """
    doc_id, chunk_count = data["document_id"], data["chunk_count"]
    print("索引完成: {}, 分块数: {}".format(doc_id, chunk_count))
    # Your business logic goes here.
def handle_document_index_failed(data):
    """Handle a ``document.index_failed`` event by logging the document and the reason.

    ``data["error"]`` may be absent, JSON ``null``, an object with a
    ``message`` field, or a bare string; all are logged without raising.

    Raises:
        KeyError: if ``document_id`` is missing from *data*.
    """
    doc_id = data["document_id"]
    error = data.get("error") or {}
    message = error.get("message") if isinstance(error, dict) else error
    # Report the reason as well, like the extraction-failure handler does;
    # previously `error` was read but never used.
    print(f"索引失败: {doc_id}, 错误: {message}")
    # Your business logic goes here, e.g. send an alert.


if __name__ == "__main__":
    # Flask's built-in development server, listening on port 5000.
    app.run(port=5000)
FastAPI 示例(签名校验通过后立即返回,事件交给后台任务处理)
from fastapi import FastAPI, Request, HTTPException, Header, BackgroundTasks
import hmac
import hashlib
import os

app = FastAPI()
# Shared HMAC key used to verify the X-Unifiles-Signature header; it must equal
# the `secret` passed to client.webhooks.create(). Supply it through the
# environment rather than committing it to source control -- the literal is
# only a fallback for local experiments.
WEBHOOK_SECRET = os.environ.get("UNIFILES_WEBHOOK_SECRET", "your_webhook_secret_key")
def verify_signature(payload: bytes, signature: str) -> bool:
expected = hmac.new(
WEBHOOK_SECRET.encode(),
payload,
hashlib.sha256
).hexdigest()
return hmac.compare_digest(signature, f"sha256={expected}")
@app.post("/webhook/unifiles")
async def handle_webhook(
    request: Request,
    background_tasks: BackgroundTasks,
    # Optional with "" default so a missing header yields 401 (like the Flask
    # example) instead of FastAPI's 422 validation error.
    x_unifiles_signature: str = Header(default=""),
):
    """Verify a Unifiles webhook delivery, acknowledge it, and process it in the background.

    Responses:
        401 -- bad or missing signature.
        400 -- body is not a JSON object with "type" and "data".
        200 -- accepted; process_event() runs after the response is sent.
    """
    import json

    payload = await request.body()
    if not verify_signature(payload, x_unifiles_signature):
        raise HTTPException(status_code=401, detail="Invalid signature")

    # Decode exactly the bytes that were verified; malformed input is a
    # client error (400), not a server error.
    try:
        event = json.loads(payload)
    except ValueError:
        raise HTTPException(status_code=400, detail="Malformed event") from None
    if not isinstance(event, dict) or "type" not in event or "data" not in event:
        raise HTTPException(status_code=400, detail="Malformed event")

    # Respond immediately; the actual work happens in a background task.
    background_tasks.add_task(process_event, event)
    return {"received": True}
async def process_event(event: dict):
"""后台处理事件"""
event_type = event["type"]
data = event["data"]
match event_type:
case "extraction.completed":
await on_extraction_completed(data)
case "extraction.failed":
await on_extraction_failed(data)
case "document.indexed":
await on_document_indexed(data)
async def on_extraction_completed(data: dict):
# 异步处理逻辑
pass
3. 使用 Webhook 的异步流程
from unifiles import UnifilesClient
client = UnifilesClient(api_key="sk_...")


def process_documents_async(file_paths: list):
    """Upload each file and start its extraction without waiting for results.

    Completion is reported later through the webhook; this function only
    submits the work.

    Returns:
        One ``{"file_id": ..., "extraction_id": ...}`` dict per path, in input order.
    """
    submitted = []
    for file_path in file_paths:
        uploaded = client.files.upload(file_path)
        # Create the extraction job without waiting for it to finish.
        job = client.extractions.create(file_id=uploaded.id)
        submitted.append({"file_id": uploaded.id, "extraction_id": job.id})
        print(f"已提交: {file_path}")

    print(f"\n共提交 {len(submitted)} 个任务")
    print("完成后将通过 Webhook 通知")
    return submitted


# Submit the jobs.
tasks = process_documents_async(["doc1.pdf", "doc2.pdf", "doc3.pdf"])
# The webhook handler is invoked as each job finishes.
完整工作流示例
自动化文档处理管道
# webhook_handler.py
from flask import Flask, request, jsonify
from unifiles import UnifilesClient
import hmac
import hashlib
import os

app = Flask(__name__)
# Credentials come from the environment when set; the literals are only
# placeholders for local experiments.
client = UnifilesClient(api_key=os.environ.get("UNIFILES_API_KEY", "sk_..."))
WEBHOOK_SECRET = os.environ.get("UNIFILES_WEBHOOK_SECRET", "your_secret")
# Knowledge base that newly extracted files are added to.
DEFAULT_KB_ID = "kb_default"
@app.route("/webhook/unifiles", methods=["POST"])
def webhook():
    """Drive the extraction -> knowledge base -> notification pipeline from webhook events.

    Responses: 401 for a bad signature, 400 for a malformed body, otherwise
    200 (adding to the knowledge base is best-effort and only logged on failure).
    """
    # Verify the signature before acting: this handler creates knowledge-base
    # documents, so unauthenticated callers must be rejected.
    signature = request.headers.get("X-Unifiles-Signature", "")
    expected = "sha256=" + hmac.new(
        WEBHOOK_SECRET.encode(), request.data, hashlib.sha256
    ).hexdigest()
    # compare_digest() raises TypeError on non-ASCII str input, hence isascii().
    if not signature.isascii() or not hmac.compare_digest(signature, expected):
        return jsonify({"error": "Invalid signature"}), 401

    event = request.get_json(silent=True)
    if not isinstance(event, dict) or "type" not in event or "data" not in event:
        return jsonify({"error": "Malformed event"}), 400

    event_type = event["type"]
    data = event["data"]

    if event_type == "extraction.completed":
        # Extraction finished: add the file to the knowledge base.
        file_id = data["file_id"]
        try:
            doc = client.knowledge_bases.documents.create(
                kb_id=DEFAULT_KB_ID,
                file_id=file_id,
            )
            print(f"文档已添加到知识库: {doc.id}")
        except Exception as e:
            # Best effort: log and still acknowledge the delivery.
            print(f"添加失败: {e}")
    elif event_type == "document.indexed":
        # Indexing finished: tell the business system the document is ready.
        notify_business_system(data["document_id"], data["chunk_count"])
    elif event_type in ("extraction.failed", "document.index_failed"):
        # Any failure: raise an alert.
        send_alert(event)

    return jsonify({"ok": True})
def notify_business_system(doc_id: str, chunk_count: int):
    """Tell the downstream business system that document *doc_id* is ready.

    Placeholder: replace the body with a call to your business system's API.
    Currently does nothing and returns None.
    """
    return None
def send_alert(event: dict):
    """Raise an alert for a failed extraction or indexing event.

    Placeholder: replace the body with a Slack / e-mail notification.
    Currently does nothing and returns None.
    """
    return None
带状态追踪的处理器
from flask import Flask, request, jsonify
import redis
import json
import os

app = Flask(__name__)
# Connection comes from REDIS_URL when set; the fallback URL matches
# redis.Redis()'s defaults (localhost:6379, db 0).
redis_client = redis.Redis.from_url(os.environ.get("REDIS_URL", "redis://localhost:6379/0"))
@app.route("/webhook/unifiles", methods=["POST"])
def webhook():
    """Record per-file pipeline progress in Redis and run on_task_completed once per file.

    Progress is stored in the hash ``unifiles:task:<file_id>``. Once both
    extraction and indexing are marked completed, on_task_completed() is
    called at most once for that file.

    NOTE: signature verification is left out of this example for brevity;
    verify X-Unifiles-Signature in production as the earlier handlers do.
    """
    event = request.get_json(silent=True)
    if not isinstance(event, dict) or not isinstance(event.get("data"), dict):
        return jsonify({"error": "Malformed event"}), 400
    event_type = event.get("type")
    data = event["data"]

    if "file_id" in data:
        task_key = f"unifiles:task:{data['file_id']}"

        if event_type == "extraction.completed":
            redis_client.hset(task_key, mapping={
                "extraction_status": "completed",
                "extraction_id": data["extraction_id"],
                # `or 0`: redis-py rejects None values, so guard against a JSON null.
                "pages": data.get("total_pages") or 0,
            })
        elif event_type == "document.indexed":
            redis_client.hset(task_key, mapping={
                "index_status": "completed",
                "document_id": data["document_id"],
                "chunks": data.get("chunk_count") or 0,
            })

        # Without decode_responses, hgetall() returns bytes keys and values.
        task = redis_client.hgetall(task_key)
        all_done = (
            task.get(b"extraction_status") == b"completed"
            and task.get(b"index_status") == b"completed"
        )
        # The two events (or a redelivery) can be handled concurrently, and
        # each would see both stages completed. HSETNX is atomic: only the
        # first caller sets "notified", so the completion hook runs once.
        if all_done and redis_client.hsetnx(task_key, "notified", 1):
            on_task_completed(data["file_id"], task)

    return jsonify({"ok": True})
def on_task_completed(file_id: str, task: dict):
    """Hook run after every pipeline step has completed for *file_id*.

    *task* is the file's Redis hash as returned by hgetall() (bytes keys/values).
    """
    message = f"文件处理完成: {file_id}"
    print(message)
    # Notify the user, update your database, etc.
本地开发调试
使用 ngrok
# 1. Start the local webhook handler
python webhook_handler.py # listens on localhost:5000
# 2. Start ngrok, forwarding to local port 5000
ngrok http 5000
# 3. Copy the public HTTPS URL that ngrok prints, e.g.
# https://abc123.ngrok.io
# 4. Register the ngrok URL as the webhook endpoint.
# The handler must verify signatures with this same secret.
ngrok_url = "https://abc123.ngrok.io"
webhook = client.webhooks.create(
    url=f"{ngrok_url}/webhook/unifiles",
    events=["extraction.completed"],
    secret="test_secret",
)
手动测试 Webhook
注意:测试脚本使用的 secret 必须与被测处理器的 WEBHOOK_SECRET 一致,否则签名校验失败并返回 401。
import requests
import hmac
import hashlib
import json
def sign_payload(secret: str, body: bytes) -> str:
    """Return the ``X-Unifiles-Signature`` value for *body*: ``"sha256=<hex HMAC-SHA256>"``."""
    return "sha256=" + hmac.new(secret.encode(), body, hashlib.sha256).hexdigest()


def test_webhook(
    url: str = "http://localhost:5000/webhook/unifiles",
    secret: str = "your_secret",
):
    """Send a signed, fake ``extraction.completed`` event to a locally running handler.

    Args:
        url: Endpoint of the handler under test.
        secret: Must equal the WEBHOOK_SECRET the handler verifies with,
            otherwise the handler answers 401.
    """
    payload = {
        "id": "evt_test123",
        "type": "extraction.completed",
        "data": {
            "extraction_id": "ext_test",
            "file_id": "file_test",
            "status": "completed",
        },
    }

    # Serialize once and send exactly these bytes. The signature covers the
    # raw body; passing json=payload would let `requests` re-serialize it,
    # and those bytes are not guaranteed to match what was signed.
    payload_bytes = json.dumps(payload).encode()
    response = requests.post(
        url,
        data=payload_bytes,
        headers={
            "Content-Type": "application/json",
            "X-Unifiles-Signature": sign_payload(secret, payload_bytes),
        },
        timeout=10,
    )

    print(f"Status: {response.status_code}")
    # Error responses (e.g. a default 500 page) may not be JSON.
    try:
        body = response.json()
    except ValueError:
        body = response.text
    print(f"Response: {body}")


if __name__ == "__main__":
    # Guarded so importing this module does not fire a network request.
    test_webhook()
错误处理与重试
处理器中的错误处理
返回 4xx 表示事件数据本身有问题,重试也无济于事;返回 5xx 表示临时故障,应让平台重新投递。
@app.route("/webhook/unifiles", methods=["POST"])
def webhook():
    """Process an event and map the outcome to a status code that steers retries.

    Responses:
        400 -- the event itself is bad; retrying will not help.
        500 -- a transient/external failure; the delivery should be retried.
        200 -- processed.
    """
    # get_json(silent=True) returns None for a missing/invalid JSON body.
    # request.json would raise BadRequest instead, which the generic handler
    # below turns into a retryable 500 for what is a permanent error.
    event = request.get_json(silent=True)
    if event is None:
        return jsonify({"error": "Request body is not valid JSON"}), 400

    try:
        process_event(event)
    except ValueError as e:
        # Malformed event data: do not retry.
        app.logger.error("Invalid event data: %s", e)
        return jsonify({"error": str(e)}), 400
    except ExternalServiceError as e:
        # External service failure: ask for a retry.
        app.logger.error("External service error: %s", e)
        return jsonify({"error": "retry"}), 500
    except Exception:
        # Unknown error: log the traceback.
        app.logger.exception("Unexpected error")
        return jsonify({"error": "internal"}), 500

    return jsonify({"ok": True}), 200
幂等性处理
同一事件可能被重复投递(例如处理失败后的重试),因此应按事件 id 去重,保证每个事件只被处理一次。
import redis
redis_client = redis.Redis()

# How long an in-progress claim lives; if the worker dies mid-way, the claim
# expires and a redelivery can process the event.
PROCESSING_TTL_SECONDS = 300
# How long a processed event id is remembered (24 hours).
PROCESSED_TTL_SECONDS = 86400


@app.route("/webhook/unifiles", methods=["POST"])
def webhook():
    """Process each event id at most once, even when deliveries arrive concurrently.

    Responses:
        400 -- body is not a JSON object with an "id".
        200 -- processed, or a duplicate of an event already claimed/processed.
        500 -- processing raised; the claim is released so a retry can run.
    """
    event = request.get_json(silent=True)
    if not isinstance(event, dict) or "id" not in event:
        return jsonify({"error": "Malformed event"}), 400

    key = f"webhook:processed:{event['id']}"

    # Atomically claim the event with SET NX EX. A separate GET followed by
    # SETEX leaves a window where two concurrent deliveries of the same event
    # both pass the check and both process it.
    if not redis_client.set(key, "processing", nx=True, ex=PROCESSING_TTL_SECONDS):
        return jsonify({"ok": True, "duplicate": True}), 200

    try:
        process_event(event)
    except Exception:
        # Release the claim so the platform's retry can process the event.
        redis_client.delete(key)
        raise

    # Mark as processed (expires after 24 hours).
    redis_client.setex(key, PROCESSED_TTL_SECONDS, "1")
    return jsonify({"ok": True}), 200
管理 Webhooks
# List all webhooks
webhooks = client.webhooks.list()
for wh in webhooks.items:
    print(f"{wh.id}: {wh.url} - {wh.events}")

# Update a webhook: listen only for completed extractions
webhook = client.webhooks.update(
    webhook_id=webhook.id,
    events=["extraction.completed"],
    enabled=True,
)

# Inspect recent delivery history
deliveries = client.webhooks.deliveries(webhook.id, limit=10)
for d in deliveries.items:
    print(f"{d.id}: {d.event_type} - {d.status}")

# Redeliver: delivery_id must come from the delivery history above.
# Pick the delivery you want to replay (typically one whose status shows
# a failure); here we simply take the first one, if any.
if deliveries.items:
    delivery_id = deliveries.items[0].id
    client.webhooks.redeliver(webhook.id, delivery_id)

# Delete the webhook
client.webhooks.delete(webhook.id)