Flink 1.9 SQL/TableAPI 设置uid及State 更新问题

classic Classic list List threaded Threaded
5 messages Options
Reply | Threaded
Open this post in threaded view
|

Flink 1.9 SQL/TableAPI 设置uid及State 更新问题

去冒险吧
Hi ~,


在使用Flink 1.9 SQL时,需要结合外部大量数据与当前流进行Join、TopN和Distinct操作,考虑采用初始化相关Operator的State方法,遇到下面几个问题,麻烦解答下:
1. 是否SQL或Table API是禁止设置uid或者uidhash的?包括对Kafka DataStreamSource设置了uid或者uidhash也无效?
2. 在不改变Graph下,对一个SQL Job 下某个GroupAggregator Operator进行State更新,根据WebUI已经拿到uidhash,但SavePoint API只允许传入uid,没有uidhash的方法,这个要怎么解决?
3. 最终是要解决以下问题:需要Union/Join大量外部Hive数据 =》 因为存在Finished  Task 导致整个Job没法做checkpoint。解决这类问题有相关的实践方案吗?


非常感谢。
Reply | Threaded
Open this post in threaded view
|

Re: Flink 1.9 SQL/TableAPI 设置uid及State 更新问题

Jark
Administrator
Hi,


1. table 不禁止 uid/uidhash 的使用。
2. 你说的 Savepoint API 是指 State Processor API 吗?据我所知,目前只支持 uid。
3. 有一个方法是,让你的 hive source 不finish(会浪费资源,但能做 checkpoint)。

有一个疑问,你的流作业任务只需要关联 static data吗?
据我所知,这种场景很少,因为 streaming job 一般都是 long run 作业,所以关联的也是会变得数据。

Best,
Jark



On Wed, 16 Oct 2019 at 10:24, 去冒险吧 <[hidden email]> wrote:

> Hi ~,
>
>
> 在使用Flink 1.9
> SQL时,需要结合外部大量数据与当前流进行Join、TopN和Distinct操作,考虑采用初始化相关Operator的State方法,遇到下面几个问题,麻烦解答下:
> 1. 是否SQL或Table API是禁止设置uid或者uidhash的?包括对Kafka
> DataStreamSource设置了uid或者uidhash也无效?
> 2. 在不改变Graph下,对一个SQL Job 下某个GroupAggregator
> Operator进行State更新,根据WebUI已经拿到uidhash,但SavePoint
> API只允许传入uid,没有uidhash的方法,这个要怎么解决?
> 3. 最终是要解决以下问题:需要Union/Join大量外部Hive数据 =》 因为存在Finished  Task
> 导致整个Job没法做checkpoint。解决这类问题有相关的实践方案吗?
>
>
> 非常感谢。
Reply | Threaded
Open this post in threaded view
|

回复: Flink 1.9 SQL/TableAPI 设置uid及State 更新问题

去冒险吧
Hi ~,


谢谢解答。


1. 意思是table api 不禁止uid/uidhash?但sql会禁止?
    我用于测试的样例是:
    1. env.addSource(kafkaSource).name('xxx').uid('myUid').map(...).toTable(...);
    2. tableEnv.register(table, ...);
    3. tableEnv.sqlQuery(mySQL);
    这样设置Source的uid会失效吗?我在WebUI上看到是没变的。设置其它属于如name是生效的。


2. 是的,提到的就是State Processor API, 目前看来我只能包装下让它来支持uidhash了。


3. 能大概说下如何让Hive Source 不finish吗?我使用的是HiveCatalog,粗看下,框架当中检查reachedEnd的地方比较难Hack, 是要包装HiveTableInputFormat类?


4. 回答你说的问题:我的作业都基于stream sql, 大部分只需要关联流数据,但少部分除了关联流数据,也需要考虑“历史"状态, 比如Only emit global min/max/distinct  value,且不考虑retract。这种实践一般怎么”优雅“或者”平台透明”解决?


非常感谢。


------------------ 原始邮件 ------------------
发件人: "Jark Wu"<[hidden email]>;
发送时间: 2019年10月16日(星期三) 下午4:04
收件人: "user-zh"<[hidden email]>;

主题: Re: Flink 1.9 SQL/TableAPI 设置uid及State 更新问题



Hi,


1. table 不禁止 uid/uidhash 的使用。
2. 你说的 Savepoint API 是指 State Processor API 吗?据我所知,目前只支持 uid。
3. 有一个方法是,让你的 hive source 不finish(会浪费资源,但能做 checkpoint)。

有一个疑问,你的流作业任务只需要关联 static data吗?
据我所知,这种场景很少,因为 streaming job 一般都是 long run 作业,所以关联的也是会变得数据。

Best,
Jark



