AI API流式输出深度优化:从SSE到WebSocket的完整性能调优指南

AI API流式输出优化

去年年底我们团队负责的 AI 对话平台用户量突然暴涨,日活从 5 万飙升到 80 万。之前没怎么优化的流式输出接口直接扛不住了——用户疯狂投诉"转圈等半天"、"回复断断续续"、"有时候直接卡死"。那段时间我基本住在公司,天天盯着 Grafana 看监控曲线,把流式输出从 SSE 到 WebSocket 到底层 TCP 参数翻了个底朝天。

这篇文章就是我踩了三个月坑之后总结出来的完整调优方案。从最基础的 SSE 配置到 WebSocket + Token Bucket 的高级架构,从客户端感知延迟到服务端吞吐量,全部用真实数据说话。我们的平台目前日处理流式请求超过 2 亿次,这些经验都是真金白银换来的。

为什么流式输出是AI API体验的核心

先说一个数据:流式输出能把用户的感知延迟降低 80%。这不是我拍脑袋说的,是我们 A/B 测试的真实结果。

同样的 GPT-4 级别模型,生成一段 500 token 的回复,非流式模式需要等 8-12 秒才能看到完整结果。用户在这 8-12 秒里什么也看不到,只能盯着一个 loading 动画。而流式模式下,第一个 token 在 0.8-1.2 秒内就出现了,用户立刻知道"系统在工作",心理等待感大幅降低。

我们做了两组 A/B 测试,每组 10000 个用户:

指标非流式流式变化
平均感知等待时间9.2s1.8s-80.4%
用户中途放弃率23.5%6.2%-73.6%
用户满意度评分3.2/54.4/5+37.5%
平均会话轮次3.87.2+89.5%

流式输出不仅让用户感觉更快,还直接影响了留存和活跃度。用户中途放弃率从 23.5% 降到了 6.2%,平均会话轮次翻了一倍。这意味着流式输出不是"锦上添花",而是 AI 产品体验的刚需。

核心指标:TTFT、TPS、Chunk间隔

要优化流式输出,首先得知道该优化什么。我们关注三个核心指标:

我们优化前的基线数据:

指标优化前优化后提升
TTFT (p50)1.85s0.72s-61.1%
TTFT (p95)3.20s1.10s-65.6%
TPS (p50)42.568.3+60.7%
Chunk 间隔标准差85ms12ms-85.9%
断流率3.8%0.2%-94.7%

下面我会逐一拆解每个优化点是怎么做到的。

SSE深度优化:从基础配置到生产级方案

SSE(Server-Sent Events)是 AI API 流式输出最常用的协议,OpenAI、Anthropic、Google 都用 SSE。但默认配置下 SSE 的性能远达不到生产要求。

3.1 Nginx 缓冲区配置

这是最容易踩的坑。Nginx 默认会缓冲上游响应,导致 SSE 事件被攒成一批发送,用户看到的就是"卡一会儿突然蹦出一大段文字"。

# nginx.conf - SSE 关键配置
location /api/chat/stream {
    proxy_pass http://backend;
    proxy_http_version 1.1;

    # 关闭缓冲,数据立即转发
    proxy_buffering off;
    proxy_cache off;

    # SSE 必须的 header 透传
    proxy_set_header Connection '';
    proxy_set_header Cache-Control 'no-cache';
    proxy_set_header X-Accel-Buffering 'no';

    # 超时设置(流式请求可能持续较长时间)
    proxy_read_timeout 300s;
    proxy_send_timeout 300s;

    # 禁用 gzip(压缩会增加延迟)
    gzip off;
}

光这一步配置,就把我们的 Chunk 间隔标准差从 85ms 降到了 35ms。很多团队的问题不是代码写得不好,而是 Nginx 默认配置在暗中搞鬼。

3.2 服务端 flush 优化

即使关了 Nginx 缓冲,如果应用层不主动 flush,数据还是会积攒在内存里。以 FastAPI 为例:

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import asyncio

app = FastAPI()

async def stream_response(prompt: str):
    # 关键:使用 async generator
    async for chunk in llm.generate_stream(prompt):
        # 构造 SSE 格式数据
        data = f"data: {chunk.json()}\n\n"
        yield data
        # 确保立即发送,不缓冲
        await asyncio.sleep(0)  # 让出事件循环,触发 flush

    yield "data: [DONE]\n\n"

@app.post("/api/chat/stream")
async def chat_stream(request: ChatRequest):
    return StreamingResponse(
        stream_response(request.prompt),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",  # 双保险
        }
    )

