当前位置: 首页 > news >正文

聊城网站营销信息海南网站制作公司

聊城网站营销信息,海南网站制作公司,如何做网络投票网站,wordpress 全局播放器1 、 使用 Flink 消费 Kafka 中 ProduceRecord 主题的数据,统计在已经检验的产品中,各设备每 5 分钟 生产产品总数,将结果存入Redis 中, key 值为 “totalproduce” , value 值为 “ 设备 id ,最近五分钟生…

1 、 使用 Flink 消费 Kafka ProduceRecord 主题的数据,统计在已经检验的产品中,各设备每 5 分钟 生产产品总数,将结果存入Redis 中, key 值为 “totalproduce” value 值为 设备 id ,最近五分钟生产总数” 。使用 redis cli HGETALL key 方式获取 totalproduce值。
注: ProduceRecord 主题,生产一个产品产生一条数据; change_handle_state字段为 1 代表已经检验, 0 代表未检验; 时间语义使用Processing Time

package flink.connimport org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.{FlinkJedisConfigBase, FlinkJedisPoolConfig}
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper/*** Redis连接配置类* * 该类用于配置与Redis的连接参数,并提供获取RedisSink实例的方法,* 使Flink流处理能够将数据输出到Redis。*/
class ConnRedis(host: String, port: Int) {// Redis连接配置,使用FlinkJedisPoolConfig来配置Redis的连接参数private val jedisConfig: FlinkJedisConfigBase = createJedisConfig(host, port)/*** 创建并返回Redis连接配置* * @param host Redis服务器的主机名或IP地址* @param port Redis服务器的端口号* @return FlinkJedisConfigBase 连接配置对象*/private def createJedisConfig(host: String, port: Int): FlinkJedisConfigBase = {new FlinkJedisPoolConfig.Builder().setHost(host)  // 设置Redis主机地址.setPort(port)  // 设置Redis端口号.build()        // 构建并返回配置对象}/*** 获取RedisSink实例* * 该方法接受一个RedisMapper作为参数,并返回一个与该Mapper关联的RedisSink。* * @param redisMapper 通过该Mapper来定义如何将数据写入Redis* @tparam T 泛型参数,表示流中数据的类型* @return RedisSink[T] 一个与Redis连接的Sink实例*/def getRedisSink[T](redisMapper: RedisMapper[T]): RedisSink[T] = {new RedisSink[T](jedisConfig, redisMapper)}
}

类和方法说明

1. 类 ConnRedis
  • 构造函数ConnRedis 类的构造函数接受两个参数 hostport,分别表示 Redis 服务器的主机地址和端口号。这个类通过这些参数来配置与 Redis 的连接。

  • 成员变量 jedisConfigjedisConfigFlinkJedisConfigBase 类型的变量,用于存储 Redis 连接的配置。它是通过 createJedisConfig 方法创建的。

2. 方法 createJedisConfig

这个私有方法用于创建和配置 Redis 连接的参数,并返回一个 FlinkJedisConfigBase 类型的配置对象。

  • 输入参数

    • host:Redis 服务器的主机名或 IP 地址。
    • port:Redis 服务器的端口号。
  • 返回值

    • 返回一个 FlinkJedisConfigBase 类型的对象,这个对象是使用 FlinkJedisPoolConfig.Builder() 来构建的。该构建器允许你设置 Redis 连接的主机地址和端口号,并最终生成一个配置对象。

    这个配置对象后续将用于建立连接到 Redis 数据库。

3. 方法 getRedisSink

这个方法用于返回一个 RedisSink 实例,这个实例允许 Flink 流处理作业将数据写入 Redis。

  • 输入参数

    • redisMapper:一个 RedisMapper[T] 类型的参数。RedisMapper 是 Flink 中的一个接口,它定义了如何将流中的数据(类型为 T)映射为 Redis 中的键值对。
  • 返回值

    • 返回一个 RedisSink[T] 对象。RedisSink 是 Flink 用来将数据输出到外部系统(在这里是 Redis)的一种 Sink。构建时需要传入 jedisConfig(Redis 连接配置)和 redisMapper(数据映射器)两个参数。

    通过 RedisSink,Flink 流处理作业可以将数据推送到 Redis,具体如何存储数据由 redisMapper 来决定。

关键组件解析

  • FlinkJedisConfigBaseFlinkJedisPoolConfig

    • FlinkJedisConfigBase 是 Flink 与 Redis 连接时使用的基础配置类,具体配置会使用 FlinkJedisPoolConfig 来实现,它封装了 Redis 连接池的相关设置,包括 Redis 的主机、端口等。
  • RedisSink

    • RedisSink 是 Flink 提供的一个 Sink 用于将数据写入 Redis。在这里,它通过 jedisConfig 和 redisMapper 来配置如何与 Redis 建立连接,并将流处理结果写入 Redis。
  • RedisMapper

    • RedisMapper 是 Flink 中的一个接口,用于定义如何将流中的数据映射为 Redis 的键值对。你可以实现 RedisMapper 接口,并在其中定义数据的 Redis 键和值的生成方式。比如,你可以将数据的某些字段作为 Redis 键,其他字段作为 Redis 值。

 

