LLM Streaming Output
Streaming delivers the AI reply incrementally so it can be displayed in real time. It greatly improves user experience, especially for long generations.
Why Streaming
Problems with the Traditional Mode
Request → wait 3-30 seconds → full response returned at once
User-experience problems:
- A long blank wait
- No way to tell whether the request is actually being processed
- No way to preview the content early
Advantages of Streaming
Request → first token returned immediately → tokens keep arriving → done
Advantages:
- Very low time to first token
- A natural typewriter effect
- Can be interrupted at any time
Basic Implementation
Python: Synchronous Streaming
from openai import OpenAI

client = OpenAI(
    api_key="your-api-key",
    base_url="https://api.weelinking.com/v1"
)

def stream_chat(prompt):
    stream = client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}],
        stream=True
    )
    full_response = ""
    for chunk in stream:
        content = chunk.choices[0].delta.content
        if content:
            print(content, end="", flush=True)
            full_response += content
    print()  # newline
    return full_response
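Calling it prints tokens as they arrive and also returns the assembled text (the prompt here is illustrative):

reply = stream_chat("Write a short poem about spring")
print(f"Collected {len(reply)} characters")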
Python: Asynchronous Streaming
import asyncio
from openai import AsyncOpenAI

async_client = AsyncOpenAI(
    api_key="your-api-key",
    base_url="https://api.weelinking.com/v1"
)

async def async_stream_chat(prompt):
    stream = await async_client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}],
        stream=True
    )
    full_response = ""
    async for chunk in stream:
        content = chunk.choices[0].delta.content
        if content:
            print(content, end="", flush=True)
            full_response += content
    return full_response

# Run it
asyncio.run(async_stream_chat("Tell me a story"))
Streaming Data Format
SSE Format
Server-Sent Events (SSE) is the standard wire format for streaming output:
data: {"id":"chatcmpl-xxx","choices":[{"delta":{"role":"assistant"}}]}
data: {"id":"chatcmpl-xxx","choices":[{"delta":{"content":"Hel"}}]}
data: {"id":"chatcmpl-xxx","choices":[{"delta":{"content":"lo"}}]}
data: {"id":"chatcmpl-xxx","choices":[{"delta":{},"finish_reason":"stop"}]}
data: [DONE]
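To see the format concretely, here is a minimal sketch that consumes the SSE stream directly over HTTP with the requests library, bypassing the SDK (same placeholder endpoint and key as above; error handling omitted):

import json
import requests

def parse_sse(prompt):
    resp = requests.post(
        "https://api.weelinking.com/v1/chat/completions",
        headers={"Authorization": "Bearer your-api-key"},
        json={
            "model": "gpt-4o",
            "messages": [{"role": "user", "content": prompt}],
            "stream": True
        },
        stream=True
    )
    for line in resp.iter_lines(decode_unicode=True):
        # SSE frames are "data: ..." lines separated by blank lines
        if not line or not line.startswith("data: "):
            continue
        data = line[len("data: "):]
        if data == "[DONE]":
            break
        delta = json.loads(data)["choices"][0]["delta"]
        if delta.get("content"):
            print(delta["content"], end="", flush=True)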
Chunk Structure
{
    "id": "chatcmpl-xxx",
    "object": "chat.completion.chunk",
    "created": 1234567890,
    "model": "gpt-4o",
    "choices": [{
        "index": 0,
        "delta": {
            "role": "assistant",      // first chunk only
            "content": "partial text" // subsequent chunks
        },
        "finish_reason": null         // "stop" on the final chunk
    }]
}
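Since role arrives only in the first chunk and finish_reason only in the last, reassembling a complete message means accumulating across all chunks. A small sketch:

def accumulate_chunks(stream):
    role, parts, finish_reason = None, [], None
    for chunk in stream:
        choice = chunk.choices[0]
        if choice.delta.role:
            role = choice.delta.role              # first chunk only
        if choice.delta.content:
            parts.append(choice.delta.content)
        if choice.finish_reason:
            finish_reason = choice.finish_reason  # final chunk
    return {"role": role, "content": "".join(parts),
            "finish_reason": finish_reason}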
Web Application Integration
FastAPI + SSE
from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()

async def generate_stream(prompt: str):
    stream = client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}],
        stream=True
    )
    for chunk in stream:
        if chunk.choices[0].delta.content:
            content = chunk.choices[0].delta.content
            yield f"data: {content}\n\n"
    yield "data: [DONE]\n\n"

@app.get("/chat/stream")
async def chat_stream(prompt: str):
    return StreamingResponse(
        generate_stream(prompt),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive"
        }
    )
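One caveat: generate_stream above is declared async but iterates a synchronous SDK stream, so the event loop blocks between chunks. A non-blocking sketch using the AsyncOpenAI client (assuming async_client is configured as in the earlier async example):

async def generate_stream_async(prompt: str):
    # await + async for keeps the event loop free between chunks
    stream = await async_client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}],
        stream=True
    )
    async for chunk in stream:
        content = chunk.choices[0].delta.content
        if content:
            yield f"data: {content}\n\n"
    yield "data: [DONE]\n\n"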
Flask + SSE
from flask import Flask, Response, request

app = Flask(__name__)

def generate(prompt):
    stream = client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}],
        stream=True
    )
    for chunk in stream:
        if chunk.choices[0].delta.content:
            yield f"data: {chunk.choices[0].delta.content}\n\n"
    yield "data: [DONE]\n\n"

@app.route('/stream')
def stream():
    prompt = request.args.get('prompt')
    return Response(
        generate(prompt),
        mimetype='text/event-stream'
    )
Frontend Consumption
async function streamChat(prompt) {
    const response = await fetch(`/chat/stream?prompt=${encodeURIComponent(prompt)}`);
    const reader = response.body.getReader();
    const decoder = new TextDecoder();
    let result = '';
    const outputElement = document.getElementById('output');

    while (true) {
        const { done, value } = await reader.read();
        if (done) break;
        // stream: true keeps multi-byte characters intact across chunk boundaries
        const chunk = decoder.decode(value, { stream: true });
        const lines = chunk.split('\n');
        for (const line of lines) {
            if (line.startsWith('data: ')) {
                const data = line.slice(6);
                if (data === '[DONE]') {
                    return result;
                }
                result += data;
                outputElement.textContent = result;
            }
        }
    }
    return result;
}
React Component
This variant sends a POST with a JSON body, so the server route must accept POST (the GET routes above would need a matching handler).
import { useState, useCallback } from 'react';

function ChatStream() {
    const [response, setResponse] = useState('');
    const [loading, setLoading] = useState(false);

    const sendMessage = useCallback(async (message) => {
        setLoading(true);
        setResponse('');
        try {
            const res = await fetch('/chat/stream', {
                method: 'POST',
                headers: { 'Content-Type': 'application/json' },
                body: JSON.stringify({ message })
            });
            const reader = res.body.getReader();
            const decoder = new TextDecoder();
            while (true) {
                const { done, value } = await reader.read();
                if (done) break;
                const text = decoder.decode(value, { stream: true });
                const lines = text.split('\n');
                for (const line of lines) {
                    if (line.startsWith('data: ') && line !== 'data: [DONE]') {
                        const content = line.slice(6);
                        setResponse(prev => prev + content);
                    }
                }
            }
        } finally {
            setLoading(false);
        }
    }, []);

    return (
        <div>
            <button onClick={() => sendMessage('Tell me a story')} disabled={loading}>
                {loading ? 'Generating...' : 'Send'}
            </button>
            <div className="response">
                {response}
                {loading && <span className="cursor blink">|</span>}
            </div>
        </div>
    );
}
Advanced Features
Collecting the Full Response
def stream_with_callback(prompt, on_chunk=None, on_complete=None):
    stream = client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}],
        stream=True
    )
    full_response = ""
    for chunk in stream:
        content = chunk.choices[0].delta.content
        if content:
            full_response += content
            if on_chunk:
                on_chunk(content)
    if on_complete:
        on_complete(full_response)
    return full_response

# Usage
stream_with_callback(
    "Tell me a story",
    on_chunk=lambda c: print(c, end=""),
    on_complete=lambda r: print(f"\n\nTotal characters: {len(r)}")
)
Streaming + Function Calling
def stream_with_tools(prompt, tools):
    stream = client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}],
        tools=tools,
        stream=True
    )
    content_buffer = ""
    tool_calls = {}
    for chunk in stream:
        delta = chunk.choices[0].delta
        # Text content
        if delta.content:
            content_buffer += delta.content
            yield {"type": "content", "content": delta.content}
        # Tool calls arrive as fragments keyed by index
        if delta.tool_calls:
            for tc in delta.tool_calls:
                if tc.index not in tool_calls:
                    tool_calls[tc.index] = {
                        "id": tc.id,
                        "function": {"name": "", "arguments": ""}
                    }
                if tc.function.name:
                    tool_calls[tc.index]["function"]["name"] = tc.function.name
                if tc.function.arguments:
                    tool_calls[tc.index]["function"]["arguments"] += tc.function.arguments
    if tool_calls:
        yield {"type": "tool_calls", "tool_calls": list(tool_calls.values())}
Cancellable Streaming Requests
class CancellableStream:
    def __init__(self):
        self.cancelled = False

    def cancel(self):
        self.cancelled = True

    def stream(self, prompt):
        stream = client.chat.completions.create(
            model="gpt-4o",
            messages=[{"role": "user", "content": prompt}],
            stream=True
        )
        for chunk in stream:
            if self.cancelled:
                break
            content = chunk.choices[0].delta.content
            if content:
                yield content

# Usage
streamer = CancellableStream()
# Call streamer.cancel() from another thread to stop mid-stream
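For example, cancelling from another thread with a timer (a sketch; the 2-second delay is arbitrary):

import threading

streamer = CancellableStream()
threading.Timer(2.0, streamer.cancel).start()  # cancel after 2 seconds

for piece in streamer.stream("Tell me a long story"):
    print(piece, end="", flush=True)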
Timeout Handling
import signal

def stream_with_timeout(prompt, timeout=30):
    def timeout_handler(signum, frame):
        raise TimeoutError("streaming response timed out")

    signal.signal(signal.SIGALRM, timeout_handler)
    signal.alarm(timeout)
    try:
        stream = client.chat.completions.create(
            model="gpt-4o",
            messages=[{"role": "user", "content": prompt}],
            stream=True
        )
        for chunk in stream:
            if chunk.choices[0].delta.content:
                yield chunk.choices[0].delta.content
    finally:
        signal.alarm(0)  # always clear the alarm
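Note that signal.SIGALRM is Unix-only and works only in the main thread. A portable alternative is to check a wall-clock deadline between chunks (a sketch; it cannot interrupt a read that is already stalled):

import time

def stream_with_deadline(prompt, timeout=30):
    deadline = time.monotonic() + timeout
    stream = client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}],
        stream=True
    )
    for chunk in stream:
        if time.monotonic() > deadline:
            stream.close()  # release the connection (close() exists on recent openai-python streams)
            raise TimeoutError("streaming response exceeded deadline")
        content = chunk.choices[0].delta.content
        if content:
            yield content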
Error Handling
def robust_stream(prompt):
    try:
        stream = client.chat.completions.create(
            model="gpt-4o",
            messages=[{"role": "user", "content": prompt}],
            stream=True
        )
        for chunk in stream:
            if chunk.choices[0].delta.content:
                yield chunk.choices[0].delta.content
    except ConnectionError:
        yield "\n[connection lost]"
    except TimeoutError:
        yield "\n[request timed out]"
    except Exception as e:
        yield f"\n[error: {str(e)}]"
Best Practices
- Always stream long generations: streaming should be the default for long-form output
- Handle interruption: users may stop a response at any time
- Recover from errors: handle dropped connections and timeouts gracefully
- Show progress: display a loading or typing indicator
- Clean up resources: close the connection properly on cancellation (see the sketch after this list)
- Log responses: record the assembled full response for debugging
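As a small illustration of the resource-cleanup point, a consumer that abandons a stream early can close it explicitly so the HTTP connection is released (a sketch; contextlib.closing is used here on the assumption that the SDK stream object exposes close(), which recent openai-python releases do):

from contextlib import closing

def preview(prompt, max_chars=50):
    stream = client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}],
        stream=True
    )
    text = ""
    with closing(stream):  # ensures close() runs even on early break
        for chunk in stream:
            content = chunk.choices[0].delta.content
            if content:
                text += content
            if len(text) >= max_chars:
                break
    return text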