1. 开篇:我为什么需要这个方案
生图是"一发一收",几秒就回。但生视频是长任务 —— Sora、Veo3、可灵跑一条 720p 视频动辄一两分钟。如果你照着生图那套"同步等结果"的写法去接视频接口,会立刻撞上三个坑:
- HTTP 超时:一个请求挂两分钟,连接早断了。
- 不知道该等多久:有的平台支持回调(生成完主动推给你),有的只能你自己去轮询。
- 轮询逻辑写不对:要么轮太勤被限流,要么遇到临时网络抖动就误判失败、把用户的钱扣了。
本文用一套真实的 AI 生视频 SaaS 后端(FastAPI + Celery),讲清楚**"提交任务 → 拿 task_id → 回调/轮询拿结果"**这条异步通路怎么落地,以及通过中转站接 4SAPI 这类聚合平台的 Sora/Veo 时有哪些专属坑。
2. 原理速览
视频生成是两段式异步:
① 提交任务 你的应用 → 中转站 → 官方 返回 {task_id, status: queued}
② 拿结果 两条路二选一:
(A) 回调:平台生成完 POST 推到你的 callback_url ← 火山/可灵
(B) 轮询:你定时 GET /videos/{task_id} 查状态 ← Sora/Veo
③ 转存 status=completed → 把视频 URL 转存到自己的 OSS
关键决策:平台支不支持回调,决定了你走 (A) 还是 (B)。这套代码就是按平台分流的。
3. 接入实战(参考 4SAPI 异步接口)
① 提交任务,统一拿 task_id
不同平台返回字段不一样(Sora/Veo 是 id,火山/可灵是 task_id),代码做了归一:
# ai_generation_tasks.py —— 提交后兼容两种字段名
result = service.create_video_task(prompt=prompt, duration=duration, ratio=ratio, ...)
task_id = result.get("task_id") or result.get("id")
history.platform_task_id = task_id
history.status = "processing" # 等回调或轮询
② 按平台分流:回调 or 轮询
if platform in ["doubao", "kling"]:
# 支持回调,什么都不用做,等平台 POST 推过来
logger.info("视频任务已提交,等待回调")
else:
# sora/veo 不支持回调 → 启动轮询任务,30 秒后开始查
poll_video_status.apply_async(args=[history_id, platform, task_id], countdown=30)
③ 轮询:用 Celery 的 retry 当定时器
轮询不是写个 while True + sleep,而是用 Celery 任务的 max_retries + default_retry_delay 当节拍器 —— 每 30 秒一次,最多 60 次(即最长等 30 分钟):
@celery_app.task(bind=True, max_retries=60, default_retry_delay=30)
def poll_video_status(self, history_id, platform, platform_task_id):
result = service.query_video_task(platform_task_id)
status = result.get("status", "")
if status in ["succeeded", "completed"]:
... # 拿到 video_url,进入转存
elif status in ["queued", "running", "processing", "in_progress"]:
raise self.retry() # 还没好,30 秒后再查
elif status == "failed":
... # 失败处理(见下)
④ 避坑一:临时失败别误杀
视频跑挂了,不一定是真挂 —— 可能只是上游临时过载/限流。代码维护了一张"可重试关键词表",命中就继续轮询,不退款:
RETRYABLE_POLL_ERROR_KEYWORDS = [
"reCAPTCHA", "PERMISSION_DENIED", "temporary",
"overloaded", "rate limit", "服务繁忙", "请稍后再试",
]
if is_retryable_poll_failure(error_message):
raise self.retry(countdown=60) # 临时错误,1 分钟后再试
# 否则才判真失败 + 退积分
⑤ 避坑二:查询请求自己抖动,别判任务失败
"查状态"这个请求本身可能 SSL EOF / 503,但上游视频还在好好排队。这时绝不能把生成任务判失败,而是让这次轮询重试:
except Exception as e:
logger.warning(f"轮询任务异常,30s 后重试: {e}")
db.rollback()
raise self.retry(exc=e) # 网络抖动不背锅
⑥ 避坑三:4sapi 的 Sora-2 不直接给 video_url
这是接中转站 Sora 时最隐蔽的坑 —— completed 了但响应里没有 video_url,要自己拼下载地址,而且下载还得带 Bearer Token:
# Sora2/4sapi: 完成后构造下载 URL,不是直接给链接
video_id = response.get("id", task_id)
result["video_url"] = f"{base_url}/v1/videos/{video_id}/content"
# 下载这种网关 URL 需要带 token,不能走第三方异步转存(它没你的 token)
headers = {"Authorization": f"Bearer {api_key}"}
resp = httpx.Client(timeout=300).get(video_url, headers=headers)
代码对这类"OpenAI 兼容视频网关"专门走本地下载 + 直传 OSS,绕开拿不到 token 的异步转存路径。
4. 成本与风险提示
- 轮询频率与成本:30 秒一次是经验值,太勤可能触发中转站限流计费,太疏拖慢用户体验。按模型平均时长调。
- 超时上限:
max_retries=60(30 分钟)是兜底,到顶仍未完成就判失败退款,别让任务永远挂着。 - 回调安全:暴露
callback_url要校验来源、做幂等(同一个 task 可能被推多次),别被伪造回调刷量。 - 数据隐私:视频 URL 多为带签名的临时地址,务必尽快转存到自己的存储,且评估素材合规性。
- 稳定性:中转站是第三方依赖,建议像本项目一样做"临时错误重试 + 永久错误识别 + 失败退款"三件套。
5. 总结与系列导航
一句话总结:接长任务视频接口,记住三件事 —— 提交后只认 task_id;支持回调就等回调、不支持就用 Celery retry 当轮询节拍器;临时错误重试、查询抖动重试、真失败才退款。接 4sapi 的 Sora 还要额外处理"自己拼下载 URL + 带 token 下载"。
适用人群:要在应用里接 Sora / Veo3 / 可灵等异步视频模型的开发者。
中转站选型与异步接口文档可参考 4SAPI。你有更稳的轮询/回调方案,欢迎评论区交流。