FLINK1.11.1 对OGG数据入HIVE的问题咨询

classic Classic list List threaded Threaded
3 messages Options
Reply | Threaded
Open this post in threaded view
|

FLINK1.11.1 对OGG数据入HIVE的问题咨询

CHENJIE


任务流程:
OGG->KAFKA->FLINK->HIVE


KAFKA数据样例:
其中会有多个 "table",所以"before","after"中的字段是不一致的,同一个表如果有有DDL变更也会导致"before","after"字段的变更。
{
    "table": "SCOOT.TABLENAME",
    "op_type": "U",
    "op_ts": "2020-08-11 07:53:40.008001",
    "current_ts": "2020-08-11T15:56:41.233000",
    "pos": "00000000980119769930",
    "before": {
        "C1": 4499000,
        "C2": null,
        "C3": null,
        "C4": null,
        "C5": null
    },
    "after": {
        "C1": 4499000,
        "C2": null,
        "C3": "0000",
        "C4": "0000",
        "C5": "通过"
    }
}
问题:有没有优雅的方式在入到hive中可以跟源库表及结构一致?
看到很多FLINK to HIVE 的案例,很多大公司也都在用实时数仓,不知入hive这部分如果做到灵活,拓展,通用的?


例如 样例数据在hive中建表
create table TABLENAME
(
        op_type      STRING,
        op_ts          STRING,
        current_ts   STRING,
        pos             STRING,
        "C1" STRING,
        "C2" STRING,
        "C3" STRING,
        "C4" STRING,
        "C5" STRING
)
理解的难点,
1.同一FLINK任务需要写入多个表,每个表的字段是不一致的
2.同一FLINK任务会有新增的表,需自动适配
3.同一个表结构不是固定的,需要随着源库DDL变更而变更,可能的字段类型长度变更,新增删除字段等


或者只能采用通过表结构
create table TABLENAME
(
        table           STRING,
        op_type      STRING,
        op_ts          STRING,
        current_ts   STRING,
        pos             STRING,
        "before"      STRING,
        "after"         STRING
)
然后剩下的在HIVE中解决。


或者有其他更好的方案?

Reply | Threaded
Open this post in threaded view
|

Re: FLINK1.11.1 对OGG数据入HIVE的问题咨询

Rui Li
你提到的这三个难点现在的hive
connector确实是支持不了的。前两个也许可以通过把写不同的表变成写同一个表的不同分区来解决。第三个可能可以通过检查数据跟目标schema是不是匹配,来判断是不是需要去跟HMS同步新的schema。

On Thu, Aug 13, 2020 at 3:27 PM USERNAME <[hidden email]> wrote:

>
>
> 任务流程:
> OGG->KAFKA->FLINK->HIVE
>
>
> KAFKA数据样例:
> 其中会有多个
> "table",所以"before","after"中的字段是不一致的,同一个表如果有有DDL变更也会导致"before","after"字段的变更。
> {
>     "table": "SCOOT.TABLENAME",
>     "op_type": "U",
>     "op_ts": "2020-08-11 07:53:40.008001",
>     "current_ts": "2020-08-11T15:56:41.233000",
>     "pos": "00000000980119769930",
>     "before": {
>         "C1": 4499000,
>         "C2": null,
>         "C3": null,
>         "C4": null,
>         "C5": null
>     },
>     "after": {
>         "C1": 4499000,
>         "C2": null,
>         "C3": "0000",
>         "C4": "0000",
>         "C5": "通过"
>     }
> }
> 问题:有没有优雅的方式在入到hive中可以跟源库表及结构一致?
> 看到很多FLINK to HIVE 的案例,很多大公司也都在用实时数仓,不知入hive这部分如果做到灵活,拓展,通用的?
>
>
> 例如 样例数据在hive中建表
> create table TABLENAME
> (
>         op_type      STRING,
>         op_ts          STRING,
>         current_ts   STRING,
>         pos             STRING,
>         "C1" STRING,
>         "C2" STRING,
>         "C3" STRING,
>         "C4" STRING,
>         "C5" STRING
> )
> 理解的难点,
> 1.同一FLINK任务需要写入多个表,每个表的字段是不一致的
> 2.同一FLINK任务会有新增的表,需自动适配
> 3.同一个表结构不是固定的,需要随着源库DDL变更而变更,可能的字段类型长度变更,新增删除字段等
>
>
> 或者只能采用通过表结构
> create table TABLENAME
> (
>         table           STRING,
>         op_type      STRING,
>         op_ts          STRING,
>         current_ts   STRING,
>         pos             STRING,
>         "before"      STRING,
>         "after"         STRING
> )
> 然后剩下的在HIVE中解决。
>
>
> 或者有其他更好的方案?
>
>

