Building a CDC-based OGG data lake with Flink


唐门小师兄
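I. Background
Our data center's warehouse is currently built on GP. Loading OGG data into GP as the source-aligned ODS layer consumes too many cluster resources and has become a performance bottleneck, so we are considering rebuilding the data warehouse on a GP + Hadoop architecture.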

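II. Approach
Hand the loading of the source-aligned ODS layer over to the Hadoop ecosystem. When data flows in real time from OGG into Kafka, each record is a DML-style change of type "I", "U", or "D". Flink then uses a dynamic table to find the latest record for each primary key, and the in-memory dynamic table is written to HDFS by day. GP then loads the current day's data through an external table and merges it into the existing data, which should relieve the performance bottleneck. The data flow is roughly as follows: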

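III. Question
1. With the new approach, after processing the Kafka data we convert the table with toRetractStream, keep only the records whose tuple field f0 is true, and write them by day to a BucketingSink. The intent was to extract the latest record for each primary key within a day, but instead every record that was ever flagged true is written out. Is there a way to solve this? (PS: We have tested idempotent stores such as HBase and Elasticsearch. They can update in place so that only the latest operation per primary key is kept, but they lengthen the data pipeline and increase development and maintenance costs.)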

(1) Input:
{"table":"04.PMS_GXGL.XT_DLRZ","op_type":"U","op_ts":"2019-05-08 16:21:56.081124","current_ts":"2019-05-09T11:19:43.103001","pos":"00000524730317232030","DLBS":"100000000302","RYBS":"1000000000200709","DLLX":"2","DLZH":"[hidden email]","YHXM":"小明","DLIP":"10.103.158.98","FWQIP":"10.100.80.120","FWQMC":null,"ZXSJ":"2019-05-09 00:21:48","ZZBM":"0402031001","DQBM":"信息","CJSJ":"2019-05-08 15:33:48","CZSJ":"2019-05-09 00:21:48","DLSJ":"2019-05-08 15:33:48","ZHFWSJ":"2019-05-08 15:33:48","JLRZSJ":"2019-06-18T11:11:38.122Z","SBCS":"2","XYSC":"240","JDMC":"yxscapp03_02"}
{"table":"04.PMS_GXGL.XT_DLRZ","op_type":"U","op_ts":"2019-05-08 16:21:57.081124","current_ts":"2019-05-09T11:19:43.103001","pos":"00000524730317232031","DLBS":"100000000302","RYBS":"1000000000200709","DLLX":"2","DLZH":"[hidden email]","YHXM":"小明","DLIP":"10.103.158.98","FWQIP":"10.100.80.120","FWQMC":null,"ZXSJ":"2019-05-09 00:21:48","ZZBM":"0402031001","DQBM":"信息","CJSJ":"2019-05-08 15:33:48","CZSJ":"2019-05-09 00:21:48","DLSJ":"2019-05-08 15:33:48","ZHFWSJ":"2019-05-08 15:33:48","JLRZSJ":"2019-06-18T11:11:38.122Z","SBCS":"2","XYSC":"240","JDMC":"yxscapp03_02"}

(2) Processing code:
---------------------------------------------------。。。。

When toRetractStream processes the input above, it produces 3 records, in this order:

true,....
false,...
true,....
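
The true / false / true sequence can be reproduced with a small plain-Python simulation. This is only a sketch of the retract semantics, not Flink code: the function name `latest_per_key_retract_stream` is hypothetical, `DLBS` is assumed to be the primary key, and events are assumed to arrive in order. It shows why keeping only the messages flagged true retains every version of a row rather than just the last one.

```python
from typing import Dict, Iterable, List, Tuple

Row = Dict[str, str]
RetractMsg = Tuple[bool, Row]  # (True, row) = add, (False, row) = retract


def latest_per_key_retract_stream(events: Iterable[Row], key: str) -> List[RetractMsg]:
    """Simulate the changelog of a 'latest row per key' dynamic table."""
    current: Dict[str, Row] = {}
    out: List[RetractMsg] = []
    for row in events:
        k = row[key]
        if k in current:
            # The previous result for this key is withdrawn first.
            out.append((False, current[k]))
        current[k] = row
        # Then the new result for this key is added.
        out.append((True, row))
    return out


# Trimmed versions of the two input records (assumption: DLBS is the key).
events = [
    {"DLBS": "100000000302", "op_type": "U", "pos": "00000524730317232030"},
    {"DLBS": "100000000302", "op_type": "U", "pos": "00000524730317232031"},
]

messages = latest_per_key_retract_stream(events, key="DLBS")
flags = [flag for flag, _ in messages]
# Filtering on the flag alone ignores the retraction of the first row.
kept = [row for flag, row in messages if flag]
```

Here `flags` follows the same add, retract, add pattern as the three records, and `kept` still contains both versions of the row, because the filter discards the retraction instead of applying it.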

(3) Actual output:
{"table":"04.PMS_GXGL.XT_DLRZ","op_type":"U","op_ts":"2019-05-08 16:21:56.081124","current_ts":"2019-05-09T11:19:43.103001","pos":"00000524730317232030","DLBS":"100000000302","RYBS":"1000000000200709","DLLX":"2","DLZH":"[hidden email]","YHXM":"小明","DLIP":"10.103.158.98","FWQIP":"10.100.80.120","FWQMC":null,"ZXSJ":"2019-05-09 00:21:48","ZZBM":"0402031001","DQBM":"信息","CJSJ":"2019-05-08 15:33:48","CZSJ":"2019-05-09 00:21:48","DLSJ":"2019-05-08 15:33:48","ZHFWSJ":"2019-05-08 15:33:48","JLRZSJ":"2019-06-18T11:11:38.122Z","SBCS":"2","XYSC":"240","JDMC":"yxscapp03_02"}
{"table":"04.PMS_GXGL.XT_DLRZ","op_type":"U","op_ts":"2019-05-08 16:21:57.081124","current_ts":"2019-05-09T11:19:43.103001","pos":"00000524730317232031","DLBS":"100000000302","RYBS":"1000000000200709","DLLX":"2","DLZH":"[hidden email]","YHXM":"小明","DLIP":"10.103.158.98","FWQIP":"10.100.80.120","FWQMC":null,"ZXSJ":"2019-05-09 00:21:48","ZZBM":"0402031001","DQBM":"信息","CJSJ":"2019-05-08 15:33:48","CZSJ":"2019-05-09 00:21:48","DLSJ":"2019-05-08 15:33:48","ZHFWSJ":"2019-05-08 15:33:48","JLRZSJ":"2019-06-18T11:11:38.122Z","SBCS":"2","XYSC":"240","JDMC":"yxscapp03_02"}

(4) Expected output:
{"table":"04.PMS_GXGL.XT_DLRZ","op_type":"U","op_ts":"2019-05-08 16:21:57.081124","current_ts":"2019-05-09T11:19:43.103001","pos":"00000524730317232031","DLBS":"100000000302","RYBS":"1000000000200709","DLLX":"2","DLZH":"[hidden email]","YHXM":"小明","DLIP":"10.103.158.98","FWQIP":"10.100.80.120","FWQMC":null,"ZXSJ":"2019-05-09 00:21:48","ZZBM":"0402031001","DQBM":"信息","CJSJ":"2019-05-08 15:33:48","CZSJ":"2019-05-09 00:21:48","DLSJ":"2019-05-08 15:33:48","ZHFWSJ":"2019-05-08 15:33:48","JLRZSJ":"2019-06-18T11:11:38.122Z","SBCS":"2","XYSC":"240","JDMC":"yxscapp03_02"}
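
One way to arrive at this expected output is to apply the retract messages to a per-day buffer keyed by primary key and write the buffer out only when the day closes. The sketch below is plain Python rather than Flink code, and it rests on assumptions: `materialize` is a hypothetical helper, `DLBS` is assumed to be the primary key, and one day's messages are assumed to fit in memory or state.

```python
from typing import Dict, Iterable, List, Tuple

Row = Dict[str, str]
RetractMsg = Tuple[bool, Row]  # (True, row) = add, (False, row) = retract


def materialize(messages: Iterable[RetractMsg], key: str) -> List[Row]:
    """Apply add/retract messages in order and return the surviving rows."""
    state: Dict[str, Row] = {}
    for is_add, row in messages:
        k = row[key]
        if is_add:
            state[k] = row  # an add replaces whatever is stored for the key
        elif state.get(k) == row:
            del state[k]  # a retract removes the row only if it is the stored one
    return list(state.values())


# Trimmed versions of the two records from the post (assumption: DLBS is the key).
old = {"DLBS": "100000000302", "op_ts": "2019-05-08 16:21:56.081124",
       "pos": "00000524730317232030"}
new = {"DLBS": "100000000302", "op_ts": "2019-05-08 16:21:57.081124",
       "pos": "00000524730317232031"}

# The three messages for one day: add old, retract old, add new.
day_messages = [(True, old), (False, old), (True, new)]
day_rows = materialize(day_messages, key="DLBS")
```

With these inputs `day_rows` holds only the record whose `pos` ends in 31. This is essentially what an idempotent store such as HBase or Elasticsearch does on the writer's behalf. The trade-off is that the buffer has to be kept until the day's bucket is closed.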