package flink.calculate.ProduceRecordimport flink.conn.ConnRedis
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}/*** Flink作业:统计生产记录并将结果输出到Redis* * 本作业从Kafka消费生产记录数据,统计每个设备在最近五分钟内的生产数量,* 并将结果存储到Redis中。*/
object TotalProduceToRedis {// 定义Kafka的相关配置private val topicName: String = "ProduceRecord"         // Kafka主题名private val bootstrapServer: String = "master:9092,slave1:9092,slave2:9092"  // Kafka bootstrap serversprivate val redisHost: String = "master"                // Redis服务器地址// 主程序入口def main(args: Array[String]): Unit = {// 创建流式执行环境val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment// 设置自动水位线间隔为0,意味着不使用水位线(适用于没有事件时间的场景)env.getConfig.setAutoWatermarkInterval(0L)// 设置并行度为1env.setParallelism(1)// 设置Kafka Source:消费Kafka中的数据val kafkaSource: KafkaSource[String] = KafkaSource.builder().setTopics(topicName) // 设置Kafka主题.setBootstrapServers(bootstrapServer) // 设置Kafka服务器地址.setStartingOffsets(OffsetsInitializer.latest()) // 设置偏移量策略:latest表示读取最新数据.setValueOnlyDeserializer(new SimpleStringSchema()) // 设置数据的反序列化方式.build()// 从Kafka读取数据并生成DataStreamval dataStream: DataStream[String] = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), topicName)// 打印读取的数据(可用于调试)dataStream.print("========元数据============>")// 处理数据:统计每个设备的生产数量val result: DataStream[TotalProduce] = dataStream.map(parseProduceData) // 转换为TotalProduce对象.filter(_.ProduceInspect == 1) // 过滤已检验的产品.keyBy(_.ProduceMachineID) // 按设备ID分组.window(TumblingProcessingTimeWindows.of(Time.minutes(5))) // 使用滚动窗口,窗口大小为5分钟.sum(1) // 对生产数量求和// 将结果存入Redisresult.addSink(new ConnRedis(redisHost, 6379).getRedisSink(new TotalProduceMapper))// 打印最终结果(可用于调试)result.print("=========result=========>>>>>>>")// 执行流处理作业env.execute("TotalProduceToRedis")}/*** 将输入的生产记录字符串转换为TotalProduce对象* * @param input 生产记录字符串* @return 返回转换后的TotalProduce对象*/def parseProduceData(input: String): TotalProduce = {val array: Array[String] = input.split(",") // 按逗号分割字符串TotalProduce(array(1), array(9).toInt) // 返回TotalProduce对象,分别提取生产设备ID和生产检查状态}/*** 生产记录类,包含设备ID和生产检查状态** @param ProduceMachineID 设备ID* @param ProduceInspect 生产检查状态*/case class TotalProduce(ProduceMachineID: String, ProduceInspect: Int)/*** Redis数据映射器:将TotalProduce对象映射到Redis的命令*/class TotalProduceMapper extends RedisMapper[TotalProduce] {/*** 获取Redis命令描述信息* * @return RedisCommandDescription,表示Redis操作类型及目标*/override def getCommandDescription: RedisCommandDescription = {new RedisCommandDescription(RedisCommand.HSET, "totalproduce") // 使用HSET命令将数据存入Redis的哈希表中}/*** 从TotalProduce对象中获取Redis的key* * @param t TotalProduce对象* @return Redis中的键,即设备ID*/override def getKeyFromData(t: TotalProduce): String = t.ProduceMachineID/*** 从TotalProduce对象中获取Redis的value* * @param t TotalProduce对象* @return Redis中的值,表示最近五分钟内该设备生产的数量*/override def getValueFromData(t: TotalProduce): String = {s"设备 ${t.ProduceMachineID} 最近五分钟生产了 ${t.ProduceInspect} 个产品"}}}

 

1. Kafka 配置

  • Kafka 主题和服务器配置:
    • topicName:指定了 Kafka 主题名为 ProduceRecord,这是从中消费数据的主题。
    • bootstrapServer:定义了 Kafka 集群的地址(多个 broker),这些是连接 Kafka 的入口。
  • Kafka 数据源创建:
    • 通过 KafkaSource.builder() 创建 Kafka 数据源,设定了主题、Kafka 服务器地址等信息。
    • setStartingOffsets(OffsetsInitializer.latest()) 设置了 Kafka 消费的偏移量策略为 latest,即只读取最新的数据(如果是 earliest,则从头开始读取所有数据)。
    • setValueOnlyDeserializer(new SimpleStringSchema()) 设置了数据的反序列化方式,简单地将每条记录转换成字符串。

2. 流处理环境设置

