About the code in the Python run_in_executor example
import asyncio
import concurrent.futures

def blocking_io():
    # File operations (such as logging) can block the
    # event loop: run them in a thread pool.
    with open('/dev/urandom', 'rb') as f:
        return f.read(100)

def cpu_bound():
    # CPU-bound operations will block the event loop:
    # in general it is preferable to run them in a
    # process pool.
    return sum(i * i for i in range(10 ** 7))

async def main():
    loop = asyncio.get_running_loop()

    ## Options:

    # 1. Run in the default loop's executor:
    result = await loop.run_in_executor(
        None, blocking_io)
    print('default thread pool', result)

    # 3. Run in a custom process pool:
    with concurrent.futures.ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(
            pool, cpu_bound)
        print('custom process pool', result)
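
if __name__ == '__main__':
    asyncio.run(main())

The example (in its comments) recommends running I/O-bound functions with ThreadPoolExecutor and CPU-bound functions with ProcessPoolExecutor. I would like to verify my understanding of the reasons behind this with the following three questions: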
Posted on 2021-12-23 10:06:23
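These recommendations are not really recommendations, since otherwise the event loop would block. Consequently, we would lose the main benefit of event-driven programming, correct?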
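The event loop will block if you call a blocking function (whether it blocks on I/O or on the CPU) directly in a coroutine instead of awaiting it through an executor. In this regard, yes, you should not allow this to happen.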
The way I would put it: there is a type of executor for each type of blocking code. Use ProcessPoolExecutor for CPU-bound work and ThreadPoolExecutor for I/O-bound work.
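To make the point about blocking concrete, here is a minimal sketch (not from the original question) that counts how often a background "heartbeat" coroutine gets to run while a blocking call is in progress. The names `heartbeat`, `blocking_call` and `measure` are hypothetical, and `time.sleep(0.3)` is only a stand-in for any blocking I/O or CPU-heavy function.

```python
import asyncio
import time


async def heartbeat(ticks, stop):
    # Records a timestamp roughly every 50 ms, but only while the loop is free.
    while not stop.is_set():
        ticks.append(time.perf_counter())
        await asyncio.sleep(0.05)


def blocking_call():
    # Hypothetical stand-in for any blocking I/O or CPU-heavy function.
    time.sleep(0.3)


async def measure(use_executor):
    ticks, stop = [], asyncio.Event()
    beat = asyncio.create_task(heartbeat(ticks, stop))
    await asyncio.sleep(0)  # let the heartbeat record its first tick
    if use_executor:
        loop = asyncio.get_running_loop()
        # Offloaded to the default thread pool: the loop keeps running.
        await loop.run_in_executor(None, blocking_call)
    else:
        # Called directly: nothing else on the loop can run for 0.3 s.
        blocking_call()
    stop.set()
    await beat
    return len(ticks)


blocked = asyncio.run(measure(use_executor=False))
offloaded = asyncio.run(measure(use_executor=True))
print("heartbeat ticks while blocking the loop:", blocked)
print("heartbeat ticks while using an executor:", offloaded)
```

When the function is called directly, the heartbeat should barely tick at all. With the executor it should keep ticking for the whole duration, which is the concurrency you lose by blocking the loop.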
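Running the I/O-bound task in a separate thread requires the following assumption: the I/O call will release the GIL, correct? Because otherwise the OS would not be able to context switch between the event loop and this new separate thread.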
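When it comes to multithreading, Python switches between threads after a very short interval even if the running thread never releases the GIL voluntarily (in CPython the interpreter asks it to give the GIL up once the switch interval, 5 ms by default, has elapsed). But if one or more threads are doing I/O (or running C code that releases the GIL), the GIL is released during that time, allowing the interpreter to spend more time on the threads that need it.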
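As a rough illustration of the GIL being released during a blocking wait, the sketch below starts several threads that each wait for 0.2 s and measures the total wall time. It assumes a standard CPython build with the GIL. The helper names `wait_like_io` and `run_threads` are hypothetical, and `time.sleep` is used as a stand-in for a blocking I/O call because it also releases the GIL while waiting.

```python
import sys
import threading
import time


def wait_like_io(delay):
    # time.sleep releases the GIL while waiting, much like a blocking socket read.
    time.sleep(delay)


def run_threads(target, args, count):
    # Starts `count` threads on the same target and returns the total wall time.
    threads = [threading.Thread(target=target, args=args) for _ in range(count)]
    start = time.perf_counter()
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return time.perf_counter() - start


# How long a thread may hold the GIL before the interpreter asks it to yield.
switch_interval = sys.getswitchinterval()
print(f"switch interval: {switch_interval} s")

elapsed = run_threads(wait_like_io, (0.2,), count=4)
print(f"4 threads x 0.2 s of waiting took {elapsed:.2f} s")
```

If the waits were serialized by the GIL, the four threads would need about 0.8 s. Because the GIL is released while each thread waits, the total should stay close to a single 0.2 s wait.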
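Bottom line: use processes for CPU-bound work because of the GIL, and use threads for I/O-bound work, since threads give you concurrency regardless of the GIL.
I wrote an example to demonstrate how it works: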
import sys
import asyncio
import time
import concurrent.futures
import requests
from contextlib import contextmanager

process_pool = concurrent.futures.ProcessPoolExecutor(2)
thread_pool = concurrent.futures.ThreadPoolExecutor(2)


def io_bound():
    for i in range(3):
        requests.get("https://httpbin.org/delay/0.4")  # I/O blocking
        print(f"I/O bound {i}")
        sys.stdout.flush()


def cpu_bound():
    for i in range(3):
        sum(i * i for i in range(10 ** 7))  # CPU blocking
        print(f"CPU bound {i}")
        sys.stdout.flush()


async def run_as_is(func):
    # Calls the blocking function directly, which freezes the event loop
    func()


async def run_in_process(func):
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(process_pool, func)


async def run_in_thread(func):
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(thread_pool, func)


@contextmanager
def print_time():
    # Prints the wall-clock time spent inside the `with` block
    start = time.time()
    yield
    finished = time.time() - start
    print(f"Finished in {round(finished, 1)}\n")


async def main():
    print("Wrong due to blocking code in coroutine,")
    print(
        "you get neither performance, nor concurrency (which breaks async nature of the code)"
    )
    print("don't allow this to happen")
    with print_time():
        await asyncio.gather(run_as_is(cpu_bound), run_as_is(io_bound))

    print("CPU bound works concurrently with threads,")
    print("but you gain no performance due to GIL")
    with print_time():
        await asyncio.gather(run_in_thread(cpu_bound), run_in_thread(cpu_bound))

    print("To get performance for CPU-bound,")
    print("use process executor")
    with print_time():
        await asyncio.gather(run_in_process(cpu_bound), run_in_process(cpu_bound))

    print("I/O bound will gain benefit from processes as well...")
    with print_time():
        await asyncio.gather(run_in_process(io_bound), run_in_process(io_bound))

    print(
        "... but there's no need in processes since you can use lighter threads for I/O"
    )
    with print_time():
        await asyncio.gather(run_in_thread(io_bound), run_in_thread(io_bound))

    print("Long story short,")
    print("Use processes for CPU bound due to GIL")
    print(
        "and use threads for I/O bound since you benefit from concurrency regardless of GIL"
    )
    with print_time():
        await asyncio.gather(run_in_thread(io_bound), run_in_process(cpu_bound))


if __name__ == "__main__":
    asyncio.run(main())

Output:
Wrong due to blocking code in coroutine,
you get neither performance, nor concurrency (which breaks async nature of the code)
don't allow this to happen
CPU bound 0
CPU bound 1
CPU bound 2
I/O bound 0
I/O bound 1
I/O bound 2
Finished in 5.3

CPU bound works concurrently with threads,
but you gain no performance due to GIL
CPU bound 0
CPU bound 0
CPU bound 1
CPU bound 1
CPU bound 2
CPU bound 2
Finished in 4.6

To get performance for CPU-bound,
use process executor
CPU bound 0
CPU bound 0
CPU bound 1
CPU bound 1
CPU bound 2
CPU bound 2
Finished in 2.5

I/O bound will gain benefit from processes as well...
I/O bound 0
I/O bound 0
I/O bound 1
I/O bound 1
I/O bound 2
I/O bound 2
Finished in 3.3

... but there's no need in processes since you can use lighter threads for I/O
I/O bound 0
I/O bound 0
I/O bound 1
I/O bound 1
I/O bound 2
I/O bound 2
Finished in 3.1

Long story short,
Use processes for CPU bound due to GIL
and use threads for I/O bound since you benefit from concurrency regardless of GIL
CPU bound 0
I/O bound 0
CPU bound 1
I/O bound 1
CPU bound 2
I/O bound 2
Finished in 2.9

https://stackoverflow.com/questions/70459437