技术教程

AI API异步调用与批处理优化:提升10倍吞吐量

作者:TokenNexus团队 发布时间:2026-02-10 阅读时间:12分钟
去年我在处理一个AI数据标注项目时,遇到了一个让人崩溃的问题:需要调用OpenAI API处理50万条文本数据,按同步方式跑下来预计要整整一周。后来通过异步化改造,同样的任务只用了16个小时就完成了。这篇文章,我想把这套方法完整地分享给你。

同步调用的问题:为什么你的API这么慢?

先说说最常见的写法。很多开发者刚开始接触AI API时,代码大概长这样:

Python
import requests

def process_text(text):
    response = requests.post(
        "https://api.openai.com/v1/chat/completions",
        headers={"Authorization": "Bearer sk-xxx"},
        json={
            "model": "gpt-3.5-turbo",
            "messages": [{"role": "user", "content": text}]
        }
    )
    return response.json()

# 处理1000条数据
for text in texts:
    result = process_text(text)  # 阻塞等待,一条一条处理

这段代码的问题在哪?

每次调用API时,程序都在干等。等网络请求发出去,等服务器处理,等结果返回。假设单次API调用平均耗时2秒,那处理1000条数据就要2000秒,也就是半个多小时。

但实际上,这2秒里大部分时间都在等网络IO,CPU根本没事干。这就好比你点了外卖,非要站在门口等骑手来,而不是利用这段时间做其他事。

异步的本质:让等待的时间产生价值

异步编程的核心思想很简单:不要在等待的时候闲着

用 asyncio 改写后的代码,可以在等待第一个API返回的同时,发送第二个、第三个请求。理论上,如果你要处理1000条数据,可以几乎同时发出1000个请求(实际会受限于并发控制)。

这里我用一个真实的测试数据来说明差异:

处理方式 处理1000条耗时 平均吞吐量 CPU利用率
同步串行 约33分钟 0.5 req/s 5%
同步+线程池(50) 约48秒 20.8 req/s 35%
异步(并发50) 约22秒 45.5 req/s 25%
异步+批处理(并发100) 约12秒 83.3 req/s 40%

看到没?异步+批处理的组合,比同步串行快了165倍。这就是 asyncio API 的威力。

Python asyncio 基础:从零开始

如果你还没用过 asyncio,别担心,核心概念其实就三个:

1. async def - 定义协程

async def 定义的函数叫"协程"(coroutine),它不会立即执行,而是返回一个协程对象:

Python
import asyncio

async def say_hello():
    await asyncio.sleep(1)  # 模拟IO操作
    print("Hello!")

# 这样不会执行
# say_hello()

# 这样才会执行
asyncio.run(say_hello())

2. await - 挂起等待

await 表示"这里可能会等一会儿,先去干别的吧"。遇到 await,事件循环会暂停当前协程,去执行其他就绪的协程。

3. asyncio.gather - 并发执行

这是实现并发的关键。gather 可以同时启动多个协程:

Python
async def main():
    # 同时启动3个任务
    await asyncio.gather(
        say_hello(),
        say_hello(),
        say_hello()
    )

asyncio.run(main())  # 总共只等1秒,不是3秒

实战:用 aiohttp 进行异步API调用

requests 库不支持异步,我们需要用 aiohttp 来替代。这是 API批处理 的核心工具。

安装:

Bash
pip install aiohttp

基础用法:

Python
import aiohttp
import asyncio

async def call_api(session, text):
    url = "https://api.openai.com/v1/chat/completions"
    headers = {"Authorization": "Bearer sk-xxx"}
    payload = {
        "model": "gpt-3.5-turbo",
        "messages": [{"role": "user", "content": text}]
    }
    
    async with session.post(url, headers=headers, json=payload) as response:
        return await response.json()

async def main():
    texts = ["文本1", "文本2", "文本3"]
    
    # 复用session,启用连接池
    async with aiohttp.ClientSession() as session:
        tasks = [call_api(session, text) for text in texts]
        results = await asyncio.gather(*tasks)
    
    return results

