I wrote a deduplication job; the code is as follows:
StreamQueryConfig queryConfig = tabEnv.queryConfig();
queryConfig.withIdleStateRetentionTime(Time.seconds(20), Time.minutes(6));

DataStream<Student> source = env.socketTextStream("localhost", 10028)
        .map(new MapFunction<String, Student>() {
            @Override
            public Student map(String value) throws Exception {
                String[] vals = value.split(",");
                if (vals.length < 2) {
                    return null;
                }
                Student st = new Student();
                st.stNo = vals[0];
                st.name = vals[1];
                return st;
            }
        }).returns(Student.class);

Table table = tabEnv.fromDataStream(source, "stNo, name");

Table distinctTab = table.groupBy("stNo, name").select("stNo, name"); //.select("name, name.count as cnt");

DataStream<Tuple2<Boolean, Student>> distinctStream = tabEnv.toRetractStream(distinctTab, Student.class);

DataStream<Student> distinctOutStream = distinctStream.map(tuple2 -> {
    if (tuple2.f0) {
        return tuple2.f1;
    }
    return null;
}).filter(Objects::nonNull);

Table after = tabEnv.fromDataStream(distinctOutStream, "stNo, name, proctime.proctime");

Table result = after.window(Tumble.over("10.seconds").on("proctime").as("w"))
        .groupBy("name, w")
        .select("name, name.count as cnt, w.start as wStart, w.end as wEnd, w.proctime as wProctime");

DataStream<Result> resultStream = tabEnv.toAppendStream(result, Result.class);
resultStream.print();
env.execute(TestState.class.getSimpleName());

I can't figure out where the problem is. After the job runs for a long time the JVM heap is eventually exhausted. A heap dump shows that the number of org.apache.flink.streaming.api.operators.TimerHeapInternalTimer instances keeps growing. My understanding is that once a window's time is up, the corresponding TimerHeapInternalTimer instances should be removed as the job proceeds, but in my case the count only ever increases:

 num     #instances         #bytes  class name
----------------------------------------------
   1:          5937       44249552  [B
   2:        214238       18291832  [C
   3:        141199        5647960  org.apache.flink.runtime.state.heap.CopyOnWriteStateTable$StateTableEntry
   4:        213521        5124504  java.lang.String
   5:        118727        4397272  [Ljava.lang.Object;
   6:        108138        3460416  java.util.HashMap$Node
   7:         19440        1667688  [Ljava.util.HashMap$Node;
   8:         94253        1508048  org.apache.flink.types.Row
   9:         47066        1506112  org.apache.flink.streaming.api.operators.TimerHeapInternalTimer
  10:         12924        1426104  java.lang.Class
  11:            49        1229592  [Lorg.apache.flink.runtime.state.heap.CopyOnWriteStateTable$StateTableEntry;
  12:         48072        1153728  java.lang.Long
  13:         34657        1109024  java.util.concurrent.ConcurrentHashMap$Node
  14:          7772        1078360  [I
  15:         26591        1063640  java.util.LinkedHashMap$Entry
  16:         15301         856856  java.util.LinkedHashMap
  17:         11771         847512  java.lang.reflect.Field
  18:         13172         843008  java.nio.DirectByteBuffer
  19:          8570         754160  java.lang.reflect.Method
  20:            20         655680  [Lscala.concurrent.forkjoin.ForkJoinTask;
  21:         13402         643296  java.util.HashMap
  22:         12945         621360  org.apache.flink.core.memory.HybridMemorySegment
  23:         13275         531000  sun.misc.Cleaner
  24:         15840         506880  com.esotericsoftware.kryo.Registration
  25:           393         450928  [Ljava.nio.ByteBuffer;
  26:         13166         421312  java.nio.DirectByteBuffer$Deallocator
  27:         25852         413632  java.lang.Object
  28:         14137         339288  java.util.ArrayList
  29:          6410         307680  org.apache.kafka.common.metrics.stats.SampledStat$Sample
  30:          4572         292608  com.esotericsoftware.kryo.serializers.UnsafeCacheFields$UnsafeObjectField
  31:           392         288576  [Ljava.util.concurrent.ConcurrentHashMap$Node;
  32:          8412         269184  org.apache.kafka.common.MetricName
  33:          8412         269184  org.apache.kafka.common.metrics.KafkaMetric
  34:            72         268704  [Lorg.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
  35:         10070         241680  org.apache.kafka.common.requests.ApiVersionsResponse$ApiVersion
  36:          9828         225040  [Ljava.lang.Class;
  37:          9360         224640  com.esotericsoftware.kryo.Kryo$DefaultSerializerEntry
  38:          7905         189720  org.apache.flink.api.java.tuple.Tuple2
  39:          2358         150912  org.apache.kafka.common.metrics.Sensor
  40:          1855         148400  java.lang.reflect.Constructor
  41:          1464         143936  [J
  42:          8764         140224  java.util.LinkedHashMap$LinkedEntrySet
  43:          1668         133440  com.esotericsoftware.kryo.serializers.FieldSerializer
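One thing worth noting in the job above: queryConfig is built but never handed to toRetractStream/toAppendStream. Below is a minimal sketch of passing it along, assuming a Flink 1.9-era Table API where those conversion methods still offer StreamQueryConfig overloads; whether the missing config actually explains the timer growth in this job is not verified here. env, tabEnv, distinctTab, result, Student and Result refer to the same objects as in the code above.

StreamQueryConfig queryConfig = tabEnv.queryConfig();
// State for an idle (stNo, name) key becomes eligible for cleanup after between
// 20 seconds and 6 minutes of inactivity.
queryConfig.withIdleStateRetentionTime(Time.seconds(20), Time.minutes(6));

// The retention setting only applies to a query when the config is passed into the
// conversion call; building it on its own does not attach it to the continuous groupBy.
DataStream<Tuple2<Boolean, Student>> distinctStream =
        tabEnv.toRetractStream(distinctTab, Student.class, queryConfig);

DataStream<Result> resultStream =
        tabEnv.toAppendStream(result, Result.class, queryConfig);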
Could it be that your data volume is fairly large while the heap memory is configured relatively small? Have you tried increasing the memory and the parallelism to see whether the OOM still occurs?
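For reference, a hedged sketch of those two knobs (the concrete values and the flink-conf.yaml key are illustrative and assume a Flink 1.9-style setup, not something stated in this thread): parallelism can be raised in the job itself, while TaskManager heap is sized outside the job.

// Raise parallelism so keyed state and window timers are spread across more subtasks.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);
// Heap itself is not set in job code; e.g. in flink-conf.yaml (1.9 naming):
//   taskmanager.heap.size: 4096m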
Best,
Terry Wang

> On Sep 26, 2019, at 9:25 AM, claylin <[hidden email]> wrote: