Hi everyone, I'm 程序员晚枫, a programmer currently working hands-on on all kinds of AI projects.
Python concurrency is widely misunderstood. "The GIL makes multithreading useless" — that's a dangerous oversimplification. The real question is: is your task CPU-bound or IO-bound? Only once you know that can you pick the right tool.
## 🧩 Understanding the GIL: what it actually means

### What is the GIL?

The GIL (Global Interpreter Lock) is a mutex inside the CPython interpreter that ensures only one thread executes Python bytecode at any given moment.
```
Thread 1 ─────[acquire GIL]──run bytecode──[release GIL]────────────────
Thread 2 ──────────────────────────────[acquire GIL]──run bytecode──...
Thread 3 ───────────────────────────────────────────[waiting for GIL]───
```
### When is the GIL released?

Key point: the GIL is not held forever — a thread releases it before blocking on IO, and even during pure computation the interpreter forces a thread switch at a regular interval:
```python
import sys

# How often the interpreter considers switching threads
print(sys.getswitchinterval())  # default: 0.005 (5 ms)
```
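To see the IO release in action: a blocking call such as `time.sleep` waits outside the interpreter and drops the GIL, so sleeping threads overlap instead of queuing. A minimal demonstration (timings are approximate):

```python
import threading
import time

def sleeper() -> None:
    # time.sleep blocks outside the interpreter, so the GIL is released
    time.sleep(1)

start = time.time()
threads = [threading.Thread(target=sleeper) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

elapsed = time.time() - start
# Four 1-second sleeps overlap: total is ~1s, not ~4s
print(f"{elapsed:.2f}s")
```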
Conclusion:

- IO-bound tasks → the GIL is released while waiting on IO → multithreading helps.
- CPU-bound tasks → pure computation, the GIL is never released → multithreading does not help; use multiple processes.

## 🧵 threading: the tool of choice for IO-bound work

### Basic usage

```python
import threading
import time

def download(url: str, results: list, index: int) -> None:
    """Simulate a download task."""
    print(f"[Thread-{index}] downloading {url}")
    time.sleep(1)  # simulate network latency (releases the GIL)
    results[index] = f"Downloaded: {url}"
    print(f"[Thread-{index}] done")

urls = [f"https://example.com/{i}" for i in range(5)]
results = [None] * len(urls)

# Sequential baseline
start = time.time()
for i, url in enumerate(urls):
    download(url, results, i)
print(f"Sequential: {time.time() - start:.2f}s")

# Threaded version
start = time.time()
threads = []
for i, url in enumerate(urls):
    t = threading.Thread(target=download, args=(url, results, i))
    threads.append(t)
    t.start()
for t in threads:
    t.join()
print(f"Threaded: {time.time() - start:.2f}s")
```
### Thread synchronization: Lock and Event

```python
import threading
import time

# --- Lock: protect a shared counter ---
counter = 0
lock = threading.Lock()

def increment() -> None:
    global counter
    for _ in range(10000):
        with lock:  # counter += 1 is not atomic; the lock makes it safe
            counter += 1

threads = [threading.Thread(target=increment) for _ in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(counter)  # 50000

# --- Event: signal between threads ---
ready = threading.Event()

def worker() -> None:
    print("Worker waiting for the signal...")
    ready.wait()
    print("Worker got the signal, starting work")

t = threading.Thread(target=worker)
t.start()
time.sleep(1)
ready.set()
t.join()
```
## 🔀 multiprocessing: the answer for CPU-bound work

### True parallelism

```python
import multiprocessing
import time
import math

def cpu_task(n: int) -> float:
    """CPU-bound task: compute n! and return its natural log."""
    result = 1
    for i in range(1, n + 1):
        result *= i
    return math.log(result)

if __name__ == "__main__":
    numbers = [50000] * 8

    start = time.time()
    results = [cpu_task(n) for n in numbers]
    print(f"Single process: {time.time() - start:.2f}s")

    start = time.time()
    with multiprocessing.Pool(processes=multiprocessing.cpu_count()) as pool:
        results = pool.map(cpu_task, numbers)
    print(f"Multi-process:  {time.time() - start:.2f}s")

    print(f"CPU cores: {multiprocessing.cpu_count()}")
```
### Inter-process communication

```python
import multiprocessing as mp

def producer(queue: mp.Queue, items: list) -> None:
    for item in items:
        queue.put(item)
    queue.put(None)  # sentinel: no more items

def consumer(queue: mp.Queue, results: list) -> None:
    while True:
        item = queue.get()
        if item is None:
            break
        results.append(item * 2)

if __name__ == "__main__":
    queue = mp.Queue()
    manager = mp.Manager()
    results = manager.list()  # a list that can be shared across processes

    p1 = mp.Process(target=producer, args=(queue, [1, 2, 3, 4, 5]))
    p2 = mp.Process(target=consumer, args=(queue, results))
    p1.start(); p2.start()
    p1.join(); p2.join()
    print(list(results))  # [2, 4, 6, 8, 10]
```
## ⚡ concurrent.futures: the modern default

`concurrent.futures` is the high-level concurrency interface in the standard library, and the recommended place to start:
```python
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed
import time

def io_task(n: int) -> str:
    time.sleep(0.5)
    return f"IO result {n}"

def cpu_task(n: int) -> int:
    return sum(i ** 2 for i in range(n))

if __name__ == "__main__":
    with ThreadPoolExecutor(max_workers=5) as executor:
        # submit: one task, returns a Future
        future = executor.submit(io_task, 1)
        print(future.result())

        # map: many tasks, results come back in input order
        results = list(executor.map(io_task, range(10)))

        # as_completed: handle results as they finish
        futures = {executor.submit(io_task, i): i for i in range(5)}
        for future in as_completed(futures):
            n = futures[future]
            try:
                result = future.result()
                print(f"Task {n} finished: {result}")
            except Exception as e:
                print(f"Task {n} failed: {e}")

    # Same interface, but with processes for CPU-bound work
    with ProcessPoolExecutor() as executor:
        results = list(executor.map(cpu_task, [100000, 200000, 300000]))
        print(results)
```
### The Future object

```python
from concurrent.futures import ThreadPoolExecutor
import time

def slow_task(n: int) -> int:
    time.sleep(n)
    return n * 2

with ThreadPoolExecutor(max_workers=3) as executor:
    future = executor.submit(slow_task, 2)
    print(future.done())     # False while the task is still running
    print(future.running())
    result = future.result(timeout=5)  # block until done, or raise after 5s
    print(result)            # 4
    print(future.done())     # True
```
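Besides polling `done()` or blocking on `result()`, a `Future` can also run a callback when it finishes, via `add_done_callback`. A small sketch (the names `on_done` and `completed` are just illustrative):

```python
from concurrent.futures import ThreadPoolExecutor
import time

def slow_task(n: int) -> int:
    time.sleep(0.2)
    return n * 2

completed = []

def on_done(future) -> None:
    # Invoked as soon as the future finishes, with the future as argument
    completed.append(future.result())

with ThreadPoolExecutor(max_workers=2) as executor:
    for i in range(3):
        executor.submit(slow_task, i).add_done_callback(on_done)
# The with-block waits for all tasks, so every callback has fired by here
print(sorted(completed))  # [0, 2, 4]
```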
## 📊 Performance comparison

In practice, the speedup depends entirely on the scenario:
```python
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def io_bound(n):
    time.sleep(0.1)
    return n

def cpu_bound(n):
    return sum(i**2 for i in range(n))

if __name__ == "__main__":
    N = 20

    # IO-bound: sequential vs threads
    start = time.time()
    results = [io_bound(i) for i in range(N)]
    print(f"IO sequential:  {time.time()-start:.2f}s")

    start = time.time()
    with ThreadPoolExecutor(max_workers=N) as ex:
        results = list(ex.map(io_bound, range(N)))
    print(f"IO threaded:    {time.time()-start:.2f}s")

    # CPU-bound: sequential vs threads vs processes
    tasks = [100_000] * N
    start = time.time()
    results = [cpu_bound(t) for t in tasks]
    print(f"CPU sequential: {time.time()-start:.2f}s")

    start = time.time()
    with ThreadPoolExecutor(max_workers=N) as ex:
        results = list(ex.map(cpu_bound, tasks))
    print(f"CPU threaded:   {time.time()-start:.2f}s")  # no faster: the GIL

    start = time.time()
    with ProcessPoolExecutor() as ex:
        results = list(ex.map(cpu_bound, tasks))
    print(f"CPU processes:  {time.time()-start:.2f}s")
```
## 🗺️ Choosing the right tool

| Scenario | Recommendation | Why |
|---|---|---|
| Network requests, file IO | `ThreadPoolExecutor` | GIL is released during IO; threads are cheap |
| Massive numbers of connections (tens of thousands) | `asyncio` | coroutines have tiny overhead; see Lecture 18 |
| CPU-bound computation | `ProcessPoolExecutor` | bypasses the GIL, true multi-core parallelism |
| Numerical computing | NumPy + multiple processes | NumPy releases the GIL internally |
| Mixed workloads | thread pool + process pool | layered architecture |
## ⚠️ Common pitfalls

### 1. With multiple processes, `if __name__ == '__main__'` is mandatory

```python
from concurrent.futures import ProcessPoolExecutor

# Without this guard, child processes re-import the module and spawn
# again — on Windows/macOS this raises an error or loops forever.
if __name__ == '__main__':
    with ProcessPoolExecutor() as executor:
        results = list(executor.map(cpu_task, range(10)))
```
### 2. Don't share mutable state between threads (unless you hold a lock)

```python
# ❌ unsafe: concurrent writes to a plain shared list invite races
shared_list = []

def unsafe_append(item):
    shared_list.append(item)

# ✅ safe: queue.Queue is thread-safe by design
from queue import Queue
result_queue = Queue()

def safe_append(item):
    result_queue.put(item)
```
### 3. Inter-process communication can only carry picklable objects

```python
# ❌ fails: lambdas cannot be pickled
with ProcessPoolExecutor() as ex:
    results = list(ex.map(lambda x: x**2, range(10)))

# ✅ works: a module-level function is picklable
def square(x):
    return x ** 2

with ProcessPoolExecutor() as ex:
    results = list(ex.map(square, range(10)))
```
## 🎯 Takeaways

- **The truth about the GIL**: it only hurts CPU-bound multithreading; it is released during IO waits, so threads give IO-bound work a real speedup.
- **threading**: for IO-bound work; Lock/Event handle synchronization; simple code, but be careful with shared state.
- **multiprocessing**: for CPU-bound work; true multi-core parallelism; IPC has overhead, and only picklable objects cross the boundary.
- **concurrent.futures**: ThreadPoolExecutor and ProcessPoolExecutor behind one interface; prefer it over raw threading/multiprocessing.
- **Rule of thumb**: IO-bound → threads; CPU-bound → processes; massive connection counts → asyncio (next lecture).
## 📚 Recommended reading

《Python 编程从入门到实践(第 3 版)》 | 《流畅的 Python(第 2 版)》 | 《CPython 设计与实现》

Learning path: beginner → 《Python 编程从入门到实践》 → 《流畅的 Python》 → this course → 《CPython 设计与实现》
## 🎓 Join the 《流畅的 Python》 live reading group

If you've read this far and want to master the book systematically, you're welcome to join my live reading course.

- Weekly live sessions breaking down each chapter's core ideas
- A dedicated study group for questions and discussion
- Launch discount: ¥499 → ¥299

👉 Sign up for the 《流畅的 Python》 reading course: https://mp.weixin.qq.com/s/ivHJwn1nNx5ug4TFrapvGg
## 🔗 Course navigation

← Previous: Type Hints | Next: Async Programming →
## 💬 Contact

Services: AI programming training, corporate training, technical consulting
## 🎓 Hands-on AI programming course

Want to learn AI programming systematically? 程序员晚枫's hands-on AI programming course takes you from zero!