以下程序运行,控制台一直没有数据输出1. 程序package kafka;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; public class DataGenTest { public static void main(String[] args) { StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings); String sourceTableDDL = "CREATE TABLE datagen ( " + " f_random INT, " + " f_random_str STRING, " + " ts AS localtimestamp, " + " WATERMARK FOR ts AS ts " + ") WITH ( " + " 'connector' = 'datagen', " + " 'rows-per-second'='20', " + " 'fields.f_random.min'='1', " + " 'fields.f_random.max'='10', " + " 'fields.f_random_str.length'='10' " + ")"; bsTableEnv.executeSql(sourceTableDDL); bsTableEnv.executeSql("SELECT f_random, count(1) " + "FROM datagen " + "GROUP BY TUMBLE(ts, INTERVAL '1' second), f_random").print(); } }2. 控制台,log4j:WARN No appenders could be found for logger (org.apache.flink.table.module.ModuleManager). log4j:WARN Please initialize the log4j system properly. log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info. +-------------+----------------------+ | f_random | EXPR$1 | +-------------+----------------------+ |
Hi Asahi Lee:
我在 master 上的 flink-sql-client 模块中建了一个类,复制你的代码控制台是有输出的,你使用的版本是什么的? Best, Hailong Wang 在 2020-07-29 15:35:30,"Asahi Lee" <[hidden email]> 写道: >以下程序运行,控制台一直没有数据输出1. 程序package kafka; > >import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; >import org.apache.flink.table.api.EnvironmentSettings; >import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; > >public class DataGenTest { > > public static void main(String[] args) { > > StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment(); > EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); > StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings); > > String sourceTableDDL = "CREATE TABLE datagen ( " + > " f_random INT, " + > " f_random_str STRING, " + > " ts AS localtimestamp, " + > " WATERMARK FOR ts AS ts " + > ") WITH ( " + > " 'connector' = 'datagen', " + > " 'rows-per-second'='20', " + > " 'fields.f_random.min'='1', " + > " 'fields.f_random.max'='10', " + > " 'fields.f_random_str.length'='10' " + > ")"; > > bsTableEnv.executeSql(sourceTableDDL); > > bsTableEnv.executeSql("SELECT f_random, count(1) " + > "FROM datagen " + > "GROUP BY TUMBLE(ts, INTERVAL '1' second), f_random").print(); > > } > >}2. 控制台,log4j:WARN No appenders could be found for logger (org.apache.flink.table.module.ModuleManager). log4j:WARN Please initialize the log4j system properly. log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info. +-------------+----------------------+ | f_random | EXPR$1 | +-------------+----------------------+ |
In reply to this post by Asahi Lee
Hi
> > bsTableEnv.executeSql("SELECT f_random, count(1) " + > "FROM datagen " + > "GROUP BY TUMBLE(ts, INTERVAL '1' second), f_random").print(); TableResult.print() 方法目前只支持了 exactly-once 语义,在 streaming 模式下必须设置checkpoint才能work, 你配置下checkpoint之后再试下,支持 At Least Once 的方法在1.12里应该会支持,支持后可以不用设置 checkpoint。 祝好 Leonard |
Free forum by Nabble | Edit this page |