Python多线程死锁示例_死锁产生条件解析

3次阅读

python线程死锁源于多个线程循环等待对方持有的锁,核心是锁获取顺序不一致;需同时满足互斥、占有并等待、不可剥夺、循环等待四条件;避免方法包括统一加锁顺序和设置超时机制。

Python多线程死锁示例_死锁产生条件解析

Python多线程死锁通常发生在多个线程互相等待对方持有的锁,且都不释放,导致所有相关线程永久阻塞。关键不在于“用了多线程”,而在于锁的获取顺序不一致、未按约定顺序加锁,或持有锁期间又去争抢其他锁。

死锁产生的四个必要条件

理解这四点,才能有针对性地预防:

  • 互斥条件:锁是独占的,一个线程持有时,其他线程无法进入临界区
  • 占有并等待:线程已持有一个锁,同时又申请另一个锁(尚未释放前者)
  • 不可剥夺:锁不能被系统强制回收,只能由持有者主动释放
  • 循环等待:线程A等B的锁,B等C的锁,C又等A的锁,形成闭环

一个典型的Python死锁示例

以下代码模拟两个账户转账场景,两个线程分别尝试以相反顺序锁定两个账户锁:

import Threading import time <h1>两个账户和对应锁</h1><p>account_a = 1000 account_b = 1000 lock_a = threading.Lock() lock_b = threading.Lock()</p><p>def transfer_ab():</p><p><span>立即学习</span>“<a href="https://pan.quark.cn/s/00968c3c2c15" style="text-decoration: underline !important; color: blue; font-weight: bolder;" rel="nofollow" target="_blank">Python免费学习笔记(深入)</a>”;</p><div class="aritcle_card flexRow">                                                         <div class="artcardd flexRow">                                                                 <a class="aritcle_card_img" href="/ai/2101" title="Keeva AI"><img                                                                                 src="https://img.php.cn/upload/ai_manual/000/000/000/175680079922248.png" alt="Keeva AI"  onerror="this.onerror='';this.src='/static/lhimages/moren/morentu.png'" ></a>                                                                 <div class="aritcle_card_info flexColumn">                                                                         <a href="/ai/2101" title="Keeva AI">Keeva AI</a>                                                                         <p>AI一键生成数字人营销视频</p>                                                                 </div>                                                                 <a href="/ai/2101" title="Keeva AI" class="aritcle_card_btn flexRow flexcenter"><b></b><span>下载</span> </a>                                                         </div>                                                 </div><h1>先锁A,再锁B</h1><pre class='brush:python;toolbar:false;'>lock_a.acquire() print("Thread-1: locked A") time.sleep(0.1)  # 故意延迟,增加死锁概率 lock_b.acquire() print("Thread-1: locked B") # 实际转账逻辑(略) lock_b.release() lock_a.release()

def transfer_ba():

先锁B,再锁A → 与上面顺序相反!

lock_b.acquire() print("Thread-2: locked B") time.sleep(0.1) lock_a.acquire() print("Thread-2: locked A") # 实际转账逻辑(略) lock_a.release() lock_b.release()

t1 = threading.Thread(target=transfer_ab) t2 = threading.Thread(target=transfer_ba) t1.start() t2.start() t1.join() t2.join()

运行后很可能卡住:Thread-1 持有 lock_a 等 lock_b,Thread-2 持有 lock_b 等 lock_a —— 循环等待成立,死锁发生。

如何避免死锁

核心思路是打破上述任一必要条件,实践中最有效的是统一加锁顺序和使用超时机制:

  • 固定锁顺序:所有线程按相同规则(如对象ID、名称字典序)获取多个锁。例如始终先 lock_a 再 lock_b,哪怕业务逻辑看似“该先B”也强制调整
  • 使用 try_lock + 超时:调用 lock.acquire(timeout=2),失败则释放已持锁并重试或回退,避免无限等待
  • 用 RLock 替代 Lock?不解决根本问题:RLock 允许同一线程重复获取,但跨线程死锁依然会发生,不能替代顺序约束
  • 减少锁粒度 / 避免嵌套锁:把大锁拆成小锁,或通过无锁设计(如队列通信)绕过共享状态竞争

检测与调试小技巧

死锁不易复现,但可通过以下方式辅助定位:

  • 启用 threading.settrace() 或使用 threading.enumerate() 查看活跃线程状态(需配合日志判断是否长时间停在 acquire)
  • 在 acquire 前后打时间戳日志,观察某线程是否“进得去、出不来”
  • 使用 threading.Lock子类记录持有者和等待者(适合调试环境,勿用于生产)

text=ZqhQzanResources