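Dear all,

I am using Flink 1.9. I have generated a JobGraph from a Flink SQL statement, but I don't know how to submit it to YARN and run it in per-job mode. I tried submitting it through a YarnClusterDescriptor, without success. The code is roughly as follows; if anyone has a working example, please share it.

    ......
    JobGraph jobGraph = env.getStreamGraph("StreamSql").getJobGraph();
    ......
    ......
    yarnClusterDescriptor.deployJobCluster(
            clusterSpecification,
            jobGraph, true);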
What error do you get when it fails?

Best,
tison.

In reply to nicygan <[hidden email]>, Saturday, March 7, 2020, 11:14 AM.
Hi tison, when execution reaches this point it throws a NullPointerException:

    Caused by: java.lang.NullPointerException
        at org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.getNodeReports(YarnClientImpl.java:506)

In the getNodeReports method:

    GetClusterNodesResponse response = rmClient.getClusterNodes(request);

rmClient is null on this line.

I see that YarnClientImpl has a start() method that assigns rmClient, but when I add a call to it, I get the following error instead:

    Exception in thread "main" org.apache.hadoop.service.ServiceStateException: org.apache.hadoop.yarn.client.api.impl.YarnClientImpl cannot enter state STARTED from state NOTINITED
        at org.apache.hadoop.service.ServiceStateModel.checkStateTransition(ServiceStateModel.java:129)
        at org.apache.hadoop.service.ServiceStateModel.enterState(ServiceStateModel.java:111)
        at org.apache.hadoop.service.AbstractService.start(AbstractService.java:190)

In reply to tison <[hidden email]>, 2020-03-07 11:15:10.
OK. First, which Flink version are you on? This looks like quite an old version to me, around 1.7. If there are no confidentiality concerns, please post your code file along with the complete error stack trace.

The problem may be in how you initialize YarnClusterDescriptor. What you have provided so far are very incomplete fragments, so there is no way to guess the actual cause.

Generally speaking, Flink's current Client implementation is not very good. Using the CLI directly rarely causes problems, but abstractions such as ClusterDescriptor come with some unwritten rules if you depend on them directly. If you investigate on your own, you will probably have to debug step by step against the places in the Flink source code that use them.

Best,
tison.

In reply to nicygan <[hidden email]>, Saturday, March 7, 2020, 3:16 PM.
Hi tison,

The version is 1.9, and there is nothing confidential. The code is as follows:

    JobGraph jobGraph = env.getStreamGraph("StreamSql").getJobGraph();

    YarnConfiguration yarnConfiguration = new YarnConfiguration();
    String configurationDirectory = CliFrontend.getConfigurationDirectoryFromEnv();
    Configuration flinkConfiguration = GlobalConfiguration.loadConfiguration(configurationDirectory);
    YarnClient yarnClient = YarnClient.createYarnClient();
    yarnClient.start(); // throws an error

    YarnClusterDescriptor yarnClusterDescriptor =
            new YarnClusterDescriptor(
                    flinkConfiguration,
                    yarnConfiguration,
                    configurationDirectory,
                    yarnClient, false);

    yarnClusterDescriptor.setLocalJarPath(new Path(configurationDirectory + "/../lib/*.jar"));

    ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder()
            .setMasterMemoryMB(256)
            .setTaskManagerMemoryMB(740)
            .setNumberTaskManagers(1)
            .setSlotsPerTaskManager(1)
            .createClusterSpecification();

    ClusterClient<ApplicationId> clusterClient =
            yarnClusterDescriptor.deployJobCluster(
                    clusterSpecification,
                    jobGraph, true); // throws an error

    clusterClient.submitJob(jobGraph, StreamSQLPreJob.class.getClassLoader());

In reply to tison <[hidden email]>, 2020-03-07 15:34:12.
Add these two lines before the line that throws the error:

    final YarnConfiguration yarnConfiguration = new YarnConfiguration();
    yarnClient.init(yarnConfiguration);

If it still doesn't work, check whether HADOOP_CLASSPATH and yarn-site are configured correctly.

Best,
tison.

In reply to nicygan <[hidden email]>, Saturday, March 7, 2020, 4:53 PM.
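Both errors in this thread are consistent with the init-then-start lifecycle that Hadoop services such as YarnClient follow: the client returned by YarnClient.createYarnClient() has to go through init(conf) before start(), and only start() sets up the connection that later calls rely on. The sketch below is a simplified, dependency-free model of that lifecycle, not Hadoop's actual code. SketchService, State, and the "rm-host:8032" address are hypothetical names invented for illustration, and the real transition table in Hadoop has more states and rules than are modeled here.

```java
/** Simplified model of an init/start service lifecycle. Not Hadoop's code. */
final class ServiceLifecycleSketch {

    /** Subset of lifecycle states, named after the ones in the stack trace. */
    enum State { NOTINITED, INITED, STARTED }

    /** Hypothetical stand-in for a lifecycle-managed client such as YarnClient. */
    static final class SketchService {
        private State state = State.NOTINITED;
        private String rmAddress; // configuration captured by init()
        private Object rmClient;  // stays null until start() has run

        /** Stores the configuration; must happen before start(). */
        void init(String rmAddress) {
            if (state == State.STARTED) {
                throw new IllegalStateException("cannot enter state INITED from state " + state);
            }
            this.rmAddress = rmAddress;
            state = State.INITED;
        }

        /** Creates the connection; only legal once init() has run. */
        void start() {
            if (state != State.INITED) {
                throw new IllegalStateException("cannot enter state STARTED from state " + state);
            }
            rmClient = "connection to " + rmAddress;
            state = State.STARTED;
        }

        /** Dereferences rmClient, so it fails with NullPointerException before start(). */
        String getNodeReports() {
            return rmClient.toString();
        }
    }

    public static void main(String[] args) {
        SketchService client = new SketchService();

        // Case 1: using the client without start(), as in the first error.
        try {
            client.getNodeReports();
        } catch (NullPointerException e) {
            System.out.println("Used before start(): NullPointerException");
        }

        // Case 2: calling start() without init(), as in the second error.
        try {
            client.start();
        } catch (IllegalStateException e) {
            System.out.println("start() before init(): " + e.getMessage());
        }

        // Case 3: init() first, then start(), which is the order suggested above.
        client.init("rm-host:8032");
        client.start();
        System.out.println("After init() then start(): " + client.getNodeReports());
    }
}
```

In the code posted earlier, the equivalent order would be createYarnClient(), then init(yarnConfiguration), then start(), with the already-declared yarnConfiguration reused rather than declared a second time.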
Thanks, that seems to work. The error is gone for now, but the Hadoop configuration is still not being read correctly.

In reply to tison <[hidden email]>, 2020-03-07 17:06:48.
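For the remaining configuration problem, a quick first check is whether the environment mentioned above is in place at all. The following is a minimal diagnostic sketch, assuming the configuration directory is supplied through YARN_CONF_DIR or HADOOP_CONF_DIR and contains yarn-site.xml. HadoopConfCheck and findProblems are hypothetical names, and the order in which the two variables are consulted is an assumption of this sketch. It only inspects environment variables and files; it does not verify that yarn-site.xml is actually on the classpath of the submitting program, which is where a plain new YarnConfiguration() looks for it.

```java
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical helper that reports obviously missing Hadoop/YARN client settings. */
final class HadoopConfCheck {

    /** Returns human-readable problems; an empty list means nothing obvious is missing. */
    static List<String> findProblems(Map<String, String> env) {
        List<String> problems = new ArrayList<>();

        // Assumption of this sketch: prefer YARN_CONF_DIR, fall back to HADOOP_CONF_DIR.
        String confDir = env.get("YARN_CONF_DIR");
        if (confDir == null || confDir.isEmpty()) {
            confDir = env.get("HADOOP_CONF_DIR");
        }
        if (confDir == null || confDir.isEmpty()) {
            problems.add("Neither YARN_CONF_DIR nor HADOOP_CONF_DIR is set");
        } else if (!Files.isRegularFile(Paths.get(confDir, "yarn-site.xml"))) {
            problems.add("No yarn-site.xml found in " + confDir);
        }

        String hadoopClasspath = env.get("HADOOP_CLASSPATH");
        if (hadoopClasspath == null || hadoopClasspath.isEmpty()) {
            problems.add("HADOOP_CLASSPATH is not set");
        }
        return problems;
    }

    public static void main(String[] args) {
        List<String> problems = findProblems(System.getenv());
        if (problems.isEmpty()) {
            System.out.println("No obvious problems found");
        } else {
            for (String problem : problems) {
                System.out.println(problem);
            }
        }
    }
}
```

Running it in the same shell or IDE run configuration that launches the submitting program shows what that process actually sees, which can differ from a login shell.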