先跟你说个真事。
上个月,我接了个任务:写一个爬虫,要爬取一万个网页。每个网页里又包含几十个图片链接,需要把这些图片也下载下来。
这活儿听着简单对吧?用 requests 循环一万次,里面再循环几十次,完事。
但我不能这么干。一万个网页,每个网页几十张图,加起来几十万次网络请求。要是同步地一个一个等,估计跑完得等到下个版本上线。
所以我用了异步。asyncio、aiohttp 都安排上了。
代码写完了,一跑——慢得要命。跟同步差不多,完全没有体现出异步的优势。
我懵了。
折腾了一整个晚上,翻了无数篇帖子,最后发现原因就藏在一个我完全没注意到的细节里。
今天我就把这个坑完完整整地讲给你听。保证你听完之后,不光知道怎么避坑,还能真正理解异步循环嵌套到底是怎么回事。
我们做一个小例子。假设你要从三个网站上抓数据,每个网站需要先请求 page 接口(耗时1秒),然后再根据返回的结果请求 detail 接口(也耗时1秒)。
同步写法很简单:
import time
def fetch_page(site):
time.sleep(1) # 模拟网络请求
return f"{site} 的数据"
def fetch_detail(site):
time.sleep(1)
return f"{site} 的详细信息"
def main():
sites = ["site_a", "site_b", "site_c"]
for site in sites:
page = fetch_page(site)
detail = fetch_detail(site)
print(page, detail)
start = time.time()
main()
print(f"耗时: {time.time() - start:.2f}秒")跑一下,耗时大概6秒。每个站点2秒,三个站点就是6秒。这没问题。
异步版本呢?理想情况下,三个站点的请求可以同时进行,总共只需要2秒左右。
我们来写一个异步版本:
import asyncio
async def fetch_page(site):
await asyncio.sleep(1)
return f"{site} 的数据"
async def fetch_detail(site):
await asyncio.sleep(1)
return f"{site} 的详细信息"
async def process_site(site):
page = await fetch_page(site)
detail = await fetch_detail(site)
return page, detail
async def main():
sites = ["site_a", "site_b", "site_c"]
tasks = [process_site(site) for site in sites]
results = await asyncio.gather(*tasks)
for result in results:
print(result)
start = time.time()
asyncio.run(main())
print(f"耗时: {time.time() - start:.2f}秒")这个版本耗时多少?2秒左右。完美。
这个例子看起来很简单,对吧?但就是在这个基础上,稍微嵌套一层循环,问题就来了。
我当时的代码大概是这个结构:
async def fetch_page(site, page_num):
await asyncio.sleep(0.1) # 模拟请求
return f"{site} 第{page_num}页的数据"
async def fetch_images(page_data):
await asyncio.sleep(0.05) # 模拟请求图片
return [f"image_{i}" for i in range(3)]
async def process_site(site):
all_images = []
# 外层循环:这个站点的每一页
for page_num in range(1, 11): # 假设每个站点10页
page_data = await fetch_page(site, page_num)
# 内层循环:这一页的每一张图片
images = await fetch_images(page_data)
all_images.extend(images)
return all_images
async def main():
sites = ["site_a", "site_b", "site_c"]
tasks = [process_site(site) for site in sites]
results = await asyncio.gather(*tasks)乍一看没问题啊?外层循环是每个站点,内层循环是每个站点里的每一页,每页里的图片又是异步请求的。这不挺好的吗?
但跑起来发现,三个站点之间确实是并发的,但每个站点内部的10页是顺序执行的——先请求第1页,等返回了,再请求第1页的图片,然后才能开始第2页,再等图片,再第3页……
这就相当于:三个站点各自排成一队,一页一页地处理。完全没有利用到“一页里的多张图片可以同时下”这个优化机会。
更糟糕的是,如果我每个站点有100页,每页有50张图,那这个顺序执行的问题会被放大100倍。
我当时愣是没看出来问题在哪。直到我在纸上把执行顺序画出来。
咱们用手画一下这个执行过程。假设只有两个站点,每个站点只有两页,每页两张图。
我当时的代码执行顺序是这样的:
站点A:请求第1页 → 等待 → 拿到数据 → 请求图1 → 等待 → 请求图2 → 等待 → 存图
站点A:请求第2页 → 等待 → 拿到数据 → 请求图1 → 等待 → 请求图2 → 等待 → 存图
站点B:请求第1页 → 等待 → 拿到数据 → 请求图1 → 等待 → 请求图2 → 等待 → 存图
站点B:请求第2页 → 等待 → 拿到数据 → 请求图1 → 等待 → 请求图2 → 等待 → 存图每个“等待”的位置,CPU其实都闲着,但程序就是不去做别的事,非得等这个请求回来。
这就是问题核心:**await 会挂起当前这个异步函数,但它只挂起自己,不会影响到同一层级的其他任务**。
等等,这句话有点绕。我用大白话再说一遍:
当你在一个异步函数里写 await something(),这个函数就会停在这里,等 something() 完成。但这不意味着整个程序都停了——程序可以去执行别的异步任务,比如另一个站点的任务。
所以在我上面的代码里,process_site('site_a') 这个任务在等待第1页返回的时候,程序确实可以去处理 process_site('site_b')。这一点是好的,所以三个站点之间是并发的。
问题在于:在同一个 process_site 任务内部,for 循环里的每一次 await 都会让这个任务停下来,直到这次请求完成,才会进入下一次循环。内层循环也是这样。
所以每个站点内部,所有请求是串行的。
如果你想让一个站点内部的多个请求也并发执行,你需要把那些独立的请求批量收集起来,然后用 asyncio.gather 或 asyncio.wait 一次性发出去。
拿图片下载来说,正确的做法应该是:先把这个页面的所有图片链接收集好,然后一次性创建所有图片的异步任务,同时等待它们全部完成。
代码大概是这样的:
async def process_site_correct(site):
all_images = []
for page_num in range(1, 11):
page_data = await fetch_page(site, page_num)
# 先获取这一页的所有图片链接
image_urls = extract_image_urls(page_data)
# 关键在这里:一次性创建所有图片任务,并发执行
image_tasks = [fetch_image(url) for url in image_urls]
images = await asyncio.gather(*image_tasks)
all_images.extend(images)
return all_images这样改完之后,执行顺序就变成了:
站点A:请求第1页 → 等待
(等待期间,站点B可以做自己的事)
第1页返回 → 同时请求该页的所有图片(假设10张图同时发请求)
等待所有图片返回 → 然后继续第2页图片下载这部分就从串行变成了并发。
但这里还有一个优化空间:页面请求本身能不能也并发?比如一个站点有10页,我可不可以同时请求这10页?
可以。但要注意:同时请求10页可能会对目标服务器造成压力,也可能导致你自己的网络连接数爆掉。合理控制并发数是个单独的话题,今天不展开。
再深入一层。假设我每个站点的每一页,返回的数据里包含的不只是图片链接,还有另外的 API 需要调用(比如每个图片需要额外请求一个评论接口)。
这时候代码可能变成这样:
async def fetch_image_with_comments(image_url):
image_data = await fetch_image(image_url)
comments = await fetch_comments(image_url)
return {"image": image_data, "comments": comments}
async def process_page(page_num):
page_data = await fetch_page(page_num)
image_urls = extract_urls(page_data)
# 这里看起来是并发的
tasks = [fetch_image_with_comments(url) for url in image_urls]
results = await asyncio.gather(*tasks)
return results这个看起来没问题吧?每个 fetch_image_with_comments 内部其实是串行的(先等图,再等评论),但不同图片之间是并发的。
这已经很好了。
但如果你写出这样的代码:
# 错误示范
async def fetch_image_with_comments_wrong(image_url):
# 里面又套了一层循环?或者又用了 gather 但忘了 await?
tasks = [fetch_image(image_url), fetch_comments(image_url)]
# 这里没有 await,返回的是一个协程对象,不是结果
return asyncio.gather(*tasks) # 注意:这里没有 await你会在某个地方发现结果不对,或者更糟——程序根本没执行这些请求,因为你返回的是一个还没被调度的协程对象。
这属于另一个经典错误:asyncio.gather 返回的是一个 awaitable 对象,你必须 await 它,或者用 asyncio.run 去跑,否则它不会真正执行。
如果你不确定自己的异步代码是不是真的并发了,最简单的办法就是打时间戳。
import time
async def fetch_with_log(name, delay):
start = time.time()
print(f"[{start:.3f}] 开始 {name}")
await asyncio.sleep(delay)
end = time.time()
print(f"[{end:.3f}] 结束 {name},耗时 {end-start:.2f}秒")
return name
async def test_serial():
print("串行版本:")
for i in range(3):
await fetch_with_log(f"任务{i}", 0.5)
async def test_concurrent():
print("并发版本:")
tasks = [fetch_with_log(f"任务{i}", 0.5) for i in range(3)]
await asyncio.gather(*tasks)
# 跑一下你就看到区别了
# 串行:开始时间依次相差0.5秒
# 并发:三个任务的开始时间几乎相同这个技巧我用了无数遍。每当你怀疑某个地方的循环是不是串行的,就把里面的关键操作加上日志,看看开始时间是不是挤在一起的。
如果开始时间是连成一串的,那就是串行。如果几乎同时打印出来,那就是并发。
经过那次加班之后,我给自己定了几条规则,你可以参考:
规则1:看见 await 在循环里,就要警惕
for 循环里面如果直接 await 一个异步函数,那这个循环一定是串行的。除非你就是想要串行,否则要考虑改成先收集任务再 gather。
规则2:搞清楚“谁和谁可以并发”
规则3:gather 不是万能的,它只是“同时等待”
很多人以为用了 gather 就自动并发了。其实 gather 做的事情很简单:把你传给它的多个协程任务同时调度起来,然后等待它们全部完成。但前提是这些任务本身要独立。
如果你传给 gather 的是一堆 [fetch_page(1), fetch_page(2), fetch_page(3)],这三个请求会同时发出去,很好。
但如果你传给 gather 的是一堆 [process_page(1), process_page(2), process_page(3)],而每个 process_page 内部又是串行的,那 gather 也救不了你。
规则4:异步不是自动并行
这是最容易被误解的一点。async/await 给你的只是“在等待的时候不阻塞”,而不是“自动把循环拆成多线程”。并发需要你显式地用 gather、create_task、wait 等工具来组织。
最后我的爬虫改成了这样:
async def process_site_optimized(site):
# 先获取这个站点所有需要抓的页面列表
page_tasks = [fetch_page(site, page_num) for page_num in range(1, 101)]
# 同时请求所有页面(限制并发数,用 semaphore)
pages_data = await limited_gather(page_tasks, max_concurrent=10)
# 收集所有图片 URL
all_image_tasks = []
for page_data in pages_data:
image_urls = extract_image_urls(page_data)
all_image_tasks.extend([fetch_image(url) for url in image_urls])
# 同时下载所有图片(同样限制并发)
images = await limited_gather(all_image_tasks, max_concurrent=20)
return images这里的 limited_gather 是自己写的一个包装,用 asyncio.Semaphore 控制同时进行的请求数量。这样既利用了异步并发的优势,又不会把服务器打爆或者把自己的连接池耗尽。
改完之后,原来要跑20分钟的活儿,1分多钟就跑完了。
异步编程的难点不在于 async/await 这两个关键字,而在于思维的转换。
在同步编程里,你写 for 循环,脑子里想的是“一个一个来”。在异步编程里,你需要想的是“哪些事情可以同时做,哪些事情必须等”。
当你看到嵌套循环的时候,不要急着写代码。先在纸上画一下:外层循环的每一次迭代,是否依赖上一次的结果?内层循环的每一次迭代,是否互相依赖?
如果不依赖,那它们就可以并发。并发的方式就是先把所有任务收集到一个列表里,然后一次性 await gather。
就这么简单。
但就是这么简单的事情,我当时愣是想了一整个晚上。
希望你看完这篇文章之后,不用再像我一样加班到凌晨了。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。