--
Best regards!
Rui Li
Reply | Threaded
Open this post in threaded view
|

Re:Re: FLINK1.11.1 对OGG数据入HIVE的问题咨询

CHENJIE


感谢您的回复,您说的这个方法类似于 “采用通过表结构”如下结构,屏蔽掉 用table分区,每个表的data部分弱化到一个字段中,使用的时候再通过解析json方式来从 "before"或者"after"中获取对应表的字段及数据,
这种方式确实拓展跟灵活性强很多,牺牲掉部分易用性。
看到很多大公司 美团 字节等 都有基于flink的实时数仓,不知道他们是怎么解决这种大量表入仓的 拓展灵活易用性的
create table TABLENAME
 (
         table           STRING,
         op_type      STRING,
         op_ts          STRING,
         current_ts   STRING,
         pos             STRING,
         "before"      STRING,
         "after"         STRING

 ) partitioned by (pt_d table)






在 2020-08-13 16:35:19,"Rui Li" <[hidden email]> 写道:

>你提到的这三个难点现在的hive
>connector确实是支持不了的。前两个也许可以通过把写不同的表变成写同一个表的不同分区来解决。第三个可能可以通过检查数据跟目标schema是不是匹配,来判断是不是需要去跟HMS同步新的schema。
>
>On Thu, Aug 13, 2020 at 3:27 PM USERNAME <[hidden email]> wrote:
>
>>
>>
>> 任务流程:
>> OGG->KAFKA->FLINK->HIVE
>>
>>
>> KAFKA数据样例:
>> 其中会有多个
>> "table",所以"before","after"中的字段是不一致的,同一个表如果有有DDL变更也会导致"before","after"字段的变更。
>> {
>>     "table": "SCOOT.TABLENAME",
>>     "op_type": "U",
>>     "op_ts": "2020-08-11 07:53:40.008001",
>>     "current_ts": "2020-08-11T15:56:41.233000",
>>     "pos": "00000000980119769930",
>>     "before": {
>>         "C1": 4499000,
>>         "C2": null,
>>         "C3": null,
>>         "C4": null,
>>         "C5": null
>>     },
>>     "after": {
>>         "C1": 4499000,
>>         "C2": null,
>>         "C3": "0000",
>>         "C4": "0000",
>>         "C5": "通过"
>>     }
>> }
>> 问题:有没有优雅的方式在入到hive中可以跟源库表及结构一致?
>> 看到很多FLINK to HIVE 的案例,很多大公司也都在用实时数仓,不知入hive这部分如果做到灵活,拓展,通用的?
>>
>>
>> 例如 样例数据在hive中建表
>> create table TABLENAME
>> (
>>         op_type      STRING,
>>         op_ts          STRING,
>>         current_ts   STRING,
>>         pos             STRING,
>>         "C1" STRING,
>>         "C2" STRING,
>>         "C3" STRING,
>>         "C4" STRING,
>>         "C5" STRING
>> )
>> 理解的难点,
>> 1.同一FLINK任务需要写入多个表,每个表的字段是不一致的
>> 2.同一FLINK任务会有新增的表,需自动适配
>> 3.同一个表结构不是固定的,需要随着源库DDL变更而变更,可能的字段类型长度变更,新增删除字段等
>>
>>
>> 或者只能采用通过表结构
>> create table TABLENAME
>> (
>>         table           STRING,
>>         op_type      STRING,
>>         op_ts          STRING,
>>         current_ts   STRING,
>>         pos             STRING,
>>         "before"      STRING,
>>         "after"         STRING
>> )
>> 然后剩下的在HIVE中解决。
>>
>>
>> 或者有其他更好的方案?
>>
>>
>
>--
>Best regards!
>Rui Li