Python 并发安全的计数器实现

4次阅读

threading.Lock是最直接的解法,因其能强制串行化临界区以解决GIL下复合操作非原子性问题;需共享锁实例、用with语法、细粒度加锁,且asyncio中须换用asyncio.Lock。

Python 并发安全的计数器实现

为什么 threading.Lock 是最直接的解法

python 的全局解释器锁(GIL)不能保证复合操作的原子性,比如 counter += 1 实际拆成读、加、写三步,线程下极易丢更新。用 threading.Lock 能强制串行化临界区,简单可靠。

实操建议:

  • 对象必须是全局或共享实例,不能在每次调用里新建 threading.Lock()
  • 务必用 with lock: 语法,避免忘记 release() 导致死锁
  • 锁粒度要细——只包住真正需要保护的语句,比如只包 self._value += 1,别把日志或网络请求也塞进去

示例:

import threading <p>class Counter: def <strong>init</strong>(self): self._value = 0 self._lock = threading.Lock()</p><pre class="brush:php;toolbar:false;">def inc(self):     with self._lock:  # ✅ 正确:自动 acquire/release         self._value += 1     return self._value

立即学习Python免费学习笔记(深入)”;

asyncio 场景下不能用 threading.Lock

协程不是线程,threading.Lockasync 函数里会阻塞整个事件循环,导致并发退化为串行。必须换用 asyncio.Lock

常见错误现象:

  • 用了 threading.Lock 包裹 await 调用,程序卡住或报 RuntimeError: await wasn't used with future
  • 多个协程同时等待同一个 threading.Lock,实际变成排队执行,吞吐量骤降

实操建议:

  • asyncio.Lock 必须在协程内用 async with lock:,不能和 threading.Lock 混用
  • 初始化时要确保在事件循环已启动的上下文中(比如不能在模块顶层直接 asyncio.Lock()
  • 如果计数器还要被同步代码访问,就得拆成两套逻辑,或统一用线程安全基类 + 额外同步机制

AtomicInteger 类不存在,别被 Java 习惯带偏

Python 标准库没有类似 Java 的 AtomicInteger,也没有内置原子加法指令。有人试图用 ctypes_thread 模拟,结果往往更慢、更难 debug,还破坏可移植性。

为什么不用这些“高级”方案:

  • ctypes.c_longthreading.Lock 包裹,性能反而比纯 Python 对象差(额外类型转换开销)
  • _thread.atomic_add 是假的——CPython 内部没有暴露该接口,第三方包如 atomic 实际仍是基于锁封装
  • numpynp.add.atthreading.local 不解决跨线程共享计数问题,只是转移了竞争点

结论:老实用 threading.Lockasyncio.Lock,清晰、可测、无隐藏陷阱。

测试并发安全不能只靠 time.sleep()

手写 for 循环起一线程,再加 time.sleep() 模拟调度,根本压不出竞态。真实问题往往在高并发短任务下才暴露,比如 1000 线程抢 10 次 increment。

有效验证方式:

  • concurrent.futures.ThreadPoolExecutor 提交大量小任务,比如 500 线程各执行 100 次 inc(),最后检查总数是否等于 50000
  • 配合 threading.active_count()Logging.debug 打点,确认锁确实被争用(而非单线程跑完)
  • 在 CI 中重复运行 100 次,失败一次就红,别信“本地试了几次都对”

容易被忽略的是:测试环境 CPU 核心数、Python 版本(3.12 引入 per-interpreter GIL)、甚至是否开了 faulthandler 都会影响竞态复现概率。

text=ZqhQzanResources