这段代码展示了如何使用 Flink 将数据流写入 Redis核心是通过RedisSink和RedisMapper实现数据的映射和存储。通过这种方式可以方便地将实时处理的结果存储到 Redis 中供后续查询或分析使用。这段代码的主要功能是将用户点击事件流写入 Redis 的哈希表中方便后续查询和分析。适用场景:实时记录用户点击行为。将 Flink 处理后的结果存储到 Redis 中供其他系统使用。代码结构包名:package sink表示这个类属于sink包。导入的依赖:org.apache.flink.streaming.api.scala._: Flink 的 Scala API。org.apache.flink.streaming.connectors.redis.RedisSink: Flink 提供的 Redis Sink 连接器。org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig: Redis 连接池的配置类。org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}: Redis 映射相关的类。source.ClickSource: 自定义的数据源用于生成模拟的点击事件流。package sink import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.connectors.redis.RedisSink import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper} import source.ClickSource /** * * PROJECT_NAME: flink1.13 * PACKAGE_NAME: sink * author: 赵嘉盟-HONOR * data: 2023-11-20 14:53 * DESCRIPTION * */ object sinkToRedis { def main(args: Array[String]): Unit { val envStreamExecutionEnvironment.getExecutionEnvironment val data env.addSource(new ClickSource) val confnew FlinkJedisPoolConfig.Builder().setHost().build() data.addSink(new RedisSink[source.Event](conf,new RedisMapper[source.Event] { override def getCommandDescription: RedisCommandDescription new RedisCommandDescription(RedisCommand.HSET,click) override def getKeyFromData(t: source.Event): String t.user override def getValueFromData(t: source.Event): String t.url })) env.execute(sinkRedis) } }代码解释(1) 创建 Flink 执行环境val env StreamExecutionEnvironment.getExecutionEnvironment获取 Flink 的流处理执行环境。(2) 添加数据源val data env.addSource(new ClickSource)使用自定义的ClickSource作为数据源生成一个数据流data。ClickSource可能是一个模拟用户点击事件的数据源生成Event类型的数据。(3) 配置 Redis 连接池val conf new FlinkJedisPoolConfig.Builder().setHost().build()创建一个 Redis 连接池配置FlinkJedisPoolConfig。这里setHost()需要填写 Redis 服务器的主机地址例如localhost或127.0.0.1。(4) 添加 Redis Sinkdata.addSink(new RedisSink[source.Event](conf, new RedisMapper[source.Event] { override def getCommandDescription: RedisCommandDescription new RedisCommandDescription(RedisCommand.HSET, click) override def getKeyFromData(t: source.Event): String t.user override def getValueFromData(t: source.Event): String t.url }))将数据流data写入 Redis。RedisSink:第一个参数是 Redis 连接池配置conf。第二个参数是一个RedisMapper的实现用于定义如何将数据映射到 Redis。RedisMapper:getCommandDescription: 定义 Redis 命令和键名。这里使用HSET命令将数据写入 Redis 的哈希表click。getKeyFromData: 定义哈希表中的字段field这里使用Event的user字段。getValueFromData: 定义哈希表中的值value这里使用Event的url字段。(5) 启动 Flink 任务env.execute(sinkRedis)启动 Flink 任务任务名称为sinkRedis。基于scala使用flink将读取到的数据写入到RedisgetCommandDescription第一个参数为写入方式第二个参数为Hset的键