关于窗口org.apache.flink.streaming.api.operators.TimerHeapInternalTimer 类实例数一直增大 导致内存溢出的问题

classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

关于窗口org.apache.flink.streaming.api.operators.TimerHeapInternalTimer 类实例数一直增大 导致内存溢出的问题

claylin
写了个去重的任务,代码如下:

     StreamQueryConfig queryConfig = tabEnv.queryConfig();
        queryConfig.withIdleStateRetentionTime(Time.seconds(20), Time.minutes(6));


        DataStream<Student> source = env.socketTextStream("localhost", 10028)
                .map(new MapFunction<String, Student>() {
                    @Override
                    public Student map(String value) throws Exception {
                        String[] vals = value.split(",");
                        if (vals.length < 2) {
                            return null;
                        }
                        Student st = new Student();
                        st.stNo = vals[0];
                        st.name = vals[1];
                        return st;
                    }
                }).returns(Student.class);


        Table table = tabEnv.fromDataStream(source, "stNo, name");


        Table distinctTab = table.groupBy("stNo, name").select("stNo, name");//.select("name, name.count as cnt");


        DataStream<Tuple2<Boolean, Student>> distinctStream = tabEnv.toRetractStream(distinctTab, Student.class);


        DataStream<Student> distintOutStrem = distinctStream.map(tuple2 -> {
            if (tuple2.f0) {
                return tuple2.f1;
            }
            return null;
        }).filter(Objects::nonNull);


        Table after = tabEnv.fromDataStream(distintOutStrem, "stNo, name, proctime.proctime");


        Table result = after.window(Tumble.over("10.seconds").on("proctime").as("w"))
                .groupBy("name, w")
                .select("name, name.count as cnt, w.start as wStart, w.end as wEnd, w.proctime as wProctime");


        DataStream<Result> resultStream = tabEnv.toAppendStream(result, Result.class);
        resultStream.print();
        env.execute(TestState.class.getSimpleName());



但是不知道问题出在哪里,随着长时间运行会导致jvm内存有用光,后面dump内存发现org.apache.flink.streaming.api.operators.TimerHeapInternalTimer 类实例一直在递增,按理说一个窗口时间到了对应的TimerHeapInternalTimer实例都会随着任务执行而被删掉,但是我这里一直在递增。
 num     #instances         #bytes  class name
