2025年8月某天上午10点,我正在改一个AI客服功能的bug,突然群里炸了:"Claude崩了!全部报错!"
我赶紧打开状态页,官方公告赫然写着"服务中断,预计2小时恢复"。再看我们的系统,AI相关功能全线飘红:
- 智能客服:用户发消息直接显示"服务暂时不可用"
- AI写作助手:点生成按钮就是500错误
- 图片描述:页面直接白屏
那一刻我深刻意识到:我们没有做任何容错设计,单点依赖太严重了。
后来我们花了两周时间重构了整个AI调用架构。重构完成后,类似的情况再发生:API挂了?没关系,我们自动切到备用Key,用户根本感知不到。限流了?降级响应,历史缓存照样能给出回答。
这篇文章就是踩坑后的经验总结,代码全部真实可运行。
三层降级策略:从Fail Fast到完全兜底
容错不是一件事,是一套组合拳。我的设计分三层:
| 层级 | 策略名称 | 触发条件 | 处理方式 |
|---|---|---|---|
| 第一层 | Fail Fast(快速失败) | API响应超时、返回错误 | 立即重试或切换Key,不让用户等 |
| 第二层 | Fallback(降级响应) | 重试失败、所有Key都不可用 | 返回预设答案或历史缓存 |
| 第三层 | Circuit Breaker(断路器) | 连续失败超过阈值 | 暂时熔断,防止雪崩 |
三层策略配合使用,才能在各种异常情况下都保持服务可用。
多API Key轮询与故障转移
这是最简单的容错手段,但很多人懒得做。原理很简单:一个Key挂了,切换到另一个。
import random
from typing import List, Optional
from dataclasses import dataclass
import time
@dataclass
class APIKey:
    """A single API credential together with its health bookkeeping."""

    # The secret value sent to the provider.
    key: str
    # Human-readable label; used to identify the key in log output.
    name: str
    # Cleared once the key has been suspended after repeated failures.
    is_available: bool = True
    # Number of failures recorded since the last success.
    failure_count: int = 0
    # time.time() stamp of the latest failure; 0 means "never failed".
    last_failure_time: float = 0
class KeyRotator:
    """Round-robin API key selector with failure cooldown and failover.

    A key is skipped while it is inside its cooldown window (it failed less
    than ``failure_cooldown`` seconds ago) or after it has been suspended
    (``is_available`` is False, set after 5 failures without a success).
    """

    def __init__(self, keys: List[dict]):
        """Build the rotator.

        Args:
            keys: One dict per key, e.g.
                ``[{"key": "sk-xxx", "name": "openai-key1"}, ...]``.
                Each dict is expanded into ``APIKey(**k)``.
        """
        self.keys = [APIKey(**k) for k in keys]
        self.current_index = 0
        self.failure_cooldown = 60  # seconds a key is skipped after a failure

    def get_available_key(self) -> Optional["APIKey"]:
        """Return the next usable key, or a best-effort pick if none is ready.

        Scans at most one full round starting at ``current_index`` and returns
        the first key that is available and not cooling down. When every key is
        cooling or suspended, the key whose last failure is oldest is returned,
        preferring keys that are still flagged available over suspended ones.

        Returns:
            The chosen key, or None when the rotator holds no keys at all.
        """
        if not self.keys:
            # min() on an empty list would raise ValueError; honour the
            # Optional return type instead.
            return None

        now = time.time()
        total = len(self.keys)
        for _ in range(total):
            candidate = self.keys[self.current_index]
            cooling = (
                candidate.last_failure_time > 0
                and (now - candidate.last_failure_time) < self.failure_cooldown
            )
            if candidate.is_available and not cooling:
                return candidate
            self.current_index = (self.current_index + 1) % total

        # Nothing is ready. Pick the key that has rested the longest, and do
        # not hand out a suspended key while a merely-cooling one exists.
        pool = [k for k in self.keys if k.is_available] or self.keys
        return min(pool, key=lambda k: k.last_failure_time)

    def mark_success(self, key: "APIKey"):
        """Record a successful call: reset the failure streak and re-enable the key."""
        key.failure_count = 0
        key.is_available = True

    def mark_failure(self, key: "APIKey"):
        """Record a failed call; suspend the key once it reaches 5 failures."""
        key.failure_count += 1
        key.last_failure_time = time.time()
        if key.failure_count >= 5:
            key.is_available = False
            print(f"[警告] Key {key.name} 连续失败{key.failure_count}次,暂停使用")

    def rotate(self):
        """Advance the round-robin cursor to the next key (no-op without keys)."""
        if self.keys:
            self.current_index = (self.current_index + 1) % len(self.keys)
