Python 消息传递的 zeromq + pyzmq 实践

2次阅读

req-rep 卡死因严格轮转机制:客户端须发→收→发→收,服务端须收→发→收→发;错序或跳步致 recv/send 永久阻塞。

Python 消息传递的 zeromq + pyzmq 实践

REQ-REP 模式为什么一发一收就卡死?

因为 REQREP 套接字是严格配对、强制轮转的:客户端必须发→收→发→收,服务端必须收→发→收→发。任何一步跳过或顺序错乱,连接就会挂起,recv()send() 会永久阻塞(默认行为)。

  • 常见错误现象:socket.recv() 卡住不动,或者报 ZMQError: Operation cannot be accomplished in current state
  • 典型误操作:客户端连续调用两次 send(),没等响应就再发;服务端处理完一次请求后忘了 send(),直接又去 recv()
  • 实操建议:把 REQ 客户端逻辑写成“请求-等待响应”原子块,别拆开;服务端用 while True: 包裹 recv()→业务处理→send() 三步,缺一不可
  • 参数差异:不要设 socket.setsockopt(zmq.SNDTIMEO, 1000) 试图“超时跳过”,它只对发送生效,而卡死通常发生在 recv() —— 正确做法是用 zmq.POLLIN + socket.poll() 做非阻塞检测

用 PUB-SUB 时消息总收不到,是订阅没生效?

90% 是因为 connect()bind() 方向反了,或者订阅者启动太晚,错过了发布者已发出的“热消息”。ZeroMQ 的 SUB 默认不接收连接前发布的消息,且必须显式调用 setsockopt(zmq.SUBSCRIBE, b"") 才能收到所有主题(空字节表示订阅全部)。

  • 常见错误现象:发布者跑了好几轮,订阅者控制台一直空白;或者只收到部分消息
  • 实操建议:先启动订阅者,等几秒再启动发布者;务必在 connect() 后、循环收消息前加 socket.setsockopt(zmq.SUBSCRIBE, b"")
  • 注意点:PUB 端用 bind()SUB 端用 connect() —— 和 REQ/REP 相反;如果要按主题过滤,比如只收 b"logs.error",就得写 socket.setsockopt(zmq.SUBSCRIBE, b"logs.error"),不能漏掉 b"" 的默认兜底
  • 性能影响:SUB 套接字内部有消息缓冲区,但若消费者处理慢,旧消息会被丢弃(ZeroMQ 默认不保证投递),需要监控 zmq.EVENT_DISCONNECTED 或启用 zmq.CONFLATE 模式保最新一条

为什么 PUSH-PULL 在多进程下任务分发不均?

PUSH 套接字默认使用“公平队列(fair-queuing)”算法分发任务,但它依赖所有 PULL 端都处于 ready 状态。只要有一个 worker 进程卡住、崩溃或还没连上,PUSH 就会把任务持续压给已连接的活跃 worker,造成严重倾斜。

  • 常见错误现象:3 个 worker 进程,其中 2 个各跑 5 条任务,第 3 个跑 90 条;或者新启的 worker 一直收不到任务
  • 实操建议:worker 启动后,先向一个单独的 REQ-REP 服务注册“我好了”,再连接 PULL;或者改用 router-DEALER 模式自己控制路由逻辑
  • 关键参数:socket.setsockopt(zmq.SNDHWM, 1) 可限制 PUSH 端待发消息上限,配合 worker 主动 send(b"ready") 触发分发,比纯 PUSH-PULL 更可控
  • 兼容性提醒:windows 下进程间通信慎用 tcp://*,优先试 ipc:///tmp/zmq-pulllinux/macos)或 inproc://(同进程内)

pyzmq 安装后 import zmq 报错找不到 libzmq?

PyZMQ 从 23.x 版本起自带预编译的 libzmq,绝大多数情况 pip install pyzmq 就够了。报错通常出现在两类环境:一是用了旧版 pip(

立即学习Python免费学习笔记(深入)”;

  • 常见错误信息:ImportError: libzmq.so.5: cannot open shared Object filezmq/backend/cython/_proxy.pyx not found
  • 实操建议:先运行 pip install --upgrade pip,再重装 pip install pyzmq;如仍失败,用 pip install pyzmq --no-binary=pyzmq 强制源码安装(需提前装好 gcc / visual studio Build Tools)
  • 验证方法:装完后在 python 里执行 import zmq; print(zmq.zmq_version(), zmq.pyzmq_version()),两个版本号都正常打印才算成功
  • 小坑:某些 docker Alpine 镜像要用 apk add zeromq-devpip install pyzmq --no-binary=pyzmq,否则找不到头文件

事情说清了就结束。真正难的不是写通 REQ-REP,而是当 worker 挂了、网络抖动、消息积时,你的 poll() 超时设多少、linger 值怎么配、要不要加心跳——这些细节不在示例里,但在日志里。

text=ZqhQzanResources