大家好,我是正在实战各种 AI 项目的程序员晚枫。

Python 的并发编程,常被人误解。"有 GIL 所以多线程没用"——这是一个危险的过度简化。真正的问题是:你的任务是 CPU 密集型还是 IO 密集型?搞清楚这个,才能选对工具。


🧩 理解 GIL:真正的含义

GIL 是什么?

GIL(Global Interpreter Lock,全局解释器锁)是 CPython 解释器中的一把互斥锁,它确保同一时刻只有一个线程在执行 Python 字节码

1
2
3
线程 1 ─────[获取GIL]──执行字节码──[释放GIL]────────────────
线程 2 ────────────────────────────[获取GIL]──执行字节码──...
线程 3 ─────────────────────────────────────────[等待GIL]───

GIL 什么时候释放?

关键点:GIL 并不是永远锁住的——

1
2
3
4
5
6
7
8
9
import sys
# 每执行 sys.getswitchinterval() 秒(默认 0.005s)后,Python 强制切换线程
print(sys.getswitchinterval()) # 0.005

# 更重要的是:在 IO 操作时,Python 主动释放 GIL
# - 文件读写
# - 网络请求
# - time.sleep()
# - 等待锁/信号量

结论

  • IO 密集型任务 → GIL 在等待 IO 时主动释放 → 多线程有用
  • CPU 密集型任务 → 纯计算,GIL 不释放 → 多线程没用,用多进程

🧵 threading:IO 密集型的利器

基础用法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import threading
import time
import requests

def download(url: str, results: list, index: int) -> None:
"""模拟下载任务"""
print(f"[Thread-{index}] 开始下载 {url}")
time.sleep(1) # 模拟 IO 等待(GIL 此时释放)
results[index] = f"Downloaded: {url}"
print(f"[Thread-{index}] 完成")

urls = [f"https://example.com/{i}" for i in range(5)]
results = [None] * len(urls)

# 顺序执行:约 5 秒
start = time.time()
for i, url in enumerate(urls):
download(url, results, i)
print(f"顺序执行耗时:{time.time() - start:.2f}s") # ~5.00s

# 并发执行:约 1 秒
start = time.time()
threads = []
for i, url in enumerate(urls):
t = threading.Thread(target=download, args=(url, results, i))
threads.append(t)
t.start()

for t in threads:
t.join()
print(f"并发执行耗时:{time.time() - start:.2f}s") # ~1.00s

线程同步:Lock 和 Event

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import threading

# 1. Lock:保护共享资源
counter = 0
lock = threading.Lock()

def increment():
global counter
for _ in range(10000):
with lock: # 上下文管理器方式使用 Lock
counter += 1

threads = [threading.Thread(target=increment) for _ in range(5)]
for t in threads:
t.start()
for t in threads:
t.join()
print(counter) # 50000(无锁情况下可能不是 50000)


# 2. Event:线程间通信
ready = threading.Event()

def worker():
print("Worker 等待信号...")
ready.wait() # 阻塞,直到 ready.set()
print("Worker 收到信号,开始工作")

t = threading.Thread(target=worker)
t.start()
time.sleep(1)
ready.set() # 发出信号
t.join()

🔀 multiprocessing:CPU 密集型的救星

真正的并行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import multiprocessing
import time
import math

def cpu_task(n: int) -> float:
"""CPU 密集型任务:计算 n 的阶乘"""
result = 1
for i in range(1, n + 1):
result *= i
return math.log(result)

numbers = [50000] * 8 # 8 个任务

# 单进程:约 8 倍时间
start = time.time()
results = [cpu_task(n) for n in numbers]
print(f"单进程耗时:{time.time() - start:.2f}s")

# 多进程:约 1 倍时间(取决于 CPU 核数)
start = time.time()
with multiprocessing.Pool(processes=multiprocessing.cpu_count()) as pool:
results = pool.map(cpu_task, numbers)
print(f"多进程耗时:{time.time() - start:.2f}s")
print(f"CPU 核数:{multiprocessing.cpu_count()}")

进程间通信

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import multiprocessing as mp

# 1. Queue:进程间传递数据
def producer(queue: mp.Queue, items: list) -> None:
for item in items:
queue.put(item)
queue.put(None) # 哨兵值,通知消费者结束

def consumer(queue: mp.Queue, results: list) -> None:
while True:
item = queue.get()
if item is None:
break
results.append(item * 2)

queue = mp.Queue()
manager = mp.Manager()
results = manager.list()

p1 = mp.Process(target=producer, args=(queue, [1, 2, 3, 4, 5]))
p2 = mp.Process(target=consumer, args=(queue, results))

p1.start(); p2.start()
p1.join(); p2.join()
print(list(results)) # [2, 4, 6, 8, 10](顺序可能不同)

⚡ concurrent.futures:现代并发的首选

concurrent.futures 是 Python 标准库提供的高级并发接口,推荐优先使用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed
import time

def io_task(n: int) -> str:
time.sleep(0.5) # 模拟 IO
return f"IO result {n}"

def cpu_task(n: int) -> int:
return sum(i ** 2 for i in range(n))

# 1. ThreadPoolExecutor:IO 密集型
with ThreadPoolExecutor(max_workers=5) as executor:
# submit 提交单个任务
future = executor.submit(io_task, 1)
print(future.result()) # "IO result 1"

# map 批量提交(保持顺序)
results = list(executor.map(io_task, range(10)))