# Usage example: three keys spread over two providers.
keys = [
{"key": "sk-openai-1", "name": "openai-primary"},
{"key": "sk-openai-2", "name": "openai-backup"},
{"key": "sk-anthropic-1", "name": "claude-primary"},
]
rotator = KeyRotator(keys)
# Ask the rotator for a key to use for the next request.
key = rotator.get_available_key()
print(f"使用Key: {key.name}")
建议至少准备2个Key,且最好分散在不同服务商。我见过有人只准备了同一家服务商的两个Key,结果那家服务整体挂了,两个Key一起凉凉。
指数退避+抖动的重试装饰器
简单重试不行,指数退避+抖动才是正确姿势。为什么?想象一下:API挂了,你一秒内重试100次,那叫DoS攻击,会被限流得更狠。
import time
import random
import functools
from typing import Callable, Tuple, Optional
import requests
def retry_with_backoff(
    max_retries: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 30.0,
    jitter: bool = True,
    retry_on: Tuple[type, ...] = (requests.RequestException,),
):
    """Decorator factory: retry a call with exponential backoff and jitter.

    The wrapped function is attempted up to ``max_retries + 1`` times. After
    failed attempt number ``attempt`` (0-based) the wrapper sleeps
    ``base_delay * 2 ** attempt`` seconds, optionally scaled by a random
    factor in [0.5, 1.5), and never longer than ``max_delay``.

    Args:
        max_retries: Number of retries after the first attempt (>= 0).
        base_delay: Delay in seconds before the first retry.
        max_delay: Hard upper bound in seconds for any single sleep.
        jitter: Randomise each delay so that many clients do not retry in
            lock-step (thundering herd).
        retry_on: Exception types that trigger a retry; anything else
            propagates immediately.

    Returns:
        A decorator. The decorated function re-raises the last exception
        once all attempts are exhausted.

    Raises:
        ValueError: If ``max_retries`` is negative.
    """
    if max_retries < 0:
        # A negative value would skip the loop entirely and never call func.
        raise ValueError("max_retries must be >= 0")

    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries + 1):
                try:
                    return func(*args, **kwargs)
                except retry_on as e:
                    if attempt == max_retries:
                        # Out of attempts: surface the original exception.
                        raise
                    delay = base_delay * (2 ** attempt)
                    if jitter:
                        delay *= 0.5 + random.random()  # 0.5x .. 1.5x
                    # Cap AFTER jitter so max_delay is a real upper bound.
                    delay = min(delay, max_delay)
                    print(f"[重试] {func.__name__} 失败,{delay:.2f}秒后第{attempt + 2}次尝试: {e}")
                    time.sleep(delay)

        return wrapper

    return decorator
# 使用示例
class APIClient:
    """Minimal HTTP client showing `retry_with_backoff` on provider calls.

    Each call is attempted up to 4 times (1 try + 3 retries) with delays
    capped at 10 seconds; any ``requests.RequestException`` triggers a retry.
    """

    # The Anthropic Messages API rejects requests that omit `max_tokens`.
    CLAUDE_MAX_TOKENS = 1024

    def __init__(self, api_key: str = ""):
        """Store the credential used by both call methods.

        Args:
            api_key: Provider API key sent in the auth header of each request.
        """
        self.api_key = api_key

    @retry_with_backoff(max_retries=3, base_delay=1.0, max_delay=10.0)
    def call_openai(self, prompt: str) -> str:
        """Send `prompt` to the OpenAI chat completions endpoint.

        Returns:
            The text of the first choice.

        Raises:
            requests.RequestException: When all attempts fail (including
                HTTP error statuses via ``raise_for_status``).
        """
        response = requests.post(
            "https://api.openai.com/v1/chat/completions",
            headers={"Authorization": f"Bearer {self.api_key}"},
            json={"model": "gpt-4o", "messages": [{"role": "user", "content": prompt}]},
            timeout=10
        )
        response.raise_for_status()
        return response.json()["choices"][0]["message"]["content"]

    @retry_with_backoff(max_retries=3, base_delay=1.0, max_delay=10.0)
    def call_claude(self, prompt: str) -> str:
        """Send `prompt` to the Anthropic messages endpoint.

        Returns:
            The text of the first content block.

        Raises:
            requests.RequestException: When all attempts fail (including
                HTTP error statuses via ``raise_for_status``).
        """
        response = requests.post(
            "https://api.anthropic.com/v1/messages",
            headers={"x-api-key": self.api_key, "anthropic-version": "2023-06-01"},
            json={
                "model": "claude-sonnet-4-20250514",
                "max_tokens": self.CLAUDE_MAX_TOKENS,
                "messages": [{"role": "user", "content": prompt}],
            },
            timeout=10
        )
        response.raise_for_status()
        return response.json()["content"][0]["text"]
重试策略的选择:
- base_delay = 1秒:适合响应较快的API
- max_delay = 30秒:适合对延迟不敏感的场景
- jitter = True:一定要开,减少惊群效应(thundering herd)
断路器模式:防止雪崩的最后防线
断路器是容错架构的核心。它的灵感来自电路:当电流过载时,断路器跳闸保护整个系统。
断路器有三种状态:
- CLOSED(关闭):正常状态,请求通过,失败计数
- OPEN(开启):熔断状态,请求直接失败,不调用API
- HALF_OPEN(半开):试探状态,放行请求去探测API是否恢复;累计成功达到success_threshold次才关闭断路器,一旦失败立即重新熔断
from enum import Enum
from dataclasses import dataclass, field
from typing import Callable, Any, Optional
import time
import threading
class CircuitState(Enum):
    """States a circuit breaker can be in."""

    # Normal operation: calls go through.
    CLOSED = "closed"
    # Tripped: calls are rejected without reaching the API.
    OPEN = "open"
    # Probing: calls are let through to see whether the API recovered.
    HALF_OPEN = "half_open"
