Hi, all:
I'm currently on Flink 1.11.0, but after calling executeSql, print() writes nothing to the console (I checked Kafka and the topic is continuously receiving data). The code is as follows:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

Catalog catalog = new HiveCatalog("x", "default", "D:\\conf", "1.1.0");
tEnv.registerCatalog("x", catalog);

TableResult execute = tEnv.executeSql("select * from x.ods.ods_binlog_test_trip_create_t_order_1");

execute.print();

The table was created with:

CREATE TABLE x.ods.ods_binlog_test_trip_create_t_order_1 (
    _type STRING,
    order_no STRING,
    order_time STRING,
    dt as TO_TIMESTAMP(order_time),
    proctime as PROCTIME(),
    WATERMARK FOR dt AS dt - INTERVAL '5' SECOND
) WITH (
    'connector.type' = 'kafka',
    'connector.properties.bootstrap.servers' = '***',
    'connector.properties.zookeeper.connect' = '****',
    'connector.version' = 'universal',
    'format.type' = 'json',
    'connector.properties.group.id' = 'testGroup',
    'connector.startup-mode' = 'group-offsets',
    'connector.topic' = 'test'
)
________________________________
From: godfrey he <[hidden email]>
Date: Jul 23, 2020, 19:24
To: user-zh
Subject: Re: flink 1.11 executeSql query on a kafka table: print produces no output
In 1.11, TableResult.collect() and TableResult.print() both have exactly-once semantics in streaming mode; you need to configure checkpointing before any results are produced.

Best,
Godfrey
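For reference, a minimal sketch of the fix on the StreamExecutionEnvironment from the original code; the 10-second interval and the explicit EXACTLY_ONCE mode are illustrative choices, not values prescribed in this thread:

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// In 1.11, print()/collect() emit nothing until a checkpoint completes.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Trigger a checkpoint every 10 seconds; results become visible each
// time a checkpoint completes, so this bounds the output latency.
env.enableCheckpointing(10_000L, CheckpointingMode.EXACTLY_ONCE);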
________________________________
From: Junbao Zhang <[hidden email]>
Date: Jul 23, 2020, 19:34
To: user-zh
Subject: Re: flink 1.11 executeSql query on a kafka table: print produces no output

Hi, Godfrey:
Adding checkpointing did indeed fix it. Could you explain the underlying mechanism in more detail? Does print() emit the results as a side effect of completing a snapshot? And is there any related documentation?

Best,
Junbao Zhang
________________________________
From: godfrey he <[hidden email]>
To: user-zh
Subject: Re: flink 1.11 executeSql query on a kafka table: print produces no output
The client continuously pulls the data produced by the sink, but the corresponding rows are only returned from collect() and print() after the checkpoint covering them has completed. This is what guarantees exactly-once semantics.

In 1.12, both at-least-once and exactly-once semantics are supported. The default is at-least-once, so the results of collect() and print() may contain duplicates. If you are interested, see the PR: https://github.com/apache/flink/pull/12867

Best,
Godfrey
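To make the buffering behavior concrete, here is a minimal sketch that consumes the same query with collect() instead of print(); tEnv and the table name are taken from the original question, and the blocking behavior described is the 1.11 exactly-once mode discussed above:

import org.apache.flink.table.api.TableResult;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;

TableResult result = tEnv.executeSql(
    "select * from x.ods.ods_binlog_test_trip_create_t_order_1");
// Rows surface in batches, one completed checkpoint at a time:
// hasNext()/next() block until the checkpoint covering the next row
// finishes, so with no checkpointing configured this loop never prints.
try (CloseableIterator<Row> it = result.collect()) {
    while (it.hasNext()) {
        System.out.println(it.next());
    }
}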