Python死锁如何产生_锁顺序问题

6次阅读

锁顺序问题是python死锁最常见原因,指多个线程以不一致顺序获取同一组锁(如线程1先a后b、线程2先b后a),导致互相等待永久阻塞。

Python死锁如何产生_锁顺序问题

Python中死锁最常见原因之一是多个线程以不同顺序获取多个锁,导致互相等待——即“锁顺序问题”。只要两个或以上线程对同一组锁的加锁顺序不一致,就可能陷入永久阻塞。

什么是锁顺序问题

假设有两个锁 lock_Alock_B。线程1先获取 lock_A,再尝试获取 lock_B;而线程2恰好相反:先获取 lock_B,再尝试获取 lock_A。当线程1拿到 lock_A、线程2拿到 lock_B 后,两者都会卡在第二个锁上,谁也无法继续,形成死锁。

典型触发场景

  • 多个函数各自独立加锁,但未约定全局加锁顺序(例如:函数A按 A→B 加锁,函数B按 B→A 加锁)
  • 嵌套调用时,底层函数悄悄加了新锁,调用方 unaware,无意中改变了锁序
  • 使用可重入锁(RLock)误以为安全,但跨锁协作时仍需严格顺序
  • 分布式或模拟多资源场景(如转账:账户A扣款→账户B入账),资源顺序不统一

如何避免锁顺序问题

  • 固定全局锁顺序:对所有涉及的锁定义唯一编号或排序规则(如按锁对象 id()、变量名字符串排序),始终按升序(或降序)获取
  • 一次性申请所有锁:用 threading.Lockacquire(timeout=…) 配合循环重试,或使用 contextlib.ExitStack 管理多锁,但更推荐用 concurrent.futures.ThreadPoolExecutor 减少手动锁需求
  • 避免嵌套加锁:设计函数时明确是否持有锁;若必须嵌套,确保被调函数不引入新锁,或由最外层统一管理
  • 用超时机制兜底:给 lock.acquire(timeout=0.5) 设置合理超时,失败后释放已持锁并重试/报错,防止无限等待

一个可复现的死锁示例

以下代码会大概率触发死锁:

import threading import time <p>lock_a = threading.Lock() lock_b = threading.Lock()</p><p>def thread1(): with lock_a: time.sleep(0.1) with lock_b:  # 等待 lock_b print("Thread1 done")</p><p>def thread2(): with lock_b: time.sleep(0.1) with lock_a:  # 等待 lock_a print("Thread2 done")</p><p>t1 = threading.Thread(target=thread1) t2 = threading.Thread(target=thread2) t1.start(); t2.start() t1.join(); t2.join()</p>

运行后程序常卡住不动——因为 thread1 持有 lock_a 等 lock_b,thread2 持有 lock_b 等 lock_a,双方僵持。

立即学习Python免费学习笔记(深入)”;

修复思路很简单

强制统一顺序:比如规定所有地方都先 lock_a 再 lock_b。修改任一函数即可打破循环依赖:

def thread2_fixed():     with lock_a:  # 先 a         with lock_b:  # 再 b             print("Thread2 fixed")

或者更健壮地用排序逻辑自动保证顺序,适合锁数量多、来源动态的场景。

text=ZqhQzanResources