@dataclass
class CircuitBreaker:
    """Circuit breaker that guards a callable with CLOSED/OPEN/HALF_OPEN states.

    State changes are made while holding ``_lock``; the guarded callable
    itself runs outside the lock.
    """

    name: str
    # Consecutive failures in CLOSED that trip the breaker.
    failure_threshold: int = 5
    # Seconds after the last failure before an OPEN breaker starts probing.
    recovery_timeout: float = 30.0
    # Successes required in HALF_OPEN before closing again.
    success_threshold: int = 2

    state: CircuitState = field(default=CircuitState.CLOSED, init=False)
    failure_count: int = field(default=0, init=False)
    success_count: int = field(default=0, init=False)
    last_failure_time: float = field(default=0.0, init=False)
    _lock: threading.Lock = field(default_factory=threading.Lock, init=False)

    def call(self, func: Callable, *args, **kwargs) -> Any:
        """Run ``func(*args, **kwargs)`` under breaker protection.

        Raises:
            CircuitBreakerOpenError: The breaker is OPEN and the recovery
                timeout has not elapsed yet.
            Exception: Whatever ``func`` raises, after being recorded as a
                failure.
        """
        with self._lock:
            if self.state == CircuitState.OPEN:
                since_failure = time.time() - self.last_failure_time
                if since_failure >= self.recovery_timeout:
                    # Waited long enough: start probing.
                    self._to_half_open()
                else:
                    raise CircuitBreakerOpenError(f"断路器 {self.name} 已开启,拒绝请求")

        # The call happens without the lock so slow requests do not block
        # other callers.
        try:
            outcome = func(*args, **kwargs)
            self._on_success()
            return outcome
        except Exception:
            self._on_failure()
            raise

    def _on_success(self):
        """Record one successful call."""
        with self._lock:
            if self.state == CircuitState.CLOSED:
                self.failure_count = 0
                return
            if self.state != CircuitState.HALF_OPEN:
                return
            self.success_count += 1
            if self.success_count >= self.success_threshold:
                self._to_closed()

    def _on_failure(self):
        """Record one failed call and trip the breaker when warranted."""
        with self._lock:
            self.failure_count += 1
            self.last_failure_time = time.time()
            # A failed probe re-opens at once; in CLOSED the threshold decides.
            probe_failed = self.state == CircuitState.HALF_OPEN
            threshold_hit = (
                self.state == CircuitState.CLOSED
                and self.failure_count >= self.failure_threshold
            )
            if probe_failed or threshold_hit:
                self._to_open()

    def _to_open(self):
        """Enter OPEN; the transition is logged only if the state changes."""
        if self.state != CircuitState.OPEN:
            print(f"[断路器] {self.name} 状态: {self.state.value} -> OPEN (失败{self.failure_count}次)")
        self.state = CircuitState.OPEN
        self.success_count = 0

    def _to_half_open(self):
        """Enter HALF_OPEN and restart the success counter."""
        print(f"[断路器] {self.name} 状态: {self.state.value} -> HALF_OPEN (尝试恢复)")
        self.state = CircuitState.HALF_OPEN
        self.success_count = 0

    def _to_closed(self):
        """Enter CLOSED and clear both counters."""
        print(f"[断路器] {self.name} 状态: {self.state.value} -> CLOSED (恢复成功)")
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.success_count = 0
class CircuitBreakerOpenError(Exception):
    """Raised when a call is rejected because the circuit breaker is open."""
# 使用示例
def get_circuit_breaker(name: str) -> CircuitBreaker:
    """Return the breaker registered under `name`, creating it on first use.

    Instances are kept in a dict stored as the ``_instances`` attribute of
    this function, so repeated calls with the same name share one breaker.
    """
    try:
        registry = get_circuit_breaker._instances
    except AttributeError:
        # First call ever: create the registry lazily.
        registry = get_circuit_breaker._instances = {}

    if name not in registry:
        registry[name] = CircuitBreaker(
            name=name,
            failure_threshold=5,
            recovery_timeout=30,
            success_threshold=2
        )
    return registry[name]
# Typical call site
breaker = get_circuit_breaker("openai-gpt4")
try:
    result = breaker.call(call_openai_api, prompt="你好")
except CircuitBreakerOpenError:
    # Breaker is open: answer with the degraded response right away.
    result = get_fallback_response("openai")
except Exception:
    # Any other failure ends in the same degraded response.
    result = get_fallback_response("openai")
