AI API网关设计与实现:企业级统一接入架构实战

AI API网关架构设计

去年年底,我们团队接手了一个棘手的问题:公司同时对接了 OpenAI、Anthropic、DeepSeek 和阿里云百炼四家 AI 服务商,散落在各个微服务里的 API Key 超过 30 个,每月账单对不上,某个服务挂了整个流程就卡死。CTO 给了我两周时间,要求统一收口。最终我们用 Apache APISIX 搭了一套 AI API 网关,把所有调用收敛到一个入口,顺带把成本降了 60%。

这篇文章就是这两周踩坑的完整复盘。我会从网关选型、核心模块设计到具体代码实现,把整个过程掰开讲清楚。

为什么 AI 场景需要专用网关

传统的 API 网关(比如 Kong、APISIX)主要解决的是微服务间路由、鉴权和限流的问题。但 AI API 有几个独特的痛点,传统网关覆盖不了:

根据 Gartner 2025 年底的报告,超过 72% 的企业 AI 项目同时对接了 3 家以上的 AI 服务商。没有一个统一的网关层,运维和成本管理就是灾难。

网关选型:Kong vs APISIX vs 自研

选型阶段我们对比了三个方案。先说结论:最终选了 Apache APISIX,但 Kong 在某些场景下也值得考虑。

维度Apache APISIXKong自研(Python/FastAPI)
P99 延迟0.4ms1.2ms2-5ms
吞吐量(RPS)23,000+18,000+5,000-8,000
插件生态80+ 内置100+ 内置需自行开发
配置热更新支持(etcd)支持(数据库)需重启
AI 专用功能需自定义插件有 AI Gateway 插件完全自定义
学习成本中等中等低(但维护成本高)

数据来源:API7 官方基准测试和 Kong 工程博客公布的 AI 网关对比数据。APISIX 在纯性能上优势明显,P99 延迟只有 Kong 的三分之一,吞吐量高出约 28%。Kong 的优势在于有官方 AI Gateway 插件,开箱即用支持 OpenAI/Anthropic 格式转换。

我们选 APISIX 的原因很简单:延迟低、基于 etcd 的配置热更新不需要数据库、Lua 插件开发灵活。团队里有同事之前用过 APISIX,上手快。

如果你团队规模小(不到 5 个后端开发),或者 AI 调用量不大(日均 < 10 万次),直接用 Kong 的 AI Gateway 插件可能更省事。但如果你对延迟敏感、调用量大,APISIX 是更好的选择。

核心架构设计

整个网关的架构分四层:

对外只暴露一个统一的 OpenAI 兼容接口,所有业务服务都调这个接口,完全不感知背后是哪家服务商。

# 统一入口格式(OpenAI 兼容)
POST /v1/chat/completions
{
    "model": "gpt-4o",          # 网关会自动映射到实际服务商
    "messages": [...],
    "stream": true,
    "x-team": "product",        # 自定义 header,用于成本归集
    "x-priority": "high"        # 优先级,影响路由策略
}

负载均衡与故障转移

AI API 的负载均衡跟传统微服务不太一样。传统场景是多个相同实例轮询,但 AI 场景往往是多个不同的服务商,性能和价格差异很大。

我们设计了三种策略:

策略一:成本优先(默认)

优先路由到成本最低的服务商,只有当该服务商返回 429 或 5xx 时才切换到备选。比如 DeepSeek V3 的价格是 GPT-4o 的十分之一,非关键业务默认走 DeepSeek。

策略二:质量优先

对质量要求高的场景(比如合同审查、医疗报告),固定走 GPT-4o 或 Claude Sonnet,只有主服务商完全不可用时才降级到备选模型。

策略三:延迟优先

实时交互场景(比如客服对话),根据各服务商的实时延迟动态选择。APISIX 的 proxy-next-upstream 配合自定义 Lua 插件可以实现这个逻辑。

# APISIX 路由配置示例(成本优先策略)
upstream:
  - id: ai-chat-completions
    type: roundrobin
    nodes:
      "deepseek-api.example.com:443": 70    # 70% 流量走 DeepSeek(便宜)
      "openai-api.example.com:443": 20      # 20% 走 OpenAI
      "anthropic-api.example.com:443": 10   # 10% 走 Anthropic
    checks:
      active:
        type: http
        http_path: /health
        healthy:
          interval: 5
          successes: 2
        unhealthy:
          interval: 3
          http_statuses: [429, 500, 502, 503]
          successes: 1

这里有个关键细节:健康检查必须把 429 也标记为不健康。很多传统网关只检查 5xx,但 AI 服务商的 429 意味着"你超限了,短时间内别再来了",继续打过去只会浪费时间和钱。

