The test code is as follows:
--------------------------
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.Random;
import java.util.StringJoiner;

public class Sink_KafkaSink_1 {
    public static void main(String[] args) throws Exception {
        final ParameterTool params = ParameterTool.fromPropertiesFile(
                Sink_KafkaSink_1.class.getResourceAsStream("/pro.properties"));
        String host = params.get("host");
        int kafkaPort = Integer.parseInt(params.get("kafkaPort"));
        produceTestdata2kafka(
                new StringJoiner(":").add(host).add(String.valueOf(kafkaPort)).toString());
    }

    private static void produceTestdata2kafka(String kafkaAddr) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5000);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

        DataStreamSource<String> text =
                env.addSource(new CustomsourceFuncation()).setParallelism(1);

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", kafkaAddr);

        FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(
                "flinktest",              // topic
                new SimpleStringSchema(), // message serialization
                properties
        );
        // attach each record's event timestamp when writing to Kafka
        producer.setWriteTimestampToKafka(true);
        text.addSink(producer);
        env.execute("[kafkaSink with custom source]");
    }
}

class CustomsourceFuncation implements SourceFunction<String> {
    //private long count = 1L;
    // volatile so that cancel(), called from another thread, is visible in run()
    private volatile boolean isRunning = true;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (isRunning) {
            // sample messages (a mock book bestseller list)
            List<String> books = new ArrayList<>();
            books.add("msg1");
            books.add("msg2");
            books.add("msg3");
            books.add("msg4");
            books.add("msg5");
            int i = new Random().nextInt(5);
            ctx.collect(books.get(i));
            // emit one record every 2 seconds
            Thread.sleep(2000);
        }
    }

    // called when the job is cancelled
    @Override
    public void cancel() {
        isRunning = false;
    }
}
------------------------------------------

This runs locally without any exception. After packaging with Maven and submitting it to a YARN cluster in application mode, the JobManager keeps looping through the following error:

----------------------------------
2021-01-22 07:54:31,929 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job [kafkaSink with custom source] (c256bf309be7e543182c5e1d9af659ef) switched from state RUNNING to RESTARTING.
2021-01-22 07:54:32,930 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job [kafkaSink with custom source] (c256bf309be7e543182c5e1d9af659ef) switched from state RESTARTING to RUNNING.
2021-01-22 07:54:32,931 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - No checkpoint found during restore.
2021-01-22 07:54:32,931 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Custom Source -> Sink: Unnamed (1/1) (ca057bcbb78c0a81fc471d81db89ec28) switched from CREATED to SCHEDULED.
2021-01-22 07:54:32,932 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Custom Source -> Sink: Unnamed (1/1) (ca057bcbb78c0a81fc471d81db89ec28) switched from SCHEDULED to DEPLOYING.
2021-01-22 07:54:32,932 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Deploying Source: Custom Source -> Sink: Unnamed (1/1) (attempt #2) with attempt id ca057bcbb78c0a81fc471d81db89ec28 to container_1611044725922_0027_01_000002 @ slave02 (dataPort=37913) with allocation id 3f0f1dc64e898272d68989ca9a8feff2
2021-01-22 07:54:32,950 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Custom Source -> Sink: Unnamed (1/1) (ca057bcbb78c0a81fc471d81db89ec28) switched from DEPLOYING to RUNNING.
2021-01-22 07:54:32,969 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Custom Source -> Sink: Unnamed (1/1) (ca057bcbb78c0a81fc471d81db89ec28) switched from RUNNING to FAILED on container_1611044725922_0027_01_000002 @ slave02 (dataPort=37913).
org.apache.kafka.common.KafkaException: Failed to construct kafka producer
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:432) ~[quickstart-0.1.jar:?]
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:298) ~[quickstart-0.1.jar:?]
    at org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer.<init>(FlinkKafkaInternalProducer.java:77) ~[quickstart-0.1.jar:?]
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.createProducer(FlinkKafkaProducer.java:1230) ~[quickstart-0.1.jar:?]
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initProducer(FlinkKafkaProducer.java:1346) ~[quickstart-0.1.jar:?]
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initNonTransactionalProducer(FlinkKafkaProducer.java:1342) ~[quickstart-0.1.jar:?]
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.beginTransaction(FlinkKafkaProducer.java:990) ~[quickstart-0.1.jar:?]
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.beginTransaction(FlinkKafkaProducer.java:99) ~[quickstart-0.1.jar:?]
    at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:403) ~[quickstart-0.1.jar:?]
    at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.initializeState(TwoPhaseCommitSinkFunction.java:394) ~[quickstart-0.1.jar:?]
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initializeState(FlinkKafkaProducer.java:1195) ~[quickstart-0.1.jar:?]
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:189) ~[quickstart-0.1.jar:?]
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:171) ~[quickstart-0.1.jar:?]
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96) ~[quickstart-0.1.jar:?]
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:111) ~[quickstart-0.1.jar:?]
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:290) ~[quickstart-0.1.jar:?]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:425) ~[quickstart-0.1.jar:?]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:535) ~[quickstart-0.1.jar:?]
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93) ~[quickstart-0.1.jar:?]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:525) ~[quickstart-0.1.jar:?]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:565) ~[quickstart-0.1.jar:?]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) ~[quickstart-0.1.jar:?]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) ~[quickstart-0.1.jar:?]
    at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_231]
Caused by: org.apache.kafka.common.KafkaException: class org.apache.kafka.common.serialization.ByteArraySerializer is not an instance of org.apache.kafka.common.serialization.Serializer
    at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:374) ~[quickstart-0.1.jar:?]
    at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:392) ~[quickstart-0.1.jar:?]
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:359) ~[quickstart-0.1.jar:?]
    ... 23 more
2021-01-22 07:54:32,971 INFO org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy [] - Calculating tasks to restart to recover the failed task cbc357ccb763df2852fee8c4fc7d55f2_0.
2021-01-22 07:54:32,971 INFO org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy [] - 1 tasks should be restarted to recover the failed task cbc357ccb763df2852fee8c4fc7d55f2_0.
2021-01-22 07:54:32,971 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job [kafkaSink with custom source] (c256bf309be7e543182c5e1d9af659ef) switched from state RUNNING to RESTARTING.
2021-01-22 07:54:33,973 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job [kafkaSink with custom source] (c256bf309be7e543182c5e1d9af659ef) switched from state RESTARTING to RUNNING.
----------------------------------

This is Flink 1.12.1. Deploying the same job in per-job mode works fine: data can be consumed from the flinktest topic as expected.

--
Sent from: http://apache-flink.147419.n8.nabble.com/
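For context, the root-cause line ("ByteArraySerializer is not an instance of Serializer") is the classic symptom of the same class being visible through two different classloaders, which can happen when a fat jar bundles its own copy of the Kafka client classes next to the copy in flink/lib. A minimal standalone sketch of the effect (the class name ClassLoaderClash is hypothetical, not part of the job above):

```java
import java.net.URL;
import java.net.URLClassLoader;

public class ClassLoaderClash {
    // Loads this very class through two independent classloaders and reports
    // whether the resulting Class objects are identical.
    static boolean sameClassObject() throws Exception {
        // location of the classpath entry (directory or jar) holding this class
        URL cp = ClassLoaderClash.class.getProtectionDomain().getCodeSource().getLocation();
        // parent = null: neither loader delegates to the application classloader,
        // so each defines its own copy of the class
        try (URLClassLoader a = new URLClassLoader(new URL[]{cp}, null);
             URLClassLoader b = new URLClassLoader(new URL[]{cp}, null)) {
            Class<?> ca = a.loadClass("ClassLoaderClash");
            Class<?> cb = b.loadClass("ClassLoaderClash");
            // same fully-qualified name, but two distinct Class objects:
            // an instanceof check across them fails, just like the Kafka error
            return ca == cb;
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("same Class object: " + sameClassObject());
    }
}
```

Running this prints "same Class object: false", even though both loaders read the identical bytecode.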
Hi, this is a dependency problem. If the cluster's flink/lib directory already contains flink-connector-kafka.jar, then the submitted job's pom should declare that dependency with `provided` scope so it is not bundled into the fat jar a second time.
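For example, the dependency declaration would look like this (the Scala suffix _2.11 is an assumption here; match the artifact and version to what is actually in the cluster's flink/lib):

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.12.1</version>
    <!-- already present in flink/lib on the cluster, so do not bundle it -->
    <scope>provided</scope>
</dependency>
```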
On 2021-01-22 16:14:17, "lp" <[hidden email]> wrote: