官网文档和样例的不完整性和不严谨性的问题

classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

官网文档和样例的不完整性和不严谨性的问题

xuefli@outlook.com
Flink1.10的集群,用hdfs做backend

无论从flink最早的版本到flink 1.12都存在的一些文档和样例的不完整,或者说相同的代码,因输入源不同导致的结果差异。

比如说下面链接中的样例
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/process_function.html

如果输入源分别为

1.     一次性从内存中的List读取数据

2.     一次性从文件目录读取读取数据

3.     持续从文件目录读取数据

4.     从socket流持续读取文件

上面的4者,只有3和4,对于KeyedStream的process(…)中使用ValueState<T>在处理onTimer函数时才会被触发调用,对于1和2是不会的。

相信其他的算子也存在类似的问题

具体代码如下:
```java

package com.xxx.data.stream;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.IngestionTimeExtractor;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.util.Collector;

import javax.annotation.Nullable;
import java.text.SimpleDateFormat;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

public class KeyedStreamJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        env.setParallelism(4);

        //1.从内存获取数据
        Tuple2<String, Integer> item = null;
        List<Tuple2<String, Integer>> items = new ArrayList<>();
        item = new Tuple2<>("k1", 1);
        items.add(item);
        item = new Tuple2<>("k3", 3);
        items.add(item);
        item = new Tuple2<>("k1", 10);
        items.add(item);
        item = new Tuple2<>("k2", 2);
        items.add(item);
        item = new Tuple2<>("k1", 100);
        items.add(item);
        item = new Tuple2<>("k2", 20);
        items.add(item);
        DataStreamSource<Tuple2<String, Integer>> streamSource = env.fromCollection(items);
        SingleOutputStreamOperator<Tuple2<String, Integer>> listStream = streamSource.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple2<String, Integer>>() {
            @Nullable
            @Override
            public Watermark getCurrentWatermark() {
                return null;
            }

            @Override
            public long extractTimestamp(Tuple2<String, Integer> element, long previousElementTimestamp) {
                System.out.println("---");
                return System.currentTimeMillis();
            }
        });

        //2.从文件夹一次性获取数据
        SingleOutputStreamOperator<Tuple2<String, Integer>> fileStream = env.readTextFile("D:\\data", "UTF-8").map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                return new Tuple2<>(value, 1);
            }
        })
                .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple2<String, Integer>>() {
                    @Nullable
                    @Override
                    public Watermark getCurrentWatermark() {
                        return null;
                    }

                    @Override
                    public long extractTimestamp(Tuple2<String, Integer> element, long previousElementTimestamp) {
                        return System.currentTimeMillis();
                    }
                });

        //3.从文件夹持续获取数据
        TypeInformation<String> typeInformation = BasicTypeInfo.STRING_TYPE_INFO;
        TextInputFormat format = new TextInputFormat(new Path("D:\\data"));
        format.setCharsetName("UTF-8");
        //是否支持递归
        format.setNestedFileEnumeration(true);
        SingleOutputStreamOperator<Tuple2<String, Integer>> continuefileStream = env.readFile(format, "D:\\data", FileProcessingMode.PROCESS_CONTINUOUSLY, 6000L, typeInformation).map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                return new Tuple2<>(value, 1);
            }
        })
                .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple2<String, Integer>>() {
                    @Nullable
                    @Override
                    public Watermark getCurrentWatermark() {
                        return null;
                    }

                    @Override
                    public long extractTimestamp(Tuple2<String, Integer> element, long previousElementTimestamp) {
                        return System.currentTimeMillis();
                    }
                });

        //4.从socket中持续获取数据
        SingleOutputStreamOperator<Tuple2<String, Integer>> socketStream = env.socketTextStream("localhost", 9999).map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                return new Tuple2<>(value, 1);
            }
        })
                .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple2<String, Integer>>() {
            @Nullable
            @Override
            public Watermark getCurrentWatermark() {
                return null;
            }

            @Override
            public long extractTimestamp(Tuple2<String, Integer> element, long previousElementTimestamp) {
                return System.currentTimeMillis();
            }
        });

        //分别从1. 2. 3. 4. 测试数据的ValueState的超时触发,发现
        //只有3.continuefileStream 4.socketStream 这些持续获取数据的可以触发onTimer
        //至于1.listStream         2.fileStream   这些一次性获取书的不会触发onTimer
        SingleOutputStreamOperator<Tuple2<String, Long>> sum =  continuefileStream  // listStream fileStream socketStream
                .keyBy(0)
                .process(new KeyedProcessFunction<Tuple, Tuple2<String, Integer>, Tuple2<String, Long>>() {
                    private ValueState<SumWithTimeStamp> sum;
                    private final SimpleDateFormat yyyyMMddHHmmss = new SimpleDateFormat("yyyy-MM-dd:HH-mm-ss.SSS");

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                        StateTtlConfig stateTtlConfig = StateTtlConfig.newBuilder(Time.seconds(1L)).returnExpiredIfNotCleanedUp().updateTtlOnReadAndWrite().useProcessingTime().build();
                        ValueStateDescriptor<SumWithTimeStamp> valueStateDescriptor = new ValueStateDescriptor<SumWithTimeStamp>("sum", SumWithTimeStamp.class);
//                        valueStateDescriptor.enableTimeToLive(stateTtlConfig);

                        sum = getRuntimeContext().getState(valueStateDescriptor);
                    }

                    @Override
                    public void processElement(Tuple2<String, Integer> item, Context ctx, Collector<Tuple2<String, Long>> out) throws Exception {
                        SumWithTimeStamp sumValue = sum.value();
                        if (sumValue == null) {
                            sumValue = new SumWithTimeStamp();
                            sumValue.key = item.f0;
//                            Thread.sleep(1500L);
//                            Date cur = new Date();
//                            cur.setTime(ctx.timestamp());
//                            System.out.println("ini " + ctx.getCurrentKey().toString()  + yyyyMMddHHmmss.format(cur));
                           sumValue.sum += item.f1.longValue();
                            sumValue.lastModified = ctx.timestamp();
                            sum.update(sumValue);
                            ctx.timerService().registerProcessingTimeTimer(sumValue.lastModified + 3*1000);
                            System.out.println("ini " + ctx.getCurrentKey().toString() + " item:" + item.toString() + " sum:" + sum.value().sum);
                        } else {
                            sumValue.sum += item.f1.longValue();
                            sumValue.lastModified = ctx.timestamp();
                            sum.update(sumValue);
//                            ctx.timerService().registerProcessingTimeTimer(sumValue.lastModified + 5*1000);
                            System.out.println("up " + ctx.getCurrentKey().toString() + " item:" + item.toString() + " sum:" + sum.value().sum);
                        }
//                            Date cur = new Date();
//                            cur.setTime(ctx.timestamp());
//                            System.out.println("up " + ctx.getCurrentKey().toString()  + yyyyMMddHHmmss.format(cur));
//                            Thread.sleep(1500L);

                    }

                    @Override
                   public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Long>> out) throws Exception {
//                        super.onTimer(timestamp, ctx, out);
                        System.out.println("-------" + ctx.getCurrentKey().toString());
                        if (timestamp <= sum.value().lastModified + 5000) {
                            out.collect(new Tuple2<String, Long>(sum.value().key, sum.value().sum));
//                            sum.clear();
                        }
                    }
                });

        sum.print();

        //continueSum(streamSource);
        env.execute("keyedSteamJob");
//        System.in.read();
    }

    public static void continueSum(DataStreamSource<Tuple2<String, Integer>> streamSource) {
        streamSource
                //by 1
                //.assignTimestampsAndWatermarks(new IngestionTimeExtractor())
                .keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Integer> value) throws Exception {
                        return value.f0;
                    }
                })
//                .window(TumblingEventTimeWindows.of(Time.milliseconds(10L)))
                .sum(1)
                .print("+++++++++++++++++++++++++++");

    }

    public static class SumWithTimeStamp {
        public String key;
        public long sum;
        public long lastModified;
    }
}


```
发送自 Windows 10 版邮件<https://go.microsoft.com/fwlink/?LinkId=550986>应用

