跳到主要内容

流式输出

流式输出(Streaming)是一种让 AI 回复逐字/逐词显示的技术,提供更好的用户体验,特别是在生成长文本时。

为什么需要流式输出

传统方式的问题

非流式调用需要等待完整响应:

  • 长文本生成可能需要几十秒
  • 用户只能看到空白等待
  • 体验较差

流式输出的优势

  • 即时反馈: 第一个字立即显示
  • 用户体验: 打字机效果更自然
  • 提前终止: 用户可以随时中断
  • 减少等待焦虑: 能看到进度

基础实现

Python 同步流式

from openai import OpenAI

client = OpenAI(
    api_key="your-api-key",
    base_url="https://api.weelinking.com/v1"
)


def stream_chat(prompt):
    """Stream a chat completion and print tokens as they arrive.

    Args:
        prompt: The user message to send.

    Returns:
        The full concatenated response text.
    """
    stream = client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}],
        stream=True
    )

    full_response = ""
    for chunk in stream:
        # delta.content is None on role-only and final chunks; skip those.
        if chunk.choices[0].delta.content:
            content = chunk.choices[0].delta.content
            print(content, end="", flush=True)
            full_response += content

    print()  # trailing newline after the streamed text
    return full_response


# Usage
response = stream_chat("讲一个短故事")

Python 异步流式

import asyncio
from openai import AsyncOpenAI

async_client = AsyncOpenAI(
    api_key="your-api-key",
    base_url="https://api.weelinking.com/v1"
)


async def async_stream_chat(prompt):
    """Asynchronously stream a chat completion, printing tokens as they arrive.

    Args:
        prompt: The user message to send.

    Returns:
        The full concatenated response text.
    """
    stream = await async_client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}],
        stream=True
    )

    full_response = ""
    async for chunk in stream:
        # delta.content is None on role-only and final chunks; skip those.
        if chunk.choices[0].delta.content:
            content = chunk.choices[0].delta.content
            print(content, end="", flush=True)
            full_response += content

    return full_response


# Run
asyncio.run(async_stream_chat("讲一个故事"))

Web 应用集成

FastAPI + SSE

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from openai import OpenAI

app = FastAPI()
client = OpenAI(api_key="your-api-key", base_url="https://api.weelinking.com/v1")


async def generate_stream(prompt: str):
    """Yield model output as Server-Sent Events (SSE) lines.

    NOTE(review): this uses the *synchronous* OpenAI client inside an
    async generator, which blocks the event loop while waiting for each
    chunk — consider AsyncOpenAI for production use.
    """
    stream = client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}],
        stream=True
    )

    for chunk in stream:
        if chunk.choices[0].delta.content:
            # Each SSE event is "data: <payload>\n\n".
            yield f"data: {chunk.choices[0].delta.content}\n\n"

    # Sentinel event so the client knows the stream is complete.
    yield "data: [DONE]\n\n"


@app.get("/chat/stream")
async def chat_stream(prompt: str):
    """SSE endpoint: streams the model's reply for the given prompt."""
    return StreamingResponse(
        generate_stream(prompt),
        media_type="text/event-stream"
    )

前端接收(JavaScript)

/**
 * Consume the SSE stream from /chat/stream and render it incrementally.
 *
 * @param {string} prompt - The user message to send.
 * @returns {Promise<string>} The full accumulated response text.
 */
async function streamChat(prompt) {
  const response = await fetch(`/chat/stream?prompt=${encodeURIComponent(prompt)}`);
  const reader = response.body.getReader();
  const decoder = new TextDecoder();

  let result = '';

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    const chunk = decoder.decode(value);
    const lines = chunk.split('\n');

    for (const line of lines) {
      if (line.startsWith('data: ')) {
        const data = line.slice(6);
        // NOTE(review): this `break` only exits the inner for-loop; the
        // outer while-loop still ends when the reader reports done.
        if (data === '[DONE]') break;

        result += data;
        document.getElementById('output').textContent = result;
      }
    }
  }

  return result;
}

React 组件

import { useState } from 'react';

function ChatStream() {
const [response, setResponse] = useState('');
const [loading, setLoading] = useState(false);

const sendMessage = async (message) => {
setLoading(true);
setResponse('');

try {
const res = await fetch('/chat/stream', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ message })
});

const reader = res.body.getReader();
const decoder = new TextDecoder();

while (true) {
const { done, value } = await reader.read();
if (done) break;

const chunk = decoder.decode(value);
// 解析 SSE 格式
const lines = chunk.split('\n');
for (const line of lines) {
if (line.startsWith('data: ')) {
const data = line.slice(6);
if (data !== '[DONE]') {
setResponse(prev => prev + data);
}
}
}
}
} finally {
setLoading(false);
}
};

return (
<div>
<button onClick={() => sendMessage('讲个故事')}>
发送
</button>
<div className="response">
{response}
{loading && <span className="cursor">|</span>}
</div>
</div>
);
}

高级用法

1. 收集完整响应

def stream_with_full_response(prompt):
    """Yield content chunks while also collecting the complete response.

    Args:
        prompt: The user message to send.

    Yields:
        Each non-empty content fragment as it arrives.

    Note:
        Because this is a generator, the final ``return full_response``
        is only accessible via ``StopIteration.value`` (or by driving the
        generator manually) — plain ``for`` iteration discards it.
    """
    stream = client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}],
        stream=True
    )

    collected_chunks = []    # raw chunk objects, e.g. for logging/debugging
    collected_content = []   # text fragments only

    for chunk in stream:
        collected_chunks.append(chunk)
        if chunk.choices[0].delta.content:
            collected_content.append(chunk.choices[0].delta.content)
            yield chunk.choices[0].delta.content

    # Assemble the full response text.
    full_response = "".join(collected_content)
    return full_response

