Login  Register

Split a stream into any number of streams

classic Classic list List threaded Threaded
9 messages Options Options
Embed post
Permalink
Reply | Threaded
Open this post in threaded view
| More
Print post
Permalink

Split a stream into any number of streams

王佩-2
Hi All,

I want to split a stream into any number of streams according to a field,
and then process the split stream one by one.

Can this be achieved? What should I do?

Regards,
Pei
Reply | Threaded
Open this post in threaded view
| More
Print post
Permalink

Re: Split a stream into any number of streams

Wesley Peng-3


on 2019/9/17 9:55, 王佩 wrote:
> I want to split a stream into any number of streams according to a field,
> and then process the split stream one by one.

I think that should be easy done. refer to:
https://stackoverflow.com/questions/53588554/apache-flink-using-filter-or-split-to-split-a-stream

regards.
Reply | Threaded
Open this post in threaded view
| More
Print post
Permalink

Re: Split a stream into any number of streams

王佩-2
Thank You!
Here is a Example:
      // Execution Env
      StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();

      // Input Source
      DataStreamSource<Tuple3<String, String, String>> source =
env.fromElements(
              new Tuple3<>("productID1", "click", "user_1"),
              new Tuple3<>("productID1", "click", "user_2"),
              new Tuple3<>("productID1", "browse", "user_1"),
              new Tuple3<>("productID2", "browse", "user_1"),
              new Tuple3<>("productID2", "click", "user_2"),
              new Tuple3<>("productID2", "click", "user_1")
              ....
              new Tuple3<>("productID50", "click", "user_1")
              ....
              new Tuple3<>("productID90", "click", "user_1")
              ....
              new Tuple3<>("productID100", "click", "user_1")

      );

      // Split Stream
      SplitStream<Tuple3<String, String, String>> splitStream =
source.split(new OutputSelector<Tuple3<String, String, String>>() {
          @Override
          public Iterable<String> select(Tuple3<String, String, String>
value) {

              ArrayList<String> output = new ArrayList<>();
              output.add(value.f0);
              return output;

          }
      });

      // Select Stream
     * // Here: I want to select products 1 to 100 and then process each
one.*

*      //  How should I do it?*
splitStream.select("productID1").print();

      env.execute();

Regards.

Wesley Peng <[hidden email]> 于2019年9月17日周二 上午10:05写道:

>
>
> on 2019/9/17 9:55, 王佩 wrote:
> > I want to split a stream into any number of streams according to a field,
> > and then process the split stream one by one.
>
> I think that should be easy done. refer to:
>
> https://stackoverflow.com/questions/53588554/apache-flink-using-filter-or-split-to-split-a-stream
>
> regards.
>
Reply | Threaded
Open this post in threaded view
| More
Print post
Permalink

Re: Split a stream into any number of streams

Wesley Peng-3
Hi

on 2019/9/17 10:28, 王佩 wrote:
> *      //  How should I do it?*
> splitStream.select("productID1").print();

If I understand for that correctly, you want somewhat the dynamic number
of Sinks?

regards
Reply | Threaded
Open this post in threaded view
| More
Print post
Permalink

Re: Split a stream into any number of streams

cai yi
In reply to this post by Wesley Peng-3
可以使用Side Output, 将输入流根据不同需求发送到自定义的不同的OutputTag中,最后可以使用DataStream.getSideOutput(outputTag)取出你需要的流进行处理!

在 2019/9/17 上午10:05,“Wesley Peng”<[hidden email]> 写入:

   
   
    on 2019/9/17 9:55, 王佩 wrote:
    > I want to split a stream into any number of streams according to a field,
    > and then process the split stream one by one.
   
    I think that should be easy done. refer to:
    https://stackoverflow.com/questions/53588554/apache-flink-using-filter-or-split-to-split-a-stream
   
    regards.
   
Reply | Threaded
Open this post in threaded view
| More
Print post
Permalink

Re: Split a stream into any number of streams

王佩-2
是这样的。比方有1000个事件(通过某个字段区分,事件会继续增加),都在一个kafka topic中。

