MassTransit Saga 是 C# 微服务中实现分布式事务补偿的成熟方案,通过状态机定义流转与 Compensate 方法声明补偿动作,需配合持久化、幂等性及 CorrelationId 使用。

Saga 模式在 C# 中用 MassTransit 实现补偿逻辑
Saga 是处理长时间运行、跨服务事务的主流方案,C# 生态里最成熟的选择是 MassTransit —— 它原生支持基于状态机(StatemachineSaga)的 Saga 管理,自动持久化、幂等性、重试和补偿都内置了。
关键不是“手写事务日志”,而是定义状态流转和对应命令。比如订单创建后扣库存失败,必须触发“取消预留库存”操作,这个补偿动作由 Compensate 方法声明,不是靠 try-catch 手动调。
- 必须实现
ISagaRepository,推荐用EntityFrameworkSagaRepository存到 sql Server 或 postgresql - 每个 Saga 实体需有唯一
CorrelationId(通常用Guid),所有消息必须携带它,否则状态无法关联 - 超时控制用
RequestTimeout+When(Timeout),别依赖外部定时器轮询 - 不要在
When()里直接调用 http API;应发新命令(如ReserveInventoryCommand),让下游消费者处理,保证解耦
public class OrderSaga : MassTransitStateMachine { public State Submitted { get; private set; } public State InventoryReserved { get; private set; } public Event SubmitOrder { get; private set; } public Event InventoryReserved { get; private set; } public Event InventoryReservationFailed { get; private set; } public OrderSaga() { InstanceState(x =youjiankuohaophpcn x.CurrentState); Event(() =youjiankuohaophpcn SubmitOrder); Event(() =youjiankuohaophpcn InventoryReserved); Event(() =youjiankuohaophpcn InventoryReservationFailed); Initially( When(SubmitOrder) .Then(ctx =youjiankuohaophpcn ctx.Instance.OrderId = ctx.Data.OrderId) .TransitionTo(Submitted) .Send(context =youjiankuohaophpcn new ReserveInventoryCommand(context.Instance.OrderId))); During(Submitted, When(InventoryReserved) .TransitionTo(InventoryReserved), When(InventoryReservationFailed) .Call(ctx =youjiankuohaophpcn Console.WriteLine($"Rollback for {ctx.Instance.OrderId}")) .Compensate(ctx =youjiankuohaophpcn new CancelInventoryReservationCommand(ctx.Instance.OrderId))); }
}
两阶段提交(2PC)在 C# 中不推荐直接实现
.net 原生没有跨服务 2PC 支持,System.Transactions.TransactionScope 只适用于同进程内多个 SqlConnection 或支持 MSDTC 的资源,一旦涉及 HTTP、rabbitmq、第三方 API,它就完全失效——不是功能限制,是协议层面不兼容。
常见误用:用 TransactionScope 包住 EF Core SaveChanges 和 HttpClient.PostAsync,以为能原子提交。实际结果是数据库改了,HTTP 请求失败,没人回滚数据库。
-
TransactionScope要求所有参与者实现IEnlistmentNotification,而 rest api、kafka Producer、gRPC Client 都不实现它 - 启用 MSDTC 在容器或云环境几乎不可行,且性能差、故障面大,超时默认 10 分钟,容易卡死资源
- 即使本地多 DB 场景,EF Core 6+ 的
BeginTransactionasync(isolationLevel)也比TransactionScope更可控、无隐式分布式事务风险
什么时候该选 Saga,什么时候绕开分布式事务
核心判断依据是“业务是否允许中间态 + 补偿可行性”。例如电商下单:用户看到“已提交”,库存显示“已预留”,这是合法中间态;若支付失败,发补偿指令取消预留,整个流程可自愈。
- 强一致性要求场景(如银行实时转账)→ 别用微服务拆,合并在单库单服务里用 ACID
- 跨组织/第三方系统(如调微信支付、对接物流 SaaS)→ 只能靠 Saga + 对账,2PC 根本不存在
- 高吞吐写入(如 iot 设备上报)→ 用最终一致性 + 幂等写入,连 Saga 状态机都可能成为瓶颈,改用事件溯源 + 状态投影
- 临时性数据(如购物车)→ 用 redis + 过期时间,根本不需要事务语义
MassTransit Saga 的坑:持久化与幂等性没配对就等于没做
很多人只写状态机,却忘了配持久化,导致重启后 Saga 状态丢失,消息重复消费时无法识别“这单我已经处理过了”,直接双倍扣库存。
另一个高频问题是补偿消息没加幂等键。比如 CancelInventoryReservationCommand 被重发三次,库存就多加三次。正确做法是在命令里带 RequestId,消费者用该字段做去重(如存到 redis Set 或 DB 唯一索引)。
- EF Core 迁移必须包含
DbContext中的ISagaRepository表(如OrderState),否则启动报InvalidOperation: No saga repository configured - 所有入站消息(
SubmitOrder,InventoryReserved)必须设置MessageId和CorrelationId,否则ConsumeContext拿不到上下文 - 本地测试时禁用重试(
UseInMemoryOutbox()+DisableRetry()),否则补偿逻辑会被干扰,难以验证状态流转
分布式事务从来不是“怎么实现”,而是“哪些地方根本不能分布”。Saga 不是银弹,但它是目前 C# 微服务里最靠谱的落地路径;硬上 2PC,多数时候只是把问题从代码移到运维日志里。