广告位预留 (728x90)

AI API集成最佳实践:错误处理与性能优化

TokenNexus团队 · 2026年5月1日 · 15分钟阅读

去年我们团队在做一个内部知识库问答系统的时候,踩了一个很典型的坑:上线第一天,OpenAI的API就因为并发请求过多直接返回了429错误,整个服务瘫痪了将近四十分钟。后来复盘才发现,代码里连基本的重试逻辑都没有写。这件事让我意识到,AI API集成这件事,远不止"发个HTTP请求拿结果"那么简单。

这篇文章会把我们在生产环境里摸爬滚打总结出来的经验整理出来,涵盖错误处理、重试策略、流式响应、成本控制这些核心话题。所有代码都是Python实现,可以直接拿去用。

一、API错误处理:别让一次失败拖垮整个服务

调用AI API时,你大概率会遇到这几类错误:

根据我们的监控数据,在正常业务负载下,API调用失败率大约在2%-5%之间。如果不做任何错误处理,这意味着每20个请求里就有1个会直接报错给用户。这个体验是完全不能接受的。

基本的处理原则是:可重试的错误自动重试,不可重试的错误快速失败并记录日志。网络超时、5xx错误属于可重试范畴;4xx错误(除了429)通常是请求本身的问题,重试也没用。

实战经验
2025年12月OpenAI经历了一次持续约2小时的API故障(状态码503),当时我们系统因为配置了自动降级到DeepSeek,用户几乎没有感知。多模型备选方案比单纯的重试更可靠。

二、指数退避重试:别用固定间隔傻等

说到重试,很多人第一反应是"失败了就等一秒再试"。这在低并发场景下勉强能用,但一旦请求量上来,固定间隔重试反而会加剧拥塞——所有客户端同时重试,服务器压力更大。

正确做法是指数退避(Exponential Backoff),每次重试的等待时间按指数增长,再加上一个随机抖动(jitter)避免"惊群效应"。

下面这段代码是我们线上一直在用的重试装饰器:

import time
import random
import logging
from functools import wraps

logger = logging.getLogger(__name__)

