AI API容错降级架构实战:让你的服务在API翻车时依然坚挺

Claude官方挂了2小时,我是如何让用户几乎无感知地扛过来的

2025年8月某天上午10点,我正在改一个AI客服功能的bug,突然群里炸了:"Claude崩了!全部报错!"

我赶紧打开状态页,官方公告赫然写着"服务中断,预计2小时恢复"。再看我们的系统,AI相关功能全线飘红:

那一刻我深刻意识到:我们没有做任何容错设计,单点依赖太严重了

后来我们花了两周时间重构了整个AI调用架构。重构完成后,类似的情况再发生:API挂了?没关系,我们自动切到备用Key,用户根本感知不到。限流了?降级响应,历史缓存照样能给出回答。

这篇文章就是踩坑后的经验总结,代码全部真实可运行。

三层降级策略:从Fail Fast到完全兜底

容错不是一件事,是一套组合拳。我的设计分三层:

层级 策略名称 触发条件 处理方式
第一层 Fail Fast(快速失败) API响应超时、返回错误 立即重试或切换Key,不让用户等
第二层 Fallback(降级响应) 重试失败、所有Key都不可用 返回预设答案或历史缓存
第三层 Circuit Breaker(断路器) 连续失败超过阈值 暂时熔断,防止雪崩

三层策略配合使用,才能在各种异常情况下都保持服务可用。

多API Key轮询与故障转移

这是最简单的容错手段,但很多人懒得做。原理很简单:一个Key挂了,切换到另一个。

import random
from typing import List, Optional
from dataclasses import dataclass
import time

@dataclass
class APIKey:
    """One API credential together with the failure bookkeeping KeyRotator keeps for it."""
    key: str
    name: str  # label used to identify the key in log output
    is_available: bool = True  # cleared by KeyRotator.mark_failure after 5 consecutive failures
    failure_count: int = 0  # consecutive failures; reset to 0 by KeyRotator.mark_success
    last_failure_time: float = 0  # time.time() of the latest failure; 0 means "never failed"

class KeyRotator:
    """Round-robin selector over several API keys with failure-based failover.

    A key that fails is rested for ``failure_cooldown`` seconds. A key that was
    disabled after 5 consecutive failures becomes eligible again once that
    cooldown has elapsed, so a recovered provider is eventually probed again
    instead of staying disabled forever.
    """

    def __init__(self, keys: List[dict]):
        """Build the rotator.

        Args:
            keys: One dict per key, e.g.
                ``[{"key": "sk-xxx", "name": "openai-key1"}, ...]``. Each dict
                is passed to ``APIKey`` as keyword arguments.
        """
        self.keys = [APIKey(**k) for k in keys]
        self.current_index = 0
        self.failure_cooldown = 60  # seconds a key rests after a failure

    def _is_usable(self, key: "APIKey", now: float) -> bool:
        """Return True when ``key`` may be handed out at time ``now``."""
        if key.last_failure_time > 0:
            # The key has failed before: it is usable again only once the
            # cooldown has fully elapsed (this also revives disabled keys).
            return (now - key.last_failure_time) >= self.failure_cooldown
        # Never failed: honour the flag, so a key configured as unavailable
        # stays out of the normal rotation.
        return key.is_available

    def get_available_key(self) -> Optional["APIKey"]:
        """Return a usable key, or ``None`` when no keys are configured.

        Scanning starts at ``current_index`` and leaves the index on the key
        that is returned. When every key is resting, the key whose failure is
        the oldest is returned as a last resort.
        """
        if not self.keys:
            return None

        now = time.time()
        total = len(self.keys)

        for _ in range(total):
            candidate = self.keys[self.current_index]
            if self._is_usable(candidate, now):
                return candidate
            self.current_index = (self.current_index + 1) % total

        # Nothing is usable right now: fall back to the least recently failed
        # key, which has had the longest time to recover.
        return min(self.keys, key=lambda k: k.last_failure_time)

    def mark_success(self, key: "APIKey"):
        """Record a successful call: clear the failure streak and re-enable the key."""
        key.failure_count = 0
        key.is_available = True

    def mark_failure(self, key: "APIKey"):
        """Record a failed call and disable the key after 5 consecutive failures."""
        key.failure_count += 1
        key.last_failure_time = time.time()

        if key.failure_count >= 5:
            key.is_available = False
            print(f"[警告] Key {key.name} 连续失败{key.failure_count}次,暂停使用")

    def rotate(self):
        """Advance ``current_index`` to the next key (no-op without keys)."""
        if self.keys:
            self.current_index = (self.current_index + 1) % len(self.keys)

# Usage example: three keys spread over two providers
keys = [
    {"key": secret, "name": label}
    for secret, label in (
        ("sk-openai-1", "openai-primary"),
        ("sk-openai-2", "openai-backup"),
        ("sk-anthropic-1", "claude-primary"),
    )
]
rotator = KeyRotator(keys)

# Ask the rotator for a key that is currently usable
key = rotator.get_available_key()
print(f"使用Key: {key.name}")
实战经验

建议至少准备2个Key,且最好分散在不同的服务商。我见过有人只准备了同一家服务商的两个Key,结果那家服务整体挂了,两个Key一起凉凉。