On Wed, 16 Oct 2019 at 10:24, 去冒险吧 <[hidden email]> wrote:

> Hi ~,
>
>
> 在使用Flink 1.9
> SQL时,需要结合外部大量数据与当前流进行Join、TopN和Distinct操作,考虑采用初始化相关Operator的State方法,遇到下面几个问题,麻烦解答下:
> 1. 是否SQL或Table API是禁止设置uid或者uidhash的?包括对Kafka
> DataStreamSource设置了uid或者uidhash也无效?
> 2. 在不改变Graph下,对一个SQL Job 下某个GroupAggregator
> Operator进行State更新,根据WebUI已经拿到uidhash,但SavePoint
> API只允许传入uid,没有uidhash的方法,这个要怎么解决?
> 3. 最终是要解决以下问题:需要Union/Join大量外部Hive数据 =》 因为存在Finished  Task
> 导致整个Job没法做checkpoint。解决这类问题有相关的实践方案吗?
>
>
> 非常感谢。
Reply | Threaded
Open this post in threaded view
|

Re: Flink 1.9 SQL/TableAPI 设置uid及State 更新问题

Jark
Administrator
Hi,

1. 我说的 table 是指整个 table 模块,包括了 Table API 和 SQL。 SQL 也不会禁止。
3. 需要修改 HiveTableInputFormat 的实现,reachedEnd() 永远返回 false
4. 阿里内部没怎么遇到过这种需求。 疑问:作业启动后,你关联上了 global min/max/distinct value,作业运行一个月后,你还是关联上一个月前的 value 吗?

Best,
Jark

> 在 2019年10月17日,10:43,去冒险吧 <[hidden email]> 写道:
>
> Hi ~,
>
>
> 谢谢解答。
>
>
> 1. 意思是table api 不禁止uid/uidhash?但sql会禁止?
>    我用于测试的样例是:
>    1. env.addSource(kafkaSource).name('xxx').uid('myUid').map(...).toTable(...);
>    2. tableEnv.register(table, ...);
>    3. tableEnv.sqlQuery(mySQL);
>    这样设置Source的uid会失效吗?我在WebUI上看到是没变的。设置其它属于如name是生效的。
>
>
> 2. 是的,提到的就是State Processor API, 目前看来我只能包装下让它来支持uidhash了。
>
>
> 3. 能大概说下如何让Hive Source 不finish吗?我使用的是HiveCatalog,粗看下,框架当中检查reachedEnd的地方比较难Hack, 是要包装HiveTableInputFormat类?
>
>
> 4. 回答你说的问题:我的作业都基于stream sql, 大部分只需要关联流数据,但少部分除了关联流数据,也需要考虑“历史"状态, 比如Only emit global min/max/distinct  value,且不考虑retract。这种实践一般怎么”优雅“或者”平台透明”解决?
>
>
> 非常感谢。
>
>
> ------------------ 原始邮件 ------------------
> 发件人: "Jark Wu"<[hidden email]>;
> 发送时间: 2019年10月16日(星期三) 下午4:04
> 收件人: "user-zh"<[hidden email]>;
>
> 主题: Re: Flink 1.9 SQL/TableAPI 设置uid及State 更新问题
>
>
>
> Hi,
>
>
> 1. table 不禁止 uid/uidhash 的使用。
> 2. 你说的 Savepoint API 是指 State Processor API 吗?据我所知,目前只支持 uid。
> 3. 有一个方法是,让你的 hive source 不finish(会浪费资源,但能做 checkpoint)。
>
> 有一个疑问,你的流作业任务只需要关联 static data吗?
> 据我所知,这种场景很少,因为 streaming job 一般都是 long run 作业,所以关联的也是会变得数据。
>
> Best,
> Jark
>
>
>
> On Wed, 16 Oct 2019 at 10:24, 去冒险吧 <[hidden email]> wrote:
>
>> Hi ~,
>>
>>
>> 在使用Flink 1.9
>> SQL时,需要结合外部大量数据与当前流进行Join、TopN和Distinct操作,考虑采用初始化相关Operator的State方法,遇到下面几个问题,麻烦解答下:
>> 1. 是否SQL或Table API是禁止设置uid或者uidhash的?包括对Kafka
>> DataStreamSource设置了uid或者uidhash也无效?
>> 2. 在不改变Graph下,对一个SQL Job 下某个GroupAggregator
>> Operator进行State更新,根据WebUI已经拿到uidhash,但SavePoint
>> API只允许传入uid,没有uidhash的方法,这个要怎么解决?
>> 3. 最终是要解决以下问题:需要Union/Join大量外部Hive数据 =》 因为存在Finished  Task
>> 导致整个Job没法做checkpoint。解决这类问题有相关的实践方案吗?
>>
>>
>> 非常感谢。

