from concurrent.futures import ThreadPoolExecutor
import time

def task(n):
    time.sleep(0.2)
    return n * n

with ThreadPoolExecutor(max_workers=4) as pool:
    results = list(pool.map(task, range(8)))

print(results)  # [0, 1, 4, 9, 16, 25, 36, 49]
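pool.map returns results in input order. When you would rather handle each result as soon as it finishes, submit() plus as_completed() is the usual alternative; the short sketch below (not part of the original example) repeats the same task function purely for illustration.

from concurrent.futures import ThreadPoolExecutor, as_completed
import time

def task(n):
    time.sleep(0.2)
    return n * n

# submit() returns a Future per call; as_completed() yields futures in the
# order they finish, not the order they were submitted.
with ThreadPoolExecutor(max_workers=4) as pool:
    futures = {pool.submit(task, n): n for n in range(8)}
    for fut in as_completed(futures):
        print(f"task({futures[fut]}) -> {fut.result()}")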
Multiprocessing (CPU-bound)
Use for: CPU-bound work such as computation, compression, and image processing.
Key points: use Process or ProcessPoolExecutor; always start processes under an if __name__ == '__main__': guard (a ProcessPoolExecutor variant is sketched after the example below).
Example:
from multiprocessing import Process, cpu_count

def worker(i):
    s = sum(x * x for x in range(10**6))   # CPU-bound work
    print(f"worker {i} done, sum={s}")

if __name__ == '__main__':
    procs = [Process(target=worker, args=(i,)) for i in range(cpu_count())]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
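As mentioned in the key points, ProcessPoolExecutor wraps the same idea in a higher-level interface. The sketch below is an assumed variant, not part of the original example; the function name square_sum and the worker count are illustrative.

from concurrent.futures import ProcessPoolExecutor

def square_sum(i):
    return i, sum(x * x for x in range(10**6))

if __name__ == '__main__':
    # With no max_workers argument, the pool defaults to os.cpu_count() processes.
    with ProcessPoolExecutor() as pool:
        for i, s in pool.map(square_sum, range(4)):
            print(f"worker {i} done, sum={s}")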
Coroutines with asyncio (I/O-bound, high concurrency)
Use for: large batches of network requests or other I/O-bound waits.
Key points: cap the number of in-flight requests with an asyncio.Semaphore; drive the coroutines with asyncio.run().
Example:
import asyncio
import aiohttp

async def fetch(session, sem, url):
    async with sem:                      # limits concurrent requests
        async with session.get(url, timeout=10) as r:
            return await r.text()

async def main():
    urls = ["https://httpbin.org/delay/1"] * 200
    sem = asyncio.Semaphore(100)         # at most 100 requests in flight
    async with aiohttp.ClientSession() as sess:
        tasks = [fetch(sess, sem, u) for u in urls]
        await asyncio.gather(*tasks)

asyncio.run(main())
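The gather call above discards the response bodies. If you want the results back, and want a single failed request not to abort the whole batch, return_exceptions=True is the usual option. The sketch below is an assumed variant reusing fetch() and the imports from the example above; the name main_collect is hypothetical.

async def main_collect():                # hypothetical variant of main() above
    urls = ["https://httpbin.org/delay/1"] * 200
    sem = asyncio.Semaphore(100)
    async with aiohttp.ClientSession() as sess:
        results = await asyncio.gather(
            *(fetch(sess, sem, u) for u in urls),
            return_exceptions=True,      # failures come back as exception objects
        )
    ok = [r for r in results if not isinstance(r, Exception)]
    print(f"{len(ok)} succeeded, {len(results) - len(ok)} failed")

asyncio.run(main_collect())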
Coroutines with greenlet (I/O-bound, low-intrusion retrofit)
Use for: existing synchronous code that you want to give higher I/O concurrency at low cost.
Key points: call monkey.patch_all() at the program entry point so that blocking calls are replaced with cooperative switches.
Example:
from gevent import monkey; monkey.patch_all()   # patch first, before other imports
import gevent

def worker(i):
    gevent.sleep(0.2)        # cooperative sleep: yields to other greenlets
    return i * i

jobs = [gevent.spawn(worker, i) for i in range(8)]
gevent.joinall(jobs)
print([j.value for j in jobs])
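The example above calls gevent.sleep directly; the point of monkey.patch_all() is that unmodified blocking calls also switch cooperatively. The sketch below is an assumed illustration, not part of the original: it uses plain time.sleep, which patch_all() replaces, and the printed timing is only indicative.

from gevent import monkey; monkey.patch_all()   # patches time.sleep, sockets, etc.
import gevent
import time

def worker(i):
    time.sleep(0.2)          # patched: yields instead of blocking the whole process
    return i * i

start = time.time()
jobs = [gevent.spawn(worker, i) for i in range(8)]
gevent.joinall(jobs)
print([j.value for j in jobs])
print(f"elapsed ~{time.time() - start:.2f}s")    # roughly 0.2s rather than 8 * 0.2s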