http://apache-flink.370.s1.nabble.com/org-apache-flink-streaming-api-operators-TimerHeapInternalTimer-tp745p746.html
> 在 2019年9月26日,上午9:25,claylin <
[hidden email]> 写道:
>
> 写了个去重的任务,代码如下:
>
> StreamQueryConfig queryConfig = tabEnv.queryConfig();
> queryConfig.withIdleStateRetentionTime(Time.seconds(20), Time.minutes(6));
>
>
> DataStream<Student> source = env.socketTextStream("localhost", 10028)
> .map(new MapFunction<String, Student>() {
> @Override
> public Student map(String value) throws Exception {
> String[] vals = value.split(",");
> if (vals.length < 2) {
> return null;
> }
> Student st = new Student();
> st.stNo = vals[0];
> st.name = vals[1];
> return st;
> }
> }).returns(Student.class);
>
>
> Table table = tabEnv.fromDataStream(source, "stNo, name");
>
>
> Table distinctTab = table.groupBy("stNo, name").select("stNo, name");//.select("name, name.count as cnt");
>
>
> DataStream<Tuple2<Boolean, Student>> distinctStream = tabEnv.toRetractStream(distinctTab, Student.class);
>
>
> DataStream<Student> distintOutStrem = distinctStream.map(tuple2 -> {
> if (tuple2.f0) {
> return tuple2.f1;
> }
> return null;
> }).filter(Objects::nonNull);
>
>
> Table after = tabEnv.fromDataStream(distintOutStrem, "stNo, name, proctime.proctime");
>
>
> Table result = after.window(Tumble.over("10.seconds").on("proctime").as("w"))
> .groupBy("name, w")
> .select("name, name.count as cnt, w.start as wStart, w.end as wEnd, w.proctime as wProctime");
>
>
> DataStream<Result> resultStream = tabEnv.toAppendStream(result, Result.class);
> resultStream.print();
> env.execute(TestState.class.getSimpleName());
>
>
>
> 但是不知道问题出在哪里,随着长时间运行会导致jvm内存有用光,后面dump内存发现org.apache.flink.streaming.api.operators.TimerHeapInternalTimer 类实例一直在递增,按理说一个窗口时间到了对应的TimerHeapInternalTimer实例都会随着任务执行而被删掉,但是我这里一直在递增。
> num #instances #bytes class name
> ----------------------------------------------
> 1: 5937 44249552 [B
> 2: 214238 18291832 [C
> 3: 141199 5647960 org.apache.flink.runtime.state.heap.CopyOnWriteStateTable$StateTableEntry
> 4: 213521 5124504 java.lang.String
> 5: 118727 4397272 [Ljava.lang.Object;
> 6: 108138 3460416 java.util.HashMap$Node
> 7: 19440 1667688 [Ljava.util.HashMap$Node;
> 8: 94253 1508048 org.apache.flink.types.Row
> 9: 47066 1506112 org.apache.flink.streaming.api.operators.TimerHeapInternalTimer
> 10: 12924 1426104 java.lang.Class
> 11: 49 1229592 [Lorg.apache.flink.runtime.state.heap.CopyOnWriteStateTable$StateTableEntry;
> 12: 48072 1153728 java.lang.Long
> 13: 34657 1109024 java.util.concurrent.ConcurrentHashMap$Node
> 14: 7772 1078360 [I
> 15: 26591 1063640 java.util.LinkedHashMap$Entry
> 16: 15301 856856 java.util.LinkedHashMap
> 17: 11771 847512 java.lang.reflect.Field
> 18: 13172 843008 java.nio.DirectByteBuffer
> 19: 8570 754160 java.lang.reflect.Method
> 20: 20 655680 [Lscala.concurrent.forkjoin.ForkJoinTask;
> 21: 13402 643296 java.util.HashMap
> 22: 12945 621360 org.apache.flink.core.memory.HybridMemorySegment
> 23: 13275 531000 sun.misc.Cleaner
> 24: 15840 506880 com.esotericsoftware.kryo.Registration
> 25: 393 450928 [Ljava.nio.ByteBuffer;
> 26: 13166 421312 java.nio.DirectByteBuffer$Deallocator
> 27: 25852 413632 java.lang.Object
> 28: 14137 339288 java.util.ArrayList
> 29: 6410 307680 org.apache.kafka.common.metrics.stats.SampledStat$Sample
> 30: 4572 292608 com.esotericsoftware.kryo.serializers.UnsafeCacheFields$UnsafeObjectField
> 31: 392 288576 [Ljava.util.concurrent.ConcurrentHashMap$Node;
> 32: 8412 269184 org.apache.kafka.common.MetricName
> 33: 8412 269184 org.apache.kafka.common.metrics.KafkaMetric
> 34: 72 268704 [Lorg.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
> 35: 10070 241680 org.apache.kafka.common.requests.ApiVersionsResponse$ApiVersion
> 36: 9828 225040 [Ljava.lang.Class;
> 37: 9360 224640 com.esotericsoftware.kryo.Kryo$DefaultSerializerEntry
> 38: 7905 189720 org.apache.flink.api.java.tuple.Tuple2
> 39: 2358 150912 org.apache.kafka.common.metrics.Sensor
> 40: 1855 148400 java.lang.reflect.Constructor
> 41: 1464 143936 [J
> 42: 8764 140224 java.util.LinkedHashMap$LinkedEntrySet
> 43: 1668 133440 com.esotericsoftware.kryo.serializers.FieldSerializer