指数退避+抖动的重试装饰器

简单重试不行,指数退避+抖动才是正确姿势。为什么?想象一下:API挂了,你一秒内重试100次,那叫DoS攻击,会被限流得更狠。

import time
import random
import functools
from typing import Callable, Tuple, Optional
import requests

def retry_with_backoff(
    max_retries: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 30.0,
    jitter: bool = True,
    retry_on: Tuple[type, ...] = (requests.RequestException,),
):
    """
    Decorator factory: retry a call with exponential backoff and optional jitter.

    Args:
        max_retries: Number of retries after the first attempt (must be >= 0).
        base_delay: Delay in seconds before the first retry; doubles each retry.
        max_delay: Hard upper bound in seconds for any single delay, jitter included.
        jitter: When True, scale each delay by a random factor in [0.5, 1.5).
        retry_on: Exception types that trigger a retry; anything else propagates
            immediately.

    Raises:
        ValueError: If ``max_retries`` is negative.
    """
    if max_retries < 0:
        raise ValueError("max_retries must be >= 0")

    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # The last iteration either returns or re-raises, so the loop
            # never falls through.
            for attempt in range(max_retries + 1):
                try:
                    return func(*args, **kwargs)
                except retry_on as e:
                    if attempt == max_retries:
                        # Retries exhausted: surface the original exception.
                        raise

                    # Exponential backoff, capped at max_delay.
                    delay = min(base_delay * (2 ** attempt), max_delay)

                    # Jitter spreads out clients that failed together, so they
                    # do not all retry at the same instant (thundering herd).
                    # Re-apply the cap so max_delay stays a true upper bound.
                    if jitter:
                        delay = min(delay * (0.5 + random.random()), max_delay)

                    print(f"[重试] {func.__name__} 失败,{delay:.2f}秒后第{attempt + 2}次尝试: {e}")
                    time.sleep(delay)

        return wrapper
    return decorator

# 使用示例
class APIClient:
    """Minimal HTTP client showing ``retry_with_backoff`` on real API calls."""

    def __init__(self, api_key: str = ""):
        """Store the credential that both call methods send.

        Args:
            api_key: Provider API key. The original snippet read
                ``self.api_key`` without ever assigning it.
        """
        self.api_key = api_key

    @retry_with_backoff(max_retries=3, base_delay=1.0, max_delay=10.0)
    def call_openai(self, prompt: str) -> str:
        """Send ``prompt`` to the OpenAI chat completions endpoint and return the reply text.

        Raises:
            requests.RequestException: When the request still fails after all retries.
        """
        response = requests.post(
            "https://api.openai.com/v1/chat/completions",
            headers={"Authorization": f"Bearer {self.api_key}"},
            json={"model": "gpt-4o", "messages": [{"role": "user", "content": prompt}]},
            timeout=10
        )
        response.raise_for_status()
        return response.json()["choices"][0]["message"]["content"]

    @retry_with_backoff(max_retries=3, base_delay=1.0, max_delay=10.0)
    def call_claude(self, prompt: str) -> str:
        """Send ``prompt`` to the Anthropic Messages endpoint and return the reply text.

        Raises:
            requests.RequestException: When the request still fails after all retries.
        """
        response = requests.post(
            "https://api.anthropic.com/v1/messages",
            headers={"x-api-key": self.api_key, "anthropic-version": "2023-06-01"},
            json={
                "model": "claude-sonnet-4-20250514",
                # max_tokens is a required field of the Messages API; without
                # it every request is rejected.
                "max_tokens": 1024,
                "messages": [{"role": "user", "content": prompt}],
            },
            timeout=10
        )
        response.raise_for_status()
        return response.json()["content"][0]["text"]

重试策略的选择:

断路器模式:防止雪崩的最后防线

断路器是容错架构的核心。它的灵感来自电路:当电流过载时,断路器跳闸保护整个系统。

断路器有三种状态:CLOSED(关闭,请求正常放行)、OPEN(开启,请求被直接拒绝)、HALF_OPEN(半开,恢复超时后放行请求试探,连续成功则关闭,一旦失败立刻重新开启)。

from enum import Enum
from dataclasses import dataclass, field
from typing import Callable, Any, Optional
import time
import threading

class CircuitState(Enum):
    """The three states a CircuitBreaker moves between."""
    # Normal operation: CircuitBreaker.call lets every request through.
    CLOSED = "closed"
    # Tripped: CircuitBreaker.call rejects requests until recovery_timeout has elapsed.
    OPEN = "open"
    # Probing: requests are let through; a failure re-opens the breaker,
    # success_threshold successes close it.
    HALF_OPEN = "half_open"

