Problem with RichMapFunction

flink1.9 on yarn: error after running for over two months

guanyq

The error log is attached. Could someone take a look and help analyze it?



 


error.log (89K) Download Attachment

Re: flink1.9 on yarn: error after running for over two months

xueaohui_com@163.com
I'm not sure whether there are detailed logs on the YARN side.

Could HDFS permissions be an issue?



[hidden email]
 
From: guanyq
Sent: 2020-06-20 08:48
To: user-zh
Subject: flink1.9 on yarn: error after running for over two months
The error log is attached. Could someone take a look and help analyze it?


 

Re: flink1.9 on yarn: error after running for over two months

LakeShen
Hi guanyq,

From the logs, I can see that the local storage on the TaskManager's machine is almost full.

Check whether the failure is caused by insufficient storage on the TaskManager machine.

Best,
LakeShen

[hidden email] <[hidden email]> wrote on Sat, Jun 20, 2020 at 9:57 AM:


flink1.9 on yarn

guanyq
In reply to this post by 马阳阳
Question 1

Every submission via ./bin/flink run -m yarn-cluster produces a new APP-ID, e.g. application_1567067657620_0254.

After running yarn application -kill application_1567067657620_0254,

how can I resubmit with ./bin/flink run -m yarn-cluster without the appid incrementing?

Question 2

After submitting with ./bin/flink run -m yarn-cluster and cancelling the job, how can I resubmit onto the same appid?

 

Re: flink1.9 on yarn

LakeShen
Hi guanyq,

Why do you want the app id to stay the same?

Best,
LakeShen

guanyq <[hidden email]> wrote on Sun, Jun 28, 2020 at 9:10 AM:


Re: flink1.9 on yarn

Yangze Guo
In reply to this post by guanyq
My understanding is that you need session mode, i.e. ./bin/yarn-session.sh [1]

[1] https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/yarn_setup.html#flink-yarn-session

Best,
Yangze Guo
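
For illustration, the session workflow Yangze describes looks roughly like this sketch (the -d flag runs the session detached; the example jar path is just a placeholder):

```shell
# Start a long-lived YARN session; this allocates one YARN application
# whose ID stays fixed for the lifetime of the session.
./bin/yarn-session.sh -d

# Jobs submitted without -m yarn-cluster attach to that session
# instead of creating a new YARN application per job.
./bin/flink run ./examples/streaming/WordCount.jar
```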

On Sun, Jun 28, 2020 at 9:10 AM guanyq <[hidden email]> wrote:


Re: flink1.9 on yarn

Roc Marshal
In reply to this post by guanyq
Hi, guanyq.

Regarding question 1 (how to resubmit with ./bin/flink run -m yarn-cluster without the appid incrementing):
The appid auto-increment policy is not something Flink generates; it is controlled by YARN. If necessary, you can investigate hadoop-yarn and draw your own conclusion.



Regarding question 2 (after cancelling a job submitted with ./bin/flink run -m yarn-cluster, how to resubmit onto the same appid):
May I take it that a cluster in Flink yarn-session mode better fits your jobs? The submission mode in your question is per-job; once the job is shut down, Flink shuts down the cluster.
See: https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/deployment/yarn_setup.html#start-flink-session
Best,
Roc Marshal

On 2020-06-28 09:09:43, "guanyq" <[hidden email]> wrote:


Re: flink1.9 on yarn

zhisheng
In reply to this post by Yangze Guo
hi, guanyq

The way you are submitting uses Flink on YARN's per-job mode; that is how the mechanism works: every time a new job is submitted, the AppID changes.

Best!
zhisheng

Yangze Guo <[hidden email]> wrote on Sun, Jun 28, 2020 at 9:59 AM:


Implementing a custom source in flink1.9

guanyq
In reply to this post by guanyq
The attached image shows what I mean: I want to pass the data coming out of the listener to ctx.
How can I implement this data handoff?
public class RMQRichParallelSource extends RichParallelSourceFunction<String> implements MessageOrderListener {

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        Properties properties = new Properties();

        // start() must be called to launch the consumer before subscribing; calling it once is enough.
        OrderConsumer consumer = ONSFactory.createOrderedConsumer(properties);

        consumer.subscribe(
                "PRODNOC_KB_SYNC_CUST_ORDER",
                "*",
                this);
        consumer.start();
    }

    @Override
    public void run(SourceContext<String> ctx) {

    }

    @Override
    public OrderAction consume(Message message, ConsumeOrderContext consumeOrderContext) {
        try {
            System.out.println(new String(message.getBody(), "UTF-8"));
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
        return OrderAction.Success;
    }

    @Override
    public void cancel() {
    }

    @Override
    public void close() throws Exception {
        super.close();
    }
}
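
A common way to bridge a push-style callback like consume() into the pull-style run() loop is a thread-safe queue: the listener thread puts each message body into the queue, and run() polls it and emits via ctx.collect(). Below is a minimal self-contained sketch of just that handoff pattern, with plain Java stand-ins for the Flink and ONS types (QueueBridge, onMessage, etc. are illustrative names, not real APIs; in the real source, onMessage corresponds to consume() and the returned list corresponds to calls to ctx.collect()):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class QueueBridge {
    // Shared buffer between the listener thread and the source's run() loop.
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>(10_000);
    private volatile boolean running = true;

    // Called from the MQ client's listener thread (stand-in for consume(...)).
    public void onMessage(String body) throws InterruptedException {
        queue.put(body); // blocks if the buffer is full, providing backpressure
    }

    // Stand-in for run(SourceContext<String> ctx): drain the queue and emit.
    public List<String> run(int maxElements) throws InterruptedException {
        List<String> emitted = new ArrayList<>(); // stands in for ctx.collect(...)
        while (running && emitted.size() < maxElements) {
            // Poll with a timeout so the loop wakes up periodically and honors cancel().
            String msg = queue.poll(100, TimeUnit.MILLISECONDS);
            if (msg != null) {
                emitted.add(msg);
            }
        }
        return emitted;
    }

    // Stand-in for cancel(): stops the run() loop.
    public void cancel() {
        running = false;
    }

    public static void main(String[] args) throws Exception {
        QueueBridge bridge = new QueueBridge();
        bridge.onMessage("a");
        bridge.onMessage("b");
        System.out.println(bridge.run(2)); // prints [a, b]
    }
}
```

In the real source, the queue put would go at the top of consume(), and run() would loop while isRunning, calling ctx.collect(msg) for each dequeued element.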


 


Reading Aliyun MQ with flink1.9

guanyq
flink1.9 reads from Aliyun RocketMQ.
How do I set the AccessKey and SecretKey parameters?


final RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder().setHost("localhost").setPort(5000)....build();

Re: Reading Aliyun MQ with flink1.9

zhisheng
hi, guanyq

The community Flink distribution has no RocketMQ connector by default. There is a Flink integration module in the RocketMQ community externals project:
https://github.com/apache/rocketmq-externals/tree/master/rocketmq-flink

The AccessKey/SecretKey parameters you mention look like ACL authentication; from the code it does not appear to be supported, but you can extend it yourself.

Best!
zhisheng
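
If it helps, pulling that module in would look roughly like the dependency sketch below; the coordinates are my assumption based on the linked repository, and the version is deliberately left as a placeholder — verify both against the project's releases before use:

```xml
<dependency>
   <groupId>org.apache.rocketmq</groupId>
   <artifactId>rocketmq-flink</artifactId>
   <version><!-- check the project's latest release --></version>
</dependency>
```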

guanyq <[hidden email]> wrote on Fri, Jul 3, 2020 at 11:44 PM:


Re: Reading Aliyun MQ with flink1.9

李军
In reply to this post by guanyq
Hello!
In a custom source extending RichSourceFunction, build the consumer inside open(); there you can set the AccessKey and SecretKey parameters.
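
As a sketch, the keys go into the Properties passed to ONSFactory inside open(). The property constants below are from the Aliyun ONS SDK's PropertyKeyConst class, and the angle-bracket values are placeholders for your own credentials and endpoint — verify the exact constant names against your SDK version, as this is not runnable as-is:

```java
Properties properties = new Properties();
properties.put(PropertyKeyConst.AccessKey, "<yourAccessKey>");   // placeholder credential
properties.put(PropertyKeyConst.SecretKey, "<yourSecretKey>");   // placeholder credential
properties.put(PropertyKeyConst.ONSAddr, "<yourOnsEndpoint>");   // placeholder endpoint
OrderConsumer consumer = ONSFactory.createOrderedConsumer(properties);
```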
        

2020-7-4

Re: flink1.9 on yarn: error after running for over two months

Congxian Qiu
In reply to this post by LakeShen
The log you provided does not look like the log of the first failure; you probably need to check the JM log of the earlier job to see what caused it to fail.

Best,
Congxian


LakeShen <[hidden email]> wrote on Tue, Jun 23, 2020 at 10:07 PM:


Startup exception after upgrading from flink1.9.2 to 1.12.0

guanyq
In reply to this post by guanyq
Help, anyone.


Upgraded from flink1.9.2 to 1.12.0.
Deployed as Flink on YARN.


The exception log is as follows:
------------------------------------------------------------
The program finished with the following exception:


org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Could not deploy Yarn job cluster.
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:330)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:743)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:242)
at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:971)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1047)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1866)
at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1047)
Caused by: org.apache.flink.client.deployment.ClusterDeploymentException: Could not deploy Yarn job cluster.
at org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:460)
at org.apache.flink.client.deployment.executors.AbstractJobClusterExecutor.execute(AbstractJobClusterExecutor.java:70)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1940)
at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:128)
at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1822)
at com.data.processing.publicbroadbandbusiness.tasks.DataProcessA.main(DataProcessA.java:582)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:316)
... 11 more
Caused by: org.apache.flink.yarn.YarnClusterDescriptor$YarnDeploymentException: The YARN application unexpectedly switched to state FAILED during deployment.
Diagnostics from YARN: Application application_1607308494016_1858 failed 1 times (global limit =2; local limit is =1) due to AM Container for appattempt_1607308494016_1858_000001 exited with exitCode: 2
For more detailed output, check the application tracking page: http://nn01.hadoop.unicom:8088/cluster/app/application_1607308494016_1858 Then click on links to logs of each attempt.
Diagnostics: Exception from container-launch.
Container id: container_e70_1607308494016_1858_01_000001
Exit code: 2
Stack trace: ExitCodeException exitCode=2:
at org.apache.hadoop.util.Shell.runCommand(Shell.java:933)
at org.apache.hadoop.util.Shell.run(Shell.java:844)
at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:1123)
at org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:237)
at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:317)
at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:83)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)



Container exited with a non-zero exit code 2
Failing this attempt. Failing the application.
If log aggregation is enabled on your cluster, use this command to further investigate the issue:
yarn logs -applicationId application_1607308494016_1858
at org.apache.flink.yarn.YarnClusterDescriptor.startAppMaster(YarnClusterDescriptor.java:1078)
at org.apache.flink.yarn.YarnClusterDescriptor.deployInternal(YarnClusterDescriptor.java:558)
at org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:453)
... 22 more

Startup exception on 1.12.0, YARN per-job mode

guanyq
In reply to this post by guanyq
The error looks like a conflict with hadoop-common-2.7.4.jar, but I don't know how to fix it.
Help.
2021-01-19 15:12:47,922 ERROR org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Fatal error occurred in ResourceManager.
org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException: Could not start the ResourceManager akka.tcp://[hidden email]:45554/user/rpc/resourcemanager_0
at org.apache.flink.runtime.resourcemanager.ResourceManager.onStart(ResourceManager.java:220) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStart(RpcEndpoint.java:183) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StoppedState.start(AkkaRpcActor.java:551) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:172) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) [flink-dist_2.12-1.12.0.jar:1.12.0]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) [flink-dist_2.12-1.12.0.jar:1.12.0]
at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) [flink-dist_2.12-1.12.0.jar:1.12.0]
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) [flink-dist_2.12-1.12.0.jar:1.12.0]
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) [flink-dist_2.12-1.12.0.jar:1.12.0]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.12-1.12.0.jar:1.12.0]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-dist_2.12-1.12.0.jar:1.12.0]
at akka.actor.Actor.aroundReceive(Actor.scala:517) [flink-dist_2.12-1.12.0.jar:1.12.0]
at akka.actor.Actor.aroundReceive$(Actor.scala:515) [flink-dist_2.12-1.12.0.jar:1.12.0]
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) [flink-dist_2.12-1.12.0.jar:1.12.0]
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [flink-dist_2.12-1.12.0.jar:1.12.0]
at akka.actor.ActorCell.invoke(ActorCell.scala:561) [flink-dist_2.12-1.12.0.jar:1.12.0]
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [flink-dist_2.12-1.12.0.jar:1.12.0]
at akka.dispatch.Mailbox.run(Mailbox.scala:225) [flink-dist_2.12-1.12.0.jar:1.12.0]
at akka.dispatch.Mailbox.exec(Mailbox.scala:235) [flink-dist_2.12-1.12.0.jar:1.12.0]
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.12-1.12.0.jar:1.12.0]
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.12-1.12.0.jar:1.12.0]
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.12-1.12.0.jar:1.12.0]
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.12-1.12.0.jar:1.12.0]
Caused by: java.lang.NoSuchMethodError: org.apache.hadoop.io.retry.RetryPolicies.retryForeverWithFixedSleep(JLjava/util/concurrent/TimeUnit;)Lorg/apache/hadoop/io/retry/RetryPolicy;
at org.apache.hadoop.yarn.client.RMProxy.createRetryPolicy(RMProxy.java:280) ~[hadoop-yarn-common-2.7.3.2.6.0.3-8.jar:?]
at org.apache.hadoop.yarn.client.RMProxy.createRetryPolicy(RMProxy.java:211) ~[hadoop-yarn-common-2.7.3.2.6.0.3-8.jar:?]
at org.apache.hadoop.yarn.client.RequestHedgingRMFailoverProxyProvider.createRetriableProxy(RequestHedgingRMFailoverProxyProvider.java:95) ~[hadoop-yarn-common-2.7.3.2.6.0.3-8.jar:?]
at org.apache.hadoop.yarn.client.RequestHedgingRMFailoverProxyProvider.init(RequestHedgingRMFailoverProxyProvider.java:77) ~[hadoop-yarn-common-2.7.3.2.6.0.3-8.jar:?]
at org.apache.hadoop.yarn.client.RMProxy.createRMFailoverProxyProvider(RMProxy.java:190) ~[hadoop-yarn-common-2.7.3.2.6.0.3-8.jar:?]
at org.apache.hadoop.yarn.client.RMProxy.newProxyInstance(RMProxy.java:120) ~[hadoop-yarn-common-2.7.3.2.6.0.3-8.jar:?]
at org.apache.hadoop.yarn.client.RMProxy.createRMProxy(RMProxy.java:94) ~[hadoop-yarn-common-2.7.3.2.6.0.3-8.jar:?]
at org.apache.hadoop.yarn.client.ClientRMProxy.createRMProxy(ClientRMProxy.java:72) ~[hadoop-yarn-common-2.7.3.2.6.0.3-8.jar:?]
at org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.serviceStart(AMRMClientImpl.java:186) ~[hadoop-yarn-client-2.7.3.2.6.0.3-8.jar:?]
at org.apache.hadoop.service.AbstractService.start(AbstractService.java:193) ~[FlinkDataProcess.jar:?]
at org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl.serviceStart(AMRMClientAsyncImpl.java:93) ~[hadoop-yarn-client-2.7.3.2.6.0.3-8.jar:?]
at org.apache.hadoop.service.AbstractService.start(AbstractService.java:193) ~[FlinkDataProcess.jar:?]
at org.apache.flink.yarn.YarnResourceManagerDriver.initializeInternal(YarnResourceManagerDriver.java:159) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.runtime.resourcemanager.active.AbstractResourceManagerDriver.initialize(AbstractResourceManagerDriver.java:80) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.initialize(ActiveResourceManager.java:121) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.runtime.resourcemanager.ResourceManager.startResourceManagerServices(ResourceManager.java:230) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.runtime.resourcemanager.ResourceManager.onStart(ResourceManager.java:218) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
... 22 more
2021-01-19 15:12:47,929 ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Fatal error occurred in the cluster entrypoint.
org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException: Could not start the ResourceManager akka.tcp://[hidden email]:45554/user/rpc/resourcemanager_0
at org.apache.flink.runtime.resourcemanager.ResourceManager.onStart(ResourceManager.java:220) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStart(RpcEndpoint.java:183) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StoppedState.start(AkkaRpcActor.java:551) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:172) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) [flink-dist_2.12-1.12.0.jar:1.12.0]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) [flink-dist_2.12-1.12.0.jar:1.12.0]
at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) [flink-dist_2.12-1.12.0.jar:1.12.0]
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) [flink-dist_2.12-1.12.0.jar:1.12.0]
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) [flink-dist_2.12-1.12.0.jar:1.12.0]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.12-1.12.0.jar:1.12.0]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-dist_2.12-1.12.0.jar:1.12.0]
at akka.actor.Actor.aroundReceive(Actor.scala:517) [flink-dist_2.12-1.12.0.jar:1.12.0]
at akka.actor.Actor.aroundReceive$(Actor.scala:515) [flink-dist_2.12-1.12.0.jar:1.12.0]
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) [flink-dist_2.12-1.12.0.jar:1.12.0]
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [flink-dist_2.12-1.12.0.jar:1.12.0]
at akka.actor.ActorCell.invoke(ActorCell.scala:561) [flink-dist_2.12-1.12.0.jar:1.12.0]
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [flink-dist_2.12-1.12.0.jar:1.12.0]
at akka.dispatch.Mailbox.run(Mailbox.scala:225) [flink-dist_2.12-1.12.0.jar:1.12.0]
at akka.dispatch.Mailbox.exec(Mailbox.scala:235) [flink-dist_2.12-1.12.0.jar:1.12.0]
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.12-1.12.0.jar:1.12.0]
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.12-1.12.0.jar:1.12.0]
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.12-1.12.0.jar:1.12.0]
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.12-1.12.0.jar:1.12.0]
Caused by: java.lang.NoSuchMethodError: org.apache.hadoop.io.retry.RetryPolicies.retryForeverWithFixedSleep(JLjava/util/concurrent/TimeUnit;)Lorg/apache/hadoop/io/retry/RetryPolicy;
at org.apache.hadoop.yarn.client.RMProxy.createRetryPolicy(RMProxy.java:280) ~[hadoop-yarn-common-2.7.3.2.6.0.3-8.jar:?]
at org.apache.hadoop.yarn.client.RMProxy.createRetryPolicy(RMProxy.java:211) ~[hadoop-yarn-common-2.7.3.2.6.0.3-8.jar:?]
at org.apache.hadoop.yarn.client.RequestHedgingRMFailoverProxyProvider.createRetriableProxy(RequestHedgingRMFailoverProxyProvider.java:95) ~[hadoop-yarn-common-2.7.3.2.6.0.3-8.jar:?]
at org.apache.hadoop.yarn.client.RequestHedgingRMFailoverProxyProvider.init(RequestHedgingRMFailoverProxyProvider.java:77) ~[hadoop-yarn-common-2.7.3.2.6.0.3-8.jar:?]
at org.apache.hadoop.yarn.client.RMProxy.createRMFailoverProxyProvider(RMProxy.java:190) ~[hadoop-yarn-common-2.7.3.2.6.0.3-8.jar:?]
at org.apache.hadoop.yarn.client.RMProxy.newProxyInstance(RMProxy.java:120) ~[hadoop-yarn-common-2.7.3.2.6.0.3-8.jar:?]
at org.apache.hadoop.yarn.client.RMProxy.createRMProxy(RMProxy.java:94) ~[hadoop-yarn-common-2.7.3.2.6.0.3-8.jar:?]
at org.apache.hadoop.yarn.client.ClientRMProxy.createRMProxy(ClientRMProxy.java:72) ~[hadoop-yarn-common-2.7.3.2.6.0.3-8.jar:?]
at org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.serviceStart(AMRMClientImpl.java:186) ~[hadoop-yarn-client-2.7.3.2.6.0.3-8.jar:?]
at org.apache.hadoop.service.AbstractService.start(AbstractService.java:193) ~[FlinkDataProcess.jar:?]
at org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl.serviceStart(AMRMClientAsyncImpl.java:93) ~[hadoop-yarn-client-2.7.3.2.6.0.3-8.jar:?]
at org.apache.hadoop.service.AbstractService.start(AbstractService.java:193) ~[FlinkDataProcess.jar:?]
at org.apache.flink.yarn.YarnResourceManagerDriver.initializeInternal(YarnResourceManagerDriver.java:159) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.runtime.resourcemanager.active.AbstractResourceManagerDriver.initialize(AbstractResourceManagerDriver.java:80) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.initialize(ActiveResourceManager.java:121) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.runtime.resourcemanager.ResourceManager.startResourceManagerServices(ResourceManager.java:230) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.runtime.resourcemanager.ResourceManager.onStart(ResourceManager.java:218) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
... 22 more
2021-01-19 15:12:47,934 INFO org.apache.flink.runtime.blob.BlobServer [] - Stopped BLOB server at 0.0.0.0:44328

Re: Startup exception on 1.12.0, YARN per-job mode

Xintong Song
Check whether your job jar also bundles the Hadoop dependencies. Normally, Hadoop dependencies should be set to provided scope; if the job really does need a Hadoop version different from the one on the YARN cluster, shade it.
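
For the common case, that means marking the Hadoop artifacts as provided in the job's pom so they are not bundled into the fat jar. A minimal sketch — the artifactId and version property are placeholders for whichever Hadoop dependencies the job actually declares:

```xml
<dependency>
   <groupId>org.apache.hadoop</groupId>
   <artifactId>hadoop-common</artifactId>
   <version>${hadoop.version}</version>
   <scope>provided</scope>
</dependency>
```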

Thank you~
Xintong Song






On Tue, Jan 19, 2021 at 3:31 PM guanyq <[hidden email]> wrote:


flink 1.12.0: consuming from a Kafka 0.10 cluster ==> producing to a Kafka 0.9 cluster

guanyq
In reply to this post by guanyq
May I ask how to choose the Kafka connector version?
If I choose 1.12.0, there is no FlinkKafkaProducer09/FlinkKafkaConsumer09.
<dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
</dependency>
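
As context: in Flink 1.12 the version-specific Kafka connectors (including the 0.9 one) are no longer shipped; only the "universal" flink-connector-kafka above remains, with FlinkKafkaConsumer/FlinkKafkaProducer. A rough usage fragment — the topic names and bootstrap address are placeholders, this is not runnable as-is, and whether the universal connector's Kafka client can still talk to a 0.9 broker should be verified against the connector's compatibility notes:

```java
Properties props = new Properties();
props.setProperty("bootstrap.servers", "<kafka010-host>:9092"); // placeholder address

// Universal connector replaces the 09/010-specific classes.
FlinkKafkaConsumer<String> source =
        new FlinkKafkaConsumer<>("<input-topic>", new SimpleStringSchema(), props);
FlinkKafkaProducer<String> sink =
        new FlinkKafkaProducer<>("<output-topic>", new SimpleStringSchema(), props);
```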