c# Saga模式和两阶段提交在c#中的实现

3次阅读

MassTransit Saga 是 C# 微服务中实现分布式事务补偿的成熟方案,通过状态机定义流转与 Compensate 方法声明补偿动作,需配合持久化、幂等性及 CorrelationId 使用。

c# Saga模式和两阶段提交在c#中的实现

Saga 模式在 C# 中用 MassTransit 实现补偿逻辑

Saga 是处理长时间运行、跨服务事务的主流方案,C# 生态里最成熟的选择是 MassTransit —— 它原生支持基于状态机(StatemachineSaga)的 Saga 管理,自动持久化、幂等性、重试和补偿都内置了。

关键不是“手写事务日志”,而是定义状态流转和对应命令。比如订单创建后扣库存失败,必须触发“取消预留库存”操作,这个补偿动作由 Compensate 方法声明,不是靠 try-catch 手动调。

  • 必须实现 ISagaRepository,推荐用 EntityFrameworkSagaRepository 存到 sql Server 或 postgresql
  • 每个 Saga 实体需有唯一 CorrelationId(通常用 Guid),所有消息必须携带它,否则状态无法关联
  • 超时控制用 RequestTimeout + When(Timeout),别依赖外部定时器轮询
  • 不要在 When() 里直接调用 http API;应发新命令(如 ReserveInventoryCommand),让下游消费者处理,保证解耦
public class OrderSaga : MassTransitStateMachine {     public State Submitted { get; private set; }     public State InventoryReserved { get; private set; }     public Event SubmitOrder { get; private set; }     public Event InventoryReserved { get; private set; }     public Event InventoryReservationFailed { get; private set; } 
public OrderSaga() {     InstanceState(x =youjiankuohaophpcn x.CurrentState);      Event(() =youjiankuohaophpcn SubmitOrder);     Event(() =youjiankuohaophpcn InventoryReserved);     Event(() =youjiankuohaophpcn InventoryReservationFailed);      Initially(         When(SubmitOrder)             .Then(ctx =youjiankuohaophpcn ctx.Instance.OrderId = ctx.Data.OrderId)             .TransitionTo(Submitted)             .Send(context =youjiankuohaophpcn new ReserveInventoryCommand(context.Instance.OrderId)));      During(Submitted,         When(InventoryReserved)             .TransitionTo(InventoryReserved),         When(InventoryReservationFailed)             .Call(ctx =youjiankuohaophpcn Console.WriteLine($"Rollback for {ctx.Instance.OrderId}"))             .Compensate(ctx =youjiankuohaophpcn new CancelInventoryReservationCommand(ctx.Instance.OrderId))); }

}

两阶段提交(2PC)在 C# 中不推荐直接实现

.net 原生没有跨服务 2PC 支持,System.Transactions.TransactionScope 只适用于同进程内多个 SqlConnection 或支持 MSDTC 的资源,一旦涉及 HTTP、rabbitmq、第三方 API,它就完全失效——不是功能限制,是协议层面不兼容。

常见误用:用 TransactionScope 包住 EF Core SaveChanges 和 HttpClient.PostAsync,以为能原子提交。实际结果是数据库改了,HTTP 请求失败,没人回滚数据库。

  • TransactionScope 要求所有参与者实现 IEnlistmentNotification,而 rest apikafka Producer、gRPC Client 都不实现它
  • 启用 MSDTC 在容器或云环境几乎不可行,且性能差、故障面大,超时默认 10 分钟,容易卡死资源
  • 即使本地多 DB 场景,EF Core 6+ 的 BeginTransactionasync(isolationLevel) 也比 TransactionScope 更可控、无隐式分布式事务风险

什么时候该选 Saga,什么时候绕开分布式事务

核心判断依据是“业务是否允许中间态 + 补偿可行性”。例如电商下单:用户看到“已提交”,库存显示“已预留”,这是合法中间态;若支付失败,发补偿指令取消预留,整个流程可自愈。

  • 强一致性要求场景(如银行实时转账)→ 别用微服务拆,合并在单库单服务里用 ACID
  • 跨组织/第三方系统(如调微信支付、对接物流 SaaS)→ 只能靠 Saga + 对账,2PC 根本不存在
  • 高吞吐写入(如 iot 设备上报)→ 用最终一致性 + 幂等写入,连 Saga 状态机都可能成为瓶颈,改用事件溯源 + 状态投影
  • 临时性数据(如购物车)→ 用 redis + 过期时间,根本不需要事务语义

MassTransit Saga 的坑:持久化与幂等性没配对就等于没做

很多人只写状态机,却忘了配持久化,导致重启后 Saga 状态丢失,消息重复消费时无法识别“这单我已经处理过了”,直接双倍扣库存。

另一个高频问题是补偿消息没加幂等键。比如 CancelInventoryReservationCommand 被重发三次,库存就多加三次。正确做法是在命令里带 RequestId,消费者用该字段做去重(如存到 redis Set 或 DB 唯一索引)。

  • EF Core 迁移必须包含 DbContext 中的 ISagaRepository 表(如 OrderState),否则启动报 InvalidOperation: No saga repository configured
  • 所有入站消息(SubmitOrder, InventoryReserved)必须设置 MessageIdCorrelationId,否则 ConsumeContext 拿不到上下文
  • 本地测试时禁用重试(UseInMemoryOutbox() + DisableRetry()),否则补偿逻辑会被干扰,难以验证状态流转

分布式事务从来不是“怎么实现”,而是“哪些地方根本不能分布”。Saga 不是银弹,但它是目前 C# 微服务里最靠谱的落地路径;硬上 2PC,多数时候只是把问题从代码移到运维日志里。

text=ZqhQzanResources