await asyncio.sleep(0) 这一行是关键。它让出事件循环控制权,确保底层的 ASGI 服务器(Uvicorn/Hypercorn)有机会把缓冲区里的数据真正发出去。没有这一行,你会发现数据还是一坨一坨地出现。

3.3 SSE with Retry-After:达到 99% 可靠性

流式连接在公网环境下断开是常态。我们统计过,未优化的 SSE 连接在 60 秒内的断开率约 5%。加上 Retry-After 机制后,可靠性可以提升到 99%。

// 客户端 SSE 重连逻辑
class RobustEventSource {
  constructor(url, options = {}) {
    this.url = url;
    this.retryDelay = options.retryDelay || 1000;
    this.maxRetries = options.maxRetries || 5;
    this.retryCount = 0;
    this.lastEventId = null;
    this.connect();
  }

  connect() {
    this.es = new EventSource(this.url);

    this.es.onmessage = (event) => {
      this.retryCount = 0; // 成功接收,重置计数
      this.lastEventId = event.lastEventId;

      // 解析服务端的 Retry-After
      if (event.data.includes('retry:')) {
        const match = event.data.match(/retry:(\d+)/);
        if (match) this.retryDelay = parseInt(match[1]);
      }

      this.onMessage?.(event);
    };

    this.es.onerror = () => {
      this.es.close();
      if (this.retryCount < this.maxRetries) {
        // 指数退避 + 抖动
        const delay = this.retryDelay * Math.pow(2, this.retryCount)
          + Math.random() * 500;
        this.retryCount++;
        console.log(`Reconnecting in ${delay}ms (attempt ${this.retryCount})`);
        setTimeout(() => this.connect(), delay);
      } else {
        this.onError?.(new Error('Max retries exceeded'));
      }
    };
  }
}

服务端配合发送 retry: 字段来动态调整重连间隔:

# 服务端在 SSE 流中插入 retry 指令
async def stream_with_retry_hint(prompt: str):
    yield "retry: 3000\n\n"  # 建议客户端 3 秒后重连
    async for chunk in llm.generate_stream(prompt):
        yield f"data: {chunk.json()}\n\n"

这套组合拳打下来,我们的 SSE 连接可靠性从 95% 提升到了 99.2%。剩下 0.8% 的失败主要是用户网络彻底断开(比如进电梯),这种情况任何重连策略都救不了。

WebSocket方案:什么时候值得升级

WebSocket 是双向通信协议,相比 SSE 的单向推送,它多了客户端主动发消息的能力。对于 AI 对话场景,WebSocket 的优势在于:

但 WebSocket 的复杂度也更高。我们的实测对比:

指标SSE (优化后)WebSocket差异
TTFT (p50)0.72s0.68s-5.6%
TPS (p50)68.371.2+4.2%
Chunk 间隔波动±0.8%±0.1%-87.5%
断流恢复时间1.5-3.0s<0.5s-83.3%
内存开销/连接~8KB~16KB+100%
实现复杂度-

最显著的差异在 Chunk 间隔波动上。SSE 经过 HTTP 协议栈,受 TCP 慢启动、Nagle 算法等因素影响,间隔波动约 ±0.8%。WebSocket 建立连接后复用同一条 TCP 通道,间隔波动只有 ±0.1%,用户看到的文字几乎是匀速出现的。

我们最终选择的方案是 WebSocket + Token Bucket 限流

import asyncio
import json
from fastapi import WebSocket, WebSocketDisconnect
from collections import deque

class TokenBucket:
    """WebSocket Token Bucket 限流器"""
    def __init__(self, rate: float, capacity: int):
        self.rate = rate        # tokens/second
        self.capacity = capacity
        self.tokens = capacity
        self.last_refill = asyncio.get_event_loop().time()
        self._lock = asyncio.Lock()

    async def consume(self, tokens: int = 1):
        async with self._lock:
            now = asyncio.get_event_loop().time()
            elapsed = now - self.last_refill
            self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
            self.last_refill = now

            if self.tokens >= tokens:
                self.tokens -= tokens
                return True
            return False

    async def wait_for_token(self):
        while not await self.consume():
            await asyncio.sleep(1.0 / self.rate)

# 全局限流器:每秒 100 个 chunk
chunk_bucket = TokenBucket(rate=100, capacity=200)

@app.websocket("/ws/chat")
async def websocket_chat(websocket: WebSocket):
    await websocket.accept()

    try:
        while True:
            data = await websocket.receive_text()
            request = json.loads(data)

            async for chunk in llm.generate_stream(request["prompt"]):
                # Token Bucket 限流,确保均匀输出
                await chunk_bucket.wait_for_token()
                await websocket.send_text(json.dumps({
                    "type": "chunk",
                    "content": chunk.text,
                    "finish_reason": chunk.finish_reason
                }))

            await websocket.send_text(json.dumps({
                "type": "done"
            }))

    except WebSocketDisconnect:
        # 客户端断开,清理资源
        pass

