如何使用c++和gRPC实现流式(streaming) RPC? (双向通信)

15次阅读

grpc双向流(Bidi streaming)支持客户端与服务器在单个RPC中同时异步收发多条消息,需在.proto中声明双stream,用async c++ API配合CompletionQueue手动管理读写协程及回调,漏调Read()或Write()将导致流停滞。

如何使用c++和gRPC实现流式(streaming) RPC? (双向通信)

什么是 gRPC 双向流(Bidi Streaming)

gRPC 双向流 RPC 允许客户端和服务器在同一个 RPC 调用中**同时发送和接收多个消息**,连接保持打开,直到任意一方关闭。它对应 Protocol Buffer 中的 stream 关键字出现在请求和响应两侧:rpc Chat(stream ChatMessage) returns (stream ChatMessage);。这不是“先发后收”或“发一次收多次”,而是真正的并发读写——你得自己管理读/写协程或回调,否则容易卡死。

如何定义并生成双向流接口

关键在 .proto 文件里声明双 stream,且必须用 async C++ API(同步 API 不支持双向流)。生成代码时需启用 C++ 插件,并确保 grpc_cpp_plugin 版本与 libgrpc 匹配,否则 AsyncReaderWriter 类型可能缺失或签名不一致。

  • 定义示例:
    service ChatService {   rpc Chat(stream ChatMessage) returns (stream ChatMessage); } message ChatMessage {   string content = 1;   int64 timestamp = 2; }
  • 生成命令要包含 --grpc_out--plugin=protoc-gen-grpc=.../grpc_cpp_plugin
  • 生成后你会得到 ChatService::AsyncService 和客户端 stub 的 AsyncChat 方法,返回类型是 std::unique_ptr<:clientasyncreaderwriter chatmessage>>

客户端如何发起并驱动双向流

客户端必须显式启动读写循环:先调用 Write() 发送首条消息(或空消息触发流建立),再立即调用 Read() 启动异步接收;后续靠 OnReadDone()OnWriteDone() 回调交替推进。漏掉任一 Read() 或重复 Write() 而没等完成,都会导致流挂起或崩溃。

  • 必须用 CompletionQueue 驱动异步事件,不能混用多个队列处理同一 stream
  • Write() 第二个参数是 void* 标签,用于区分不同操作;建议用 this 或枚举值,避免裸指针误释放
  • 调用 WritesDone() 表示客户端不再发消息,但服务端仍可继续发;之后收到 status.ok() == true 且无更多 Read() 时才算真正结束
  • 常见错误:Read() 后没再调用下一次 Read() → 流停滞;Write() 前未检查 write_ok_ → 写入失败被忽略

服务端如何响应并维持双向流

服务端用 AsyncServiceRequestChat() 接收新流,拿到 ServerAsyncReaderWriter 对象后,必须立刻调用其 Read() 启动接收,再根据业务逻辑决定何时 Write()。注意:所有 Read()/Write() 都是异步且需配对回调,不能在回调里直接阻塞等待另一端。

立即学习C++免费学习笔记(深入)”;

  • 不要在 OnReadDone() 回调里直接调用 Write() 并期望立刻发出——必须再次 Write() + Next() 才生效
  • 若需广播消息(如聊天室),要把 ServerAsyncReaderWriter 指针安全存入容器,但注意生命周期:客户端断连时该对象会被销毁,需监听 Finish() 状态
  • 性能陷阱:频繁小包 Write() 会放大 TCP 小包问题,可考虑缓冲+定时 flush,但需权衡延迟
  • 调试提示:抓包看 http/2 DATA 帧是否双向持续流动;若只有 client→server 有数据,大概率是 server 忘了 Read()Write() 没触发完成队列

双向流的本质不是“自动管道”,而是由你手动拼接的一组异步读写原语——漏掉一次 Read(),整个流就静默了。最易错的永远是“以为发完就完了”,其实只是刚开了个门。

text=ZqhQzanResources