We run a Flink 1.10 cluster with HDFS as the state backend.
From the earliest Flink releases all the way to Flink 1.12, some documentation pages and samples are incomplete — or, put differently, the same code produces different results depending on the input source. Take the sample at
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/process_function.html
and feed it from four different input sources:

1. reading data once from an in-memory List
2. reading data once from a file directory
3. reading data continuously from a file directory
4. reading data continuously from a socket stream

Of these four, only 3 and 4 ever get the onTimer callback invoked when ValueState<T> is used inside a KeyedStream's process(...); with 1 and 2 it is never called. I suspect other operators have similar problems.

The code:
```java
package com.xxx.data.stream;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.IngestionTimeExtractor;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.util.Collector;

import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.List;

public class KeyedStreamJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        env.setParallelism(4);

        // 1. Read data once from an in-memory List.
        List<Tuple2<String, Integer>> items = new ArrayList<>();
        items.add(new Tuple2<>("k1", 1));
        items.add(new Tuple2<>("k3", 3));
        items.add(new Tuple2<>("k1", 10));
        items.add(new Tuple2<>("k2", 2));
        items.add(new Tuple2<>("k1", 100));
        items.add(new Tuple2<>("k2", 20));
        DataStreamSource<Tuple2<String, Integer>> streamSource = env.fromCollection(items);
        SingleOutputStreamOperator<Tuple2<String, Integer>> listStream = streamSource
                .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple2<String, Integer>>() {
                    @Nullable
                    @Override
                    public Watermark getCurrentWatermark() {
                        return null;
                    }

                    @Override
                    public long extractTimestamp(Tuple2<String, Integer> element, long previousElementTimestamp) {
                        System.out.println("---");
                        return System.currentTimeMillis();
                    }
                });

        // 2. Read data once from a directory.
        SingleOutputStreamOperator<Tuple2<String, Integer>> fileStream = env
                .readTextFile("D:\\data", "UTF-8")
                .map(new MapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(String value) throws Exception {
                        return new Tuple2<>(value, 1);
                    }
                })
                .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple2<String, Integer>>() {
                    @Nullable
                    @Override
                    public Watermark getCurrentWatermark() {
                        return null;
                    }

                    @Override
                    public long extractTimestamp(Tuple2<String, Integer> element, long previousElementTimestamp) {
                        return System.currentTimeMillis();
                    }
                });

        // 3. Read data continuously from a directory.
        TypeInformation<String> typeInformation = BasicTypeInfo.STRING_TYPE_INFO;
        TextInputFormat format = new TextInputFormat(new Path("D:\\data"));
        format.setCharsetName("UTF-8");
        // Enumerate nested directories recursively.
        format.setNestedFileEnumeration(true);
        SingleOutputStreamOperator<Tuple2<String, Integer>> continuefileStream = env
                .readFile(format, "D:\\data", FileProcessingMode.PROCESS_CONTINUOUSLY, 6000L, typeInformation)
                .map(new MapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(String value) throws Exception {
                        return new Tuple2<>(value, 1);
                    }
                })
                .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple2<String, Integer>>() {
                    @Nullable
                    @Override
                    public Watermark getCurrentWatermark() {
                        return null;
                    }

                    @Override
                    public long extractTimestamp(Tuple2<String, Integer> element, long previousElementTimestamp) {
                        return System.currentTimeMillis();
                    }
                });

        // 4. Read data continuously from a socket.
        SingleOutputStreamOperator<Tuple2<String, Integer>> socketStream = env
                .socketTextStream("localhost", 9999)
                .map(new MapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(String value) throws Exception {
                        return new Tuple2<>(value, 1);
                    }
                })
                .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple2<String, Integer>>() {
                    @Nullable
                    @Override
                    public Watermark getCurrentWatermark() {
                        return null;
                    }

                    @Override
                    public long extractTimestamp(Tuple2<String, Integer> element, long previousElementTimestamp) {
                        return System.currentTimeMillis();
                    }
                });

        // Testing the ValueState timer against sources 1-4 shows that only the
        // continuous sources (3. continuefileStream, 4. socketStream) trigger
        // onTimer; the one-shot sources (1. listStream, 2. fileStream) never do.
        SingleOutputStreamOperator<Tuple2<String, Long>> sum = continuefileStream
                // Swap in listStream / fileStream / socketStream to test the other sources.
                .keyBy(0)
                .process(new KeyedProcessFunction<Tuple, Tuple2<String, Integer>, Tuple2<String, Long>>() {
                    private ValueState<SumWithTimeStamp> sum;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                        StateTtlConfig stateTtlConfig = StateTtlConfig.newBuilder(Time.seconds(1L))
                                .returnExpiredIfNotCleanedUp()
                                .updateTtlOnReadAndWrite()
                                .useProcessingTime()
                                .build();
                        ValueStateDescriptor<SumWithTimeStamp> valueStateDescriptor =
                                new ValueStateDescriptor<>("sum", SumWithTimeStamp.class);
                        // valueStateDescriptor.enableTimeToLive(stateTtlConfig);

                        sum = getRuntimeContext().getState(valueStateDescriptor);
                    }

                    @Override
                    public void processElement(Tuple2<String, Integer> item, Context ctx,
                                               Collector<Tuple2<String, Long>> out) throws Exception {
                        SumWithTimeStamp sumValue = sum.value();
                        if (sumValue == null) {
                            sumValue = new SumWithTimeStamp();
                            sumValue.key = item.f0;
                            sumValue.sum += item.f1.longValue();
                            sumValue.lastModified = ctx.timestamp();
                            sum.update(sumValue);
                            // Fire a processing-time timer 3 s after the first element of the key.
                            ctx.timerService().registerProcessingTimeTimer(sumValue.lastModified + 3 * 1000);
                            System.out.println("ini " + ctx.getCurrentKey().toString()
                                    + " item:" + item.toString() + " sum:" + sum.value().sum);
                        } else {
                            sumValue.sum += item.f1.longValue();
                            sumValue.lastModified = ctx.timestamp();
                            sum.update(sumValue);
                            // ctx.timerService().registerProcessingTimeTimer(sumValue.lastModified + 5 * 1000);
                            System.out.println("up " + ctx.getCurrentKey().toString()
                                    + " item:" + item.toString() + " sum:" + sum.value().sum);
                        }
                    }

                    @Override
                    public void onTimer(long timestamp, OnTimerContext ctx,
                                        Collector<Tuple2<String, Long>> out) throws Exception {
                        System.out.println("-------" + ctx.getCurrentKey().toString());
                        if (timestamp <= sum.value().lastModified + 5000) {
                            out.collect(new Tuple2<>(sum.value().key, sum.value().sum));
                            // sum.clear();
                        }
                    }
                });

        sum.print();

        // continueSum(streamSource);
        env.execute("keyedSteamJob");
        // System.in.read();
    }

    public static void continueSum(DataStreamSource<Tuple2<String, Integer>> streamSource) {
        streamSource
                // .assignTimestampsAndWatermarks(new IngestionTimeExtractor())
                .keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Integer> value) throws Exception {
                        return value.f0;
                    }
                })
                // .window(TumblingEventTimeWindows.of(Time.milliseconds(10L)))
                .sum(1)
                .print("+++++++++++++++++++++++++++");
    }

    public static class SumWithTimeStamp {
        public String key;
        public long sum;
        public long lastModified;
    }
}
```
Hi, xuefli.
Many thanks for pointing out the problems in the documentation!

Since code is hard to read inside an email (no syntax highlighting, and the formatting gets mangled), I only skimmed it. You report that when the input source reads data once from an in-memory List, onTimer is never triggered. In the actual example I see that you use processing time and the timer fires after a 3 s delay. My suspicion is that the data volume is so small that the job finishes before the timer has a chance to fire; I would suggest switching to event time to test this case.

Best,
Shengkai
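To make that suggestion concrete, below is a minimal sketch of an event-time variant (the package and class names are invented for illustration; it keeps the Flink 1.10-era `AssignerWithPeriodicWatermarks` API used in the original code). It relies on the fact that when a bounded source such as `fromCollection` reaches the end of its input under event time, Flink emits a final watermark of `Long.MAX_VALUE`, which fires all pending event-time timers before the job shuts down — so `onTimer` should be reached even for the one-shot sources 1 and 2:

```java
package com.example.demo; // hypothetical package

import java.util.Arrays;
import javax.annotation.Nullable;

import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.util.Collector;

public class EventTimeTimerDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Event time instead of processing time.
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);

        env.fromCollection(Arrays.asList(
                        new Tuple2<>("k1", 1), new Tuple2<>("k1", 10), new Tuple2<>("k2", 2)))
                .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple2<String, Integer>>() {
                    private long current = 0L;

                    @Nullable
                    @Override
                    public Watermark getCurrentWatermark() {
                        // Watermark trails the newest element timestamp.
                        return new Watermark(current - 1);
                    }

                    @Override
                    public long extractTimestamp(Tuple2<String, Integer> element, long previousElementTimestamp) {
                        current = System.currentTimeMillis();
                        return current;
                    }
                })
                .keyBy(0)
                .process(new KeyedProcessFunction<Tuple, Tuple2<String, Integer>, String>() {
                    @Override
                    public void processElement(Tuple2<String, Integer> item, Context ctx,
                                               Collector<String> out) throws Exception {
                        // Register an EVENT-time timer. Even though the bounded
                        // source is drained almost immediately, the final
                        // Long.MAX_VALUE watermark emitted at end-of-input still
                        // fires this timer before shutdown.
                        ctx.timerService().registerEventTimeTimer(ctx.timestamp() + 3_000);
                    }

                    @Override
                    public void onTimer(long timestamp, OnTimerContext ctx,
                                        Collector<String> out) throws Exception {
                        out.collect("timer fired for key " + ctx.getCurrentKey());
                    }
                })
                .print();

        env.execute("event-time-timer-demo");
    }
}
```

With a processing-time timer there is no comparable end-of-input signal: the job can terminate before the wall clock reaches the registered time, which would explain the behavior observed for sources 1 and 2.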