@dataclass
class CircuitBreaker:
    """Circuit breaker guarding calls to an unreliable dependency.

    CLOSED lets calls through and counts consecutive failures; reaching
    ``failure_threshold`` trips it to OPEN. OPEN rejects calls until
    ``recovery_timeout`` seconds have passed since the last failure, then moves
    to HALF_OPEN. In HALF_OPEN one failure re-opens the breaker and
    ``success_threshold`` successes close it again.
    """

    name: str
    failure_threshold: int = 5  # failures in CLOSED state before tripping to OPEN
    recovery_timeout: float = 30.0  # seconds to stay OPEN before probing again
    success_threshold: int = 2  # successes in HALF_OPEN needed to close

    state: CircuitState = field(default=CircuitState.CLOSED, init=False)
    failure_count: int = field(default=0, init=False)
    success_count: int = field(default=0, init=False)
    last_failure_time: float = field(default=0.0, init=False)
    _lock: threading.Lock = field(default_factory=threading.Lock, init=False)

    def call(self, func: Callable, *args, **kwargs) -> Any:
        """Run ``func(*args, **kwargs)`` under breaker protection.

        Raises:
            CircuitBreakerOpenError: If the breaker is OPEN and the recovery
                timeout has not elapsed yet.
            Exception: Whatever ``func`` raises, after recording the failure.
        """
        with self._lock:
            if self.state == CircuitState.OPEN:
                waited = time.time() - self.last_failure_time
                if waited < self.recovery_timeout:
                    raise CircuitBreakerOpenError(f"断路器 {self.name} 已开启,拒绝请求")
                # Timeout elapsed: allow a probe.
                self._to_half_open()

        # The guarded call runs outside the lock so a slow dependency does not
        # block other callers.
        try:
            outcome = func(*args, **kwargs)
            self._on_success()
            return outcome
        except Exception:
            self._on_failure()
            raise

    def _on_success(self):
        """Record one successful call."""
        with self._lock:
            if self.state == CircuitState.CLOSED:
                self.failure_count = 0
                return
            if self.state != CircuitState.HALF_OPEN:
                return
            self.success_count += 1
            if self.success_count >= self.success_threshold:
                self._to_closed()

    def _on_failure(self):
        """Record one failed call and trip the breaker when warranted."""
        with self._lock:
            self.last_failure_time = time.time()
            self.failure_count += 1

            probing = self.state == CircuitState.HALF_OPEN
            over_limit = (
                self.state == CircuitState.CLOSED
                and self.failure_count >= self.failure_threshold
            )
            # Any failure while probing re-opens immediately.
            if probing or over_limit:
                self._to_open()

    def _to_open(self):
        """Move to OPEN (no-op when already OPEN)."""
        if self.state == CircuitState.OPEN:
            return
        print(f"[断路器] {self.name} 状态: {self.state.value} -> OPEN (失败{self.failure_count}次)")
        self.success_count = 0
        self.state = CircuitState.OPEN

    def _to_half_open(self):
        """Move to HALF_OPEN and restart the success counter."""
        print(f"[断路器] {self.name} 状态: {self.state.value} -> HALF_OPEN (尝试恢复)")
        self.success_count = 0
        self.state = CircuitState.HALF_OPEN

    def _to_closed(self):
        """Move to CLOSED and reset both counters."""
        print(f"[断路器] {self.name} 状态: {self.state.value} -> CLOSED (恢复成功)")
        self.failure_count = self.success_count = 0
        self.state = CircuitState.CLOSED


class CircuitBreakerOpenError(Exception):
    """Raised by CircuitBreaker.call when the breaker is OPEN and the request is rejected."""
    pass

# 使用示例
def get_circuit_breaker(name: str) -> CircuitBreaker:
    """Return the breaker registered under ``name``, creating it on first use.

    Instances are kept in a dict stored as the ``_instances`` attribute of this
    function, so repeated calls with the same name share one breaker.
    """
    registry = get_circuit_breaker.__dict__.setdefault('_instances', {})

    breaker = registry.get(name)
    if breaker is None:
        breaker = CircuitBreaker(
            name=name,
            failure_threshold=5,
            recovery_timeout=30,
            success_threshold=2
        )
        registry[name] = breaker

    return breaker


# Real-world usage
breaker = get_circuit_breaker("openai-gpt4")

try:
    result = breaker.call(call_openai_api, prompt="你好")
except CircuitBreakerOpenError:
    # Breaker is open: answer straight from the fallback path.
    result = get_fallback_response("openai")
except Exception as e:
    # Any other failure also degrades, but is reported rather than silently
    # swallowed so the underlying problem stays visible.
    print(f"[错误] {e}")
    result = get_fallback_response("openai")
参数调优建议

断路器的参数要根据业务场景调:
- failure_threshold:太低了容易误触发,太高了等太久才熔断。建议5-10
- recovery_timeout:API恢复通常需要5-30分钟,建议30秒起步试探
- success_threshold:不要设太低,建议2-3,防止假恢复

限流兜底降级策略

当API限流了(返回429),或者配额用光了,咋办?不能干等着吧。我的策略是:

  1. 先检查本地缓存有没有
  2. 没有的话,返回预设的兜底回答
  3. 同时把请求加入队列,稍后重试
from typing import Optional, Dict
import json
import hashlib
from datetime import datetime, timedelta

