c++如何使用zeromq消息队列_c++ 发布订阅模式与请求响应模式【实战】

13次阅读

ZeroMQ的PUB/SUB与REQ/REP是两种根本不同的通信模式:前者单向广播、无连接、需显式订阅;后者双向配对、强顺序、send/recv必须严格交替,socket类型不可混用或复用。

c++如何使用zeromq消息队列_c++ 发布订阅模式与请求响应模式【实战】

ZeroMQ 的 c++ 发布订阅(PUB/SUB)和请求响应(REQ/REP)模式,本质是两种完全不同的通信契约:前者是单向广播、无连接、无确认;后者是双向配对、隐式同步、强顺序。选错模式或混用语义,90% 的“收不到消息”“卡死”“乱序”问题都源于此。

zmq::socket_t 类型必须严格匹配模式,且不能复用

ZeroMQ 不允许一个 socket 同时承担多种角色。比如用 ZMQ_REQ socket 去 connect 一个 ZMQ_PUB 端口,程序不会报错,但永远收不到任何数据——因为协议层根本不兼容。

  • ZMQ_REQ 只能 connect 到 ZMQ_REP(或 ZMQ_ROUTER),且每次 send 后必须紧跟 recv,否则下一次 send 会阻塞
  • ZMQ_PUB 只能 bind(不建议 connect),ZMQ_SUB 只能 connect(不建议 bind),且 ZMQ_SUB 必须调用 setsockopt(ZMQ_SUBSCRIBE, ...) 才能收到消息,空订阅("")在较新版本中默认不生效
  • 一个 zmq::context_t 可创建多个 socket,但每个 socket 的类型和行为由构造时第二个参数唯一决定,不可运行时变更

发布订阅模式中,zmq_setsockopt(..., ZMQ_SUBSCRIBE, ...) 是必填项

这是新手最常踩的坑:SUB 端代码写了 connect,也写了 recv,但始终收不到 PUB 发出的消息。根本原因就是漏了订阅动作——ZeroMQ 的 SUB socket 默认屏蔽所有消息,哪怕 PUB 已经在发。

  • 订阅空字符串subscriber.setsockopt(ZMQ_SUBSCRIBE, "", 0); 表示接收所有主题(注意长度传 0,不是 1)
  • 订阅特定前缀:std::String topic = "stock.AAPL"; subscriber.setsockopt(ZMQ_SUBSCRIBE, topic.c_str(), topic.size());
  • 可多次调用 setsockopt(ZMQ_SUBSCRIBE, ...) 实现多主题订阅,但不支持通配符(如 "stock.*")——需靠应用层过滤
  • 取消订阅用 setsockopt(ZMQ_UNSUBSCRIBE, ...),但实际中极少需要

请求响应模式里,sendrecv 必须严格交替,且不能跳过响应

ZMQ_REQ socket 内部维护一个“请求-响应”状态机。一旦发出请求未收到回复,socket 就会进入不可用状态,后续 send 直接返回 false 或抛异常(取决于 cppzmq 版本)。

立即学习C++免费学习笔记(深入)”;

  • 服务端 ZMQ_REP 也必须先 recvsend,否则客户端会一直等待
  • 不要试图在一个 REQ socket 上并发发多个请求——它不支持异步 I/O;如需并发,请用多个 socket 或改用 ZMQ_DEALER + ZMQ_ROUTER
  • 超时控制推荐用 setsockopt(ZMQ_RCVTIMEO, &timeout_ms, sizeof(timeout_ms)),避免无限阻塞;但注意:ZMQ_SNDTIMEO 对 REQ socket 无效
#include  #include  #include   // 客户端:REQ 模式(务必配对 recv) int main() {     zmq::context_t ctx;     zmq::socket_t sock(ctx, ZMQ_REQ);     sock.connect("tcp://localhost:5555");      // 设置接收超时,防止永久卡住     int timeout_ms = 3000;     sock.setsockopt(ZMQ_RCVTIMEO, &timeout_ms, sizeof(timeout_ms));      zmq::message_t req(5);     memcpy(req.data(), "HELLO", 5);     sock.send(req);      zmq::message_t reply;     if (sock.recv(reply) == 0) {  // 返回 0 表示成功         std::cout << "got: " << std::string(static_cast(reply.data()), reply.size()) << "n";     } else {         std::cerr << "Timeout or errorn";     } }

PUB/SUB 的连接时机与消息丢失风险必须手动管理

PUB 端 bind 后立刻开始发消息,但 SUB 端 connect 是异步建立的,中间存在“连接窗口期”。若 PUB 在 SUB 还没连上来就已发消息,这些消息将永久丢失——ZeroMQ 默认不缓存(不像 kafka)。这不是 bug,是设计选择。

  • 解决方案一(简单):PUB 启动后 sleep 几百毫秒,再开始发;或让 SUB 先启动、等几秒再启 PUB
  • 解决方案二(健壮):引入一个“握手” REQ/REP socket,SUB 连上后发个注册请求,PUB 收到才开始广播
  • 解决方案三(高级):用 ZMQ_XPUB + ZMQ_XSUB 设备做代理,并启用 ZMQ_XPUB_VERBOSE 监听订阅事件,实现动态流控
  • 注意:ZMQ_CONFLATE 可开启“只保留最新消息”,但仅对单个订阅者有效,且不解决初始连接丢失问题

真正难的不是写通代码,而是理解 ZeroMQ 每种 socket 类型背后的状态机和契约约束。REQ/REP 要求严格配对,PUB/SUB 要求显式订阅+容忍丢消息——把它们当普通 TCP socket 用,一定会掉进坑里。

text=ZqhQzanResources