Flink 1.11 table.executeInsert 程序退出

classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

Flink 1.11 table.executeInsert 程序退出

HunterXHunter
当我在使用 StreamTableEnvironment Api的时候;

 Table a = getStreamTable(getKafkaDataStream("test", "localhost:9092",
"latest"),"topic,offset,msg");
  tableEnv.createTemporaryView("test", a);
 
tableEnv.executeSql(DDLSourceSQLManager.createCustomPrintlnRetractSinkTbl("printlnSink_retract"));
  tableEnv.executeSql("insert into printlnSink_retract select
topic,msg,count(*) as ll from test group by topic,msg");

程序直接结束退出,但如果最后加Thread.sleep(10000L) 就可以消费10s钟,如果加
tableEnv.execute("jobname");
报错:No operators defined in streaming topology



--
Sent from: http://apache-flink.147419.n8.nabble.com/
Reply | Threaded
Open this post in threaded view
|

回复: Flink 1.11 table.executeInsert 程序退出

史 正超
这个是一个已知问题,可以看看这个jira: https://issues.apache.org/jira/browse/FLINK-18545
规避这个问题的话,可以不用执行 tableEnv.execute("jobname"); 直接用 executeSql 就可以了,遇到INSERT语句就能生成job了。
________________________________
发件人: HunterXHunter <[hidden email]>
发送时间: 2020年9月30日 2:32
收件人: [hidden email] <[hidden email]>
主题: Flink 1.11 table.executeInsert 程序退出

当我在使用 StreamTableEnvironment Api的时候;

 Table a = getStreamTable(getKafkaDataStream("test", "localhost:9092",
"latest"),"topic,offset,msg");
  tableEnv.createTemporaryView("test", a);

tableEnv.executeSql(DDLSourceSQLManager.createCustomPrintlnRetractSinkTbl("printlnSink_retract"));
  tableEnv.executeSql("insert into printlnSink_retract select
topic,msg,count(*) as ll from test group by topic,msg");

程序直接结束退出,但如果最后加Thread.sleep(10000L) 就可以消费10s钟,如果加
tableEnv.execute("jobname");
报错:No operators defined in streaming topology



--
Sent from: http://apache-flink.147419.n8.nabble.com/