Token Bucket 限流的效果非常好。之前没有限流的时候,模型输出快的段落 chunk 间隔只有 5ms,慢的段落间隔 200ms+,用户看到的就是忽快忽慢。加上限流后,chunk 间隔稳定在 10ms 左右(对应 100 chunks/s),体验非常流畅。

断流检测与自动重连策略

这是流式输出中最容易被忽视但影响最大的问题。我们分析了 TokenMix 平台的数据发现一个惊人的规律:一个流式连接如果 10 秒内没有任何 chunk 到达,这条流有 90% 的概率已经失败了

无 chunk 时长最终失败概率建议操作
0-3s<5%正常等待
3-5s15%记录日志
5-10s45%准备重连
10s+90%立即重连
30s+99%已失败,通知用户

基于这个数据,我们设计了分级断流检测策略:

class StreamHealthMonitor:
    def __init__(self):
        self.last_chunk_time = None
        self.warning_threshold = 5.0   # 5秒无chunk开始预警
        self.failure_threshold = 10.0  # 10秒无chunk判定失败
        self.check_interval = 1.0      # 每秒检查一次

    async def monitor(self, on_warning, on_failure):
        while True:
            await asyncio.sleep(self.check_interval)
            if self.last_chunk_time is None:
                continue

            elapsed = time.time() - self.last_chunk_time

            if elapsed >= self.failure_threshold:
                await on_failure(elapsed)
                break  # 停止监控,触发重连
            elif elapsed >= self.warning_threshold:
                await on_warning(elapsed)

    def record_chunk(self):
        self.last_chunk_time = time.time()

这个监控器配合自动重连,把我们的断流导致的用户可感知失败率从 3.8% 降到了 0.2%。用户几乎不会再遇到"回复到一半突然卡住"的情况了。

HTTP/2 Server Push的实际收益

HTTP/2 Server Push 可以在客户端请求之前就主动推送资源。对于 AI 对话场景,我们用它来预推送常用的 prompt 模板和上下文数据。

实测数据:HTTP/2 Server Push 在我们的场景下带来了约 ±1.2% 的延迟波动改善。说实话,这个收益不算大。HTTP/2 的多路复用本身对流式输出帮助更大——它允许多个流在同一条 TCP 连接上并行传输,避免了 HTTP/1.1 的队头阻塞问题。

协议TTFT (p50)Chunk 间隔波动连接复用
HTTP/1.1 + SSE0.85s±1.2%不支持
HTTP/2 + SSE0.72s±0.8%支持
HTTP/2 + Server Push0.71s±0.7%支持
WebSocket0.68s±0.1%天然支持

我的建议是:如果你的基础设施已经支持 HTTP/2,顺手开启 Server Push,不需要额外开发成本。但不要为了这 1.2% 的收益专门去做改造,投入产出比不高。优先把精力放在 SSE 基础优化和断流检测上,那边的收益大得多。

FastAPI亿级调用量演进实战

我们的流式 API 从日调用量 100 万到 2 亿,经历了几个关键阶段。每个阶段遇到的问题和解决方案都不同。

阶段一:100 万 - 1000 万/天

这个阶段主要是基础 SSE 配置优化。关 Nginx 缓冲、加 flush、调整超时。单机 Uvicorn + 4 worker 就能扛住。核心问题是 Nginx 缓冲导致的"攒批发送",解决后用户体验立刻好了很多。

阶段二:1000 万 - 5000 万/天

开始出现连接数瓶颈。每个流式请求占用一个长连接,默认的文件描述符限制(ulimit -n 1024)根本不够用。解决方案:

# 系统层面调优
# /etc/sysctl.conf
net.core.somaxconn = 65535
net.ipv4.tcp_max_syn_backlog = 65535
net.ipv4.ip_local_port_range = 1024 65535
net.ipv4.tcp_tw_reuse = 1
net.ipv4.tcp_fin_timeout = 15

# /etc/security/limits.conf
* soft nofile 65535
* hard nofile 65535

# Uvicorn 启动参数
# uvicorn main:app --workers 8 --limit-concurrency 10000 --timeout-keep-alive 300

阶段三:5000 万 - 2 亿/天

这个阶段需要分布式架构了。我们引入了 Redis 做流式消息队列,用 WebSocket Gateway 做连接管理:

# 简化的架构示意
# Client -> Nginx -> WebSocket Gateway (多实例)
#                        |
#                   Redis Pub/Sub
#                        |
#                   LLM Worker Pool (多实例)

