Flink 消费kafka ,写ORC文件

classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

Flink 消费kafka ,写ORC文件

Jacob
【现状如下】

Flink Job消费kafka消息,每半个小时将消费到的消息进行一系列聚合操作(flink 窗口聚合),然后写入一个orc文件。
据了解,flink写orc的桶分配策略[1],有两种:

一种是基于时间,即按时间为目录创建orc文件。[test/realtime/ : 为根目录]

test/realtime/
└── 2021-03-23--07
    ├── part-0-0.orc
    ├── part-0-1.orc
└── 2021-03-23--08
    ├── part-0-0.orc
    ├── part-0-1.orc

一种是将所有部分文件放在一个目录下:

test/realtime/
    ├── part-0-0.orc
    ├── part-0-1.orc
    ├── part-0-2.orc
    ├── part-0-3.orc

【问题】

最终需求是想按照partition将每半个小时的orc文件load到hive,hive表dt为分区字段,值为时间戳,如:

hive> show partitions table_demo;
OK
dt=1616455800000
dt=1616457600000
dt=1616459400000
dt=1616461200001
dt=1616463000001
Time taken: 0.134 seconds, Fetched: 5 row(s)

因此希望每个orc文件的所在目录名都是dt=`时间戳`的格式:

<http://apache-flink.147419.n8.nabble.com/file/t1162/dir.png>

用flink实现这些功能后,发现这两种桶分配策略都不能实现上述需求。

不知如何实现?之前一直是自己写代码实现聚合、写orc的操作,目录文件名一切东西完全可控,现在用flink自带的功能实现,发现不太容易实现上述需求了

[1].https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/connectors/file_sink.html#%E6%A1%B6%E5%88%86%E9%85%8D



-----
Thanks!
Jacob
--
Sent from: http://apache-flink.147419.n8.nabble.com/
Thanks!
Jacob
Reply | Threaded
Open this post in threaded view
|

Re: Flink 消费kafka ,写ORC文件

LiangbinZhang
Hi,Jacob
 
    官网有这么一段:`我们可以在格式构建器上调用 .withBucketAssigner(assigner) 来自定义 BucketAssigner
`
链接:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/connectors/file_sink.html#%E6%A1%B6%E5%88%86%E9%85%8D
<https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/connectors/file_sink.html#%E6%A1%B6%E5%88%86%E9%85%8D>  
希望对你有帮助。

Best,
Robin


Jacob wrote

> 【现状如下】
>
> Flink Job消费kafka消息,每半个小时将消费到的消息进行一系列聚合操作(flink 窗口聚合),然后写入一个orc文件。
> 据了解,flink写orc的桶分配策略[1],有两种:
>
> 一种是基于时间,即按时间为目录创建orc文件。[test/realtime/ : 为根目录]
>
> test/realtime/
> └── 2021-03-23--07
>     ├── part-0-0.orc
>     ├── part-0-1.orc
> └── 2021-03-23--08
>     ├── part-0-0.orc
>     ├── part-0-1.orc
>
> 一种是将所有部分文件放在一个目录下:
>
> test/realtime/
>     ├── part-0-0.orc
>     ├── part-0-1.orc
>     ├── part-0-2.orc
>     ├── part-0-3.orc
>
> 【问题】
>
> 最终需求是想按照partition将每半个小时的orc文件load到hive,hive表dt为分区字段,值为时间戳,如:
>
> hive> show partitions table_demo;
> OK
> dt=1616455800000
> dt=1616457600000
> dt=1616459400000
> dt=1616461200001
> dt=1616463000001
> Time taken: 0.134 seconds, Fetched: 5 row(s)
>
> 因此希望每个orc文件的所在目录名都是dt=`时间戳`的格式:
>
> &lt;http://apache-flink.147419.n8.nabble.com/file/t1162/dir.png&gt; 
>
> 用flink实现这些功能后,发现这两种桶分配策略都不能实现上述需求。
>
> 不知如何实现?之前一直是自己写代码实现聚合、写orc的操作,目录文件名一切东西完全可控,现在用flink自带的功能实现,发现不太容易实现上述需求了
>
> [1].https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/connectors/file_sink.html#%E6%A1%B6%E5%88%86%E9%85%8D
>
>
>
> -----
> Thanks!
> Jacob
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/





--
Sent from: http://apache-flink.147419.n8.nabble.com/