Java + Spring Boot 操作 Kafka 完整学习指南
前置条件ZooKeeper 集群 Kafka 集群已启动3个ZK节点 3个Broker Broker 地址172.17.0.7:9092, 172.17.0.7:9093, 172.17.0.7:9094第一阶段原生 Java API 操作 Kafka目的理解底层原理Spring Boot 只是对这层的封装一、Maven 依赖dependencies !-- Kafka 客户端原生Java API -- dependency groupIdorg.apache.kafka/groupId artifactIdkafka-clients/artifactId version3.9.0/version /dependency !-- 日志可选 -- dependency groupIdorg.slf4j/groupId artifactIdslf4j-simple/artifactId version1.7.36/version /dependency /dependencies二、生产者Producer2.1 最简单的生产者import org.apache.kafka.clients.producer.*; import java.util.Properties; public class SimpleProducer { public static void main(String[] args) throws Exception { // 1. 配置生产者 Properties props new Properties(); props.put(bootstrap.servers, 172.17.0.7:9092,172.17.0.7:9093,172.17.0.7:9094); props.put(key.serializer, org.apache.kafka.common.serialization.StringSerializer); props.put(value.serializer, org.apache.kafka.common.serialization.StringSerializer); // 2. 创建生产者 KafkaProducerString, String producer new KafkaProducer(props); // 3. 发送消息 for (int i 0; i 10; i) { ProducerRecordString, String record new ProducerRecord(test-topic, key- i, hello kafka i); producer.send(record); System.out.println(发送消息: hello kafka i); } // 4. 关闭会自动flush producer.close(); } }2.2 三种发送方式// ① 发后不管Fire and Forget—— 性能最高可能丢消息 producer.send(record); // ② 同步发送 —— 等待结果性能最低最可靠 try { RecordMetadata metadata producer.send(record).get(); // .get() 阻塞等待 System.out.printf(发送成功 → Topic:%s Partition:%d Offset:%d%n, metadata.topic(), metadata.partition(), metadata.offset()); } catch (Exception e) { System.out.println(发送失败: e.getMessage()); } // ③ 异步发送推荐—— 性能高有回调处理结果 producer.send(record, (metadata, exception) - { if (exception null) { System.out.printf(发送成功 → Partition:%d Offset:%d%n, metadata.partition(), metadata.offset()); } else { System.out.println(发送失败: exception.getMessage()); } });2.3 生产者重要配置详解Properties props new Properties(); props.put(bootstrap.servers, 172.17.0.7:9092,172.17.0.7:9093,172.17.0.7:9094); props.put(key.serializer, org.apache.kafka.common.serialization.StringSerializer); props.put(value.serializer, org.apache.kafka.common.serialization.StringSerializer); // ★ acks消息可靠性核心配置 // 0 → 发完不管性能最高可能丢消息 // 1 → Leader写成功就返回Leader挂了可能丢 // all→ 所有ISR副本写成功才返回最可靠推荐生产环境 props.put(acks, all); // 重试次数遇到可重试错误时 props.put(retries, 3); // 批量发送消息积累到16KB一起发提升吞吐量 props.put(batch.size, 16384); // 等待时间即使不到16KB等1ms也发出去减少延迟 props.put(linger.ms, 1); // 缓冲区大小生产者本地缓存32MB满了send()会阻塞 props.put(buffer.memory, 33554432); // 幂等性开启后自动去重防止重试导致的消息重复 props.put(enable.idempotence, true);2.4 消息分区策略// 情况1指定分区 → 直接发到该分区 new ProducerRecord(test-topic, 0, key, value); // 发到分区0 // 情况2指定key → 对key做hash相同key一定进同一分区保证有序 new ProducerRecord(test-topic, user-001, 登录消息); new ProducerRecord(test-topic, user-001, 下单消息); // 上面两条消息一定在同一分区保证user-001的消息有序 // 情况3不指定key → 轮询分配到各分区负载均衡 new ProducerRecord(test-topic, hello);三、消费者Consumer3.1 最简单的消费者import org.apache.kafka.clients.consumer.*; import java.time.Duration; import java.util.*; public class SimpleConsumer { public static void main(String[] args) { // 1. 配置消费者 Properties props new Properties(); props.put(bootstrap.servers, 172.17.0.7:9092,172.17.0.7:9093,172.17.0.7:9094); props.put(group.id, my-consumer-group); // 消费者组ID重要 props.put(key.deserializer, org.apache.kafka.common.serialization.StringDeserializer); props.put(value.deserializer, org.apache.kafka.common.serialization.StringDeserializer); // 2. 创建消费者 KafkaConsumerString, String consumer new KafkaConsumer(props); // 3. 订阅 Topic consumer.subscribe(Collections.singletonList(test-topic)); // 4. 持续拉取消息 while (true) { // poll()向Broker拉取消息最多等待1秒 ConsumerRecordsString, String records consumer.poll(Duration.ofMillis(1000)); for (ConsumerRecordString, String record : records) { System.out.printf(收到消息 → Topic:%s Partition:%d Offset:%d Key:%s Value:%s%n, record.topic(), record.partition(), record.offset(), record.key(), record.value()); } } } }3.2 消费者组Consumer Group核心概念Topic: test-topic3个分区 消费者组A3个消费者 消费者组B1个消费者 Consumer1 → Partition0 Consumer1 → Partition0 Consumer2 → Partition1 → Partition1 Consumer3 → Partition2 → Partition2 结论 - 同一组内每条消息只被一个消费者消费负载均衡 - 不同组间每条消息都会被每个组各消费一次广播 - 消费者数 分区数多余的消费者空闲等待接管3.3 offset 提交方式// ① 自动提交默认—— 简单但可能重复消费或丢失 props.put(enable.auto.commit, true); props.put(auto.commit.interval.ms, 5000); // 每5秒自动提交一次 // ② 手动同步提交 —— 处理完消息再提交更可靠 props.put(enable.auto.commit, false); while (true) { ConsumerRecordsString, String records consumer.poll(Duration.ofMillis(1000)); for (ConsumerRecordString, String record : records) { // 处理消息... System.out.println(处理消息: record.value()); } consumer.commitSync(); // 处理完一批再提交阻塞等待 } // ③ 手动异步提交 —— 性能更好推荐 consumer.commitAsync((offsets, exception) - { if (exception ! null) { System.out.println(提交失败: exception.getMessage()); } });3.4 消费者重要配置详解// 从哪里开始消费当消费者组第一次启动没有记录offset时 // earliest → 从最早的消息开始--from-beginning // latest → 只消费启动后的新消息默认 props.put(auto.offset.reset, earliest); // 一次poll最多拉取多少条 props.put(max.poll.records, 500); // poll间隔超过这个时间broker认为消费者挂了触发Rebalance props.put(max.poll.interval.ms, 300000); // 5分钟 // 心跳间隔消费者定期向broker报活 props.put(heartbeat.interval.ms, 3000); // 超过这个时间没心跳认为消费者挂了 props.put(session.timeout.ms, 30000);四、Rebalance再均衡机制什么是Rebalance 消费者组内成员发生变化时Kafka重新分配分区给消费者的过程 触发时机 1. 新消费者加入组 2. 消费者离开组正常关闭或崩溃 3. Topic分区数变化 Rebalance过程STW 所有消费者停止消费 → GroupCoordinator重新分配 → 消费者恢复消费 ↑ 这段时间不能消费所以Rebalance要尽量避免频繁发生 避免不必要Rebalance的方法 - 合理设置 session.timeout.ms 和 heartbeat.interval.ms - 处理消息不要太慢避免超过 max.poll.interval.ms第二阶段Spring Boot 操作 Kafka一、项目搭建1.1 Maven 依赖parent groupIdorg.springframework.boot/groupId artifactIdspring-boot-starter-parent/artifactId version3.2.0/version /parent dependencies dependency groupIdorg.springframework.boot/groupId artifactIdspring-boot-starter/artifactId /dependency !-- Spring Kafka包含了kafka-clients -- dependency groupIdorg.springframework.kafka/groupId artifactIdspring-kafka/artifactId /dependency !-- 用于对象序列化 -- dependency groupIdcom.fasterxml.jackson.core/groupId artifactIdjackson-databind/artifactId /dependency dependency groupIdorg.springframework.boot/groupId artifactIdspring-boot-starter-test/artifactId scopetest/scope /dependency dependency groupIdorg.springframework.kafka/groupId artifactIdspring-kafka-test/artifactId scopetest/scope /dependency /dependencies1.2 application.yml 核心配置spring: kafka: # Broker 地址列表 bootstrap-servers: 172.17.0.7:9092,172.17.0.7:9093,172.17.0.7:9094 # 生产者配置 producer: key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer acks: all # 最高可靠性 retries: 3 batch-size: 16384 linger-ms: 1 buffer-memory: 33554432 # 如果发送的是对象改用 JsonSerializer # value-serializer: org.springframework.kafka.support.serializer.JsonSerializer # 消费者配置 consumer: group-id: my-spring-group # 消费者组ID auto-offset-reset: earliest # 第一次启动从最早消息开始 enable-auto-commit: false # 关闭自动提交让Spring管理 key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer max-poll-records: 100 # 如果消费的是对象改用 JsonDeserializer # value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer # properties: # spring.json.trusted.packages: * # 监听器配置 listener: ack-mode: manual_immediate # 手动提交offset concurrency: 3 # 3个消费者线程建议 分区数 type: batch # batch批量消费single逐条消费二、生产者2.1 发送字符串消息import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; import org.springframework.stereotype.Service; import java.util.concurrent.CompletableFuture; Service public class KafkaProducerService { // Spring 自动注入无需手动创建 private final KafkaTemplateString, String kafkaTemplate; public KafkaProducerService(KafkaTemplateString, String kafkaTemplate) { this.kafkaTemplate kafkaTemplate; } // ① 最简单只发消息内容 public void sendMessage(String message) { kafkaTemplate.send(test-topic, message); } // ② 带 key相同key进同一分区保证有序 public void sendWithKey(String key, String message) { kafkaTemplate.send(test-topic, key, message); } // ③ 指定分区 public void sendToPartition(int partition, String key, String message) { kafkaTemplate.send(test-topic, partition, key, message); } // ④ 异步回调推荐 public void sendWithCallback(String message) { CompletableFutureSendResultString, String future kafkaTemplate.send(test-topic, message); future.whenComplete((result, ex) - { if (ex null) { System.out.printf(发送成功 → Partition:%d Offset:%d%n, result.getRecordMetadata().partition(), result.getRecordMetadata().offset()); } else { System.out.println(发送失败: ex.getMessage()); } }); } // ⑤ 同步发送等待结果 public void sendSync(String message) throws Exception { SendResultString, String result kafkaTemplate.send(test-topic, message).get(); System.out.printf(同步发送成功 → Partition:%d Offset:%d%n, result.getRecordMetadata().partition(), result.getRecordMetadata().offset()); } }2.2 发送对象JSON// 实体类 public class OrderEvent { private String orderId; private String userId; private Double amount; private String status; // getter/setter/构造方法省略 }# application.yml 中改为 JsonSerializer spring: kafka: producer: value-serializer: org.springframework.kafka.support.serializer.JsonSerializer// 注入对象类型的 KafkaTemplate Service public class OrderProducerService { private final KafkaTemplateString, OrderEvent kafkaTemplate; public OrderProducerService(KafkaTemplateString, OrderEvent kafkaTemplate) { this.kafkaTemplate kafkaTemplate; } public void sendOrder(OrderEvent order) { // 用 orderId 作为 key保证同一订单的消息有序 kafkaTemplate.send(order-topic, order.getOrderId(), order); System.out.println(发送订单事件: order.getOrderId()); } }三、消费者3.1 基础消费逐条import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.support.Acknowledgment; import org.springframework.stereotype.Component; Component public class KafkaConsumerService { // 最简单的监听 KafkaListener(topics test-topic, groupId my-spring-group) public void listen(String message) { System.out.println(收到消息: message); } // 获取消息元数据partition、offset等 KafkaListener(topics test-topic, groupId my-spring-group) public void listenWithMeta(ConsumerRecordString, String record) { System.out.printf(收到消息 → Partition:%d Offset:%d Key:%s Value:%s%n, record.partition(), record.offset(), record.key(), record.value()); } // 手动提交 offset需要 yml 中配置 ack-mode: manual_immediate KafkaListener(topics test-topic, groupId my-spring-group) public void listenWithAck(ConsumerRecordString, String record, Acknowledgment ack) { try { System.out.println(处理消息: record.value()); // 处理成功后手动提交 ack.acknowledge(); } catch (Exception e) { // 处理失败不提交下次重新消费 System.out.println(处理失败不提交offset: e.getMessage()); } } }3.2 批量消费推荐性能更好# yml 中配置 spring: kafka: listener: type: batch # 开启批量模式KafkaListener(topics test-topic, groupId my-spring-group) public void batchListen(ListConsumerRecordString, String records, Acknowledgment ack) { System.out.println(批量收到 records.size() 条消息); for (ConsumerRecordString, String record : records) { System.out.printf(Partition:%d Offset:%d Value:%s%n, record.partition(), record.offset(), record.value()); // 处理每条消息... } // 整批处理完后提交一次比逐条提交性能好很多 ack.acknowledge(); }3.3 消费对象JSONspring: kafka: consumer: value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer properties: spring.json.trusted.packages: com.yourpackage.dto # 信任的包名 spring.json.value.default.type: com.yourpackage.dto.OrderEventKafkaListener(topics order-topic, groupId order-group) public void handleOrder(ConsumerRecordString, OrderEvent record, Acknowledgment ack) { OrderEvent order record.value(); System.out.printf(收到订单 → ID:%s 用户:%s 金额:%.2f%n, order.getOrderId(), order.getUserId(), order.getAmount()); // 处理订单业务逻辑... ack.acknowledge(); }3.4 监听多个 Topic / 指定分区// 监听多个 Topic KafkaListener(topics {topic1, topic2}, groupId my-group) public void listenMultipleTopics(ConsumerRecordString, String record) { System.out.println(来自 record.topic() : record.value()); } // 指定消费某个分区从offset0开始 KafkaListener( groupId my-group, topicPartitions { TopicPartition(topic test-topic, partitionOffsets { PartitionOffset(partition 0, initialOffset 0), PartitionOffset(partition 1, initialOffset 0) }) } ) public void listenSpecificPartition(ConsumerRecordString, String record) { System.out.println(分区 record.partition() : record.value()); }四、高级特性4.1 消息头Header传递额外信息// 生产者发送时携带 Header import org.apache.kafka.common.header.internals.RecordHeader; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; public void sendWithHeader(String message) { MessageString msg MessageBuilder .withPayload(message) .setHeader(source, order-service) .setHeader(version, v1) .setHeader(KafkaHeaders.TOPIC, test-topic) .build(); kafkaTemplate.send(msg); } // 消费者读取 Header KafkaListener(topics test-topic) public void listenWithHeader( ConsumerRecordString, String record, Header(value source, required false) String source) { System.out.println(来源: source 消息: record.value()); }4.2 错误处理与重试import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.listener.DefaultErrorHandler; import org.springframework.util.backoff.FixedBackOff; Configuration public class KafkaConfig { Bean public DefaultErrorHandler errorHandler() { // 重试2次每次间隔1秒 FixedBackOff backOff new FixedBackOff(1000L, 2); DefaultErrorHandler handler new DefaultErrorHandler(backOff); // 某些异常不重试直接跳过 handler.addNotRetryableExceptions(IllegalArgumentException.class); return handler; } Bean public ConcurrentKafkaListenerContainerFactoryString, String kafkaListenerContainerFactory( ConsumerFactoryString, String consumerFactory, DefaultErrorHandler errorHandler) { ConcurrentKafkaListenerContainerFactoryString, String factory new ConcurrentKafkaListenerContainerFactory(); factory.setConsumerFactory(consumerFactory); factory.setCommonErrorHandler(errorHandler); return factory; } }4.3 死信队列DLT// 重试多次后仍然失败的消息自动发到死信Topic原Topic名.DLT import org.springframework.kafka.listener.DeadLetterPublishingRecoverer; Bean public DefaultErrorHandler errorHandler(KafkaTemplateObject, Object template) { // 失败消息发送到死信队列 DeadLetterPublishingRecoverer recoverer new DeadLetterPublishingRecoverer(template); // 重试3次后进死信队列 FixedBackOff backOff new FixedBackOff(1000L, 3); return new DefaultErrorHandler(recoverer, backOff); } // 消费死信队列 KafkaListener(topics test-topic.DLT, groupId dlt-group) public void handleDeadLetter(ConsumerRecordString, String record) { System.out.println(死信消息需人工处理: record.value()); // 记录日志、告警、人工干预... }4.4 事务保证消息发送原子性spring: kafka: producer: transaction-id-prefix: tx- # 开启事务Service public class TransactionalProducerService { private final KafkaTemplateString, String kafkaTemplate; // Transactional 注解方法内所有消息要么全发成功要么全回滚 Transactional public void sendTransactional() { kafkaTemplate.send(topic1, 消息1); kafkaTemplate.send(topic2, 消息2); // 如果这里抛异常上面两条消息都不会发出去 if (Math.random() 0.5) { throw new RuntimeException(模拟业务异常); } kafkaTemplate.send(topic3, 消息3); } }4.5 动态创建 Topicimport org.apache.kafka.clients.admin.NewTopic; import org.springframework.kafka.config.TopicBuilder; Configuration public class KafkaTopicConfig { // Spring Boot 启动时自动创建如果不存在 Bean public NewTopic orderTopic() { return TopicBuilder.name(order-topic) .partitions(3) .replicas(3) .build(); } Bean public NewTopic userTopic() { return TopicBuilder.name(user-topic) .partitions(3) .replicas(2) .build(); } }五、完整实战案例订单消息系统项目结构src/main/java/com/example/kafka/ ├── KafkaApplication.java ├── config/ │ ├── KafkaTopicConfig.java ← Topic 配置 │ └── KafkaConsumerConfig.java ← 消费者工厂配置 ├── dto/ │ └── OrderEvent.java ← 消息实体 ├── producer/ │ └── OrderProducer.java ← 生产者 └── consumer/ ├── OrderConsumer.java ← 正常消费者 └── OrderDltConsumer.java ← 死信队列消费者OrderEvent.javapackage com.example.kafka.dto; public class OrderEvent { private String orderId; private String userId; private Double amount; private String status; // CREATED / PAID / SHIPPED / COMPLETED public OrderEvent() {} public OrderEvent(String orderId, String userId, Double amount, String status) { this.orderId orderId; this.userId userId; this.amount amount; this.status status; } // getter setter public String getOrderId() { return orderId; } public void setOrderId(String orderId) { this.orderId orderId; } public String getUserId() { return userId; } public void setUserId(String userId) { this.userId userId; } public Double getAmount() { return amount; } public void setAmount(Double amount) { this.amount amount; } public String getStatus() { return status; } public void setStatus(String status) { this.status status; } Override public String toString() { return String.format(OrderEvent{orderId%s, userId%s, amount%.2f, status%s}, orderId, userId, amount, status); } }KafkaTopicConfig.javapackage com.example.kafka.config; import org.apache.kafka.clients.admin.NewTopic; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.config.TopicBuilder; Configuration public class KafkaTopicConfig { Bean public NewTopic orderTopic() { return TopicBuilder.name(order-topic) .partitions(3) .replicas(3) .build(); } }OrderProducer.javapackage com.example.kafka.producer; import com.example.kafka.dto.OrderEvent; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Service; Service public class OrderProducer { private final KafkaTemplateString, OrderEvent kafkaTemplate; public OrderProducer(KafkaTemplateString, OrderEvent kafkaTemplate) { this.kafkaTemplate kafkaTemplate; } public void sendOrder(OrderEvent order) { kafkaTemplate.send(order-topic, order.getOrderId(), order) .whenComplete((result, ex) - { if (ex null) { System.out.printf([Producer] 订单发送成功 → %s Partition:%d%n, order.getOrderId(), result.getRecordMetadata().partition()); } else { System.out.println([Producer] 发送失败: ex.getMessage()); } }); } }OrderConsumer.javapackage com.example.kafka.consumer; import com.example.kafka.dto.OrderEvent; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.support.Acknowledgment; import org.springframework.stereotype.Component; Component public class OrderConsumer { KafkaListener( topics order-topic, groupId order-service-group, containerFactory kafkaListenerContainerFactory ) public void handleOrder(ConsumerRecordString, OrderEvent record, Acknowledgment ack) { OrderEvent order record.value(); System.out.printf([Consumer] 收到订单 Partition:%d Offset:%d → %s%n, record.partition(), record.offset(), order); try { // 模拟业务处理 processOrder(order); ack.acknowledge(); // 处理成功提交offset } catch (Exception e) { // 不提交offset交给重试机制处理 System.out.println([Consumer] 处理失败: e.getMessage()); throw e; // 抛出让 ErrorHandler 处理重试 } } private void processOrder(OrderEvent order) { System.out.println([Consumer] 处理订单业务逻辑: order.getOrderId()); // 实际业务更新数据库、调用其他服务等 } }KafkaApplication.java启动类 测试package com.example.kafka; import com.example.kafka.dto.OrderEvent; import com.example.kafka.producer.OrderProducer; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; SpringBootApplication public class KafkaApplication { public static void main(String[] args) { SpringApplication.run(KafkaApplication.class, args); } // 启动后自动发送测试消息 Bean public CommandLineRunner runner(OrderProducer producer) { return args - { Thread.sleep(2000); // 等消费者就绪 for (int i 1; i 5; i) { OrderEvent order new OrderEvent( ORDER- i, USER- (i % 3 1), 100.0 * i, CREATED ); producer.sendOrder(order); Thread.sleep(500); } }; } }六、原生 API vs Spring Boot 对比功能原生 Java APISpring Boot配置手动写 Propertiesapplication.yml 声明式配置生产者KafkaProducer 手动管理KafkaTemplate自动注入消费者while(true)手动轮询KafkaListener注解驱动offset提交完全手动自动/手动可选ack-mode错误处理自己写try/catch重试逻辑DefaultErrorHandler开箱即用事务手动beginTransactionTransactional注解Topic创建调用AdminClientBean NewTopic自动创建多线程消费自己管理线程池concurrency参数一行搞定七、学习路线图第1天原生Java API ├── SimpleProducer 发送10条消息 ├── SimpleConsumer 消费并打印 └── 理解 Partition/Offset/ConsumerGroup 第2天Spring Boot 基础 ├── 搭项目配 yml ├── KafkaTemplate 发消息 └── KafkaListener 收消息 第3天Spring Boot 进阶 ├── 发送 JSON 对象 ├── 手动提交 offset └── 批量消费 第4天高级特性 ├── 错误处理 重试 ├── 死信队列 └── 事务消息 第5天实战演练 ├── 完成订单消息系统 ├── 模拟Broker宕机观察行为 └── 监控消费者Lag八、常见问题速查Q1: 消息重复消费怎么处理原因消费者处理完但offset提交前崩溃重启后重新消费 解决业务层做幂等处理 - 数据库唯一键约束 - Redis 记录已处理的 messageId - 数据库乐观锁版本号Q2: 消息顺序性怎么保证Kafka 只保证同一Partition内有序 做法 - 需要有序的消息使用相同的 key → 进入同一Partition - 如同一订单的所有事件用 orderId 作为 keyQ3: 如何监控消费进度Lag# 查看某个消费者组的消费进度 kafka-consumer-groups.sh \ --bootstrap-server 172.17.0.7:9092 \ --describe \ --group order-service-group # 输出 # TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG # order-topic 0 100 105 5 ← 落后5条Q4: 发送失败了怎么办生产者端 - acksall retries3 保证发送可靠性 - enable.idempotencetrue 防止重复发送 消费者端 - 手动提交offset处理失败不提交 - 配置重试 死信队列兜底最后更新2026年5月