C#怎么使用gRPC流式传输 gRPC双向流实现方法

5次阅读

grpc双向流是客户端和服务器通过一个长期连接同时发送和接收多个消息的全双工通信方式,适用于实时协作、聊天等场景;在.proto中需将rpc方法的请求和响应均声明为stream,c#中通过asyncduplexstreamingcall的requeststream和responsestream实现收发,需调用completeasync()关闭写入端并注意异常处理。

C#怎么使用gRPC流式传输 gRPC双向流实现方法

什么是gRPC双向流

gRPC双向流(Bidirectional Streaming)是指客户端和服务器可以同时发送和接收多个消息,连接保持打开状态,双方消息可交错收发。它适合实时协作、聊天、日志推送、iot设备控制等场景。和单向流(ClientStreaming/ServerStreaming)不同,双向流用一个长期连接完成全双工通信。

定义.proto文件支持双向流

.proto文件中,把RPC方法的请求和响应类型都声明为stream即可:

example.proto

syntax = "proto3";  package chat;  service ChatService {   // 双向流:客户端发多条Message,服务端也回多条Message   rpc Chat(stream Message) returns (stream Message); }  message Message {   string content = 1;   int64 timestamp = 2;   string sender = 3; }

生成C#代码后,会得到带IAsyncEnumerable<t></t>参数和返回值的异步方法。

C#客户端实现双向流调用

使用CallOptions不是必须的,重点是拿到AsyncDuplexStreamingCall<trequest tresponse></trequest>对象,它提供RequestStreamResponseStream两个管道:

  • RequestStream.WriteAsync()持续发消息(可多次)
  • ResponseStream.ReadAllAsync()或手动遍历IAsyncEnumerator接收服务端消息
  • 调用RequestStream.CompleteAsync()通知服务端“我不再发了”,但不影响继续收消息

示例片段:

var channel = GrpcChannel.ForAddress("https://localhost:5001"); var client = new ChatService.ChatServiceClient(channel);  using var call = client.Chat();  // 启动接收任务(后台持续读) _ = Task.Run(async () => {     await foreach (var msg in call.ResponseStream.ReadAllAsync())     {         Console.WriteLine($"[Server] {msg.content}");     } });  // 主线程发消息 await call.RequestStream.WriteAsync(new Message { Content = "Hello", Sender = "client" }); await call.RequestStream.WriteAsync(new Message { Content = "How are you?", Sender = "client" });  // 发送完毕,关闭写入端 await call.RequestStream.CompleteAsync();  // 等待服务端也结束(可选) await call.ResponseStream.Completion;

C#服务端实现双向流处理

服务端方法签名是Task,参数为IAsyncStreamReader<trequest></trequest>,返回值为IAsyncStreamWriter<tresponse></tresponse>

  • requestStream.MoveNextAsync()await foreach接收客户端消息
  • responseStream.WriteAsync()随时向客户端推送响应
  • 无需主动关闭流,gRPC框架会在对方断开或超时后自动清理

示例服务端逻辑:

public override async Task Chat(IAsyncStreamReader<Message> requestStream,                                 IServerStreamWriter<Message> responseStream,                                 ServerCallContext context) {     try     {         await foreach (var msg in requestStream.ReadAllAsync())         {             Console.WriteLine($"[Client] {msg.Content}");              // 回复一条(也可批量或按条件回复)             await responseStream.WriteAsync(new Message             {                 Content = $"Echo: {msg.Content}",                 Timestamp = DateTimeOffset.UtcNow.ToUnixTimeSeconds(),                 Sender = "server"             });         }     }     catch (OperationCanceledException)     {         // 客户端断开或取消     } }

基本上就这些。双向流不复杂但容易忽略CompleteAsync()和异常捕获,实际部署建议加上超时、重连、心跳保活逻辑。

text=ZqhQzanResources