断路器的参数要根据业务场景调:
- failure_threshold:太低了容易误触发,太高了等太久才熔断。建议5-10
- recovery_timeout:API恢复通常需要5-30分钟,建议30秒起步试探
- success_threshold:不要设太低,建议2-3,防止假恢复
限流兜底降级策略
当API限流了(返回429),或者配额用光了,咋办?不能干等着吧。我的策略是:
- 先检查本地缓存有没有
- 没有的话,返回预设的兜底回答
- 同时把请求加入队列,稍后重试
from typing import Optional, Dict
import json
import hashlib
from datetime import datetime, timedelta
class RateLimitFallback:
    """Degraded-response provider for rate-limited or unavailable APIs.

    Answers come from, in order: a TTL-bounded in-memory cache of earlier
    real responses, then a small set of canned replies. Prompts that hit a
    rate limit are remembered in ``pending`` so a caller can retry them later.
    """

    def __init__(self, cache_ttl: int = 3600):
        """
        Args:
            cache_ttl: Seconds a cached response stays valid.
        """
        self.cache: Dict[str, tuple] = {}  # {cache_key: (response, timestamp)}
        # Rate-limited prompts awaiting a retry: {cache_key: prompt}. Kept
        # apart from `cache` so a placeholder is never served as an answer.
        self.pending: Dict[str, str] = {}
        self.cache_ttl = cache_ttl
        self.fallback_responses = self._load_fallback_responses()

    def _make_cache_key(self, prompt: str) -> str:
        """Return the MD5 hex digest of the prompt, used as the cache key."""
        return hashlib.md5(prompt.encode()).hexdigest()

    def _load_fallback_responses(self) -> Dict[str, str]:
        """Return the canned replies keyed by scenario."""
        return {
            "greeting": "您好!当前服务繁忙,您的请求已加入队列,我们会尽快处理。您也可以稍后再试,或者描述一下您的具体需求,我会尽力帮助您。",
            "general": "抱歉,当前AI服务正在处理较多请求。请稍等片刻,或者换个方式描述您的问题,我会尽快为您服务。",
            "urgent": "尊敬的用户,当前服务响应较慢。您的紧急需求建议拨打客服热线xxxx-xxxx,我们会立即处理。",
        }

    def get_cached_response(self, prompt: str) -> Optional[str]:
        """Return the cached response for `prompt`, or None if absent or expired.

        Expired entries are evicted as a side effect.
        """
        key = self._make_cache_key(prompt)
        entry = self.cache.get(key)
        if entry is None:
            return None
        response, timestamp = entry
        if (datetime.now() - timestamp).total_seconds() < self.cache_ttl:
            print(f"[缓存命中] key={key}")
            return response
        # Stale: drop it so the dict does not keep dead entries.
        del self.cache[key]
        return None

    def cache_response(self, prompt: str, response: str):
        """Store a real API response for `prompt`, stamped with the current time."""
        key = self._make_cache_key(prompt)
        self.cache[key] = (response, datetime.now())
        print(f"[缓存写入] key={key}")

    def get_fallback(self, prompt: str, is_urgent: bool = False) -> str:
        """Return a degraded answer for `prompt`.

        A valid cached response wins; otherwise a canned reply is chosen:
        "urgent" when `is_urgent`, "greeting" when the prompt contains a
        greeting keyword, else "general".
        """
        cached = self.get_cached_response(prompt)
        if cached:
            return f"[缓存回复] {cached}"
        if is_urgent:
            return self.fallback_responses["urgent"]
        if any(kw in prompt for kw in ["你好", "hi", "hello"]):
            return self.fallback_responses["greeting"]
        return self.fallback_responses["general"]

    def handle_rate_limit(self, prompt: str, is_urgent: bool = False) -> str:
        """Handle a rate-limited request: remember it, then answer degraded.

        The prompt is recorded in ``pending`` (deduplicated by cache key). It
        is deliberately NOT written into the response cache, so an earlier
        valid answer is preserved and no placeholder leaks to the user.
        """
        self.pending[self._make_cache_key(prompt)] = prompt
        return self.get_fallback(prompt, is_urgent)
# Usage example
fallback_manager = RateLimitFallback(cache_ttl=3600)


def call_with_fallback(prompt: str, is_urgent: bool = False) -> str:
    """Call the API and degrade gracefully on any failure.

    On success the response is cached and returned. A rejected call
    (open circuit breaker), a rate-limit error, or any other exception is
    answered through `fallback_manager`.

    Args:
        prompt: User prompt forwarded to the API.
        is_urgent: Select the "urgent" canned reply when degrading.

    Returns:
        The API response text or a fallback text; never raises for
        ``Exception`` subclasses.
    """
    try:
        result = call_openai_api(prompt)
    except CircuitBreakerOpenError:
        return fallback_manager.get_fallback(prompt, is_urgent)
    except Exception as e:
        # NOTE(review): no RateLimitError is defined or imported in this
        # file, so naming it in an `except` clause would raise NameError on
        # every failure. Match by class name instead — presumably
        # openai.RateLimitError is meant; confirm.
        if any(cls.__name__ == "RateLimitError" for cls in type(e).__mro__):
            print(f"[限流] {e}")
            return fallback_manager.handle_rate_limit(prompt)
        print(f"[错误] {e}")
        # Pass is_urgent through so urgent users get the urgent reply here too.
        return fallback_manager.get_fallback(prompt, is_urgent)

    fallback_manager.cache_response(prompt, result)
    return result