熔断降级:别让一个服务拖垮全局

这是整个网关设计里我认为最重要的一环。2026 年 3 月,OpenAI 经历了一次长达 47 分钟的全球性故障,期间所有请求返回 502。如果没有熔断机制,上游所有依赖 OpenAI 的服务都会被拖垮。

我们的熔断策略参考了微软 Azure API Management 的设计模式,核心逻辑是:

# APISIX api-breaker 插件配置
plugins:
  api-breaker:
    break_response_code: 502
    unhealthy:
      http_statuses: [429, 500, 502, 503]
      failures: 5
    healthy:
      http_statuses: [200]
      successes: 2
    max_breaker_sec: 300        # 最长熔断 5 分钟

上线后第一个月,熔断机制触发了 17 次。其中 12 次是单个服务商的临时 429(高峰期限流),5 次是服务商故障。如果没有熔断,这 17 次事件会导致上游服务累计超时约 2.3 万次请求。

密钥管理与安全隔离

之前散落在代码仓库和环境变量里的 30 多个 API Key,是最大的安全隐患。2025 年就有安全研究机构报告,GitHub 上公开的 AI API Key 泄露事件同比增长了 340%。

我们的方案是:

这样做的好处是,即使某个开发者的虚拟 Key 泄露了,影响范围也仅限于他所在团队的配额,不会暴露真实的后端 Key。

成本追踪与用量配额

网关层统一记录每次调用的 token 用量、模型、服务商、耗时,写入 ClickHouse 做分析。这是我们搭建网关后最让 CTO 满意的功能——终于能看清钱花在哪了。

上线第一个月的成本分析报告揭示了几个惊人的事实:

# ClickHouse 成本分析 SQL 示例
SELECT
    team,
    model,
    count(*) AS total_calls,
    sum(input_tokens) / 1000000 AS input_tokens_m,
    sum(output_tokens) / 1000000 AS output_tokens_m,
    sum(cost_usd) AS total_cost
FROM ai_gateway.usage_log
WHERE date >= today() - 30
GROUP BY team, model
ORDER BY total_cost DESC
LIMIT 20

完整实现代码

下面是一个简化版的 Python 网关代理,适合小团队快速上手。如果你已经有 Nginx/APISIX 基础设施,建议用 Lua 插件实现,性能会好很多。

"""
AI API Gateway - 轻量级统一代理
支持 OpenAI 兼容格式,自动路由、熔断、成本记录
"""
import httpx
import time
import logging
from dataclasses import dataclass
from enum import Enum
from typing import Optional

logger = logging.getLogger("ai-gateway")

class Provider(Enum):
    OPENAI = "openai"
    ANTHROPIC = "anthropic"
    DEEPSEEK = "deepseek"

@dataclass
class BackendConfig:
    provider: Provider
    base_url: str
    api_key: str
    priority: int = 0          # 优先级,数字越小越优先
    max_rpm: int = 60          # 每分钟最大请求数
    cost_per_1m_input: float = 0  # 每百万输入 token 成本(USD)

# 后端配置
BACKENDS = {
    Provider.DEEPSEEK: BackendConfig(
        provider=Provider.DEEPSEEK,
        base_url="https://api.deepseek.com/v1",
        api_key="sk-xxx",  # 从 Vault 读取
        priority=1,
        max_rpm=120,
        cost_per_1m_input=0.27,
    ),
    Provider.OPENAI: BackendConfig(
        provider=Provider.OPENAI,
        base_url="https://api.openai.com/v1",
        api_key="sk-xxx",
        priority=2,
        max_rpm=60,
        cost_per_1m_input=2.50,
    ),
    Provider.ANTHROPIC: BackendConfig(
        provider=Provider.ANTHROPIC,
        base_url="https://api.anthropic.com/v1",
        api_key="sk-ant-xxx",
        priority=3,
        max_rpm=50,
        cost_per_1m_input=3.00,
    ),
}

class CircuitBreaker:
    """简易熔断器"""
    def __init__(self, failure_threshold=5, recovery_timeout=60):
        self.failure_count = 0
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.last_failure_time = 0
        self.state = "closed"  # closed / open / half_open

    def is_available(self) -> bool:
        if self.state == "closed":
            return True
        if self.state == "open":
            if time.time() - self.last_failure_time > self.recovery_timeout:
                self.state = "half_open"
                return True
            return False
        return True  # half_open

    def record_success(self):
        self.failure_count = 0
        self.state = "closed"

    def record_failure(self):
        self.failure_count += 1
        self.last_failure_time = time.time()
        if self.failure_count >= self.failure_threshold:
            self.state = "open"
            logger.warning(f"Circuit breaker OPEN, recovery in {self.recovery_timeout}s")

