Hi All,
I want to split a stream into any number of streams according to a field, and then process the split stream one by one. Can this be achieved? What should I do? Regards, Pei |
on 2019/9/17 9:55, 王佩 wrote: > I want to split a stream into any number of streams according to a field, > and then process the split stream one by one. I think that should be easy done. refer to: https://stackoverflow.com/questions/53588554/apache-flink-using-filter-or-split-to-split-a-stream regards. |
Thank You!
Here is a Example: // Execution Env StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // Input Source DataStreamSource<Tuple3<String, String, String>> source = env.fromElements( new Tuple3<>("productID1", "click", "user_1"), new Tuple3<>("productID1", "click", "user_2"), new Tuple3<>("productID1", "browse", "user_1"), new Tuple3<>("productID2", "browse", "user_1"), new Tuple3<>("productID2", "click", "user_2"), new Tuple3<>("productID2", "click", "user_1") .... new Tuple3<>("productID50", "click", "user_1") .... new Tuple3<>("productID90", "click", "user_1") .... new Tuple3<>("productID100", "click", "user_1") ); // Split Stream SplitStream<Tuple3<String, String, String>> splitStream = source.split(new OutputSelector<Tuple3<String, String, String>>() { @Override public Iterable<String> select(Tuple3<String, String, String> value) { ArrayList<String> output = new ArrayList<>(); output.add(value.f0); return output; } }); // Select Stream * // Here: I want to select products 1 to 100 and then process each one.* * // How should I do it?* splitStream.select("productID1").print(); env.execute(); Regards. Wesley Peng <[hidden email]> 于2019年9月17日周二 上午10:05写道: > > > on 2019/9/17 9:55, 王佩 wrote: > > I want to split a stream into any number of streams according to a field, > > and then process the split stream one by one. > > I think that should be easy done. refer to: > > https://stackoverflow.com/questions/53588554/apache-flink-using-filter-or-split-to-split-a-stream > > regards. > |
Hi
on 2019/9/17 10:28, 王佩 wrote: > * // How should I do it?* > splitStream.select("productID1").print(); If I understand for that correctly, you want somewhat the dynamic number of Sinks? regards |
In reply to this post by Wesley Peng-3
可以使用Side Output, 将输入流根据不同需求发送到自定义的不同的OutputTag中,最后可以使用DataStream.getSideOutput(outputTag)取出你需要的流进行处理!
在 2019/9/17 上午10:05,“Wesley Peng”<[hidden email]> 写入: on 2019/9/17 9:55, 王佩 wrote: > I want to split a stream into any number of streams according to a field, > and then process the split stream one by one. I think that should be easy done. refer to: https://stackoverflow.com/questions/53588554/apache-flink-using-filter-or-split-to-split-a-stream regards. |
是这样的。比方有1000个事件(通过某个字段区分,事件会继续增加),都在一个kafka topic中。
Flink 从Kafka读取数据后是一个DataStream,我想将每个事件分流出来,这样每个事件都是一个DataStream,后续,可以对每个事件做各种各样的处理,如DataStream异步IO、DataStream Sink 到Parquet。 1、如果用split...select,由于select(事件名),这里的事件名必须是某个确定的。 2、如果用side output,要提前定义output tag,我有1000个事件(事件会继续增加),这样就需要定义1000+ output tag。 感谢! cai yi <[hidden email]> 于2019年9月17日周二 下午1:33写道: > 可以使用Side Output, > 将输入流根据不同需求发送到自定义的不同的OutputTag中,最后可以使用DataStream.getSideOutput(outputTag)取出你需要的流进行处理! > > 在 2019/9/17 上午10:05,“Wesley Peng”<[hidden email]> 写入: > > > > on 2019/9/17 9:55, 王佩 wrote: > > I want to split a stream into any number of streams according to a > field, > > and then process the split stream one by one. > > I think that should be easy done. refer to: > > https://stackoverflow.com/questions/53588554/apache-flink-using-filter-or-split-to-split-a-stream > > regards. > > |
恐怕不行,sideoutput 和 split 都需要先知道要分多少个流
如sideoutput 需要先定义tag: val late = new OutputTag[LateDataEvent]("late") -----邮件原件----- 发件人: user-zh-return-1164-wxchunjhyy=[hidden email] <user-zh-return-1164-wxchunjhyy=[hidden email]> 代表 王佩 发送时间: Tuesday, September 17, 2019 4:25 PM 收件人: user-zh <[hidden email]> 主题: Re: Split a stream into any number of streams 是这样的。比方有1000个事件(通过某个字段区分,事件会继续增加),都在一个kafka topic中。 Flink 从Kafka读取数据后是一个DataStream,我想将每个事件分流出来,这样每个事件都是一个DataStream,后续,可以对每个事件做各种各样的处理,如DataStream异步IO、DataStream Sink 到Parquet。 1、如果用split...select,由于select(事件名),这里的事件名必须是某个确定的。 2、如果用side output,要提前定义output tag,我有1000个事件(事件会继续增加),这样就需要定义1000+ output tag。 感谢! cai yi <[hidden email]> 于2019年9月17日周二 下午1:33写道: > 可以使用Side Output, > 将输入流根据不同需求发送到自定义的不同的OutputTag中,最后可以使用DataStream.getSideOutput(outputTag)取出你需要的流进行处理! > > 在 2019/9/17 上午10:05,“Wesley Peng”<[hidden email]> 写入: > > > > on 2019/9/17 9:55, 王佩 wrote: > > I want to split a stream into any number of streams according to > a field, > > and then process the split stream one by one. > > I think that should be easy done. refer to: > > https://stackoverflow.com/questions/53588554/apache-flink-using-filter > -or-split-to-split-a-stream > > regards. > > |
对DataStream进行keyBy操作是否能解决呢?
------------------ 原始邮件 ------------------ 发件人: "venn"<[hidden email]>; 发送时间: 2019年9月17日(星期二) 下午4:51 收件人: "user-zh"<[hidden email]>; 主题: 回复: Split a stream into any number of streams 恐怕不行,sideoutput 和 split 都需要先知道要分多少个流 如sideoutput 需要先定义tag: val late = new OutputTag[LateDataEvent]("late") -----邮件原件----- 发件人: user-zh-return-1164-wxchunjhyy=[hidden email] <user-zh-return-1164-wxchunjhyy=[hidden email]> 代表 王佩 发送时间: Tuesday, September 17, 2019 4:25 PM 收件人: user-zh <[hidden email]> 主题: Re: Split a stream into any number of streams 是这样的。比方有1000个事件(通过某个字段区分,事件会继续增加),都在一个kafka topic中。 Flink 从Kafka读取数据后是一个DataStream,我想将每个事件分流出来,这样每个事件都是一个DataStream,后续,可以对每个事件做各种各样的处理,如DataStream异步IO、DataStream Sink 到Parquet。 1、如果用split...select,由于select(事件名),这里的事件名必须是某个确定的。 2、如果用side output,要提前定义output tag,我有1000个事件(事件会继续增加),这样就需要定义1000+ output tag。 感谢! cai yi <[hidden email]> 于2019年9月17日周二 下午1:33写道: > 可以使用Side Output, > 将输入流根据不同需求发送到自定义的不同的OutputTag中,最后可以使用DataStream.getSideOutput(outputTag)取出你需要的流进行处理! > > 在 2019/9/17 上午10:05,“Wesley Peng”<[hidden email]> 写入: > > > > on 2019/9/17 9:55, 王佩 wrote: > > I want to split a stream into any number of streams according to > a field, > > and then process the split stream one by one. > > I think that should be easy done. refer to: > > https://stackoverflow.com/questions/53588554/apache-flink-using-filter > -or-split-to-split-a-stream > > regards. > > |
In reply to this post by venn
你这个场景貌似可以用Broadcast来广播自定义的事件规则然后join数据流, 之后可以在process中进行处理...
在 2019/9/17 下午4:52,“venn”<[hidden email]> 写入: 恐怕不行,sideoutput 和 split 都需要先知道要分多少个流 如sideoutput 需要先定义tag: val late = new OutputTag[LateDataEvent]("late") -----邮件原件----- 发件人: user-zh-return-1164-wxchunjhyy=[hidden email] <user-zh-return-1164-wxchunjhyy=[hidden email]> 代表 王佩 发送时间: Tuesday, September 17, 2019 4:25 PM 收件人: user-zh <[hidden email]> 主题: Re: Split a stream into any number of streams 是这样的。比方有1000个事件(通过某个字段区分,事件会继续增加),都在一个kafka topic中。 Flink 从Kafka读取数据后是一个DataStream,我想将每个事件分流出来,这样每个事件都是一个DataStream,后续,可以对每个事件做各种各样的处理,如DataStream异步IO、DataStream Sink 到Parquet。 1、如果用split...select,由于select(事件名),这里的事件名必须是某个确定的。 2、如果用side output,要提前定义output tag,我有1000个事件(事件会继续增加),这样就需要定义1000+ output tag。 感谢! cai yi <[hidden email]> 于2019年9月17日周二 下午1:33写道: > 可以使用Side Output, > 将输入流根据不同需求发送到自定义的不同的OutputTag中,最后可以使用DataStream.getSideOutput(outputTag)取出你需要的流进行处理! > > 在 2019/9/17 上午10:05,“Wesley Peng”<[hidden email]> 写入: > > > > on 2019/9/17 9:55, 王佩 wrote: > > I want to split a stream into any number of streams according to > a field, > > and then process the split stream one by one. > > I think that should be easy done. refer to: > > https://stackoverflow.com/questions/53588554/apache-flink-using-filter > -or-split-to-split-a-stream > > regards. > > |
Free forum by Nabble | Edit this page |