当前位置: 首页 > news >正文

深圳科技网站建设外链

深圳科技网站建设,外链,wordpress 优化原理,广州涉疫重点场所有更新前言 大数据应用开发——实时数据采集 大数据应用开发——实时数据处理 Flink完成Kafka中的数据消费,将数据分发至Kafka的dwd层中 并在HBase中进行备份 大数据应用开发——数据可视化 hadoop,zookeeper,kafka,flink要开启 目…

前言

大数据应用开发——实时数据采集

大数据应用开发——实时数据处理

        Flink完成Kafka中的数据消费,将数据分发至Kafka的dwd层中

        并在HBase中进行备份

大数据应用开发——数据可视化

hadoop,zookeeper,kafka,flink要开启

目录

        题目

        Flink完成Kafka中的数据消费,将数据分发至Kafka的dwd层中


题目

按照任务书要求使用Java语言基于Flink完成Kafka中的数据消费,将数据分发至Kafka的dwd层中,并在HBase中进行备份同时建立Hive外表,基于Flink完成相关的数据指标计算并将计算结果存入Redis、ClickHouse中

Flink完成Kafka中的数据消费,将数据分发至Kafka的dwd层中

在IDEA下用maven创建flink项目:

# 用cmd执行,创建在当前目录下
# java版本
mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-java -DarchetypeVersion=flink版本号# scala版本
mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-scala -DarchetypeVersion=flink版本号

修改pox.xml文件,将flink-connector-kafka_...依赖移出来

 demo包下有两个.java

PS:一个用于批处理,另一个用于流处理

public class StreamingJob {public static void main(String[] args) throws Exception {// set up the streaming execution environmentfinal StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 配置发送的KafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers("master:9092").setTopics("order").setGroupId("my_group").setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();// 配置接收的KafkaSink<String> sink = KafkaSink.<String>builder().setBootstrapServers("master:9092").setRecordSerializer(KafkaRecordSerializationSchema.builder().setTopic("dwd_order").setValueSerializationSchema(new SimpleStringSchema()).build()).setDeliverGuarantee(DeliveryGuarantee.NONE).build();// 指定的源创建一个数据流DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");// 将数据里的'符号去掉DataStream<String> text = stream.map(new MapFunction<String, String>() {@Overridepublic String map(String s) throws Exception {return s.replace("'","");}});// 打印处理结果到控制台text.print();// 发送text.sinkTo(sink);// execute programenv.execute("Flink Streaming Java API Skeleton");}
}

将代码打包成.jar,可以先clean,再package

生成位置在当前项目位置/target/项目名称-...jar

 放进主节点

# /usr/flink/bin/flink run -c 包名.运行class名 放在主节点的位置
/usr/flink/bin/flink run -c demo.StreamingJob /opt/flink-java-1.0-SNAPSHOT.jar

最后,可以用flink控制台或kafka-console-consumer.sh查看 

http://www.hengruixuexiao.com/news/53709.html

相关文章:

  • 湖南网站推seo变现培训
  • 京东第一次做网站百度推广登录平台登录
  • 重庆开网站搜索引擎营销优化
  • 开发定制电商平台广州关于进一步优化疫情防控措施
  • 沧州网站建设选网龙苏州网站建设
  • 网站建设PHP开发是什么意思东莞seo靠谱
  • 做普通网站选择什么服务器如何实现网站的快速排名
  • 怎么做盗版视频网站线上营销的方式
  • 百度收录新网站如何在百度推广自己的产品
  • 博客网站开发报告世界十大网站排名出炉
  • 做网站用js的好处会员卡营销策划方案
  • 河源做网站的客户seo优化有百度系和什么
  • 天津建设网站的公司seo 工具分析
  • 网站版心怎么做seo少女
  • 长沙营销型网站建设公司腾讯企点注册
  • 微信开放平台注销温州seo品牌优化软件
  • 免费的毕业论文网站无锡哪里有做网站的
  • php高性能网站建设十大营销策略
  • 政府网站建设百度收录最新方法
  • 机械毕业设计代做网站推荐免费网站推广网站不用下载
  • 古典家具公司网站模板seo实战培训学校
  • 域名怎么用到自己做的网站站长工具的网址
  • 如何开发公司的网站百度快照收录
  • 网络营销的成功例子网站seo是什么
  • 怎么做网站卖美瞳互联网金融
  • 厦门设计师网站nba排名最新赛程
  • 网站建设战略伙伴软文推广有哪些平台
  • wordpress 不同侧边栏seo云优化软件破解版
  • 搭建一个网址惠州seo优化
  • 长春市做网站seo外链建设的方法