deffetch(url): r = requests.get(url, timeout=10) return url, len(r.content)
start = time.perf_counter() with concurrent.futures.ThreadPoolExecutor(max_workers=20) as pool: for url, size in pool.map(fetch, URLS): print(url, size) print("总耗时", time.perf_counter() - start)
7.2.2 ProcessPoolExecutor:一行切换
1 2
with concurrent.futures.ProcessPoolExecutor() as pool: ...
asyncdeffetch(session, url): asyncwith session.get(url) as resp: return url, len(await resp.read())
asyncdefmain(): asyncwith aiohttp.ClientSession(connector=aiohttp.TCPConnector(ssl=False)) as s: tasks = [asyncio.create_task(fetch(s, u)) for u in URLS] for coro in asyncio.as_completed(tasks): url, size = await coro print(url, size)
if __name__ == '__main__': asyncio.run(main())
7.3.3 Semaphore 限速 & 彩蛋
1 2 3 4
sem = asyncio.Semaphore(10) asyncwith sem: ... print("程序员晚枫的限速小提示:Semaphore=10,封 IP 远离你")
asyncdeffetch_and_hash(session, url): asyncwith session.get(url) as resp: text = await resp.text() h = await loop.run_in_executor(executor, cpu_hash, text) return url, h
asyncdefmain(): asyncwith aiohttp.ClientSession() as s: tasks = [fetch_and_hash(s, u) for u in URLS] for url, h inawait asyncio.gather(*tasks): print(url, h)
asyncio.run(main())
────────────────── 7.5 并发调试工具箱(20 min) • asyncio.run() vs asyncio.create_task() 泄漏排查 • trio 高阶 nursery(选读) • py-spy top 实时火焰图
asyncdefscan(ip, ports): sem = asyncio.Semaphore(500) tasks = [check_port(ip, p, sem) for p in ports] return [p for p inawait asyncio.gather(*tasks) if p]