Flink
从Kafka读取数据后是一个DataStream,我想将每个事件分流出来,这样每个事件都是一个DataStream,后续,可以对每个事件做各种各样的处理,如DataStream异步IO、DataStream
Sink 到Parquet。

1、如果用split...select,由于select(事件名),这里的事件名必须是某个确定的。
2、如果用side output,要提前定义output tag,我有1000个事件(事件会继续增加),这样就需要定义1000+ output tag。

感谢!

cai yi <[hidden email]> 于2019年9月17日周二 下午1:33写道:

> 可以使用Side Output,
> 将输入流根据不同需求发送到自定义的不同的OutputTag中,最后可以使用DataStream.getSideOutput(outputTag)取出你需要的流进行处理!
>
> 在 2019/9/17 上午10:05,“Wesley Peng”<[hidden email]> 写入:
>
>
>
>     on 2019/9/17 9:55, 王佩 wrote:
>     > I want to split a stream into any number of streams according to a
> field,
>     > and then process the split stream one by one.
>
>     I think that should be easy done. refer to:
>
> https://stackoverflow.com/questions/53588554/apache-flink-using-filter-or-split-to-split-a-stream
>
>     regards.
>
>
Reply | Threaded
Open this post in threaded view
| More
Print post
Permalink

回复: Split a stream into any number of streams

venn
恐怕不行,sideoutput 和 split 都需要先知道要分多少个流

如sideoutput 需要先定义tag:
        val late = new OutputTag[LateDataEvent]("late")


-----邮件原件-----
发件人: user-zh-return-1164-wxchunjhyy=[hidden email] <user-zh-return-1164-wxchunjhyy=[hidden email]> 代表 王佩
发送时间: Tuesday, September 17, 2019 4:25 PM
收件人: user-zh <[hidden email]>
主题: Re: Split a stream into any number of streams

是这样的。比方有1000个事件(通过某个字段区分,事件会继续增加),都在一个kafka topic中。

Flink
从Kafka读取数据后是一个DataStream,我想将每个事件分流出来,这样每个事件都是一个DataStream,后续,可以对每个事件做各种各样的处理,如DataStream异步IO、DataStream
Sink 到Parquet。

1、如果用split...select,由于select(事件名),这里的事件名必须是某个确定的。
2、如果用side output,要提前定义output tag,我有1000个事件(事件会继续增加),这样就需要定义1000+ output tag。

感谢!

cai yi <[hidden email]> 于2019年9月17日周二 下午1:33写道:

> 可以使用Side Output,
> 将输入流根据不同需求发送到自定义的不同的OutputTag中,最后可以使用DataStream.getSideOutput(outputTag)取出你需要的流进行处理!
>
> 在 2019/9/17 上午10:05,“Wesley Peng”<[hidden email]> 写入:
>
>
>
>     on 2019/9/17 9:55, 王佩 wrote:
>     > I want to split a stream into any number of streams according to
> a field,
>     > and then process the split stream one by one.
>
>     I think that should be easy done. refer to:
>
> https://stackoverflow.com/questions/53588554/apache-flink-using-filter
> -or-split-to-split-a-stream
>
>     regards.
>
>
Reply | Threaded
Open this post in threaded view
| More
Print post
Permalink

回复: Split a stream into any number of streams

Jun Zhang-2
对DataStream进行keyBy操作是否能解决呢?




------------------&nbsp;原始邮件&nbsp;------------------
发件人:&nbsp;"venn"<[hidden email]&gt;;
发送时间:&nbsp;2019年9月17日(星期二) 下午4:51
收件人:&nbsp;"user-zh"<[hidden email]&gt;;

主题:&nbsp;回复: Split a stream into any number of streams



恐怕不行,sideoutput 和 split 都需要先知道要分多少个流

如sideoutput 需要先定义tag:
        val late = new OutputTag[LateDataEvent]("late")


-----邮件原件-----
发件人: user-zh-return-1164-wxchunjhyy=[hidden email] <user-zh-return-1164-wxchunjhyy=[hidden email]&gt; 代表 王佩
发送时间: Tuesday, September 17, 2019 4:25 PM
收件人: user-zh <[hidden email]&gt;
主题: Re: Split a stream into any number of streams