def retry_with_backoff(max_retries=5, base_delay=1.0, max_delay=60.0):
    """
    指数退避重试装饰器
    max_retries: 最大重试次数
    base_delay: 初始等待时间(秒)
    max_delay: 最大等待时间上限(秒)
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            last_exception = None
            for attempt in range(max_retries + 1):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    last_exception = e
                    status = getattr(e, 'status_code', None)

                    # 不可重试的错误直接抛出
                    if status and 400 <= status < 500 and status != 429:
                        logger.error(f"不可重试错误 [{status}]: {e}")
                        raise

                    if attempt == max_retries:
                        logger.error(f"重试耗尽 ({max_retries}次): {e}")
                        break

                    # 计算退避时间:base * 2^attempt + 随机抖动
                    delay = min(base_delay * (2 ** attempt), max_delay)
                    jitter = random.uniform(0, delay * 0.5)
                    wait_time = delay + jitter

                    logger.warning(
                        f"第{attempt + 1}次重试,"
                        f"等待 {wait_time:.1f}s,错误: {e}"
                    )
                    time.sleep(wait_time)

            raise last_exception
        return wrapper
    return decorator

这套策略的退避序列大概是:1s -> 2s -> 4s -> 8s -> 16s(加上随机抖动)。5次重试总等待时间大约在30秒左右,对于大多数临时性故障来说足够了。根据我们的统计,超过90%的临时错误在前两次重试内就能恢复。

三、流式响应(SSE):让用户不用干等

大模型生成一段完整的回复,动辄需要5到15秒。如果用普通的请求-响应模式,用户盯着一个加载转圈看十几秒,体验极差。流式响应(Server-Sent Events)可以让模型边生成边返回,用户实时看到文字一个个蹦出来,体感延迟从十几秒降到不到1秒。

OpenAI、Anthropic、DeepSeek这些主流平台都支持流式输出。接入方式其实不复杂:

import httpx

def stream_chat(prompt: str, api_key: str):
    """流式调用OpenAI Chat API"""
    url = "https://api.openai.com/v1/chat/completions"
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json"
    }
    payload = {
        "model": "gpt-4o-mini",
        "messages": [{"role": "user", "content": prompt}],
        "stream": True,
        "temperature": 0.7
    }

    with httpx.stream("POST", url, headers=headers,
                       json=payload, timeout=30.0) as resp:
        resp.raise_for_status()
        for line in resp.iter_lines():
            if not line or not line.startswith("data: "):
                continue
            data = line[6:]
            if data == "[DONE]":
                break
            # 解析并实时处理每个chunk
            import json
            chunk = json.loads(data)
            delta = chunk["choices"][0].get("delta", {})
            content = delta.get("content", "")
            if content:
                yield content

不过流式响应也有几个需要注意的地方。首先是超时设置:不能用默认超时,因为流式请求可能持续很长时间,建议把读取超时设长一些,但连接超时保持较短(5秒以内)。其次是错误处理:流式传输中途如果断开了,需要决定是重新发起请求还是把已收到的内容拼起来返回。我们的做法是维护一个缓冲区,如果流中断了,用已收到的内容做一次非流式的补全请求。

注意
流式响应的Token计费和普通请求一样,平台按完整输出计费,不会因为用了流式就多收费。但流式请求会占用更长的连接时间,在高并发场景下要注意连接池的配置。
广告位预留 (336x280)

四、Token计数与成本监控:别等账单来了才心疼

我们见过不少团队在月底看到API账单时才发现成本失控。AI API按Token计费的特点决定了,你必须有实时的成本感知能力。

最简单的做法是在每次请求前后计算Token数。OpenAI的API响应里会返回 usage 字段,包含 prompt_tokenscompletion_tokenstotal_tokens。你可以用一个中间层把这些数据收集起来:

import tiktoken
from datetime import datetime

class TokenTracker:
    """简易Token成本追踪器"""

    def __init__(self):
        self.encoder = tiktoken.encoding_for_model("gpt-4o-mini")
        self.records = []

    def count_input_tokens(self, messages: list) -> int:
        """预估输入Token数"""
        total = 0
        for msg in messages:
            total += len(self.encoder.encode(msg["content"]))
        return total

    def log_usage(self, model: str, input_tokens: int,
                  output_tokens: int, user_id: str = ""):
        # 各模型定价(每百万Token,美元)
        PRICING = {
            "gpt-4o": {"input": 2.50, "output": 10.00},
            "gpt-4o-mini": {"input": 0.15, "output": 0.60},
            "deepseek-chat": {"input": 0.07, "output": 0.28},
            "claude-3-haiku": {"input": 0.25, "output": 1.25},
        }
        prices = PRICING.get(model, {"input": 1.0, "output": 3.0})
        cost = (input_tokens * prices["input"]
                + output_tokens * prices["output"]) / 1_000_000

        self.records.append({
            "time": datetime.now().isoformat(),
            "model": model,
            "input_tokens": input_tokens,
            "output_tokens": output_tokens,
            "cost_usd": cost,
            "user_id": user_id
        })

    def daily_summary(self) -> dict:
        """按天汇总成本"""
        from collections import defaultdict
        daily = defaultdict(float)
        for r in self.records:
            date = r["time"][:10]
            daily[date] += r["cost_usd"]
        return dict(daily)

实际生产中,建议把这个追踪器接进你的日志系统或时序数据库(比如Prometheus + Grafana),设置每日预算告警。我们给自己设的阈值是日成本的80%,一旦触发就自动切换到更便宜的模型(比如从GPT-4o降级到GPT-4o-mini或DeepSeek)。

五、Rate Limiting应对:尊重规则,聪明绕行

每个AI API平台都有速率限制,OpenAI的Tier 1账户默认是500 RPM(每分钟请求数)和200,000 TPM(每分钟Token数)。Anthropic的限制也类似。当你收到429响应时,响应头里通常会带上 Retry-Afterx-ratelimit-reset-requests 这些字段,告诉你什么时候可以继续。

应对Rate Limiting有几个层面的策略:

1. 客户端限流

在发请求之前就做好限流,而不是等429了再退避。Python里可以用 ratelimit 库或者自己实现一个令牌桶:

import threading
import time

class TokenBucket:
    """令牌桶限流器"""
    def __init__(self, rate: float, capacity: int):
        self.rate = rate          # 每秒补充的令牌数
        self.capacity = capacity  # 桶容量
        self.tokens = capacity
        self.last_time = time.monotonic()
        self.lock = threading.Lock()

    def acquire(self, tokens: int = 1) -> float:
        """获取令牌,返回需要等待的时间"""
        with self.lock:
            now = time.monotonic()
            elapsed = now - self.last_time
            self.tokens = min(self.capacity,
                              self.tokens + elapsed * self.rate)
            self.last_time = now

            if self.tokens >= tokens:
                self.tokens -= tokens
                return 0.0

            wait = (tokens - self.tokens) / self.rate
            self.tokens = 0
            return wait

# 使用示例:限制为每秒8个请求
limiter = TokenBucket(rate=8.0, capacity=10)

def api_call_with_limit(prompt):
    wait = limiter.acquire()
    if wait > 0:
        time.sleep(wait)
    return call_openai_api(prompt)

2. 多Key轮询

如果你有多个API Key(很多团队会注册多个账户),可以在Key池里轮询分发请求,成倍提升吞吐上限。不过要注意平台的服务条款,有些平台明确禁止多账户共享。

3. 请求队列

对于批量任务(比如批量翻译、数据标注),不要并发打满,而是用消息队列(Redis Stream、RabbitMQ)控制消费速率。我们用Redis实现了一个简单的延迟队列,收到429后自动把请求放回队列,等Retry-After的时间到了再消费。

六、缓存策略:相同的问题别问两遍

在实际业务中,用户的问题有很高的重复率。我们统计过,知识库问答场景下大约35%的查询在语义上是重复的。如果能命中缓存,不仅省了API调用费用,响应速度也能从秒级降到毫秒级。

缓存的关键在于如何定义"相同的请求"。直接用字符串匹配太死板,"什么是机器学习"和"机器学习是什么"应该命中同一条缓存。我们用的方案是对用户输入做一次embedding,然后用向量相似度做近似匹配,阈值设为0.95以上就认为语义相同。

import hashlib
import json
from openai import OpenAI

client = OpenAI()

def get_cache_key(messages: list, model: str,
                  temperature: float = 0.0) -> str:
    """
    生成缓存键。temperature > 0 时不缓存,
    因为输出具有随机性。
    """
    if temperature > 0:
        return None  # 不缓存非确定性请求
    content = json.dumps(messages, sort_keys=True)
    return hashlib.sha256(content.encode()).hexdigest()

# 简单的内存缓存(生产环境建议用Redis)
_cache = {}

def cached_chat(messages, model="gpt-4o-mini", **kwargs):
    temp = kwargs.get("temperature", 0.0)
    key = get_cache_key(messages, model, temp)
    if key and key in _cache:
        return _cache[key]  # 缓存命中

    response = client.chat.completions.create(
        model=model, messages=messages, **kwargs
    )
    result = response.choices[0].message.content

    if key:
        _cache[key] = result
    return result

要注意的是,缓存只适合temperature为0的确定性请求。如果用户要求创意写作或者temperature设得比较高,每次输出都不一样,缓存就没意义了。另外,缓存要设置合理的过期时间(TTL),我们一般设为24小时,避免返回过时的信息。

广告位预留 (336x280)

七、并发控制与连接池:高并发场景的生存法则

当你需要同时处理大量请求时,连接池和并发控制就变得至关重要。直接用 requests 库每次创建新连接的方式在高并发下效率极低,因为TCP握手和TLS协商的开销很大。

推荐使用 httpx 的异步客户端,它内置了连接池复用:

import asyncio
import httpx

async def batch_chat(prompts: list, api_key: str,
                     max_concurrent: int = 10):
    """批量并发调用,控制最大并发数"""
    semaphore = asyncio.Semaphore(max_concurrent)

    async def single_call(prompt: str) -> str:
        async with semaphore:
            async with httpx.AsyncClient(
                timeout=httpx.Timeout(30.0, connect=5.0)
            ) as client:
                resp = await client.post(
                    "https://api.openai.com/v1/chat/completions",
                    headers={
                        "Authorization": f"Bearer {api_key}",
                        "Content-Type": "application/json"
                    },
                    json={
                        "model": "gpt-4o-mini",
                        "messages": [
                            {"role": "user", "content": prompt}
                        ]
                    }
                )
                resp.raise_for_status()
                return resp.json()["choices"][0]["message"]["content"]

    tasks = [single_call(p) for p in prompts]
    return await asyncio.gather(*tasks, return_exceptions=True)

这里用 asyncio.Semaphore 把并发数限制在10,避免瞬间打满API配额。连接池方面,httpx.AsyncClient 默认维护一个连接池,同一个host的请求会复用TCP连接。在实际测试中,使用连接池后批量请求的吞吐量提升了大约3倍,P99延迟降低了约40%。

生产环境踩坑记录
我们曾经遇到过httpx连接池里的"僵尸连接"问题——某些连接被服务端关闭了但客户端不知道,复用时会报ConnectionError。解决方案是在创建client时设置 http2=False 并且加上 limits=httpx.Limits(max_keepalive_connections=20, keepalive_expiry=30),定期清理空闲连接。

八、一个完整的生产级调用封装

最后,把上面提到的核心策略整合到一起,给出一个可以直接用于生产环境的调用封装:

import asyncio
import httpx
import time
import random
import logging
from dataclasses import dataclass, field

logger = logging.getLogger(__name__)

@dataclass
class APIConfig:
    api_key: str
    base_url: str = "https://api.openai.com/v1"
    model: str = "gpt-4o-mini"
    max_retries: int = 5
    timeout: float = 30.0
    max_concurrent: int = 10
    daily_budget_usd: float = 50.0

@dataclass
class UsageStats:
    total_requests: int = 0
    total_tokens: int = 0
    total_cost_usd: float = 0.0
    cache_hits: int = 0
    errors: int = 0

class AIClient:
    def __init__(self, config: APIConfig):
        self.config = config
        self.stats = UsageStats()
        self._semaphore = asyncio.Semaphore(config.max_concurrent)
        self._cache = {}

    async def chat(self, messages: list, **kwargs) -> str:
        """带完整错误处理、重试和成本追踪的聊天接口"""
        async with self._semaphore:
            return await self._call_with_retry(messages, **kwargs)

    async def _call_with_retry(self, messages, **kwargs):
        last_error = None
        for attempt in range(self.config.max_retries + 1):
            try:
                result = await self._single_request(messages, **kwargs)
                self.stats.total_requests += 1
                return result
            except httpx.HTTPStatusError as e:
                last_error = e
                if e.response.status_code == 429:
                    reset = e.response.headers.get(
                        "Retry-After", "5")
                    wait = float(reset) + random.uniform(0, 1)
                    logger.warning(f"触发限流,等待 {wait:.1f}s")
                    await asyncio.sleep(wait)
                elif e.response.status_code >= 500:
                    delay = min(2 ** attempt, 30)
                    await asyncio.sleep(delay + random.random())
                else:
                    self.stats.errors += 1
                    raise
            except (httpx.ConnectError, httpx.ReadTimeout) as e:
                last_error = e
                delay = min(2 ** attempt, 15)
                await asyncio.sleep(delay)

        self.stats.errors += 1
        raise last_error

    async def _single_request(self, messages, **kwargs):
        async with httpx.AsyncClient(
            timeout=httpx.Timeout(
                self.config.timeout, connect=5.0
            ),
            limits=httpx.Limits(
                max_keepalive_connections=20,
                keepalive_expiry=30
            )
        ) as client:
            resp = await client.post(
                f"{self.config.base_url}/chat/completions",
                headers={
                    "Authorization":
                        f"Bearer {self.config.api_key}",
                    "Content-Type": "application/json"
                },
                json={
                    "model": self.config.model,
                    "messages": messages,
                    **kwargs
                }
            )
            resp.raise_for_status()
            data = resp.json()
            usage = data.get("usage", {})
            self.stats.total_tokens += usage.get(
                "total_tokens", 0
            )
            return data["choices"][0]["message"]["content"]

这个封装覆盖了本文讨论的大部分关键点:指数退避重试、并发控制、连接池管理、Token用量统计。你可以根据实际需求在此基础上扩展,比如加上缓存层、多模型降级、更细粒度的成本告警等。

写在最后

AI API集成这件事,表面上看就是发个HTTP请求,但要做到生产级可用,需要考虑的东西远比想象中多。错误处理、重试策略、流式响应、成本监控、限流应对、缓存优化、并发控制——每一环都可能成为系统的瓶颈或故障点。

我们团队踩过的坑总结起来就一句话:永远不要假设API调用一定会成功。做好兜底方案,做好监控告警,做好成本控制,你的AI应用才能稳定地跑下去。

如果你在集成过程中遇到了具体问题,欢迎在评论区交流,我们尽量帮忙解答。