EnvironmentSettings sett =
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment streamTableEnv =
StreamTableEnvironment.create(streamEnv, sett);
// When the whole program is written with DDL statements only
streamTableEnv.executeSql(DDLSourceSQLManager.createTableUseDDLFromKafka());
streamTableEnv.executeSql(com.ddlsql.DDLSourceSQLManager.createPrintlnRetractSinkTbl("printTable"));
streamTableEnv.executeSql(
    "insert into printTable select msg, count(*) as cnt from test group by msg");
// Both of the following calls throw an error
streamEnv.execute("");
streamTableEnv.execute("");
// ----------------------------------------------
// But if I create the test table as follows instead, the program runs
Table a = streamTableEnv.fromDataStream(source, "topic,offset,ts,date,msg");
streamTableEnv.createTemporaryView("test", a);
streamTableEnv.executeSql(com.ddlsql.DDLSourceSQLManager.createPrintlnRetractSinkTbl("printTable"));
streamTableEnv.executeSql(
    "insert into printTable select msg, count(*) as cnt from test group by msg");
streamEnv.execute("");
// It also runs fine when both of the following are present together.
Table a = streamTableEnv.fromDataStream(source, "topic,offset,ts,date,msg");
// The view name here is arbitrary
streamTableEnv.createTemporaryView("anyTableName", a);
streamTableEnv.executeSql(DDLSourceSQLManager.createStreamFromKafka());
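Questions:
1: In a program written entirely with DDL, is a DataStream required for it to execute?
2: streamTableEnv.executeSql() runs immediately, without needing streamTableEnv.execute("").
Is that only the case in batch mode? In streaming mode I have always needed to call execute.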