是这样的。比方有1000个事件(通过某个字段区分,事件会继续增加),都在一个kafka topic中。

Flink
从Kafka读取数据后是一个DataStream,我想将每个事件分流出来,这样每个事件都是一个DataStream,后续,可以对每个事件做各种各样的处理,如DataStream异步IO、DataStream
Sink 到Parquet。

1、如果用split...select,由于select(事件名),这里的事件名必须是某个确定的。
2、如果用side output,要提前定义output tag,我有1000个事件(事件会继续增加),这样就需要定义1000+ output tag。

感谢!

cai yi <[hidden email]&gt; 于2019年9月17日周二 下午1:33写道:

&gt; 可以使用Side Output,
&gt; 将输入流根据不同需求发送到自定义的不同的OutputTag中,最后可以使用DataStream.getSideOutput(outputTag)取出你需要的流进行处理!
&gt;
&gt; 在 2019/9/17 上午10:05,“Wesley Peng”<[hidden email]&gt; 写入:
&gt;
&gt;
&gt;
&gt;&nbsp;&nbsp;&nbsp;&nbsp; on 2019/9/17 9:55, 王佩 wrote:
&gt;&nbsp;&nbsp;&nbsp;&nbsp; &gt; I want to split a stream into any number of streams according to
&gt; a field,
&gt;&nbsp;&nbsp;&nbsp;&nbsp; &gt; and then process the split stream one by one.
&gt;
&gt;&nbsp;&nbsp;&nbsp;&nbsp; I think that should be easy done. refer to:
&gt;
&gt; https://stackoverflow.com/questions/53588554/apache-flink-using-filter
&gt; -or-split-to-split-a-stream
&gt;
&gt;&nbsp;&nbsp;&nbsp;&nbsp; regards.
&gt;
&gt;
Reply | Threaded
Open this post in threaded view
| More
Print post
Permalink

Re: 回复: Split a stream into any number of streams

cai yi
In reply to this post by venn
你这个场景貌似可以用Broadcast来广播自定义的事件规则然后join数据流, 之后可以在process中进行处理...

在 2019/9/17 下午4:52,“venn”<[hidden email]> 写入:

    恐怕不行,sideoutput 和 split 都需要先知道要分多少个流
   
    如sideoutput 需要先定义tag:
    val late = new OutputTag[LateDataEvent]("late")
   
   
    -----邮件原件-----
    发件人: user-zh-return-1164-wxchunjhyy=[hidden email] <user-zh-return-1164-wxchunjhyy=[hidden email]> 代表 王佩
    发送时间: Tuesday, September 17, 2019 4:25 PM
    收件人: user-zh <[hidden email]>
    主题: Re: Split a stream into any number of streams
   
    是这样的。比方有1000个事件(通过某个字段区分,事件会继续增加),都在一个kafka topic中。
   
    Flink
    从Kafka读取数据后是一个DataStream,我想将每个事件分流出来,这样每个事件都是一个DataStream,后续,可以对每个事件做各种各样的处理,如DataStream异步IO、DataStream
    Sink 到Parquet。
   
    1、如果用split...select,由于select(事件名),这里的事件名必须是某个确定的。
    2、如果用side output,要提前定义output tag,我有1000个事件(事件会继续增加),这样就需要定义1000+ output tag。
   
    感谢!
   
    cai yi <[hidden email]> 于2019年9月17日周二 下午1:33写道:
   
    > 可以使用Side Output,
    > 将输入流根据不同需求发送到自定义的不同的OutputTag中,最后可以使用DataStream.getSideOutput(outputTag)取出你需要的流进行处理!
    >
    > 在 2019/9/17 上午10:05,“Wesley Peng”<[hidden email]> 写入:
    >
    >
    >
    >     on 2019/9/17 9:55, 王佩 wrote:
    >     > I want to split a stream into any number of streams according to
    > a field,
    >     > and then process the split stream one by one.
    >
    >     I think that should be easy done. refer to:
    >
    > https://stackoverflow.com/questions/53588554/apache-flink-using-filter
    > -or-split-to-split-a-stream
    >
    >     regards.
    >
    >