## 1. Project Overview

This project builds a complete real-time log collection and statistical analysis system on Flume + Kafka + Spark Streaming, covering the full data pipeline from log generation through real-time collection and message-queue transport to stream computation, with end-to-end latency kept under 15 seconds.

## 2. Prerequisites

Download and configure the Flume and Kafka environments.

### 2.1 Kafka Installation (node QM131)

1. Download Kafka:

```bash
cd /opt/module
# Use the Huawei Cloud mirror for faster downloads
wget https://mirrors.huaweicloud.com/apache/kafka/3.0.0/kafka_2.13-3.0.0.tgz
```

2. Extract and rename:

```bash
tar -zxvf kafka_2.13-3.0.0.tgz
mv kafka_2.13-3.0.0 kafka
```

3. Configure environment variables:

```bash
echo 'export KAFKA_HOME=/opt/module/kafka' >> /etc/profile
echo 'export PATH=$PATH:$KAFKA_HOME/bin' >> /etc/profile
source /etc/profile
```

4. Verify the installation:

```bash
kafka-topics.sh --version   # should print 3.0.0
```

5. Tune memory for the 2 GB node: edit `/opt/module/kafka/bin/kafka-server-start.sh` and change the heap setting:

```bash
export KAFKA_HEAP_OPTS="-Xmx512m -Xms512m"
```

6. Configure `server.properties` (`vi /opt/module/kafka/config/server.properties`):

```properties
broker.id=1
listeners=PLAINTEXT://QM131:9092
advertised.listeners=PLAINTEXT://QM131:9092
log.dirs=/opt/module/kafka/logs
zookeeper.connect=QM131:2181
```

7. Start ZooKeeper and Kafka in the background:

```bash
cd /opt/module/kafka
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
bin/kafka-server-start.sh -daemon config/server.properties
```

8. Verify the processes:

```bash
jps   # should show QuorumPeerMain and Kafka
```

9. Create the topic (run on QM131):

```bash
cd /opt/module/kafka
bin/kafka-topics.sh --create \
  --bootstrap-server QM131:9092 \
  --replication-factor 1 \
  --partitions 2 \
  --topic user_log_topic
```

10. Verify the topic:

```bash
bin/kafka-topics.sh --list --bootstrap-server QM131:9092
```

### 2.2 Flume Installation (node QM130)

1. Download Flume:

```bash
cd /opt/module
# Use the Huawei Cloud mirror
wget https://mirrors.huaweicloud.com/apache/flume/1.9.0/apache-flume-1.9.0-bin.tar.gz
```

2. Extract and rename:

```bash
tar -zxvf apache-flume-1.9.0-bin.tar.gz
mv apache-flume-1.9.0-bin flume
```

3. Configure environment variables:

```bash
echo 'export FLUME_HOME=/opt/module/flume' >> /etc/profile
echo 'export PATH=$PATH:$FLUME_HOME/bin' >> /etc/profile
source /etc/profile
```

4. Verify the installation:

```bash
flume-ng version   # should show 1.9.0
```

5. Create the project directories:

```bash
mkdir -p /opt/project/realtime/{data,conf,logs}
```

6. Create the Flume configuration file (`vi /opt/project/realtime/conf/flume_kafka.conf`):

```properties
agent.sources = tail_source
agent.channels = memory_channel
agent.sinks = kafka_sink

# Source: tail the click log
agent.sources.tail_source.type = exec
agent.sources.tail_source.command = tail -F /opt/project/realtime/data/click.log
agent.sources.tail_source.shell = /bin/sh -c

# Channel: in-memory buffer
agent.channels.memory_channel.type = memory
agent.channels.memory_channel.capacity = 1000
agent.channels.memory_channel.transactionCapacity = 100

# Sink: forward events to Kafka
agent.sinks.kafka_sink.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.kafka_sink.kafka.bootstrap.servers = QM131:9092
agent.sinks.kafka_sink.kafka.topic = user_log_topic

# Wiring
agent.sources.tail_source.channels = memory_channel
agent.sinks.kafka_sink.channel = memory_channel
```

### 2.3 Log Generator Script

Create `vi /opt/project/realtime/data/generate_log.py`:

```python
#!/usr/bin/env python3
import time
import random
import datetime

actions = ["browse", "add_to_cart", "collect", "pay"]
# Product categories: home appliances, electronics, clothing, cosmetics, food
categories = ["家电", "数码", "服装", "美妆", "食品"]
log_file = "/opt/project/realtime/data/click.log"

while True:
    timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    user_id = random.randint(1000, 9999)
    action = random.choice(actions)
    category = random.choice(categories)
    log_line = f"{timestamp}|{user_id}|{action}|{category}\n"
    with open(log_file, "a") as f:
        f.write(log_line)
    print(log_line.strip())
    time.sleep(1)
```

Make it executable:

```bash
chmod +x /opt/project/realtime/data/generate_log.py
```

## 3. Startup Command Summary

Startup order:

| Order | Node | Command |
|---|---|---|
| 1 | QM131 | Start ZooKeeper and Kafka |
| 2 | QM130 | Start the log generator script |
| 3 | QM130 | Start the Flume agent |
| 4 | QM131 | Kafka consumer verification (optional) |

The commands, in order:

```bash
# QM131 - start ZooKeeper and Kafka
cd /opt/module/kafka
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
bin/kafka-server-start.sh -daemon config/server.properties

# QM130 - start the log generator
python3 /opt/project/realtime/data/generate_log.py

# QM130 - start Flume
cd /opt/module/flume
bin/flume-ng agent \
  --name agent \
  --conf conf \
  --conf-file /opt/project/realtime/conf/flume_kafka.conf \
  -Dflume.root.logger=INFO,console

# QM131 - verify with a console consumer
cd /opt/module/kafka
bin/kafka-console-consumer.sh --bootstrap-server QM131:9092 --topic user_log_topic
```
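If you prefer to verify the pipeline from Python rather than the console consumer, a minimal sketch follows. It assumes the third-party `kafka-python` package (`pip install kafka-python`) is installed on a node that can reach QM131:9092; the package choice and the timeout value are assumptions, not part of the original setup.

```python
#!/usr/bin/env python3
# Hedged sketch: programmatic check that events are flowing into user_log_topic.
# Assumes `pip install kafka-python` (not part of the original setup).
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "user_log_topic",
    bootstrap_servers="QM131:9092",
    auto_offset_reset="earliest",   # read the topic from the beginning
    consumer_timeout_ms=10000,      # stop iterating if idle for 10 s
)

for msg in consumer:
    # Each record is one pipe-delimited line produced by generate_log.py
    print(f"partition={msg.partition} offset={msg.offset} "
          f"value={msg.value.decode('utf-8')}")
```

Seeing fresh pipe-delimited lines here confirms the generator, Flume, and Kafka legs of the pipeline at once.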
## 4. Technical Architecture

Technology stack:

| Component | Version | Role | Node |
|---|---|---|---|
| Flume | 1.9.0 | Log collection: watches the log file and ships changes to Kafka | QM130 |
| Kafka | 3.0.0 | Message queue: decouples collection from computation, buffers data | QM131 |
| Spark | 3.0.0 | Stream computation: consumes Kafka in real time for PV statistics | QM130 |
| ZooKeeper | 3.8.0 | Kafka cluster coordination | QM131 |
| Python | 3.x | Simulated user-behavior log generation | QM130 |

## 5. Cluster Environment

Node allocation:

| Node | Role | Deployed Components | Memory |
|---|---|---|---|
| QM130 | Master node | Flume, log generator script, Spark submission | 2 GB |
| QM131 | Kafka node | ZooKeeper, Kafka broker | 2 GB |
| QM132-133 | Worker nodes | Hadoop DataNode | 2 GB |

Environment details:

- OS: CentOS 7
- JDK: Java 1.8.0_281
- Hadoop: 3.1.4
- Spark path: `/opt/module/spark-local`

## 6. Core Modules

### 6.1 Log Generation

- File: `/opt/project/realtime/data/generate_log.py`
- Function: simulates user-behavior logs, one record per second
- Log format: `2026-04-29 20:14:01|5394|browse|家电`
- Fields:
  - `timestamp`: time the action occurred
  - `user_id`: user ID, random in 1000-9999
  - `action`: behavior type (browse / add_to_cart / collect / pay)
  - `category`: product category (家电 / 数码 / 服装 / 美妆 / 食品)

### 6.2 Flume Collection

Configuration file: `/opt/project/realtime/conf/flume_kafka.conf`

```properties
# Source: watch the log file for new lines
agent.sources.tail_source.type = exec
agent.sources.tail_source.command = tail -F /opt/project/realtime/data/click.log
# Channel: memory channel, capacity 1000
agent.channels.memory_channel.type = memory
agent.channels.memory_channel.capacity = 1000
# Sink: write to Kafka
agent.sinks.kafka_sink.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.kafka_sink.kafka.bootstrap.servers = QM131:9092
agent.sinks.kafka_sink.kafka.topic = user_log_topic
```

Starting Flume:

```bash
[root@QM130 flume]# bin/flume-ng agent \
  --name agent \
  --conf conf \
  --conf-file /opt/project/realtime/conf/flume_kafka.conf \
  -Dflume.root.logger=INFO,console
```

### 6.3 Kafka Message Queue

- Node: QM131
- Memory tuning for the 2 GB node:

```bash
export KAFKA_HEAP_OPTS="-Xmx512m -Xms512m"
```

- Topic created: `user_log_topic`, 2 partitions, replication factor 1

### 6.4 Spark Streaming Computation

- Script: `/opt/project/realtime/scripts/streaming_file.py`
- Core logic: read the log stream from Kafka, parse each line by splitting on `|`, aggregate PV per minute, and emit statistics every 10 seconds (a sketch of this pipeline follows below)
- Submit command:

```bash
bin/spark-submit \
  --master local[2] \
  --driver-memory 512m \
  /opt/project/realtime/scripts/streaming_file.py
```
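Section 6.4 only summarizes the script's logic, so here is a minimal Structured Streaming sketch (not the project's actual `streaming_file.py`) of the Kafka-sourced variant of that pipeline. It assumes the spark-sql-kafka connector is available at submit time (e.g. `--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0`); this is the very dependency the pitfalls section reports as failing to download, which is why the enhanced version in Section 10 reads a file stream instead.

```python
# Hedged sketch: PV-per-minute over the Kafka topic described in Section 6.4.
# Requires the spark-sql-kafka connector on the classpath (an assumption here).
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, split, current_timestamp, date_format

spark = (SparkSession.builder
         .appName("KafkaPVSketch")
         .config("spark.sql.shuffle.partitions", "2")
         .getOrCreate())

# Subscribe to the topic fed by Flume
raw = (spark.readStream
       .format("kafka")
       .option("kafka.bootstrap.servers", "QM131:9092")
       .option("subscribe", "user_log_topic")
       .option("startingOffsets", "latest")
       .load())

# Kafka values are bytes; decode, then split "ts|user|action|category"
parts = split(col("value").cast("string"), "\\|")
parsed = (raw
          .select(parts.getItem(0).alias("timestamp"),
                  parts.getItem(1).cast("int").alias("user_id"),
                  parts.getItem(2).alias("action"),
                  parts.getItem(3).alias("category"))
          .withColumn("process_minute",
                      date_format(current_timestamp(), "yyyy-MM-dd HH:mm")))

# PV per processing-time minute, printed to the console every 10 seconds
query = (parsed.groupBy("process_minute").count()
         .writeStream
         .outputMode("complete")
         .format("console")
         .trigger(processingTime="10 seconds")
         .start())

query.awaitTermination()
```

The `local[2]` / two-shuffle-partition configuration from the submit command keeps this workable on the 2 GB nodes.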
## 7. Project Results

### 7.1 Runtime Behavior

(Screenshots of the running pipeline are listed at the end of Section 10.)

### 7.2 Performance Metrics

| Metric | Value |
|---|---|
| Log generation rate | 1 record/s |
| Flume → Kafka latency | < 1 s |
| Spark processing latency | < 10 s |
| End-to-end latency | < 15 s |
| Daily processing capacity | ~100,000 records |

### 7.3 Completed Features

- Continuous generation of simulated user-behavior logs
- Flume real-time monitoring of log file changes
- Kafka message-queue buffering and transport
- Real-time consumption and PV statistics with Spark Streaming
- Minute-level statistics emitted every 10 seconds
- Parameter tuning for 2 GB low-memory nodes

## 8. Pitfalls and Solutions

| Problem | Solution |
|---|---|
| Downloads from foreign mirrors too slow | Switch to the Huawei Cloud mirror (mirrors.huaweicloud.com) |
| Kafka connector package failed to download | Fall back to a file stream (`file://` protocol) |
| Spark read from HDFS instead of local disk | Use the `file://` prefix to force local paths |
| Flume could not reach Kafka | Disable the firewall: `systemctl stop firewalld` |
| OOM on 2 GB nodes | Reduce the Kafka and Spark heaps to 512 MB |

## 9. Highlights

- Complete real-time pipeline: data generation, collection, transport, and computation are fully connected
- Resource-constrained optimization: stable operation on 2 GB nodes through parameter tuning
- Realistic simulated data: user-behavior logs close to production traffic
- Modular configuration: each component is configured independently, easy to extend and maintain
- Strong observability: statistics every 10 seconds give a live view of the data flow

## 10. Extensions

UV statistics:

```python
uv_df = parsed_df.groupBy("process_minute").agg(
    approx_count_distinct("user_id").alias("uv")
)
```

Category popularity ranking (grouped by category):

```python
category_df = parsed_df.groupBy("process_minute", "category") \
    .count() \
    .orderBy("process_minute", col("count").desc())
```

Action distribution:

```python
action_df = parsed_df.groupBy("process_minute", "action") \
    .count() \
    .orderBy("process_minute", col("count").desc())
```

Enhanced version:

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import (col, udf, date_format,
                                   current_timestamp, approx_count_distinct)
from pyspark.sql.types import (StructType, StructField,
                               StringType, IntegerType)


def main():
    spark = SparkSession.builder \
        .appName("RealTimeLogAnalysis_FileStream") \
        .config("spark.sql.shuffle.partitions", "2") \
        .getOrCreate()

    print("=" * 70)
    print("Spark Streaming file-stream enhanced version started")
    print("Watching directory: /opt/project/realtime/data")
    print("=" * 70)

    # Read the directory as a text file stream, picking up *.log files
    df = spark.readStream \
        .format("text") \
        .option("pathGlobFilter", "*.log") \
        .load("file:///opt/project/realtime/data")

    # Parse one "ts|user|action|category" line; None for malformed rows
    def parse_log(line):
        try:
            parts = line.split("|")
            if len(parts) == 4 and parts[0].startswith("20"):
                return (parts[0], int(parts[1]), parts[2], parts[3])
        except Exception:
            pass
        return None

    schema = StructType([
        StructField("timestamp", StringType(), True),
        StructField("user_id", IntegerType(), True),
        StructField("action", StringType(), True),
        StructField("category", StringType(), True)
    ])

    parse_udf = udf(parse_log, schema)

    # Parse and drop malformed lines
    parsed_df = df.select(parse_udf(col("value")).alias("data")) \
        .filter(col("data").isNotNull()) \
        .select(
            col("data.timestamp").alias("timestamp"),
            col("data.user_id").alias("user_id"),
            col("data.action").alias("action"),
            col("data.category").alias("category")
        )

    # Tag every row with the processing-time minute
    parsed_df = parsed_df.withColumn(
        "process_minute",
        date_format(current_timestamp(), "yyyy-MM-dd HH:mm"))

    # 1. PV per minute
    pv_df = parsed_df.groupBy("process_minute").count()
    query_pv = pv_df.writeStream \
        .outputMode("complete") \
        .format("console") \
        .trigger(processingTime="10 seconds") \
        .queryName("pv_stats") \
        .start()

    # 2. UV per minute (approximate distinct count)
    uv_df = parsed_df.groupBy("process_minute").agg(
        approx_count_distinct("user_id").alias("uv")
    )
    query_uv = uv_df.writeStream \
        .outputMode("complete") \
        .format("console") \
        .trigger(processingTime="10 seconds") \
        .queryName("uv_stats") \
        .start()

    # 3. Category popularity
    category_df = parsed_df.groupBy("process_minute", "category") \
        .count() \
        .orderBy("process_minute", col("count").desc())
    query_category = category_df.writeStream \
        .outputMode("complete") \
        .format("console") \
        .trigger(processingTime="10 seconds") \
        .queryName("category_heat") \
        .start()

    # 4. Action distribution
    action_df = parsed_df.groupBy("process_minute", "action") \
        .count() \
        .orderBy("process_minute", col("count").desc())
    query_action = action_df.writeStream \
        .outputMode("complete") \
        .format("console") \
        .trigger(processingTime="10 seconds") \
        .queryName("action_distribution") \
        .start()

    print("Active queries:")
    print("  PV stats            - emitted every 10 s")
    print("  UV stats (approx)   - emitted every 10 s")
    print("  Category popularity - emitted every 10 s")
    print("  Action distribution - emitted every 10 s")
    print("=" * 70)
    print("Waiting for data...")
    print("=" * 70)

    # Block until any of the four queries terminates
    spark.streams.awaitAnyTermination()


if __name__ == "__main__":
    main()
```

Result screenshots:

1. Real-time log generation
2. Log collection and transport
3. Real-time computation

## 11. Future Directions

- Persist results to MySQL or Hive for visualization (a sketch follows below)
- Hook into Grafana to build a real-time monitoring dashboard
- Add alerting: trigger an alarm on sudden PV spikes
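As a starting point for the first direction, here is a hedged sketch of persisting the per-minute PV counts from the enhanced script to MySQL via `foreachBatch`. The database `realtime`, table `pv_stats`, and credentials are placeholders (not part of the original project), and the MySQL JDBC driver jar (e.g. mysql-connector-java) must be on the Spark classpath.

```python
# Hedged sketch: persist each micro-batch of pv_df to MySQL via foreachBatch.
# Placeholders (assumptions): database `realtime`, table
# `pv_stats(process_minute VARCHAR, count BIGINT)`, user/password.
def write_pv_to_mysql(batch_df, batch_id):
    (batch_df.write
     .format("jdbc")
     .option("url", "jdbc:mysql://QM131:3306/realtime")
     .option("driver", "com.mysql.cj.jdbc.Driver")
     .option("dbtable", "pv_stats")
     .option("user", "root")          # placeholder credentials
     .option("password", "******")
     .option("truncate", "true")      # keep the table, replace its rows
     .mode("overwrite")
     .save())

# Replace the console sink of the PV query with the JDBC writer:
query_pv = (pv_df.writeStream
            .outputMode("complete")
            .foreachBatch(write_pv_to_mysql)
            .trigger(processingTime="10 seconds")
            .start())
```

Because the PV query runs in `complete` output mode, each micro-batch carries the full result table; overwriting with `truncate=true` keeps MySQL as a fresh snapshot instead of accumulating duplicate rows, and a dashboard such as Grafana can then read the table directly.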