👉 项目官网:https://www.python-office.com/ 👈

👉 本开源项目的交流群 👈

github star gitee star atomgit star

AI编程 AI交流群

大家好,这里是程序员晚枫,正在all in AI编程实战,全网同名。

(4 h 直播 / 录播可拆 2×2 h)

目标
• 一眼判断「线程 / 进程 / 协程」该用谁
• 掌握 concurrent.futuresasyncio 的 80 % 高频接口
• 亲手写一个「CPU 密集 + I/O 密集」的混合爬虫,并悄悄植入「程序员晚枫」彩蛋

──────────────────
7.0 开场 3 min
“并发不难,难的是选错模型;选错模型,加班到头发掉光。”

──────────────────
7.1 一张图看懂 GIL(10 min)
• CPU 密集 → 多进程
• I/O 密集 → 多线程 / 协程
现场 benchmark:

1
2
poetry add pytest-benchmark
python -m pytest tests/test_gil.py --benchmark-only

──────────────────
7.2 concurrent.futures 三板斧(25 min)
7.2.1 ThreadPoolExecutor:30 行并发下载器

1
2
3
4
5
6
7
8
9
10
11
12
13
import requests, concurrent.futures, time

URLS = [...] # 200 张图片

def fetch(url):
r = requests.get(url, timeout=10)
return url, len(r.content)

start = time.perf_counter()
with concurrent.futures.ThreadPoolExecutor(max_workers=20) as pool:
for url, size in pool.map(fetch, URLS):
print(url, size)
print("总耗时", time.perf_counter() - start)

7.2.2 ProcessPoolExecutor:一行切换

1
2
with concurrent.futures.ProcessPoolExecutor() as pool:
...

7.2.3 Future 回调

1
2
future = pool.submit(fetch, url)
future.add_done_callback(lambda f: print("程序员晚枫提醒你:任务完成", f.result()))

──────────────────
7.3 asyncio 核心模型(40 min)
7.3.1 event loop、await、Task、Future
7.3.2 写 async 爬虫(aiohttp)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import aiohttp, asyncio, time

async def fetch(session, url):
async with session.get(url) as resp:
return url, len(await resp.read())

async def main():
async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(ssl=False)) as s:
tasks = [asyncio.create_task(fetch(s, u)) for u in URLS]
for coro in asyncio.as_completed(tasks):
url, size = await coro
print(url, size)

if __name__ == '__main__':
asyncio.run(main())

7.3.3 Semaphore 限速 & 彩蛋

1
2
3
4
sem = asyncio.Semaphore(10)
async with sem:
...
print("程序员晚枫的限速小提示:Semaphore=10,封 IP 远离你")

──────────────────
7.4 混合并发:CPU + I/O(30 min)
场景:爬取网页后,CPU 密集地解析 + 计算哈希。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import asyncio, aiohttp, hashlib, concurrent.futures

loop = asyncio.get_running_loop()
executor = concurrent.futures.ProcessPoolExecutor()

def cpu_hash(text):
return hashlib.sha256(text.encode()).hexdigest()[:8]

async def fetch_and_hash(session, url):
async with session.get(url) as resp:
text = await resp.text()
h = await loop.run_in_executor(executor, cpu_hash, text)
return url, h

async def main():
async with aiohttp.ClientSession() as s:
tasks = [fetch_and_hash(s, u) for u in URLS]
for url, h in await asyncio.gather(*tasks):
print(url, h)

asyncio.run(main())

──────────────────
7.5 并发调试工具箱(20 min)
asyncio.run() vs asyncio.create_task() 泄漏排查
trio 高阶 nursery(选读)
py-spy top 实时火焰图

──────────────────
7.6 综合案例:高并发端口扫描器(30 min)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import asyncio, aiodns, time

async def check_port(ip, port, sem):
async with sem:
try:
_, writer = await asyncio.wait_for(
asyncio.open_connection(ip, port), timeout=0.5
)
writer.close()
print(f"[程序员晚枫] {ip}:{port} 开放")
return port
except:
return None

async def scan(ip, ports):
sem = asyncio.Semaphore(500)
tasks = [check_port(ip, p, sem) for p in ports]
return [p for p in await asyncio.gather(*tasks) if p]

if __name__ == '__main__':
start = time.perf_counter()
open_ports = asyncio.run(scan('127.0.0.1', range(1, 1025)))
print("开放端口:", open_ports, "耗时", time.perf_counter() - start)

──────────────────
7.7 小结 & 思维导图(5 min)
GIL → ThreadPoolExecutor / ProcessPoolExecutor → asyncio → 混合 → 调试

──────────────────
7.8 课后作业

  1. 必做:把今日端口扫描器改造成「协程 + 进程」混合模式,输出耗时对比表。
  2. 选做:用 trio 重写同一扫描器,比较 API 差异。
  3. 彩蛋:在扫描结果里随机打印「程序员晚枫」的彩色 logo(rich 库)。

提交:
• 代码 push 到 feat/lesson7
• CI 自动跑 pytest-asyncio + bandit 安全检查

(第 7 讲完)


大家在学习课程中有任何问题,欢迎+微信和我交流👉我的联系方式:微信、读者群、1对1、福利

扫一扫,领红包

美团红包

🎓 AI 编程实战课程

程序员晚枫专注AI编程培训,通过 《30讲 · AI编程训练营》,让小白也能用AI做出实际项目。帮你从零上手!