While using Flink SQL 1.12 I enabled mini-batch as follows:
val config = tConfig.getConfiguration()
config.setString("table.exec.mini-batch.enabled", "true") // mini-batch is enabled
config.setString("table.exec.mini-batch.allow-latency", "5 s") // the post had "true", but this option takes a duration; "5 s" is an assumed example value
config.setString("table.exec.mini-batch.size", "100") // setString expects a String value, not an Int
config.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE") // enable two-phase, i.e. local-global aggregation
config.setString("table.optimizer.distinct-agg.split.enabled", "true") // split distinct aggregation; does not support user-defined AggregateFunction

The SQL is:

tableEnv.executeSql(
  s"""insert into event_test
     |select
     |  date_format(create_time,'yyyyMMdd') dt,
     |  uid,
     |  count(distinct fid) text_feed_count,
     |  max(event_time) event_time
     |from basic_fd_base where ftype <>'0' and uid is not null
     |group by
     |  date_format(create_time,'yyyyMMdd'),
     |  uid""".stripMargin).print()

event_test is a print connector, and basic_fd_base is a Kafka connector configured to start consuming from a specified timestamp. In the printed result the event_time field contains many null values, as shown below, even though I have verified that this field is never null in the source data:

2> +I(20210427,747873111868904192,1,2021-04-27T14:03:10)
2> +I(20210427,709531067945685120,1,null)
2> +I(20210427,759213633292150016,1,2021-04-27T13:59:01.923)
2> +I(20210427,758340406550406272,4,2021-04-27T14:02:14.553)
2> +I(20210427,759658063329437312,1,2021-04-27T14:02:18.305)
2> +I(20210427,737415823706231680,1,2021-04-27T14:02:11.061)
2> +I(20210427,[hidden email],1,2021-04-27T14:05:37)
2> +I(20210427,759219266892539008,1,null)
2> +I(20210427,758349976605763328,1,2021-04-27T14:02:24.184)
2> -U(20210427,709531067945685120,1,null)
2> +U(20210427,709531067945685120,1,2021-04-27T14:09:27.156)
2> +I(20210427,751664239562922752,1,2021-04-27T14:16:51.133)
2> -U(20210427,759219266892539008,1,null)
2> +U(20210427,759219266892539008,1,2021-04-27T14:12:40.692)
2> +I(20210427,745540385069273984,1,2021-04-27T14:23:34)
2> +I(20210427,745399833011098240,1,2021-04-27T14:20:32.870)
2> +I(20210427,714590484395398016,1,2021-04-27T14:19:06)
2> +I(20210427,747859173236216832,1,2021-04-27T14:28:21.864)
2> +I(20210427,746212052309316608,1,null)
2> +I(20210427,666839205279797376,1,2021-04-27T14:26:36.743)
2> +I(20210427,758334362541565568,3,2021-04-27T14:18:58.396)
2> +I(20210427,758325137706788480,1,2021-04-27T14:01:09.053)
2> +I(20210427,747837209624908800,1,2021-04-27T13:59:44.193)
2> -U(20210427,758388594254750720,1,2021-04-27T14:00:44.212)
2> +U(20210427,758388594254750720,4,2021-04-27T14:14:55)
2> +I(20210427,759466217777079296,1,2021-04-27T14:25:59.019)
2> -U(20210427,762769243539450496,1,2021-04-27T14:04:29)
2> +U(20210427,762769243539450496,2,2021-04-27T14:04:29)
2> +I(20210427,720648040456852096,1,2021-04-27T14:19:38.680)
2> +I(20210427,750144041584368000,1,2021-04-27T14:29:25.621)
2> +I(20210427,713108045701517952,1,null)

When I then remove the mini-batch settings and leave everything else unchanged (including the consumer start position), the printed output contains no rows with a null field. What could be causing this?
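For reference, below is a minimal, Flink-free sketch in Scala of what the TWO_PHASE (local-global) setting is meant to compute for this query: a local phase pre-aggregates each mini-batch, and a global phase merges those partial results into per-key state. This is a simplified model under stated assumptions, not Flink's implementation: event_time is reduced to epoch milliseconds, retraction messages (-U/+U) and the split-distinct rewrite are ignored, and the names `LocalGlobalAggSketch`, `Event`, `Acc`, `run`, and `sample` are hypothetical. In this model the per-key result does not depend on how rows are grouped into mini-batches, which is the behavior one would expect from the real job.

```scala
// Hypothetical, simplified model of local-global aggregation for:
//   SELECT dt, uid, COUNT(DISTINCT fid), MAX(event_time) ... GROUP BY dt, uid
// Not Flink's implementation: no retractions, no state TTL, no split-distinct buckets.
object LocalGlobalAggSketch {
  // Hypothetical input row; event_time simplified to epoch milliseconds.
  final case class Event(dt: String, uid: String, fid: String, eventTime: Long)

  // Per-key accumulator: distinct fids seen so far and the max event time.
  final case class Acc(fids: Set[String], maxTime: Long) {
    def merge(other: Acc): Acc =
      Acc(fids ++ other.fids, math.max(maxTime, other.maxTime))
  }

  type Key = (String, String) // (dt, uid)

  // Local phase: pre-aggregate one mini-batch without touching global state.
  def localAgg(batch: Seq[Event]): Map[Key, Acc] =
    batch.groupBy(e => (e.dt, e.uid)).map { case (k, es) =>
      (k, Acc(es.map(_.fid).toSet, es.map(_.eventTime).max))
    }

  // Global phase: merge each local partial result into the keyed state.
  def globalAgg(state: Map[Key, Acc], partial: Map[Key, Acc]): Map[Key, Acc] =
    partial.foldLeft(state) { case (s, (k, acc)) =>
      s.updated(k, s.get(k).map(_.merge(acc)).getOrElse(acc))
    }

  // Process the stream in mini-batches of `batchSize` rows and
  // return (count of distinct fid, max event time) per key.
  def run(events: Seq[Event], batchSize: Int): Map[Key, (Int, Long)] = {
    require(batchSize > 0, "batchSize must be positive")
    events
      .grouped(batchSize)
      .foldLeft(Map.empty[Key, Acc])((s, b) => globalAgg(s, localAgg(b)))
      .map { case (k, acc) => (k, (acc.fids.size, acc.maxTime)) }
  }

  // Hypothetical sample rows for trying the sketch out.
  val sample: Seq[Event] = Seq(
    Event("20210427", "u1", "f1", 1000L),
    Event("20210427", "u1", "f2", 3000L),
    Event("20210427", "u2", "f1", 2000L),
    Event("20210427", "u1", "f1", 2500L)
  )
}
```

With mini-batches of 2 rows, `run(sample, 2)` gives (2, 3000) for key ("20210427", "u1") and (1, 2000) for ("20210427", "u2"), the same result as `run(sample, 1)`, i.e. the same as aggregating row by row.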