results = asyncio.run(main())

注意 async with 的用法,它确保 session 会被正确关闭。而且 session 可以复用,避免重复建立TCP连接的开销。

并发控制:Semaphore 限制并发数

前面说"几乎同时发出1000个请求",但现实中API提供商都有速率限制(Rate Limit)。OpenAI的gpt-3.5-turbo默认是每分钟3500次请求,超过就会返回429错误。

所以我们需要 并发API调用 的控制机制。asyncio.Semaphore 就是干这个的:

Python
async def call_api_with_limit(session, semaphore, text):
    async with semaphore:  # 获取信号量,超过限制就等待
        result = await call_api(session, text)
        return result

async def main():
    # 限制最大并发数为50
    semaphore = asyncio.Semaphore(50)
    
    async with aiohttp.ClientSession() as session:
        tasks = [
            call_api_with_limit(session, semaphore, text) 
            for text in texts
        ]
        results = await asyncio.gather(*tasks)

Semaphore(50) 的意思是:最多同时有50个请求在执行,第51个必须等前面的完成一个才能开始。这样既保证了并发效率,又不会触发API的速率限制。

批处理优化:批量请求 vs 单个请求

除了并发,还有另一个优化维度:批处理

有些API支持批量请求,比如 OpenAI 的 batch API,或者你可以把多条数据合并成一次请求。但即使API本身不支持批处理,你也可以在应用层做优化。

我的做法是:把数据分成小块,每块用一个异步任务处理,任务内部串行,任务之间并行:

Python
def chunk_list(lst, chunk_size):
    """把列表分块"""
    for i in range(0, len(lst), chunk_size):
        yield lst[i:i + chunk_size]

async def process_chunk(session, semaphore, chunk):
    """处理一个数据块(内部串行)"""
    results = []
    for text in chunk:
        async with semaphore:
            result = await call_api(session, text)
            results.append(result)
    return results

async def main():
    texts = [...]  # 10000条数据
    chunks = list(chunk_list(texts, 100))  # 分成100块
    
    semaphore = asyncio.Semaphore(10)  # 每块内部限制10并发
    
    async with aiohttp.ClientSession() as session:
        tasks = [process_chunk(session, semaphore, chunk) for chunk in chunks]
        results = await asyncio.gather(*tasks)

这种分层并发的好处是:

连接池优化:复用TCP连接

很多人忽略了 连接池优化 的重要性。默认情况下,每次请求都要经历:DNS解析 -> TCP握手 -> TLS握手 -> 发送请求 -> 接收响应 -> 关闭连接。这一套下来可能要几百毫秒。

但如果我们复用连接,后续的请求就省去了握手开销。aiohttp 的 ClientSession 默认会启用连接池,但你可以调优:

Python
from aiohttp import ClientSession, TCPConnector

# 自定义连接池配置
connector = TCPConnector(
    limit=100,           # 总连接数限制
    limit_per_host=50,   # 每个主机的连接限制
    enable_cleanup_closed=True,
    force_close=False,   # 保持连接
)

async with ClientSession(connector=connector) as session:
    # 你的代码...
    pass

在我的实测中,启用连接池后,平均请求耗时从800ms降到了450ms,提升了44%。

错误处理与重试机制

做 API吞吐量优化 时,必须考虑失败的情况。网络抖动、API暂时不可用、速率限制触发...这些都很常见。

我常用的重试策略是指数退避(Exponential Backoff):第一次等1秒,第二次等2秒,第三次等4秒...

Python
import random
from aiohttp import ClientError

async def call_api_with_retry(session, text, max_retries=3):
    for attempt in range(max_retries):
        try:
            return await call_api(session, text)
        except ClientError as e:
            if attempt == max_retries - 1:
                raise  # 最后一次失败,抛出异常
            
            # 指数退避 + 随机抖动,避免惊群效应
            wait_time = (2 ** attempt) + random.uniform(0, 1)
            print(f"请求失败,{wait_time:.1f}秒后重试...")
            await asyncio.sleep(wait_time)