隔离舱模式:不同模型用不同连接池
隔离舱(Bulkhead)模式来自造船术语:船舱漏水了,用隔板把水限制在局部,不让它淹没整艘船。
在API调用中,这个模式的意思是:不同模型/服务用不同的连接池,互不干扰。
举个例子:你的服务同时调用OpenAI和Claude,如果用同一个连接池,Claude的慢响应会拖慢OpenAI的请求。如果用隔离舱模式,各自独立,谁出问题不影响另一方。
import asyncio
import aiohttp
from typing import Dict, Optional
class IsolatedConnectionPool:
    """Bulkhead-style registry holding one aiohttp session per service.

    Each service gets its own connector limits, so the sessions do not
    share a connection pool.
    """

    # Live sessions keyed by service name (shared at class level).
    _pools: Dict[str, aiohttp.ClientSession] = {}
    # Per-service connector limits and timeouts.
    _pool_configs: Dict[str, dict] = {
        "openai": {
            "limit": 50,  # max concurrent connections
            "limit_per_host": 20,  # max connections per host
            "timeout": aiohttp.ClientTimeout(total=30),
        },
        "anthropic": {
            "limit": 30,
            "limit_per_host": 15,
            "timeout": aiohttp.ClientTimeout(total=30),
        },
        "google": {
            "limit": 20,
            "limit_per_host": 10,
            "timeout": aiohttp.ClientTimeout(total=30),
        },
    }

    @classmethod
    async def get_session(cls, service: str) -> aiohttp.ClientSession:
        """Return the session for `service`, creating it if missing or closed."""
        existing = cls._pools.get(service)
        if existing is not None and not existing.closed:
            return existing

        # Services without an explicit entry get conservative defaults.
        fallback_config = {
            "limit": 20,
            "limit_per_host": 10,
            "timeout": aiohttp.ClientTimeout(total=30),
        }
        config = cls._pool_configs.get(service, fallback_config)
        connector = aiohttp.TCPConnector(
            limit=config["limit"],
            limit_per_host=config["limit_per_host"],
        )
        session = aiohttp.ClientSession(
            connector=connector,
            timeout=config["timeout"],
        )
        cls._pools[service] = session
        print(f"[隔离舱] {service} 连接池已创建 (limit={config['limit']})")
        return session

    @classmethod
    async def close_all(cls):
        """Close every session that is still open."""
        for name, pool in cls._pools.items():
            if pool.closed:
                continue
            await pool.close()
            print(f"[隔离舱] {name} 连接池已关闭")

    @classmethod
    async def get_pool_status(cls) -> Dict[str, dict]:
        """Return ``{service: {"connector": ..., "closed": False}}`` for open sessions."""
        return {
            name: {"connector": pool.connector, "closed": False}
            for name, pool in cls._pools.items()
            if not pool.closed
        }
# 使用示例
async def call_openai_async(prompt: str) -> str:
    """Call OpenAI asynchronously through its dedicated connection pool.

    Returns:
        The text of the first choice.

    Raises:
        aiohttp.ClientResponseError: The API answered with an HTTP error
            status (e.g. 429 or 5xx).
    """
    session = await IsolatedConnectionPool.get_session("openai")
    async with session.post(
        "https://api.openai.com/v1/chat/completions",
        headers={"Authorization": f"Bearer {OPENAI_KEY}", "Content-Type": "application/json"},
        json={"model": "gpt-4o", "messages": [{"role": "user", "content": prompt}]},
    ) as response:
        # Fail loudly on HTTP errors rather than with a KeyError on the
        # error payload below.
        response.raise_for_status()
        result = await response.json()
        return result["choices"][0]["message"]["content"]
async def call_claude_async(prompt: str) -> str:
    """Call Claude asynchronously through its dedicated connection pool.

    Returns:
        The text of the first content block.

    Raises:
        aiohttp.ClientResponseError: The API answered with an HTTP error
            status (e.g. 429 or 5xx).
    """
    session = await IsolatedConnectionPool.get_session("anthropic")
    async with session.post(
        "https://api.anthropic.com/v1/messages",
        headers={"x-api-key": CLAUDE_KEY, "anthropic-version": "2023-06-01", "Content-Type": "application/json"},
        json={
            "model": "claude-sonnet-4-20250514",
            # Required by the Messages API; requests without it are rejected.
            "max_tokens": 1024,
            "messages": [{"role": "user", "content": prompt}],
        },
    ) as response:
        # Fail loudly on HTTP errors rather than with a KeyError on the
        # error payload below.
        response.raise_for_status()
        result = await response.json()
        return result["content"][0]["text"]
# 批量调用示例
async def batch_process(prompts: list):
    """Run all prompts concurrently, routing each one to a provider.

    Prompts containing "代码" go to OpenAI, all others to Claude. Results
    come back in input order; a failed call appears as its exception object
    instead of aborting the batch.
    """
    pending_calls = [
        call_openai_async(text) if "代码" in text else call_claude_async(text)
        for text in prompts
    ]
    return await asyncio.gather(*pending_calls, return_exceptions=True)