Reply | Threaded
Open this post in threaded view
|

回复: Flink 1.9 SQL/TableAPI 设置uid及State 更新问题

去冒险吧
Hi ~,


1. 在哪里能看到Operator的uid/uidhash?我目前通过在WebUI上点击任务框然后在URL里查看的。


4. 其实是用流数据去关联当前任务结果经Kafka->HDFS产生的数据(新的HDFS数据相关于也是准实时的,但包含了实时任务运行前的历史数据)达到min/max/distinct目的,假设可以初始化state以及定期做checkpoint,作业运行时,这些GroupAggregator的state不也是一直能实时保持最新的吗?是不是哪块过度设计了,比如,我可以直接把历史数据打到Kafka里,也可以打到如HBase再去Lookup,虽然量挺大,但总感觉对SQL不够友好。




非常感谢。


------------------ 原始邮件 ------------------
发件人: "Jark Wu"<[hidden email]>;
发送时间: 2019年10月17日(星期四) 中午11:05
收件人: "user-zh"<[hidden email]>;

主题: Re: Flink 1.9 SQL/TableAPI 设置uid及State 更新问题



Hi,

1. 我说的 table 是指整个 table 模块,包括了 Table API 和 SQL。 SQL 也不会禁止。
3. 需要修改 HiveTableInputFormat 的实现,reachedEnd() 永远返回 false
4. 阿里内部没怎么遇到过这种需求。 疑问:作业启动后,你关联上了 global min/max/distinct value,作业运行一个月后,你还是关联上一个月前的 value 吗?

Best,
Jark

> 在 2019年10月17日,10:43,去冒险吧 <[hidden email]> 写道:
>
> Hi ~,
>
>
> 谢谢解答。
>
>
> 1. 意思是table api 不禁止uid/uidhash?但sql会禁止?
>    我用于测试的样例是:
>    1. env.addSource(kafkaSource).name('xxx').uid('myUid').map(...).toTable(...);
>    2. tableEnv.register(table, ...);
>    3. tableEnv.sqlQuery(mySQL);
>    这样设置Source的uid会失效吗?我在WebUI上看到是没变的。设置其它属于如name是生效的。
>
>
> 2. 是的,提到的就是State Processor API, 目前看来我只能包装下让它来支持uidhash了。
>
>
> 3. 能大概说下如何让Hive Source 不finish吗?我使用的是HiveCatalog,粗看下,框架当中检查reachedEnd的地方比较难Hack, 是要包装HiveTableInputFormat类?
>
>
> 4. 回答你说的问题:我的作业都基于stream sql, 大部分只需要关联流数据,但少部分除了关联流数据,也需要考虑“历史"状态, 比如Only emit global min/max/distinct  value,且不考虑retract。这种实践一般怎么”优雅“或者”平台透明”解决?
>
>
> 非常感谢。
>
>
> ------------------ 原始邮件 ------------------
> 发件人: "Jark Wu"<[hidden email]>;
> 发送时间: 2019年10月16日(星期三) 下午4:04
> 收件人: "user-zh"<[hidden email]>;
>
> 主题: Re: Flink 1.9 SQL/TableAPI 设置uid及State 更新问题
>
>
>
> Hi,
>
>
> 1. table 不禁止 uid/uidhash 的使用。
> 2. 你说的 Savepoint API 是指 State Processor API 吗?据我所知,目前只支持 uid。
> 3. 有一个方法是,让你的 hive source 不finish(会浪费资源,但能做 checkpoint)。
>
> 有一个疑问,你的流作业任务只需要关联 static data吗?
> 据我所知,这种场景很少,因为 streaming job 一般都是 long run 作业,所以关联的也是会变得数据。
>
> Best,
> Jark
>
>
>
> On Wed, 16 Oct 2019 at 10:24, 去冒险吧 <[hidden email]> wrote:
>
>> Hi ~,
>>
>>
>> 在使用Flink 1.9
>> SQL时,需要结合外部大量数据与当前流进行Join、TopN和Distinct操作,考虑采用初始化相关Operator的State方法,遇到下面几个问题,麻烦解答下:
>> 1. 是否SQL或Table API是禁止设置uid或者uidhash的?包括对Kafka
>> DataStreamSource设置了uid或者uidhash也无效?
>> 2. 在不改变Graph下,对一个SQL Job 下某个GroupAggregator
>> Operator进行State更新,根据WebUI已经拿到uidhash,但SavePoint
>> API只允许传入uid,没有uidhash的方法,这个要怎么解决?
>> 3. 最终是要解决以下问题:需要Union/Join大量外部Hive数据 =》 因为存在Finished  Task
>> 导致整个Job没法做checkpoint。解决这类问题有相关的实践方案吗?
>>
>>
>> 非常感谢。