c++怎么使用mqtt协议通信_c++ paho-mqtt库异步发布与订阅【方法】

19次阅读

c++调用paho-mqtt-cpp异步发布需使用mqtt::async_client,连接成功后调用publish()立即返回,须预先设置connection_lost_callback和delivery_complete_callback,发布时传mqtt::message对象且主题为std::String

c++怎么使用mqtt协议通信_c++ paho-mqtt库异步发布与订阅【方法】

如何用 C++ 调用 paho-mqtt-cpp 实现异步发布

直接使用 mqtt::async_client 类,它默认走异步路径;同步发布(publish())会阻塞,而异步必须配合 delivery_complete_callbackconnection_lost_callback 才能真正“不卡线程”。

关键点:异步发布调用 publish() 后立即返回,消息实际发送由底层线程池处理;但你必须在连接成功后才调用,否则抛 std::runtime_error(错误信息含 "not connected")。

  • 连接前先设置回调:cli.set_connected_callback()cli.set_connection_lost_callback()
  • 发布时必须传 mqtt::message 对象,不能只传字符串;主题名需为 std::string,不能是 C 风格字符串字面量(如 "topic" 可以,但 "topic" 会截断)
  • 若需要确认送达,要实现 delivery_complete_callback 并注册,否则无法知道 QoS > 0 消息是否被 broker 确认
mqtt::async_client cli("tcp://localhost:1883", "cpp_client"); cli.set_connection_lost_callback([](const std::string& cause) {     std::cout << "Connection lost: " << cause << std::endl; }); cli.connect()->wait(); // wait() 是阻塞的,但 connect() 返回 future,可改为 async + then auto tok = cli.publish("sensor/temp", "23.5", 1, false); tok->wait(); // 这里 wait() 是等发布动作提交完成,不是等 broker 确认

为什么订阅后收不到消息?检查 callback 注册和事件循环

paho-mqtt-cpp 的异步订阅本身不自动触发回调——它只把消息推到内部队列,你必须显式调用 cli.start_consuming() 启动消费线程,或手动轮询 cli.consume_message()。没这一步,message_arrived_callback 根本不会被调用。

常见错误是:调用了 subscribe(),也注册了 set_message_arrived_callback(),但忘了启动消费机制,结果消息静默丢失。

立即学习C++免费学习笔记(深入)”;

  • start_consuming() 启动后台线程,适合长期运行程序;但它不能中途停,只能靠 stop_consuming()(注意:不是所有版本都支持 stop)
  • 若需精细控制,改用 consume_message(std::chrono::milliseconds) 在主循环中非阻塞拉取,超时返回空指针
  • callback 函数签名必须严格为 void(const mqtt::message&),捕获异常会导致整个消费线程崩溃(建议加 try/catch
cli.set_message_arrived_callback([](const mqtt::message& msg) {     try {         std::cout << "Received: " << msg.get_payload_str() << " on " << msg.get_topic() << std::endl;     } catch (...) { /* 防止异常逃逸 */ } }); cli.subscribe("sensor/#", 1)->wait(); cli.start_consuming(); // 必须有!

QoS 1/2 发布失败却没报错?看 delivery_complete_callback 是否被触发

异步发布下,publish() 成功只表示消息已入发送队列,不代表 broker 已接收。QoS 1 或 2 的消息只有收到 PUBACK/PUBREC 才算真正送达,这个状态通过 delivery_complete_callback 通知。

如果你没设这个回调,或者回调里没做日志/计数,就完全不知道消息是否落地——尤其在网络抖动时,broker 可能丢弃未确认消息,而客户端毫无感知。

  • 回调参数是 const mqtt::idelivery_Token&,调用 token.get_message_id() 可关联原始消息
  • 同一个 token 可能被多次回调(比如重传),所以不要在回调里释放资源,除非确认 token.is_complete()
  • 如果 broker 主动断连,未确认的消息会进入 offline_buffer(默认大小 1000),超出后新消息直接丢弃,且不报错
cli.set_delivery_complete_callback([](mqtt::idelivery_token_ptr tok) {     if (tok && tok->is_complete()) {         std::cout << "Message delivered, ID=" << tok->get_message_id() << std::endl;     } });

链接断开后自动重连失效?别只依赖 set_automatic_reconnect()

set_automatic_reconnect() 只控制 TCP 层重连行为,它不会自动恢复订阅(subscribe)、也不会重新绑定 callback。断连重连后,你得手动补订主题,否则消息收不到。

更隐蔽的问题是:重连成功触发 connected_callback,但此时 client 内部状态可能还没完全就绪(比如 session 还没重建),立刻 subscribe() 可能失败并抛异常。

  • 务必在 connected_callback 里再次调用 subscribe(),且对返回的 itoken 调用 wait() 或监听其完成
  • 避免在重连回调里做耗时操作(如文件读写),否则阻塞重连流程
  • 测试时用 mosquitto_sub -t 'test' -d 手动模拟 broker 断连,比单纯 kill 进程更能暴露状态不同步问题

自动重连不是银弹——它解决不了会话丢失、消息重复、订阅漂移这些 MQTT 协议层问题。

text=ZqhQzanResources