hi
I have a scenario where I key by the same set of key fields several times in a row:

    DataStream resStream = demoStream.keyBy(groupKeys)
            .flatMap(new MyFlatmapFunction())
            .keyBy(groupKeys)
            .process(new MyProcessFunction())
            .keyBy(groupKeys)
            .timeWindow(Time.seconds(1))
            .aggregate(new MyAggFunction())
            .keyBy(groupKeys)
            .timeWindow(Time.seconds(1))
            .process(new MyKeyProcessFunction());

I want to rewrite this pipeline using DataStreamUtils.reinterpretAsKeyedStream. This is what I tried:

    int[] groupKeys = new int[]{0, 2, 3};

    DataStream proStream = DataStreamUtils.reinterpretAsKeyedStream(demoStream, new MyKeySelector2(groupKeys)) // MyKeySelector2 is my own KeySelector implementation
            .flatMap(new MyFlatmapFunction());

But after the flatMap the stream is no longer a KeyedStream. What am I doing wrong, and how should DataStreamUtils.reinterpretAsKeyedStream be used?
Hi
A flatMap never returns a KeyedStream in the first place; only keyBy returns a KeyedStream.

Best,
Yichao Yang

(In reply to 绘梦飘雪, Wednesday, 10 June 2020, 7:18 PM; subject: "Using DataStreamUtils.reinterpretAsKeyedStream")
Hi,
You can call DataStreamUtils.reinterpretAsKeyedStream(proStream, new MyKeySelector2(groupKeys)) again to reinterpret the result as a KeyedStream. If your flatMap has the same parallelism as its upstream operator, the two are chained together at runtime, so the key distribution does not change (provided the flatMap leaves the key fields untouched).

Best,
Jark

(In reply to Yichao Yang, Wed, 10 Jun 2020 at 21:15.)
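To illustrate why reinterpreting after a chained flatMap is only safe when the key fields survive, here is a minimal plain-Java sketch (no Flink dependency) that models keyed routing. All names in it (KeyRoutingSketch, splitKeepingKey, rewriteKeyField, sampleInput) are hypothetical, and the key-group hash is a simplified stand-in: Flink applies a murmur hash to key.hashCode() before taking the modulo, whereas this sketch uses Math.floorMod directly. The subtask mapping follows the keyGroup * parallelism / maxParallelism shape used by Flink's KeyGroupRangeAssignment. Note that Flink does not verify the partitioning for you; the DataStreamUtils Javadoc warns that the stream must already be partitioned exactly as keyBy would partition it, so if demoStream comes straight from a source, the first keyBy most likely still has to stay.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

// Hypothetical plain-Java model of keyed routing; NOT Flink code.
class KeyRoutingSketch {

    // Assumed values: 128 key groups (max parallelism) spread over 4 parallel subtasks.
    static final int MAX_PARALLELISM = 128;
    static final int PARALLELISM = 4;

    // Simplified key-group assignment; Flink additionally murmur-hashes key.hashCode().
    static int keyGroup(Object key) {
        return Math.floorMod(key.hashCode(), MAX_PARALLELISM);
    }

    // Same shape as Flink's keyGroup * parallelism / maxParallelism operator-index mapping.
    static int subtaskFor(Object key) {
        return keyGroup(key) * PARALLELISM / MAX_PARALLELISM;
    }

    // A record is a String[] row; its key is fields 0, 2 and 3, like groupKeys = {0, 2, 3}.
    static List<String> key(String[] row) {
        return Arrays.asList(row[0], row[2], row[3]);
    }

    // Counts flatMap outputs that would belong on a different subtask than the one that
    // produced them, assuming the flatMap is chained (runs where its input record was routed).
    static int misplacedAfterFlatMap(List<String[]> input,
                                     Function<String[], List<String[]>> flatMap) {
        int misplaced = 0;
        for (String[] row : input) {
            int currentSubtask = subtaskFor(key(row)); // where keyBy sent the input record
            for (String[] out : flatMap.apply(row)) {
                if (subtaskFor(key(out)) != currentSubtask) {
                    misplaced++;
                }
            }
        }
        return misplaced;
    }

    // Example flatMap that emits two records per input and only touches a non-key field.
    static List<String[]> splitKeepingKey(String[] row) {
        String[] a = row.clone();
        String[] b = row.clone();
        a[1] = row[1] + "-a";
        b[1] = row[1] + "-b";
        return Arrays.asList(a, b);
    }

    // Example flatMap that rewrites key field 0, so the key (and possibly the subtask) changes.
    static List<String[]> rewriteKeyField(String[] row) {
        String[] out = row.clone();
        out[0] = row[0] + "!";
        return Collections.singletonList(out);
    }

    // Sample rows {"k<i>", "v<i>", "c", "d"} for i = 0..99.
    static List<String[]> sampleInput() {
        List<String[]> input = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            input.add(new String[] {"k" + i, "v" + i, "c", "d"});
        }
        return input;
    }

    public static void main(String[] args) {
        List<String[]> input = sampleInput();
        System.out.println("key-preserving flatMap, misplaced records: "
                + misplacedAfterFlatMap(input, KeyRoutingSketch::splitKeepingKey));
        System.out.println("key-rewriting flatMap, misplaced records: "
                + misplacedAfterFlatMap(input, KeyRoutingSketch::rewriteKeyField));
    }
}
```

A count of 0 for the key-preserving flatMap means every output record already sits on the subtask that keyBy would choose, which is the situation Jark describes. The key-rewriting variant moves at least some records (for example, the key ["k0", "c", "d"] maps to subtask 1 while ["k0!", "c", "d"] maps to subtask 2 under these assumptions), so reinterpreting its output would break the precondition of reinterpretAsKeyedStream.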
Hi
DataStreamUtils.reinterpretAsKeyedStream does return a KeyedStream, but calling flatMap on a KeyedStream turns it back into a plain DataStream. As Jark said, you can call reinterpretAsKeyedStream again to get a KeyedStream. Also note that in versions before 1.8, using this feature may lose data; see this issue for details [1].

[1] https://issues.apache.org/jira/browse/FLINK-12296

Best,
Congxian

(In reply to Jark Wu, Wed, 10 Jun 2020 at 22:29.)