spring cloud RabbitMQ
一、RabbitMQ 整体架构与原理RabbitMQ 是实现AMQP 0-9-1 协议的开源消息代理Message Broker核心思想是生产者不直接发消息给队列而是发给交换机由交换机按规则路由到队列。核心组件组件说明BrokerRabbitMQ 服务节点负责接收、存储、转发消息ProducerPublisher消息生产者发送消息到 ExchangeConsumer消息消费者从 Queue 拉取或被动接收推送消息Exchange交换机路由中枢不存储消息按类型Binding规则决定消息去哪个QueueQueue队列真正存储消息的地方支持持久化、TTL、长度限制Binding绑定Exchange ↔ Queue 的关联关系附带 Binding Key路由匹配依据Routing Key生产者发消息时携带的字符串Exchange 用它做路由判断Virtual Hostvhost逻辑隔离空间各 vhost 有独立 Exchange/Queue/权限类似 NamespaceConnection客户端与 Broker 间的 TCP 长连接Channel信道Connection 内的虚拟通道实际执行 declare/send/consume多路复用减少 TCP 开销消息完整流转流程Producer 与 Broker 建立TCP Connection→ 开启Channel声明 Exchange如不存在、Queue、BindingProducer 发送消息到 Exchange指定Routing Key和消息属性持久化、优先级、TTL等Exchange 根据类型 Binding 规则匹配将消息拷贝路由到符合条件的 Queue(s)消息在 Queue 中等待Consumer 订阅 QueueBroker推(Push)消息给 ConsumerBasic.Consume或 Consumer 主动拉Basic.Get不推荐Consumer 处理完发送ACKBroker 标记消息可删除若 NACK/无 ACK 可按配置重入队或进死信队列⚠️注意Exchange 本身不存消息若无匹配 Queue 且未设置备用交换器(Alternate Exchange)消息会被丢弃。二、四种 Exchange 交换机详解RabbitMQ 内置四种标准交换机类型路由规则是核心区别。1. Direct Exchange直连交换机—— 精确匹配路由规则消息 Routing Key必须与Binding Key完全相同大小写敏感才路由到对应 Queue特点简单高效支持一个 Key 绑定多个 Queue实现单播伪多播典型场景日志分级error→ 告警队列info→ 归档队列订单状态分发order.created→ 库存队列order.paid→ 结算队列默认交换机RabbitMQ 自带一个匿名Exchangetypedirect自动用 Queue 名作 Binding Key所以不显式声明交换机也能发消息Queue A ← bind order.create Queue B ← bind order.pay msg(routingKeyorder.create) → 只到 Queue A msg(routingKeyorder.pay) → 只到 Queue B2. Fanout Exchange扇形/广播交换机—— 广播路由规则忽略 Routing Key将消息复制并广播到所有绑定到此 Exchange 的 Queue特点性能最好无匹配计算彻底解耦发布者与订阅者拓扑典型场景系统公告 / 运维广播订单创建后 → 同时通知库存、物流、营销、发邮件等多个系统分布式缓存失效通知Queue A、B、C 都 bind 到 fanout_exchange 发任意消息 → A、B、C 各收到一份拷贝3. Topic Exchange主题交换机—— 通配符匹配 ⭐最灵活路由规则Routing Key 用.分隔单词如order.created.vipBinding Key 支持通配符*—— 匹配恰好一个单词#—— 匹配零个或多个单词特点兼具精确与模糊匹配适合多维度分类订阅典型场景日志系统log.error.#接收所有错误日志*.usa.*接收美国区域消息微服务事件总线user.*订阅用户相关所有事件order.*.paid订阅各类订单支付Binding: Queue A ← order.*.vip Binding: Queue B ← order.# routingKeyorder.created.vip → A✓ B✓ routingKeyorder.paid.vip → A✓ B✓ routingKeyorder.created.normal → A✗ B✓ routingKeyorder.paid → A✗ B✓#匹配0词4. Headers Exchange头交换机—— 基于消息头属性匹配路由规则忽略 Routing Key根据消息headersMapString,Object匹配绑定时设x-match参数x-match all→ 所有 header 键值对均匹配x-match any→ 任意一个匹配即可特点支持多条件复杂筛选但性能低于其他三种使用较少Topic 常可替代典型场景需多属性组合路由如typeorder AND priorityhigh// 绑定时MapString,ObjectheadersnewHashMap();headers.put(type,order);headers.put(priority,high);headers.put(x-match,all);channel.queueBind(queueName,headersExchange,,headers);// 发送时AMQP.BasicPropertiespropsnewAMQP.BasicProperties.Builder().headers(newHashMapString,Object(){{put(type,order);put(priority,high);}}).build();channel.basicPublish(headers.ex,,props,body);四种交换机对比速查类型路由依据匹配方式性能使用频率典型场景DirectRouting Key完全相等★★★★★★★★★点对点、日志分级Fanout无(忽略Key)广播全部★★★★★★★★广播通知、多系统联动TopicRouting Key*/#通配符★★★★★★★★分类订阅、事件总线HeadersHeaders属性K-V全/任匹配★★★★多条件复杂路由(少用)三、其他重要概念详解Queue 属性Durable持久化设为 true 时 Queue 元数据存磁盘Broker 重启后保留消息本身还需设deliveryMode2才持久化Exclusive排他仅允许当前 Connection 使用连接关闭自动删除Auto-delete最后一个 Consumer 取消订阅后自动删除支持设置x-message-ttl消息过期时间、x-max-length队列最大长度、x-dead-letter-exchange死信交换机消息确认机制ACKautoAcktrueBroker 发出即删可能丢消息autoAckfalseConsumer 手动basic.ack处理完才删异常可basic.nack(requeue)重入队或丢弃 → 推荐生产环境使用死信队列DLQ — Dead Letter Queue以下情况消息变死信被 Consumer 拒绝/Nack 且不重入队消息 TTL 过期队列超出长度限制死信交换机DLX — Dead Letter Exchange通过在 Queue 声明时设置x-dead-letter-exchange和x-dead-letter-routing-key死信会被转发到指定 DLX→DLQ常用于延迟队列结合 TTL Time To LiveDLX和异常消息兜底处理。Virtual Hostvhost同一 Broker 内逻辑隔离不同 vhost Exchange/Queue 互不可见常用于多环境隔离/dev、/test、/prod、多租户隔离rabbitmqctl add_vhost /prod rabbitmqctl set_permissions-p/prod user.*.*.*Connection ChannelConnectionTCP 连接建立开销大Channel轻量级虚拟通道复用 Connection每个线程建议用独立 Channel避免并发问题几乎所有 AMQP 操作exchangeDeclare、queueDeclare、basicPublish、basicConsume都在 Channel 上进行rabbit 使用springcloud 创建队列 生成消息 交给交换机 进入不同得队列 之后消费普通队列消息 和消费死信队列消息代码下面覆盖你关心的全部点✅ Spring Cloud / Spring Boot✅ 创建延迟队列TTL✅ 创建死信队列DLQ✅ 创建普通业务队列✅ 交换机 → 不同队列✅ 生产消息✅ 消费普通队列✅ 消费死信队列关单一、依赖pom.xmldependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactId/dependency二、application.ymlspring:rabbitmq:host:localhostport:5672username:guestpassword:guestvirtual-host:/三、RabbitMQ 配置类核心 ✅只在这一处声明所有 Queue / Exchange / BindingConfigurationpublicclassRabbitConfig{// 交换机 publicstaticfinalStringORDER_EXCHANGEorder.exchange;// 普通队列支付成功 publicstaticfinalStringORDER_PAY_QUEUEorder.pay.queue;publicstaticfinalStringORDER_PAY_KEYorder.pay;// 延迟队列30分钟 publicstaticfinalStringORDER_DELAY_QUEUEorder.delay.queue;publicstaticfinalStringORDER_DELAY_KEYorder.delay;// 死信队列关单 publicstaticfinalStringORDER_CLOSE_QUEUEorder.close.queue;publicstaticfinalStringORDER_CLOSE_KEYorder.close;// 死信交换机 publicstaticfinalStringORDER_DLXorder.dlx;/* 交换机 */BeanpublicDirectExchangeorderExchange(){returnnewDirectExchange(ORDER_EXCHANGE,true,false);}BeanpublicDirectExchangeorderDlx(){returnnewDirectExchange(ORDER_DLX,true,false);}/* 普通队列支付成功 */BeanpublicQueueorderPayQueue(){returnnewQueue(ORDER_PAY_QUEUE,true);}BeanpublicBindingorderPayBinding(){returnBindingBuilder.bind(orderPayQueue()).to(orderExchange()).with(ORDER_PAY_KEY);}/* 死信队列关单 */BeanpublicQueueorderCloseQueue(){returnnewQueue(ORDER_CLOSE_QUEUE,true);}BeanpublicBindingorderCloseBinding(){returnBindingBuilder.bind(orderCloseQueue()).to(orderDlx()).with(ORDER_CLOSE_KEY);}/* 延迟队列重点⭐ */BeanpublicQueueorderDelayQueue(){MapString,ObjectargsnewHashMap();args.put(x-message-ttl,30*60*1000);// 30分钟args.put(x-dead-letter-exchange,ORDER_DLX);args.put(x-dead-letter-routing-key,ORDER_CLOSE_KEY);returnnewQueue(ORDER_DELAY_QUEUE,true,false,false,args);}BeanpublicBindingorderDelayBinding(){returnBindingBuilder.bind(orderDelayQueue()).to(orderExchange()).with(ORDER_DELAY_KEY);}}四、发送消息生产者1️⃣ 下单 → 进延迟队列ComponentRequiredArgsConstructorpublicclassOrderProducer{privatefinalRabbitTemplaterabbitTemplate;publicvoidcreateOrder(LongorderId){StringmsgorderIdorderId;rabbitTemplate.convertAndSend(RabbitConfig.ORDER_EXCHANGE,RabbitConfig.ORDER_DELAY_KEY,msg);}}2️⃣ 支付成功 → 进普通队列publicvoidpaySuccess(LongorderId){StringmsgorderIdorderId;rabbitTemplate.convertAndSend(RabbitConfig.ORDER_EXCHANGE,RabbitConfig.ORDER_PAY_KEY,msg);}✅支付成功只改数据库不删 MQ 消息五、消费普通队列支付成功ComponentSlf4jpublicclassOrderPayConsumer{RabbitListener(queuesRabbitConfig.ORDER_PAY_QUEUE)publicvoidonMessage(Stringmsg,Channelchannel,Header(AmqpHeaders.DELIVERY_TAG)longtag)throwsIOException{log.info(支付成功处理{},msg);// 扣库存 / 发券 / 通知channel.basicAck(tag,false);}}六、消费死信队列30分钟未支付 → 关单 ✅ComponentSlf4jpublicclassOrderCloseConsumer{RabbitListener(queuesRabbitConfig.ORDER_CLOSE_QUEUE)publicvoidcloseOrder(Stringmsg,Channelchannel,Header(AmqpHeaders.DELIVERY_TAG)longtag)throwsIOException{LongorderIdextractOrderId(msg);// ✅ 查数据库OrderorderorderMapper.selectById(orderId);if(order!nullorder.getStatus()UNPAID){orderMapper.updateStatus(orderId,CLOSED);log.info(订单超时关闭{},orderId);}channel.basicAck(tag,false);}}✅这是标准答案✅ 不删 MQ✅ 用 DB 状态兜底七、完整消息流向你这套代码的效果下单 ↓ order.delay.queueTTL30min❌无消费者 ↓ 30分钟 order.dlx ↓ order.close.queue✅消费 ↓ DB 未支付 → 关闭订单 支付成功 ↓ order.pay.queue✅消费 ↓ 支付业务处理八、一句话总结你可以放心用✅延迟队列不消费✅死信队列才消费✅支付成功只改 DB✅30 分钟到 DLQ 再关单如果你愿意下一步我可以帮你加✅防重复关单乐观锁 SQL✅消息 JSON 实体不用 String✅Spring Cloud Stream 版本✅插件版延迟交换机rabbitmq-delayed-message-exchange你说你想看哪一个