主流容错库对比:Tenacity vs Backoff vs Hystrix
上面我手写了很多容错代码,其实业界已经有成熟的库。简单对比一下:
| 库名 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|
| Tenacity | 功能最全,支持异步,装饰器优雅 | 学习曲线稍陡 | 生产环境首选 |
| Backoff | 轻量简单,上手快 | 功能有限,不支持断路器 | 简单重试场景 |
| PyHystrix | Hystrix的Python实现,概念完整 | 维护不活跃,依赖老旧 | JVM转Python项目 |
Tenacity使用示例
# 用Tenacity重构上面的重试逻辑
from tenacity import (
retry, stop_after_attempt, wait_exponential,
retry_if_exception_type, before_sleep_log
)
import logging
logging.basicConfig(level=logging.INFO)
# NOTE: stop_after_attempt(3) means 3 attempts in total. Without
# `reraise=True`, tenacity raises RetryError once they are exhausted.
@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=1, max=10),
    retry=retry_if_exception_type((requests.RequestException,)),
    # before_sleep_log takes a logger and a level, not a logging function.
    before_sleep=before_sleep_log(logging.getLogger(__name__), logging.INFO),
)
def call_api_with_tenacity(url: str, data: dict) -> dict:
    """POST `data` as JSON to `url`, retried by Tenacity on request errors.

    Returns:
        The decoded JSON body of the response.
    """
    response = requests.post(url, json=data, timeout=10)
    response.raise_for_status()
    return response.json()
# 支持异步
from tenacity import AsyncRetrying, stop_after_attempt, wait_exponential
# `retry` wraps coroutine functions directly. An AsyncRetrying instance is
# not a decorator: calling it with the function would invoke it and return a
# coroutine object instead of a retrying function.
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1))
async def call_api_async(url: str):
    """POST to `url` and return the decoded JSON body, retried on any exception."""
    async with aiohttp.ClientSession() as session:
        async with session.post(url) as response:
            return await response.json()
完整的API Client基类:整合所有容错策略
最后,给一个整合了所有策略的完整基类。
import time
import logging
from abc import ABC, abstractmethod
from typing import Optional, Any, Dict, List
from dataclasses import dataclass
logger = logging.getLogger(__name__)
@dataclass
class APIResponse:
    """Uniform result envelope returned by the resilient client."""

    # True when a real API call produced `data`.
    success: bool
    # Response payload, or the fallback text when degraded.
    data: Any = None
    # Error description; None when nothing went wrong.
    error: Optional[str] = None
    # Where the answer came from: "primary", "backup" or "fallback".
    source: str = "primary"
    # Elapsed time of the call in milliseconds.
    latency_ms: float = 0