class RateLimitFallback:
    """Fallback strategy for rate-limited requests: cached answer first, canned reply second."""

    def __init__(self, cache_ttl: int = 3600):
        """Create an empty cache.

        Args:
            cache_ttl: Seconds a cached response stays valid.
        """
        self.cache: Dict[str, tuple] = {}  # {cache_key: (response, timestamp)}
        self.cache_ttl = cache_ttl
        # Prompts that hit a rate limit and still await a retry, keyed by cache
        # key. Kept apart from ``cache`` so a placeholder can never be served
        # to a user as if it were an answer.
        self.pending: Dict[str, str] = {}
        self.fallback_responses = self._load_fallback_responses()

    def _make_cache_key(self, prompt: str) -> str:
        """Return the MD5 hex digest of ``prompt`` (used only as a cache key)."""
        return hashlib.md5(prompt.encode()).hexdigest()

    def _load_fallback_responses(self) -> Dict[str, str]:
        """Return the canned replies, keyed by ``greeting`` / ``general`` / ``urgent``."""
        return {
            "greeting": "您好!当前服务繁忙,您的请求已加入队列,我们会尽快处理。您也可以稍后再试,或者描述一下您的具体需求,我会尽力帮助您。",
            "general": "抱歉,当前AI服务正在处理较多请求。请稍等片刻,或者换个方式描述您的问题,我会尽快为您服务。",
            "urgent": "尊敬的用户,当前服务响应较慢。您的紧急需求建议拨打客服热线xxxx-xxxx,我们会立即处理。",
        }

    def get_cached_response(self, prompt: str) -> Optional[str]:
        """Return the cached response for ``prompt`` or ``None``; expired entries are evicted."""
        key = self._make_cache_key(prompt)

        entry = self.cache.get(key)
        if entry is None:
            return None

        response, timestamp = entry
        if (datetime.now() - timestamp).total_seconds() < self.cache_ttl:
            print(f"[缓存命中] key={key}")
            return response

        # Expired: drop it so the cache does not keep stale entries.
        del self.cache[key]
        return None

    def cache_response(self, prompt: str, response: str):
        """Store ``response`` for ``prompt`` with the current timestamp."""
        key = self._make_cache_key(prompt)
        self.cache[key] = (response, datetime.now())
        print(f"[缓存写入] key={key}")

    def get_fallback(self, prompt: str, is_urgent: bool = False) -> str:
        """Return the best degraded reply for ``prompt``.

        A cached answer wins (prefixed with ``[缓存回复]``). Otherwise the
        canned reply is chosen by urgency, then by greeting keywords.
        """
        cached = self.get_cached_response(prompt)
        if cached:
            return f"[缓存回复] {cached}"

        if is_urgent:
            return self.fallback_responses["urgent"]
        elif any(kw in prompt for kw in ["你好", "hi", "hello"]):
            return self.fallback_responses["greeting"]
        else:
            return self.fallback_responses["general"]

    def handle_rate_limit(self, prompt: str) -> str:
        """Handle a rate-limited request: remember it for retry and return a degraded reply.

        The prompt is recorded in ``self.pending`` rather than in the response
        cache. Writing a ``"[pending]"`` marker into the cache overwrote real
        cached answers and was then returned to the user as "[缓存回复] [pending]".
        """
        self.pending[self._make_cache_key(prompt)] = prompt
        return self.get_fallback(prompt)


# Usage example
fallback_manager = RateLimitFallback(cache_ttl=3600)

def call_with_fallback(prompt: str, is_urgent: bool = False) -> str:
    """Call the API and degrade gracefully when it is unavailable.

    Args:
        prompt: User prompt to send.
        is_urgent: When True, degraded replies use the "urgent" wording.

    Returns:
        The API answer (also cached for later), or a degraded reply.
    """
    try:
        result = call_openai_api(prompt)
        fallback_manager.cache_response(prompt, result)
        return result
    except RateLimitError as e:
        print(f"[限流] {e}")
        return fallback_manager.handle_rate_limit(prompt)
    except CircuitBreakerOpenError:
        return fallback_manager.get_fallback(prompt, is_urgent)
    except Exception as e:
        print(f"[错误] {e}")
        # Keep the caller's urgency here too, consistent with the breaker
        # branch above; it was previously dropped on generic errors.
        return fallback_manager.get_fallback(prompt, is_urgent)

隔离舱模式:不同模型用不同连接池

隔离舱(Bulkhead)模式来自造船术语:船舱漏水了,用隔板把水限制在局部,不让它淹没整艘船。

在API调用中,这个模式的意思是:不同模型/服务用不同的连接池,互不干扰

举个例子:你的服务同时调用OpenAI和Claude,如果用同一个连接池,Claude的慢响应会拖慢OpenAI的请求。如果用隔离舱模式,各自独立,谁出问题不影响另一方。

import asyncio
import aiohttp
from typing import Dict, Optional

