flink 1.11: executeSql query on a Kafka table, print produces no output

flink 1.11: executeSql query on a Kafka table, print produces no output

junbaozhang
Hi, all:
      I am currently using Flink 1.11.0. After calling executeSql, print() writes nothing to the console, even though Kafka is continuously producing data. The code is as follows:


import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.hive.HiveCatalog;

// Blink planner in streaming mode
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

// Hive catalog "x" (conf dir D:\conf, Hive version 1.1.0) holding the Kafka table definition
Catalog catalog = new HiveCatalog("x", "default", "D:\\conf", "1.1.0");
tEnv.registerCatalog("x", catalog);

TableResult execute = tEnv.executeSql("select * from x.ods.ods_binlog_test_trip_create_t_order_1");

execute.print(); // blocks and prints result rows to the console

The CREATE TABLE statement is:

CREATE TABLE x.ods.ods_binlog_test_trip_create_t_order_1 (
  _type STRING,
  order_no STRING,
  order_time STRING,
  dt as TO_TIMESTAMP(order_time),
  proctime as PROCTIME(),
  WATERMARK FOR dt AS dt - INTERVAL '5' SECOND
) WITH (
  'connector.type' = 'kafka',
  'connector.properties.bootstrap.servers' = '***',
  'connector.properties.zookeeper.connect' = '****',
  'connector.version' = 'universal',
  'format.type' = 'json',
  'connector.properties.group.id' = 'testGroup',
  'connector.startup-mode' = 'group-offsets',
  'connector.topic' = 'test'
)
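
As a side note, the 'connector.type' / 'connector.properties.*' keys above are the legacy (pre-1.11) property style. Flink 1.11 also supports the newer option style for the Kafka SQL connector; a rough equivalent of the DDL above under the new keys (placeholders kept as in the original) would be:

```sql
CREATE TABLE x.ods.ods_binlog_test_trip_create_t_order_1 (
  _type STRING,
  order_no STRING,
  order_time STRING,
  dt AS TO_TIMESTAMP(order_time),
  proctime AS PROCTIME(),
  WATERMARK FOR dt AS dt - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'test',
  'properties.bootstrap.servers' = '***',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'group-offsets',
  'format' = 'json'
)
```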
Re: flink 1.11: executeSql query on a Kafka table, print produces no output

godfrey he
In 1.11, TableResult.collect() and TableResult.print() both use exactly-once semantics in streaming mode,
so checkpointing must be configured before any results are returned.
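
For example, checkpointing can be enabled either programmatically (env.enableCheckpointing(10_000L) on the StreamExecutionEnvironment) or globally in flink-conf.yaml; a minimal sketch, with an illustrative 10-second interval:

```yaml
# flink-conf.yaml: complete a checkpoint every 10 seconds so that
# collect()/print() can release the records covered by it
execution.checkpointing.interval: 10s
```

With this in place, print() emits a new batch of rows after each completed checkpoint instead of staying silent.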

Best,
Godfrey

[hidden email] <[hidden email]> wrote on Thu, Jul 23, 2020 at 19:22:
[quoted text hidden]
Re: flink 1.11: executeSql query on a Kafka table, print produces no output

junbaozhang
Hi, Godfrey:
     Adding checkpointing did indeed fix it. Could you explain how this works? Does print() flush the results as part of completing a snapshot? Is there any related documentation?

Best,
Junbao Zhang
________________________________
From: godfrey he <[hidden email]>
Sent: Thursday, July 23, 2020 19:24
To: user-zh <[hidden email]>
Subject: Re: flink 1.11: executeSql query on a Kafka table, print produces no output

[quoted text hidden]
Re: flink 1.11: executeSql query on a Kafka table, print produces no output

godfrey he
The client continuously pulls the data produced by the sink, but the records only become available to collect() and print()
after the checkpoint that covers them completes.
This is done to guarantee exactly-once semantics.
In 1.12, both at-least-once and exactly-once semantics are supported. The default is at least once, so collect()
and print() results may contain duplicates.
If you are interested, see the PR: https://github.com/apache/flink/pull/12867

Best,
Godfrey

[hidden email] <[hidden email]> wrote on Thu, Jul 23, 2020 at 19:34:
[quoted text hidden]