class ResilientAPIClient(ABC):
    """Base class for API clients with layered fault tolerance.

    `call` combines:
        1. key rotation with failover to backup keys,
        2. one circuit breaker per key name,
        3. a degraded (cached / canned) response as the last resort.

    Subclasses implement `_make_api_call`.
    """

    def __init__(
        self,
        keys: List[Dict[str, str]],
        fallback_enabled: bool = True,
        circuit_breaker_threshold: int = 5,
        circuit_breaker_timeout: float = 30.0,
    ):
        """
        Args:
            keys: Key definitions passed to `KeyRotator`.
            fallback_enabled: Use `RateLimitFallback` for degraded answers;
                otherwise a fixed "unavailable" message is returned.
            circuit_breaker_threshold: Failure threshold for each breaker.
            circuit_breaker_timeout: Recovery timeout (seconds) for each breaker.
        """
        self.key_rotator = KeyRotator(keys)
        self.fallback = RateLimitFallback() if fallback_enabled else None
        self.breakers: Dict[str, CircuitBreaker] = {}
        self.circuit_breaker_threshold = circuit_breaker_threshold
        self.circuit_breaker_timeout = circuit_breaker_timeout

    def get_breaker(self, name: str) -> CircuitBreaker:
        """Return the breaker for `name`, creating it on first use."""
        if name not in self.breakers:
            self.breakers[name] = CircuitBreaker(
                name=name,
                failure_threshold=self.circuit_breaker_threshold,
                recovery_timeout=self.circuit_breaker_timeout,
            )
        return self.breakers[name]

    @staticmethod
    def _is_rate_limit_error(exc: Exception) -> bool:
        """Tell whether `exc` signals rate limiting.

        No RateLimitError class is defined or imported in this module, so
        the check is structural: a class named ``RateLimitError`` anywhere in
        the exception's MRO, or an HTTP 429 status on the exception or on its
        ``response`` attribute.
        """
        if any(cls.__name__ == "RateLimitError" for cls in type(exc).__mro__):
            return True
        status = getattr(exc, "status_code", None)
        if status is None:
            status = getattr(getattr(exc, "response", None), "status_code", None)
        return status == 429

    def call(self, prompt: str, model: str = "default", **kwargs) -> APIResponse:
        """Single entry point that applies every fault-tolerance layer.

        Args:
            prompt: User prompt.
            model: Model identifier forwarded to `_make_api_call`.
            **kwargs: Extra options forwarded to `_make_api_call`.

        Returns:
            An `APIResponse` with ``source`` "primary", "backup" or
            "fallback". Failures of the API call itself are reported in the
            response, not raised.
        """
        start_time = time.time()

        api_key = self.key_rotator.get_available_key()
        if not api_key:
            return self._fallback_response(prompt, "no_available_key")

        breaker = self.get_breaker(api_key.name)
        try:
            # breaker.call records success/failure on the breaker itself;
            # recording it again here would double-count in HALF_OPEN.
            result = breaker.call(
                self._make_api_call,
                prompt=prompt,
                api_key=api_key.key,
                model=model,
                **kwargs
            )
        except CircuitBreakerOpenError:
            # This key is fenced off: go straight to the other keys.
            return self._try_backup_key(
                prompt, model, start_time,
                exclude={api_key.name}, call_kwargs=kwargs,
            )
        except Exception as e:
            self.key_rotator.mark_failure(api_key)
            if self._is_rate_limit_error(e):
                # Rate limited: degrade instead of hammering more keys.
                return self._fallback_response(prompt, f"rate_limit: {e}")
            logger.error(f"API调用失败: {e}")
            return self._try_backup_key(
                prompt, model, start_time,
                exclude={api_key.name}, call_kwargs=kwargs, last_error=str(e),
            )

        self.key_rotator.mark_success(api_key)
        return APIResponse(
            success=True,
            data=result,
            source="primary",
            latency_ms=(time.time() - start_time) * 1000
        )

    def _try_backup_key(
        self,
        prompt: str,
        model: str,
        start_time: float,
        exclude: Optional[set] = None,
        call_kwargs: Optional[Dict[str, Any]] = None,
        last_error: Optional[str] = None,
    ) -> APIResponse:
        """Try each remaining key once, then fall back.

        Args:
            prompt: User prompt.
            model: Model identifier.
            start_time: time.time() stamp of the original request, used for
                latency.
            exclude: Key names that must not be tried again.
            call_kwargs: Extra options forwarded to `_make_api_call`.
            last_error: Description of the failure that led here; included
                in the fallback error if every backup fails too.

        Returns:
            A "backup" response on the first success, else a fallback response.
        """
        tried = set(exclude or ())
        extra = dict(call_kwargs or {})

        for _ in range(len(self.key_rotator.keys)):
            self.key_rotator.rotate()
            backup_key = self.key_rotator.get_available_key()
            # Skip keys already attempted for this request.
            if backup_key is None or backup_key.name in tried:
                continue
            tried.add(backup_key.name)

            try:
                # Backup keys go through their own breaker as well.
                result = self.get_breaker(backup_key.name).call(
                    self._make_api_call,
                    prompt=prompt,
                    api_key=backup_key.key,
                    model=model,
                    **extra
                )
            except CircuitBreakerOpenError:
                continue
            except Exception as e:
                self.key_rotator.mark_failure(backup_key)
                last_error = str(e)
                continue

            self.key_rotator.mark_success(backup_key)
            return APIResponse(
                success=True,
                data=result,
                source="backup",
                latency_ms=(time.time() - start_time) * 1000
            )

        error = "all_keys_failed"
        if last_error is not None:
            error = f"all_keys_failed (last error: {last_error})"
        return self._fallback_response(prompt, error)

    def _fallback_response(self, prompt: str, error: str) -> APIResponse:
        """Build the degraded response carrying `error`."""
        if self.fallback:
            fallback_text = self.fallback.get_fallback(prompt)
        else:
            fallback_text = "服务暂时不可用,请稍后再试。"
        return APIResponse(
            success=False,
            data=fallback_text,
            error=error,
            source="fallback"
        )

    @abstractmethod
    def _make_api_call(self, prompt: str, api_key: str, model: str, **kwargs) -> str:
        """Perform the real API request; implemented by subclasses."""
        pass
# 具体实现示例
class OpenAIResilientClient(ResilientAPIClient):
    """Resilient client backed by the OpenAI chat completions API."""

    # Model used when the caller leaves the base class's "default" placeholder.
    DEFAULT_MODEL = "gpt-4o"

    def _make_api_call(self, prompt: str, api_key: str, model: str = "gpt-4o", **kwargs) -> str:
        """Send one chat completion request.

        Args:
            prompt: User prompt.
            api_key: Key to authenticate with.
            model: Model name; the placeholder "default" is mapped to
                `DEFAULT_MODEL`.
            **kwargs: Extra options for ``chat.completions.create``;
                ``timeout`` defaults to 30 seconds.

        Returns:
            The content of the first choice.
        """
        import openai

        if model == "default":
            # `ResilientAPIClient.call` passes "default" when no model is given.
            model = self.DEFAULT_MODEL

        # Merge so a caller-supplied timeout overrides the default instead
        # of colliding with it as a duplicate keyword argument.
        options = {"timeout": 30, **kwargs}

        client = openai.OpenAI(api_key=api_key)
        response = client.chat.completions.create(
            model=model,
            messages=[{"role": "user", "content": prompt}],
            **options
        )
        return response.choices[0].message.content
# Usage example: two OpenAI keys with degraded responses enabled.
client = OpenAIResilientClient(
keys=[
{"key": "sk-openai-1", "name": "openai-primary"},
{"key": "sk-openai-2", "name": "openai-backup"},
],
fallback_enabled=True,
)
response = client.call("你好,请介绍一下自己")
print(f"成功: {response.success}, 来源: {response.source}, 延迟: {response.latency_ms}ms")
print(f"结果: {response.data}")
压测验证:模拟故障来验证降级策略
代码写完了,怎么证明它真的管用?答案是压测模拟。
模拟API延迟
import random
import time
def mock_slow_api(delay_range=(0.1, 5.0), error_rate=0.0):
    """Build a fake API callable that is slow and optionally flaky.

    Args:
        delay_range: ``(low, high)`` bounds in seconds for the random sleep.
        error_rate: Probability in [0, 1] that a call raises ConnectionError.

    Returns:
        A callable accepting any arguments. It sleeps, then either raises
        ``ConnectionError`` or returns a text that reports the delay.
    """
    def wrapper(*args, **kwargs):
        delay = random.uniform(*delay_range)
        time.sleep(delay)  # simulated latency

        should_fail = random.random() < error_rate
        if should_fail:
            raise ConnectionError("模拟连接失败")
        return f"模拟响应,延迟{delay:.2f}秒"

    return wrapper
