Flink SQL 1.12 mini-batch problem

op

When using Flink SQL 1.12, I enabled mini-batch as follows:
// tConfig is assumed to be tableEnv.getConfig (a TableConfig)
val config = tConfig.getConfiguration()
config.setString("table.exec.mini-batch.enabled", "true") // mini-batch is enabled
config.setString("table.exec.mini-batch.allow-latency", "5 s") // must be a duration; "5 s" is an example value
config.setString("table.exec.mini-batch.size", "100") // setString takes a String value
config.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE") // enable two-phase, i.e. local-global aggregation
config.setString("table.optimizer.distinct-agg.split.enabled", "true") // does not support user-defined AggregateFunction

The SQL is:

tableEnv.executeSql(
  s"""insert into event_test
     |select
     |      date_format(create_time,'yyyyMMdd') dt,
     |      uid,
     |      count(distinct fid) text_feed_count,
     |      max(event_time) event_time
     |    from basic_fd_base where ftype <> '0' and uid is not null
     |      group by
     |       date_format(create_time,'yyyyMMdd'),
     |       uid
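""".stripMargin).print()

event_test is a print connector, and basic_fd_base is a Kafka connector configured to start consuming from a specified timestamp. In the printed results the event_time field contains many null values, as shown below, although I have verified that this field has no null values in the source data:

2> +I(20210427,747873111868904192,1,2021-04-27T14:03:10)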
2> +I(20210427,709531067945685120,1,null)
2> +I(20210427,759213633292150016,1,2021-04-27T13:59:01.923)
2> +I(20210427,758340406550406272,4,2021-04-27T14:02:14.553)
2> +I(20210427,759658063329437312,1,2021-04-27T14:02:18.305)
2> +I(20210427,737415823706231680,1,2021-04-27T14:02:11.061)
2> +I(20210427,[hidden email],1,2021-04-27T14:05:37)
2> +I(20210427,759219266892539008,1,null)
2> +I(20210427,758349976605763328,1,2021-04-27T14:02:24.184)
2> -U(20210427,709531067945685120,1,null)
2> +U(20210427,709531067945685120,1,2021-04-27T14:09:27.156)
2> +I(20210427,751664239562922752,1,2021-04-27T14:16:51.133)
2> -U(20210427,759219266892539008,1,null)
2> +U(20210427,759219266892539008,1,2021-04-27T14:12:40.692)
2> +I(20210427,745540385069273984,1,2021-04-27T14:23:34)
2> +I(20210427,745399833011098240,1,2021-04-27T14:20:32.870)
2> +I(20210427,714590484395398016,1,2021-04-27T14:19:06)
2> +I(20210427,747859173236216832,1,2021-04-27T14:28:21.864)
2> +I(20210427,746212052309316608,1,null)
2> +I(20210427,666839205279797376,1,2021-04-27T14:26:36.743)
2> +I(20210427,758334362541565568,3,2021-04-27T14:18:58.396)
2> +I(20210427,758325137706788480,1,2021-04-27T14:01:09.053)
2> +I(20210427,747837209624908800,1,2021-04-27T13:59:44.193)
2> -U(20210427,758388594254750720,1,2021-04-27T14:00:44.212)
2> +U(20210427,758388594254750720,4,2021-04-27T14:14:55)
2> +I(20210427,759466217777079296,1,2021-04-27T14:25:59.019)
2> -U(20210427,762769243539450496,1,2021-04-27T14:04:29)
2> +U(20210427,762769243539450496,2,2021-04-27T14:04:29)
2> +I(20210427,720648040456852096,1,2021-04-27T14:19:38.680)
2> +I(20210427,750144041584368000,1,2021-04-27T14:29:25.621)
2> +I(20210427,713108045701517952,1,null)
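The printed rows form a changelog: +I is an insert, -U retracts the previous result for a key, +U is the updated result, and -D is a delete. The following is a minimal plain-Scala sketch, not part of Flink, for replaying such lines into the latest row per (dt, uid) key. The names ChangelogFold, parse, fold and nullKeys are hypothetical helpers, and the sketch assumes the four-column row layout of event_test shown above. It makes it easy to see which null values are later corrected and which remain.

```scala
object ChangelogFold {
  // Grouping key of the query: (dt, uid)
  type Key = (String, String)
  // Value columns: (text_feed_count, event_time); event_time is kept as printed, e.g. "null"
  type Value = (Long, String)

  // Parses a printed line such as "2> +I(20210427,709531067945685120,1,null)".
  // The row kind is the two characters right before the first '('.
  def parse(line: String): (String, Key, Value) = {
    val open = line.indexOf('(')
    val kind = line.substring(open - 2, open) // "+I", "-U", "+U" or "-D"
    val fields = line.substring(open + 1, line.lastIndexOf(')')).split(",", -1)
    (kind, (fields(0), fields(1)), (fields(2).toLong, fields(3)))
  }

  // Replays the changelog: +I/+U store the row for its key, -U/-D remove it.
  def fold(lines: Seq[String]): Map[Key, Value] =
    lines.map(parse).foldLeft(Map.empty[Key, Value]) {
      case (state, ("+I" | "+U", key, value)) => state.updated(key, value)
      case (state, ("-U" | "-D", key, _))     => state - key
      case (_, (other, _, _)) =>
        throw new IllegalArgumentException(s"unknown row kind: $other")
    }

  // Keys whose latest event_time is still null after all lines are applied.
  def nullKeys(lines: Seq[String]): Set[Key] =
    fold(lines).collect { case (key, (_, "null")) => key }.toSet
}
```

Applied to the full excerpt, the rows for 709531067945685120 and 759219266892539008 end up non-null because a later -U/+U pair replaces them, while 746212052309316608 and 713108045701517952 are still null at the end of the excerpt (a correction could arrive after it).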
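Then I removed the mini-batch settings and kept everything else unchanged, including the starting consumption position, and the printed output no longer contains rows with a null event_time. What could be causing this?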
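For reference, the Flink performance-tuning documentation describes local-global aggregation as a local pre-aggregation upstream of the shuffle followed by a global aggregation downstream, and states that it depends on mini-batch being enabled. Removing the mini-batch settings therefore also turns off the TWO_PHASE strategy. The following is a simplified, hypothetical model in plain Scala, not Flink's implementation, of that combination for this query. Records are cut into batches of at most table.exec.mini-batch.size, each batch is pre-aggregated per (dt, uid), and the partial accumulators are merged into global state. Event, Acc, localAgg, globalMerge and run are illustrative names. Only the size trigger is modeled: allow-latency and the distinct-aggregation split are not.

```scala
object LocalGlobalModel {
  // Hypothetical input record: grouping columns, the distinct column fid, event_time as epoch millis
  final case class Event(dt: String, uid: String, fid: String, eventTime: Long)

  type Key = (String, String)

  // Accumulator for COUNT(DISTINCT fid) and MAX(event_time); maxTime = None means "no input yet"
  final case class Acc(fids: Set[String], maxTime: Option[Long]) {
    def add(e: Event): Acc =
      Acc(fids + e.fid, Some(maxTime.map(t => math.max(t, e.eventTime)).getOrElse(e.eventTime)))

    def merge(other: Acc): Acc = {
      val mergedMax = (maxTime, other.maxTime) match {
        case (Some(a), Some(b)) => Some(math.max(a, b))
        case (a, b)             => a.orElse(b) // an empty side never hides the other side's max
      }
      Acc(fids ++ other.fids, mergedMax)
    }
  }

  val emptyAcc: Acc = Acc(Set.empty, None)

  // Local phase: pre-aggregate one mini-batch per key
  def localAgg(batch: Seq[Event]): Map[Key, Acc] =
    batch.groupBy(e => (e.dt, e.uid)).map { case (k, es) => (k, es.foldLeft(emptyAcc)(_ add _)) }

  // Global phase: merge the partial accumulators into the global state
  def globalMerge(state: Map[Key, Acc], partial: Map[Key, Acc]): Map[Key, Acc] =
    partial.foldLeft(state) { case (s, (k, acc)) => s.updated(k, s.getOrElse(k, emptyAcc).merge(acc)) }

  // Splits the input into batches of at most `size` records and returns (count, max) per key
  def run(events: Seq[Event], size: Int): Map[Key, (Long, Option[Long])] = {
    require(size > 0, "batch size must be positive")
    events.grouped(size)
      .foldLeft(Map.empty[Key, Acc]) { (state, batch) => globalMerge(state, localAgg(batch)) }
      .map { case (k, acc) => (k, (acc.fids.size.toLong, acc.maxTime)) }
  }
}
```

In this model an accumulator either holds a maximum or is empty, so the merged result per key does not depend on the batch size, for example run(events, 2) and run(events, 100) agree.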