C# gRPC双向流方法 C#如何实现客户端和服务器的实时双向通信

2次阅读

grpc双向流在C#中对应方法签名是public async Task Bidirectionalstreaming(IAsyncEnumerable requestStream, IServerStreamWriter responseStream, ServerCallContext context);客户端必须用CallAsync()调用,且proto需声明rpc Chat(stream Message) returns (stream Message)。

C# gRPC双向流方法 C#如何实现客户端和服务器的实时双向通信

gRPC 双向流在 C# 中对应什么方法签名

双向流(Bidi Streaming)在 C# gRPC 中必须使用 IAsyncEnumerable 作为参数,返回 IAsyncEnumerable。不能用普通 Task 或单次 ValueTask,否则编译通过但运行时会报错 Status(StatusCode=Unimplemented, Detail="Method not found")

服务端方法签名必须是 public async Task BidirectionalStreaming(IAsyncEnumerable requestStream, IServerStreamWriter responseStream, ServerCallContext context) 或更常见的异步迭代写法(见下条):

  • 客户端调用必须用 CallAsync() 而非 AsyncUnaryCall 等其他方式
  • proto 文件中该 RPC 必须声明为 rpc Chat(stream Message) returns (stream Message); —— 两个 stream 缺一不可
  • 生成的客户端类里对应方法名后缀是 Async,且参数/返回类型严格匹配生成器输出(如 ChatAsync

客户端如何正确发送+接收并避免挂起或丢消息

常见错误是只 await foreach 接收,却没主动发请求;或用 channel 发送但未调用 Writer.CompleteAsync() 导致服务端永远等不到流结束信号。

推荐结构:用 Channel 做发送缓冲,同时启动两个并发任务(发送 + 接收),并在退出前显式关闭发送通道:

var channel = Channel.CreateUnbounded(); using var call = client.ChatAsync(); _ = Task.Run(async () => {     await foreach (var req in channel.Reader.ReadAllAsync())     {         await call.RequestStream.WriteAsync(req);     }     await call.RequestStream.CompleteAsync(); // 关键:通知服务端“我不再发了” }); // 启动接收 _ = Task.Run(async () => {     await foreach (var resp in call.ResponseStream.ReadAllAsync())     {         Console.WriteLine(resp.Content);     } }); // 示例:发一条消息 await channel.Writer.WriteAsync(new Request { Content = "hello" });

注意:call.ResponseStream.ReadAllAsync().net 6+ 才支持的扩展方法;若用 .NET 5,需手动 while (await call.ResponseStream.MoveNext())

服务端如何维持长连接并处理并发客户端

双向流默认不超时,但底层 http/2 连接可能被代理或防火墙中断。必须在 ServerCallContext 中监听取消令牌,并在异常时及时释放资源。

  • 不要在流处理中直接 await Task.Delay 阻塞循环,应改用 context.CancellationToken 控制等待
  • 每个客户端连接对应一个独立的流处理任务,天然并发;但共享状态(如全局广播列表)需加锁或用 ConcurrentDictionary
  • 若需广播消息给所有活跃客户端,建议把 IServerStreamWriter 存入线程安全集合,并在写入前检查 context.CancellationToken.IsCancellationRequested
  • 服务端抛出异常(如 throw new RpcException(new Status(StatusCode.Cancelled, "bye")))会立即断开该流,客户端收到 RpcException 并可捕获 Status.StatusCode

调试时最常见的三个失败点

90% 的双向流不通问题集中在这三处:

  • proto 定义漏掉任一 stream 关键字 → 生成代码变成 unary 方法,客户端调用时 404
  • 客户端未调用 CompleteAsync() → 服务端 await foreach 永远卡住,无任何日志
  • 服务端未响应任何 WriteAsync() → 客户端 ResponseStream 不触发 MoveNext,看起来像“连上了但没反应”

最有效的验证方式:先注释掉所有业务逻辑,在服务端流开始时立刻 await responseStream.WriteAsync(new Response { ... }),客户端打印收到内容;确认基础通路跑通后再加逻辑。HTTP/2 层面的问题(如 TLS 配置、ALPN 协商失败)通常表现为 IOException: The request was aborted,此时需查 Kestrel 日志而非业务代码。

text=ZqhQzanResources