class IsolatedConnectionPool:
    """Bulkhead-style registry holding one aiohttp session (connection pool) per service."""

    _pools: Dict[str, aiohttp.ClientSession] = {}
    _pool_configs: Dict[str, dict] = {
        "openai": {
            "limit": 50,  # max concurrent connections for this pool
            "limit_per_host": 20,  # max connections to a single host
            "timeout": aiohttp.ClientTimeout(total=30),
        },
        "anthropic": {
            "limit": 30,
            "limit_per_host": 15,
            "timeout": aiohttp.ClientTimeout(total=30),
        },
        "google": {
            "limit": 20,
            "limit_per_host": 10,
            "timeout": aiohttp.ClientTimeout(total=30),
        },
    }

    @classmethod
    async def get_session(cls, service: str) -> aiohttp.ClientSession:
        """Return the session for ``service``, creating it if missing or closed.

        Services without an entry in ``_pool_configs`` get a default
        configuration (limit=20, limit_per_host=10, 30 s total timeout).
        """
        if service not in cls._pools or cls._pools[service].closed:
            config = cls._pool_configs.get(service, {
                "limit": 20,
                "limit_per_host": 10,
                "timeout": aiohttp.ClientTimeout(total=30),
            })
            connector = aiohttp.TCPConnector(
                limit=config["limit"],
                limit_per_host=config["limit_per_host"],
            )
            cls._pools[service] = aiohttp.ClientSession(
                connector=connector,
                timeout=config["timeout"],
            )
            print(f"[隔离舱] {service} 连接池已创建 (limit={config['limit']})")

        return cls._pools[service]

    @classmethod
    async def close_all(cls):
        """Close every open session."""
        # Iterate over a snapshot: ``await pool.close()`` yields to the event
        # loop, and a concurrent get_session() adding a new service would
        # otherwise raise "dictionary changed size during iteration".
        for name, pool in list(cls._pools.items()):
            if not pool.closed:
                await pool.close()
                print(f"[隔离舱] {name} 连接池已关闭")

    @classmethod
    async def get_pool_status(cls) -> Dict[str, dict]:
        """Return ``{service: {"connector": ..., "closed": False}}`` for every open session."""
        status = {}
        for name, pool in cls._pools.items():
            if not pool.closed:
                status[name] = {
                    "connector": pool.connector,
                    "closed": False
                }
        return status


# 使用示例
async def call_openai_async(prompt: str) -> str:
    """Call OpenAI asynchronously through its own isolated connection pool.

    Raises:
        aiohttp.ClientResponseError: If the API answers with an HTTP error status.
    """
    session = await IsolatedConnectionPool.get_session("openai")

    async with session.post(
        "https://api.openai.com/v1/chat/completions",
        headers={"Authorization": f"Bearer {OPENAI_KEY}", "Content-Type": "application/json"},
        json={"model": "gpt-4o", "messages": [{"role": "user", "content": prompt}]},
    ) as response:
        # Fail with a clear HTTP error instead of a KeyError on "choices" when
        # the API returns an error body (429, 5xx, ...).
        response.raise_for_status()
        result = await response.json()
        return result["choices"][0]["message"]["content"]

async def call_claude_async(prompt: str) -> str:
    """Call Claude asynchronously through its own isolated connection pool.

    Raises:
        aiohttp.ClientResponseError: If the API answers with an HTTP error status.
    """
    session = await IsolatedConnectionPool.get_session("anthropic")

    async with session.post(
        "https://api.anthropic.com/v1/messages",
        headers={"x-api-key": CLAUDE_KEY, "anthropic-version": "2023-06-01", "Content-Type": "application/json"},
        json={
            "model": "claude-sonnet-4-20250514",
            # max_tokens is a required field of the Messages API.
            "max_tokens": 1024,
            "messages": [{"role": "user", "content": prompt}],
        },
    ) as response:
        # Fail with a clear HTTP error instead of a KeyError on "content" when
        # the API returns an error body.
        response.raise_for_status()
        result = await response.json()
        return result["content"][0]["text"]


# 批量调用示例
async def batch_process(prompts: list):
    """Run all prompts concurrently and return their results in input order.

    Prompts containing "代码" are sent to OpenAI, everything else to Claude.
    Failures do not abort the batch: the exception object takes the place of
    that prompt's result.
    """
    pending = [
        call_openai_async(text) if "代码" in text else call_claude_async(text)
        for text in prompts
    ]
    return await asyncio.gather(*pending, return_exceptions=True)

主流容错库对比:Tenacity vs Backoff vs Hystrix

上面我手写了很多容错代码,其实业界已经有成熟的库。简单对比一下:

库名 优点 缺点 适用场景
Tenacity 功能最全,支持异步,装饰器优雅 学习曲线稍陡 生产环境首选
Backoff 轻量简单,上手快 功能有限,不支持断路器 简单重试场景
PyHystrix Hystrix的Python实现,概念完整 维护不活跃,依赖老旧 JVM转Python项目

Tenacity使用示例

# 用Tenacity重构上面的重试逻辑
from tenacity import (
    retry, stop_after_attempt, wait_exponential,
    retry_if_exception_type, before_sleep_log
)
import logging

logging.basicConfig(level=logging.INFO)


@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=1, max=10),
    retry=retry_if_exception_type((requests.RequestException,)),
    # before_sleep_log takes a Logger and a numeric level, not a logging
    # function such as logging.info.
    before_sleep=before_sleep_log(logging.getLogger(__name__), logging.INFO),
)
def call_api_with_tenacity(url: str, data: dict) -> dict:
    """POST ``data`` as JSON to ``url`` and return the decoded JSON body.

    Tenacity makes up to 3 attempts with exponential waits between 1 and 10
    seconds, retrying only on ``requests.RequestException``.
    """
    response = requests.post(url, json=data, timeout=10)
    response.raise_for_status()
    return response.json()