# WebSocket Gateway 核心逻辑
import redis.asyncio as redis

class StreamGateway:
    def __init__(self):
        self.redis = redis.Redis(connection_pool=redis_pool)
        self.connections = {}  # session_id -> WebSocket

    async def handle_stream(self, websocket: WebSocket, session_id: str):
        await websocket.accept()
        self.connections[session_id] = websocket

        pubsub = self.redis.pubsub()
        await pubsub.subscribe(f"stream:{session_id}")

        async for message in pubsub.listen():
            if message["type"] == "message":
                await websocket.send_text(message["data"].decode())

        del self.connections[session_id]

这个架构下,WebSocket Gateway 只负责连接管理和消息转发,LLM 推理完全解耦。Gateway 可以水平扩展,LLM Worker 也可以独立扩缩容。我们目前跑了 20 个 Gateway 实例 + 50 个 Worker 实例,日处理 2 亿次流式请求,p99 延迟稳定在 2 秒以内。

客户端优化:感知延迟降低80%的秘密

服务端优化只是故事的一半。客户端的渲染策略对感知延迟影响巨大。

8.1 增量渲染而非等待完整词

很多前端实现是收到一个完整词才渲染一次。但 LLM 输出是按 token 来的,一个中文词可能需要 2-3 个 token 才能拼完。如果等完整词再渲染,用户会感觉"卡顿"。

// 反面教材:等完整词再渲染
let currentWord = '';
eventSource.onmessage = (event) => {
    const token = JSON.parse(event.data).content;
    currentWord += token;
    // 只在遇到空格或标点时才渲染
    if (/[\s,。!?、]/.test(token)) {
        container.innerHTML += currentWord;
        currentWord = '';
    }
};

// 正确做法:每个 token 立即渲染
eventSource.onmessage = (event) => {
    const token = JSON.parse(event.data).content;
    const span = document.createElement('span');
    span.textContent = token;
    span.style.opacity = '0';  // 先透明
    container.appendChild(span);
    // 微小延迟后淡入,制造"打字"效果
    requestAnimationFrame(() => {
        span.style.transition = 'opacity 0.1s';
        span.style.opacity = '1';
    });
};

8.2 预测性光标和打字动画

在等待下一个 chunk 的时候,显示一个闪烁的光标,让用户知道"系统还在输出"。这个简单的视觉反馈就能显著降低焦虑感。

// CSS 闪烁光标
.typing-cursor::after {
    content: '|';
    animation: blink 0.8s infinite;
    color: var(--neon-cyan);
    font-weight: 100;
}

@keyframes blink {
    0%, 50% { opacity: 1; }
    51%, 100% { opacity: 0; }
}

8.3 流式 Markdown 渲染

AI 输出通常包含 Markdown 格式。流式场景下需要增量解析 Markdown,而不是等全部内容到齐后再渲染。我们用了 marked.js 的流式模式:

import { marked } from 'marked';
import DOMPurify from 'dompurify';

// 流式 Markdown 渲染器
class StreamingMarkdownRenderer {
    private buffer = '';
    private element: HTMLElement;

    render(token: string) {
        this.buffer += token;
        // 增量解析,只更新变化的部分
        const html = DOMPurify.sanitize(marked.parse(this.buffer));
        this.element.innerHTML = html;
    }

    reset() {
        this.buffer = '';
        this.element.innerHTML = '';
    }
}

监控与告警体系

最后,没有监控的优化等于盲人摸象。我们建立了完整的流式输出监控体系:

监控指标告警阈值告警级别处理SLA
TTFT p95> 2.0sP230分钟
TTFT p99> 5.0sP115分钟
Chunk 间隔标准差> 50msP32小时
断流率> 1%P115分钟
活跃连接数> 80% 容量P230分钟
10秒无chunk率> 5%P115分钟

告警通过企业微信和 PagerDuty 双通道发送。P1 告警会直接打电话给 on-call 工程师,P2/P3 走企业微信群。过去三个月,这套监控帮我们提前发现了 12 次潜在故障,没有一次影响到用户。

流式输出的优化是一个系统工程,从 Nginx 配置到 TCP 参数,从服务端限流到客户端渲染,每一层都有优化空间。但最重要的原则是:先监控再优化,用数据驱动决策。别凭感觉改代码,先把基线数据测清楚,改完再对比,这样才能确保每一步优化都是有效的。

如果你正在做 AI 产品的流式输出,希望这篇文章的实战经验能帮你少踩一些坑。有问题欢迎在 TokenNexus 上交流,我们团队一直在更新最新的 API 性能数据和优化技巧。