去年我们团队在做一个内部知识库问答系统的时候,踩了一个很典型的坑:上线第一天,OpenAI的API就因为并发请求过多直接返回了429错误,整个服务瘫痪了将近四十分钟。后来复盘才发现,代码里连基本的重试逻辑都没有写。这件事让我意识到,AI API集成这件事,远不止"发个HTTP请求拿结果"那么简单。
这篇文章会把我们在生产环境里摸爬滚打总结出来的经验整理出来,涵盖错误处理、重试策略、流式响应、成本控制这些核心话题。所有代码都是Python实现,可以直接拿去用。
一、API错误处理:别让一次失败拖垮整个服务
调用AI API时,你大概率会遇到这几类错误:
429 Too Many Requests—— 触发了平台的速率限制500/502/503—— 服务端临时故障,OpenAI和Anthropic都出现过大规模宕机401/403—— API Key过期或权限不足400—— 请求参数有问题,比如Token超长- 网络超时 —— 你的服务器到API节点之间的链路不稳定
根据我们的监控数据,在正常业务负载下,API调用失败率大约在2%-5%之间。如果不做任何错误处理,这意味着每20个请求里就有1个会直接报错给用户。这个体验是完全不能接受的。
基本的处理原则是:可重试的错误自动重试,不可重试的错误快速失败并记录日志。网络超时、5xx错误属于可重试范畴;4xx错误(除了429)通常是请求本身的问题,重试也没用。
2025年12月OpenAI经历了一次持续约2小时的API故障(状态码503),当时我们系统因为配置了自动降级到DeepSeek,用户几乎没有感知。多模型备选方案比单纯的重试更可靠。
二、指数退避重试:别用固定间隔傻等
说到重试,很多人第一反应是"失败了就等一秒再试"。这在低并发场景下勉强能用,但一旦请求量上来,固定间隔重试反而会加剧拥塞——所有客户端同时重试,服务器压力更大。
正确做法是指数退避(Exponential Backoff),每次重试的等待时间按指数增长,再加上一个随机抖动(jitter)避免"惊群效应"。
下面这段代码是我们线上一直在用的重试装饰器:
import time
import random
import logging
from functools import wraps
logger = logging.getLogger(__name__)
def retry_with_backoff(max_retries=5, base_delay=1.0, max_delay=60.0):
"""
指数退避重试装饰器
max_retries: 最大重试次数
base_delay: 初始等待时间(秒)
max_delay: 最大等待时间上限(秒)
"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
last_exception = None
for attempt in range(max_retries + 1):
try:
return func(*args, **kwargs)
except Exception as e:
last_exception = e
status = getattr(e, 'status_code', None)
# 不可重试的错误直接抛出
if status and 400 <= status < 500 and status != 429:
logger.error(f"不可重试错误 [{status}]: {e}")
raise
if attempt == max_retries:
logger.error(f"重试耗尽 ({max_retries}次): {e}")
break
# 计算退避时间:base * 2^attempt + 随机抖动
delay = min(base_delay * (2 ** attempt), max_delay)
jitter = random.uniform(0, delay * 0.5)
wait_time = delay + jitter
logger.warning(
f"第{attempt + 1}次重试,"
f"等待 {wait_time:.1f}s,错误: {e}"
)
time.sleep(wait_time)
raise last_exception
return wrapper
return decorator
这套策略的退避序列大概是:1s -> 2s -> 4s -> 8s -> 16s(加上随机抖动)。5次重试总等待时间大约在30秒左右,对于大多数临时性故障来说足够了。根据我们的统计,超过90%的临时错误在前两次重试内就能恢复。
三、流式响应(SSE):让用户不用干等
大模型生成一段完整的回复,动辄需要5到15秒。如果用普通的请求-响应模式,用户盯着一个加载转圈看十几秒,体验极差。流式响应(Server-Sent Events)可以让模型边生成边返回,用户实时看到文字一个个蹦出来,体感延迟从十几秒降到不到1秒。
OpenAI、Anthropic、DeepSeek这些主流平台都支持流式输出。接入方式其实不复杂:
import httpx
def stream_chat(prompt: str, api_key: str):
"""流式调用OpenAI Chat API"""
url = "https://api.openai.com/v1/chat/completions"
headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
}
payload = {
"model": "gpt-4o-mini",
"messages": [{"role": "user", "content": prompt}],
"stream": True,
"temperature": 0.7
}
with httpx.stream("POST", url, headers=headers,
json=payload, timeout=30.0) as resp:
resp.raise_for_status()
for line in resp.iter_lines():
if not line or not line.startswith("data: "):
continue
data = line[6:]
if data == "[DONE]":
break
# 解析并实时处理每个chunk
import json
chunk = json.loads(data)
delta = chunk["choices"][0].get("delta", {})
content = delta.get("content", "")
if content:
yield content
不过流式响应也有几个需要注意的地方。首先是超时设置:不能用默认超时,因为流式请求可能持续很长时间,建议把读取超时设长一些,但连接超时保持较短(5秒以内)。其次是错误处理:流式传输中途如果断开了,需要决定是重新发起请求还是把已收到的内容拼起来返回。我们的做法是维护一个缓冲区,如果流中断了,用已收到的内容做一次非流式的补全请求。
流式响应的Token计费和普通请求一样,平台按完整输出计费,不会因为用了流式就多收费。但流式请求会占用更长的连接时间,在高并发场景下要注意连接池的配置。
四、Token计数与成本监控:别等账单来了才心疼
我们见过不少团队在月底看到API账单时才发现成本失控。AI API按Token计费的特点决定了,你必须有实时的成本感知能力。
最简单的做法是在每次请求前后计算Token数。OpenAI的API响应里会返回 usage 字段,包含 prompt_tokens、completion_tokens 和 total_tokens。你可以用一个中间层把这些数据收集起来:
import tiktoken
from datetime import datetime
class TokenTracker:
"""简易Token成本追踪器"""
def __init__(self):
self.encoder = tiktoken.encoding_for_model("gpt-4o-mini")
self.records = []
def count_input_tokens(self, messages: list) -> int:
"""预估输入Token数"""
total = 0
for msg in messages:
total += len(self.encoder.encode(msg["content"]))
return total
def log_usage(self, model: str, input_tokens: int,
output_tokens: int, user_id: str = ""):
# 各模型定价(每百万Token,美元)
PRICING = {
"gpt-4o": {"input": 2.50, "output": 10.00},
"gpt-4o-mini": {"input": 0.15, "output": 0.60},
"deepseek-chat": {"input": 0.07, "output": 0.28},
"claude-3-haiku": {"input": 0.25, "output": 1.25},
}
prices = PRICING.get(model, {"input": 1.0, "output": 3.0})
cost = (input_tokens * prices["input"]
+ output_tokens * prices["output"]) / 1_000_000
self.records.append({
"time": datetime.now().isoformat(),
"model": model,
"input_tokens": input_tokens,
"output_tokens": output_tokens,
"cost_usd": cost,
"user_id": user_id
})
def daily_summary(self) -> dict:
"""按天汇总成本"""
from collections import defaultdict
daily = defaultdict(float)
for r in self.records:
date = r["time"][:10]
daily[date] += r["cost_usd"]
return dict(daily)
实际生产中,建议把这个追踪器接进你的日志系统或时序数据库(比如Prometheus + Grafana),设置每日预算告警。我们给自己设的阈值是日成本的80%,一旦触发就自动切换到更便宜的模型(比如从GPT-4o降级到GPT-4o-mini或DeepSeek)。
五、Rate Limiting应对:尊重规则,聪明绕行
每个AI API平台都有速率限制,OpenAI的Tier 1账户默认是500 RPM(每分钟请求数)和200,000 TPM(每分钟Token数)。Anthropic的限制也类似。当你收到429响应时,响应头里通常会带上 Retry-After 或 x-ratelimit-reset-requests 这些字段,告诉你什么时候可以继续。
应对Rate Limiting有几个层面的策略:
1. 客户端限流
在发请求之前就做好限流,而不是等429了再退避。Python里可以用 ratelimit 库或者自己实现一个令牌桶:
import threading
import time
class TokenBucket:
"""令牌桶限流器"""
def __init__(self, rate: float, capacity: int):
self.rate = rate # 每秒补充的令牌数
self.capacity = capacity # 桶容量
self.tokens = capacity
self.last_time = time.monotonic()
self.lock = threading.Lock()
def acquire(self, tokens: int = 1) -> float:
"""获取令牌,返回需要等待的时间"""
with self.lock:
now = time.monotonic()
elapsed = now - self.last_time
self.tokens = min(self.capacity,
self.tokens + elapsed * self.rate)
self.last_time = now
if self.tokens >= tokens:
self.tokens -= tokens
return 0.0
wait = (tokens - self.tokens) / self.rate
self.tokens = 0
return wait
# 使用示例:限制为每秒8个请求
limiter = TokenBucket(rate=8.0, capacity=10)
def api_call_with_limit(prompt):
wait = limiter.acquire()
if wait > 0:
time.sleep(wait)
return call_openai_api(prompt)
2. 多Key轮询
如果你有多个API Key(很多团队会注册多个账户),可以在Key池里轮询分发请求,成倍提升吞吐上限。不过要注意平台的服务条款,有些平台明确禁止多账户共享。
3. 请求队列
对于批量任务(比如批量翻译、数据标注),不要并发打满,而是用消息队列(Redis Stream、RabbitMQ)控制消费速率。我们用Redis实现了一个简单的延迟队列,收到429后自动把请求放回队列,等Retry-After的时间到了再消费。
六、缓存策略:相同的问题别问两遍
在实际业务中,用户的问题有很高的重复率。我们统计过,知识库问答场景下大约35%的查询在语义上是重复的。如果能命中缓存,不仅省了API调用费用,响应速度也能从秒级降到毫秒级。
缓存的关键在于如何定义"相同的请求"。直接用字符串匹配太死板,"什么是机器学习"和"机器学习是什么"应该命中同一条缓存。我们用的方案是对用户输入做一次embedding,然后用向量相似度做近似匹配,阈值设为0.95以上就认为语义相同。
import hashlib
import json
from openai import OpenAI
client = OpenAI()
def get_cache_key(messages: list, model: str,
temperature: float = 0.0) -> str:
"""
生成缓存键。temperature > 0 时不缓存,
因为输出具有随机性。
"""
if temperature > 0:
return None # 不缓存非确定性请求
content = json.dumps(messages, sort_keys=True)
return hashlib.sha256(content.encode()).hexdigest()
# 简单的内存缓存(生产环境建议用Redis)
_cache = {}
def cached_chat(messages, model="gpt-4o-mini", **kwargs):
temp = kwargs.get("temperature", 0.0)
key = get_cache_key(messages, model, temp)
if key and key in _cache:
return _cache[key] # 缓存命中
response = client.chat.completions.create(
model=model, messages=messages, **kwargs
)
result = response.choices[0].message.content
if key:
_cache[key] = result
return result
要注意的是,缓存只适合temperature为0的确定性请求。如果用户要求创意写作或者temperature设得比较高,每次输出都不一样,缓存就没意义了。另外,缓存要设置合理的过期时间(TTL),我们一般设为24小时,避免返回过时的信息。
七、并发控制与连接池:高并发场景的生存法则
当你需要同时处理大量请求时,连接池和并发控制就变得至关重要。直接用 requests 库每次创建新连接的方式在高并发下效率极低,因为TCP握手和TLS协商的开销很大。
推荐使用 httpx 的异步客户端,它内置了连接池复用:
import asyncio
import httpx
async def batch_chat(prompts: list, api_key: str,
max_concurrent: int = 10):
"""批量并发调用,控制最大并发数"""
semaphore = asyncio.Semaphore(max_concurrent)
async def single_call(prompt: str) -> str:
async with semaphore:
async with httpx.AsyncClient(
timeout=httpx.Timeout(30.0, connect=5.0)
) as client:
resp = await client.post(
"https://api.openai.com/v1/chat/completions",
headers={
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
},
json={
"model": "gpt-4o-mini",
"messages": [
{"role": "user", "content": prompt}
]
}
)
resp.raise_for_status()
return resp.json()["choices"][0]["message"]["content"]
tasks = [single_call(p) for p in prompts]
return await asyncio.gather(*tasks, return_exceptions=True)
这里用 asyncio.Semaphore 把并发数限制在10,避免瞬间打满API配额。连接池方面,httpx.AsyncClient 默认维护一个连接池,同一个host的请求会复用TCP连接。在实际测试中,使用连接池后批量请求的吞吐量提升了大约3倍,P99延迟降低了约40%。
我们曾经遇到过httpx连接池里的"僵尸连接"问题——某些连接被服务端关闭了但客户端不知道,复用时会报ConnectionError。解决方案是在创建client时设置
http2=False 并且加上 limits=httpx.Limits(max_keepalive_connections=20, keepalive_expiry=30),定期清理空闲连接。
八、一个完整的生产级调用封装
最后,把上面提到的核心策略整合到一起,给出一个可以直接用于生产环境的调用封装:
import asyncio
import httpx
import time
import random
import logging
from dataclasses import dataclass, field
logger = logging.getLogger(__name__)
@dataclass
class APIConfig:
api_key: str
base_url: str = "https://api.openai.com/v1"
model: str = "gpt-4o-mini"
max_retries: int = 5
timeout: float = 30.0
max_concurrent: int = 10
daily_budget_usd: float = 50.0
@dataclass
class UsageStats:
total_requests: int = 0
total_tokens: int = 0
total_cost_usd: float = 0.0
cache_hits: int = 0
errors: int = 0
class AIClient:
def __init__(self, config: APIConfig):
self.config = config
self.stats = UsageStats()
self._semaphore = asyncio.Semaphore(config.max_concurrent)
self._cache = {}
async def chat(self, messages: list, **kwargs) -> str:
"""带完整错误处理、重试和成本追踪的聊天接口"""
async with self._semaphore:
return await self._call_with_retry(messages, **kwargs)
async def _call_with_retry(self, messages, **kwargs):
last_error = None
for attempt in range(self.config.max_retries + 1):
try:
result = await self._single_request(messages, **kwargs)
self.stats.total_requests += 1
return result
except httpx.HTTPStatusError as e:
last_error = e
if e.response.status_code == 429:
reset = e.response.headers.get(
"Retry-After", "5")
wait = float(reset) + random.uniform(0, 1)
logger.warning(f"触发限流,等待 {wait:.1f}s")
await asyncio.sleep(wait)
elif e.response.status_code >= 500:
delay = min(2 ** attempt, 30)
await asyncio.sleep(delay + random.random())
else:
self.stats.errors += 1
raise
except (httpx.ConnectError, httpx.ReadTimeout) as e:
last_error = e
delay = min(2 ** attempt, 15)
await asyncio.sleep(delay)
self.stats.errors += 1
raise last_error
async def _single_request(self, messages, **kwargs):
async with httpx.AsyncClient(
timeout=httpx.Timeout(
self.config.timeout, connect=5.0
),
limits=httpx.Limits(
max_keepalive_connections=20,
keepalive_expiry=30
)
) as client:
resp = await client.post(
f"{self.config.base_url}/chat/completions",
headers={
"Authorization":
f"Bearer {self.config.api_key}",
"Content-Type": "application/json"
},
json={
"model": self.config.model,
"messages": messages,
**kwargs
}
)
resp.raise_for_status()
data = resp.json()
usage = data.get("usage", {})
self.stats.total_tokens += usage.get(
"total_tokens", 0
)
return data["choices"][0]["message"]["content"]
这个封装覆盖了本文讨论的大部分关键点:指数退避重试、并发控制、连接池管理、Token用量统计。你可以根据实际需求在此基础上扩展,比如加上缓存层、多模型降级、更细粒度的成本告警等。
写在最后
AI API集成这件事,表面上看就是发个HTTP请求,但要做到生产级可用,需要考虑的东西远比想象中多。错误处理、重试策略、流式响应、成本监控、限流应对、缓存优化、并发控制——每一环都可能成为系统的瓶颈或故障点。
我们团队踩过的坑总结起来就一句话:永远不要假设API调用一定会成功。做好兜底方案,做好监控告警,做好成本控制,你的AI应用才能稳定地跑下去。
如果你在集成过程中遇到了具体问题,欢迎在评论区交流,我们尽量帮忙解答。