# 支持异步
from tenacity import AsyncRetrying, stop_after_attempt, wait_exponential

# An AsyncRetrying *instance* is not a decorator: calling it runs the function
# once and hands back a coroutine object. The @retry decorator detects
# coroutine functions and applies the async retry machinery itself.
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1))
async def call_api_async(url: str):
    """POST to ``url`` and return the decoded JSON body, retried up to 3 attempts."""
    async with aiohttp.ClientSession() as session:
        async with session.post(url) as response:
            return await response.json()

完整的API Client基类:整合所有容错策略

最后,给一个整合了所有策略的完整基类。

import time
import logging
from abc import ABC, abstractmethod
from typing import Optional, Any, Dict, List
from dataclasses import dataclass

logger = logging.getLogger(__name__)

@dataclass
class APIResponse:
    """Uniform result envelope returned by ResilientAPIClient."""
    success: bool
    data: Any = None  # API result on success; the fallback text on a degraded reply
    error: Optional[str] = None  # failure reason, set on degraded replies
    source: str = "primary"  # primary, backup, fallback
    latency_ms: float = 0  # elapsed milliseconds; filled in on successful calls


class ResilientAPIClient(ABC):
    """
    Base class for API clients with built-in fault tolerance.

    Strategies combined here:
    1. Multiple keys with failover
    2. One circuit breaker per key
    3. Degraded fallback replies when nothing else works

    Subclasses implement ``_make_api_call``.
    """

    def __init__(
        self,
        keys: List[Dict[str, str]],
        fallback_enabled: bool = True,
        circuit_breaker_threshold: int = 5,
        circuit_breaker_timeout: float = 30.0,
    ):
        """Set up key rotation, the optional fallback and breaker settings.

        Args:
            keys: Key dicts as accepted by ``KeyRotator``.
            fallback_enabled: When False, degraded replies use a fixed message.
            circuit_breaker_threshold: ``failure_threshold`` for each breaker.
            circuit_breaker_timeout: ``recovery_timeout`` for each breaker.
        """
        self.key_rotator = KeyRotator(keys)
        self.fallback = RateLimitFallback() if fallback_enabled else None
        self.breakers: Dict[str, CircuitBreaker] = {}
        self.circuit_breaker_threshold = circuit_breaker_threshold
        self.circuit_breaker_timeout = circuit_breaker_timeout

    def get_breaker(self, name: str) -> CircuitBreaker:
        """Return the breaker registered under ``name``, creating it on first use."""
        if name not in self.breakers:
            self.breakers[name] = CircuitBreaker(
                name=name,
                failure_threshold=self.circuit_breaker_threshold,
                recovery_timeout=self.circuit_breaker_timeout,
            )
        return self.breakers[name]

    @staticmethod
    def _is_rate_limit_error(exc: Exception) -> bool:
        """Return True when ``exc`` looks like an HTTP 429 / rate-limit error.

        Detection is duck-typed so it needs no provider-specific exception
        class in scope: it accepts any exception class named ``RateLimitError``
        and any exception carrying ``status_code == 429``, either directly or
        on its ``response`` attribute.
        """
        if type(exc).__name__ == "RateLimitError":
            return True
        status = getattr(exc, "status_code", None)
        if status is None:
            status = getattr(getattr(exc, "response", None), "status_code", None)
        return status == 429

    def call(self, prompt: str, model: str = "default", **kwargs) -> APIResponse:
        """
        Single entry point that applies every fault-tolerance strategy.

        Args:
            prompt: User prompt.
            model: Model name forwarded to ``_make_api_call``.
            **kwargs: Extra arguments forwarded to ``_make_api_call``.

        Returns:
            An ``APIResponse`` whose ``source`` is ``primary``, ``backup`` or
            ``fallback``. This method does not raise for API failures.
        """
        start_time = time.time()

        api_key = self.key_rotator.get_available_key()
        if not api_key:
            return self._fallback_response(prompt, "no_available_key")

        breaker = self.get_breaker(api_key.name)

        try:
            result = breaker.call(
                self._make_api_call,
                prompt=prompt,
                api_key=api_key.key,
                model=model,
                **kwargs
            )
        except CircuitBreakerOpenError:
            # This key's breaker is open: go straight to the other keys.
            return self._try_backup_key(
                prompt, model, start_time, failed_key=api_key, call_kwargs=kwargs
            )
        except Exception as e:
            self.key_rotator.mark_failure(api_key)
            if self._is_rate_limit_error(e):
                # Rate limited: degrade immediately.
                return self._fallback_response(prompt, f"rate_limit: {e}")
            logger.error(f"API调用失败: {e}")
            # Any other failure: fail over to the remaining keys before
            # giving up.
            return self._try_backup_key(
                prompt, model, start_time, failed_key=api_key, call_kwargs=kwargs
            )

        # breaker.call() has already recorded the success on the breaker;
        # recording it again would count one probe twice in HALF_OPEN.
        self.key_rotator.mark_success(api_key)
        return APIResponse(
            success=True,
            data=result,
            source="primary",
            latency_ms=(time.time() - start_time) * 1000
        )

    def _try_backup_key(
        self,
        prompt: str,
        model: str,
        start_time: float,
        failed_key: Optional[Any] = None,
        call_kwargs: Optional[Dict[str, Any]] = None,
    ) -> APIResponse:
        """Try every other enabled key, each through its own breaker.

        Args:
            prompt: User prompt.
            model: Model name forwarded to ``_make_api_call``.
            start_time: ``time.time()`` at the start of the overall call.
            failed_key: The key that has just failed; it is skipped.
            call_kwargs: Extra keyword arguments for ``_make_api_call``.

        Returns:
            A ``backup`` response from the first key that succeeds, otherwise
            the degraded fallback with error ``all_keys_failed``.
        """
        call_kwargs = call_kwargs or {}

        for backup_key in self.key_rotator.keys:
            if backup_key is failed_key or not backup_key.is_available:
                continue

            breaker = self.get_breaker(backup_key.name)
            try:
                result = breaker.call(
                    self._make_api_call,
                    prompt=prompt,
                    api_key=backup_key.key,
                    model=model,
                    **call_kwargs
                )
            except CircuitBreakerOpenError:
                # This key is known to be failing; skip it without calling out.
                continue
            except Exception as e:
                logger.error(f"备用Key {backup_key.name} 调用失败: {e}")
                self.key_rotator.mark_failure(backup_key)
                continue

            self.key_rotator.mark_success(backup_key)
            return APIResponse(
                success=True,
                data=result,
                source="backup",
                latency_ms=(time.time() - start_time) * 1000
            )

        return self._fallback_response(prompt, "all_keys_failed")

    def _fallback_response(self, prompt: str, error: str) -> APIResponse:
        """Build the degraded reply (``success=False``, ``source="fallback"``)."""
        if self.fallback:
            fallback_text = self.fallback.get_fallback(prompt)
        else:
            fallback_text = "服务暂时不可用,请稍后再试。"

        return APIResponse(
            success=False,
            data=fallback_text,
            error=error,
            source="fallback"
        )

    @abstractmethod
    def _make_api_call(self, prompt: str, api_key: str, model: str, **kwargs) -> str:
        """Perform the provider call and return the reply text; implemented by subclasses."""
        pass


