hi:
这个flink 版本1.10 全是提交sql 运行,生产环境经常出现这种问题,然后节点就死了,任务又只能从checkpoits 恢复,该如何解决?sql 里mysql 如何释放mysql 这个,求大佬回答?这是生产环境 2020-07-22 11:46:40,085 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Discarding checkpoint 43842 of job a3eae3f691bdea687b9979b9e0ac28e2. org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 43842 for operator GroupAggregate(groupBy=[item_code], select=[item_code, COUNT(ts) AS pv, COUNT(DISTINCT channelOrOfflineId) FILTER $f3 AS share_time, COUNT(ts) FILTER $f4 AS ios_pv, COUNT(ts) FILTER $f5 AS android_pv, COUNT(ts) FILTER $f6 AS other_pv, COUNT(DISTINCT userId) AS uv, COUNT(DISTINCT userId) FILTER $f4 AS ios_uv, COUNT(DISTINCT userId) FILTER $f5 AS android_uv, COUNT(DISTINCT userId) FILTER $f6 AS other_uv]) -> SinkConversionToTuple2 -> Sink: JDBCUpsertTableSink(item_code, pv, share_time, ios_pv, android_pv, other_pv, uv, ios_uv, android_uv, other_uv) (1/1). Failure reason: Checkpoint was declined. at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:434) at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1420) at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1354) at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:991) at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$5(StreamTask.java:887) at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94) at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:860) at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:820) at org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:86) at org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner.processBarrier(CheckpointBarrierAligner.java:113) at org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:155) at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:133) at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69) at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187) at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532) at java.lang.Thread.run(Thread.java:748) Caused by: java.lang.RuntimeException: Writing records to JDBC failed. at org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.checkFlushException(JDBCUpsertOutputFormat.java:135) at org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.flush(JDBCUpsertOutputFormat.java:155) at org.apache.flink.api.java.io.jdbc.JDBCUpsertSinkFunction.snapshotState(JDBCUpsertSinkFunction.java:56) at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118) at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:402) ... 19 more Caused by: java.lang.RuntimeException: Writing records to JDBC failed. at org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.checkFlushException(JDBCUpsertOutputFormat.java:135) at org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.flush(JDBCUpsertOutputFormat.java:155) at org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.lambda$open$0(JDBCUpsertOutputFormat.java:124) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) |
Free forum by Nabble | Edit this page |