flink dataset 分组后拼接分组后内容

classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

flink dataset 分组后拼接分组后内容

hery168@163.com
col1 col2 pid

1.0  2.0  1

2.0  2.0  1

1.0  2.0  1

3.0  2.0  1

1.0  2.0  1

1.0  2.0  2

1.0  2.0  2

1.0  2.0  2

1.0  2.0  2

1.0  2.0  2
各位大神,想问一下利用flink dataset 对pid 列进行分组,然后对分组后的col1列的内容进行拼接,如1.0#2.0#1.0#3.0....
请问大家这个该怎么实现?
Reply | Threaded
Open this post in threaded view
|

Re: flink dataset 分组后拼接分组后内容

admin
Hi,hery168
可以这样写

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.fromElements(Tuple3.of(1.0, 2.0, 1), Tuple3.of(2.0, 2.0, 1),Tuple3.of(3.0,2.0,1), Tuple3.of(1.0, 2.0, 2), Tuple3.of(2.0, 2.0, 2),Tuple3.of(3.0,2.0,2))
        .map((MapFunction<Tuple3<Double, Double, Integer>, Tuple3<String, Double, Integer>>) t -> Tuple3.of(String.valueOf(t.f0), t.f1, t.f2)).groupBy(2)
        .reduce((ReduceFunction<Tuple3<String, Double, Integer>>) (tuple, t1) -> Tuple3.of(tuple.f0 + "#" + t1.f0, tuple.f1, tuple.f2)).print();
env.execute();
结果:
(1.0#2.0#3.0,2.0,1)
(1.0#2.0#3.0,2.0,2)

> 2020年4月28日 下午5:24,hery168 <[hidden email]> 写道:
>
> col1 col2 pid
>
> 1.0  2.0  1
>
> 2.0  2.0  1
>
> 1.0  2.0  1
>
> 3.0  2.0  1
>
> 1.0  2.0  1
>
> 1.0  2.0  2
>
> 1.0  2.0  2
>
> 1.0  2.0  2
>
> 1.0  2.0  2
>
> 1.0  2.0  2
> 各位大神,想问一下利用flink dataset 对pid 列进行分组,然后对分组后的col1列的内容进行拼接,如1.0#2.0#1.0#3.0....
> 请问大家这个该怎么实现?