# 具体实现示例
class OpenAIResilientClient(ResilientAPIClient):
    """ResilientAPIClient implementation backed by the OpenAI SDK."""

    def _make_api_call(self, prompt: str, api_key: str, model: str = "gpt-4o", **kwargs) -> str:
        """Send ``prompt`` to OpenAI chat completions and return the reply text.

        Args:
            prompt: User prompt.
            api_key: OpenAI API key to use for this call.
            model: Model name; the base class's ``"default"`` placeholder maps
                to ``gpt-4o``.
            **kwargs: Extra arguments for ``chat.completions.create``;
                ``timeout`` defaults to 30 seconds.
        """
        import openai

        # ResilientAPIClient.call() passes model="default" unless told
        # otherwise, which is not a real model name.
        if model == "default":
            model = "gpt-4o"

        # Pop timeout so it is not passed twice (explicitly and via **kwargs),
        # which raised TypeError whenever the caller supplied it.
        timeout = kwargs.pop("timeout", 30)

        client = openai.OpenAI(api_key=api_key)
        response = client.chat.completions.create(
            model=model,
            messages=[{"role": "user", "content": prompt}],
            timeout=timeout,
            **kwargs
        )
        return response.choices[0].message.content


# Usage example: two OpenAI keys, degraded replies enabled
client = OpenAIResilientClient(
    fallback_enabled=True,
    keys=[
        dict(key="sk-openai-1", name="openai-primary"),
        dict(key="sk-openai-2", name="openai-backup"),
    ],
)

response = client.call("你好,请介绍一下自己")
# Report where the answer came from and how long it took
print(f"成功: {response.success}, 来源: {response.source}, 延迟: {response.latency_ms}ms")
print(f"结果: {response.data}")

压测验证:模拟故障来验证降级策略

代码写完了,怎么证明它真的管用?答案是:通过压测来模拟故障。

模拟API延迟

import random
import time

def mock_slow_api(delay_range=(0.1, 5.0), error_rate=0.0):
    """Build a fake API callable that is slow and optionally flaky.

    Args:
        delay_range: ``(low, high)`` bounds in seconds for the random delay.
        error_rate: Probability that a call raises ``ConnectionError``.

    Returns:
        A callable accepting any arguments. It sleeps for a random delay, then
        either raises ``ConnectionError`` or returns a message naming the delay.
    """
    def wrapper(*args, **kwargs):
        # Sleep first so failures are slow too, like a real degraded API.
        delay = random.uniform(*delay_range)
        time.sleep(delay)

        if random.random() >= error_rate:
            return f"模拟响应,延迟{delay:.2f}秒"

        raise ConnectionError("模拟连接失败")

    return wrapper