Reply | Threaded
Open this post in threaded view
|

Re: 官网文档和样例的不完整性和不严谨性的问题

Shengkai Fang
Hi, xuefli.

非常感谢你指出文档的问题!

由于邮件中看代码比较吃力(没有语法高亮以及排版的问题),我只是粗略地看了下代码。

 当输入源 为 `一次性从内存中的List读取数据`,无法触发onTimer。 实际的例子中,我看到看到采用的是process time,且延时 3s
触发 。我怀疑是不是,数据量太少,所以程序很快就结束了导致没来得及触发timer,建议改成event time试试这种情况。

Best,
Shengkai

[hidden email] <[hidden email]> 于2021年4月25日周日 上午9:42写道:

> Flink1.10的集群,用hdfs做backend
>
> 无论从flink最早的版本到flink 1.12都存在的一些文档和样例的不完整,或者说相同的代码,因输入源不同导致的结果差异。
>
> 比如说下面链接中的样例
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/process_function.html
>
> 如果输入源分别为
>
> 1.     一次性从内存中的List读取数据
>
> 2.     一次性从文件目录读取读取数据
>
> 3.     持续从文件目录读取数据
>
> 4.     从socket流持续读取文件
>
>
> 上面的4者,只有3和4,对于KeyedStream的process(…)中使用ValueState<T>在处理onTimer函数时才会被触发调用,对于1和2是不会的。
>
> 相信其他的算子也存在类似的问题
>
> 具体代码如下:
> ```java
>
> package com.xxx.data.stream;
>
> import org.apache.flink.api.common.functions.MapFunction;
> import org.apache.flink.api.common.state.StateTtlConfig;
> import org.apache.flink.api.common.state.ValueState;
> import org.apache.flink.api.common.state.ValueStateDescriptor;
> import org.apache.flink.api.common.time.Time;
> import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.api.java.functions.KeySelector;
> import org.apache.flink.api.java.io.TextInputFormat;
> import org.apache.flink.api.java.tuple.Tuple;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.core.fs.Path;
> import org.apache.flink.streaming.api.TimeCharacteristic;
> import org.apache.flink.streaming.api.datastream.DataStreamSource;
> import
> org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
> import
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import
> org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
> import
> org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
> import org.apache.flink.streaming.api.functions.IngestionTimeExtractor;
> import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
> import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
> import org.apache.flink.streaming.api.watermark.Watermark;
> import
> org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
> import
> org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
> import org.apache.flink.util.Collector;
>
> import javax.annotation.Nullable;
> import java.text.SimpleDateFormat;
> import java.time.LocalDateTime;
> import java.time.format.DateTimeFormatter;
> import java.util.ArrayList;
> import java.util.Date;
> import java.util.List;
>
> public class KeyedStreamJob {
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
>         env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
>         env.setParallelism(4);
>
>         //1.从内存获取数据
>         Tuple2<String, Integer> item = null;
>         List<Tuple2<String, Integer>> items = new ArrayList<>();
>         item = new Tuple2<>("k1", 1);
>         items.add(item);
>         item = new Tuple2<>("k3", 3);
>         items.add(item);
>         item = new Tuple2<>("k1", 10);
>         items.add(item);
>         item = new Tuple2<>("k2", 2);
>         items.add(item);
>         item = new Tuple2<>("k1", 100);
>         items.add(item);
>         item = new Tuple2<>("k2", 20);
>         items.add(item);
>         DataStreamSource<Tuple2<String, Integer>> streamSource =
> env.fromCollection(items);
>         SingleOutputStreamOperator<Tuple2<String, Integer>> listStream =
> streamSource.assignTimestampsAndWatermarks(new
> AssignerWithPeriodicWatermarks<Tuple2<String, Integer>>() {
>             @Nullable
>             @Override
>             public Watermark getCurrentWatermark() {
>                 return null;
>             }
>
>             @Override
>             public long extractTimestamp(Tuple2<String, Integer> element,
> long previousElementTimestamp) {
>                 System.out.println("---");
>                 return System.currentTimeMillis();
>             }
>         });
>
>         //2.从文件夹一次性获取数据
>         SingleOutputStreamOperator<Tuple2<String, Integer>> fileStream =
> env.readTextFile("D:\\data", "UTF-8").map(new MapFunction<String,
> Tuple2<String, Integer>>() {
>             @Override
>             public Tuple2<String, Integer> map(String value) throws
> Exception {
>                 return new Tuple2<>(value, 1);
>             }
>         })
>                 .assignTimestampsAndWatermarks(new
> AssignerWithPeriodicWatermarks<Tuple2<String, Integer>>() {
>                     @Nullable
>                     @Override
>                     public Watermark getCurrentWatermark() {
>                         return null;
>                     }
>
>                     @Override
>                     public long extractTimestamp(Tuple2<String, Integer>
> element, long previousElementTimestamp) {
>                         return System.currentTimeMillis();
>                     }
>                 });
>
>         //3.从文件夹持续获取数据
>         TypeInformation<String> typeInformation =
> BasicTypeInfo.STRING_TYPE_INFO;
>         TextInputFormat format = new TextInputFormat(new Path("D:\\data"));
>         format.setCharsetName("UTF-8");
>         //是否支持递归
>         format.setNestedFileEnumeration(true);
>         SingleOutputStreamOperator<Tuple2<String, Integer>>
> continuefileStream = env.readFile(format, "D:\\data",
> FileProcessingMode.PROCESS_CONTINUOUSLY, 6000L, typeInformation).map(new
> MapFunction<String, Tuple2<String, Integer>>() {
>             @Override
>             public Tuple2<String, Integer> map(String value) throws
> Exception {
>                 return new Tuple2<>(value, 1);
>             }
>         })
>                 .assignTimestampsAndWatermarks(new
> AssignerWithPeriodicWatermarks<Tuple2<String, Integer>>() {
>                     @Nullable
>                     @Override
>                     public Watermark getCurrentWatermark() {
>                         return null;
>                     }
>
>                     @Override
>                     public long extractTimestamp(Tuple2<String, Integer>
> element, long previousElementTimestamp) {
>                         return System.currentTimeMillis();
>                     }
>                 });
>
>         //4.从socket中持续获取数据
>         SingleOutputStreamOperator<Tuple2<String, Integer>> socketStream =
> env.socketTextStream("localhost", 9999).map(new MapFunction<String,
> Tuple2<String, Integer>>() {
>             @Override
>             public Tuple2<String, Integer> map(String value) throws
> Exception {
>                 return new Tuple2<>(value, 1);
>             }
>         })
>                 .assignTimestampsAndWatermarks(new
> AssignerWithPeriodicWatermarks<Tuple2<String, Integer>>() {
>             @Nullable
>             @Override
>             public Watermark getCurrentWatermark() {
>                 return null;
>             }
>
>             @Override
>             public long extractTimestamp(Tuple2<String, Integer> element,
> long previousElementTimestamp) {
>                 return System.currentTimeMillis();
>             }
>         });
>
>         //分别从1. 2. 3. 4. 测试数据的ValueState的超时触发,发现
>         //只有3.continuefileStream 4.socketStream 这些持续获取数据的可以触发onTimer
>         //至于1.listStream         2.fileStream   这些一次性获取书的不会触发onTimer
>         SingleOutputStreamOperator<Tuple2<String, Long>> sum =
> continuefileStream  // listStream fileStream socketStream
>                 .keyBy(0)
>                 .process(new KeyedProcessFunction<Tuple, Tuple2<String,
> Integer>, Tuple2<String, Long>>() {
>                     private ValueState<SumWithTimeStamp> sum;
>                     private final SimpleDateFormat yyyyMMddHHmmss = new
> SimpleDateFormat("yyyy-MM-dd:HH-mm-ss.SSS");
>
>                     @Override
>                     public void open(Configuration parameters) throws
> Exception {
>                         super.open(parameters);
>                         StateTtlConfig stateTtlConfig =
> StateTtlConfig.newBuilder(Time.seconds(1L)).returnExpiredIfNotCleanedUp().updateTtlOnReadAndWrite().useProcessingTime().build();
>                         ValueStateDescriptor<SumWithTimeStamp>
> valueStateDescriptor = new ValueStateDescriptor<SumWithTimeStamp>("sum",
> SumWithTimeStamp.class);
> //
> valueStateDescriptor.enableTimeToLive(stateTtlConfig);
>
>                         sum =
> getRuntimeContext().getState(valueStateDescriptor);
>                     }
>
>                     @Override
>                     public void processElement(Tuple2<String, Integer>
> item, Context ctx, Collector<Tuple2<String, Long>> out) throws Exception {
>                         SumWithTimeStamp sumValue = sum.value();
>                         if (sumValue == null) {
>                             sumValue = new SumWithTimeStamp();
>                             sumValue.key = item.f0;
> //                            Thread.sleep(1500L);
> //                            Date cur = new Date();
> //                            cur.setTime(ctx.timestamp());
> //                            System.out.println("ini " +
> ctx.getCurrentKey().toString()  + yyyyMMddHHmmss.format(cur));
>                            sumValue.sum += item.f1.longValue();
>                             sumValue.lastModified = ctx.timestamp();
>                             sum.update(sumValue);
>
> ctx.timerService().registerProcessingTimeTimer(sumValue.lastModified +
> 3*1000);
>                             System.out.println("ini " +
> ctx.getCurrentKey().toString() + " item:" + item.toString() + " sum:" +
> sum.value().sum);
>                         } else {
>                             sumValue.sum += item.f1.longValue();
>                             sumValue.lastModified = ctx.timestamp();
>                             sum.update(sumValue);
> //
> ctx.timerService().registerProcessingTimeTimer(sumValue.lastModified +
> 5*1000);
>                             System.out.println("up " +
> ctx.getCurrentKey().toString() + " item:" + item.toString() + " sum:" +
> sum.value().sum);
>                         }
> //                            Date cur = new Date();
> //                            cur.setTime(ctx.timestamp());
> //                            System.out.println("up " +
> ctx.getCurrentKey().toString()  + yyyyMMddHHmmss.format(cur));
> //                            Thread.sleep(1500L);
>
>                     }
>
>                     @Override
>                    public void onTimer(long timestamp, OnTimerContext ctx,
> Collector<Tuple2<String, Long>> out) throws Exception {
> //                        super.onTimer(timestamp, ctx, out);
>                         System.out.println("-------" +
> ctx.getCurrentKey().toString());
>                         if (timestamp <= sum.value().lastModified + 5000) {
>                             out.collect(new Tuple2<String,
> Long>(sum.value().key, sum.value().sum));
> //                            sum.clear();
>                         }
>                     }
>                 });
>
>         sum.print();
>
>         //continueSum(streamSource);
>         env.execute("keyedSteamJob");
> //        System.in.read();
>     }
>
>     public static void continueSum(DataStreamSource<Tuple2<String,
> Integer>> streamSource) {
>         streamSource
>                 //by 1
>                 //.assignTimestampsAndWatermarks(new
> IngestionTimeExtractor())
>                 .keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
>                     @Override
>                     public String getKey(Tuple2<String, Integer> value)
> throws Exception {
>                         return value.f0;
>                     }
>                 })
> //
> .window(TumblingEventTimeWindows.of(Time.milliseconds(10L)))
>                 .sum(1)
>                 .print("+++++++++++++++++++++++++++");
>
>     }
>
>     public static class SumWithTimeStamp {
>         public String key;
>         public long sum;
>         public long lastModified;
>     }
> }
>
>
> ```
> 发送自 Windows 10 版邮件<https://go.microsoft.com/fwlink/?LinkId=550986>应用
>
>