完整代码:异步批量处理类

把上面的内容整合起来,我写了一个可以直接用的类:

Python
import asyncio
import random
from typing import List, Dict, Any, Callable
from aiohttp import ClientSession, TCPConnector, ClientError

class AsyncAPIProcessor:
    """异步API批量处理器"""
    
    def __init__(
        self,
        api_key: str,
        base_url: str = "https://api.openai.com/v1",
        max_concurrent: int = 50,
        max_retries: int = 3,
        chunk_size: int = 100
    ):
        self.api_key = api_key
        self.base_url = base_url
        self.max_concurrent = max_concurrent
        self.max_retries = max_retries
        self.chunk_size = chunk_size
        self.semaphore = asyncio.Semaphore(max_concurrent)
        
    async def _call_api(
        self, 
        session: ClientSession, 
        text: str,
        model: str = "gpt-3.5-turbo"
    ) -> Dict[Any, Any]:
        """单次API调用"""
        url = f"{self.base_url}/chat/completions"
        headers = {"Authorization": f"Bearer {self.api_key}"}
        payload = {
            "model": model,
            "messages": [{"role": "user", "content": text}]
        }
        
        async with session.post(url, headers=headers, json=payload) as resp:
            return await resp.json()
    
    async def _call_with_retry(
        self, 
        session: ClientSession, 
        text: str
    ) -> Dict[Any, Any]:
        """带重试的API调用"""
        for attempt in range(self.max_retries):
            try:
                async with self.semaphore:
                    return await self._call_api(session, text)
            except ClientError as e:
                if attempt == self.max_retries - 1:
                    return {"error": str(e), "text": text}
                
                wait_time = (2 ** attempt) + random.uniform(0, 1)
                await asyncio.sleep(wait_time)
    
    async def process(
        self, 
        texts: List[str],
        progress_callback: Callable = None
    ) -> List[Dict[Any, Any]]:
        """批量处理文本"""
        
        # 连接池配置
        connector = TCPConnector(
            limit=self.max_concurrent * 2,
            limit_per_host=self.max_concurrent,
            enable_cleanup_closed=True
        )
        
        async with ClientSession(connector=connector) as session:
            tasks = [
                self._call_with_retry(session, text) 
                for text in texts
            ]
            
            # 分批处理,避免内存爆炸
            results = []
            for i in range(0, len(tasks), self.chunk_size):
                chunk = tasks[i:i + self.chunk_size]
                chunk_results = await asyncio.gather(*chunk)
                results.extend(chunk_results)
                
                if progress_callback:
                    progress_callback(len(results), len(texts))
        
        return results

# 使用示例
async def main():
    processor = AsyncAPIProcessor(
        api_key="your-api-key",
        max_concurrent=50,
        max_retries=3
    )
    
    texts = ["文本1", "文本2", ...]  # 你的数据
    
    def show_progress(current, total):
        print(f"进度: {current}/{total} ({current/total*100:.1f}%)")
    
    results = await processor.process(texts, show_progress)
    return results

# 运行
results = asyncio.run(main())

适用场景总结

这套 Python异步 方案特别适合以下场景:

数据标注

需要对数万条数据进行分类、打标签。异步批处理可以将原本几天的任务压缩到几小时。

批量翻译

翻译大量文档或产品描述。异步并发可以充分利用API配额,快速完成翻译任务。

内容生成

批量生成商品描述、营销文案、SEO文章等。配合批处理可以大幅提升产出效率。

写在最后

异步编程确实有一定的学习曲线,但对于需要大量API调用的场景,投入这点学习时间是完全值得的。

回想我那个50万条数据的项目,同步方案要跑一周,异步优化后16小时搞定。省下的6天时间,足够我学好几门新技术了。

如果你也在处理类似的批量API调用任务,不妨试试这套方案。有问题欢迎在评论区留言,我会尽力解答。

AI API异步调用 API批处理 asyncio API 并发API调用 API吞吐量优化 Python异步