# Scenario 1: simulate a slow API
def test_timeout_handling():
    """Timeout-handling scenario against an API that takes 5-10 s to answer.

    ``mock_slow_api`` is a factory that returns a fake API, not a decorator.
    Used as ``@mock_slow_api(...)`` it ran the fake API once at import time
    (sleeping 5-10 s) and rebound this name to the returned string.
    """
    slow_api = mock_slow_api(delay_range=(5.0, 10.0))
    # Placeholder, as in the original: pass ``slow_api`` to the client under
    # test and assert that its timeout path is taken.
    assert callable(slow_api)

# Scenario 2: simulate an API that fails at random
def test_retry_mechanism():
    """Retry scenario against an API that fails about 30% of the time.

    ``mock_slow_api`` is a factory that returns a fake API, not a decorator.
    Used as ``@mock_slow_api(...)`` it ran the fake API once at import time and
    rebound this name to the returned string (or raised during import).
    """
    flaky_api = mock_slow_api(error_rate=0.3)
    # Placeholder, as in the original: wrap ``flaky_api`` with the retry logic
    # under test and assert that the call eventually succeeds.
    assert callable(flaky_api)

模拟API完全不可用

import unittest.mock as mock

def test_circuit_breaker_opens():
    """Check that the breaker opens after consecutive failures and then rejects calls."""
    breaker = CircuitBreaker("test", failure_threshold=3, recovery_timeout=1)

    # Drive three failing calls through the breaker.
    attempts = 0
    while attempts < 3:
        failing_api = mock.Mock(side_effect=Exception("API Error"))
        try:
            breaker.call(failing_api)
        except Exception:
            pass
        attempts += 1

    # The threshold is reached, so the breaker must be open now.
    assert breaker.state == CircuitState.OPEN
    print(f"断路器状态: {breaker.state.value}")

    # A further call must be rejected without reaching the API.
    healthy_api = mock.Mock(return_value="success")
    try:
        breaker.call(healthy_api)
    except CircuitBreakerOpenError:
        print("断路器开启,正确拒绝了请求")
    else:
        assert False, "应该抛出异常"


def test_key_failover():
    """Check that the rotator fails over to the backup key once the primary is disabled."""
    specs = [
        {"key": "key1", "name": "primary"},
        {"key": "key2", "name": "backup"},
    ]
    rotator = KeyRotator(specs)

    # Five consecutive failures disable the primary key.
    primary = rotator.keys[0]
    for _ in range(5):
        rotator.mark_failure(primary)

    # The next key handed out must be the backup.
    key = rotator.get_available_key()
    assert key.name == "backup"
    print(f"Key故障转移: {key.name}")

完整的压力测试脚本

import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

async def stress_test(total_requests: int = 100, max_workers: int = 20):
    """
    Stress test: fire concurrent requests and summarise how the client coped.

    Args:
        total_requests: Number of requests to send (default 100).
        max_workers: Size of the thread pool issuing them (default 20).

    Raises:
        ValueError: If ``total_requests`` is not positive; the latency
            statistics would be undefined.
    """
    if total_requests <= 0:
        raise ValueError("total_requests must be positive")

    client = OpenAIResilientClient(keys=[
        {"key": "sk-test1", "name": "test1"},
        {"key": "sk-test2", "name": "test2"},
    ])

    success_count = 0
    fallback_count = 0
    error_count = 0
    latencies = []

    start_time = time.time()

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [
            executor.submit(client.call, f"测试请求{i}", model="gpt-4o")
            for i in range(total_requests)
        ]

        for future in futures:
            result = future.result()
            if result.success:
                success_count += 1
            elif result.source == "fallback":
                fallback_count += 1
            else:
                error_count += 1

            latencies.append(result.latency_ms)

    total_time = time.time() - start_time

    print(f"""
    ========== 压测结果 ==========
    总请求数: {total_requests}
    成功数: {success_count}
    降级响应: {fallback_count}
    错误数: {error_count}
    总耗时: {total_time:.2f}秒
    平均延迟: {sum(latencies)/len(latencies):.2f}ms
    最大延迟: {max(latencies):.2f}ms
    最小延迟: {min(latencies):.2f}ms
    =============================
    """)
压测目标

好的容错设计应该达到:
- 成功率:在单个API故障时,仍保持95%+的成功率
- 降级优雅:降级响应应该有合适的用户体验
- 恢复及时:API恢复后,断路器应在合理时间内关闭

写在最后

容错降级不是可选项,是AI服务上生产的必备能力。API服务商再稳定,也会有波动的时候。你的服务能不能扛住这些波动,决定了用户体验和系统稳定性。

核心原则就三条:

  1. 不把鸡蛋放一个篮子里:多Key、多服务商、隔离舱
  2. 让失败快速发生:超时设置合理,不要傻等
  3. 永远有兜底:就算所有API都挂了,也要给用户一个友好的响应

做完这套容错架构后,我们的服务可用性从99.5%提到了99.9%。更重要的是,那次Claude崩了两小时,我们的用户基本没感知。

如果你在找更多AI API相关的实战技巧,欢迎来 TokenNexus 看看,收录了330+国内外AI平台的对比评测。

发现更多AI API平台

TokenNexus收录330+国内外AI API平台,帮你找到最适合的服务

立即探索