----------------------------------------------
   1:          5937       44249552  [B
   2:        214238       18291832  [C
   3:        141199        5647960  org.apache.flink.runtime.state.heap.CopyOnWriteStateTable$StateTableEntry
   4:        213521        5124504  java.lang.String
   5:        118727        4397272  [Ljava.lang.Object;
   6:        108138        3460416  java.util.HashMap$Node
   7:         19440        1667688  [Ljava.util.HashMap$Node;
   8:         94253        1508048  org.apache.flink.types.Row
   9:         47066        1506112  org.apache.flink.streaming.api.operators.TimerHeapInternalTimer
  10:         12924        1426104  java.lang.Class
  11:            49        1229592  [Lorg.apache.flink.runtime.state.heap.CopyOnWriteStateTable$StateTableEntry;
  12:         48072        1153728  java.lang.Long
  13:         34657        1109024  java.util.concurrent.ConcurrentHashMap$Node
  14:          7772        1078360  [I
  15:         26591        1063640  java.util.LinkedHashMap$Entry
  16:         15301         856856  java.util.LinkedHashMap
  17:         11771         847512  java.lang.reflect.Field
  18:         13172         843008  java.nio.DirectByteBuffer
  19:          8570         754160  java.lang.reflect.Method
  20:            20         655680  [Lscala.concurrent.forkjoin.ForkJoinTask;
  21:         13402         643296  java.util.HashMap
  22:         12945         621360  org.apache.flink.core.memory.HybridMemorySegment
  23:         13275         531000  sun.misc.Cleaner
  24:         15840         506880  com.esotericsoftware.kryo.Registration
  25:           393         450928  [Ljava.nio.ByteBuffer;
  26:         13166         421312  java.nio.DirectByteBuffer$Deallocator
  27:         25852         413632  java.lang.Object
  28:         14137         339288  java.util.ArrayList
  29:          6410         307680  org.apache.kafka.common.metrics.stats.SampledStat$Sample
  30:          4572         292608  com.esotericsoftware.kryo.serializers.UnsafeCacheFields$UnsafeObjectField
  31:           392         288576  [Ljava.util.concurrent.ConcurrentHashMap$Node;
  32:          8412         269184  org.apache.kafka.common.MetricName
  33:          8412         269184  org.apache.kafka.common.metrics.KafkaMetric
  34:            72         268704  [Lorg.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
  35:         10070         241680  org.apache.kafka.common.requests.ApiVersionsResponse$ApiVersion
  36:          9828         225040  [Ljava.lang.Class;
  37:          9360         224640  com.esotericsoftware.kryo.Kryo$DefaultSerializerEntry
  38:          7905         189720  org.apache.flink.api.java.tuple.Tuple2
  39:          2358         150912  org.apache.kafka.common.metrics.Sensor
  40:          1855         148400  java.lang.reflect.Constructor
  41:          1464         143936  [J
  42:          8764         140224  java.util.LinkedHashMap$LinkedEntrySet
  43:          1668         133440  com.esotericsoftware.kryo.serializers.FieldSerializer
Reply | Threaded
Open this post in threaded view
|

Re: 关于窗口org.apache.flink.streaming.api.operators.TimerHeapInternalTimer 类实例数一直增大 导致内存溢出的问题

Terry Wang
会不会是你数据量比较大,然后heapMemory配置的相对较小导致的,是否尝试过调大内存和并发观察是否还有OOM?

Best,
Terry Wang



> 在 2019年9月26日,上午9:25,claylin <[hidden email]> 写道:
>
> 写了个去重的任务,代码如下:
>
>     StreamQueryConfig queryConfig = tabEnv.queryConfig();
>        queryConfig.withIdleStateRetentionTime(Time.seconds(20), Time.minutes(6));
>
>
>        DataStream<Student> source = env.socketTextStream("localhost", 10028)
>                .map(new MapFunction<String, Student>() {
>                    @Override
>                    public Student map(String value) throws Exception {
>                        String[] vals = value.split(",");
>                        if (vals.length < 2) {
>                            return null;
>                        }
>                        Student st = new Student();
>                        st.stNo = vals[0];
>                        st.name = vals[1];
>                        return st;
>                    }
>                }).returns(Student.class);
>
>
>        Table table = tabEnv.fromDataStream(source, "stNo, name");
>
>
>        Table distinctTab = table.groupBy("stNo, name").select("stNo, name");//.select("name, name.count as cnt");
>
>
>        DataStream<Tuple2<Boolean, Student>> distinctStream = tabEnv.toRetractStream(distinctTab, Student.class);
>
>
>        DataStream<Student> distintOutStrem = distinctStream.map(tuple2 -> {
>            if (tuple2.f0) {
>                return tuple2.f1;
>            }
>            return null;
>        }).filter(Objects::nonNull);
>
>
>        Table after = tabEnv.fromDataStream(distintOutStrem, "stNo, name, proctime.proctime");
>
>
>        Table result = after.window(Tumble.over("10.seconds").on("proctime").as("w"))
>                .groupBy("name, w")
>                .select("name, name.count as cnt, w.start as wStart, w.end as wEnd, w.proctime as wProctime");
>
>
>        DataStream<Result> resultStream = tabEnv.toAppendStream(result, Result.class);
>        resultStream.print();
>        env.execute(TestState.class.getSimpleName());
>
>
>
> 但是不知道问题出在哪里,随着长时间运行会导致jvm内存有用光,后面dump内存发现org.apache.flink.streaming.api.operators.TimerHeapInternalTimer 类实例一直在递增,按理说一个窗口时间到了对应的TimerHeapInternalTimer实例都会随着任务执行而被删掉,但是我这里一直在递增。
> num     #instances         #bytes  class name
> ----------------------------------------------
>   1:          5937       44249552  [B
>   2:        214238       18291832  [C
>   3:        141199        5647960  org.apache.flink.runtime.state.heap.CopyOnWriteStateTable$StateTableEntry
>   4:        213521        5124504  java.lang.String
>   5:        118727        4397272  [Ljava.lang.Object;
>   6:        108138        3460416  java.util.HashMap$Node
>   7:         19440        1667688  [Ljava.util.HashMap$Node;
>   8:         94253        1508048  org.apache.flink.types.Row
>   9:         47066        1506112  org.apache.flink.streaming.api.operators.TimerHeapInternalTimer
>  10:         12924        1426104  java.lang.Class
>  11:            49        1229592  [Lorg.apache.flink.runtime.state.heap.CopyOnWriteStateTable$StateTableEntry;
>  12:         48072        1153728  java.lang.Long
>  13:         34657        1109024  java.util.concurrent.ConcurrentHashMap$Node
>  14:          7772        1078360  [I
>  15:         26591        1063640  java.util.LinkedHashMap$Entry
>  16:         15301         856856  java.util.LinkedHashMap
>  17:         11771         847512  java.lang.reflect.Field
>  18:         13172         843008  java.nio.DirectByteBuffer
>  19:          8570         754160  java.lang.reflect.Method
>  20:            20         655680  [Lscala.concurrent.forkjoin.ForkJoinTask;
>  21:         13402         643296  java.util.HashMap
>  22:         12945         621360  org.apache.flink.core.memory.HybridMemorySegment
>  23:         13275         531000  sun.misc.Cleaner
>  24:         15840         506880  com.esotericsoftware.kryo.Registration
>  25:           393         450928  [Ljava.nio.ByteBuffer;
>  26:         13166         421312  java.nio.DirectByteBuffer$Deallocator
>  27:         25852         413632  java.lang.Object
>  28:         14137         339288  java.util.ArrayList
>  29:          6410         307680  org.apache.kafka.common.metrics.stats.SampledStat$Sample
>  30:          4572         292608  com.esotericsoftware.kryo.serializers.UnsafeCacheFields$UnsafeObjectField
>  31:           392         288576  [Ljava.util.concurrent.ConcurrentHashMap$Node;
>  32:          8412         269184  org.apache.kafka.common.MetricName
>  33:          8412         269184  org.apache.kafka.common.metrics.KafkaMetric
>  34:            72         268704  [Lorg.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
>  35:         10070         241680  org.apache.kafka.common.requests.ApiVersionsResponse$ApiVersion
>  36:          9828         225040  [Ljava.lang.Class;
>  37:          9360         224640  com.esotericsoftware.kryo.Kryo$DefaultSerializerEntry
>  38:          7905         189720  org.apache.flink.api.java.tuple.Tuple2
>  39:          2358         150912  org.apache.kafka.common.metrics.Sensor
>  40:          1855         148400  java.lang.reflect.Constructor
>  41:          1464         143936  [J
>  42:          8764         140224  java.util.LinkedHashMap$LinkedEntrySet
>  43:          1668         133440  com.esotericsoftware.kryo.serializers.FieldSerializer