kandy.wang <[hidden email]> wrote on 2020-08-04 17:20:

A question about Flink SQL views:

    create view order_source
    as
    select order_id, order_goods_id, user_id, ...
    from (
        ...... proctime,
        row_number() over (partition by order_id, order_goods_id
                           order by proctime desc) as rownum
        from hive.temp_dw.dm_trd_order_goods
             /*+ OPTIONS('properties.group.id'='flink_etl_kafka_hbase',
                         'scan.startup.mode'='latest-offset') */
    ) where rownum = 1 and price > 0;

    insert into hive.temp_dw.day_order_index
    select rowkey, ROW(cast(saleN as BIGINT))
    from (
        select order_date as rowkey, sum(amount) as saleN
        from order_source
        group by order_date
    );

    insert into hive.temp_dw.day_order_index
    select rowkey, ROW(cast(saleN as BIGINT))
    from (
        select order_hour as rowkey, sum(amount) as saleN
        from order_source
        group by order_hour
    );

Problem: the same view, consumed with the same group but written to different sinks, produces two jobs, so the two jobs effectively share one consumer group. The resulting jobs are:
a. order_source -> sink 1
b. order_source -> sink 2

The intent was to use the view order_source (the view deduplicates the order data) so that both queries reuse a single copy of the full source data and, underneath, the same state. How can this be done?
godfrey he <[hidden email]> wrote on 2020-08-04 17:26:

The blink planner can optimize a multi-sink query so that the common parts of the computation are reused as much as possible. In 1.11, use StatementSet to submit the job; before 1.11, submit with sqlUpdate/insertInto + execute.
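For illustration, a minimal 1.11-style sketch of the StatementSet submission (the class name MultiSinkJob is hypothetical, and the SQL strings abbreviate the statements from the thread):

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.StatementSet;
    import org.apache.flink.table.api.TableEnvironment;

    public class MultiSinkJob {
        public static void main(String[] args) {
            // The blink planner is required for multi-sink subplan reuse.
            EnvironmentSettings settings = EnvironmentSettings.newInstance()
                    .useBlinkPlanner()
                    .inStreamingMode()
                    .build();
            TableEnvironment tEnv = TableEnvironment.create(settings);

            // Register the deduplicating view from the thread.
            tEnv.executeSql("create view order_source as select ...");

            // Add both INSERTs to one StatementSet so they are planned
            // together and the planner can share common subtrees.
            StatementSet stmtSet = tEnv.createStatementSet();
            stmtSet.addInsertSql(
                "insert into hive.temp_dw.day_order_index select ... group by order_date");
            stmtSet.addInsertSql(
                "insert into hive.temp_dw.day_order_index select ... group by order_hour");

            // Submits a single job that contains both sinks.
            stmtSet.execute();
        }
    }

Submitting the two INSERTs separately, one execute per statement, is what produces two independent jobs, each with its own scan of the Kafka source.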
kandy.wang <[hidden email]> wrote on 2020-08-04 18:21:

@godfrey thanks. I just tried it: with source -> Deduplicate -> GlobalGroupAggregate, the source side is indeed reused, but the Deduplicate side is not. In theory both the source and the Deduplicate belong to the view's logic, so both should be reused; the reuse doesn't seem to go far enough.
godfrey he <[hidden email]> wrote on 2020-08-04 19:36:

Call StatementSet#explain() and print the result to check whether the Deduplicate nodes end up with different digests, which would prevent them from being reused.
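For reference, a minimal way to inspect the plan (assuming the same stmtSet as in the earlier sketch):

    // Print the optimized plan for the whole statement set. If the
    // Deduplicate node appears twice with different digests, it was not
    // merged by the subplan-reuse optimization.
    System.out.println(stmtSet.explain());

Subplan reuse in the blink planner is governed by table.optimizer.reuse-sub-plan-enabled and table.optimizer.reuse-source-enabled, both true by default, so a non-reused Deduplicate usually points to differing digests rather than a disabled optimization.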
kandy.wang <[hidden email]> wrote on 2020-08-05 22:43:

@godfrey The StatementSet submission you describe isn't supported when submitting jobs through sql-client, is it? Could that be added?
godfrey he <[hidden email]> wrote:

sql-client does not support it yet. As for statement sets in pure SQL text, the community has already agreed on the syntax, and support should land step by step.
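For context, a sketch of what that agreed-upon pure-SQL syntax looks like (hedged: at the time of this thread it had not shipped in sql-client, so treat this as the proposed form rather than released behavior):

    -- Wrap multiple INSERTs so they are planned and submitted as one job.
    BEGIN STATEMENT SET;

    INSERT INTO hive.temp_dw.day_order_index
    SELECT ... FROM order_source GROUP BY order_date;

    INSERT INTO hive.temp_dw.day_order_index
    SELECT ... FROM order_source GROUP BY order_hour;

    END;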