In short, for Flink + Kafka: Kafka is the high-throughput, highly reliable message queue that takes in all upstream dynamic data (user behavior, business logs, device telemetry, database change events); Flink is the unified stream/batch compute engine that processes the "data in motion" sitting in Kafka in real time. Below are the 3 code templates most commonly used in real enterprise development, covering the full "consume → process → output" pipeline.

Prerequisite: add the Kafka connector dependency.

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId>
    <version>1.17.0</version>
</dependency>
```

# Basic: consume Kafka data for real-time statistics (the "statistics" you have in mind)

```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

public class FlinkKafkaCountDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 1. Kafka connection properties
        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", "localhost:9092");
        kafkaProps.setProperty("group.id", "flink-kafka-group");

        // 2. Consume data from the Kafka topic "user_behavior"
        //    (FlinkKafkaConsumer is the legacy connector API; the last example uses the newer KafkaSource)
        DataStream<String> kafkaStream = env.addSource(
                new FlinkKafkaConsumer<>("user_behavior", new SimpleStringSchema(), kafkaProps));

        // 3. Real-time count per behavior type (click / add-to-cart / order)
        kafkaStream
                .map(line -> Tuple2.of(line.split(",")[1], 1)) // expected format: userID,behavior,time
                .returns(Types.TUPLE(Types.STRING, Types.INT))
                .keyBy(t -> t.f0)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(60))) // 60-second tumbling window
                .sum(1)
                .print("real-time behavior count");

        env.execute("Flink-Kafka Count Job");
    }
}
```

# Advanced: real-time ETL on Kafka data (clean, then write to another topic)

```java
// Builds on the kafkaStream defined above; JSON parsing uses Alibaba fastjson (com.alibaba.fastjson)
DataStream<String> cleanStream = kafkaStream
        // Drop dirty data (null values / non-JSON lines)
        .filter(line -> line != null && line.startsWith("{"))
        // Convert format: JSON -> CSV
        .map(line -> {
            JSONObject json = JSON.parseObject(line);
            return json.getString("userID") + "," + json.getString("orderID") + "," + json.getString("amount");
        });

// Write the cleaned data to the Kafka topic "clean_order_topic"
cleanStream.addSink(new FlinkKafkaProducer<>(
        "localhost:9092", "clean_order_topic", new SimpleStringSchema()));
```

# Higher-level: CEP complex event detection on Kafka data

```java
// Builds on kafkaStream; requires the flink-cep dependency in addition to the Kafka connector.
// Parse each line into a UserBehavior entity.
DataStream<UserBehavior> behaviorStream = kafkaStream.map(line -> {
    String[] fields = line.split(",");
    return new UserBehavior(fields[0], fields[1], Long.parseLong(fields[2]));
});

// Reusable condition: "this event is a failed login"
SimpleCondition<UserBehavior> isFailLogin = new SimpleCondition<UserBehavior>() {
    @Override
    public boolean filter(UserBehavior behavior) {
        return "fail_login".equals(behavior.getBehavior());
    }
};

// CEP pattern: 3 consecutive failed logins within 10 minutes
Pattern<UserBehavior, ?> failLoginPattern = Pattern.<UserBehavior>begin("first_fail").where(isFailLogin)
        .next("second_fail").where(isFailLogin)
        .next("third_fail").where(isFailLogin)
        .within(Time.minutes(10));

// Match the pattern per user and emit a risk-control alert
PatternStream<UserBehavior> patternStream =
        CEP.pattern(behaviorStream.keyBy(UserBehavior::getUserID), failLoginPattern);

patternStream.select(new PatternSelectFunction<UserBehavior, String>() {
    @Override
    public String select(Map<String, List<UserBehavior>> pattern) {
        return "User " + pattern.get("first_fail").get(0).getUserID()
                + " failed to log in 3 times within 10 minutes";
    }
}).print("risk alert");
```
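The CEP template above references a UserBehavior entity that is never defined in the article. A minimal sketch that matches the constructor and getters used there (the field names are inferred, not taken from the original) could look like this:

```java
// Minimal UserBehavior POJO assumed by the CEP example above (field names inferred from the getters it calls)
public class UserBehavior {
    private String userID;
    private String behavior;   // e.g. "click", "add_cart", "order", "fail_login"
    private long timestamp;

    public UserBehavior(String userID, String behavior, long timestamp) {
        this.userID = userID;
        this.behavior = behavior;
        this.timestamp = timestamp;
    }

    public String getUserID() { return userID; }
    public String getBehavior() { return behavior; }
    public long getTimestamp() { return timestamp; }
}
```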
# Summary

- Core conclusion: Flink + Kafka is not just "statistics". It covers real-time statistics, ETL, risk control, feature engineering, CEP and more; "statistics" is only the most basic entry-level scenario.
- Architecture logic: Kafka handles data transport and buffering, Flink handles computation and processing; together they are the standard combination for enterprise real-time big data.
- Key to landing it: whatever the scenario, the core flow is always "consume from Kafka → compute/process in Flink → output to storage / a downstream Kafka topic / a dashboard".

If you need complete, runnable code for a particular scenario (say, financial risk control or e-commerce real-time statistics), just ask and I will provide it following real enterprise development conventions.

# "Why do we have to use Flink? Can't I just write my own service to receive the data, clean it, and insert it into the database?"

The core differences show up in data volume, concurrency, latency, and reliability; once the data volume grows, the hand-written approach falls apart quickly.

# Fatal problems of the hand-written approach

- Concurrency bottleneck: a single consumer thread grinds to a halt at 10,000 records per second; with multiple consumers you have to implement partition assignment yourself, which easily leads to duplicate consumption.
- Poor reliability: if the process crashes while inserting into the database, records that were already processed may be lost while unprocessed ones may be duplicated.
- Hard to extend: adding "deduplicate within 5 minutes" means hand-rolling a cache (Redis) plus scheduled cleanup, roughly doubling the code.
- No monitoring: you do not know how many records per second were processed or how many failed; when something goes wrong, all you can do is dig through logs.

```java
// A typical hand-written Spring Boot Kafka consumer
@Service
public class LogConsumerService {

    @Autowired
    private ClickHouseMapper clickHouseMapper;

    // Hand-written Kafka consumer
    @KafkaListener(topics = "user_log", groupId = "log-group")
    public void consume(List<String> logs, Acknowledgment ack) {
        try {
            // 1. Data cleaning (simple filtering)
            List<String> cleanLogs = logs.stream()
                    .filter(log -> log != null && !log.contains("null"))
                    .collect(Collectors.toList());
            // 2. Batch insert into ClickHouse (batching logic written by hand)
            clickHouseMapper.batchInsert(cleanLogs);
            // 3. Commit the offset manually to avoid losing data
            ack.acknowledge();
        } catch (Exception e) {
            // 4. Exception handling: hand-rolled retry, which easily inserts duplicate rows
            retryInsert(logs);
        }
    }
}
```

# Advantages of the Flink approach

- Concurrency: a single call to env.setParallelism(10) gives you 10 parallel subtasks, easily handling 10,000 records per second.
- Reliability: checkpoints record the job state automatically; after a crash the job restarts from the snapshot taken a few seconds earlier, with no data lost and none duplicated.
- Extensibility: adding CEP-based anomaly detection is a few lines of Pattern code, no rewrite required.
- Operations: the Flink Web UI (port 8081 by default) shows throughput, latency, and failure counts in real time, and rescaling is a single operation.

```java
// Imports omitted; they are the same as in the complete job further below.
public class FlinkLogCleanJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(10); // 10 parallel subtasks

        // Checkpoint every 5 seconds: a crash/restart does not lose data
        env.enableCheckpointing(5000);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

        // 1. Consume from Kafka (load balancing and partition assignment are automatic)
        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", "localhost:9092");
        kafkaProps.setProperty("group.id", "flink-log-group");
        DataStream<String> logStream = env.addSource(
                new FlinkKafkaConsumer<>("user_log", new SimpleStringSchema(), kafkaProps));

        // 2. Data cleaning with built-in operators
        DataStream<String> cleanStream = logStream
                .filter(log -> log != null && !log.contains("null"))        // drop dirty data
                .keyBy(log -> log)                                           // identical records go to the same subtask
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))  // 5-second window
                .reduce((first, second) -> first);                           // keep one copy per window (de-duplication)

        // 3. Write to ClickHouse through the JDBC connector
        cleanStream.addSink(JdbcSink.sink(
                "INSERT INTO user_log VALUES (?)",
                (PreparedStatement stmt, String log) -> stmt.setString(1, log),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl("jdbc:clickhouse://localhost:8123/db")
                        .withDriverName("ru.yandex.clickhouse.ClickHouseDriver")
                        .build()));

        env.execute("Flink Log Clean Job");
    }
}
```

# How does Flink distribute the work?

A Flink cluster has 3 kinds of nodes with a clear division of labor: the Client compiles the program and submits it, the JobManager splits the job into subtasks, schedules them, and coordinates checkpoints, and the TaskManagers execute the subtasks in their task slots (a minimal cluster configuration sketch follows the dependency list below).

```xml
<dependencies>
    <!-- Flink core dependencies -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.17.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>1.17.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>1.17.0</version>
    </dependency>
    <!-- Flink-Kafka connector -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka</artifactId>
        <version>1.17.0</version>
    </dependency>
    <!-- Flink JDBC connector (used for ClickHouse) -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc</artifactId>
        <version>1.17.0</version>
    </dependency>
    <!-- ClickHouse JDBC driver -->
    <dependency>
        <groupId>ru.yandex.clickhouse</groupId>
        <artifactId>clickhouse-jdbc</artifactId>
        <version>0.3.2</version>
    </dependency>
</dependencies>
```
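To make the three node roles and the slot/parallelism relationship concrete, here is a minimal flink-conf.yaml sketch for a small standalone cluster; the host name and slot count are illustrative assumptions, not values from the article:

```yaml
# flink-conf.yaml (illustrative values)
jobmanager.rpc.address: localhost   # where the JobManager runs
taskmanager.numberOfTaskSlots: 4    # each TaskManager offers 4 slots for running subtasks
parallelism.default: 3              # default parallelism if a job does not call setParallelism()
```

With parallelism 3 and 4 slots per TaskManager, the whole job below fits on the single TaskManager that start-cluster.sh launches.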
The complete, runnable job:

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.sql.PreparedStatement;

/**
 * Flink distributed log cleaning and loading (complete, runnable version).
 * Pipeline: consume Kafka logs -> clean -> de-duplicate in 5-second windows -> write to ClickHouse.
 */
public class FlinkDistributedLogCleanJob {

    // ClickHouse table (create it beforehand):
    // CREATE TABLE user_log (user_id String, log_content String, ts BIGINT) ENGINE = MergeTree() ORDER BY ts;

    public static class UserLog {
        private String userId;
        private String logContent;
        private long ts;

        public UserLog(String userId, String logContent, long ts) {
            this.userId = userId;
            this.logContent = logContent;
            this.ts = ts;
        }

        // getters
        public String getUserId() { return userId; }
        public String getLogContent() { return logContent; }
        public long getTs() { return ts; }
    }

    public static void main(String[] args) throws Exception {
        // 1. Create the execution environment: the entry point for distributed execution
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Parallelism 3: the job is split into 3 subtasks that can run on different TaskManagers
        env.setParallelism(3);

        // 2. Enable checkpointing (the core of distributed fault tolerance): snapshot every 5 seconds
        env.enableCheckpointing(5000);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); // exactly-once state semantics
        env.getCheckpointConfig().setCheckpointTimeout(60000); // checkpoint timeout: 60 seconds

        // 3. Configure the Kafka source (partitions are assigned to subtasks automatically)
        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9092")              // Kafka brokers; list several nodes in production
                .setTopics("user_log")                              // topic to consume
                .setGroupId("flink-log-group")                      // consumer group
                .setStartingOffsets(OffsetsInitializer.latest())    // start from the latest offsets
                .setValueOnlyDeserializer(new SimpleStringSchema()) // deserialization
                .build();

        // 4. Read from Kafka (distributed consumption)
        DataStream<String> kafkaStream = env.fromSource(
                kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source");

        // 5. Distributed cleaning: drop dirty records, then parse into UserLog entities
        DataStream<UserLog> cleanLogStream = kafkaStream
                // each subtask filters its own share of the data
                .filter((FilterFunction<String>) log ->
                        log != null && !log.contains("null") && log.split(",").length == 3)
                // expected format: user_id,log_content,ts
                .map((MapFunction<String, UserLog>) log -> {
                    String[] fields = log.split(",");
                    return new UserLog(fields[0], fields[1], Long.parseLong(fields[2]));
                });

        // 6. Distributed window de-duplication: key by (user, log content) and keep the first copy per 5-second window
        //    (Flink has no built-in distinct() operator on windows, so keyBy + reduce does the job)
        DataStream<UserLog> distinctLogStream = cleanLogStream
                .keyBy(log -> log.getUserId() + "|" + log.getLogContent()) // identical records go to the same subtask
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5))) // 5-second tumbling window
                .reduce((first, second) -> first);                         // keep only the first occurrence

        // 7. Write to ClickHouse: each subtask writes its own partition of the data
        distinctLogStream.addSink(JdbcSink.sink(
                "INSERT INTO user_log (user_id, log_content, ts) VALUES (?, ?, ?)", // insert SQL
                (PreparedStatement stmt, UserLog log) -> {
                    // bind parameters
                    stmt.setString(1, log.getUserId());
                    stmt.setString(2, log.getLogContent());
                    stmt.setLong(3, log.getTs());
                },
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl("jdbc:clickhouse://localhost:8123/default")     // ClickHouse address
                        .withDriverName("ru.yandex.clickhouse.ClickHouseDriver") // driver class
                        .withUsername("default")                                 // user name
                        .withPassword("")                                        // password (empty by default)
                        .build()
        )).name("ClickHouse Sink");

        // 8. Submit the job: the JobManager splits it and schedules the subtasks onto TaskManagers
        env.execute("Flink Distributed Log Clean Job");
    }
}
```

Start the Flink cluster:

```bash
./bin/start-cluster.sh   # starts 1 JobManager + 1 TaskManager (slot count set by taskmanager.numberOfTaskSlots, 4 in the config above)
```
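To actually run the job on that cluster, package it and submit it through the Flink CLI; the jar path below is a placeholder and the main-class name simply follows the example above:

```bash
# Build the job jar (Maven), then submit it to the running standalone cluster
mvn clean package
./bin/flink run -c FlinkDistributedLogCleanJob /path/to/your-job.jar

# The Web UI at http://localhost:8081 then shows the job, its throughput, and checkpoint status
```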