2. 超时处理

import signal


# NOTE(review): this shadows the builtin TimeoutError; callers catching
# the builtin will still match since the example raises this class.
class TimeoutError(Exception):
    pass


def timeout_handler(signum, frame):
    """SIGALRM handler: abort the stream by raising TimeoutError."""
    raise TimeoutError("流式响应超时")


def stream_with_timeout(prompt, timeout=30):
    """Yield content chunks, aborting if the stream exceeds *timeout* seconds.

    Args:
        prompt: The user message to send.
        timeout: Overall deadline in seconds for the whole stream.

    Note:
        ``signal.SIGALRM`` only works on Unix and only in the main thread.
    """
    signal.signal(signal.SIGALRM, timeout_handler)
    signal.alarm(timeout)

    try:
        stream = client.chat.completions.create(
            model="gpt-4o",
            messages=[{"role": "user", "content": prompt}],
            stream=True
        )

        for chunk in stream:
            if chunk.choices[0].delta.content:
                yield chunk.choices[0].delta.content
    finally:
        signal.alarm(0)  # cancel the pending alarm

3. 流式 + 函数调用

def stream_with_functions(prompt, tools):
    """Stream a completion that may interleave text and tool calls.

    Args:
        prompt: The user message to send.
        tools: Tool/function definitions passed through to the API.

    Yields:
        Dicts of the form ``{"type": "content", ...}`` for text fragments,
        and one final ``{"type": "tool_calls", ...}`` with the accumulated
        tool calls, if any were emitted.
    """
    stream = client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}],
        tools=tools,
        stream=True
    )

    collected_tool_calls = []

    for chunk in stream:
        delta = chunk.choices[0].delta

        # Text content arrives as ordinary delta fragments.
        if delta.content:
            yield {"type": "content", "content": delta.content}

        # Tool-call arguments arrive split across chunks, keyed by index;
        # accumulate them into complete call records.
        if delta.tool_calls:
            for tool_call in delta.tool_calls:
                if tool_call.index >= len(collected_tool_calls):
                    collected_tool_calls.append({
                        "id": tool_call.id,
                        "function": {"name": "", "arguments": ""}
                    })

                if tool_call.function.name:
                    collected_tool_calls[tool_call.index]["function"]["name"] = tool_call.function.name
                if tool_call.function.arguments:
                    collected_tool_calls[tool_call.index]["function"]["arguments"] += tool_call.function.arguments

    if collected_tool_calls:
        yield {"type": "tool_calls", "tool_calls": collected_tool_calls}

流式输出格式

Server-Sent Events (SSE)

标准的 SSE 格式:

data: {"content": "你"}

data: {"content": "好"}

data: {"content": "!"}

data: [DONE]

OpenAI 流式格式

# 每个 chunk 的结构
{
"id": "chatcmpl-xxx",
"object": "chat.completion.chunk",
"created": 1234567890,
"model": "gpt-4o",
"choices": [
{
"index": 0,
"delta": {
"role": "assistant", # 只在第一个 chunk
"content": "部分内容" # 后续 chunk
},
"finish_reason": None # 最后一个 chunk 为 "stop"
}
]
}

注意事项

1. 连接中断处理

def robust_stream(prompt):
    """Yield content chunks; on any failure, yield an error marker instead.

    Args:
        prompt: The user message to send.

    Yields:
        Content fragments, or a single ``[连接中断: ...]`` marker if the
        connection drops or the API call raises.
    """
    try:
        stream = client.chat.completions.create(
            model="gpt-4o",
            messages=[{"role": "user", "content": prompt}],
            stream=True
        )

        for chunk in stream:
            if chunk.choices[0].delta.content:
                yield chunk.choices[0].delta.content

    except Exception as e:
        # Surface the failure in-band so the UI can show it to the user.
        yield f"\n[连接中断: {str(e)}]"

2. 取消请求

import threading


class CancellableStream:
    """A streaming session that can be interrupted from another thread.

    Call :meth:`cancel` (e.g. from a UI callback) to make the in-progress
    :meth:`stream` generator stop yielding at the next chunk boundary.
    """

    def __init__(self):
        # Simple flag; attribute assignment is atomic in CPython, so a
        # dedicated lock is unnecessary for this single boolean.
        self.cancelled = False

    def cancel(self):
        """Request that the current stream stop at the next chunk."""
        self.cancelled = True

    def stream(self, prompt):
        """Yield content chunks until the stream ends or cancel() is called.

        Args:
            prompt: The user message to send.
        """
        stream = client.chat.completions.create(
            model="gpt-4o",
            messages=[{"role": "user", "content": prompt}],
            stream=True
        )

        for chunk in stream:
            if self.cancelled:
                break
            if chunk.choices[0].delta.content:
                yield chunk.choices[0].delta.content

最佳实践

  1. 始终使用流式: 长文本生成必须使用流式
  2. 处理连接问题: 添加错误处理和重试
  3. 用户体验: 添加加载指示器
  4. 取消支持: 允许用户中断生成
  5. 日志记录: 记录完整响应用于调试