Batch Processing

Batch processing is the technique of handling many LLM requests at once. It is useful whenever large volumes of data need processing, such as bulk translation, content generation, or data labeling.

Use Cases

Common batch tasks

  • Bulk translation: translate large volumes of documents or strings
  • Content generation: generate product descriptions or articles in bulk
  • Data labeling: automatic classification and tagging
  • Content moderation: check large volumes of content for compliance
  • Information extraction: pull structured data out of large text collections

Implementation Approaches

1. Simple Loop (Serial)

The simplest approach, but also the least efficient:

from openai import OpenAI

client = OpenAI(api_key="your-api-key", base_url="https://api.weelinking.com/v1")

def process_batch_serial(items):
    results = []
    for item in items:
        response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": item}]
        )
        results.append(response.choices[0].message.content)
    return results

# Usage
items = ["Translate: Hello", "Translate: World", "Translate: Goodbye"]
results = process_batch_serial(items)

2. Async Concurrency

Use asyncio to issue requests concurrently:

import asyncio
from openai import AsyncOpenAI

async_client = AsyncOpenAI(
    api_key="your-api-key",
    base_url="https://api.weelinking.com/v1"
)

async def process_single(item):
    response = await async_client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": item}]
    )
    return response.choices[0].message.content

async def process_batch_async(items, concurrency=10):
    # Cap the number of in-flight requests
    semaphore = asyncio.Semaphore(concurrency)

    async def process_with_semaphore(item):
        async with semaphore:
            return await process_single(item)

    tasks = [process_with_semaphore(item) for item in items]
    return await asyncio.gather(*tasks)

# Usage
items = ["task 1", "task 2", "task 3", ...]
results = asyncio.run(process_batch_async(items, concurrency=10))

3. Thread Pool

Use ThreadPoolExecutor:

from concurrent.futures import ThreadPoolExecutor, as_completed
from openai import OpenAI

client = OpenAI(api_key="your-api-key", base_url="https://api.weelinking.com/v1")

def process_single(item):
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": item}]
    )
    return response.choices[0].message.content

def process_batch_threaded(items, max_workers=10):
    results = [None] * len(items)

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Map each future back to its input index so results keep their order
        future_to_index = {
            executor.submit(process_single, item): i
            for i, item in enumerate(items)
        }

        for future in as_completed(future_to_index):
            index = future_to_index[future]
            try:
                results[index] = future.result()
            except Exception as e:
                results[index] = f"Error: {str(e)}"

    return results

4. OpenAI Batch API

OpenAI offers a dedicated Batch API, which costs less:

import json
from openai import OpenAI

client = OpenAI(api_key="your-api-key", base_url="https://api.weelinking.com/v1")

def create_batch_file(requests):
    """Build the JSONL input for a batch job."""
    lines = []
    for i, req in enumerate(requests):
        line = {
            "custom_id": f"request-{i}",
            "method": "POST",
            "url": "/v1/chat/completions",
            "body": {
                "model": "gpt-4o-mini",
                "messages": [{"role": "user", "content": req}]
            }
        }
        lines.append(json.dumps(line))
    return "\n".join(lines)

def submit_batch(requests):
    # Build the JSONL content
    file_content = create_batch_file(requests)

    # Upload it as a batch input file
    file = client.files.create(
        file=file_content.encode(),
        purpose="batch"
    )

    # Create the batch job
    batch = client.batches.create(
        input_file_id=file.id,
        endpoint="/v1/chat/completions",
        completion_window="24h"
    )

    return batch.id

def get_batch_results(batch_id):
    batch = client.batches.retrieve(batch_id)

    if batch.status == "completed":
        # Download the output file and parse one JSON object per line
        result_file = client.files.content(batch.output_file_id)
        results = []
        for line in result_file.text.split("\n"):
            if line:
                results.append(json.loads(line))
        return results

    return {"status": batch.status}
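
A batch job can take up to the completion window to finish, so in practice you poll until it reaches a terminal state. A minimal polling sketch; the 60-second interval is an arbitrary choice:

import time

def wait_for_batch(batch_id, poll_interval=60):
    # Poll until the batch reaches a terminal state
    while True:
        batch = client.batches.retrieve(batch_id)
        if batch.status in ("completed", "failed", "expired", "cancelled"):
            return batch.status
        time.sleep(poll_interval)

# Usage
batch_id = submit_batch(["Translate: Hello", "Translate: World"])
if wait_for_batch(batch_id) == "completed":
    results = get_batch_results(batch_id)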

Advanced Techniques

1. Progress Tracking

from tqdm import tqdm
import asyncio

async def process_batch_with_progress(items, concurrency=10):
    semaphore = asyncio.Semaphore(concurrency)
    results = []

    async def process_with_semaphore(item, pbar):
        async with semaphore:
            result = await process_single(item)
            pbar.update(1)
            return result

    with tqdm(total=len(items), desc="Processing") as pbar:
        tasks = [process_with_semaphore(item, pbar) for item in items]
        results = await asyncio.gather(*tasks)

    return results

2. Error Retries

import asyncio
from tenacity import retry, stop_after_attempt, wait_exponential

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=1, max=10)
)
async def process_with_retry(item):
    return await process_single(item)

async def process_batch_with_retry(items, concurrency=10):
    semaphore = asyncio.Semaphore(concurrency)

    async def process(item):
        async with semaphore:
            try:
                return {"success": True, "result": await process_with_retry(item)}
            except Exception as e:
                return {"success": False, "error": str(e), "item": item}

    tasks = [process(item) for item in items]
    return await asyncio.gather(*tasks)

3. Chunked Processing

When handling very large workloads, process them in chunks:

def chunk_list(lst, chunk_size):
    for i in range(0, len(lst), chunk_size):
        yield lst[i:i + chunk_size]

async def process_large_batch(items, batch_size=100, concurrency=10):
    all_results = []

    for batch in chunk_list(items, batch_size):
        batch_results = await process_batch_async(batch, concurrency)
        all_results.extend(batch_results)

        # Rest between chunks to avoid overloading the API
        await asyncio.sleep(1)

    return all_results

4. Result Persistence

import json
import os

async def process_batch_with_checkpoint(items, output_file, concurrency=10):
    # Load previously completed results
    completed = {}
    if os.path.exists(output_file):
        with open(output_file) as f:
            for line in f:
                result = json.loads(line)
                completed[result["id"]] = result

    # Find the tasks that still need processing
    pending = [(i, item) for i, item in enumerate(items) if i not in completed]

    # Process a remaining task and append its result to the checkpoint file
    async def process_and_save(index, item):
        result = await process_single(item)

        # Save immediately so a crash loses at most the in-flight requests
        with open(output_file, "a") as f:
            f.write(json.dumps({
                "id": index,
                "input": item,
                "output": result
            }) + "\n")

        return result

    semaphore = asyncio.Semaphore(concurrency)

    async def process_with_semaphore(index, item):
        async with semaphore:
            return await process_and_save(index, item)

    tasks = [process_with_semaphore(i, item) for i, item in pending]
    await asyncio.gather(*tasks)

    # Return everything, including results from earlier runs
    with open(output_file) as f:
        return [json.loads(line) for line in f]

Cost Optimization

1. Use Cheaper Models

# Use a mini model for simple tasks
model = "gpt-4o-mini"  # much cheaper than gpt-4o
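
One way to apply this is to route each request by task difficulty. A minimal sketch; the length-based heuristic and the 500-character threshold are illustrative assumptions, not anything prescribed by the API:

def pick_model(item):
    # Hypothetical heuristic: short prompts go to the cheaper model
    return "gpt-4o-mini" if len(item) < 500 else "gpt-4o"

def process_with_routing(item):
    response = client.chat.completions.create(
        model=pick_model(item),
        messages=[{"role": "user", "content": item}]
    )
    return response.choices[0].message.content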

2. Merge Requests

Combine several small tasks into a single request. Note that this depends on the model honoring the separator format, so validate the number of parsed results:

def batch_in_one_request(items, batch_size=10):
    results = []

    for batch in chunk_list(items, batch_size):
        prompt = "Process each of the following tasks, separating each result with ---:\n"
        prompt += "\n".join([f"{i+1}. {item}" for i, item in enumerate(batch)])

        response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": prompt}]
        )

        # Split the combined response back into per-task results
        batch_results = response.choices[0].message.content.split("---")
        results.extend([r.strip() for r in batch_results])

    return results

3. Use the Batch API

The Batch API typically offers a 50% discount.

Monitoring and Logging

import logging
import time

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

async def process_batch_with_monitoring(items, concurrency=10):
    start_time = time.time()
    success_count = 0
    error_count = 0
    total_tokens = 0
    semaphore = asyncio.Semaphore(concurrency)

    async def process(item):
        nonlocal success_count, error_count, total_tokens
        async with semaphore:
            try:
                response = await async_client.chat.completions.create(
                    model="gpt-4o-mini",
                    messages=[{"role": "user", "content": item}]
                )
                success_count += 1
                total_tokens += response.usage.total_tokens
                return response.choices[0].message.content
            except Exception as e:
                error_count += 1
                logger.error(f"Request failed: {str(e)}")
                return None

    # Run every task through the instrumented worker
    results = await asyncio.gather(*(process(item) for item in items))

    # Report statistics
    elapsed = time.time() - start_time
    logger.info(f"Done: {success_count}/{len(items)}, failed: {error_count}")
    logger.info(f"Elapsed: {elapsed:.2f}s, tokens: {total_tokens}")

    return results

Best Practices

  1. Sensible concurrency: set it according to your API limits, typically 10-50
  2. Error handling: implement retry and fallback strategies
  3. Progress saving: support resuming from a checkpoint
  4. Cost monitoring: track token usage in real time
  5. Rate limiting: respect the API's rate limits (see the sketch after this list)
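
On the last point: the semaphore examples above cap concurrency, but not requests per second. A minimal client-side limiter sketch, assuming a simple fixed-interval scheme; the 5 requests/second figure is an illustrative assumption, not an actual API limit:

import asyncio
import time

class RateLimiter:
    """Allow at most `rate` acquisitions per second (fixed-interval scheme)."""
    def __init__(self, rate):
        self.interval = 1.0 / rate
        self.lock = asyncio.Lock()
        self.next_time = 0.0

    async def acquire(self):
        async with self.lock:
            now = time.monotonic()
            wait = self.next_time - now
            self.next_time = max(now, self.next_time) + self.interval
        if wait > 0:
            await asyncio.sleep(wait)

# Usage: roughly 5 requests per second
limiter = RateLimiter(rate=5)

async def process_rate_limited(item):
    await limiter.acquire()
    return await process_single(item)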