# NOTE: mock_slow_api(...) returns the fake API itself, not a decorator.
# Using it with `@` would call the fake at import time (sleeping, possibly
# raising) and rebind the test name to a string, so the mocks are built
# inside the tests instead.

# Test scenario 1: slow API responses
def test_timeout_handling():
    """Placeholder for the timeout scenario, using a 5-10 second mock."""
    slow_api = mock_slow_api(delay_range=(5.0, 10.0))
    # TODO: drive `slow_api` through the client under test with a timeout
    # shorter than 5 seconds and assert that the degraded path is taken.
    assert callable(slow_api)


# Test scenario 2: API failing at random
def test_retry_mechanism():
    """Placeholder for the retry scenario, using a mock that fails 30% of calls."""
    flaky_api = mock_slow_api(error_rate=0.3)
    # TODO: wrap `flaky_api` with the retry decorator (retrying on
    # ConnectionError) and assert that a response is eventually returned.
    assert callable(flaky_api)
模拟API完全不可用
import unittest.mock as mock
def test_circuit_breaker_opens():
    """Check that the breaker opens after repeated failures and then rejects calls."""
    breaker = CircuitBreaker("test", failure_threshold=3, recovery_timeout=1)

    # Three failing calls reach the failure threshold.
    for _ in range(3):
        failing_call = mock.Mock(side_effect=Exception("API Error"))
        try:
            breaker.call(failing_call)
        except Exception:
            pass

    # The breaker must be open now.
    assert breaker.state == CircuitState.OPEN
    print(f"断路器状态: {breaker.state.value}")

    # A further call is rejected immediately by the open breaker.
    healthy_call = mock.Mock(return_value="success")
    try:
        breaker.call(healthy_call)
        assert False, "应该抛出异常"
    except CircuitBreakerOpenError:
        print("断路器开启,正确拒绝了请求")
def test_key_failover():
    """Check that the rotator moves to the backup key once the primary is suspended."""
    rotator = KeyRotator([
        {"key": "key1", "name": "primary"},
        {"key": "key2", "name": "backup"},
    ])

    # Five failures in a row suspend the primary key.
    primary = rotator.keys[0]
    for _ in range(5):
        rotator.mark_failure(primary)

    # The next pick must be the backup key.
    key = rotator.get_available_key()
    assert key.name == "backup"
    print(f"Key故障转移: {key.name}")
完整的压力测试脚本
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor
async def stress_test():
    """Fire 100 requests through a 20-thread pool and print a summary.

    Reports the number of successes, fallbacks and other outcomes, plus
    total wall time and latency statistics.
    """
    client = OpenAIResilientClient(keys=[
        {"key": "sk-test1", "name": "test1"},
        {"key": "sk-test2", "name": "test2"},
    ])

    start_time = time.time()

    # Submit everything up front, then collect results in submission order.
    with ThreadPoolExecutor(max_workers=20) as executor:
        futures = [
            executor.submit(client.call, f"测试请求{i}", model="gpt-4o")
            for i in range(100)
        ]
        results = [future.result() for future in futures]

    success_count = sum(1 for r in results if r.success)
    fallback_count = sum(
        1 for r in results if not r.success and r.source == "fallback"
    )
    error_count = len(results) - success_count - fallback_count
    latencies = [r.latency_ms for r in results]

    total_time = time.time() - start_time
    print(f"""
========== 压测结果 ==========
总请求数: 100
成功数: {success_count}
降级响应: {fallback_count}
错误数: {error_count}
总耗时: {total_time:.2f}秒
平均延迟: {sum(latencies)/len(latencies):.2f}ms
最大延迟: {max(latencies):.2f}ms
最小延迟: {min(latencies):.2f}ms
=============================
""")
好的容错设计应该达到:
- 成功率:在单个API故障时,仍保持95%+的成功率
- 降级优雅:降级响应应该有合适的用户体验
- 恢复及时:API恢复后,断路器应在合理时间内关闭
写在最后
容错降级不是可选项,是AI服务上生产的必备能力。API服务商再稳定,也会有波动的时候。你的服务能不能扛住这些波动,决定了用户体验和系统稳定性。
核心原则就三条:
- 不把鸡蛋放一个篮子里:多Key、多服务商、隔离舱
- 让失败快速发生:超时设置合理,不要傻等
- 永远有兜底:就算所有API都挂了,也要给用户一个友好的响应
做完这套容错架构后,我们的服务可用性从99.5%提到了99.9%。更重要的是,那次Claude崩了两小时,我们的用户基本没感知。
如果你在找更多AI API相关的实战技巧,欢迎来 TokenNexus 看看,收录了330+国内外AI平台的对比评测。