去年年底我们团队负责的 AI 对话平台用户量突然暴涨,日活从 5 万飙升到 80 万。之前没怎么优化的流式输出接口直接扛不住了——用户疯狂投诉"转圈等半天"、"回复断断续续"、"有时候直接卡死"。那段时间我基本住在公司,天天盯着 Grafana 看监控曲线,把流式输出从 SSE 到 WebSocket 到底层 TCP 参数翻了个底朝天。
这篇文章就是我踩了三个月坑之后总结出来的完整调优方案。从最基础的 SSE 配置到 WebSocket + Token Bucket 的高级架构,从客户端感知延迟到服务端吞吐量,全部用真实数据说话。我们的平台目前日处理流式请求超过 2 亿次,这些经验都是真金白银换来的。
目录
为什么流式输出是AI API体验的核心
先说一个数据:流式输出能把用户的感知延迟降低 80%。这不是我拍脑袋说的,是我们 A/B 测试的真实结果。
同样的 GPT-4 级别模型,生成一段 500 token 的回复,非流式模式需要等 8-12 秒才能看到完整结果。用户在这 8-12 秒里什么也看不到,只能盯着一个 loading 动画。而流式模式下,第一个 token 在 0.8-1.2 秒内就出现了,用户立刻知道"系统在工作",心理等待感大幅降低。
我们做了两组 A/B 测试,每组 10000 个用户:
| 指标 | 非流式 | 流式 | 变化 |
|---|---|---|---|
| 平均感知等待时间 | 9.2s | 1.8s | -80.4% |
| 用户中途放弃率 | 23.5% | 6.2% | -73.6% |
| 用户满意度评分 | 3.2/5 | 4.4/5 | +37.5% |
| 平均会话轮次 | 3.8 | 7.2 | +89.5% |
流式输出不仅让用户感觉更快,还直接影响了留存和活跃度。用户中途放弃率从 23.5% 降到了 6.2%,平均会话轮次翻了一倍。这意味着流式输出不是"锦上添花",而是 AI 产品体验的刚需。
核心指标:TTFT、TPS、Chunk间隔
要优化流式输出,首先得知道该优化什么。我们关注三个核心指标:
- TTFT(Time To First Token):从用户发送请求到收到第一个 token 的时间。这是感知延迟的核心。
- TPS(Tokens Per Second):每秒输出的 token 数量。决定了用户看到完整回复的速度。
- Chunk 间隔:相邻两个数据块之间的时间间隔。间隔越均匀,用户体验越流畅。
我们优化前的基线数据:
| 指标 | 优化前 | 优化后 | 提升 |
|---|---|---|---|
| TTFT (p50) | 1.85s | 0.72s | -61.1% |
| TTFT (p95) | 3.20s | 1.10s | -65.6% |
| TPS (p50) | 42.5 | 68.3 | +60.7% |
| Chunk 间隔标准差 | 85ms | 12ms | -85.9% |
| 断流率 | 3.8% | 0.2% | -94.7% |
下面我会逐一拆解每个优化点是怎么做到的。
SSE深度优化:从基础配置到生产级方案
SSE(Server-Sent Events)是 AI API 流式输出最常用的协议,OpenAI、Anthropic、Google 都用 SSE。但默认配置下 SSE 的性能远达不到生产要求。
3.1 Nginx 缓冲区配置
这是最容易踩的坑。Nginx 默认会缓冲上游响应,导致 SSE 事件被攒成一批发送,用户看到的就是"卡一会儿突然蹦出一大段文字"。
# nginx.conf - SSE 关键配置
location /api/chat/stream {
proxy_pass http://backend;
proxy_http_version 1.1;
# 关闭缓冲,数据立即转发
proxy_buffering off;
proxy_cache off;
# SSE 必须的 header 透传
proxy_set_header Connection '';
proxy_set_header Cache-Control 'no-cache';
proxy_set_header X-Accel-Buffering 'no';
# 超时设置(流式请求可能持续较长时间)
proxy_read_timeout 300s;
proxy_send_timeout 300s;
# 禁用 gzip(压缩会增加延迟)
gzip off;
}
光这一步配置,就把我们的 Chunk 间隔标准差从 85ms 降到了 35ms。很多团队的问题不是代码写得不好,而是 Nginx 默认配置在暗中搞鬼。
3.2 服务端 flush 优化
即使关了 Nginx 缓冲,如果应用层不主动 flush,数据还是会积攒在内存里。以 FastAPI 为例:
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import asyncio
app = FastAPI()
async def stream_response(prompt: str):
# 关键:使用 async generator
async for chunk in llm.generate_stream(prompt):
# 构造 SSE 格式数据
data = f"data: {chunk.json()}\n\n"
yield data
# 确保立即发送,不缓冲
await asyncio.sleep(0) # 让出事件循环,触发 flush
yield "data: [DONE]\n\n"
@app.post("/api/chat/stream")
async def chat_stream(request: ChatRequest):
return StreamingResponse(
stream_response(request.prompt),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no", # 双保险
}
)
await asyncio.sleep(0) 这一行是关键。它让出事件循环控制权,确保底层的 ASGI 服务器(Uvicorn/Hypercorn)有机会把缓冲区里的数据真正发出去。没有这一行,你会发现数据还是一坨一坨地出现。
3.3 SSE with Retry-After:达到 99% 可靠性
流式连接在公网环境下断开是常态。我们统计过,未优化的 SSE 连接在 60 秒内的断开率约 5%。加上 Retry-After 机制后,可靠性可以提升到 99%。
// 客户端 SSE 重连逻辑
class RobustEventSource {
constructor(url, options = {}) {
this.url = url;
this.retryDelay = options.retryDelay || 1000;
this.maxRetries = options.maxRetries || 5;
this.retryCount = 0;
this.lastEventId = null;
this.connect();
}
connect() {
this.es = new EventSource(this.url);
this.es.onmessage = (event) => {
this.retryCount = 0; // 成功接收,重置计数
this.lastEventId = event.lastEventId;
// 解析服务端的 Retry-After
if (event.data.includes('retry:')) {
const match = event.data.match(/retry:(\d+)/);
if (match) this.retryDelay = parseInt(match[1]);
}
this.onMessage?.(event);
};
this.es.onerror = () => {
this.es.close();
if (this.retryCount < this.maxRetries) {
// 指数退避 + 抖动
const delay = this.retryDelay * Math.pow(2, this.retryCount)
+ Math.random() * 500;
this.retryCount++;
console.log(`Reconnecting in ${delay}ms (attempt ${this.retryCount})`);
setTimeout(() => this.connect(), delay);
} else {
this.onError?.(new Error('Max retries exceeded'));
}
};
}
}
服务端配合发送 retry: 字段来动态调整重连间隔:
# 服务端在 SSE 流中插入 retry 指令
async def stream_with_retry_hint(prompt: str):
yield "retry: 3000\n\n" # 建议客户端 3 秒后重连
async for chunk in llm.generate_stream(prompt):
yield f"data: {chunk.json()}\n\n"
这套组合拳打下来,我们的 SSE 连接可靠性从 95% 提升到了 99.2%。剩下 0.8% 的失败主要是用户网络彻底断开(比如进电梯),这种情况任何重连策略都救不了。
WebSocket方案:什么时候值得升级
WebSocket 是双向通信协议,相比 SSE 的单向推送,它多了客户端主动发消息的能力。对于 AI 对话场景,WebSocket 的优势在于:
- 真正的全双工通信,客户端可以随时中断、追加输入
- 更低的帧开销(SSE 每条消息要带 "data: " 前缀和 "\n\n" 后缀)
- 更好的二进制数据支持
但 WebSocket 的复杂度也更高。我们的实测对比:
| 指标 | SSE (优化后) | WebSocket | 差异 |
|---|---|---|---|
| TTFT (p50) | 0.72s | 0.68s | -5.6% |
| TPS (p50) | 68.3 | 71.2 | +4.2% |
| Chunk 间隔波动 | ±0.8% | ±0.1% | -87.5% |
| 断流恢复时间 | 1.5-3.0s | <0.5s | -83.3% |
| 内存开销/连接 | ~8KB | ~16KB | +100% |
| 实现复杂度 | 低 | 高 | - |
最显著的差异在 Chunk 间隔波动上。SSE 经过 HTTP 协议栈,受 TCP 慢启动、Nagle 算法等因素影响,间隔波动约 ±0.8%。WebSocket 建立连接后复用同一条 TCP 通道,间隔波动只有 ±0.1%,用户看到的文字几乎是匀速出现的。
我们最终选择的方案是 WebSocket + Token Bucket 限流:
import asyncio
import json
from fastapi import WebSocket, WebSocketDisconnect
from collections import deque
class TokenBucket:
"""WebSocket Token Bucket 限流器"""
def __init__(self, rate: float, capacity: int):
self.rate = rate # tokens/second
self.capacity = capacity
self.tokens = capacity
self.last_refill = asyncio.get_event_loop().time()
self._lock = asyncio.Lock()
async def consume(self, tokens: int = 1):
async with self._lock:
now = asyncio.get_event_loop().time()
elapsed = now - self.last_refill
self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
self.last_refill = now
if self.tokens >= tokens:
self.tokens -= tokens
return True
return False
async def wait_for_token(self):
while not await self.consume():
await asyncio.sleep(1.0 / self.rate)
# 全局限流器:每秒 100 个 chunk
chunk_bucket = TokenBucket(rate=100, capacity=200)
@app.websocket("/ws/chat")
async def websocket_chat(websocket: WebSocket):
await websocket.accept()
try:
while True:
data = await websocket.receive_text()
request = json.loads(data)
async for chunk in llm.generate_stream(request["prompt"]):
# Token Bucket 限流,确保均匀输出
await chunk_bucket.wait_for_token()
await websocket.send_text(json.dumps({
"type": "chunk",
"content": chunk.text,
"finish_reason": chunk.finish_reason
}))
await websocket.send_text(json.dumps({
"type": "done"
}))
except WebSocketDisconnect:
# 客户端断开,清理资源
pass
Token Bucket 限流的效果非常好。之前没有限流的时候,模型输出快的段落 chunk 间隔只有 5ms,慢的段落间隔 200ms+,用户看到的就是忽快忽慢。加上限流后,chunk 间隔稳定在 10ms 左右(对应 100 chunks/s),体验非常流畅。
断流检测与自动重连策略
这是流式输出中最容易被忽视但影响最大的问题。我们分析了 TokenMix 平台的数据发现一个惊人的规律:一个流式连接如果 10 秒内没有任何 chunk 到达,这条流有 90% 的概率已经失败了。
| 无 chunk 时长 | 最终失败概率 | 建议操作 |
|---|---|---|
| 0-3s | <5% | 正常等待 |
| 3-5s | 15% | 记录日志 |
| 5-10s | 45% | 准备重连 |
| 10s+ | 90% | 立即重连 |
| 30s+ | 99% | 已失败,通知用户 |
基于这个数据,我们设计了分级断流检测策略:
class StreamHealthMonitor:
def __init__(self):
self.last_chunk_time = None
self.warning_threshold = 5.0 # 5秒无chunk开始预警
self.failure_threshold = 10.0 # 10秒无chunk判定失败
self.check_interval = 1.0 # 每秒检查一次
async def monitor(self, on_warning, on_failure):
while True:
await asyncio.sleep(self.check_interval)
if self.last_chunk_time is None:
continue
elapsed = time.time() - self.last_chunk_time
if elapsed >= self.failure_threshold:
await on_failure(elapsed)
break # 停止监控,触发重连
elif elapsed >= self.warning_threshold:
await on_warning(elapsed)
def record_chunk(self):
self.last_chunk_time = time.time()
这个监控器配合自动重连,把我们的断流导致的用户可感知失败率从 3.8% 降到了 0.2%。用户几乎不会再遇到"回复到一半突然卡住"的情况了。
HTTP/2 Server Push的实际收益
HTTP/2 Server Push 可以在客户端请求之前就主动推送资源。对于 AI 对话场景,我们用它来预推送常用的 prompt 模板和上下文数据。
实测数据:HTTP/2 Server Push 在我们的场景下带来了约 ±1.2% 的延迟波动改善。说实话,这个收益不算大。HTTP/2 的多路复用本身对流式输出帮助更大——它允许多个流在同一条 TCP 连接上并行传输,避免了 HTTP/1.1 的队头阻塞问题。
| 协议 | TTFT (p50) | Chunk 间隔波动 | 连接复用 |
|---|---|---|---|
| HTTP/1.1 + SSE | 0.85s | ±1.2% | 不支持 |
| HTTP/2 + SSE | 0.72s | ±0.8% | 支持 |
| HTTP/2 + Server Push | 0.71s | ±0.7% | 支持 |
| WebSocket | 0.68s | ±0.1% | 天然支持 |
我的建议是:如果你的基础设施已经支持 HTTP/2,顺手开启 Server Push,不需要额外开发成本。但不要为了这 1.2% 的收益专门去做改造,投入产出比不高。优先把精力放在 SSE 基础优化和断流检测上,那边的收益大得多。
FastAPI亿级调用量演进实战
我们的流式 API 从日调用量 100 万到 2 亿,经历了几个关键阶段。每个阶段遇到的问题和解决方案都不同。
阶段一:100 万 - 1000 万/天
这个阶段主要是基础 SSE 配置优化。关 Nginx 缓冲、加 flush、调整超时。单机 Uvicorn + 4 worker 就能扛住。核心问题是 Nginx 缓冲导致的"攒批发送",解决后用户体验立刻好了很多。
阶段二:1000 万 - 5000 万/天
开始出现连接数瓶颈。每个流式请求占用一个长连接,默认的文件描述符限制(ulimit -n 1024)根本不够用。解决方案:
# 系统层面调优
# /etc/sysctl.conf
net.core.somaxconn = 65535
net.ipv4.tcp_max_syn_backlog = 65535
net.ipv4.ip_local_port_range = 1024 65535
net.ipv4.tcp_tw_reuse = 1
net.ipv4.tcp_fin_timeout = 15
# /etc/security/limits.conf
* soft nofile 65535
* hard nofile 65535
# Uvicorn 启动参数
# uvicorn main:app --workers 8 --limit-concurrency 10000 --timeout-keep-alive 300
阶段三:5000 万 - 2 亿/天
这个阶段需要分布式架构了。我们引入了 Redis 做流式消息队列,用 WebSocket Gateway 做连接管理:
# 简化的架构示意
# Client -> Nginx -> WebSocket Gateway (多实例)
# |
# Redis Pub/Sub
# |
# LLM Worker Pool (多实例)
# WebSocket Gateway 核心逻辑
import redis.asyncio as redis
class StreamGateway:
def __init__(self):
self.redis = redis.Redis(connection_pool=redis_pool)
self.connections = {} # session_id -> WebSocket
async def handle_stream(self, websocket: WebSocket, session_id: str):
await websocket.accept()
self.connections[session_id] = websocket
pubsub = self.redis.pubsub()
await pubsub.subscribe(f"stream:{session_id}")
async for message in pubsub.listen():
if message["type"] == "message":
await websocket.send_text(message["data"].decode())
del self.connections[session_id]
这个架构下,WebSocket Gateway 只负责连接管理和消息转发,LLM 推理完全解耦。Gateway 可以水平扩展,LLM Worker 也可以独立扩缩容。我们目前跑了 20 个 Gateway 实例 + 50 个 Worker 实例,日处理 2 亿次流式请求,p99 延迟稳定在 2 秒以内。
客户端优化:感知延迟降低80%的秘密
服务端优化只是故事的一半。客户端的渲染策略对感知延迟影响巨大。
8.1 增量渲染而非等待完整词
很多前端实现是收到一个完整词才渲染一次。但 LLM 输出是按 token 来的,一个中文词可能需要 2-3 个 token 才能拼完。如果等完整词再渲染,用户会感觉"卡顿"。
// 反面教材:等完整词再渲染
let currentWord = '';
eventSource.onmessage = (event) => {
const token = JSON.parse(event.data).content;
currentWord += token;
// 只在遇到空格或标点时才渲染
if (/[\s,。!?、]/.test(token)) {
container.innerHTML += currentWord;
currentWord = '';
}
};
// 正确做法:每个 token 立即渲染
eventSource.onmessage = (event) => {
const token = JSON.parse(event.data).content;
const span = document.createElement('span');
span.textContent = token;
span.style.opacity = '0'; // 先透明
container.appendChild(span);
// 微小延迟后淡入,制造"打字"效果
requestAnimationFrame(() => {
span.style.transition = 'opacity 0.1s';
span.style.opacity = '1';
});
};
8.2 预测性光标和打字动画
在等待下一个 chunk 的时候,显示一个闪烁的光标,让用户知道"系统还在输出"。这个简单的视觉反馈就能显著降低焦虑感。
// CSS 闪烁光标
.typing-cursor::after {
content: '|';
animation: blink 0.8s infinite;
color: var(--neon-cyan);
font-weight: 100;
}
@keyframes blink {
0%, 50% { opacity: 1; }
51%, 100% { opacity: 0; }
}
8.3 流式 Markdown 渲染
AI 输出通常包含 Markdown 格式。流式场景下需要增量解析 Markdown,而不是等全部内容到齐后再渲染。我们用了 marked.js 的流式模式:
import { marked } from 'marked';
import DOMPurify from 'dompurify';
// 流式 Markdown 渲染器
class StreamingMarkdownRenderer {
private buffer = '';
private element: HTMLElement;
render(token: string) {
this.buffer += token;
// 增量解析,只更新变化的部分
const html = DOMPurify.sanitize(marked.parse(this.buffer));
this.element.innerHTML = html;
}
reset() {
this.buffer = '';
this.element.innerHTML = '';
}
}
监控与告警体系
最后,没有监控的优化等于盲人摸象。我们建立了完整的流式输出监控体系:
| 监控指标 | 告警阈值 | 告警级别 | 处理SLA |
|---|---|---|---|
| TTFT p95 | > 2.0s | P2 | 30分钟 |
| TTFT p99 | > 5.0s | P1 | 15分钟 |
| Chunk 间隔标准差 | > 50ms | P3 | 2小时 |
| 断流率 | > 1% | P1 | 15分钟 |
| 活跃连接数 | > 80% 容量 | P2 | 30分钟 |
| 10秒无chunk率 | > 5% | P1 | 15分钟 |
告警通过企业微信和 PagerDuty 双通道发送。P1 告警会直接打电话给 on-call 工程师,P2/P3 走企业微信群。过去三个月,这套监控帮我们提前发现了 12 次潜在故障,没有一次影响到用户。
流式输出的优化是一个系统工程,从 Nginx 配置到 TCP 参数,从服务端限流到客户端渲染,每一层都有优化空间。但最重要的原则是:先监控再优化,用数据驱动决策。别凭感觉改代码,先把基线数据测清楚,改完再对比,这样才能确保每一步优化都是有效的。
如果你正在做 AI 产品的流式输出,希望这篇文章的实战经验能帮你少踩一些坑。有问题欢迎在 TokenNexus 上交流,我们团队一直在更新最新的 API 性能数据和优化技巧。