# as_completed:谁先完成谁先处理
futures = {executor.submit(io_task, i): i for i in range(5)}
for future in as_completed(futures):
n = futures[future]
try:
result = future.result()
print(f"任务 {n} 完成:{result}")
except Exception as e:
print(f"任务 {n} 失败:{e}")

# 2. ProcessPoolExecutor:CPU 密集型
with ProcessPoolExecutor() as executor:
results = list(executor.map(cpu_task, [100000, 200000, 300000]))
print(results)

Future 对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
from concurrent.futures import ThreadPoolExecutor
import time

def slow_task(n: int) -> int:
time.sleep(n)
return n * 2

with ThreadPoolExecutor(max_workers=3) as executor:
future = executor.submit(slow_task, 2)

print(future.done()) # False(还没完成)
print(future.running()) # True

result = future.result(timeout=5) # 等待最多 5 秒
print(result) # 4

print(future.done()) # True

📊 性能对比

实际测试中,不同场景的加速效果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def io_bound(n):
time.sleep(0.1) # 模拟 IO
return n

def cpu_bound(n):
return sum(i**2 for i in range(n)) # 纯计算

N = 20

# IO 密集型
start = time.time()
results = [io_bound(i) for i in range(N)]
print(f"IO 顺序: {time.time()-start:.2f}s") # ~2.00s

start = time.time()
with ThreadPoolExecutor(max_workers=N) as ex:
results = list(ex.map(io_bound, range(N)))
print(f"IO 多线程: {time.time()-start:.2f}s") # ~0.10s ✅ 大幅提升

# CPU 密集型(10万次计算)
tasks = [100_000] * N

start = time.time()
results = [cpu_bound(t) for t in tasks]
print(f"CPU 顺序: {time.time()-start:.2f}s")

start = time.time()
with ThreadPoolExecutor(max_workers=N) as ex:
results = list(ex.map(cpu_bound, tasks))
print(f"CPU 多线程: {time.time()-start:.2f}s") # 与顺序相当,GIL 导致没有提升

start = time.time()
with ProcessPoolExecutor() as ex:
results = list(ex.map(cpu_bound, tasks))
print(f"CPU 多进程: {time.time()-start:.2f}s") # ✅ 大幅提升

🗺️ 选择指南

场景推荐方案原因
网络请求、文件 IOThreadPoolExecutorGIL 在 IO 时释放,线程开销小
大量网络连接(万级)asyncio协程开销极小,见第18讲
CPU 密集计算ProcessPoolExecutor绕过 GIL,真正利用多核
数值计算NumPy + 多进程NumPy 内部释放 GIL
混合型任务线程池 + 进程池组合分层架构

⚠️ 常见陷阱

1. 多进程中 if name == 'main' 不可省略

1
2
3
4
# Windows 上必须有这行,否则子进程会递归启动
if __name__ == '__main__':
with ProcessPoolExecutor() as executor:
results = list(executor.map(cpu_task, range(10)))

2. 不要在线程间共享可变状态(除非有锁)

1
2
3
4
5
6
7
8
9
10
11
12
# ❌ 危险:竞态条件
shared_list = []

def unsafe_append(item):
shared_list.append(item) # list.append 在 CPython 中恰好是原子的,但不能依赖这点

# ✅ 安全:使用 Queue 或 Lock
from queue import Queue
result_queue = Queue()

def safe_append(item):
result_queue.put(item)

3. 进程间通信只能传递可 pickle 的对象

1
2
3
4
5
6
7
8
9
10
# ❌ lambda 不能 pickle,不能用于多进程
with ProcessPoolExecutor() as ex:
results = list(ex.map(lambda x: x**2, range(10))) # 报错!

# ✅ 用普通函数
def square(x):
return x ** 2

with ProcessPoolExecutor() as ex:
results = list(ex.map(square, range(10))) # 正常

🎯 本讲总结

GIL 的真相:只限制 CPU 密集型多线程;IO 等待时主动释放,多线程对 IO 密集型有显著提升。

threading:适合 IO 密集型;Lock/Event 处理同步;代码简单但需小心共享状态。

multiprocessing:适合 CPU 密集型;真正并行利用多核;进程间通信有开销,只能传 pickle 对象。

concurrent.futuresThreadPoolExecutorProcessPoolExecutor 提供统一接口;推荐优先使用,比直接用 threading/multiprocessing 更简洁。

选择原则:IO 密集 → 多线程;CPU 密集 → 多进程;海量连接 → asyncio(下一讲)。


📚 推荐教材

《Python 编程从入门到实践(第 3 版)》 | 《流畅的 Python(第 2 版)》 | 《CPython 设计与实现》

学习路线: 零基础 → 《从入门到实践》 → 《流畅的 Python》 → 本门课程 → 《CPython 设计与实现》


🎓 加入《流畅的 Python》直播共读营

学到这里,如果你想系统吃透这本书——欢迎加入我的直播共读课。

  • 每周直播精讲,逐章拆解核心知识点
  • 专属学习群,随时答疑交流
  • 试运营特惠:499 元299 元

👉 【立即报名《流畅的 Python》共读课】https://mp.weixin.qq.com/s/ivHJwn1nNx5ug4TFrapvGg

🔗 课程导航

上一讲:类型提示 | 下一讲:异步编程


💬 联系我

平台账号/链接
微信扫码加好友
B 站Python 自动化办公社区

主营业务:AI 编程培训、企业内训、技术咨询

🎓 AI 编程实战课程

想系统学习 AI 编程?程序员晚枫的 AI 编程实战课 帮你从零上手!

fluent-python.png