做过AI助手类产品的同行肯定遇到过我这个问题:用户聊到第50轮,AI突然"失忆"了,不记得之前讨论的内容。一问才知道,上下文超限了,直接被截断或者报错了。
这问题太常见了。GPT-4o上下文窗口128K,看着挺大,但10轮中英文混合对话轻松就能用掉50K tokens。再继续聊,上下文直接原地爆炸。
这篇文章是我踩了无数坑后的完整总结,从Session管理到上下文策略,从Token计算到隐私保护,全部是生产环境验证过的方案。
目录
Session与Context的本质区别
先把这个两个概念搞清楚,很多人搞混了。
| 概念 | 定义 | 生命周期 | 存储位置 |
|---|---|---|---|
| Session(会话) | 用户与AI的一次连续交互过程 | 用户主动结束或超时自动清理 | 服务端(Redis/数据库) |
| Context(上下文) | 发送给模型的完整消息历史 | 每次请求时动态构建 | 内存/显存 |
举个例子:
- 用户打开对话框开始聊天,这是一个新的Session
- 用户发了10条消息,AI回复了10条,这构成上下文
- 用户关闭页面,Session结束
- 用户第二天打开,新Session,但如果保存了历史,可以恢复上下文
关键点:Context是消耗品,有上限(上下文窗口大小);Session是容器,可以无限长,但Context要精心管理。
Redis Session管理实战
Session管理方案很多,Redis是最常用的选择。速度快、支持过期、自动清理。
先安装依赖:
pip install redis tiktoken openai anthropic
核心Session管理代码:
import redis
import json
import tiktoken
from datetime import datetime, timedelta
from typing import List, Dict, Optional
class ConversationSession:
"""对话Session管理:Redis存储 + 自动过期 + 多端同步"""
def __init__(self, redis_url: str = "redis://localhost:6379/0"):
self.redis = redis.from_url(redis_url, decode_responses=True)
self.encoding = tiktoken.encoding_for_model("gpt-4o")
# Session默认过期时间:7天
self.DEFAULT_TTL = 7 * 24 * 3600
# 最大历史消息数
self.MAX_MESSAGES = 200
def create_session(self, user_id: str, device_id: str = "default") -> str:
"""创建新Session,返回session_id"""
session_id = f"session:{user_id}:{device_id}:{datetime.now().strftime('%Y%m%d%H%M%S')}"
session_data = {
"created_at": datetime.now().isoformat(),
"user_id": user_id,
"device_id": device_id,
"messages": [],
"metadata": {}
}
self.redis.setex(
session_id,
self.DEFAULT_TTL,
json.dumps(session_data)
)
return session_id
def get_session(self, session_id: str) -> Optional[Dict]:
"""获取Session数据,自动续期"""
data = self.redis.get(session_id)
if data:
# 访问时自动续期
self.redis.expire(session_id, self.DEFAULT_TTL)
return json.loads(data)
return None
def add_message(self, session_id: str, role: str, content: str) -> int:
"""
添加消息到Session
返回当前上下文总token数
"""
session = self.get_session(session_id)
if not session:
raise ValueError(f"Session不存在: {session_id}")
message = {
"role": role,
"content": content,
"timestamp": datetime.now().isoformat()
}
session["messages"].append(message)
# 限制消息数量
if len(session["messages"]) > self.MAX_MESSAGES:
session["messages"] = session["messages"][-self.MAX_MESSAGES:]
# 保存并更新TTL
self.redis.setex(
session_id,
self.DEFAULT_TTL,
json.dumps(session)
)
return self.count_tokens(session["messages"])
def get_messages(self, session_id: str) -> List[Dict]:
"""获取Session中的所有消息"""
session = self.get_session(session_id)
return session["messages"] if session else []
def count_tokens(self, messages: List[Dict]) -> int:
"""计算消息列表的总token数"""
total_tokens = 0
for msg in messages:
# 每个消息有role标记的开销(约4 tokens)
total_tokens += 4
total_tokens += len(self.encoding.encode(msg["content"]))
return total_tokens
def clear_session(self, session_id: str) -> bool:
"""清理Session"""
return bool(self.redis.delete(session_id))
def list_user_sessions(self, user_id: str) -> List[Dict]:
"""列出用户的所有Session"""
pattern = f"session:{user_id}:*"
sessions = []
for key in self.redis.scan_iter(match=pattern, count=100):
data = self.redis.get(key)
if data:
session = json.loads(data)
session["session_id"] = key
sessions.append(session)
return sorted(sessions, key=lambda x: x["created_at"], reverse=True)
# 使用示例
session_manager = ConversationSession()
# 创建新会话
session_id = session_manager.create_session(
user_id="user_12345",
device_id="web_browser"
)
# 添加用户消息
user_tokens = session_manager.add_message(
session_id, "user", "我想了解一下机器学习"
)
# 添加助手回复
assistant_tokens = session_manager.add_message(
session_id, "assistant",
"机器学习是人工智能的一个分支,让计算机通过数据学习并改进。"
)
# 获取当前上下文
messages = session_manager.get_messages(session_id)
print(f"当前消息数: {len(messages)}")
print(f"当前token数: {session_manager.count_tokens(messages)}")
多端同步的关键:同一个user_id可以创建多个device_id的Session,读写时用user_id作为索引就行。实现跨设备继续对话的核心逻辑:
def sync_across_devices(self, user_id: str, target_device_id: str):
"""
将最新Session同步到指定设备
用于:新设备打开时加载历史上下文
"""
# 找到用户最新的Session
sessions = self.list_user_sessions(user_id)
if not sessions:
return None
latest_session = sessions[0] # 已按时间倒序排列
# 复制到目标设备的新Session
new_session_id = self.create_session(user_id, target_device_id)
session = self.get_session(latest_session["session_id"])
self.redis.setex(
new_session_id,
self.DEFAULT_TTL,
json.dumps(session)
)
return new_session_id
上下文窗口三大策略
当对话历史越来越长,上下文窗口不够用了怎么办?有三大策略:
策略一:原样保留(Preserve All)
适合场景:对话短(<10轮)、上下文窗口大(>100K)、需要完整历史
def preserve_all_strategy(messages: List[Dict], max_tokens: int) -> List[Dict]:
"""
原样保留策略
只在超限时截断最旧的消息
"""
# 估算系统prompt开销(约500 tokens)
system_overhead = 500
available_tokens = max_tokens - system_overhead
# 统计当前token
current_tokens = sum(count_message_tokens(m) for m in messages)
if current_tokens <= available_tokens:
return messages
# 超限:从最旧的消息开始删除
truncated = []
current = 0
for msg in messages:
msg_tokens = count_message_tokens(msg)
if current + msg_tokens <= available_tokens:
truncated.append(msg)
current += msg_tokens
else:
break # 后续消息更晚,不再添加
return truncated
策略二:摘要压缩(Summarize)
适合场景:中等长度对话(10-30轮)、需要保留关键信息
import asyncio
from openai import OpenAI
client = OpenAI()
async def summarize_messages(
messages: List[Dict],
model: str = "gpt-4o-mini"
) -> List[Dict]:
"""
摘要压缩策略
将早期消息压缩成摘要,保留近几轮完整对话
"""
if len(messages) <= 4:
return messages # 对话太短,不需要压缩
# 保留最近3轮完整对话
recent_messages = messages[-6:] # 3轮对话 = 6条消息
old_messages = messages[:-6]
if not old_messages:
return messages
# 构建摘要prompt
summary_prompt = """请将以下对话历史压缩成简洁的摘要,保留所有重要信息和关键结论:
"""
for msg in old_messages:
role = "用户" if msg["role"] == "user" else "AI"
summary_prompt += f"{role}:{msg['content']}\n\n"
summary_prompt += """
请用100字以内总结这段对话的核心内容和结论。"""
# 调用AI生成摘要
response = client.chat.completions.create(
model=model,
messages=[
{"role": "system", "content": "你是一个对话摘要助手。"},
{"role": "user", "content": summary_prompt}
],
max_tokens=200,
temperature=0.3
)
summary = response.choices[0].message.content
# 返回压缩后的上下文
return [
{"role": "system", "content": f"【对话摘要】{summary}"}
] + recent_messages
def count_message_tokens(msg: Dict) -> int:
"""计算单条消息的token数"""
encoding = tiktoken.encoding_for_model("gpt-4o-mini")
return 4 + len(encoding.encode(msg.get("content", "")))
策略三:滑动窗口(Sliding Window)
适合场景:长对话(>30轮)、只需要最近上下文
def sliding_window_strategy(
messages: List[Dict],
max_tokens: int,
window_tokens: int = 8000,
include_system: bool = True
) -> List[Dict]:
"""
滑动窗口策略
只保留最近N个token的上下文
"""
encoding = tiktoken.encoding_for_model("gpt-4o")
# 分离系统消息
system_msg = None
conversation = []
for msg in messages:
if msg["role"] == "system" and include_system:
system_msg = msg
else:
conversation.append(msg)
# 限制用户输入的最大长度
max_input_tokens = 2000
available_for_history = window_tokens - max_input_tokens
# 从最新的消息开始,填充滑动窗口
result = []
current_tokens = 0
# 先添加系统消息(如果存在)
if system_msg:
system_tokens = count_message_tokens(system_msg)
if system_tokens <= max_tokens - max_input_tokens:
result.append(system_msg)
current_tokens += system_tokens
# 从后往前添加历史消息
for msg in reversed(conversation):
msg_tokens = count_message_tokens(msg)
if current_tokens + msg_tokens <= available_for_history:
result.insert(0 if not system_msg else 1, msg)
current_tokens += msg_tokens
else:
break # 窗口已满
return result
# 使用示例
def build_context(
session_messages: List[Dict],
max_context_tokens: int = 128000,
strategy: str = "auto"
) -> List[Dict]:
"""
自动选择最优上下文策略
"""
current_tokens = sum(count_message_tokens(m) for m in session_messages)
# 计算可用空间占比
usage_ratio = current_tokens / max_context_tokens
if usage_ratio < 0.5:
# 上下文充足,原样保留
return session_messages
elif usage_ratio < 0.8:
# 中等用量,滑动窗口
return sliding_window_strategy(session_messages, max_context_tokens)
else:
# 快满了,需要摘要压缩
return asyncio.run(summarize_messages(session_messages))
Token预算计算方法
上下文管理本质上是Token管理。你得知道什么时候该压缩、压缩多少。
Token计算公式:
import tiktoken
def calculate_context_budget(
model: str,
system_prompt: str,
max_context_tokens: int = 128000
) -> dict:
"""
计算上下文预算
"""
encoding = tiktoken.encoding_for_model(model)
# System prompt开销
system_tokens = len(encoding.encode(system_prompt))
# 每条消息的基础开销
per_message_overhead = 4 # role + content字段名等
# 估算可用空间
available = max_context_tokens - system_tokens - 500 # 留500 buffer
return {
"system_tokens": system_tokens,
"available_for_messages": available,
"max_context_tokens": max_context_tokens,
"usage_tips": f"系统消息占{system_tokens} tokens,剩余{available} tokens可用于对话"
}
# 示例输出
budget = calculate_context_budget(
model="gpt-4o",
system_prompt="你是一个专业的法律顾问,帮助用户解答法律问题。",
max_context_tokens=128000
)
print(budget)
# {'system_tokens': 32, 'available_for_messages': 127468, 'max_context_tokens': 128000, ...}
关键阈值判断:
| 上下文占用率 | 建议策略 | 触发时机 |
|---|---|---|
| <50% | 原样保留 | 对话开始 |
| 50-70% | 滑动窗口 | 约10-15轮后触发 |
| 70-90% | 摘要压缩 | 约20-30轮后触发 |
| >90% | 强制截断 | 最后保护机制 |
实测数据:10轮对话Token消耗
我用GPT-4o-mini做了实测,模拟真实对话场景:
测试设置:System prompt约100字,每轮对话用户输入50-150字,AI回复100-300字,10轮混合中英文。
| 对话轮次 | 累计User Tokens | 累计Assistant Tokens | 累计总Tokens | 占用率(128K) |
|---|---|---|---|---|
| 1轮 | 45 | 128 | ~400 | 0.3% |
| 3轮 | 156 | 412 | ~800 | 0.6% |
| 5轮 | 287 | 756 | ~1,500 | 1.2% |
| 10轮 | 612 | 1,523 | ~3,500 | 2.7% |
| 30轮 | 1,892 | 4,671 | ~10,500 | 8.2% |
| 50轮 | 3,124 | 7,892 | ~18,000 | 14% |
| 100轮 | 6,234 | 15,782 | ~35,000 | 27% |
看起来128K上下文能聊很久?但这是理想情况。实际生产环境中:
- System prompt通常更复杂(几百到几千tokens)
- 用户会粘贴代码、文档等内容(动不动就10K+ tokens)
- 有时需要注入RAG检索结果(又是几千tokens)
加上这些变量,50轮对话轻松达到60K+ tokens。上下文窗口看着大,其实并不够用。
不同模型的成本对比
| 模型 | 上下文窗口 | 10轮对话成本 | 100轮对话成本 | 性价比 |
|---|---|---|---|---|
| GPT-4o | 128K | $0.02 | $0.18 | 中等 |
| GPT-4o-mini | 128K | $0.004 | $0.04 | 最高 |
| Claude 3.5 Sonnet | 200K | $0.015 | $0.12 | 较高 |
| Claude 3 Opus | 200K | $0.12 | $1.20 | 低 |
结论:长对话场景优先用GPT-4o-mini或Claude 3.5 Sonnet,上下文窗口大,价格便宜,效果不差。
RAG增强的对话系统
当对话历史已经很长,但用户问的问题需要参考很久以前的内容怎么办?RAG(检索增强生成)就派上用场了。
import numpy as np
from sklearn.feature_extraction.text import TfidfVectorizer
class RAGEnhancedConversation:
"""
RAG增强的对话系统
在超长对话中检索相关历史/知识,注入到上下文中
"""
def __init__(self, session_manager: ConversationSession):
self.session = session_manager
self.vectorizer = TfidfVectorizer(max_features=768)
self.knowledge_base = [] # 外部知识库
self.embeddings_cache = {}
def add_knowledge(self, text: str, metadata: dict = None):
"""向知识库添加内容"""
self.knowledge_base.append({
"text": text,
"metadata": metadata or {}
})
def retrieve_relevant(
self,
query: str,
top_k: int = 3,
threshold: float = 0.3
) -> list:
"""检索与当前问题相关的知识/历史"""
# 1. 对话历史向量化检索
messages = self.session.get_messages()
history_texts = [m["content"] for m in messages]
# 2. 合并知识库和历史
all_texts = history_texts + [k["text"] for k in self.knowledge_base]
if len(all_texts) < 2:
return []
# 3. TF-IDF相似度计算
tfidf_matrix = self.vectorizer.fit_transform(all_texts)
query_vec = self.vectorizer.transform([query])
similarities = (tfidf_matrix * query_vec.T).toarray().flatten()
# 4. 取top_k最相关的
top_indices = np.argsort(similarities)[-top_k:][::-1]
results = []
for idx in top_indices:
if similarities[idx] >= threshold:
if idx < len(history_texts):
# 来自对话历史
results.append({
"source": "history",
"text": history_texts[idx],
"score": float(similarities[idx]),
"metadata": messages[idx]
})
else:
# 来自知识库
kb_idx = idx - len(history_texts)
results.append({
"source": "knowledge",
"text": self.knowledge_base[kb_idx]["text"],
"score": float(similarities[idx]),
"metadata": self.knowledge_base[kb_idx]["metadata"]
})
return results
def build_rag_context(self, session_id: str, query: str) -> str:
"""
构建RAG增强的上下文
"""
# 1. 获取对话历史
messages = self.session.get_messages(session_id)
# 2. 检索相关内容
relevant = self.retrieve_relevant(query, top_k=5)
# 3. 构建RAG提示
rag_context = "\n\n【相关参考】\n"
for item in relevant:
source = "历史对话" if item["source"] == "history" else "知识库"
rag_context += f"[来自{source}]: {item['text'][:200]}...\n"
return rag_context
# 使用示例
rag = RAGEnhancedConversation(session_manager)
# 添加知识库
rag.add_knowledge(
"公司退货政策:收货后7天内可无理由退货,15天内可换货,需保持商品完好。",
metadata={"category": "policy", "section": "return"}
)
rag.add_knowledge(
"VIP会员权益:享受9折优惠、优先发货、专属客服通道。",
metadata={"category": "membership", "section": "vip"}
)
# 构建增强上下文
rag_context = rag.build_rag_context(
session_id="session:user123:web:20250612",
query="我之前问过退货政策,VIP有什么特殊待遇吗"
)
print(rag_context)
多轮Agent完整框架
把Memory(记忆)、RAG(检索)、Tool(工具)整合起来,就是一个完整的多轮Agent框架。
import asyncio
from typing import List, Dict, Callable, Optional
from enum import Enum
class ContextStrategy(Enum):
PRESERVE = "preserve"
SUMMARIZE = "summarize"
SLIDING_WINDOW = "sliding_window"
class MultiTurnAgent:
"""
多轮对话Agent框架
整合:Memory + RAG + Tool + 上下文管理
"""
def __init__(
self,
llm_client,
session_manager: ConversationSession,
rag: RAGEnhancedConversation,
tools: List[Callable] = None
):
self.llm = llm_client
self.session = session_manager
self.rag = rag
self.tools = tools or []
# 上下文策略配置
self.max_context_tokens = 128000
self.strategy_threshold = 0.7 # 70%时切换策略
async def chat(
self,
session_id: str,
user_input: str,
system_prompt: str = None
) -> str:
"""
主对话入口
"""
# 1. 保存用户消息
self.session.add_message(session_id, "user", user_input)
# 2. 构建上下文
messages = await self._build_context(
session_id,
system_prompt
)
# 3. 规划是否需要调用工具
response = await self._plan_and_execute(messages)
# 4. 保存助手回复
self.session.add_message(session_id, "assistant", response)
# 5. 检查是否需要压缩上下文
await self._maybe_compress_context(session_id, system_prompt)
return response
async def _build_context(
self,
session_id: str,
system_prompt: str
) -> List[Dict]:
"""
构建发送给模型的完整上下文
"""
# 获取历史消息
history = self.session.get_messages(session_id)
# 获取最新用户输入
current_query = history[-1]["content"] if history else ""
# 根据策略处理上下文
if self._should_compress(history):
# 需要压缩
context_messages = await self._compress_context(history, system_prompt)
else:
context_messages = history
# RAG增强
rag_context = self.rag.build_rag_context(session_id, current_query)
# 构建最终消息列表
messages = []
# System prompt
system_content = system_prompt or "你是一个有帮助的AI助手。"
if rag_context:
system_content += f"\n\n{rag_context}"
messages.append({"role": "system", "content": system_content})
# 历史消息
messages.extend(context_messages)
return messages
def _should_compress(self, messages: List[Dict]) -> bool:
"""判断是否需要压缩上下文"""
total_tokens = self.session.count_tokens(messages)
usage_ratio = total_tokens / self.max_context_tokens
return usage_ratio > self.strategy_threshold
async def _compress_context(
self,
messages: List[Dict],
system_prompt: str
) -> List[Dict]:
"""压缩上下文"""
from .context_manager import summarize_messages, sliding_window_strategy
total_tokens = self.session.count_tokens(messages)
usage_ratio = total_tokens / self.max_context_tokens
if usage_ratio > 0.9:
# 严重超限:强制滑动窗口
return sliding_window_strategy(messages, self.max_context_tokens)
else:
# 中等用量:摘要压缩
return await summarize_messages(messages)
async def _plan_and_execute(self, messages: List[Dict]) -> str:
"""
规划执行:判断是否需要调用工具
"""
# 构建规划prompt
planning_messages = messages + [{
"role": "system",
"content": "判断是否需要调用工具。只在确实需要时调用。"
}]
# 调用LLM
response = await self.llm.chat(messages)
return response
async def _maybe_compress_context(
self,
session_id: str,
system_prompt: str
):
"""
压缩完成后,更新Session中的消息
"""
history = self.session.get_messages(session_id)
if self._should_compress(history):
# 已经压缩过了,不要重复压缩
# 实际实现中,这里需要更新Session
pass
# 使用示例
agent = MultiTurnAgent(
llm_client=openai_client,
session_manager=session_manager,
rag=rag,
tools=[search_web, calculate, query_database]
)
# 模拟多轮对话
async def run_conversation():
session_id = session_manager.create_session("user_001")
# 第1轮
response = await agent.chat(
session_id,
"我想买一台电脑,用于视频剪辑",
system_prompt="你是一个专业的电脑选购顾问。"
)
print(f"AI: {response}")
# 第10轮
# ...
# 第50轮 - 上下文已经很长,会自动压缩
response = await agent.chat(
session_id,
"我之前说要买电脑来着,记得我说的用途吗?"
)
print(f"AI: {response}")
asyncio.run(run_conversation())
隐私保护与安全
对话历史包含大量用户隐私数据,生产环境必须考虑安全措施。
对话历史加密存储
from cryptography.fernet import Fernet
import base64
import hashlib
class EncryptedSessionManager:
"""
加密版Session管理
对话内容加密存储,防止泄露
"""
def __init__(self, redis_url: str, encryption_key: str):
self.redis = redis.from_url(redis_url, decode_responses=True)
self.cipher = self._derive_cipher(encryption_key)
@staticmethod
def _derive_cipher(key: str) -> Fernet:
"""从用户密钥派生Fernet密钥"""
hashed = hashlib.sha256(key.encode()).digest()
fernet_key = base64.urlsafe_b64encode(hashed)
return Fernet(fernet_key)
def save_encrypted(self, session_id: str, data: dict, ttl: int = 604800):
"""加密保存"""
json_data = json.dumps(data)
encrypted = self.cipher.encrypt(json_data.encode())
self.redis.setex(session_id, ttl, encrypted)
def load_decrypted(self, session_id: str) -> dict:
"""解密读取"""
encrypted = self.redis.get(session_id)
if not encrypted:
return None
decrypted = self.cipher.decrypt(encrypted)
return json.loads(decrypted)
def delete_permanently(self, session_id: str):
"""彻底删除"""
self.redis.delete(session_id)
自动过期清理
def cleanup_expired_sessions(self, user_id: str) -> int:
"""
清理过期Session
返回删除数量
"""
pattern = f"session:{user_id}:*"
deleted = 0
for key in self.redis.scan_iter(match=pattern):
# 检查TTL
ttl = self.redis.ttl(key)
if ttl == -1: # 没有设置过期时间
self.redis.delete(key)
deleted += 1
elif ttl < 0: # 已过期
self.redis.delete(key)
deleted += 1
return deleted
# 建议:定期运行清理任务
# asyncio.create_task(schedule.every().day.do(cleanup_expired_sessions))
数据隔离原则
- 用户A看不到用户B的数据:Session ID必须包含user_id
- 管理员也无法查看对话内容:加密存储,密钥用户独有
- 敏感字段脱敏:手机号、身份证等自动替换为***
- 审计日志:谁在什么时间访问了什么Session
总结
上下文管理是AI产品的核心技术栈。核心要点:
- Session是容器,Context是消耗品,别搞混
- Redis存储+TTL过期是Session管理的标配
- 三种策略按需切换:短对话原样保留,中等长度滑动窗口,长对话摘要压缩
- Token预算心里有数,别等爆了再处理
- RAG是上下文不足时的救星,把关键知识提前注入
- 隐私保护是底线,加密存储、自动清理、权限隔离
上下文管理做不好,产品体验直接崩盘。用户聊着聊着AI就失忆了,谁受得了?希望这篇文章能帮你把这个问题彻底解决。
实际开发中遇到具体问题,欢迎来TokenNexus交流。我们收录了全球330+ AI API平台,有最新的上下文窗口大小、价格和稳定性数据,帮你选择最适合的模型。