Data reuse for FLINK SQL views

Data reuse for FLINK SQL views

kandy.wang
A question about FLINK SQL views:
create view order_source as
select order_id, order_goods_id, user_id, ...
from (
    select ..., proctime,
           row_number() over (partition by order_id, order_goods_id order by proctime desc) as rownum
    from hive.temp_dw.dm_trd_order_goods /*+ OPTIONS('properties.group.id'='flink_etl_kafka_hbase', 'scan.startup.mode'='latest-offset') */
)
where rownum = 1 and price > 0;

insert into hive.temp_dw.day_order_index
select rowkey, ROW(cast(saleN as BIGINT))
from (
    select order_date as rowkey,
           sum(amount) as saleN
    from order_source
    group by order_date
);

insert into hive.temp_dw.day_order_index
select rowkey, ROW(cast(saleN as BIGINT))
from (
    select order_hour as rowkey,
           sum(amount) as saleN
    from order_source
    group by order_hour
);

Problem: the same view, the same consumer group, and two different sinks produce two separate jobs, which means the two jobs share one consumer group.
The resulting jobs are: a. order_source -> sink 1; b. order_source -> sink 2.


The intent was to use the view order_source (the view has to deduplicate the order data) to reuse a single copy of the full source data, so that the underlying state could be shared as well. How can this be done?

Re: Data reuse for FLINK SQL views

godfrey he
The Blink planner can optimize a multi-sink query so that duplicated computation is reused as much as possible. In 1.11, submit the job with a StatementSet; before 1.11, submit it with sqlUpdate/insertInto plus execute.
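For illustration, a minimal sketch of the StatementSet route on 1.11 (assuming the hive catalog and the order_source view are already registered; the class and variable names are made up):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.TableEnvironment;

public class DayOrderIndexJob {
    public static void main(String[] args) {
        // Blink planner in streaming mode.
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        TableEnvironment tEnv = TableEnvironment.create(settings);

        // The hive catalog and the order_source view are assumed to be
        // registered already, e.g. via tEnv.executeSql(...).

        String insertDaily =
                "insert into hive.temp_dw.day_order_index "
                + "select rowkey, ROW(cast(saleN as BIGINT)) "
                + "from (select order_date as rowkey, sum(amount) as saleN "
                + "      from order_source group by order_date)";
        String insertHourly =
                "insert into hive.temp_dw.day_order_index "
                + "select rowkey, ROW(cast(saleN as BIGINT)) "
                + "from (select order_hour as rowkey, sum(amount) as saleN "
                + "      from order_source group by order_hour)";

        // Adding both INSERTs to one StatementSet lets the planner detect
        // the common subplan and submit everything as a single job.
        StatementSet set = tEnv.createStatementSet();
        set.addInsertSql(insertDaily);
        set.addInsertSql(insertHourly);
        set.execute();
    }
}

Before 1.11 the idea is the same: call sqlUpdate for each INSERT and then a single execute.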

Re: Re: Data reuse for FLINK SQL views

kandy.wang

@godfrey
Thanks. I just tried it: the plan is source -> Deduplicate -> GlobalGroupAggregate. The source is indeed reused, but the Deduplicate is not. In theory both the source and the Deduplicate are part of the view's logic and should both be reused; the sharing stops short of what it could be.

Re: Re: Data reuse for FLINK SQL views

godfrey he
Call StatementSet#explain() and print the result to check whether the Deduplicate cannot be reused because the two Deduplicate nodes have different digests.
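For example (a fragment reusing tEnv and the hypothetical insertDaily/insertHourly strings from the sketch above):

StatementSet set = tEnv.createStatementSet();
set.addInsertSql(insertDaily);
set.addInsertSql(insertHourly);
// Print the optimized plan instead of executing it. If Deduplicate
// shows up twice with different digests, the planner could not merge
// the two subplans.
System.out.println(set.explain());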

Re: Re: Re: Data reuse for FLINK SQL views

kandy.wang

@godfrey
The StatementSet submission you describe is not supported when submitting jobs through sql-client, right? Could that be added?

Re: Re: Re: Data reuse for FLINK SQL views

godfrey he
sql-client does not support this yet. As for statement sets in pure SQL text, the community has already reached consensus on the syntax, so support should be added step by step in later releases.
