C#怎么连接RabbitMQ C#使用MassTransit操作RabbitMQ教程

12次阅读

推荐使用 MassTransit 连接 rabbitmq,它提供自动连接管理、重连、序列化和消费者生命周期处理;需安装 MassTransit 和 MassTransit.RabbitMQ 包,配置 Host、vhost 与凭证,定义消息契约、发布端 IPublishEndpoint 和 IConsumer 实现类,并注册消费者队列。

C#怎么连接RabbitMQ C#使用MassTransit操作RabbitMQ教程

用 C# 连接 RabbitMQ,推荐直接使用 MassTransit —— 它不是简单的封装,而是成熟的分布式消息通信框架,内置对 RabbitMQ 的深度支持,自动处理连接管理、重连、序列化、消费者生命周期等细节,比手写 Raw RabbitMQ.Client 更安全、更省心。

安装 MassTransit 和 RabbitMQ 客户端依赖

在项目中通过 NuGet 安装两个核心包:

  • MassTransit(主框架)
  • MassTransit.RabbitMQ(RabbitMQ 传输适配器)

命令行执行:

do.net add package MassTransit
dotnet add package MassTransit.RabbitMQ

注意:无需单独安装 RabbitMQ.Client,MassTransit.RabbitMQ 已包含兼容版本。

配置 MassTransit 服务(.NET 6/7/8 推荐方式)

Program.cs 中注册 MassTransit,并指定 RabbitMQ 连接地址和虚拟主机:

builder.Services.AddMassTransit(x =>
{
x.UsingRabbitMq((context, cfg) =>
{
cfg.Host("localhost", "/", h =>
{
h.Username("guest");
h.Password("guest");
});
// 可选:启用自动交换/队列声明
cfg.ConfigureEndpoints(context);
});
});

说明:
"localhost" 是 RabbitMQ 服务地址,生产环境换成实际 IP 或域名
"/" 是虚拟主机(vhost),默认为 “/”,如已创建自定义 vhost(如 “myapp“)请替换
– 用户名密码默认是 guest/guest,生产务必修改并配置权限

定义消息契约与发送消息

消息必须是 public class,推荐实现空接口标记(非必须但利于类型识别):

public record OrderSubmitted
{
public Guid OrderId { get; init; }
public string CustomerName { get; init; } = default!;
}

在业务代码中注入 IPublishEndpoint 发布事件

public class OrderService
{
private readonly IPublishEndpoint _publishEndpoint;

public OrderService(IPublishEndpoint publishEndpoint)
=> _publishEndpoint = publishEndpoint;

public async Task SubmitOrder(Guid orderId, string name)
{
await _publishEndpoint.Publish(new OrderSubmitted
{
OrderId = orderId,
CustomerName = name
});
}
}

发布即发往默认交换机(amq.topic),MassTransit 自动路由到匹配的消费者队列。

编写消费者接收并处理消息

新建一个类实现 IConsumer

public class OrderSubmittedConsumer : IConsumer
{
public async Task Consume(ConsumeContext context)
{
var message = context.Message;
Console.WriteLine($"收到订单: {message.OrderId} - {message.CustomerName}");

// 模拟处理逻辑(如保存数据库、调用下游服务)
await Task.Delay(100);

// 处理成功,自动 ACK;抛出异常则 NACK 并可能进入重试或死信队列
}
}

注册消费者(仍在 AddMassTransit 配置块内):

x.AddConsumer();

并在 ConfigureEndpoints 前加上:

cfg.ReceiveEndpoint("order-queue", e =>
{
e.ConfigureConsumer(context);
});

这样 MassTransit 就会创建名为 order-queue 的队列,绑定到默认交换机,监听 OrderSubmitted 类型消息。

基本上就这些。MassTransit 把 RabbitMQ 的复杂性屏蔽得很好,你专注定义消息和业务逻辑即可。不复杂但容易忽略的是:确保 RabbitMQ 服务已启动、vhost 存在、用户有对应权限,以及开发时打开管理界面(http://localhost:15672)实时观察队列和消息流动,排查问题快得多。

text=ZqhQzanResources