大家好,我是正在实战各种 AI 项目的程序员晚枫。

CPython 如何实现多线程?线程状态如何管理?线程间如何通信?这一讲,结合 GIL 深入理解 Python 的多线程机制。


📖 开篇:Python 线程不是操作系统的线程

Python 有自己的线程概念——threading 模块:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import threading
import time

def worker(n):
print(f'线程 {n} 开始')
time.sleep(1)
print(f'线程 {n} 结束')

threads = [threading.Thread(target=worker, args=(i,)) for i in range(3)]
for t in threads:
t.start() # 启动线程
for t in threads:
t.join() # 等待线程结束

print('全部完成')

Python 的 threading.Thread 在 CPython 中底层是操作系统的原生线程(pthread 或 Windows threads),但受 GIL 限制。


🧵 PyThreadState(线程状态)

每个 Python 线程都有一个 PyThreadState 对象:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// Include/pystate.h
typedef struct _ts {
// 链表指针(所有线程状态连在一起)
struct _ts *prev;
struct _ts *next;

// 所属解释器
PyInterpreterState *interp;

// 栈帧(当前正在执行的函数)
struct _frame *frame;

// 递归深度(用于检测递归溢出)
int recursion_depth;

// 异常状态(每个线程有自己的异常状态)
PyObject *exc_type;
PyObject *exc_value;
PyObject *exc_traceback;

// 线程 ID(操作系统分配的)
unsigned long thread_id;

// GIL 相关
char held; // 是否持有 GIL
} PyThreadState;

线程状态链表

1
2
3
4
5
interp

all_tstate链表: [tstate_0] <-> [tstate_1] <-> [tstate_2] <-> ...
↓ ↓ ↓
frame_a frame_b frame_c

🔄 线程调度

GIL + 线程调度 = 协作式调度

1
2
3
4
5
6
7
线程 A ──> 执行字节码(持有 GIL)
↓ (15ms 或 IO)
释放 GIL ───────────────────┐

线程 B ──> 获取 GIL ──> 执行字节码 ──> 释放 GIL
↓ (IO 或时间片耗尽)
等待 GIL <───────────────────┘

线程切换的实际过程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import threading

counter = 0

def increment():
global counter
for _ in range(10**6):
counter += 1 # 这不是原子操作!

# 问题:两个线程同时 +=,会丢失更新!
t1 = threading.Thread(target=increment)
t2 = threading.Thread(target=increment)
t1.start(); t2.start()
t1.join(); t2.join()
print(counter) # 很可能小于 2000000!

原因:counter += 1 其实是 4 条字节码指令,两线程交错执行时会丢失中间结果。


🛡️ 线程同步原语

Lock(互斥锁)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import threading

lock = threading.Lock()
counter = 0

def safe_increment():
global counter
for _ in range(10**6):
with lock: # 获取锁
counter += 1

t1 = threading.Thread(target=safe_increment)
t2 = threading.Thread(target=safe_increment)
t1.start(); t2.start()
t1.join(); t2.join()
print(counter) # 正确:2000000

RLock(可重入锁)

同一个线程可以多次获取同一把锁:

1
2
3
4
5
6
7
8
9
10
11
12
13
lock = threading.Lock()

def outer():
with lock:
inner() # 会再次尝试获取锁

def inner():
with lock:
print('OK')

# lock() 会死锁!因为 outer() 持有锁后又尝试获取
# RLock() 不会!因为记录了持有锁的线程
rlock = threading.RLock()

Semaphore(信号量)

控制同时访问的线程数量:

1
2
3
4
5
6
7
8
9
# 限制最多 5 个线程同时访问
semaphore = threading.Semaphore(5)

def access_resource():
with semaphore:
time.sleep(0.1) # 模拟资源访问

threads = [threading.Thread(target=access_resource) for _ in range(20)]
# 只有 5 个线程同时执行,其他等待

📊 线程间通信

Queue(线程安全队列)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import threading
import queue

q = queue.Queue()

def producer():
for i in range(10):
q.put(i)
print(f'生产: {i}')

def consumer():
while True:
item = q.get()
print(f'消费: {item}')
q.task_done()

# Queue 是线程安全的,不需要额外加锁
producer_thread = threading.Thread(target=producer)
consumer_thread = threading.Thread(target=consumer, daemon=True)
producer_thread.start()
consumer_thread.start()
producer_thread.join()

Event(事件)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import threading
import time

event = threading.Event()

def waiter():
print('等待中...')
event.wait() # 阻塞
print('收到信号!')

def setter():
time.sleep(2)
event.set() # 发送信号

t1 = threading.Thread(target=waiter)
t2 = threading.Thread(target=setter)
t1.start(); t2.start()
t1.join(); t2.join()

Condition(条件变量)

用于复杂的线程协调:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import threading

buffer = []
MAX_SIZE = 10
condition = threading.Condition()

def producer():
for i in range(20):
with condition:
while len(buffer) >= MAX_SIZE:
condition.wait()
buffer.append(i)
condition.notify() # 通知消费者

def consumer():
while True:
with condition:
while len(buffer) == 0:
condition.wait()
item = buffer.pop(0)
condition.notify() # 通知生产者
print(f'消费: {item}')

⚠️ 常见陷阱

陷阱1:守护线程

1
2
3
4
# 守护线程在主线程结束时会被强制终止
d = threading.Thread(target=background_task, daemon=True)
d.start()
# 主线程结束 -> 守护线程被强制杀掉!

陷阱2:死锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 循环等待:线程 A 等锁 B,线程 B 等锁 A
lock_a = threading.Lock()
lock_b = threading.Lock()

def task1():
with lock_a:
time.sleep(0.1)
with lock_b:
print('task1 done')

def task2():
with lock_b:
time.sleep(0.1)
with lock_a:
print('task2 done')

# task1() 和 task2() 同时运行 -> 死锁!

💡 本节作业

  1. 验证 counter += 1 在多线程下的丢失更新问题
  2. 用 Lock 修复上述问题
  3. 用 Queue 实现一个生产者-消费者模式

🎯 本讲总结

PyThreadState:每个 Python 线程的状态对象,包含栈帧、异常状态、GIL 持有状态。

线程调度:GIL 释放 -> 调度器选择 -> GIL 获取 -> 执行,协作式调度。

同步原语:Lock(互斥)、RLock(可重入)、Semaphore(计数信号量)。

线程通信:Queue(队列)、Event(事件)、Condition(条件变量)。


📚 推荐教材

《Python 编程从入门到实践(第 3 版)》 | 《流畅的 Python(第 2 版)》 | 《CPython 设计与实现》


🔗 课程导航

上一讲:GIL 全局解释器锁 | 下一讲:模块导入系统


💬 联系我

平台账号/链接
微信扫码加好友
B 站Python 自动化办公社区

主营业务:AI 编程培训、企业内训、技术咨询

🎓 AI 编程实战课程

想系统学习 AI 编程?程序员晚枫的 AI 编程实战课 帮你从零上手!