Regarding Flink checkpoints that occasionally take a long time.
Environment and background:
Version: Flink 1.10.0
Data volume: about 100,000 records per second; the source is Kafka
Computation: sliding-window aggregation; each window emits roughly 10 to 20 million records
Backpressure: the job frequently hits backpressure (especially when windows emit their output)

Problem:
Most checkpoints complete within 1 minute, but occasionally a checkpoint takes more than 30 minutes (not often, about once every 1 to 2 days).
The source's checkpoint is the slow part: the time from "Trigger checkpoint" to "Starting checkpoint" is long.

The checkpoint looks roughly like this:

2020-06-24 21:09:53,369 DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor - Trigger checkpoint 316@1593004193363 for 84dce1ec8aa5a4df2d1758d6e9278693.
2020-06-24 21:09:58,327 DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor - Received heartbeat request from e88ea2f790430c9c160e540ef0546d60.
2020-06-24 21:09:59,266 DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor - Received heartbeat request from b93d7167db364dfdcbda886944f1482f.
2020-06-24 21:09:59,686 INFO org.apache.flink.runtime.taskexecutor.TaskManagerRunner - Memory usage stats: [HEAP: 202/2573/2573 MB, NON HEAP: 111/114/424 MB (used/committed/max)]
2020-06-24 21:09:59,686 INFO org.apache.flink.runtime.taskexecutor.TaskManagerRunner - Direct memory stats: Count: 17403, Total Capacity: 583911423, Used Memory: 583911424
2020-06-24 21:09:59,686 INFO org.apache.flink.runtime.taskexecutor.TaskManagerRunner - Off-heap pool stats: [Code Cache: 35/35/240 MB (used/committed/max)], [Metaspace: 67/69/96 MB (used/committed/max)], [Compressed Class Space: 8/9/88 MB (used/committed/max)]
2020-06-24 21:09:59,686 INFO org.apache.flink.runtime.taskexecutor.TaskManagerRunner - Garbage collector stats: [PS Scavenge, GC TIME (ms): 108643, GC COUNT: 6981], [PS MarkSweep, GC TIME (ms): 1074, GC COUNT: 6]
2020-06-24 21:10:08,346 DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor - Received heartbeat request from e88ea2f790430c9c160e540ef0546d60.
2020-06-24 21:10:09,286 DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor - Received heartbeat request from b93d7167db364dfdcbda886944f1482f.
2020-06-24 21:10:09,686 INFO org.apache.flink.runtime.taskexecutor.TaskManagerRunner - Memory usage stats: [HEAP: 557/2573/2573 MB, NON HEAP: 111/114/424 MB (used/committed/max)]
2020-06-24 21:10:09,686 INFO org.apache.flink.runtime.taskexecutor.TaskManagerRunner - Direct memory stats: Count: 17403, Total Capacity: 583911423, Used Memory: 583911424
2020-06-24 21:10:09,686 INFO org.apache.flink.runtime.taskexecutor.TaskManagerRunner - Off-heap pool stats: [Code Cache: 35/35/240 MB (used/committed/max)], [Metaspace: 67/69/96 MB (used/committed/max)], [Compressed Class Space: 8/9/88 MB (used/committed/max)]
2020-06-24 21:10:09,686 INFO org.apache.flink.runtime.taskexecutor.TaskManagerRunner - Garbage collector stats: [PS Scavenge, GC TIME (ms): 108643, GC COUNT: 6981], [PS MarkSweep, GC TIME (ms): 1074, GC COUNT: 6]
... (omitted) ...
2020-06-24 21:55:39,875 INFO org.apache.flink.runtime.taskexecutor.TaskManagerRunner - Direct memory stats: Count: 17403, Total Capacity: 583911423, Used Memory: 583911424
2020-06-24 21:55:39,875 INFO org.apache.flink.runtime.taskexecutor.TaskManagerRunner - Off-heap pool stats: [Code Cache: 35/35/240 MB (used/committed/max)], [Metaspace: 67/69/96 MB (used/committed/max)], [Compressed Class Space: 8/9/88 MB (used/committed/max)]
2020-06-24 21:55:39,876 INFO org.apache.flink.runtime.taskexecutor.TaskManagerRunner - Garbage collector stats: [PS Scavenge, GC TIME (ms): 110520, GC COUNT: 7083], [PS MarkSweep, GC TIME (ms): 1074, GC COUNT: 6]
2020-06-24 21:55:41,721 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask - Starting checkpoint (316) CHECKPOINT on task Source: Custom Source -> Map -> Filter -> Timestamps/Watermarks (4/10)
2020-06-24 21:55:41,721 DEBUG org.apache.flink.runtime.state.AbstractSnapshotStrategy - DefaultOperatorStateBackend snapshot (FsCheckpointStorageLocation {fileSystem=org.apache.flink.core.fs.SafetyNetWrapperFileSystem@4a4b11cd, checkpointDirectory=hdfs://xxxxchk-316, sharedStateDirectory=hdfs://xxxxshared, taskOwnedStateDirectory=hdfs://xxxxtaskowned, metadataFilePath=hdfs://xxxxchk-316/_metadata, reference=(default), fileStateSizeThreshold=1024, writeBufferSize=4096}, synchronous part) in thread Thread[Source: Custom Source -> Map -> Filter -> Timestamps/Watermarks (4/10),5,Flink Task Threads] took 0 ms.
2020-06-24 21:55:41,721 DEBUG org.apache.flink.runtime.state.AbstractSnapshotStrategy - DefaultOperatorStateBackend snapshot (FsCheckpointStorageLocation {fileSystem=org.apache.flink.core.fs.SafetyNetWrapperFileSystem@4a4b11cd, checkpointDirectory=hdfs://xxxxchk-316, sharedStateDirectory=hdfs://xxxxshared, taskOwnedStateDirectory=hdfs://xxxxtaskowned, metadataFilePath=hdfs://xxxxchk-316/_metadata, reference=(default), fileStateSizeThreshold=1024, writeBufferSize=4096}, synchronous part) in thread Thread[Source: Custom Source -> Map -> Filter -> Timestamps/Watermarks (4/10),5,Flink Task Threads] took 0 ms.
2020-06-24 21:55:41,721 DEBUG org.apache.flink.runtime.state.AbstractSnapshotStrategy - DefaultOperatorStateBackend snapshot (FsCheckpointStorageLocation {fileSystem=org.apache.flink.core.fs.SafetyNetWrapperFileSystem@4a4b11cd, checkpointDirectory=hdfs://xxxxchk-316, sharedStateDirectory=hdfs://xxxxshared, taskOwnedStateDirectory=hdfs://xxxxtaskowned, metadataFilePath=hdfs://xxxxchk-316/_metadata, reference=(default), fileStateSizeThreshold=1024, writeBufferSize=4096}, synchronous part) in thread Thread[Source: Custom Source -> Map -> Filter -> Timestamps/Watermarks (4/10),5,Flink Task Threads] took 0 ms.
2020-06-24 21:55:41,721 DEBUG org.apache.flink.runtime.state.AbstractSnapshotStrategy - DefaultOperatorStateBackend snapshot (FsCheckpointStorageLocation {fileSystem=org.apache.flink.core.fs.SafetyNetWrapperFileSystem@4a4b11cd, checkpointDirectory=hdfs://xxxxchk-316, sharedStateDirectory=hdfs://xxxxshared, taskOwnedStateDirectory=hdfs://xxxxtaskowned, metadataFilePath=hdfs://xxxxchk-316/_metadata, reference=(default), fileStateSizeThreshold=1024, writeBufferSize=4096}, synchronous part) in thread Thread[Source: Custom Source -> Map -> Filter -> Timestamps/Watermarks (4/10),5,Flink Task Threads] took 0 ms.
2020-06-24 21:55:41,721 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask - Finished synchronous checkpoints for checkpoint 316 on task Source: Custom Source -> Map -> Filter -> Timestamps/Watermarks (4/10)
2020-06-24 21:55:41,721 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask - Source: Custom Source -> Map -> Filter -> Timestamps/Watermarks (4/10) - finished synchronous part of checkpoint 316. Alignment duration: 0 ms, snapshot duration 0 ms
2020-06-24 21:55:41,737 DEBUG org.apache.flink.runtime.state.AbstractSnapshotStrategy - DefaultOperatorStateBackend snapshot (FsCheckpointStorageLocation {fileSystem=org.apache.flink.core.fs.SafetyNetWrapperFileSystem@4a4b11cd, checkpointDirectory=hdfs://xxxxchk-316, sharedStateDirectory=hdfs://xxxxshared, taskOwnedStateDirectory=hdfs://xxxxtaskowned, metadataFilePath=hdfs://xxxxchk-316/_metadata, reference=(default), fileStateSizeThreshold=1024, writeBufferSize=4096}, asynchronous part) in thread Thread[AsyncOperations-thread-316,5,Flink Task Threads] took 16 ms.
2020-06-24 21:55:41,738 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask - Source: Custom Source -> Map -> Filter -> Timestamps/Watermarks (4/10) - finished asynchronous part of checkpoint 316. Asynchronous duration: 16 ms
2020-06-24 21:55:42,008 DEBUG org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner - Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger, anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10) (a608e8a1ab75622dc57f164bdbb86743): Received barrier from channel 3.
2020-06-24 21:55:42,008 DEBUG org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner - Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger, anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10) (a608e8a1ab75622dc57f164bdbb86743): Starting stream alignment for checkpoint 316.
2020-06-24 21:55:42,110 DEBUG org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner - Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger, anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10) (a608e8a1ab75622dc57f164bdbb86743): Received barrier from channel 7.
2020-06-24 21:55:42,110 DEBUG org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner - Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger, anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10) (a608e8a1ab75622dc57f164bdbb86743): Received barrier from channel 2.
2020-06-24 21:55:42,110 DEBUG org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner - Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger, anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10) (a608e8a1ab75622dc57f164bdbb86743): Received barrier from channel 8.
2020-06-24 21:55:42,110 DEBUG org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner - Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger, anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10) (a608e8a1ab75622dc57f164bdbb86743): Received barrier from channel 6.
2020-06-24 21:55:42,110 DEBUG org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner - Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger, anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10) (a608e8a1ab75622dc57f164bdbb86743): Received barrier from channel 0.
2020-06-24 21:55:42,110 DEBUG org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner - Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger, anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10) (a608e8a1ab75622dc57f164bdbb86743): Received barrier from channel 5.
2020-06-24 21:55:42,110 DEBUG org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner - Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger, anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10) (a608e8a1ab75622dc57f164bdbb86743): Received barrier from channel 9.
2020-06-24 21:55:42,110 DEBUG org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner - Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger, anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10) (a608e8a1ab75622dc57f164bdbb86743): Received barrier from channel 4.
2020-06-24 21:55:42,110 DEBUG org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner - Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger, anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10) (a608e8a1ab75622dc57f164bdbb86743): Received barrier from channel 1.
2020-06-24 21:55:42,110 DEBUG org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner - Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger, anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10) (a608e8a1ab75622dc57f164bdbb86743): Received all barriers, triggering checkpoint 316 at 1593004193363.
2020-06-24 21:55:42,110 DEBUG org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner - Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger, anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10) (a608e8a1ab75622dc57f164bdbb86743): End of stream alignment, feeding buffered data back.
2020-06-24 21:55:42,110 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask - Starting checkpoint (316) CHECKPOINT on task Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger, anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10)
2020-06-24 21:55:42,110 DEBUG org.apache.flink.runtime.state.AbstractSnapshotStrategy - DefaultOperatorStateBackend snapshot (FsCheckpointStorageLocation {fileSystem=org.apache.flink.core.fs.SafetyNetWrapperFileSystem@61164119, checkpointDirectory=hdfs://xxxxchk-316, sharedStateDirectory=hdfs://xxxxshared, taskOwnedStateDirectory=hdfs://xxxxtaskowned, metadataFilePath=hdfs://xxxxchk-316/_metadata, reference=(default), fileStateSizeThreshold=1024, writeBufferSize=4096}, synchronous part) in thread Thread[Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger, anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10),5,Flink Task Threads] took 0 ms.
2020-06-24 21:55:42,110 DEBUG org.apache.flink.runtime.state.AbstractSnapshotStrategy - DefaultOperatorStateBackend snapshot (FsCheckpointStorageLocation {fileSystem=org.apache.flink.core.fs.SafetyNetWrapperFileSystem@61164119, checkpointDirectory=hdfs://xxxxchk-316, sharedStateDirectory=hdfs://xxxxshared, taskOwnedStateDirectory=hdfs://xxxxtaskowned, metadataFilePath=hdfs://xxxxchk-316/_metadata, reference=(default), fileStateSizeThreshold=1024, writeBufferSize=4096}, synchronous part) in thread Thread[Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger, anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10),5,Flink Task Threads] took 0 ms.
2020-06-24 21:55:42,980 DEBUG org.apache.flink.runtime.state.AbstractSnapshotStrategy - Asynchronous incremental RocksDB snapshot (FsCheckpointStorageLocation {fileSystem=org.apache.flink.core.fs.SafetyNetWrapperFileSystem@61164119, checkpointDirectory=hdfs://xxxx/chk-316, sharedStateDirectory=hdfs://xxxx/shared, taskOwnedStateDirectory=hdfs://xxxx/taskowned, metadataFilePath=hdfs://xxxx/chk-316/_metadata, reference=(default), fileStateSizeThreshold=1024, writeBufferSize=4096}, synchronous part) in thread Thread[Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger, anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10),5,Flink Task Threads] took 870 ms.
2020-06-24 21:55:42,981 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask - Finished synchronous checkpoints for checkpoint 316 on task Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger, anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10)
2020-06-24 21:55:42,981 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask - Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger, anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10) - finished synchronous part of checkpoint 316. Alignment duration: 101 ms, snapshot duration 870 ms
2020-06-24 21:55:42,981 DEBUG org.apache.flink.streaming.runtime.io.CachedBufferStorage - Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger, anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10) (a608e8a1ab75622dc57f164bdbb86743): Size of buffered data: 98304 bytes
2020-06-24 21:55:42,981 DEBUG org.apache.flink.streaming.runtime.io.CachedBufferStorage - Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger, anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10) (a608e8a1ab75622dc57f164bdbb86743): Finished feeding back buffered data.
2020-06-24 21:55:43,814 DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor - Received heartbeat request from e88ea2f790430c9c160e540ef0546d60.
2020-06-24 21:55:44,758 DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor - Received heartbeat request from b93d7167db364dfdcbda886944f1482f.
2020-06-24 21:55:45,714 DEBUG org.apache.flink.runtime.state.AbstractSnapshotStrategy - Asynchronous incremental RocksDB snapshot (FsCheckpointStorageLocation {fileSystem=org.apache.flink.core.fs.SafetyNetWrapperFileSystem@61164119, checkpointDirectory=hdfs://xxxx/chk-316, sharedStateDirectory=hdfs://xxxx/shared, taskOwnedStateDirectory=hdfs://xxxx/taskowned, metadataFilePath=hdfs://xxxx/chk-316/_metadata, reference=(default), fileStateSizeThreshold=1024, writeBufferSize=4096}, asynchronous part) in thread Thread[AsyncOperations-thread-316,5,Flink Task Threads] took 2733 ms.
2020-06-24 21:55:45,715 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask - Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger, anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10) - finished asynchronous part of checkpoint 316. Asynchronous duration: 2734 ms
2020-06-24 21:55:49,876 INFO org.apache.flink.runtime.taskexecutor.TaskManagerRunner - Memory usage stats: [HEAP: 220/2589/2589 MB, NON HEAP: 111/114/424 MB (used/committed/max)]
2020-06-24 21:55:49,876 INFO org.apache.flink.runtime.taskexecutor.TaskManagerRunner - Direct memory stats: Count: 17414, Total Capacity: 584128352, Used Memory: 584128353
2020-06-24 21:55:49,876 INFO org.apache.flink.runtime.taskexecutor.TaskManagerRunner - Off-heap pool stats: [Code Cache: 35/35/240 MB (used/committed/max)], [Metaspace: 67/69/96 MB (used/committed/max)], [Compressed Class Space: 8/9/88 MB (used/committed/max)]
2020-06-24 21:55:49,876 INFO org.apache.flink.runtime.taskexecutor.TaskManagerRunner - Garbage collector stats: [PS Scavenge, GC TIME (ms): 110525, GC COUNT: 7084], [PS MarkSweep, GC TIME (ms): 1074, GC COUNT: 6]

**************************************
tivanli
**************************************
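To put numbers on the "Trigger checkpoint to Starting checkpoint" gap described above, here is a small Python sketch that parses a few of the quoted log lines for checkpoint 316. It assumes the TaskManager logs in UTC+8; the post does not say so, it is only inferred from the epoch milliseconds in `Trigger checkpoint 316@1593004193363`. Long task names are abbreviated in the string constants, and the helper functions (`log_time`, `trigger_timestamp`, `phase_durations_ms`, `gc_stats`) are hypothetical names for illustration, not Flink APIs.

```python
import re
from datetime import datetime, timedelta, timezone

# ASSUMPTION: log timestamps are UTC+8 (inferred from the trigger ID's epoch millis,
# which land within a few ms of the "Trigger checkpoint" log line at that offset).
LOG_TZ = timezone(timedelta(hours=8))

# Lines copied from the excerpt above; long task names are abbreviated as "Window(...)".
TRIGGER = ("2020-06-24 21:09:53,369 DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor"
           " - Trigger checkpoint 316@1593004193363 for 84dce1ec8aa5a4df2d1758d6e9278693.")
START = ("2020-06-24 21:55:41,721 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask"
         " - Starting checkpoint (316) CHECKPOINT on task Source: Custom Source -> Map"
         " -> Filter -> Timestamps/Watermarks (4/10)")
WINDOW_PHASES = [
    "Window(...) -> Map (10/10) - finished synchronous part of checkpoint 316."
    " Alignment duration: 101 ms, snapshot duration 870 ms",
    "Window(...) -> Map (10/10) - finished asynchronous part of checkpoint 316."
    " Asynchronous duration: 2734 ms",
]
GC_BEFORE = ("2020-06-24 21:09:59,686 ... Garbage collector stats:"
             " [PS Scavenge, GC TIME (ms): 108643, GC COUNT: 6981],"
             " [PS MarkSweep, GC TIME (ms): 1074, GC COUNT: 6]")
GC_AFTER = ("2020-06-24 21:55:39,876 ... Garbage collector stats:"
            " [PS Scavenge, GC TIME (ms): 110520, GC COUNT: 7083],"
            " [PS MarkSweep, GC TIME (ms): 1074, GC COUNT: 6]")


def log_time(line: str) -> datetime:
    """Parse the leading 'YYYY-MM-DD HH:MM:SS,mmm' timestamp of a log line."""
    return datetime.strptime(line[:23], "%Y-%m-%d %H:%M:%S,%f").replace(tzinfo=LOG_TZ)


def trigger_timestamp(line: str) -> datetime:
    """Decode the epoch-millis part of 'Trigger checkpoint <id>@<millis>'."""
    match = re.search(r"Trigger checkpoint \d+@(\d+)", line)
    if match is None:
        raise ValueError("not a 'Trigger checkpoint' line")
    millis = int(match.group(1))
    # Integer arithmetic avoids float rounding on the millisecond part.
    return (datetime.fromtimestamp(millis // 1000, tz=LOG_TZ)
            + timedelta(milliseconds=millis % 1000))


def phase_durations_ms(lines):
    """Collect the alignment, synchronous ('snapshot') and asynchronous durations in ms."""
    found = {}
    for line in lines:
        for name, ms in re.findall(r"(Alignment|snapshot|Asynchronous) duration:? (\d+) ms", line):
            found[name] = int(ms)
    return found


def gc_stats(line):
    """Map collector name -> (accumulated GC time in ms, GC count)."""
    pattern = r"\[(PS \w+), GC TIME \(ms\): (\d+), GC COUNT: (\d+)\]"
    return {name: (int(t), int(c)) for name, t, c in re.findall(pattern, line)}


checkpoint_ts = trigger_timestamp(TRIGGER)
start_delay = log_time(START) - log_time(TRIGGER)  # wait before the source starts the checkpoint
phases = phase_durations_ms(WINDOW_PHASES)
before, after = gc_stats(GC_BEFORE), gc_stats(GC_AFTER)
gc_delta = {k: (after[k][0] - before[k][0], after[k][1] - before[k][1]) for k in before}

print("checkpoint timestamp:", checkpoint_ts.isoformat(timespec="milliseconds"))
print("source (4/10) trigger -> starting:", start_delay)
print("window (10/10) phases in ms:", phases, "total:", sum(phases.values()))
print("GC between the two stats lines (ms, count):", gc_delta)
```

With these lines, source subtask 4/10 starts checkpoint 316 about 45 min 48 s after the trigger reaches the TaskExecutor, while the window subtask's alignment, synchronous and asynchronous phases add up to roughly 3.7 s, and the GC stats show about 1.9 s of young-generation GC (102 collections, no new full GC) over nearly the same period. This covers only one TaskManager's excerpt, so it narrows down where the time goes rather than explaining it.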
hi, Tianwang Li
I see that three of the images are broken. You could try uploading them to a third-party image host and posting the links here. In addition:

> The job frequently hits backpressure (especially when windows emit their output).

Check the operators downstream of the window. For example, if the window emits too much data while the sink's parallelism is still the same as before, the sink cannot keep up, and that causes the backpressure.

> Most checkpoints complete within 1 minute, but occasionally one takes more than 30 minutes (not often, about once every 1 to 2 days).

For this one, it is also worth checking whether HDFS is sometimes under heavy load, which would produce these spikes.

I would also suggest adding screenshots of the checkpoint pages in the web UI together with the related logs; that makes it much easier to pin down the problem.

Best!
zhisheng

In reply to: Tianwang Li <[hidden email]>, Sunday, June 28, 2020, 10:17 AM
> > 2020-06-24 21:09:59,686 INFO org.apache.flink.runtime.taskexecutor.TaskManagerRunner > - Memory usage stats: [HEAP: 202/2573/2573 MB, NON HEAP: > 111/114/424 MB (used/committed/max)] > > 2020-06-24 21:09:59,686 INFO org.apache.flink.runtime.taskexecutor.TaskManagerRunner > - Direct memory stats: Count: 17403, Total Capacity: 583911423, > Used Memory: 583911424 > > 2020-06-24 21:09:59,686 INFO org.apache.flink.runtime.taskexecutor.TaskManagerRunner > - Off-heap pool stats: [Code Cache: 35/35/240 MB > (used/committed/max)], [Metaspace: 67/69/96 MB (used/committed/max)], > [Compressed Class Space: 8/9/88 MB (used/committed/max)] > > 2020-06-24 21:09:59,686 INFO org.apache.flink.runtime.taskexecutor.TaskManagerRunner > - Garbage collector stats: [PS Scavenge, GC TIME (ms): 108643, GC > COUNT: 6981], [PS MarkSweep, GC TIME (ms): 1074, GC COUNT: 6] > > 2020-06-24 21:10:08,346 DEBUG > org.apache.flink.runtime.taskexecutor.TaskExecutor - Received > heartbeat request from e88ea2f790430c9c160e540ef0546d60. > > 2020-06-24 21:10:09,286 DEBUG > org.apache.flink.runtime.taskexecutor.TaskExecutor - Received > heartbeat request from b93d7167db364dfdcbda886944f1482f. 
> > 2020-06-24 21:10:09,686 INFO org.apache.flink.runtime.taskexecutor.TaskManagerRunner > - Memory usage stats: [HEAP: 557/2573/2573 MB, NON HEAP: > 111/114/424 MB (used/committed/max)] > > 2020-06-24 21:10:09,686 INFO org.apache.flink.runtime.taskexecutor.TaskManagerRunner > - Direct memory stats: Count: 17403, Total Capacity: 583911423, > Used Memory: 583911424 > > 2020-06-24 21:10:09,686 INFO org.apache.flink.runtime.taskexecutor.TaskManagerRunner > - Off-heap pool stats: [Code Cache: 35/35/240 MB > (used/committed/max)], [Metaspace: 67/69/96 MB (used/committed/max)], > [Compressed Class Space: 8/9/88 MB (used/committed/max)] > > 2020-06-24 21:10:09,686 INFO org.apache.flink.runtime.taskexecutor.TaskManagerRunner > - Garbage collector stats: [PS Scavenge, GC TIME (ms): 108643, GC > COUNT: 6981], [PS MarkSweep, GC TIME (ms): 1074, GC COUNT: 6] > > > 省略。。。。 > > > 2020-06-24 21:55:39,875 INFO org.apache.flink.runtime.taskexecutor.TaskManagerRunner > - Direct memory stats: Count: 17403, Total Capacity: 583911423, > Used Memory: 583911424 > > 2020-06-24 21:55:39,875 INFO org.apache.flink.runtime.taskexecutor.TaskManagerRunner > - Off-heap pool stats: [Code Cache: 35/35/240 MB > (used/committed/max)], [Metaspace: 67/69/96 MB (used/committed/max)], > [Compressed Class Space: 8/9/88 MB (used/committed/max)] > > 2020-06-24 21:55:39,876 INFO org.apache.flink.runtime.taskexecutor.TaskManagerRunner > - Garbage collector stats: [PS Scavenge, GC TIME (ms): 110520, GC > COUNT: 7083], [PS MarkSweep, GC TIME (ms): 1074, GC COUNT: 6] > > 2020-06-24 21:55:41,721 DEBUG > org.apache.flink.streaming.runtime.tasks.StreamTask - Starting > checkpoint (316) CHECKPOINT on task Source: Custom Source -> Map -> Filter > -> Timestamps/Watermarks (4/10) > > 2020-06-24 21:55:41,721 DEBUG > org.apache.flink.runtime.state.AbstractSnapshotStrategy - > DefaultOperatorStateBackend snapshot (FsCheckpointStorageLocation > {fileSystem=org.apache.flink.core.fs.SafetyNetWrapperFileSystem@4a4b11cd, > 
checkpointDirectory=hdfs://xxxxchk-316, > sharedStateDirectory=hdfs://xxxxshared, > taskOwnedStateDirectory=hdfs://xxxxtaskowned, > metadataFilePath=hdfs://xxxxchk-316/_metadata, reference=(default), > fileStateSizeThreshold=1024, writeBufferSize=4096}, synchronous part) in > thread Thread[Source: Custom Source -> Map -> Filter -> > Timestamps/Watermarks (4/10),5,Flink Task Threads] took 0 ms. > > 2020-06-24 21:55:41,721 DEBUG > org.apache.flink.runtime.state.AbstractSnapshotStrategy - > DefaultOperatorStateBackend snapshot (FsCheckpointStorageLocation > {fileSystem=org.apache.flink.core.fs.SafetyNetWrapperFileSystem@4a4b11cd, > checkpointDirectory=hdfs://xxxxchk-316, > sharedStateDirectory=hdfs://xxxxshared, > taskOwnedStateDirectory=hdfs://xxxxtaskowned, > metadataFilePath=hdfs://xxxxchk-316/_metadata, reference=(default), > fileStateSizeThreshold=1024, writeBufferSize=4096}, synchronous part) in > thread Thread[Source: Custom Source -> Map -> Filter -> > Timestamps/Watermarks (4/10),5,Flink Task Threads] took 0 ms. > > 2020-06-24 21:55:41,721 DEBUG > org.apache.flink.runtime.state.AbstractSnapshotStrategy - > DefaultOperatorStateBackend snapshot (FsCheckpointStorageLocation > {fileSystem=org.apache.flink.core.fs.SafetyNetWrapperFileSystem@4a4b11cd, > checkpointDirectory=hdfs://xxxxchk-316, > sharedStateDirectory=hdfs://xxxxshared, > taskOwnedStateDirectory=hdfs://xxxxtaskowned, > metadataFilePath=hdfs://xxxxchk-316/_metadata, reference=(default), > fileStateSizeThreshold=1024, writeBufferSize=4096}, synchronous part) in > thread Thread[Source: Custom Source -> Map -> Filter -> > Timestamps/Watermarks (4/10),5,Flink Task Threads] took 0 ms. 
> > 2020-06-24 21:55:41,721 DEBUG > org.apache.flink.runtime.state.AbstractSnapshotStrategy - > DefaultOperatorStateBackend snapshot (FsCheckpointStorageLocation > {fileSystem=org.apache.flink.core.fs.SafetyNetWrapperFileSystem@4a4b11cd, > checkpointDirectory=hdfs://xxxxchk-316, > sharedStateDirectory=hdfs://xxxxshared, > taskOwnedStateDirectory=hdfs://xxxxtaskowned, > metadataFilePath=hdfs://xxxxchk-316/_metadata, reference=(default), > fileStateSizeThreshold=1024, writeBufferSize=4096}, synchronous part) in > thread Thread[Source: Custom Source -> Map -> Filter -> > Timestamps/Watermarks (4/10),5,Flink Task Threads] took 0 ms. > > 2020-06-24 21:55:41,721 DEBUG > org.apache.flink.streaming.runtime.tasks.StreamTask - Finished > synchronous checkpoints for checkpoint 316 on task Source: Custom Source -> > Map -> Filter -> Timestamps/Watermarks (4/10) > > 2020-06-24 21:55:41,721 DEBUG > org.apache.flink.streaming.runtime.tasks.StreamTask - Source: > Custom Source -> Map -> Filter -> Timestamps/Watermarks (4/10) - finished > synchronous part of checkpoint 316. Alignment duration: 0 ms, snapshot > duration 0 ms > > 2020-06-24 21:55:41,737 DEBUG > org.apache.flink.runtime.state.AbstractSnapshotStrategy - > DefaultOperatorStateBackend snapshot (FsCheckpointStorageLocation > {fileSystem=org.apache.flink.core.fs.SafetyNetWrapperFileSystem@4a4b11cd, > checkpointDirectory=hdfs://xxxxchk-316, > sharedStateDirectory=hdfs://xxxxshared, > taskOwnedStateDirectory=hdfs://xxxxtaskowned, > metadataFilePath=hdfs://xxxxchk-316/_metadata, reference=(default), > fileStateSizeThreshold=1024, writeBufferSize=4096}, asynchronous part) in > thread Thread[AsyncOperations-thread-316,5,Flink Task Threads] took 16 ms. > > 2020-06-24 21:55:41,738 DEBUG > org.apache.flink.streaming.runtime.tasks.StreamTask - Source: > Custom Source -> Map -> Filter -> Timestamps/Watermarks (4/10) - finished > asynchronous part of checkpoint 316. 
Asynchronous duration: 16 ms > > 2020-06-24 21:55:42,008 DEBUG > org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner - > Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger, > anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10) > (a608e8a1ab75622dc57f164bdbb86743): Received barrier from channel 3. > > 2020-06-24 21:55:42,008 DEBUG > org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner - > Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger, > anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10) > (a608e8a1ab75622dc57f164bdbb86743): Starting stream alignment for > checkpoint 316. > > 2020-06-24 21:55:42,110 DEBUG > org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner - > Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger, > anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10) > (a608e8a1ab75622dc57f164bdbb86743): Received barrier from channel 7. > > 2020-06-24 21:55:42,110 DEBUG > org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner - > Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger, > anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10) > (a608e8a1ab75622dc57f164bdbb86743): Received barrier from channel 2. > > 2020-06-24 21:55:42,110 DEBUG > org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner - > Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger, > anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10) > (a608e8a1ab75622dc57f164bdbb86743): Received barrier from channel 8. > > 2020-06-24 21:55:42,110 DEBUG > org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner - > Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger, > anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10) > (a608e8a1ab75622dc57f164bdbb86743): Received barrier from channel 6. 
> > 2020-06-24 21:55:42,110 DEBUG > org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner - > Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger, > anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10) > (a608e8a1ab75622dc57f164bdbb86743): Received barrier from channel 0. > > 2020-06-24 21:55:42,110 DEBUG > org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner - > Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger, > anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10) > (a608e8a1ab75622dc57f164bdbb86743): Received barrier from channel 5. > > 2020-06-24 21:55:42,110 DEBUG > org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner - > Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger, > anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10) > (a608e8a1ab75622dc57f164bdbb86743): Received barrier from channel 9. > > 2020-06-24 21:55:42,110 DEBUG > org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner - > Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger, > anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10) > (a608e8a1ab75622dc57f164bdbb86743): Received barrier from channel 4. > > 2020-06-24 21:55:42,110 DEBUG > org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner - > Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger, > anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10) > (a608e8a1ab75622dc57f164bdbb86743): Received barrier from channel 1. > > 2020-06-24 21:55:42,110 DEBUG > org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner - > Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger, > anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10) > (a608e8a1ab75622dc57f164bdbb86743): Received all barriers, triggering > checkpoint 316 at 1593004193363. 
> 2020-06-24 21:55:42,110 DEBUG org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner - Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger, anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10) (a608e8a1ab75622dc57f164bdbb86743): End of stream alignment, feeding buffered data back.
>
> 2020-06-24 21:55:42,110 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask - Starting checkpoint (316) CHECKPOINT on task Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger, anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10)
>
> 2020-06-24 21:55:42,110 DEBUG org.apache.flink.runtime.state.AbstractSnapshotStrategy - DefaultOperatorStateBackend snapshot (FsCheckpointStorageLocation {fileSystem=org.apache.flink.core.fs.SafetyNetWrapperFileSystem@61164119, checkpointDirectory=hdfs://xxxxchk-316, sharedStateDirectory=hdfs://xxxxshared, taskOwnedStateDirectory=hdfs://xxxxtaskowned, metadataFilePath=hdfs://xxxxchk-316/_metadata, reference=(default), fileStateSizeThreshold=1024, writeBufferSize=4096}, synchronous part) in thread Thread[Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger, anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10),5,Flink Task Threads] took 0 ms.
>
> 2020-06-24 21:55:42,110 DEBUG org.apache.flink.runtime.state.AbstractSnapshotStrategy - DefaultOperatorStateBackend snapshot (FsCheckpointStorageLocation {fileSystem=org.apache.flink.core.fs.SafetyNetWrapperFileSystem@61164119, checkpointDirectory=hdfs://xxxxchk-316, sharedStateDirectory=hdfs://xxxxshared, taskOwnedStateDirectory=hdfs://xxxxtaskowned, metadataFilePath=hdfs://xxxxchk-316/_metadata, reference=(default), fileStateSizeThreshold=1024, writeBufferSize=4096}, synchronous part) in thread Thread[Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger, anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10),5,Flink Task Threads] took 0 ms.
>
> 2020-06-24 21:55:42,980 DEBUG org.apache.flink.runtime.state.AbstractSnapshotStrategy - Asynchronous incremental RocksDB snapshot (FsCheckpointStorageLocation {fileSystem=org.apache.flink.core.fs.SafetyNetWrapperFileSystem@61164119, checkpointDirectory=hdfs://xxxx/chk-316, sharedStateDirectory=hdfs://xxxx/shared, taskOwnedStateDirectory=hdfs://xxxx/taskowned, metadataFilePath=hdfs://xxxx/chk-316/_metadata, reference=(default), fileStateSizeThreshold=1024, writeBufferSize=4096}, synchronous part) in thread Thread[Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger, anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10),5,Flink Task Threads] took 870 ms.
>
> 2020-06-24 21:55:42,981 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask - Finished synchronous checkpoints for checkpoint 316 on task Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger, anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10)
>
> 2020-06-24 21:55:42,981 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask - Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger, anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10) - finished synchronous part of checkpoint 316. Alignment duration: 101 ms, snapshot duration 870 ms
>
> 2020-06-24 21:55:42,981 DEBUG org.apache.flink.streaming.runtime.io.CachedBufferStorage - Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger, anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10) (a608e8a1ab75622dc57f164bdbb86743): Size of buffered data: 98304 bytes
>
> 2020-06-24 21:55:42,981 DEBUG org.apache.flink.streaming.runtime.io.CachedBufferStorage - Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger, anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10) (a608e8a1ab75622dc57f164bdbb86743): Finished feeding back buffered data.
>
> 2020-06-24 21:55:43,814 DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor - Received heartbeat request from e88ea2f790430c9c160e540ef0546d60.
>
> 2020-06-24 21:55:44,758 DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor - Received heartbeat request from b93d7167db364dfdcbda886944f1482f.
>
> 2020-06-24 21:55:45,714 DEBUG org.apache.flink.runtime.state.AbstractSnapshotStrategy - Asynchronous incremental RocksDB snapshot (FsCheckpointStorageLocation {fileSystem=org.apache.flink.core.fs.SafetyNetWrapperFileSystem@61164119, checkpointDirectory=hdfs://xxxx/chk-316, sharedStateDirectory=hdfs://xxxx/shared, taskOwnedStateDirectory=hdfs://xxxx/taskowned, metadataFilePath=hdfs://xxxx/chk-316/_metadata, reference=(default), fileStateSizeThreshold=1024, writeBufferSize=4096}, asynchronous part) in thread Thread[AsyncOperations-thread-316,5,Flink Task Threads] took 2733 ms.
>
> 2020-06-24 21:55:45,715 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask - Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger, anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10) - finished asynchronous part of checkpoint 316. Asynchronous duration: 2734 ms
>
> 2020-06-24 21:55:49,876 INFO org.apache.flink.runtime.taskexecutor.TaskManagerRunner - Memory usage stats: [HEAP: 220/2589/2589 MB, NON HEAP: 111/114/424 MB (used/committed/max)]
>
> 2020-06-24 21:55:49,876 INFO org.apache.flink.runtime.taskexecutor.TaskManagerRunner - Direct memory stats: Count: 17414, Total Capacity: 584128352, Used Memory: 584128353
>
> 2020-06-24 21:55:49,876 INFO org.apache.flink.runtime.taskexecutor.TaskManagerRunner - Off-heap pool stats: [Code Cache: 35/35/240 MB (used/committed/max)], [Metaspace: 67/69/96 MB (used/committed/max)], [Compressed Class Space: 8/9/88 MB (used/committed/max)]
>
> 2020-06-24 21:55:49,876 INFO org.apache.flink.runtime.taskexecutor.TaskManagerRunner - Garbage collector stats: [PS Scavenge, GC TIME (ms): 110525, GC COUNT: 7084], [PS MarkSweep, GC TIME (ms): 1074, GC COUNT: 6]
>
> --
> **************************************
> tivanli
> **************************************
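The `StreamTask` DEBUG lines quoted above already contain every phase of checkpoint 316 for the window task. As a rough sketch (not a Flink tool: the regexes only cover the exact messages quoted in this thread, and the UTC+8 log time zone is an assumption, chosen because `Trigger checkpoint 316@1593004193363` is logged at 21:09:53,369), the phases can be extracted and compared with the trigger timestamp carried in the checkpoint ID:

```python
import re
from datetime import datetime, timedelta, timezone

# Assumption: the TaskManager logs local time in UTC+8. This matches
# "Trigger checkpoint 316@1593004193363" being logged at 21:09:53,369.
LOG_TZ = timezone(timedelta(hours=8))
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

START_RE = re.compile(r"Starting checkpoint \((\d+)\)")
SYNC_RE = re.compile(r"finished synchronous part of checkpoint (\d+)\. "
                     r"Alignment duration: (\d+) ms, snapshot duration (\d+) ms")
ASYNC_RE = re.compile(r"finished asynchronous part of checkpoint (\d+)\. "
                      r"Asynchronous duration: (\d+) ms")


def log_time_ms(line):
    """Convert the leading 'YYYY-MM-DD HH:MM:SS,mmm' timestamp to epoch millis."""
    local = datetime.strptime(line[:23], "%Y-%m-%d %H:%M:%S,%f").replace(tzinfo=LOG_TZ)
    # timedelta // timedelta is exact integer division (no float rounding).
    return (local - EPOCH) // timedelta(milliseconds=1)


def checkpoint_phases(lines, trigger_ms):
    """Per-checkpoint phase timings for ONE task, taken from its StreamTask lines.

    trigger_ms maps checkpoint id -> trigger timestamp, i.e. the number after
    '@' in a line such as 'Trigger checkpoint 316@1593004193363'.
    """
    phases = {}
    for line in lines:
        m = START_RE.search(line)
        if m:
            cp = int(m.group(1))
            phases.setdefault(cp, {})["start_delay_ms"] = log_time_ms(line) - trigger_ms[cp]
            continue
        m = SYNC_RE.search(line)
        if m:
            p = phases.setdefault(int(m.group(1)), {})
            p["alignment_ms"], p["sync_snapshot_ms"] = int(m.group(2)), int(m.group(3))
            continue
        m = ASYNC_RE.search(line)
        if m:
            phases.setdefault(int(m.group(1)), {})["async_ms"] = int(m.group(2))
    return phases


TASK = ("Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger, "
        "anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10)")
PREFIX = "DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask - "
LOG = [
    f"2020-06-24 21:55:42,110 {PREFIX}Starting checkpoint (316) CHECKPOINT on task {TASK}",
    f"2020-06-24 21:55:42,981 {PREFIX}{TASK} - finished synchronous part of checkpoint 316. "
    "Alignment duration: 101 ms, snapshot duration 870 ms",
    f"2020-06-24 21:55:45,715 {PREFIX}{TASK} - finished asynchronous part of checkpoint 316. "
    "Asynchronous duration: 2734 ms",
]

print(checkpoint_phases(LOG, {316: 1593004193363}))
# {316: {'start_delay_ms': 2748747, 'alignment_ms': 101, 'sync_snapshot_ms': 870, 'async_ms': 2734}}
```

For this task, about 45.8 minutes pass between the trigger and `Starting checkpoint`, while alignment, the synchronous snapshot, and the asynchronous upload together take under 4 seconds.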
Hi Tianwang Li,
If a checkpoint times out only once every day or two, check whether your job might have some category of keys that suddenly increases during those days.

Best,
LakeShen

zhisheng <[hidden email]> wrote on Sunday, June 28, 2020 at 10:27 AM:

> hi, Tianwang Li
>
> Three of the images are broken. You could try uploading them to a third-party image host and posting the links here. Also:
>
> > The job often runs into backpressure (especially when the windows emit their output).
>
> Check what the operators downstream of the window are doing. For example, is the window emitting so much data that the sink, still at its previous parallelism, cannot keep up, which then causes the backpressure?
>
> > Most of the time checkpoints complete within 1 minute, but occasionally one takes more than 30 minutes (not often, about once every 1~2 days).
>
> For this one, also check whether HDFS is sometimes under heavy load, which would produce these spikes.
>
> I'd also suggest adding screenshots of the checkpoint details from the UI together with the logs, so the problem can be located more precisely.
>
> Best!
> zhisheng
>
> Tianwang Li <[hidden email]> wrote on Sunday, June 28, 2020 at 10:17 AM:
>
> > About Flink checkpoints occasionally taking a long time.
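LakeShen's suggestion amounts to comparing per-key volumes between a normal period and the period in which a checkpoint ran long. A minimal, hypothetical sketch, assuming the per-key counts have already been collected somehow (for example by sampling the Kafka topic and grouping by the job's key). The function name `surging_keys`, its thresholds, and the sample counts are illustrative only and do not come from the thread:

```python
from collections import Counter


def surging_keys(baseline, current, ratio=5.0, min_count=1000):
    """Return (key, current_count, growth) for keys that grew by at least `ratio`.

    `baseline` and `current` are per-key record counts for two comparable time
    ranges (e.g. a normal day and the day a checkpoint ran long). Keys missing
    from the baseline are treated as having a count of 1; keys with fewer than
    `min_count` records in `current` are ignored as noise.
    """
    hits = []
    for key, n in current.items():
        if n < min_count:
            continue
        growth = n / max(baseline.get(key, 0), 1)
        if growth >= ratio:
            hits.append((key, n, growth))
    # Largest growth first.
    return sorted(hits, key=lambda hit: hit[2], reverse=True)


# Hypothetical counts for illustration.
baseline = Counter({"a": 10_000, "b": 2_000, "c": 500})
current = Counter({"a": 11_000, "b": 40_000, "c": 600, "d": 5_000})
print(surging_keys(baseline, current))
# [('d', 5000, 5000.0), ('b', 40000, 20.0)]
```

Treating unseen keys as a count of 1 makes brand-new hot keys rank highest, which is usually what you want when looking for a sudden surge.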
>
> If a checkpoint times out only once every day or two, check whether your job might have some category of keys that suddenly increases during those days.
>

I'll add timing for how long each task spends processing the window data and keep observing. This is a test job with no sink output:
source -> window -> window (the second window counts the records output by the previous window and prints about 10 records)

LakeShen <[hidden email]> wrote on Sunday, June 28, 2020 at 10:35 AM:

> Hi Tianwang Li,
>
> If a checkpoint times out only once every day or two, check whether your job might have some category of keys that suddenly increases during those days.
>
> Best,
> LakeShen
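Since the second window only counts the first window's output, the load of each firing can be estimated from numbers already in the thread. The back-of-the-envelope sketch below takes the window parameters from the operator name `SlidingEventTimeWindows(10800000, 300000, 0)` (3-hour windows sliding every 5 minutes), about 100,000 input records/s, and 10~20 million output records per firing, all from the original post. The downstream drain rate of 50,000 records/s is a made-up placeholder that should be replaced with a measured value:

```python
def sliding_window_load(size_ms, slide_ms, input_per_sec, output_per_firing, drain_per_sec):
    """Back-of-the-envelope load figures for one sliding-window stage.

    drain_per_sec is an assumed rate at which the downstream operator can
    consume the window's output; it is not measured anywhere in this thread.
    """
    slide_sec = slide_ms / 1000
    drain_sec = output_per_firing / drain_per_sec
    return {
        # Flink's sliding assigner places each element into size/slide windows.
        "windows_per_record": size_ms // slide_ms,
        "input_per_slide": int(input_per_sec * slide_sec),
        "drain_sec": drain_sec,
        # If draining one firing takes longer than the slide, the stage cannot
        # keep up on average and backpressure persists between firings.
        "drains_within_slide": drain_sec <= slide_sec,
    }


for out in (10_000_000, 20_000_000):
    print(out, sliding_window_load(10_800_000, 300_000, 100_000, out, 50_000))
```

With these placeholder numbers, a 10-million-record firing drains in 200 s (inside the 300 s slide) while a 20-million-record firing needs 400 s and does not. Because aligned checkpoint barriers travel in-band with the records, any such backlog also delays the barriers.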
> > > > > > 2020-06-24 21:55:42,980 DEBUG > > > org.apache.flink.runtime.state.AbstractSnapshotStrategy - > > > Asynchronous incremental RocksDB snapshot (FsCheckpointStorageLocation > > > > {fileSystem=org.apache.flink.core.fs.SafetyNetWrapperFileSystem@61164119 > > , > > > checkpointDirectory=hdfs://xxxx/chk-316, > > > sharedStateDirectory=hdfs://xxxx/shared, > > > taskOwnedStateDirectory=hdfs://xxxx/taskowned, > > > metadataFilePath=hdfs://xxxx/chk-316/_metadata, reference=(default), > > > fileStateSizeThreshold=1024, writeBufferSize=4096}, synchronous part) > in > > > thread Thread[Window(SlidingEventTimeWindows(10800000, 300000, 0), > > > EventTimeTrigger, anon$11, ScalaProcessWindowFunctionWrapper) -> Map > > > (10/10),5,Flink Task Threads] took 870 ms. > > > > > > 2020-06-24 21:55:42,981 DEBUG > > > org.apache.flink.streaming.runtime.tasks.StreamTask - > Finished > > > synchronous checkpoints for checkpoint 316 on task > > > Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger, > > > anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10) > > > > > > 2020-06-24 21:55:42,981 DEBUG > > > org.apache.flink.streaming.runtime.tasks.StreamTask - > > > Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger, > > > anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10) - finished > > > synchronous part of checkpoint 316. 
Alignment duration: 101 ms, > snapshot > > > duration 870 ms > > > > > > 2020-06-24 21:55:42,981 DEBUG > > > org.apache.flink.streaming.runtime.io.CachedBufferStorage - > > > Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger, > > > anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10) > > > (a608e8a1ab75622dc57f164bdbb86743): Size of buffered data: 98304 bytes > > > > > > 2020-06-24 21:55:42,981 DEBUG > > > org.apache.flink.streaming.runtime.io.CachedBufferStorage - > > > Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger, > > > anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10) > > > (a608e8a1ab75622dc57f164bdbb86743): Finished feeding back buffered > data. > > > > > > 2020-06-24 21:55:43,814 DEBUG > > > org.apache.flink.runtime.taskexecutor.TaskExecutor - > Received > > > heartbeat request from e88ea2f790430c9c160e540ef0546d60. > > > > > > 2020-06-24 21:55:44,758 DEBUG > > > org.apache.flink.runtime.taskexecutor.TaskExecutor - > Received > > > heartbeat request from b93d7167db364dfdcbda886944f1482f. > > > > > > 2020-06-24 21:55:45,714 DEBUG > > > org.apache.flink.runtime.state.AbstractSnapshotStrategy - > > > Asynchronous incremental RocksDB snapshot (FsCheckpointStorageLocation > > > > {fileSystem=org.apache.flink.core.fs.SafetyNetWrapperFileSystem@61164119 > > , > > > checkpointDirectory=hdfs://xxxx/chk-316, > > > sharedStateDirectory=hdfs://xxxx/shared, > > > taskOwnedStateDirectory=hdfs://xxxx/taskowned, > > > metadataFilePath=hdfs://xxxx/chk-316/_metadata, reference=(default), > > > fileStateSizeThreshold=1024, writeBufferSize=4096}, asynchronous part) > in > > > thread Thread[AsyncOperations-thread-316,5,Flink Task Threads] took > 2733 > > ms. 
> > > > > > 2020-06-24 21:55:45,715 DEBUG > > > org.apache.flink.streaming.runtime.tasks.StreamTask - > > > Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger, > > > anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10) - finished > > > asynchronous part of checkpoint 316. Asynchronous duration: 2734 ms > > > > > > 2020-06-24 21:55:49,876 INFO > > org.apache.flink.runtime.taskexecutor.TaskManagerRunner > > > - Memory usage stats: [HEAP: 220/2589/2589 MB, NON HEAP: > > > 111/114/424 MB (used/committed/max)] > > > > > > 2020-06-24 21:55:49,876 INFO > > org.apache.flink.runtime.taskexecutor.TaskManagerRunner > > > - Direct memory stats: Count: 17414, Total Capacity: 584128352, > > > Used Memory: 584128353 > > > > > > 2020-06-24 21:55:49,876 INFO > > org.apache.flink.runtime.taskexecutor.TaskManagerRunner > > > - Off-heap pool stats: [Code Cache: 35/35/240 MB > > > (used/committed/max)], [Metaspace: 67/69/96 MB (used/committed/max)], > > > [Compressed Class Space: 8/9/88 MB (used/committed/max)] > > > > > > 2020-06-24 21:55:49,876 INFO > > org.apache.flink.runtime.taskexecutor.TaskManagerRunner > > > - Garbage collector stats: [PS Scavenge, GC TIME (ms): 110525, GC > > > COUNT: 7084], [PS MarkSweep, GC TIME (ms): 1074, GC COUNT: 6] > > > > > > > > > -- > > > ************************************** > > > tivanli > > > ************************************** > > > > > > -- ************************************** tivanli ************************************** |
Hi

If the job frequently runs into backpressure, address the backpressure first, because backpressure also delays checkpoints. Second, look at the job's physical resource metrics, such as CPU utilization, memory utilization, and whether GC frequency is unusually high, and try to keep resource usage at a normal level.

Best,
Yichao Yang

------------------ Original message ------------------
From: "Tianwang Li" <[hidden email]>
Sent: Sunday, June 28, 2020, 10:17 AM
To: "user-zh" <[hidden email]>
Subject: Flink-1.10.0: source checkpoint occasionally takes a long time
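Yichao's suggestion to check GC frequency can be tested against numbers already in the original post: the TaskManagerRunner lines report cumulative GC time per collector, so two samples give the share of wall-clock time spent in GC. The sketch below is a hypothetical helper (the class `GcShare` and its methods are illustrative names, not Flink APIs) and assumes the log layout shown in this thread, with a `yyyy-MM-dd HH:mm:ss,SSS` timestamp at the start of each line. For the 21:09:59,686 and 21:55:39,876 samples, PS Scavenge time grows by 1,877 ms over about 45 minutes 40 seconds (roughly 0.07% of wall-clock time) and PS MarkSweep does not change, which suggests GC was not what held up checkpoint 316 at the source.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: estimates the share of wall-clock time spent in one
// garbage collector from two "Garbage collector stats" log lines.
class GcShare {
    // Matches one collector entry, e.g. "[PS Scavenge, GC TIME (ms): 108643, GC COUNT: 6981]".
    private static final Pattern ENTRY =
        Pattern.compile("\\[([^,\\]]+), GC TIME \\(ms\\): (\\d+), GC COUNT: (\\d+)\\]");
    // Assumed timestamp layout at the start of each log line (23 characters).
    private static final DateTimeFormatter TS =
        DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss,SSS");

    static LocalDateTime timestamp(String line) {
        return LocalDateTime.parse(line.substring(0, 23), TS);
    }

    // Cumulative GC time (ms) reported for the named collector.
    static long gcTimeMs(String line, String collector) {
        Matcher m = ENTRY.matcher(line);
        while (m.find()) {
            if (m.group(1).equals(collector)) {
                return Long.parseLong(m.group(2));
            }
        }
        throw new IllegalArgumentException("collector not found: " + collector);
    }

    // GC time added between the two samples divided by the wall-clock time between them.
    static double gcShare(String earlier, String later, String collector) {
        long wallMs = Duration.between(timestamp(earlier), timestamp(later)).toMillis();
        long gcMs = gcTimeMs(later, collector) - gcTimeMs(earlier, collector);
        return (double) gcMs / wallMs;
    }

    public static void main(String[] args) {
        String before = "2020-06-24 21:09:59,686 INFO org.apache.flink.runtime.taskexecutor.TaskManagerRunner"
            + " - Garbage collector stats: [PS Scavenge, GC TIME (ms): 108643, GC COUNT: 6981],"
            + " [PS MarkSweep, GC TIME (ms): 1074, GC COUNT: 6]";
        String after = "2020-06-24 21:55:39,876 INFO org.apache.flink.runtime.taskexecutor.TaskManagerRunner"
            + " - Garbage collector stats: [PS Scavenge, GC TIME (ms): 110520, GC COUNT: 7083],"
            + " [PS MarkSweep, GC TIME (ms): 1074, GC COUNT: 6]";
        for (String collector : new String[] {"PS Scavenge", "PS MarkSweep"}) {
            System.out.printf("%s: %.4f%% of wall-clock time%n",
                collector, 100 * gcShare(before, after, collector));
        }
    }
}
```

The same two-sample approach works for any pair of these lines; a share that stays well below a few percent points away from GC and toward other causes such as backpressure.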
>
> 偶尔会出现checkpint需要超过30分钟的(出现的频率不高,1~2天1次) 找到原因了, 任务处理延迟比较大,kafka数据过期清理了,导致从last消费(watermark一下子增长了好多个小时), 然后,这个时候需要输出几个小时内的所有窗口(平时一次只输出一个窗口,这时一次要输出30个窗口消耗比较长时间)。 因为是稳定测试任务,没有关注kafka 延迟 导致数据过期到问题。 感谢,zhisheng、LakeShen、Yichao Yang。 Yichao Yang <[hidden email]> 于2020年6月29日周一 下午7:58写道: > Hi > > > > 如果任务会经常出现反压的话,可以先解决反压问题,因为任务反压也会影响checkpoint,其次就是可以关注一下你的作业的物理资源指标,比如cpu使用率,内存使用率,gc频率是否特别高,尝试保障物理资源使用率在正常水平。 > > > Best, > Yichao Yang > > > > > ------------------ 原始邮件 ------------------ > 发件人: "Tianwang Li"<[hidden email]>; > 发送时间: 2020年6月28日(星期天) 上午10:17 > 收件人: "user-zh"<[hidden email]>; > > 主题: Flink-1.10.0 source的checkpoint偶尔时间比较长 > > > > 关于Flink checkpoint偶尔会比较长时间的问题。 > 环境与背景: > 版本:flink1.10.0 > 数据量:每秒约10万左右的记录,数据源是kafka > 计算逻辑:滑动窗口统计,每个窗口输出的规模大概1~2千万记录。 > 是否有反压:任务经常会出现反压(特别是在窗口输出的时候)。 > > > 问题: > > 大部分时候checkpoint都是在1分钟内完成,偶尔会出现checkpint需要超过30分钟的(出现的频率不高,1~2天1次)。 > source的checkpoint消耗的时间比较长。Trigger checkpoint 到 Starting > checkpoint消耗时间比较长。 > > > checkpoint情况大致如下: > > > > > > > > > > > > > > > 2020-06-24 21:09:53,369 DEBUG > org.apache.flink.runtime.taskexecutor.TaskExecutor > - Trigger checkpoint 316@1593004193363 for > 84dce1ec8aa5a4df2d1758d6e9278693. > > 2020-06-24 21:09:58,327 DEBUG > org.apache.flink.runtime.taskexecutor.TaskExecutor > - Received heartbeat request from > e88ea2f790430c9c160e540ef0546d60. > > 2020-06-24 21:09:59,266 DEBUG > org.apache.flink.runtime.taskexecutor.TaskExecutor > - Received heartbeat request from > b93d7167db364dfdcbda886944f1482f. 
> > 2020-06-24 21:09:59,686 INFO > org.apache.flink.runtime.taskexecutor.TaskManagerRunner > - Memory usage stats: [HEAP: 202/2573/2573 MB, NON HEAP: 111/114/424 > MB (used/committed/max)] > > 2020-06-24 21:09:59,686 INFO > org.apache.flink.runtime.taskexecutor.TaskManagerRunner > - Direct memory stats: Count: 17403, Total Capacity: 583911423, Used > Memory: 583911424 > > 2020-06-24 21:09:59,686 INFO > org.apache.flink.runtime.taskexecutor.TaskManagerRunner > - Off-heap pool stats: [Code Cache: 35/35/240 MB > (used/committed/max)], [Metaspace: 67/69/96 MB (used/committed/max)], > [Compressed Class Space: 8/9/88 MB (used/committed/max)] > > 2020-06-24 21:09:59,686 INFO > org.apache.flink.runtime.taskexecutor.TaskManagerRunner > - Garbage collector stats: [PS Scavenge, GC TIME (ms): 108643, GC > COUNT: 6981], [PS MarkSweep, GC TIME (ms): 1074, GC COUNT: 6] > > 2020-06-24 21:10:08,346 DEBUG > org.apache.flink.runtime.taskexecutor.TaskExecutor > - Received heartbeat request from > e88ea2f790430c9c160e540ef0546d60. > > 2020-06-24 21:10:09,286 DEBUG > org.apache.flink.runtime.taskexecutor.TaskExecutor > - Received heartbeat request from > b93d7167db364dfdcbda886944f1482f. 
> > 2020-06-24 21:10:09,686 INFO > org.apache.flink.runtime.taskexecutor.TaskManagerRunner > - Memory usage stats: [HEAP: 557/2573/2573 MB, NON HEAP: 111/114/424 > MB (used/committed/max)] > > 2020-06-24 21:10:09,686 INFO > org.apache.flink.runtime.taskexecutor.TaskManagerRunner > - Direct memory stats: Count: 17403, Total Capacity: 583911423, Used > Memory: 583911424 > > 2020-06-24 21:10:09,686 INFO > org.apache.flink.runtime.taskexecutor.TaskManagerRunner > - Off-heap pool stats: [Code Cache: 35/35/240 MB > (used/committed/max)], [Metaspace: 67/69/96 MB (used/committed/max)], > [Compressed Class Space: 8/9/88 MB (used/committed/max)] > > 2020-06-24 21:10:09,686 INFO > org.apache.flink.runtime.taskexecutor.TaskManagerRunner > - Garbage collector stats: [PS Scavenge, GC TIME (ms): 108643, GC > COUNT: 6981], [PS MarkSweep, GC TIME (ms): 1074, GC COUNT: 6] > > > > > 省略。。。。 > > > > > 2020-06-24 21:55:39,875 INFO > org.apache.flink.runtime.taskexecutor.TaskManagerRunner > - Direct memory stats: Count: 17403, Total Capacity: 583911423, Used > Memory: 583911424 > > 2020-06-24 21:55:39,875 INFO > org.apache.flink.runtime.taskexecutor.TaskManagerRunner > - Off-heap pool stats: [Code Cache: 35/35/240 MB > (used/committed/max)], [Metaspace: 67/69/96 MB (used/committed/max)], > [Compressed Class Space: 8/9/88 MB (used/committed/max)] > > 2020-06-24 21:55:39,876 INFO > org.apache.flink.runtime.taskexecutor.TaskManagerRunner > - Garbage collector stats: [PS Scavenge, GC TIME (ms): 110520, GC > COUNT: 7083], [PS MarkSweep, GC TIME (ms): 1074, GC COUNT: 6] > > 2020-06-24 21:55:41,721 DEBUG > org.apache.flink.streaming.runtime.tasks.StreamTask > - Starting checkpoint (316) CHECKPOINT on task Source: Custom > Source -> Map -> Filter -> Timestamps/Watermarks (4/10) > > 2020-06-24 21:55:41,721 DEBUG > org.apache.flink.runtime.state.AbstractSnapshotStrategy > - DefaultOperatorStateBackend snapshot (FsCheckpointStorageLocation > 
{fileSystem=org.apache.flink.core.fs.SafetyNetWrapperFileSystem@4a4b11cd, > checkpointDirectory=hdfs://xxxxchk-316, > sharedStateDirectory=hdfs://xxxxshared, > taskOwnedStateDirectory=hdfs://xxxxtaskowned, > metadataFilePath=hdfs://xxxxchk-316/_metadata, reference=(default), > fileStateSizeThreshold=1024, writeBufferSize=4096}, synchronous part) in > thread Thread[Source: Custom Source -> Map -> Filter -> > Timestamps/Watermarks (4/10),5,Flink Task Threads] took 0 ms. > > 2020-06-24 21:55:41,721 DEBUG > org.apache.flink.runtime.state.AbstractSnapshotStrategy > - DefaultOperatorStateBackend snapshot (FsCheckpointStorageLocation > {fileSystem=org.apache.flink.core.fs.SafetyNetWrapperFileSystem@4a4b11cd, > checkpointDirectory=hdfs://xxxxchk-316, > sharedStateDirectory=hdfs://xxxxshared, > taskOwnedStateDirectory=hdfs://xxxxtaskowned, > metadataFilePath=hdfs://xxxxchk-316/_metadata, reference=(default), > fileStateSizeThreshold=1024, writeBufferSize=4096}, synchronous part) in > thread Thread[Source: Custom Source -> Map -> Filter -> > Timestamps/Watermarks (4/10),5,Flink Task Threads] took 0 ms. > > 2020-06-24 21:55:41,721 DEBUG > org.apache.flink.runtime.state.AbstractSnapshotStrategy > - DefaultOperatorStateBackend snapshot (FsCheckpointStorageLocation > {fileSystem=org.apache.flink.core.fs.SafetyNetWrapperFileSystem@4a4b11cd, > checkpointDirectory=hdfs://xxxxchk-316, > sharedStateDirectory=hdfs://xxxxshared, > taskOwnedStateDirectory=hdfs://xxxxtaskowned, > metadataFilePath=hdfs://xxxxchk-316/_metadata, reference=(default), > fileStateSizeThreshold=1024, writeBufferSize=4096}, synchronous part) in > thread Thread[Source: Custom Source -> Map -> Filter -> > Timestamps/Watermarks (4/10),5,Flink Task Threads] took 0 ms. 
>
> 2020-06-24 21:55:41,721 DEBUG org.apache.flink.runtime.state.AbstractSnapshotStrategy - DefaultOperatorStateBackend snapshot (FsCheckpointStorageLocation {fileSystem=org.apache.flink.core.fs.SafetyNetWrapperFileSystem@4a4b11cd, checkpointDirectory=hdfs://xxxxchk-316, sharedStateDirectory=hdfs://xxxxshared, taskOwnedStateDirectory=hdfs://xxxxtaskowned, metadataFilePath=hdfs://xxxxchk-316/_metadata, reference=(default), fileStateSizeThreshold=1024, writeBufferSize=4096}, synchronous part) in thread Thread[Source: Custom Source -> Map -> Filter -> Timestamps/Watermarks (4/10),5,Flink Task Threads] took 0 ms.
>
> 2020-06-24 21:55:41,721 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask - Finished synchronous checkpoints for checkpoint 316 on task Source: Custom Source -> Map -> Filter -> Timestamps/Watermarks (4/10)
>
> 2020-06-24 21:55:41,721 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask - Source: Custom Source -> Map -> Filter -> Timestamps/Watermarks (4/10) - finished synchronous part of checkpoint 316. Alignment duration: 0 ms, snapshot duration 0 ms
>
> 2020-06-24 21:55:41,737 DEBUG org.apache.flink.runtime.state.AbstractSnapshotStrategy - DefaultOperatorStateBackend snapshot (FsCheckpointStorageLocation {fileSystem=org.apache.flink.core.fs.SafetyNetWrapperFileSystem@4a4b11cd, checkpointDirectory=hdfs://xxxxchk-316, sharedStateDirectory=hdfs://xxxxshared, taskOwnedStateDirectory=hdfs://xxxxtaskowned, metadataFilePath=hdfs://xxxxchk-316/_metadata, reference=(default), fileStateSizeThreshold=1024, writeBufferSize=4096}, asynchronous part) in thread Thread[AsyncOperations-thread-316,5,Flink Task Threads] took 16 ms.
>
> 2020-06-24 21:55:41,738 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask - Source: Custom Source -> Map -> Filter -> Timestamps/Watermarks (4/10) - finished asynchronous part of checkpoint 316. Asynchronous duration: 16 ms
>
> 2020-06-24 21:55:42,008 DEBUG org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner - Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger, anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10) (a608e8a1ab75622dc57f164bdbb86743): Received barrier from channel 3.
>
> 2020-06-24 21:55:42,008 DEBUG org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner - Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger, anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10) (a608e8a1ab75622dc57f164bdbb86743): Starting stream alignment for checkpoint 316.
>
> 2020-06-24 21:55:42,110 DEBUG org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner - Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger, anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10) (a608e8a1ab75622dc57f164bdbb86743): Received barrier from channel 7.
>
> 2020-06-24 21:55:42,110 DEBUG org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner - Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger, anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10) (a608e8a1ab75622dc57f164bdbb86743): Received barrier from channel 2.
>
> 2020-06-24 21:55:42,110 DEBUG org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner - Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger, anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10) (a608e8a1ab75622dc57f164bdbb86743): Received barrier from channel 8.
>
> 2020-06-24 21:55:42,110 DEBUG org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner - Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger, anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10) (a608e8a1ab75622dc57f164bdbb86743): Received barrier from channel 6.
>
> 2020-06-24 21:55:42,110 DEBUG org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner - Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger, anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10) (a608e8a1ab75622dc57f164bdbb86743): Received barrier from channel 0.
>
> 2020-06-24 21:55:42,110 DEBUG org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner - Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger, anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10) (a608e8a1ab75622dc57f164bdbb86743): Received barrier from channel 5.
>
> 2020-06-24 21:55:42,110 DEBUG org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner - Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger, anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10) (a608e8a1ab75622dc57f164bdbb86743): Received barrier from channel 9.
>
> 2020-06-24 21:55:42,110 DEBUG org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner - Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger, anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10) (a608e8a1ab75622dc57f164bdbb86743): Received barrier from channel 4.
>
> 2020-06-24 21:55:42,110 DEBUG org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner - Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger, anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10) (a608e8a1ab75622dc57f164bdbb86743): Received barrier from channel 1.
>
> 2020-06-24 21:55:42,110 DEBUG org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner - Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger, anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10) (a608e8a1ab75622dc57f164bdbb86743): Received all barriers, triggering checkpoint 316 at 1593004193363.
>
> 2020-06-24 21:55:42,110 DEBUG org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner - Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger, anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10) (a608e8a1ab75622dc57f164bdbb86743): End of stream alignment, feeding buffered data back.
>
> 2020-06-24 21:55:42,110 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask - Starting checkpoint (316) CHECKPOINT on task Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger, anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10)
>
> 2020-06-24 21:55:42,110 DEBUG org.apache.flink.runtime.state.AbstractSnapshotStrategy - DefaultOperatorStateBackend snapshot (FsCheckpointStorageLocation {fileSystem=org.apache.flink.core.fs.SafetyNetWrapperFileSystem@61164119, checkpointDirectory=hdfs://xxxxchk-316, sharedStateDirectory=hdfs://xxxxshared, taskOwnedStateDirectory=hdfs://xxxxtaskowned, metadataFilePath=hdfs://xxxxchk-316/_metadata, reference=(default), fileStateSizeThreshold=1024, writeBufferSize=4096}, synchronous part) in thread Thread[Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger, anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10),5,Flink Task Threads] took 0 ms.
>
> 2020-06-24 21:55:42,110 DEBUG org.apache.flink.runtime.state.AbstractSnapshotStrategy - DefaultOperatorStateBackend snapshot (FsCheckpointStorageLocation {fileSystem=org.apache.flink.core.fs.SafetyNetWrapperFileSystem@61164119, checkpointDirectory=hdfs://xxxxchk-316, sharedStateDirectory=hdfs://xxxxshared, taskOwnedStateDirectory=hdfs://xxxxtaskowned, metadataFilePath=hdfs://xxxxchk-316/_metadata, reference=(default), fileStateSizeThreshold=1024, writeBufferSize=4096}, synchronous part) in thread Thread[Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger, anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10),5,Flink Task Threads] took 0 ms.
>
> 2020-06-24 21:55:42,980 DEBUG org.apache.flink.runtime.state.AbstractSnapshotStrategy - Asynchronous incremental RocksDB snapshot (FsCheckpointStorageLocation {fileSystem=org.apache.flink.core.fs.SafetyNetWrapperFileSystem@61164119, checkpointDirectory=hdfs://xxxx/chk-316, sharedStateDirectory=hdfs://xxxx/shared, taskOwnedStateDirectory=hdfs://xxxx/taskowned, metadataFilePath=hdfs://xxxx/chk-316/_metadata, reference=(default), fileStateSizeThreshold=1024, writeBufferSize=4096}, synchronous part) in thread Thread[Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger, anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10),5,Flink Task Threads] took 870 ms.
>
> 2020-06-24 21:55:42,981 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask - Finished synchronous checkpoints for checkpoint 316 on task Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger, anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10)
>
> 2020-06-24 21:55:42,981 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask - Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger, anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10) - finished synchronous part of checkpoint 316. Alignment duration: 101 ms, snapshot duration 870 ms
>
> 2020-06-24 21:55:42,981 DEBUG org.apache.flink.streaming.runtime.io.CachedBufferStorage - Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger, anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10) (a608e8a1ab75622dc57f164bdbb86743): Size of buffered data: 98304 bytes
>
> 2020-06-24 21:55:42,981 DEBUG org.apache.flink.streaming.runtime.io.CachedBufferStorage - Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger, anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10) (a608e8a1ab75622dc57f164bdbb86743): Finished feeding back buffered data.
>
> 2020-06-24 21:55:43,814 DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor - Received heartbeat request from e88ea2f790430c9c160e540ef0546d60.
>
> 2020-06-24 21:55:44,758 DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor - Received heartbeat request from b93d7167db364dfdcbda886944f1482f.
>
> 2020-06-24 21:55:45,714 DEBUG org.apache.flink.runtime.state.AbstractSnapshotStrategy - Asynchronous incremental RocksDB snapshot (FsCheckpointStorageLocation {fileSystem=org.apache.flink.core.fs.SafetyNetWrapperFileSystem@61164119, checkpointDirectory=hdfs://xxxx/chk-316, sharedStateDirectory=hdfs://xxxx/shared, taskOwnedStateDirectory=hdfs://xxxx/taskowned, metadataFilePath=hdfs://xxxx/chk-316/_metadata, reference=(default), fileStateSizeThreshold=1024, writeBufferSize=4096}, asynchronous part) in thread Thread[AsyncOperations-thread-316,5,Flink Task Threads] took 2733 ms.
>
> 2020-06-24 21:55:45,715 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask - Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger, anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10) - finished asynchronous part of checkpoint 316. Asynchronous duration: 2734 ms
>
> 2020-06-24 21:55:49,876 INFO org.apache.flink.runtime.taskexecutor.TaskManagerRunner - Memory usage stats: [HEAP: 220/2589/2589 MB, NON HEAP: 111/114/424 MB (used/committed/max)]
>
> 2020-06-24 21:55:49,876 INFO org.apache.flink.runtime.taskexecutor.TaskManagerRunner - Direct memory stats: Count: 17414, Total Capacity: 584128352, Used Memory: 584128353
>
> 2020-06-24 21:55:49,876 INFO org.apache.flink.runtime.taskexecutor.TaskManagerRunner - Off-heap pool stats: [Code Cache: 35/35/240 MB (used/committed/max)], [Metaspace: 67/69/96 MB (used/committed/max)], [Compressed Class Space: 8/9/88 MB (used/committed/max)]
>
> 2020-06-24 21:55:49,876 INFO org.apache.flink.runtime.taskexecutor.TaskManagerRunner - Garbage collector stats: [PS Scavenge, GC TIME (ms): 110525, GC COUNT: 7084], [PS MarkSweep, GC TIME (ms): 1074, GC COUNT: 6]
>
> --
> **************************************
> tivanli
> **************************************
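The size of the gap can be read directly from the log. The window task logs `Received all barriers, triggering checkpoint 316 at 1593004193363`, where the trailing number is the checkpoint's trigger timestamp in epoch milliseconds. The `Starting checkpoint (316)` lines for the source task (21:55:41,721) and the window task (21:55:42,110) carry local wall-clock time. Below is a minimal sketch of the subtraction. It assumes the TaskManager logs in UTC+8, which is consistent with the `Trigger checkpoint 316@1593004193363` line being logged at 21:09:53,369.

```python
from datetime import datetime, timedelta, timezone

# Assumption: TaskManager log timestamps are local time at UTC+8
# (consistent with 'Trigger checkpoint 316@1593004193363' being logged at 21:09:53).
LOG_TZ = timezone(timedelta(hours=8))
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def parse_log_time(stamp: str) -> datetime:
    """Parse a log timestamp such as '2020-06-24 21:55:42,110' (comma before the millis)."""
    return datetime.strptime(stamp, "%Y-%m-%d %H:%M:%S,%f").replace(tzinfo=LOG_TZ)


def trigger_to_start(trigger_epoch_ms: int, start_stamp: str) -> timedelta:
    """Delay between the checkpoint trigger timestamp and a task's 'Starting checkpoint' line."""
    # Integer millisecond arithmetic avoids float rounding in datetime.fromtimestamp.
    triggered = EPOCH + timedelta(milliseconds=trigger_epoch_ms)
    return parse_log_time(start_stamp) - triggered


if __name__ == "__main__":
    # Timestamps copied from the checkpoint 316 log lines.
    print("source task:", trigger_to_start(1593004193363, "2020-06-24 21:55:41,721"))
    print("window task:", trigger_to_start(1593004193363, "2020-06-24 21:55:42,110"))
```

Both tasks start their snapshot roughly 45 minutes 48 seconds after the checkpoint was triggered.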
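The phases each task reports for itself are short by comparison. The sketch below extracts the `Alignment duration`, `snapshot duration` and `Asynchronous duration` figures for one checkpoint from `StreamTask` DEBUG lines. The regular expressions are written against the Flink 1.10 message format quoted above; other versions may word these messages differently. The sample lines reuse the task names and durations from the log, with the long task names factored into constants.

```python
import re
from collections import defaultdict

# Patterns for the StreamTask DEBUG messages quoted in the log (assumed Flink 1.10 format).
SYNC_RE = re.compile(
    r"StreamTask - (?P<task>.+?) - finished synchronous part of checkpoint (?P<id>\d+)\. "
    r"Alignment duration: (?P<align_ms>\d+) ms, snapshot duration (?P<snapshot_ms>\d+) ms"
)
ASYNC_RE = re.compile(
    r"StreamTask - (?P<task>.+?) - finished asynchronous part of checkpoint (?P<id>\d+)\. "
    r"Asynchronous duration: (?P<async_ms>\d+) ms"
)


def phase_durations(lines, checkpoint_id):
    """Per task: alignment, synchronous snapshot and asynchronous durations (ms) for one checkpoint."""
    result = defaultdict(dict)
    for line in lines:
        for regex in (SYNC_RE, ASYNC_RE):
            m = regex.search(line)
            if m and int(m["id"]) == checkpoint_id:
                durations = {k: int(v) for k, v in m.groupdict().items() if k.endswith("_ms")}
                result[m["task"]].update(durations)
    return dict(result)


PREFIX = "DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask - "
SOURCE = "Source: Custom Source -> Map -> Filter -> Timestamps/Watermarks (4/10)"
WINDOW = ("Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger, "
          "anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10)")
LOG = [
    f"2020-06-24 21:55:41,721 {PREFIX}{SOURCE} - finished synchronous part of checkpoint 316. "
    "Alignment duration: 0 ms, snapshot duration 0 ms",
    f"2020-06-24 21:55:41,738 {PREFIX}{SOURCE} - finished asynchronous part of checkpoint 316. "
    "Asynchronous duration: 16 ms",
    f"2020-06-24 21:55:42,981 {PREFIX}{WINDOW} - finished synchronous part of checkpoint 316. "
    "Alignment duration: 101 ms, snapshot duration 870 ms",
    f"2020-06-24 21:55:45,715 {PREFIX}{WINDOW} - finished asynchronous part of checkpoint 316. "
    "Asynchronous duration: 2734 ms",
]

if __name__ == "__main__":
    for task, phases in phase_durations(LOG, 316).items():
        print(f"{task}: {phases}, total {sum(phases.values())} ms")
```

For checkpoint 316, the source task spends 16 ms in total. The window task spends 101 + 870 + 2734 = 3705 ms.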
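The `CheckpointBarrierAligner` lines for the window task follow the usual aligned-checkpoint sequence:

- The barrier from channel 3 arrives at 21:55:42,008 and starts alignment.
- The other nine channels follow at 21:55:42,110.
- The snapshot is triggered once all ten barriers are in.
- Data held back from already-aligned channels (98304 bytes here) is fed back afterwards.

The class below is a deliberately simplified, hypothetical model of that protocol for a single task. It is not Flink's implementation: it assumes only one checkpoint is in flight and ignores cancellation, timeouts and end-of-input.

```python
from collections import deque
from typing import Callable, Deque, List, Optional, Set


class BarrierAligner:
    """Hypothetical, simplified model of aligned barrier handling for one task.

    Assumes a single checkpoint in flight; cancellation, timeouts and end-of-input are ignored.
    """

    def __init__(self, num_channels: int, on_checkpoint: Callable[[int], None]) -> None:
        self.num_channels = num_channels
        self.on_checkpoint = on_checkpoint      # called once all barriers have arrived
        self.current_id: Optional[int] = None   # checkpoint currently being aligned
        self.blocked: Set[int] = set()          # channels whose barrier has already arrived
        self.buffered: Deque[object] = deque()  # records held back from blocked channels
        self.output: List[object] = []          # records handed to the operator, in order

    def on_record(self, channel: int, record: object) -> None:
        if channel in self.blocked:
            # Data behind a barrier must not be processed before the snapshot: hold it back.
            self.buffered.append(record)
        else:
            self.output.append(record)

    def on_barrier(self, channel: int, checkpoint_id: int) -> None:
        if self.current_id is None:
            self.current_id = checkpoint_id     # first barrier: start alignment
        elif checkpoint_id != self.current_id:
            raise ValueError("this sketch supports only one checkpoint in flight")
        self.blocked.add(channel)
        if len(self.blocked) == self.num_channels:
            self.on_checkpoint(checkpoint_id)   # all barriers received: take the snapshot
            # End of alignment: unblock all channels and feed buffered data back in order.
            self.blocked.clear()
            self.current_id = None
            while self.buffered:
                self.output.append(self.buffered.popleft())


if __name__ == "__main__":
    snapshots: List[int] = []
    aligner = BarrierAligner(10, snapshots.append)
    for ch in (3, 7, 2, 8, 6, 0, 5, 9, 4, 1):  # barrier order from the window task's log
        aligner.on_barrier(ch, 316)
    print(snapshots)
```

Replaying the barrier order from the log triggers the snapshot only on the tenth barrier (channel 1). A record that arrives on channel 3 after its barrier is held back until the snapshot has been taken.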
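The `TaskManagerRunner` stats also show how much GC happened while checkpoint 316 was pending. The small sketch below diffs two `Garbage collector stats` lines: the 21:09:59 line from earlier in the post and the 21:55:49 line above. The parsing assumes the exact `[name, GC TIME (ms): N, GC COUNT: M]` layout shown in these logs.

```python
import re

# Matches one '[name, GC TIME (ms): N, GC COUNT: M]' group in a TaskManagerRunner GC stats line.
GC_RE = re.compile(r"\[(?P<name>[^,\]]+), GC TIME \(ms\): (?P<time>\d+), GC COUNT: (?P<count>\d+)\]")


def parse_gc_stats(line: str) -> dict:
    """Map collector name -> (cumulative GC time in ms, cumulative GC count)."""
    return {m["name"]: (int(m["time"]), int(m["count"])) for m in GC_RE.finditer(line)}


def gc_delta(before: str, after: str) -> dict:
    """GC time (ms) and collection count accumulated between two stats lines, per collector."""
    b, a = parse_gc_stats(before), parse_gc_stats(after)
    return {name: (a[name][0] - b[name][0], a[name][1] - b[name][1]) for name in a if name in b}


BEFORE = ("2020-06-24 21:09:59,686 INFO org.apache.flink.runtime.taskexecutor.TaskManagerRunner - "
          "Garbage collector stats: [PS Scavenge, GC TIME (ms): 108643, GC COUNT: 6981], "
          "[PS MarkSweep, GC TIME (ms): 1074, GC COUNT: 6]")
AFTER = ("2020-06-24 21:55:49,876 INFO org.apache.flink.runtime.taskexecutor.TaskManagerRunner - "
         "Garbage collector stats: [PS Scavenge, GC TIME (ms): 110525, GC COUNT: 7084], "
         "[PS MarkSweep, GC TIME (ms): 1074, GC COUNT: 6]")

if __name__ == "__main__":
    for name, (ms, count) in gc_delta(BEFORE, AFTER).items():
        print(f"{name}: {ms} ms over {count} collections")
```

Between those two samples, PS Scavenge accumulated 1882 ms over 103 collections, and PS MarkSweep did not run at all. GC pause time therefore accounts for only about two seconds of the roughly 46-minute delay.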