当前位置: 首页 > news >正文

温州外贸网站设计乐天seo培训中心

温州外贸网站设计,乐天seo培训中心,所有的网站都要用htmlu做吗,网站组织结构图背景 在某个场景中,需要从Kafka中获取数据,经过转换处理后,需要同时sink到多个输出源中(kafka、mysql、hologres)等。两次调用execute, 阿里云Flink vvr引擎报错: public static void main(String[] args) {final StreamExecuti…

背景

在某个场景中,需要从Kafka中获取数据,经过转换处理后,需要同时sink到多个输出源中(kafka、mysql、hologres)等。两次调用execute, 阿里云Flink vvr引擎报错:

public static void main(String[] args) {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);StreamStatementSet streamStatementSet = tEnv.createStatementSet();String s = LocalDateTimeUtils.getDateTime(System.currentTimeMillis());DataStream<String> dataStream = env.fromElements(s, LocalDateTimeUtils.getDateTime(System.currentTimeMillis()));tEnv.executeSql(KAFKA_TABLE_SQL);tEnv.executeSql(KAFKA_TABLE_SQL_1);Table table = tEnv.fromDataStream(dataStream);table.insertInto("kafka_sink").execute();table.insertInto("kafka_sink_1").execute();streamStatementSet.execute();}
Caused by: org.apache.flink.util.FlinkRuntimeException: Cannot have more than one execute() or executeAsync() call in a single environment.at org.apache.flink.client.program.StreamContextEnvironment.validateAllowedExecution(StreamContextEnvironment.java:199) ~[flink-dist-1.15-vvr-6.0.7-1-SNAPSHOT.jar:1.15-vvr-6.0.7-1-SNAPSHOT]at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:187) ~[flink-dist-1.15-vvr-6.0.7-1-SNAPSHOT.jar:1.15-vvr-6.0.7-1-SNAPSHOT]at org.apache.flink.table.planner.delegation.DefaultExecutor.executeAsync(DefaultExecutor.java:110) ~[?:?]at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:877) ~[flink-table-api-java-uber-1.15-vvr-6.0.7-1-SNAPSHOT.jar:1.15-vvr-6.0.7-1-SNAPSHOT]at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:756) ~[flink-table-api-java-uber-1.15-vvr-6.0.7-1-SNAPSHOT.jar:1.15-vvr-6.0.7-1-SNAPSHOT]at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:955) ~[flink-table-api-java-uber-1.15-vvr-6.0.7-1-SNAPSHOT.jar:1.15-vvr-6.0.7-1-SNAPSHOT]at org.apache.flink.table.api.internal.TablePipelineImpl.execute(TablePipelineImpl.java:57) ~[flink-table-api-java-uber-1.15-vvr-6.0.7-1-SNAPSHOT.jar:1.15-vvr-6.0.7-1-SNAPSHOT]

解决

使用 StreamStatementSet. 具体参考官网:
https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/docs/dev/table/data_stream_api/#converting-between-datastream-and-table

改良后的代码:

public static void main(String[] args) {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);StreamStatementSet streamStatementSet = tEnv.createStatementSet();String s = LocalDateTimeUtils.getDateTime(System.currentTimeMillis());DataStream<String> dataStream = env.fromElements(s, LocalDateTimeUtils.getDateTime(System.currentTimeMillis()));tEnv.executeSql(KAFKA_TABLE_SQL);tEnv.executeSql(KAFKA_TABLE_SQL_1);Table table = tEnv.fromDataStream(dataStream);streamStatementSet.addInsert("kafka_sink", table);streamStatementSet.addInsert("kafka_sink_1", table);streamStatementSet.execute();}
http://www.hengruixuexiao.com/news/55321.html

相关文章:

  • 美容美发化妆品培训企业网站源码带后台php织梦dede5.7百度账号客服24小时人工电话
  • 成都网站制作电话做公司网页
  • 做方案的网站二次感染即将大爆发
  • 桥下网站制作哪家好市场营销方案怎么写
  • 怎么自己做网站模板网站在线生成app
  • 上海知名网站设计东莞疫情最新情况
  • 重庆建设工程信息网官网+安全监督+安管人员长沙seo排名公司
  • java做网站开发域名是什么意思呢
  • 广告宣传片seo系统优化
  • 网站数据库模版如何做百度竞价推广
  • 网站开发域名注册百度管理员联系方式
  • wampserver做的网站目前最靠谱的推广平台
  • 拟与 合作建设网站 请予审批南京网站设计
  • 达川区建设局局网站seo关键词推广
  • 菏泽建设集团网站优化排名软件哪些最好
  • 做电子的外单网站有哪些的网络营销推广8种方法
  • 招聘网站销售怎么做推广页面制作
  • wordpress wp_list_comments天津seo托管
  • 比较好的微网站开发平台百度网站怎么申请注册
  • 石龙做网站网站开发培训
  • 郑州网站改版公司友情链接多少钱一个
  • 用网站做宣传的费用收录网站查询
  • 长沙专门做网站公司站外推广
  • 上门做指甲哪个网站专业做网站建设的公司
  • 企业解决方案ppt10页麒麟seo外推软件
  • 佛山企业网站建设特色百度seo排名优
  • 什么是营销型网站建设郑州纯手工seo
  • 商务网站建设与维护 ppt贵阳百度快照优化排名
  • wordpress优化图片分离seo网站排名全选
  • 农家乐网站建设多少钱做关键词优化