# 为每个后端维护独立的熔断器
breakers = {p: CircuitBreaker() for p in Provider}

class AIGateway:
    def __init__(self):
        self.client = httpx.Client(timeout=60.0)

    def route(self, model: str, priority: str = "cost") -> BackendConfig:
        """根据策略选择后端"""
        candidates = sorted(
            BACKENDS.values(),
            key=lambda b: (b.priority, b.cost_per_1m_input)
        )
        for backend in candidates:
            breaker = breakers[backend.provider]
            if breaker.is_available():
                return backend
        raise Exception("All backends unavailable")

    def proxy(self, request_body: dict) -> dict:
        """代理请求到后端"""
        model = request_body.get("model", "gpt-4o")
        backend = self.route(model)

        headers = {
            "Authorization": f"Bearer {backend.api_key}",
            "Content-Type": "application/json",
        }

        # Anthropic 格式转换
        if backend.provider == Provider.ANTHROPIC:
            request_body = self._to_anthropic_format(request_body)

        start = time.time()
        try:
            resp = self.client.post(
                f"{backend.base_url}/chat/completions",
                headers=headers,
                json=request_body,
            )
            latency = time.time() - start

            if resp.status_code in (429, 500, 502, 503):
                breakers[backend.provider].record_failure()
                logger.warning(f"{backend.provider.value} returned {resp.status_code}")
                # 尝试降级到下一个后端
                return self._fallback(request_body, model)

            breakers[backend.provider].record_success()
            return resp.json()

        except Exception as e:
            breakers[backend.provider].record_failure()
            logger.error(f"{backend.provider.value} error: {e}")
            return self._fallback(request_body, model)

    def _fallback(self, request_body: dict, model: str) -> dict:
        """降级:尝试其他后端"""
        for provider, backend in BACKENDS.items():
            if provider != self.route(model).provider:
                if breakers[provider].is_available():
                    headers = {"Authorization": f"Bearer {backend.api_key}"}
                    resp = self.client.post(
                        f"{backend.base_url}/chat/completions",
                        headers=headers,
                        json=request_body,
                    )
                    if resp.status_code == 200:
                        return resp.json()
        return {"error": "All backends failed", "code": 503}

    def _to_anthropic_format(self, body: dict) -> dict:
        """OpenAI 格式转 Anthropic 格式"""
        return {
            "model": body.get("model", "claude-sonnet-4-20250514"),
            "messages": body.get("messages", []),
            "max_tokens": body.get("max_tokens", 4096),
            "stream": body.get("stream", False),
        }

# 使用示例
gateway = AIGateway()
result = gateway.proxy({
    "model": "gpt-4o",
    "messages": [{"role": "user", "content": "你好"}],
    "max_tokens": 100,
})

真实效果与踩坑总结

这套网关上线三个月后的数据:

指标上线前上线后变化
月均 API 成本$8,500$3,400↓ 60%
平均响应延迟(P99)3.2s1.8s↓ 44%
429 错误率2.1%0.03%↓ 98.6%
故障恢复时间手动,15-30 分钟自动,< 5 秒显著改善
API Key 数量(业务侧)30+5 个虚拟 Key集中管理

几个踩坑经验分享:

  1. 流式响应的熔断要特别处理:SSE(Server-Sent Events)连接建立后,如果中途后端挂了,需要主动关闭连接并通知客户端。我们用 Lua 的 ngx.exit(502) 处理,但要注意先 flush 已接收的数据。
  2. token 计数要在网关层做:不能只依赖后端返回的 usage 字段,因为有些服务商的 usage 统计有延迟。我们用 tiktoken 在网关层预估算 token 数,虽然不精确,但足以做成本预警。
  3. 日志别全量存:日均百万级调用的日志量非常大。我们只记录请求元数据(model、team、token 数、延迟),不记录完整的 prompt 和 response 内容。需要排查问题时,通过 trace_id 去各业务服务的日志里关联查。
  4. APISIX 的 etcd 别用单节点:我们一开始图省事用了单节点 etcd,结果 etcd 挂了一次,网关全部无法路由。后来改成三节点集群,再没出过问题。

如果你正在面临类似的 AI API 管理困境,希望这篇文章能帮你少走一些弯路。选型不一定要跟风,关键是搞清楚自己的痛点是什么——是成本、稳定性、还是开发效率?不同的痛点对应不同的方案。小团队可以从 Python 代理开始,大团队直接上 APISIX/Kong,别一上来就搞微服务全家桶。

有问题欢迎在 TokenNexus 留言交流,我们持续更新 AI API 相关的实战经验。