
目录
在现代Web应用和API设计中,高效的数据传输至关重要。传统的HTTP请求/响应模型通常是一次性返回所有数据,这对于小数据量来说没有问题。但当处理大规模数据集、实时数据流或像大型语言模型(LLM)那样逐步生成结果的场景时,这种模式可能会导致高延迟和糟糕的用户体验。
什么是流式HTTP请求(Streaming HTTP Request)?
流式HTTP请求允许服务器在数据完全准备好之前,就开始分块(chunks)向客户端发送数据。客户端也可以在接收到第一个数据块后立即开始处理,而无需等待整个响应下载完毕。这种方式特别适用于那些响应内容是动态生成或体积庞大的情况。
为什么流式返回很重要?
本篇博客将带你深入了解如何在Python中使用不同的库来实现HTTP流式请求,并通过搭建本地Ollama大模型服务进行实战测试,最后对不同方案的性能进行比较。
Python生态系统提供了多个优秀的库来处理HTTP请求,其中一些对流式传输有很好的支持。
requests 是Python中最流行和用户友好的HTTP库之一。它以简洁的API著称,并支持流式请求。
stream=True 参数,requests 不会立即下载响应内容,而是允许你迭代响应体。httpx 是一个功能齐全的HTTP客户端,被誉为下一代Python HTTP库。它同时支持同步和异步操作,并且API设计与requests非常相似。
requests兼容性高,迁移成本低,支持HTTP/2。requests较新,但已足够稳定和成熟。stream=True 参数启用流式模式,在同步和异步模式下均可使用。aiohttp 是一个基于 asyncio 的异步HTTP客户端/服务器框架。它是纯异步库,非常适合构建高性能的异步网络应用。
requests不同,需要熟悉asyncio编程模型。aiohttp 的响应对象本身就支持异步迭代,可以方便地处理流式数据。接下来,我们将搭建一个本地的Ollama服务,用它作为我们流式请求的测试对象。
Ollama 是一个可以让你在本地轻松运行大型语言模型(如 Qwen 等)的工具。它提供了一个HTTP API接口,非常适合用来测试我们的流式请求。
Ollama 将模型权重、配置和数据打包成一个单一的包(通过Modelfile管理)。它简化了模型的下载、部署和管理过程。
Ollama 提供了命令行工具来管理模型。
拉取模型:选择一个你想要测试的模型。例如,拉取一个较小的模型如 phi 或 qwen3:1.7b 以便快速测试:
ollama pull qwen3:1.7b
运行模型(Ollama服务会自动加载): Ollama服务启动后,它会自动处理模型的加载和运行。你不需要显式“运行”一个模型,当你向其API发送请求时,如果模型已下载,它会被加载并用于生成响应。
Ollama默认在 http://localhost:11434 上提供服务。你可以通过 curl 来测试服务是否正常以及模型的流式输出。
打开一个新的终端,运行以下命令:
curl.exe -X POST http://localhost:11434/api/generate -H "Content-Type: application/json"
-d '{\"model\":\"qwen3:1.7b\", \"prompt\":\"给michael阿明讲个笑话\", \"stream\":true}'
如果一切正常,你会看到一系列JSON对象被逐行打印出来,每个对象代表模型生成的一部分内容。"stream": true 是关键,它告诉Ollama以流式方式返回响应。如果为 false (默认),则会等待所有内容生成完毕后一次性返回。
例如,输出可能如下所示(每行是一个JSON对象):
PS C:\Users\mingm> curl.exe -X POST http://localhost:11434/api/generate -H "Content-Type: application/json" -d '{\"model\":\"qwen3:1.7b\", \"prompt\":\"给michael阿明讲个笑话\", \"stream\":true}'
{"model":"qwen3:1.7b","created_at":"2025-05-16T14:29:42.259136Z","response":"\u003cthink\u003e","done":false}
{"model":"qwen3:1.7b","created_at":"2025-05-16T14:29:42.3206735Z","response":"\n","done":false}
{"model":"qwen3:1.7b","created_at":"2025-05-16T14:29:42.3830886Z","response":"好的","done":false}
{"model":"qwen3:1.7b","created_at":"2025-05-16T14:29:42.44524Z","response":",","done":false}
{"model":"qwen3:1.7b","created_at":"2025-05-16T14:29:42.5068419Z","response":"用户","done":false}
{"model":"qwen3:1.7b","created_at":"2025-05-16T14:29:42.5690238Z","response":"让我","done":false}
{"model":"qwen3:1.7b","created_at":"2025-05-16T14:29:42.6310473Z","response":"给","done":false}
{"model":"qwen3:1.7b","created_at":"2025-05-16T14:29:42.6935324Z","response":"Michael","done":false}
{"model":"qwen3:1.7b","created_at":"2025-05-16T14:29:42.7554825Z","response":"讲","done":false}
注意 done: false 表示流仍在继续,done: true 表示流结束。
现在我们有了一个可以流式返回数据的本地服务,接下来我们将用Python代码来与之交互。
在这一部分,我们将展示如何使用 requests、httpx(同步和异步)以及 aiohttp 来从Ollama服务获取流式响应。
我们将向Ollama的 /api/generate 端点发送请求,并设置 "stream": true。
通用请求体:
ollama_payload = {
"model": "qwen3:1.7b", # 确保你已经拉取了这个模型,或者替换成你已有的模型
"prompt": "给我写一个关于Python异步编程的简短介绍",
"stream": True
}
ollama_url = "http://localhost:11434/api/generate"
import requests
import json
import time
ollama_payload = {
"model": "qwen3:1.7b",
"prompt": "给我写一个关于Python异步编程的简短介绍,用中文回答。",
"stream": True
}
ollama_url = "http://localhost:11434/api/generate"
def stream_with_requests():
print("--- 使用 requests (同步) ---")
start_time = time.time()
full_response = ""
first_chunk_received = False
time_to_first_chunk = 0
try:
with requests.post(ollama_url, json=ollama_payload, stream=True) as response:
response.raise_for_status() # 检查HTTP请求错误
print(f"请求状态码: {response.status_code}")
for line in response.iter_lines():
if line:
if not first_chunk_received:
time_to_first_chunk = time.time() - start_time
first_chunk_received = True
decoded_line = line.decode('utf-8')
try:
json_chunk = json.loads(decoded_line)
content = json_chunk.get("response", "")
full_response += content
print(content, end='', flush=True)
if json_chunk.get("done"):
print("\n--- 流结束 ---")
break
except json.JSONDecodeError:
print(f"\n无法解析JSON行: {decoded_line}")
continue
end_time = time.time()
total_time = end_time - start_time
print(f"\n\n首次接收到数据块耗时: {time_to_first_chunk:.4f} 秒")
print(f"总耗时: {total_time:.4f} 秒")
# print(f"完整响应内容:\n{full_response}")
except requests.exceptions.RequestException as e:
print(f"\n请求发生错误: {e}")
except Exception as e:
print(f"\n处理过程中发生未知错误: {e}")
if __name__ == '__main__':
stream_with_requests()
pass
首次接收到数据块耗时: 2.2818 秒 总耗时: 36.6709 秒
代码说明:
ollama_url 发送一个POST请求,json=ollama_payload 会自动将字典转换为JSON字符串并设置 Content-Type: application/json。stream=True 是关键,它告诉 requests 不要立即下载整个响应。response.iter_lines() 允许我们逐行迭代响应。Ollama的流式输出是每行一个JSON对象。response 字段的内容并打印。flush=True 确保内容立即输出到控制台。done: true 时,表示模型输出完毕。httpx 的同步API与requests非常相似。
import httpx
import json
import time
def stream_with_httpx_sync():
print("\n--- 使用 httpx (同步) ---")
start_time = time.time()
full_response = ""
first_chunk_received = False
time_to_first_chunk = 0
try:
with httpx.stream("POST", ollama_url, json=ollama_payload, timeout=300.0) as response:
response.raise_for_status()
print(f"请求状态码: {response.status_code}")
for line in response.iter_lines():
if line:
if not first_chunk_received:
time_to_first_chunk = time.time() - start_time
first_chunk_received = True
try:
json_chunk = json.loads(line) # httpx.iter_lines() 已经解码
content = json_chunk.get("response", "")
full_response += content
print(content, end='', flush=True)
if json_chunk.get("done"):
print("\n--- 流结束 ---")
break
except json.JSONDecodeError:
print(f"\n无法解析JSON行: {line}")
continue
end_time = time.time()
total_time = end_time - start_time
print(f"\n\n首次接收到数据块耗时: {time_to_first_chunk:.4f} 秒")
print(f"总耗时: {total_time:.4f} 秒")
except httpx.RequestError as e:
print(f"\n请求发生错误: {e}")
except Exception as e:
print(f"\n处理过程中发生未知错误: {e}")
if __name__ == '__main__':
stream_with_httpx_sync()
pass
代码说明:
httpx.stream("POST", ollama_url, json=ollama_payload) 用于发起流式请求。注意,与requests不同,stream是方法名的一部分,而不是参数。response.iter_lines() 的行为与requests中的类似。httpx的iter_lines默认返回解码后的字符串。首次接收到数据块耗时: 2.6894 秒 总耗时: 38.6338 秒
现在我们来看看 httpx 的异步版本,这需要使用 async 和 await 关键字。
import json
import httpx
import json
import asyncio
async def stream_with_httpx_async():
print("\n--- 使用 httpx (异步) ---")
start_time = asyncio.get_event_loop().time() # 使用事件循环时间作为起始时间
full_response = ""
first_chunk_received = False
time_to_first_chunk = 0
try:
async with httpx.AsyncClient(timeout=300.0) as client:
async with client.stream("POST", ollama_url, json=ollama_payload) as response:
response.raise_for_status()
print(f"请求状态码: {response.status_code}")
async for line in response.aiter_lines():
if line:
if not first_chunk_received:
# 使用asyncio.get_event_loop().time()获取事件循环的当前时间
# 这比time.time()更适合异步环境,因为它与事件循环的时间基准一致
# 特别是在长时间运行的应用中或需要高精度计时时
time_to_first_chunk = asyncio.get_event_loop().time() - start_time
first_chunk_received = True
try:
json_chunk = json.loads(line)
content = json_chunk.get("response", "")
full_response += content
print(content, end='', flush=True)
if json_chunk.get("done"):
print("\n--- 流结束 ---")
break
except json.JSONDecodeError:
print(f"\n无法解析JSON行: {line}")
continue
end_time = asyncio.get_event_loop().time() # 使用事件循环时间作为结束时间
total_time = end_time - start_time
print(f"\n\n首次接收到数据块耗时: {time_to_first_chunk:.4f} 秒")
print(f"总耗时: {total_time:.4f} 秒")
except httpx.RequestError as e:
print(f"\n请求发生错误: {e}")
except Exception as e:
print(f"\n处理过程中发生未知错误: {e}")
if __name__ == '__main__':
asyncio.run(stream_with_httpx_async())
pass
代码说明:
httpx.AsyncClient。async with。response.aiter_lines(),这是一个异步迭代器。async def 定义,并通过 asyncio.run() 来执行。首次接收到数据块耗时: 0.9680 秒 总耗时: 44.3120 秒
aiohttp 是一个纯异步库,API风格与 requests 和 httpx 有所不同。
import aiohttp
import json
import asyncio
import time
async def stream_with_aiohttp():
print("\n--- 使用 aiohttp (异步) ---")
start_time = asyncio.get_event_loop().time()
full_response = ""
first_chunk_received = False
time_to_first_chunk = 0
try:
async with aiohttp.ClientSession() as session:
async with session.post(ollama_url, json=ollama_payload, timeout=aiohttp.ClientTimeout(total=300)) as response:
response.raise_for_status()
print(f"请求状态码: {response.status}") # 注意是 .status
# aiohttp的content是aiohttp.StreamReader对象,我们可以异步迭代它
async for line_bytes in response.content: # line_bytes 是 bytes 类型
if line_bytes: # 确保不是空的心跳包等
if not first_chunk_received:
time_to_first_chunk = asyncio.get_event_loop().time() - start_time
first_chunk_received = True
print(f"首次接收到数据块耗时: {time_to_first_chunk:.4f} 秒")
line = line_bytes.decode('utf-8').strip() # 解码并去除可能的空白
if not line: # 跳过解码后为空的行
continue
try:
json_chunk = json.loads(line)
content = json_chunk.get("response", "")
full_response += content
print(content, end='', flush=True)
if json_chunk.get("done"):
print("\n--- 流结束 ---")
# 在aiohttp中,当Ollama发送done:true后,它会关闭连接,
# 这通常会导致StreamReader结束迭代,所以break可能不是严格必需的,
# 但显式break更清晰。
break
except json.JSONDecodeError:
print(f"\n无法解析JSON行: {line}")
continue
end_time = asyncio.get_event_loop().time()
total_time = end_time - start_time
print(f"\n\n首次接收到数据块耗时: {time_to_first_chunk:.4f} 秒")
print(f"总耗时: {total_time:.4f} 秒")
except aiohttp.ClientError as e:
print(f"\n请求发生错误: {e}")
except json.JSONDecodeError as e:
print(f"\nJSON解析错误: {e}")
except asyncio.TimeoutError:
print(f"\n请求超时")
except Exception as e:
print(f"\n处理过程中发生未知错误: {e}")
代码说明:
aiohttp.ClientSession 来管理连接。response.content 是一个 StreamReader 对象,我们可以直接异步迭代它来获取数据块(bytes 类型)。line_bytes)需要被解码。Ollama的流式响应中,每个JSON对象通常是单独的一行,所以我们按行处理。首次接收到数据块耗时: 0.5000 秒 总耗时: 41.5470 秒
requests 作为纯同步库,在等待每个数据块时会阻塞执行线程。httpx (同步) 与 requests 类似。httpx (异步) 和 aiohttp 在等待数据时不会阻塞主线程,它们会将CPU时间让渡给事件循环处理其他任务(如果存在)。对于单个流,这可能不直接减少总时间,但能提高应用的整体响应能力和并发处理能力。资源消耗与并发处理:
requests, httpx sync): 每个请求通常需要一个独立的线程来避免阻塞主应用。在高并发场景下,大量线程会导致显著的内存开销和上下文切换成本。httpx async, aiohttp): 使用单线程事件循环处理多个并发连接。它们在等待IO时不会阻塞,因此可以用更少的线程(通常是一个)处理大量并发请求,内存占用更低,上下文切换开销小。这使得异步库在处理大量并发流式请求时具有巨大优势。