flink sink kafka error

flink sink kafka error

lp
I wrote a streaming program that reads from Kafka topicA, does some simple preprocessing, and sinks the result back to Kafka topicB. The program runs locally, but it logs errors like the following along the way:
---------
20:11:47,078 INFO  org.apache.kafka.clients.Metadata                          
[] - [Producer clientId=producer-Source: Custom Source -> Flat Map -> Map ->
Sink: Unnamed-3d05135cf7d8f1375d8f655ba9d20255-31, transactionalId=Source:
Custom Source -> Flat Map -> Map -> Sink:
Unnamed-3d05135cf7d8f1375d8f655ba9d20255-31] Cluster ID:
IKSZYfPVTaGGwDrkST0v_A
20:11:47,079 INFO  org.apache.kafka.clients.Metadata                          
[] - [Producer clientId=producer-Source: Custom Source -> Flat Map -> Map ->
Sink: Unnamed-3d05135cf7d8f1375d8f655ba9d20255-26, transactionalId=Source:
Custom Source -> Flat Map -> Map -> Sink:
Unnamed-3d05135cf7d8f1375d8f655ba9d20255-26] Cluster ID:
IKSZYfPVTaGGwDrkST0v_A
20:11:47,079 INFO  org.apache.kafka.clients.Metadata                          
[] - [Producer clientId=producer-Source: Custom Source -> Flat Map -> Map ->
Sink: Unnamed-3d05135cf7d8f1375d8f655ba9d20255-7, transactionalId=Source:
Custom Source -> Flat Map -> Map -> Sink:
Unnamed-3d05135cf7d8f1375d8f655ba9d20255-7] Cluster ID:
IKSZYfPVTaGGwDrkST0v_A
20:11:47,066 WARN  org.apache.kafka.clients.NetworkClient                      
[] - [Producer clientId=producer-Source: Custom Source -> Flat Map -> Map ->
Sink: Unnamed-3d05135cf7d8f1375d8f655ba9d20255-4, transactionalId=Source:
Custom Source -> Flat Map -> Map -> Sink:
Unnamed-3d05135cf7d8f1375d8f655ba9d20255-4] Error connecting to node
10.66.0.129:9092 (id: -1 rack: null)
java.net.SocketException: Permission denied: connect
        at sun.nio.ch.Net.connect0(Native Method) ~[?:1.8.0_231]
        at sun.nio.ch.Net.connect(Net.java:454) ~[?:1.8.0_231]
        at sun.nio.ch.Net.connect(Net.java:446) ~[?:1.8.0_231]
        at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:648)
~[?:1.8.0_231]
        at org.apache.kafka.common.network.Selector.doConnect(Selector.java:280)
~[kafka-clients-2.4.1.jar:?]
        at org.apache.kafka.common.network.Selector.connect(Selector.java:258)
~[kafka-clients-2.4.1.jar:?]
        at
org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:951)
[kafka-clients-2.4.1.jar:?]
        at org.apache.kafka.clients.NetworkClient.access$500(NetworkClient.java:71)
[kafka-clients-2.4.1.jar:?]
        at
org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1122)
[kafka-clients-2.4.1.jar:?]
        at
org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1010)
[kafka-clients-2.4.1.jar:?]
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:545)
[kafka-clients-2.4.1.jar:?]
        at
org.apache.kafka.clients.NetworkClientUtils.isReady(NetworkClientUtils.java:42)
[kafka-clients-2.4.1.jar:?]
        at
org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:65)
[kafka-clients-2.4.1.jar:?]
        at
org.apache.kafka.clients.producer.internals.Sender.awaitNodeReady(Sender.java:529)
[kafka-clients-2.4.1.jar:?]
        at
org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:447)
[kafka-clients-2.4.1.jar:?]
        at
org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:311)
[kafka-clients-2.4.1.jar:?]
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244)
[kafka-clients-2.4.1.jar:?]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_231]
20:11:47,081 WARN  org.apache.kafka.clients.NetworkClient                      
[] - [Producer clientId=producer-Source: Custom Source -> Flat Map -> Map ->
Sink: Unnamed-3d05135cf7d8f1375d8f655ba9d20255-4, transactionalId=Source:
Custom Source -> Flat Map -> Map -> Sink:
Unnamed-3d05135cf7d8f1375d8f655ba9d20255-4] Bootstrap broker
10.66.0.129:9092 (id: -1 rack: null) disconnected
20:11:47,081 INFO  org.apache.kafka.clients.Metadata                          
[] - [Producer clientId=producer-Source: Custom Source -> Flat Map -> Map ->
Sink: Unnamed-3d05135cf7d8f1375d8f655ba9d20255-36, transactionalId=Source:
Custom Source -> Flat Map -> Map -> Sink:
Unnamed-3d05135cf7d8f1375d8f655ba9d20255-36] Cluster ID:
IKSZYfPVTaGGwDrkST0v_A
20:11:47,081 INFO  org.apache.kafka.clients.Metadata                          
[] - [Producer clientId=producer-Source: Custom Source -> Flat Map -> Map ->
Sink: Unnamed-3d05135cf7d8f1375d8f655ba9d20255-11, transactionalId=Source:
Custom Source -> Flat Map -> Map -> Sink:
Unnamed-3d05135cf7d8f1375d8f655ba9d20255-11] Cluster ID:
IKSZYfPVTaGGwDrkST0v_A
20:11:47,084 INFO
org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl []
- Could not complete snapshot 28 for operator Source: Custom Source -> Flat
Map -> Map -> Sink: Unnamed (5/8)#0. Failure reason: Checkpoint was
declined.
org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete
snapshot 28 for operator Source: Custom Source -> Flat Map -> Map -> Sink:
Unnamed (5/8)#0. Failure reason: Checkpoint was declined.
        at
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:241)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
        at
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:162)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
        at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:371)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
        at
org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointStreamOperator(SubtaskCheckpointCoordinatorImpl.java:685)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
        at
org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.buildOperatorSnapshotFutures(SubtaskCheckpointCoordinatorImpl.java:606)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
        at
org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:571)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
        at
org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:298)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
        at
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$9(StreamTask.java:1003)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
        at
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
        at
org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:993)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
        at
org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:912)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
        at
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$8(StreamTask.java:880)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
        at
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
        at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
        at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317)
[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
        at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:189)
[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
        at
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:609)
[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
        at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573)
[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
[flink-runtime_2.11-1.12.1.jar:1.12.1]
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
[flink-runtime_2.11-1.12.1.jar:1.12.1]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_231]
Caused by: org.apache.flink.util.SerializedThrowable: Failed to construct
kafka producer
        at
org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:432)
~[kafka-clients-2.4.1.jar:?]
        at
org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:298)
~[kafka-clients-2.4.1.jar:?]
        at
org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer.<init>(FlinkKafkaInternalProducer.java:77)
~[flink-connector-kafka_2.11-1.12.1.jar:1.12.1]
        at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.createProducer(FlinkKafkaProducer.java:1230)
~[flink-connector-kafka_2.11-1.12.1.jar:1.12.1]
        at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initProducer(FlinkKafkaProducer.java:1346)
~[flink-connector-kafka_2.11-1.12.1.jar:1.12.1]
        at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initTransactionalProducer(FlinkKafkaProducer.java:1331)
~[flink-connector-kafka_2.11-1.12.1.jar:1.12.1]
        at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.createTransactionalProducer(FlinkKafkaProducer.java:1317)
~[flink-connector-kafka_2.11-1.12.1.jar:1.12.1]
        at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.beginTransaction(FlinkKafkaProducer.java:976)
~[flink-connector-kafka_2.11-1.12.1.jar:1.12.1]
        at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.beginTransaction(FlinkKafkaProducer.java:99)
~[flink-connector-kafka_2.11-1.12.1.jar:1.12.1]
        at
org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:403)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
        at
org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.snapshotState(TwoPhaseCommitSinkFunction.java:324)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
        at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.snapshotState(FlinkKafkaProducer.java:1100)
~[flink-connector-kafka_2.11-1.12.1.jar:1.12.1]
        at
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
        at
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
        at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:89)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
        at
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:205)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
        ... 20 more
Caused by: org.apache.flink.util.SerializedThrowable: java.io.IOException:
Unable to establish loopback connection
        at org.apache.kafka.common.network.Selector.<init>(Selector.java:162)
~[kafka-clients-2.4.1.jar:?]
        at org.apache.kafka.common.network.Selector.<init>(Selector.java:214)
~[kafka-clients-2.4.1.jar:?]
        at org.apache.kafka.common.network.Selector.<init>(Selector.java:227)
~[kafka-clients-2.4.1.jar:?]
        at org.apache.kafka.common.network.Selector.<init>(Selector.java:231)
~[kafka-clients-2.4.1.jar:?]
        at
org.apache.kafka.clients.producer.KafkaProducer.newSender(KafkaProducer.java:454)
~[kafka-clients-2.4.1.jar:?]
        at
org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:421)
~[kafka-clients-2.4.1.jar:?]
        at
org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:298)
~[kafka-clients-2.4.1.jar:?]
        at
org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer.<init>(FlinkKafkaInternalProducer.java:77)
~[flink-connector-kafka_2.11-1.12.1.jar:1.12.1]
        at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.createProducer(FlinkKafkaProducer.java:1230)
~[flink-connector-kafka_2.11-1.12.1.jar:1.12.1]
        at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initProducer(FlinkKafkaProducer.java:1346)
~[flink-connector-kafka_2.11-1.12.1.jar:1.12.1]
        at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initTransactionalProducer(FlinkKafkaProducer.java:1331)
~[flink-connector-kafka_2.11-1.12.1.jar:1.12.1]
        at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.createTransactionalProducer(FlinkKafkaProducer.java:1317)
~[flink-connector-kafka_2.11-1.12.1.jar:1.12.1]
        at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.beginTransaction(FlinkKafkaProducer.java:976)
~[flink-connector-kafka_2.11-1.12.1.jar:1.12.1]
        at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.beginTransaction(FlinkKafkaProducer.java:99)
~[flink-connector-kafka_2.11-1.12.1.jar:1.12.1]
        at
org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:403)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
        at
org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.snapshotState(TwoPhaseCommitSinkFunction.java:324)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
        at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.snapshotState(FlinkKafkaProducer.java:1100)
~[flink-connector-kafka_2.11-1.12.1.jar:1.12.1]
        at
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
        at
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
        at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:89)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
        at
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:205)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
        ... 20 more
Caused by: org.apache.flink.util.SerializedThrowable: Unable to establish
loopback connection
        at sun.nio.ch.PipeImpl$Initializer.run(PipeImpl.java:94) ~[?:1.8.0_231]
        at sun.nio.ch.PipeImpl$Initializer.run(PipeImpl.java:61) ~[?:1.8.0_231]
        at java.security.AccessController.doPrivileged(Native Method)
~[?:1.8.0_231]
        at sun.nio.ch.PipeImpl.<init>(PipeImpl.java:171) ~[?:1.8.0_231]
        at sun.nio.ch.SelectorProviderImpl.openPipe(SelectorProviderImpl.java:50)
~[?:1.8.0_231]
        at java.nio.channels.Pipe.open(Pipe.java:155) ~[?:1.8.0_231]
        at sun.nio.ch.WindowsSelectorImpl.<init>(WindowsSelectorImpl.java:127)
~[?:1.8.0_231]
        at
sun.nio.ch.WindowsSelectorProvider.openSelector(WindowsSelectorProvider.java:44)
~[?:1.8.0_231]
        at java.nio.channels.Selector.open(Selector.java:227) ~[?:1.8.0_231]
        at org.apache.kafka.common.network.Selector.<init>(Selector.java:160)
~[kafka-clients-2.4.1.jar:?]
        at org.apache.kafka.common.network.Selector.<init>(Selector.java:214)
~[kafka-clients-2.4.1.jar:?]
        at org.apache.kafka.common.network.Selector.<init>(Selector.java:227)
~[kafka-clients-2.4.1.jar:?]
        at org.apache.kafka.common.network.Selector.<init>(Selector.java:231)
~[kafka-clients-2.4.1.jar:?]
        at
org.apache.kafka.clients.producer.KafkaProducer.newSender(KafkaProducer.java:454)
~[kafka-clients-2.4.1.jar:?]
        at
org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:421)
~[kafka-clients-2.4.1.jar:?]
        at
org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:298)
~[kafka-clients-2.4.1.jar:?]
        at
org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer.<init>(FlinkKafkaInternalProducer.java:77)
~[flink-connector-kafka_2.11-1.12.1.jar:1.12.1]
        at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.createProducer(FlinkKafkaProducer.java:1230)
~[flink-connector-kafka_2.11-1.12.1.jar:1.12.1]
        at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initProducer(FlinkKafkaProducer.java:1346)
~[flink-connector-kafka_2.11-1.12.1.jar:1.12.1]
        at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initTransactionalProducer(FlinkKafkaProducer.java:1331)
~[flink-connector-kafka_2.11-1.12.1.jar:1.12.1]
        at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.createTransactionalProducer(FlinkKafkaProducer.java:1317)
~[flink-connector-kafka_2.11-1.12.1.jar:1.12.1]
        at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.beginTransaction(FlinkKafkaProducer.java:976)
~[flink-connector-kafka_2.11-1.12.1.jar:1.12.1]
        at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.beginTransaction(FlinkKafkaProducer.java:99)
~[flink-connector-kafka_2.11-1.12.1.jar:1.12.1]
        at
org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:403)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
        at
org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.snapshotState(TwoPhaseCommitSinkFunction.java:324)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
        at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.snapshotState(FlinkKafkaProducer.java:1100)
~[flink-connector-kafka_2.11-1.12.1.jar:1.12.1]
        at
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
        at
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
        at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:89)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
        at
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:205)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
        ... 20 more
Caused by: org.apache.flink.util.SerializedThrowable: Permission denied:
connect
        at sun.nio.ch.Net.connect0(Native Method) ~[?:1.8.0_231]
        at sun.nio.ch.Net.connect(Net.java:454) ~[?:1.8.0_231]
        at sun.nio.ch.Net.connect(Net.java:446) ~[?:1.8.0_231]
        at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:648)
~[?:1.8.0_231]
        at java.nio.channels.SocketChannel.open(SocketChannel.java:189)
~[?:1.8.0_231]
        at sun.nio.ch.PipeImpl$Initializer$LoopbackConnector.run(PipeImpl.java:127)
~[?:1.8.0_231]
        at sun.nio.ch.PipeImpl$Initializer.run(PipeImpl.java:76) ~[?:1.8.0_231]
        at sun.nio.ch.PipeImpl$Initializer.run(PipeImpl.java:61) ~[?:1.8.0_231]
        at java.security.AccessController.doPrivileged(Native Method)
~[?:1.8.0_231]
        at sun.nio.ch.PipeImpl.<init>(PipeImpl.java:171) ~[?:1.8.0_231]
        at sun.nio.ch.SelectorProviderImpl.openPipe(SelectorProviderImpl.java:50)
~[?:1.8.0_231]
        at java.nio.channels.Pipe.open(Pipe.java:155) ~[?:1.8.0_231]
        at sun.nio.ch.WindowsSelectorImpl.<init>(WindowsSelectorImpl.java:127)
~[?:1.8.0_231]
        at
sun.nio.ch.WindowsSelectorProvider.openSelector(WindowsSelectorProvider.java:44)
~[?:1.8.0_231]
        at java.nio.channels.Selector.open(Selector.java:227) ~[?:1.8.0_231]
        at org.apache.kafka.common.network.Selector.<init>(Selector.java:160)
~[kafka-clients-2.4.1.jar:?]
        at org.apache.kafka.common.network.Selector.<init>(Selector.java:214)
~[kafka-clients-2.4.1.jar:?]
        at org.apache.kafka.common.network.Selector.<init>(Selector.java:227)
~[kafka-clients-2.4.1.jar:?]
        at org.apache.kafka.common.network.Selector.<init>(Selector.java:231)
~[kafka-clients-2.4.1.jar:?]
        at
org.apache.kafka.clients.producer.KafkaProducer.newSender(KafkaProducer.java:454)
~[kafka-clients-2.4.1.jar:?]
        at
org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:421)
~[kafka-clients-2.4.1.jar:?]
        at
org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:298)
~[kafka-clients-2.4.1.jar:?]
        at
org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer.<init>(FlinkKafkaInternalProducer.java:77)
~[flink-connector-kafka_2.11-1.12.1.jar:1.12.1]
        at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.createProducer(FlinkKafkaProducer.java:1230)
~[flink-connector-kafka_2.11-1.12.1.jar:1.12.1]
        at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initProducer(FlinkKafkaProducer.java:1346)
~[flink-connector-kafka_2.11-1.12.1.jar:1.12.1]
        at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initTransactionalProducer(FlinkKafkaProducer.java:1331)
~[flink-connector-kafka_2.11-1.12.1.jar:1.12.1]
        at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.createTransactionalProducer(FlinkKafkaProducer.java:1317)
~[flink-connector-kafka_2.11-1.12.1.jar:1.12.1]
        at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.beginTransaction(FlinkKafkaProducer.java:976)
~[flink-connector-kafka_2.11-1.12.1.jar:1.12.1]
        at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.beginTransaction(FlinkKafkaProducer.java:99)
~[flink-connector-kafka_2.11-1.12.1.jar:1.12.1]
        at
org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:403)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
        at
org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.snapshotState(TwoPhaseCommitSinkFunction.java:324)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
        at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.snapshotState(FlinkKafkaProducer.java:1100)
~[flink-connector-kafka_2.11-1.12.1.jar:1.12.1]
        at
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
        at
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
        at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:89)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
        at
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:205)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
        ... 20 more
20:11:47,190 INFO
org.apache.kafka.clients.producer.internals.TransactionManager [] -
[Producer clientId=producer-Source: Custom Source -> Flat Map -> Map ->
Sink: Unnamed-3d05135cf7d8f1375d8f655ba9d20255-31, transactionalId=Source:
Custom Source -> Flat Map -> Map -> Sink:
Unnamed-3d05135cf7d8f1375d8f655ba9d20255-31] ProducerId set to 2250 with
epoch 6
20:11:47,191 INFO
org.apache.kafka.clients.producer.internals.TransactionManager [] -
[Producer clientId=producer-Source: Custom Source -> Flat Map -> Map ->
Sink: Unnamed-3d05135cf7d8f1375d8f655ba9d20255-36, transactionalId=Source:
Custom Source -> Flat Map -> Map -> Sink:
Unnamed-3d05135cf7d8f1375d8f655ba9d20255-36] ProducerId set to 2334 with
epoch 6
20:11:47,191 INFO
org.apache.kafka.clients.producer.internals.TransactionManager [] -
[Producer clientId=producer-Source: Custom Source -> Flat Map -> Map ->
Sink: Unnamed-3d05135cf7d8f1375d8f655ba9d20255-16, transactionalId=Source:
Custom Source -> Flat Map -> Map -> Sink:
Unnamed-3d05135cf7d8f1375d8f655ba9d20255-16] ProducerId set to 2327 with
epoch 6
20:11:47,191 INFO
org.apache.kafka.clients.producer.internals.TransactionManager [] -
[Producer clientId=producer-Source: Custom Source -> Flat Map -> Map ->
Sink: Unnamed-3d05135cf7d8f1375d8f655ba9d20255-7, transactionalId=Source:
Custom Source -> Flat Map -> Map -> Sink:
Unnamed-3d05135cf7d8f1375d8f655ba9d20255-7] ProducerId set to 2389 with
epoch 6
20:11:47,201 INFO
org.apache.kafka.clients.producer.internals.TransactionManager [] -
[Producer clientId=producer-Source: Custom Source -> Flat Map -> Map ->
Sink: Unnamed-3d05135cf7d8f1375d8f655ba9d20255-26, transactionalId=Source:
Custom Source -> Flat Map -> Map -> Sink:
Unnamed-3d05135cf7d8f1375d8f655ba9d20255-26] ProducerId set to 2239 with
epoch 6
20:11:47,201 INFO
org.apache.kafka.clients.producer.internals.TransactionManager [] -
[Producer clientId=producer-Source: Custom Source -> Flat Map -> Map ->
Sink: Unnamed-3d05135cf7d8f1375d8f655ba9d20255-11, transactionalId=Source:
Custom Source -> Flat Map -> Map -> Sink:
Unnamed-3d05135cf7d8f1375d8f655ba9d20255-11] ProducerId set to 2232 with
epoch 6
20:11:47,284 INFO  org.apache.kafka.clients.Metadata                          
[] - [Producer clientId=producer-Source: Custom Source -> Flat Map -> Map ->
Sink: Unnamed-3d05135cf7d8f1375d8f655ba9d20255-4, transactionalId=Source:
Custom Source -> Flat Map -> Map -> Sink:
Unnamed-3d05135cf7d8f1375d8f655ba9d20255-4] Cluster ID:
IKSZYfPVTaGGwDrkST0v_A
20:11:47,389 INFO
org.apache.kafka.clients.producer.internals.TransactionManager [] -
[Producer clientId=producer-Source: Custom Source -> Flat Map -> Map ->
Sink: Unnamed-3d05135cf7d8f1375d8f655ba9d20255-4, transactionalId=Source:
Custom Source -> Flat Map -> Map -> Sink:
Unnamed-3d05135cf7d8f1375d8f655ba9d20255-4] ProducerId set to 2368 with
epoch 6
20:14:47,062 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
[] - Checkpoint 28 of job f0ab809d3601d24f71c737d568b8430e expired before
completing.
20:14:47,065 INFO  org.apache.flink.runtime.jobmaster.JobMaster                
[] - Trying to recover from a global failure.
org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable
failure threshold.
        at
org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleCheckpointException(CheckpointFailureManager.java:96)
~[flink-runtime_2.11-1.12.1.jar:1.12.1]
        at
org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleJobLevelCheckpointException(CheckpointFailureManager.java:65)
~[flink-runtime_2.11-1.12.1.jar:1.12.1]
        at
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1924)
~[flink-runtime_2.11-1.12.1.jar:1.12.1]
        at
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1897)
~[flink-runtime_2.11-1.12.1.jar:1.12.1]
        at
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.access$600(CheckpointCoordinator.java:93)
~[flink-runtime_2.11-1.12.1.jar:1.12.1]
        at
org.apache.flink.runtime.checkpoint.CheckpointCoordinator$CheckpointCanceller.run(CheckpointCoordinator.java:2038)
~[flink-runtime_2.11-1.12.1.jar:1.12.1]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
~[?:1.8.0_231]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_231]
        at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
~[?:1.8.0_231]
        at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
~[?:1.8.0_231]
        at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
~[?:1.8.0_231]
        at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
~[?:1.8.0_231]
        at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_231]
20:14:47,066 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
[] - Triggering checkpoint 29 (type=CHECKPOINT) @ 1617797687065 for job
f0ab809d3601d24f71c737d568b8430e.
20:14:47,067 INFO
org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer
[] - Flushing new partitions
20:14:47,067 INFO
org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer
[] - Flushing new partitions
20:14:47,067 INFO
org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer
[] - Flushing new partitions
20:14:47,067 INFO
org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer
[] - Flushing new partitions
20:14:47,067 INFO
org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer
[] - Flushing new partitions
---------------


It looks like something is going wrong during the checkpoint process. My checkpoint configuration is as follows:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000L, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointTimeout(180000L);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500L);
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.setStateBackend(new FsStateBackend("file:///osquerydemo/chk"));
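For reference, the configuration above can be written out as a compilable sketch against the Flink 1.12 APIs visible in the trace. Everything beyond the poster's six lines is my assumption, not from the original post: the class name, the serialization schema, the placeholder topic name, and in particular the `transaction.timeout.ms` producer property. With `CheckpointingMode.EXACTLY_ONCE` the Kafka sink opens a new producer transaction per checkpoint (the `beginTransaction` frames in the trace above), so that timeout is worth checking against the checkpoint interval:

```java
import java.nio.charset.StandardCharsets;
import java.util.Properties;

import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

public class TopicAToTopicBSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // Same checkpoint setup as in the post above.
        env.enableCheckpointing(5000L, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointTimeout(180000L);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500L);
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        env.setStateBackend(new FsStateBackend("file:///osquerydemo/chk"));

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "10.66.0.129:9092");
        // Assumption, not in the original post: the producer transaction must
        // stay open for the whole checkpoint, and this value must not exceed
        // the broker-side transaction.max.timeout.ms (15 minutes by default).
        props.setProperty("transaction.timeout.ms", "600000");

        // Placeholder serialization schema for illustration only.
        KafkaSerializationSchema<String> schema = (element, timestamp) ->
                new ProducerRecord<>("topicB",
                        element.getBytes(StandardCharsets.UTF_8));

        FlinkKafkaProducer<String> sink = new FlinkKafkaProducer<>(
                "topicB", schema, props, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);

        // env.addSource(...).flatMap(...).map(...).addSink(sink);
        // env.execute("topicA -> topicB");
    }
}
```

This is a configuration sketch only; it needs the flink-connector-kafka and flink-streaming jars on the classpath and a running job to do anything.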



Could the problem be that checkpoints are too frequent and the file writes can't keep up? Please help take a look.



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: flink sink kafka error

lp
There are also errors like this in between:


20:14:48,707 WARN  org.apache.kafka.common.utils.AppInfoParser [] - Error registering AppInfo mbean
javax.management.InstanceAlreadyExistsException: kafka.producer:type=app-info,id="producer-Source: Custom Source -> Flat Map -> Map -> Sink: Unnamed-3d05135cf7d8f1375d8f655ba9d20255-8"
        at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437) ~[?:1.8.0_231]
        at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898) ~[?:1.8.0_231]
        at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966) ~[?:1.8.0_231]
        at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900) ~[?:1.8.0_231]
        at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324) ~[?:1.8.0_231]
        at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522) ~[?:1.8.0_231]
        at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:64) ~[kafka-clients-2.4.1.jar:?]
        at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:426) ~[kafka-clients-2.4.1.jar:?]
        at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:298) ~[kafka-clients-2.4.1.jar:?]
        at org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer.<init>(FlinkKafkaInternalProducer.java:77) ~[flink-connector-kafka_2.11-1.12.1.jar:1.12.1]
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.lambda$abortTransactions$3(FlinkKafkaProducer.java:1282) ~[flink-connector-kafka_2.11-1.12.1.jar:1.12.1]
        at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184) ~[?:1.8.0_231]
        at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556) ~[?:1.8.0_231]
        at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) ~[?:1.8.0_231]
        at java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:291) ~[?:1.8.0_231]
        at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731) ~[?:1.8.0_231]
        at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) ~[?:1.8.0_231]
        at java.util.concurrent.ForkJoinTask.doInvoke(ForkJoinTask.java:401) ~[?:1.8.0_231]
        at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:734) ~[?:1.8.0_231]
        at java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:160) ~[?:1.8.0_231]
        at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:174) ~[?:1.8.0_231]
        at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233) ~[?:1.8.0_231]
        at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418) ~[?:1.8.0_231]
        at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:583) ~[?:1.8.0_231]
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.abortTransactions(FlinkKafkaProducer.java:1263) ~[flink-connector-kafka_2.11-1.12.1.jar:1.12.1]
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.cleanUpUserContext(FlinkKafkaProducer.java:1249) ~[flink-connector-kafka_2.11-1.12.1.jar:1.12.1]
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.finishRecoveringContext(FlinkKafkaProducer.java:1224) ~[flink-connector-kafka_2.11-1.12.1.jar:1.12.1]
        at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.initializeState(TwoPhaseCommitSinkFunction.java:380) ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initializeState(FlinkKafkaProducer.java:1195) ~[flink-connector-kafka_2.11-1.12.1.jar:1.12.1]
        at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:189) ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
        at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:171) ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96) ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
        at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:111) ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:290) ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
        at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:425) ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:535) ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
        at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93) [flink-streaming-java_2.11-1.12.1.jar:1.12.1]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:525) [flink-streaming-java_2.11-1.12.1.jar:1.12.1]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:565) [flink-streaming-java_2.11-1.12.1.jar:1.12.1]
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) [flink-runtime_2.11-1.12.1.jar:1.12.1]
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) [flink-runtime_2.11-1.12.1.jar:1.12.1]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_231]
20:14:48,708 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=producer-Source: Custom Source -> Flat Map -> Map -> Sink: Unnamed-3d05135cf7d8f1375d8f655ba9d20255-8, transactionalId=Source: Custom Source -> Flat Map -> Map -> Sink: Unnamed-3d05135cf7d8f1375d8f655ba9d20255-8] ProducerId set to -1 with epoch -1
20:14:48,708 INFO  org.apache.kafka.common.utils.AppInfoParser [] - Kafka version: 2.4.1
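For context on the WARN above: `InstanceAlreadyExistsException` is a plain JMX naming collision. `AppInfoParser` registers one app-info MBean per producer, keyed by the producer's `client.id`; when two `KafkaProducer` instances share the same id (here, the many short-lived producers `FlinkKafkaProducer.abortTransactions` spins up during transaction recovery, visible in the stack trace), the second registration is rejected. Only the metrics bean is skipped; the producer itself still works. The collision can be reproduced without Kafka at all — the class and method names below are made up for illustration:

```java
import java.lang.management.ManagementFactory;
import javax.management.InstanceAlreadyExistsException;
import javax.management.MBeanServer;
import javax.management.ObjectName;

public class DuplicateMBeanDemo {
    // Minimal standard MBean, mirroring Kafka's app-info bean.
    public interface DemoMBean { String getName(); }
    public static class Demo implements DemoMBean {
        @Override public String getName() { return "demo"; }
    }

    // Registers the same ObjectName twice; the second attempt throws
    // InstanceAlreadyExistsException, exactly as in the WARN above.
    public static boolean secondRegistrationFails() throws Exception {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        ObjectName name =
            new ObjectName("kafka.producer:type=app-info,id=\"producer-demo\"");
        server.registerMBean(new Demo(), name);
        try {
            server.registerMBean(new Demo(), name);
            return false; // would mean JMX accepted the duplicate
        } catch (InstanceAlreadyExistsException expected) {
            return true;  // same rejection AppInfoParser logs as a WARN
        } finally {
            server.unregisterMBean(name);
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("duplicate registration rejected: " + secondRegistrationFails());
    }
}
```

Since the recovery-time producers are created internally by the connector, the WARN is generally safe to ignore here; it only matters if you create several producers yourself with an identical, explicitly configured `client.id`.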



--
Sent from: http://apache-flink.147419.n8.nabble.com/