  • StreamExecutionEnvironment:创建流处理执行环境,用来定义和执行流式计算。
  • setParallelism(1):设置作业的并行度为 1,意味着整个作业的任务只会在一个任务槽中执行,不会并行处理多个任务。
  • setAutoWatermarkInterval(0L):关闭了水位线的使用。水位线用于处理事件时间和处理时间之间的延迟,这里设置为 0,表示不使用水位线。

3. 数据流处理

  • 读取数据:通过 env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), topicName) 从 Kafka 中读取数据。这里的 WatermarkStrategy.noWatermarks() 表示不使用水位线。
  • 数据解析和转换
    • map(parseProduceData):将每条 Kafka 消息(字符串格式)转换为 TotalProduce 对象,提取出设备ID和生产检查状态。
    • filter(_.ProduceInspect == 1):过滤数据,只保留生产检查状态为 1 的记录,意味着只关心已检验的产品。
    • keyBy(_.ProduceMachineID):按照设备ID进行分组,保证每个设备的生产数据是单独处理的。
    • window(TumblingProcessingTimeWindows.of(Time.minutes(5))):使用滚动窗口(Tumbling Window)进行计算,窗口大小为 5 分钟。
    • sum(1):对每个窗口内的生产数量进行求和,这里假设 ProduceInspect 字段表示生产数量。

4. 输出到 Redis

  • Redis 连接和写入:结果通过 result.addSink(new ConnRedis(redisHost, 6379).getRedisSink(new TotalProduceMapper)) 写入 Redis。
    • ConnRedis 类负责建立与 Redis 的连接,并通过 getRedisSink 获取一个用于将数据存入 Redis 的 Sink。
    • TotalProduceMapper 类负责将 TotalProduce 对象映射为 Redis 命令。具体地,通过 HSET 将数据存储到 Redis 哈希表中,键为设备ID,值为该设备在最近五分钟内生产的产品数量。

5. TotalProduceMapper 类

该类实现了 RedisMapper 接口,负责定义如何将 TotalProduce 对象映射成 Redis 命令和数据:

  • getCommandDescription:指定 Redis 操作类型为 HSET,将数据存入 Redis 的哈希表 totalproduce 中。
  • getKeyFromData:返回 Redis 中的键,即设备ID (ProduceMachineID)。
  • getValueFromData:返回 Redis 中的值,即设备在最近五分钟内生产的数量。这里的值是一个字符串,包含设备ID和生产数量的信息。

6. 程序执行

  • env.execute("TotalProduceToRedis"):启动 Flink 作业,开始流式计算,消费 Kafka 数据,进行窗口计算,并将结果存入 Redis。
http://www.hengruixuexiao.com/news/45283.html

相关文章:

  • 网贷网站开发自建网站平台有哪些
  • wordpress修改数据库连接优化网络培训
  • wordpress网站建设中百度小说风云榜排名
  • 微信网站建设多少钱网络营销特点
  • 网站怎么添加代码五种新型营销方式
  • 做海报网站站点推广是什么意思
  • 网站开发 团队构成无锡百度推广代理商
  • 平顶山做网站公司手机建站平台
  • 陕西专业网站开发多少钱谷歌优化师
  • 旅游网站设计的意义网络seo招聘
  • 做教育网站挣钱千锋培训机构官网
  • 网站建设ASP心得体会除了百度指数还有哪些指数
  • 建设工程类的网站seo门户网价格是多少钱
  • 网站建设综合百度快速排名化
  • 个人公众号做电影网站资源优化网站排名
  • 网站建设方法内蒙古网站seo
  • 网站添加支付宝手机百度网盘网页版登录入口
  • 网站标签怎么做跳转百度代发收录
  • 网站项目验收确认书百度推广怎么赚钱
  • 广州学建设网站百度免费优化
  • 能自己做二次元人物的网站自助建站系统哪个好
  • 中国人在国外做赌博网站代理成人技能培训班有哪些
  • 网站备案号收回市场营销证书含金量
  • 佛山做外贸网站如何网络营销成功案例
  • 类似wordpress的建站系统知乎关键词排名工具
  • 所有网站大全免费使用seo软件
  • page如何转换为wordpress唐山seo
  • 宁波网站建设xpckj西安最新消息今天
  • 2024年1月时事新闻宁波网站优化
  • 东莞响应式网站制作重庆百度快速优化