消息队列模式:异步处理最佳实践
消息队列模式异步处理最佳实践核心概念消息队列是实现异步通信的重要工具可以解耦系统组件、提高系统的可扩展性和可靠性。本文将介绍常见的消息队列模式和最佳实践。消息队列模式1. 点对点模式// 点对点生产者 Component public class PointToPointProducer { private final RabbitTemplate rabbitTemplate; public PointToPointProducer(RabbitTemplate rabbitTemplate) { this.rabbitTemplate rabbitTemplate; } public void sendMessage(String queueName, Object message) { rabbitTemplate.convertAndSend(queueName, message); } public void sendMessageWithCorrelation(String queueName, Object message, String correlationId) { rabbitTemplate.convertAndSend(queueName, message, msg - { msg.getMessageProperties().setCorrelationId(correlationId); return msg; }); } } // 点对点消费者 Component public class PointToPointConsumer { RabbitListener(queues task-queue) public void handleMessage(String message) { System.out.println(Received message: message); processMessage(message); } private void processMessage(String message) { // 处理消息逻辑 } }2. 发布/订阅模式// 发布者 Component public class Publisher { private final RabbitTemplate rabbitTemplate; public Publisher(RabbitTemplate rabbitTemplate) { this.rabbitTemplate rabbitTemplate; } public void publish(String exchangeName, String routingKey, Object message) { rabbitTemplate.convertAndSend(exchangeName, routingKey, message); } } // 订阅者 1 Component public class Subscriber1 { RabbitListener(bindings QueueBinding( value Queue, exchange Exchange(value events, type ExchangeTypes.TOPIC), key user.created )) public void handleUserCreated(UserCreatedEvent event) { System.out.println(User created: event.getUserId()); } } // 订阅者 2 Component public class Subscriber2 { RabbitListener(bindings QueueBinding( value Queue, exchange Exchange(value events, type ExchangeTypes.TOPIC), key user.* )) public void handleUserEvents(Object event) { System.out.println(User event received: event); } }3. 请求/回复模式// 请求者 Component public class Requestor { private final RabbitTemplate rabbitTemplate; public Requestor(RabbitTemplate rabbitTemplate) { this.rabbitTemplate rabbitTemplate; rabbitTemplate.setReplyTimeout(60000); } public String sendRequest(String request) { return rabbitTemplate.convertSendAndReceive(request-queue, request, String.class); } public CompletableFutureString sendAsyncRequest(String request) { return rabbitTemplate.convertSendAndReceiveAsynchronously(request-queue, request) .thenApply(o - (String) o); } } // 回复者 Component public class Replier { RabbitListener(queues request-queue) public String handleRequest(String request) { String response processRequest(request); return response; } private String processRequest(String request) { // 处理请求并返回响应 return Response for: request; } }消息可靠性// 消息确认配置 Configuration public class RabbitMqConfig { Bean public Queue durableQueue() { return QueueBuilder.durable(durable-queue) .build(); } Bean public Exchange durableExchange() { return ExchangeBuilder.topicExchange(durable-exchange) .durable(true) .build(); } Bean public Binding binding(Queue durableQueue, Exchange durableExchange) { return BindingBuilder.bind(durableQueue) .to(durableExchange) .with(routing.key) .noargs(); } Bean public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory( ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory factory new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); factory.setConcurrentConsumers(3); factory.setMaxConcurrentConsumers(10); return factory; } } // 手动确认消费者 Component public class ManualAckConsumer { RabbitListener(queues manual-ack-queue, containerFactory rabbitListenerContainerFactory) public void handleMessage(String message, Channel channel, Message messageObj) throws IOException { try { processMessage(message); channel.basicAck(messageObj.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { // 拒绝并重新入队 channel.basicNack(messageObj.getMessageProperties().getDeliveryTag(), false, true); } } private void processMessage(String message) { // 处理消息逻辑 } }死信队列// 死信队列配置 Configuration public class DeadLetterConfig { Bean public Queue mainQueue() { return QueueBuilder.durable(main-queue) .deadLetterExchange(dead-letter-exchange) .deadLetterRoutingKey(dead-letter-key) .build(); } Bean public Queue deadLetterQueue() { return QueueBuilder.durable(dead-letter-queue) .build(); } Bean public Exchange deadLetterExchange() { return ExchangeBuilder.directExchange(dead-letter-exchange) .build(); } Bean public Binding deadLetterBinding() { return BindingBuilder.bind(deadLetterQueue()) .to(deadLetterExchange()) .with(dead-letter-key) .noargs(); } } // 死信队列消费者 Component public class DeadLetterConsumer { RabbitListener(queues dead-letter-queue) public void handleDeadLetter(Message message) { System.out.println(Dead letter received: new String(message.getBody())); // 记录日志、发送告警、人工处理等 } }消息幂等性// 幂等性处理器 Component public class IdempotentMessageHandler { private final RedisTemplateString, String redisTemplate; private static final String PROCESSING_PREFIX processing:; private static final String PROCESSED_PREFIX processed:; public IdempotentMessageHandler(RedisTemplateString, String redisTemplate) { this.redisTemplate redisTemplate; } public boolean processIfNotProcessed(String messageId, Runnable handler) { String processingKey PROCESSING_PREFIX messageId; String processedKey PROCESSED_PREFIX messageId; // 检查是否已处理 if (Boolean.TRUE.equals(redisTemplate.hasKey(processedKey))) { return false; } // 尝试获取处理锁 Boolean acquired redisTemplate.opsForValue() .setIfAbsent(processingKey, processing, 5, TimeUnit.MINUTES); if (Boolean.FALSE.equals(acquired)) { return false; } try { handler.run(); // 标记为已处理 redisTemplate.opsForValue().set(processedKey, true, 24, TimeUnit.HOURS); return true; } finally { // 释放处理锁 redisTemplate.delete(processingKey); } } } // 使用幂等处理器 Service public class OrderService { private final IdempotentMessageHandler idempotentHandler; public OrderService(IdempotentMessageHandler idempotentHandler) { this.idempotentHandler idempotentHandler; } public void processOrder(OrderCreatedEvent event) { boolean processed idempotentHandler.processIfNotProcessed( event.getMessageId(), () - { // 实际处理逻辑 createOrder(event); } ); if (!processed) { System.out.println(Message already processed: event.getMessageId()); } } private void createOrder(OrderCreatedEvent event) { // 创建订单逻辑 } }消息重试// 重试配置 Configuration public class RetryConfig { Bean public RetryTemplate retryTemplate() { RetryTemplate retryTemplate new RetryTemplate(); SimpleRetryPolicy retryPolicy new SimpleRetryPolicy(); retryPolicy.setMaxAttempts(3); FixedBackOffPolicy backOffPolicy new FixedBackOffPolicy(); backOffPolicy.setBackOffPeriod(1000); retryTemplate.setRetryPolicy(retryPolicy); retryTemplate.setBackOffPolicy(backOffPolicy); return retryTemplate; } } // 使用重试的消息处理器 Component public class RetryableConsumer { private final RetryTemplate retryTemplate; public RetryableConsumer(RetryTemplate retryTemplate) { this.retryTemplate retryTemplate; } RabbitListener(queues retry-queue) public void handleMessage(String message) { retryTemplate.execute(context - { processMessage(message); return null; }); } private void processMessage(String message) { // 可能失败的处理逻辑 } }Kafka 集成// Kafka 生产者配置 Configuration public class KafkaProducerConfig { Value(${spring.kafka.bootstrap-servers}) private String bootstrapServers; Bean public ProducerFactoryString, Object producerFactory() { MapString, Object config new HashMap(); config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); config.put(ProducerConfig.ACKS_CONFIG, all); config.put(ProducerConfig.RETRIES_CONFIG, 3); return new DefaultKafkaProducerFactory(config); } Bean public KafkaTemplateString, Object kafkaTemplate() { return new KafkaTemplate(producerFactory()); } } // Kafka 消费者配置 Configuration public class KafkaConsumerConfig { Value(${spring.kafka.bootstrap-servers}) private String bootstrapServers; Bean public ConsumerFactoryString, Object consumerFactory() { MapString, Object config new HashMap(); config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); config.put(ConsumerConfig.GROUP_ID_CONFIG, my-group); config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, earliest); return new DefaultKafkaConsumerFactory(config); } Bean public ConcurrentKafkaListenerContainerFactoryString, Object kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactoryString, Object factory new ConcurrentKafkaListenerContainerFactory(); factory.setConsumerFactory(consumerFactory()); factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL); return factory; } } // Kafka 消费者 Component public class KafkaMessageConsumer { KafkaListener(topics orders, containerFactory kafkaListenerContainerFactory) public void consume(ConsumerRecordString, OrderCreatedEvent record, Acknowledgment acknowledgment) { try { OrderCreatedEvent event record.value(); processOrder(event); acknowledgment.acknowledge(); } catch (Exception e) { // 处理异常可能需要重试或发送到死信队列 } } private void processOrder(OrderCreatedEvent event) { // 处理订单创建事件 } }最佳实践消息持久化确保消息在 broker 重启后不丢失消息确认使用手动确认确保消息被正确处理幂等性确保重复消息不会导致重复处理死信队列处理无法处理的消息重试机制对失败的消息进行重试监控告警监控消息队列的状态和性能消息过期设置消息过期时间避免消息积压批量处理对大量消息进行批量处理实际应用场景异步任务处理将耗时操作异步化事件驱动架构实现松耦合的组件通信流量削峰通过队列缓冲突发流量日志收集收集和处理日志数据总结消息队列是构建高可用、高扩展系统的重要组件。通过合理使用消息队列模式可以实现系统组件的解耦和异步处理。在实际应用中需要注意消息可靠性、幂等性和监控等方面。别叫我大神叫我 Alex 就好。这其实可以更优雅一点合理的消息队列设计让系统变得更加可靠和高效。