Hi,
我们使用了TDH中的yarn(hadoop2.7,jdk1.7),原先使用spark的时候,也遇到过jdk版本最低1.8的情况,我们是通过如下方式解决的(考虑到现场公用TDH集群不能更改): spark-submit中可以通过添加参数 --conf "spark.yarn.appMasterEnv.JAVA_HOME=/usr/java/jdk1.8.0_25/" --conf "spark.executorEnv.JAVA_HOME=/usr/java/jdk1.8.0_25/"来指定当前spark任务特定在yarn中执行时的jdk目录 请问下flink启动&提交任务时(bin/yarn-session.sh&flink run)有没有参数可以指定flink特定使用yarn时的jdk目录,flink中是否有这样的功能?(不影响其他yarn上应用正常运行) ________________________________ [hidden email] |
Hi ,
Flink也支持传递环境变量的,也可以尝试一下: https://ci.apache.org/projects/flink/flink-docs-master/ops/config.html#configuration-runtime-environment-variables 郑 洁锋 <[hidden email]> 于2020年1月15日周三 上午11:34写道: > Hi, > > 我们使用了TDH中的yarn(hadoop2.7,jdk1.7),原先使用spark的时候,也遇到过jdk版本最低1.8的情况,我们是通过如下方式解决的(考虑到现场公用TDH集群不能更改): > spark-submit中可以通过添加参数 --conf > "spark.yarn.appMasterEnv.JAVA_HOME=/usr/java/jdk1.8.0_25/" --conf > "spark.executorEnv.JAVA_HOME=/usr/java/jdk1.8.0_25/"来指定当前spark任务特定在yarn中执行时的jdk目录 > > 请问下flink启动&提交任务时(bin/yarn-session.sh&flink > run)有没有参数可以指定flink特定使用yarn时的jdk目录,flink中是否有这样的功能?(不影响其他yarn上应用正常运行) > > > > ________________________________ > [hidden email] > -- Benchao Li School of Electronics Engineering and Computer Science, Peking University Tel:+86-15650713730 Email: [hidden email]; [hidden email] |
Hi, 非常感谢,可以了,我在flink-conf.yaml中添加了如下配置项即可正常运行 yarn-session.sh了,且Flink Dashboard也能正常查看了
containerized.master.env.JAVA_HOME: /usr/java/jdk1.8.0_25/ containerized.taskmanager.env.JAVA_HOME: /usr/java/jdk1.8.0_25/ 但是在运行官方例子https://ci.apache.org/projects/flink/flink-docs-release-1.9/zh/getting-started/tutorials/local_setup.html#run-the-example,测试时报错了:
bin/flink run examples/streaming/SocketWindowWordCount.jar --port 9000
2020-01-15 00:46:25,574 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - Found Yarn properties file under /tmp/.yarn-properties-root.
2020-01-15 00:46:25,574 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - Found Yarn properties file under /tmp/.yarn-properties-root.
2020-01-15 00:46:26,097 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - YARN properties set default parallelism to 4
2020-01-15 00:46:26,097 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - YARN properties set default parallelism to 4
YARN properties set default parallelism to 4
2020-01-15 00:46:26,293 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2020-01-15 00:46:26,293 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2020-01-15 00:46:26,375 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Found application JobManager host name 'tdh-2' and port '8181' from supplied application id 'application_1578968334899_0010'
Starting execution of program
------------------------------------------------------------
The program finished with the following exception:
org.apache.flink.client.program.ProgramInvocationException: Could not retrieve the execution result. (JobID: aaf4720f0c432de8d91b848838589c62)
at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:255)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:338)
at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:60)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
at org.apache.flink.streaming.examples.socket.SocketWindowWordCount.main(SocketWindowWordCount.java:92)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:274)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:746)
at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:273)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083)
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
at org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$8(RestClusterClient.java:382)
at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:263)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929)
at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.rest.util.RestClientException: [Internal server error., <Exception on server side:
akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/dispatcher#54443530]] after [10000 ms]. Message of type [org.apache.flink.runtime.rpc.messages.LocalFencedMessage]. A typical reason for `AskTimeoutException` is that the recipient actor didn't send a reply.
at akka.pattern.PromiseActorRef$$anonfun$2.apply(AskSupport.scala:635)
at akka.pattern.PromiseActorRef$$anonfun$2.apply(AskSupport.scala:635)
at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:648)
at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:205)
at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:328)
at akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:279)
at akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:283)
at akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:235)
at java.lang.Thread.run(Thread.java:745)
End of exception on server side>]
at org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:389)
at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:373)
at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952)
at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
... 4 more
有尝试过添加配置:
akka.ask.timeout: 20s
web.timeout: 20000
还是一样的报错(超时时间生效了,日志中变更了)。
而且dashboard也出现了问题:
这个有人遇到过吗,我搜索还有说是flink版本的问题,但是我已经是最新的1.9.1版本了,这个是还未修复的bug还是其他的问题?
|
玄学问题,升级 JDK 小版本可接,或与类型擦除有关
你可以share一下 JM 侧的日志,应该有作业执行异常 Best, tison. 郑 洁锋 <[hidden email]> 于2020年1月15日周三 下午2:17写道: > Hi, > > 非常感谢,可以了,我在flink-conf.yaml中添加了如下配置项即可正常运行 yarn-session.sh了,且Flink Dashboard也能正常查看了 > > containerized.master.env.JAVA_HOME: /usr/java/jdk1.8.0_25/ > > containerized.taskmanager.env.JAVA_HOME: /usr/java/jdk1.8.0_25/ > > > 但是在运行官方例子https://ci.apache.org/projects/flink/flink-docs-release-1.9/zh/getting-started/tutorials/local_setup.html#run-the-example,测试时报错了: > > > bin/flink run examples/streaming/SocketWindowWordCount.jar --port 9000 > 2020-01-15 00:46:25,574 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - Found Yarn properties file under /tmp/.yarn-properties-root. > 2020-01-15 00:46:25,574 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - Found Yarn properties file under /tmp/.yarn-properties-root. > 2020-01-15 00:46:26,097 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - YARN properties set default parallelism to 4 > 2020-01-15 00:46:26,097 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - YARN properties set default parallelism to 4 > YARN properties set default parallelism to 4 > 2020-01-15 00:46:26,293 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar > 2020-01-15 00:46:26,293 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar > 2020-01-15 00:46:26,375 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Found application JobManager host name 'tdh-2' and port '8181' from supplied application id 'application_1578968334899_0010' > Starting execution of program > ------------------------------------------------------------ > The program finished with the following exception: > org.apache.flink.client.program.ProgramInvocationException: Could not retrieve the execution result. (JobID: aaf4720f0c432de8d91b848838589c62) > at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:255) > at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:338) > at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:60) > at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507) > at org.apache.flink.streaming.examples.socket.SocketWindowWordCount.main(SocketWindowWordCount.java:92) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576) > at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438) > at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:274) > at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:746) > at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:273) > at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205) > at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010) > at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083) > at java.security.AccessController.doPrivileged(Native Method) > at javax.security.auth.Subject.doAs(Subject.java:422) > at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754) > at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) > at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083) > Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph. > at org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$8(RestClusterClient.java:382) > at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870) > at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852) > at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) > at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) > at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:263) > at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) > at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) > at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) > at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561) > at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929) > at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442) > at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > Caused by: org.apache.flink.runtime.rest.util.RestClientException: [Internal server error., <Exception on server side: > akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/dispatcher#54443530]] after [10000 ms]. Message of type [org.apache.flink.runtime.rpc.messages.LocalFencedMessage]. A typical reason for `AskTimeoutException` is that the recipient actor didn't send a reply. > at akka.pattern.PromiseActorRef$$anonfun$2.apply(AskSupport.scala:635) > at akka.pattern.PromiseActorRef$$anonfun$2.apply(AskSupport.scala:635) > at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:648) > at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:205) > at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601) > at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109) > at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599) > at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:328) > at akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:279) > at akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:283) > at akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:235) > at java.lang.Thread.run(Thread.java:745) > End of exception on server side>] > at org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:389) > at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:373) > at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952) > at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926) > ... 4 more > > > 有尝试过添加配置: > akka.ask.timeout: 20s > web.timeout: 20000 > 还是一样的报错(超时时间生效了,日志中变更了)。 > > 而且dashboard也出现了问题: > 这个有人遇到过吗,我搜索还有说是flink版本的问题,但是我已经是最新的1.9.1版本了,这个是还未修复的bug还是其他的问题? > > ------------------------------ > [hidden email] > > > *发件人:* Benchao Li <[hidden email]> > *发送时间:* 2020-01-15 12:56 > *收件人:* user-zh <[hidden email]>; zjfplayer > <[hidden email]> > *主题:* Re: flink on yarn jdk版本问题 > Hi , > > Flink也支持传递环境变量的,也可以尝试一下: > https://ci.apache.org/projects/flink/flink-docs-master/ops/config.html#configuration-runtime-environment-variables > > 郑 洁锋 <[hidden email]> 于2020年1月15日周三 上午11:34写道: > >> Hi, >> >> 我们使用了TDH中的yarn(hadoop2.7,jdk1.7),原先使用spark的时候,也遇到过jdk版本最低1.8的情况,我们是通过如下方式解决的(考虑到现场公用TDH集群不能更改): >> spark-submit中可以通过添加参数 --conf >> "spark.yarn.appMasterEnv.JAVA_HOME=/usr/java/jdk1.8.0_25/" --conf >> "spark.executorEnv.JAVA_HOME=/usr/java/jdk1.8.0_25/"来指定当前spark任务特定在yarn中执行时的jdk目录 >> >> 请问下flink启动&提交任务时(bin/yarn-session.sh&flink >> run)有没有参数可以指定flink特定使用yarn时的jdk目录,flink中是否有这样的功能?(不影响其他yarn上应用正常运行) >> >> >> >> ________________________________ >> [hidden email] >> > > > -- > > Benchao Li > School of Electronics Engineering and Computer Science, Peking University > Tel:+86-15650713730 > Email: [hidden email]; [hidden email] > > |
果然是,yarn上的jdk版本升级后就可以了。。
________________________________ [hidden email] 发件人: tison<mailto:[hidden email]> 发送时间: 2020-01-15 14:22 收件人: user-zh<mailto:[hidden email]> 抄送: Benchao Li<mailto:[hidden email]> 主题: Re: Re: flink on yarn jdk版本问题 玄学问题,升级 JDK 小版本可接,或与类型擦除有关 你可以share一下 JM 侧的日志,应该有作业执行异常 Best, tison. 郑 洁锋 <[hidden email]> 于2020年1月15日周三 下午2:17写道: > Hi, > > 非常感谢,可以了,我在flink-conf.yaml中添加了如下配置项即可正常运行 yarn-session.sh了,且Flink Dashboard也能正常查看了 > > containerized.master.env.JAVA_HOME: /usr/java/jdk1.8.0_25/ > > containerized.taskmanager.env.JAVA_HOME: /usr/java/jdk1.8.0_25/ > > > 但是在运行官方例子https://ci.apache.org/projects/flink/flink-docs-release-1.9/zh/getting-started/tutorials/local_setup.html#run-the-example,测试时报错了: > > > bin/flink run examples/streaming/SocketWindowWordCount.jar --port 9000 > 2020-01-15 00:46:25,574 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - Found Yarn properties file under /tmp/.yarn-properties-root. > 2020-01-15 00:46:25,574 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - Found Yarn properties file under /tmp/.yarn-properties-root. > 2020-01-15 00:46:26,097 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - YARN properties set default parallelism to 4 > 2020-01-15 00:46:26,097 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - YARN properties set default parallelism to 4 > YARN properties set default parallelism to 4 > 2020-01-15 00:46:26,293 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar > 2020-01-15 00:46:26,293 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar > 2020-01-15 00:46:26,375 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Found application JobManager host name 'tdh-2' and port '8181' from supplied application id 'application_1578968334899_0010' > Starting execution of program > ------------------------------------------------------------ > The program finished with the following exception: > org.apache.flink.client.program.ProgramInvocationException: Could not retrieve the execution result. (JobID: aaf4720f0c432de8d91b848838589c62) > at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:255) > at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:338) > at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:60) > at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507) > at org.apache.flink.streaming.examples.socket.SocketWindowWordCount.main(SocketWindowWordCount.java:92) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576) > at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438) > at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:274) > at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:746) > at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:273) > at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205) > at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010) > at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083) > at java.security.AccessController.doPrivileged(Native Method) > at javax.security.auth.Subject.doAs(Subject.java:422) > at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754) > at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) > at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083) > Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph. > at org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$8(RestClusterClient.java:382) > at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870) > at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852) > at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) > at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) > at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:263) > at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) > at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) > at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) > at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561) > at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929) > at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442) > at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > Caused by: org.apache.flink.runtime.rest.util.RestClientException: [Internal server error., <Exception on server side: > akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/dispatcher#54443530]] after [10000 ms]. Message of type [org.apache.flink.runtime.rpc.messages.LocalFencedMessage]. A typical reason for `AskTimeoutException` is that the recipient actor didn't send a reply. > at akka.pattern.PromiseActorRef$$anonfun$2.apply(AskSupport.scala:635) > at akka.pattern.PromiseActorRef$$anonfun$2.apply(AskSupport.scala:635) > at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:648) > at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:205) > at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601) > at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109) > at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599) > at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:328) > at akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:279) > at akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:283) > at akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:235) > at java.lang.Thread.run(Thread.java:745) > End of exception on server side>] > at org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:389) > at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:373) > at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952) > at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926) > ... 4 more > > > 有尝试过添加配置: > akka.ask.timeout: 20s > web.timeout: 20000 > 还是一样的报错(超时时间生效了,日志中变更了)。 > > 而且dashboard也出现了问题: > 这个有人遇到过吗,我搜索还有说是flink版本的问题,但是我已经是最新的1.9.1版本了,这个是还未修复的bug还是其他的问题? > > ------------------------------ > [hidden email] > > > *发件人:* Benchao Li <[hidden email]> > *发送时间:* 2020-01-15 12:56 > *收件人:* user-zh <[hidden email]>; zjfplayer > <[hidden email]> > *主题:* Re: flink on yarn jdk版本问题 > Hi , > > Flink也支持传递环境变量的,也可以尝试一下: > https://ci.apache.org/projects/flink/flink-docs-master/ops/config.html#configuration-runtime-environment-variables > > 郑 洁锋 <[hidden email]> 于2020年1月15日周三 上午11:34写道: > >> Hi, >> >> 我们使用了TDH中的yarn(hadoop2.7,jdk1.7),原先使用spark的时候,也遇到过jdk版本最低1.8的情况,我们是通过如下方式解决的(考虑到现场公用TDH集群不能更改): >> spark-submit中可以通过添加参数 --conf >> "spark.yarn.appMasterEnv.JAVA_HOME=/usr/java/jdk1.8.0_25/" --conf >> "spark.executorEnv.JAVA_HOME=/usr/java/jdk1.8.0_25/"来指定当前spark任务特定在yarn中执行时的jdk目录 >> >> 请问下flink启动&提交任务时(bin/yarn-session.sh&flink >> run)有没有参数可以指定flink特定使用yarn时的jdk目录,flink中是否有这样的功能?(不影响其他yarn上应用正常运行) >> >> >> >> ________________________________ >> [hidden email] >> > > > -- > > Benchao Li > School of Electronics Engineering and Computer Science, Peking University > Tel:+86-15650713730 > Email: [hidden email]; [hidden email] > > |
MiniCluster代码执行过程中报错:
报错段代码如下: Configuration config = GlobalConfiguration.loadConfiguration(flinkConfDir); 其中flinkConfDir为/opt/flink/conf
flink standalone HA集群信息如下:
|
你 MiniCluster 要 start 啊(x
Best, tison. 郑 洁锋 <[hidden email]> 于2020年1月16日周四 上午11:38写道: > MiniCluster代码执行过程中报错: > > SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". > SLF4J: Defaulting to no-operation (NOP) logger implementation > SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details. > Exception in thread "main" java.lang.IllegalStateException: MiniCluster is not yet running. > at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195) > at org.apache.flink.runtime.minicluster.MiniCluster.getHighAvailabilityServices(MiniCluster.java:223) > at org.apache.flink.client.program.MiniClusterClient.<init>(MiniClusterClient.java:61) > at com.dtstack.flink.sql.launcher.ClusterClientFactory.createStandaloneClient(ClusterClientFactory.java:82) > at com.dtstack.flink.sql.launcher.ClusterClientFactory.createClusterClient(ClusterClientFactory.java:69) > at com.dtstack.flink.sql.launcher.LauncherMain.main(LauncherMain.java:99) > > 报错段代码如下: > > Configuration config = GlobalConfiguration.loadConfiguration(flinkConfDir); > MiniClusterConfiguration.Builder configBuilder = new MiniClusterConfiguration.Builder(); > configBuilder.setConfiguration(config); > MiniCluster miniCluster = new MiniCluster(configBuilder.build()); > MiniClusterClient clusterClient = new MiniClusterClient(config, miniCluster); > > 其中flinkConfDir为/opt/flink/conf > > > flink standalone HA集群信息如下: > ------------------------------ > [hidden email] > > > |
MiniCluster有没有相关文档,官方文档里面看到部署模式里面没有相关的内容?
我是通过bin/start-cluster.sh启动的flink standalone集群 ________________________________ [hidden email] 发件人: tison<mailto:[hidden email]> 发送时间: 2020-01-16 12:39 收件人: user-zh<mailto:[hidden email]> 主题: Re: MiniCluster问题 你 MiniCluster 要 start 啊(x Best, tison. 郑 洁锋 <[hidden email]> 于2020年1月16日周四 上午11:38写道: > MiniCluster代码执行过程中报错: > > SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". > SLF4J: Defaulting to no-operation (NOP) logger implementation > SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details. > Exception in thread "main" java.lang.IllegalStateException: MiniCluster is not yet running. > at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195) > at org.apache.flink.runtime.minicluster.MiniCluster.getHighAvailabilityServices(MiniCluster.java:223) > at org.apache.flink.client.program.MiniClusterClient.<init>(MiniClusterClient.java:61) > at com.dtstack.flink.sql.launcher.ClusterClientFactory.createStandaloneClient(ClusterClientFactory.java:82) > at com.dtstack.flink.sql.launcher.ClusterClientFactory.createClusterClient(ClusterClientFactory.java:69) > at com.dtstack.flink.sql.launcher.LauncherMain.main(LauncherMain.java:99) > > 报错段代码如下: > > Configuration config = GlobalConfiguration.loadConfiguration(flinkConfDir); > MiniClusterConfiguration.Builder configBuilder = new MiniClusterConfiguration.Builder(); > configBuilder.setConfiguration(config); > MiniCluster miniCluster = new MiniCluster(configBuilder.build()); > MiniClusterClient clusterClient = new MiniClusterClient(config, miniCluster); > > 其中flinkConfDir为/opt/flink/conf > > > flink standalone HA集群信息如下: > ------------------------------ > [hidden email] > > > |
1. 这是个内部实现类,文档是一个问题,但最好不要直接依赖
2. ?你截图的代码片段在哪?我看是你自己代码创建使用的啊 Best, tison. 郑 洁锋 <[hidden email]> 于2020年1月16日周四 下午1:18写道: > MiniCluster有没有相关文档,官方文档里面看到部署模式里面没有相关的内容? > 我是通过bin/start-cluster.sh启动的flink standalone集群 > > > ________________________________ > [hidden email] > > 发件人: tison<mailto:[hidden email]> > 发送时间: 2020-01-16 12:39 > 收件人: user-zh<mailto:[hidden email]> > 主题: Re: MiniCluster问题 > 你 MiniCluster 要 start 啊(x > > Best, > tison. > > > 郑 洁锋 <[hidden email]> 于2020年1月16日周四 上午11:38写道: > > > MiniCluster代码执行过程中报错: > > > > SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". > > SLF4J: Defaulting to no-operation (NOP) logger implementation > > SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for > further details. > > Exception in thread "main" java.lang.IllegalStateException: MiniCluster > is not yet running. > > at > org.apache.flink.util.Preconditions.checkState(Preconditions.java:195) > > at > org.apache.flink.runtime.minicluster.MiniCluster.getHighAvailabilityServices(MiniCluster.java:223) > > at > org.apache.flink.client.program.MiniClusterClient.<init>(MiniClusterClient.java:61) > > at > com.dtstack.flink.sql.launcher.ClusterClientFactory.createStandaloneClient(ClusterClientFactory.java:82) > > at > com.dtstack.flink.sql.launcher.ClusterClientFactory.createClusterClient(ClusterClientFactory.java:69) > > at > com.dtstack.flink.sql.launcher.LauncherMain.main(LauncherMain.java:99) > > > > 报错段代码如下: > > > > Configuration config = > GlobalConfiguration.loadConfiguration(flinkConfDir); > > MiniClusterConfiguration.Builder configBuilder = new > MiniClusterConfiguration.Builder(); > > configBuilder.setConfiguration(config); > > MiniCluster miniCluster = new MiniCluster(configBuilder.build()); > > MiniClusterClient clusterClient = new MiniClusterClient(config, > miniCluster); > > > > 其中flinkConfDir为/opt/flink/conf > > > > > > flink standalone HA集群信息如下: > > ------------------------------ > > [hidden email] > > > > > > > |
跟集群无关
Best, tison. tison <[hidden email]> 于2020年1月16日周四 下午1:30写道: > 1. 这是个内部实现类,文档是一个问题,但最好不要直接依赖 > > 2. ?你截图的代码片段在哪?我看是你自己代码创建使用的啊 > > Best, > tison. > > > 郑 洁锋 <[hidden email]> 于2020年1月16日周四 下午1:18写道: > >> MiniCluster有没有相关文档,官方文档里面看到部署模式里面没有相关的内容? >> 我是通过bin/start-cluster.sh启动的flink standalone集群 >> >> >> ________________________________ >> [hidden email] >> >> 发件人: tison<mailto:[hidden email]> >> 发送时间: 2020-01-16 12:39 >> 收件人: user-zh<mailto:[hidden email]> >> 主题: Re: MiniCluster问题 >> 你 MiniCluster 要 start 啊(x >> >> Best, >> tison. >> >> >> 郑 洁锋 <[hidden email]> 于2020年1月16日周四 上午11:38写道: >> >> > MiniCluster代码执行过程中报错: >> > >> > SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". >> > SLF4J: Defaulting to no-operation (NOP) logger implementation >> > SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for >> further details. >> > Exception in thread "main" java.lang.IllegalStateException: MiniCluster >> is not yet running. >> > at >> org.apache.flink.util.Preconditions.checkState(Preconditions.java:195) >> > at >> org.apache.flink.runtime.minicluster.MiniCluster.getHighAvailabilityServices(MiniCluster.java:223) >> > at >> org.apache.flink.client.program.MiniClusterClient.<init>(MiniClusterClient.java:61) >> > at >> com.dtstack.flink.sql.launcher.ClusterClientFactory.createStandaloneClient(ClusterClientFactory.java:82) >> > at >> com.dtstack.flink.sql.launcher.ClusterClientFactory.createClusterClient(ClusterClientFactory.java:69) >> > at >> com.dtstack.flink.sql.launcher.LauncherMain.main(LauncherMain.java:99) >> > >> > 报错段代码如下: >> > >> > Configuration config = >> GlobalConfiguration.loadConfiguration(flinkConfDir); >> > MiniClusterConfiguration.Builder configBuilder = new >> MiniClusterConfiguration.Builder(); >> > configBuilder.setConfiguration(config); >> > MiniCluster miniCluster = new MiniCluster(configBuilder.build()); >> > MiniClusterClient clusterClient = new MiniClusterClient(config, >> miniCluster); >> > >> > 其中flinkConfDir为/opt/flink/conf >> > >> > >> > flink standalone HA集群信息如下: >> > ------------------------------ >> > [hidden email] >> > >> > >> > >> > |
MiniCluster miniCluster = new MiniCluster(configBuilder.build());
miniCluster.start(); MiniClusterClient clusterClient = new MiniClusterClient(config, miniCluster) ; Best, tison. tison <[hidden email]> 于2020年1月16日周四 下午1:30写道: > 跟集群无关 > Best, > tison. > > > tison <[hidden email]> 于2020年1月16日周四 下午1:30写道: > >> 1. 这是个内部实现类,文档是一个问题,但最好不要直接依赖 >> >> 2. ?你截图的代码片段在哪?我看是你自己代码创建使用的啊 >> >> Best, >> tison. >> >> >> 郑 洁锋 <[hidden email]> 于2020年1月16日周四 下午1:18写道: >> >>> MiniCluster有没有相关文档,官方文档里面看到部署模式里面没有相关的内容? >>> 我是通过bin/start-cluster.sh启动的flink standalone集群 >>> >>> >>> ________________________________ >>> [hidden email] >>> >>> 发件人: tison<mailto:[hidden email]> >>> 发送时间: 2020-01-16 12:39 >>> 收件人: user-zh<mailto:[hidden email]> >>> 主题: Re: MiniCluster问题 >>> 你 MiniCluster 要 start 啊(x >>> >>> Best, >>> tison. >>> >>> >>> 郑 洁锋 <[hidden email]> 于2020年1月16日周四 上午11:38写道: >>> >>> > MiniCluster代码执行过程中报错: >>> > >>> > SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". >>> > SLF4J: Defaulting to no-operation (NOP) logger implementation >>> > SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for >>> further details. >>> > Exception in thread "main" java.lang.IllegalStateException: >>> MiniCluster is not yet running. >>> > at >>> org.apache.flink.util.Preconditions.checkState(Preconditions.java:195) >>> > at >>> org.apache.flink.runtime.minicluster.MiniCluster.getHighAvailabilityServices(MiniCluster.java:223) >>> > at >>> org.apache.flink.client.program.MiniClusterClient.<init>(MiniClusterClient.java:61) >>> > at >>> com.dtstack.flink.sql.launcher.ClusterClientFactory.createStandaloneClient(ClusterClientFactory.java:82) >>> > at >>> com.dtstack.flink.sql.launcher.ClusterClientFactory.createClusterClient(ClusterClientFactory.java:69) >>> > at >>> com.dtstack.flink.sql.launcher.LauncherMain.main(LauncherMain.java:99) >>> > >>> > 报错段代码如下: >>> > >>> > Configuration config = >>> GlobalConfiguration.loadConfiguration(flinkConfDir); >>> > MiniClusterConfiguration.Builder configBuilder = new >>> MiniClusterConfiguration.Builder(); >>> > configBuilder.setConfiguration(config); >>> > MiniCluster miniCluster = new MiniCluster(configBuilder.build()); >>> > MiniClusterClient clusterClient = new MiniClusterClient(config, >>> miniCluster); >>> > >>> > 其中flinkConfDir为/opt/flink/conf >>> > >>> > >>> > flink standalone HA集群信息如下: >>> > ------------------------------ >>> > [hidden email] >>> > >>> > >>> > >>> >> |
In reply to this post by tison
Hi
可以参考org.apache.flink.streaming.api.environment.LocalStreamEnvironment:: execute public JobExecutionResult execute(String jobName) throws Exception { // transform the streaming program into a JobGraph StreamGraph streamGraph = getStreamGraph(); streamGraph.setJobName(jobName); JobGraph jobGraph = streamGraph.getJobGraph(); jobGraph.setAllowQueuedScheduling(true); Configuration configuration = new Configuration(); configuration.addAll(jobGraph.getJobConfiguration()); configuration.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "0"); // add (and override) the settings with what the user defined configuration.addAll(this.configuration); if (!configuration.contains(RestOptions.BIND_PORT)) { configuration.setString(RestOptions.BIND_PORT, "0"); } int numSlotsPerTaskManager = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, jobGraph.getMaximumParallelism()); MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder() .setConfiguration(configuration) .setNumSlotsPerTaskManager(numSlotsPerTaskManager) .build(); if (LOG.isInfoEnabled()) { LOG.info("Running job on local embedded Flink mini cluster"); } MiniCluster miniCluster = new MiniCluster(cfg); try { miniCluster.start(); configuration.setInteger(RestOptions.PORT, miniCluster.getRestAddress().get().getPort()); return miniCluster.executeJobBlocking(jobGraph); } finally { transformations.clear(); miniCluster.close(); } } } Best, Eleanore On Wed, Jan 15, 2020 at 8:40 PM tison <[hidden email]> wrote: > 你 MiniCluster 要 start 啊(x > > Best, > tison. > > > 郑 洁锋 <[hidden email]> 于2020年1月16日周四 上午11:38写道: > > > MiniCluster代码执行过程中报错: > > > > SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". > > SLF4J: Defaulting to no-operation (NOP) logger implementation > > SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for > further details. > > Exception in thread "main" java.lang.IllegalStateException: MiniCluster > is not yet running. > > at > org.apache.flink.util.Preconditions.checkState(Preconditions.java:195) > > at > org.apache.flink.runtime.minicluster.MiniCluster.getHighAvailabilityServices(MiniCluster.java:223) > > at > org.apache.flink.client.program.MiniClusterClient.<init>(MiniClusterClient.java:61) > > at > com.dtstack.flink.sql.launcher.ClusterClientFactory.createStandaloneClient(ClusterClientFactory.java:82) > > at > com.dtstack.flink.sql.launcher.ClusterClientFactory.createClusterClient(ClusterClientFactory.java:69) > > at > com.dtstack.flink.sql.launcher.LauncherMain.main(LauncherMain.java:99) > > > > 报错段代码如下: > > > > Configuration config = > GlobalConfiguration.loadConfiguration(flinkConfDir); > > MiniClusterConfiguration.Builder configBuilder = new > MiniClusterConfiguration.Builder(); > > configBuilder.setConfiguration(config); > > MiniCluster miniCluster = new MiniCluster(configBuilder.build()); > > MiniClusterClient clusterClient = new MiniClusterClient(config, > miniCluster); > > > > 其中flinkConfDir为/opt/flink/conf > > > > > > flink standalone HA集群信息如下: > > ------------------------------ > > [hidden email] > > > > > > > |
In reply to this post by tison
而且不对啊,你起了 standalone 集群还要 MiniCluster 干嘛...
Best, tison. tison <[hidden email]> 于2020年1月16日周四 下午1:31写道: > MiniCluster miniCluster = new MiniCluster(configBuilder.build()); > > miniCluster.start(); > > > MiniClusterClient clusterClient = new MiniClusterClient(config, > miniCluster); > > Best, > tison. > > > tison <[hidden email]> 于2020年1月16日周四 下午1:30写道: > >> 跟集群无关 >> Best, >> tison. >> >> >> tison <[hidden email]> 于2020年1月16日周四 下午1:30写道: >> >>> 1. 这是个内部实现类,文档是一个问题,但最好不要直接依赖 >>> >>> 2. ?你截图的代码片段在哪?我看是你自己代码创建使用的啊 >>> >>> Best, >>> tison. >>> >>> >>> 郑 洁锋 <[hidden email]> 于2020年1月16日周四 下午1:18写道: >>> >>>> MiniCluster有没有相关文档,官方文档里面看到部署模式里面没有相关的内容? >>>> 我是通过bin/start-cluster.sh启动的flink standalone集群 >>>> >>>> >>>> ________________________________ >>>> [hidden email] >>>> >>>> 发件人: tison<mailto:[hidden email]> >>>> 发送时间: 2020-01-16 12:39 >>>> 收件人: user-zh<mailto:[hidden email]> >>>> 主题: Re: MiniCluster问题 >>>> 你 MiniCluster 要 start 啊(x >>>> >>>> Best, >>>> tison. >>>> >>>> >>>> 郑 洁锋 <[hidden email]> 于2020年1月16日周四 上午11:38写道: >>>> >>>> > MiniCluster代码执行过程中报错: >>>> > >>>> > SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". >>>> > SLF4J: Defaulting to no-operation (NOP) logger implementation >>>> > SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for >>>> further details. >>>> > Exception in thread "main" java.lang.IllegalStateException: >>>> MiniCluster is not yet running. >>>> > at >>>> org.apache.flink.util.Preconditions.checkState(Preconditions.java:195) >>>> > at >>>> org.apache.flink.runtime.minicluster.MiniCluster.getHighAvailabilityServices(MiniCluster.java:223) >>>> > at >>>> org.apache.flink.client.program.MiniClusterClient.<init>(MiniClusterClient.java:61) >>>> > at >>>> com.dtstack.flink.sql.launcher.ClusterClientFactory.createStandaloneClient(ClusterClientFactory.java:82) >>>> > at >>>> com.dtstack.flink.sql.launcher.ClusterClientFactory.createClusterClient(ClusterClientFactory.java:69) >>>> > at >>>> com.dtstack.flink.sql.launcher.LauncherMain.main(LauncherMain.java:99) >>>> > >>>> > 报错段代码如下: >>>> > >>>> > Configuration config = >>>> GlobalConfiguration.loadConfiguration(flinkConfDir); >>>> > MiniClusterConfiguration.Builder configBuilder = new >>>> MiniClusterConfiguration.Builder(); >>>> > configBuilder.setConfiguration(config); >>>> > MiniCluster miniCluster = new MiniCluster(configBuilder.build()); >>>> > MiniClusterClient clusterClient = new MiniClusterClient(config, >>>> miniCluster); >>>> > >>>> > 其中flinkConfDir为/opt/flink/conf >>>> > >>>> > >>>> > flink standalone HA集群信息如下: >>>> > ------------------------------ >>>> > [hidden email] >>>> > >>>> > >>>> > >>>> >>> |
In reply to this post by tison
我们这边是在尝试使用github上的袋鼠云的flinkStreamSQL项目,其中的flink standalone模式,报了这个错误
________________________________ [hidden email] 发件人: tison<mailto:[hidden email]> 发送时间: 2020-01-16 13:30 收件人: user-zh<mailto:[hidden email]> 主题: Re: Re: MiniCluster问题 1. 这是个内部实现类,文档是一个问题,但最好不要直接依赖 2. ?你截图的代码片段在哪?我看是你自己代码创建使用的啊 Best, tison. 郑 洁锋 <[hidden email]> 于2020年1月16日周四 下午1:18写道: > MiniCluster有没有相关文档,官方文档里面看到部署模式里面没有相关的内容? > 我是通过bin/start-cluster.sh启动的flink standalone集群 > > > ________________________________ > [hidden email] > > 发件人: tison<mailto:[hidden email]> > 发送时间: 2020-01-16 12:39 > 收件人: user-zh<mailto:[hidden email]> > 主题: Re: MiniCluster问题 > 你 MiniCluster 要 start 啊(x > > Best, > tison. > > > 郑 洁锋 <[hidden email]> 于2020年1月16日周四 上午11:38写道: > > > MiniCluster代码执行过程中报错: > > > > SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". > > SLF4J: Defaulting to no-operation (NOP) logger implementation > > SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for > further details. > > Exception in thread "main" java.lang.IllegalStateException: MiniCluster > is not yet running. > > at > org.apache.flink.util.Preconditions.checkState(Preconditions.java:195) > > at > org.apache.flink.runtime.minicluster.MiniCluster.getHighAvailabilityServices(MiniCluster.java:223) > > at > org.apache.flink.client.program.MiniClusterClient.<init>(MiniClusterClient.java:61) > > at > com.dtstack.flink.sql.launcher.ClusterClientFactory.createStandaloneClient(ClusterClientFactory.java:82) > > at > com.dtstack.flink.sql.launcher.ClusterClientFactory.createClusterClient(ClusterClientFactory.java:69) > > at > com.dtstack.flink.sql.launcher.LauncherMain.main(LauncherMain.java:99) > > > > 报错段代码如下: > > > > Configuration config = > GlobalConfiguration.loadConfiguration(flinkConfDir); > > MiniClusterConfiguration.Builder configBuilder = new > MiniClusterConfiguration.Builder(); > > configBuilder.setConfiguration(config); > > MiniCluster miniCluster = new MiniCluster(configBuilder.build()); > > MiniClusterClient clusterClient = new MiniClusterClient(config, > miniCluster); > > > > 其中flinkConfDir为/opt/flink/conf > > > > > > flink standalone HA集群信息如下: > > ------------------------------ > > [hidden email] > > > > > > > |
In reply to this post by tison
这是完整的到启动的代码
public class ClusterClientFactory { public static ClusterClient createClusterClient(Options launcherOptions) throws Exception { String mode = launcherOptions.getMode(); if(mode.equals(ClusterMode.standalone.name())) { return createStandaloneClient(launcherOptions); } else if(mode.equals(ClusterMode.yarn.name())) { return createYarnClient(launcherOptions,mode); } throw new IllegalArgumentException("Unsupported cluster client type: "); } public static ClusterClient createStandaloneClient(Options launcherOptions) throws Exception { String flinkConfDir = launcherOptions.getFlinkconf(); Configuration config = GlobalConfiguration.loadConfiguration(flinkConfDir); MiniClusterConfiguration.Builder configBuilder = new MiniClusterConfiguration.Builder(); configBuilder.setConfiguration(config); MiniCluster miniCluster = new MiniCluster(configBuilder.build()); MiniClusterClient clusterClient = new MiniClusterClient(config, miniCluster); LeaderConnectionInfo connectionInfo = clusterClient.getClusterConnectionInfo(); InetSocketAddress address = AkkaUtils.getInetSocketAddressFromAkkaURL(connectionInfo.getAddress()); config.setString(JobManagerOptions.ADDRESS, address.getAddress().getHostName()); config.setInteger(JobManagerOptions.PORT, address.getPort()); clusterClient.setDetached(true); return clusterClient; } 启动类中: ClusterClient clusterClient = ClusterClientFactory.createClusterClient(launcherOptions); clusterClient.run(program, 1); clusterClient.shutdown(); ________________________________ [hidden email] 发件人: tison<mailto:[hidden email]> 发送时间: 2020-01-16 13:31 收件人: user-zh<mailto:[hidden email]> 主题: Re: Re: MiniCluster问题 MiniCluster miniCluster = new MiniCluster(configBuilder.build()); miniCluster.start(); MiniClusterClient clusterClient = new MiniClusterClient(config, miniCluster) ; Best, tison. tison <[hidden email]> 于2020年1月16日周四 下午1:30写道: > 跟集群无关 > Best, > tison. > > > tison <[hidden email]> 于2020年1月16日周四 下午1:30写道: > >> 1. 这是个内部实现类,文档是一个问题,但最好不要直接依赖 >> >> 2. ?你截图的代码片段在哪?我看是你自己代码创建使用的啊 >> >> Best, >> tison. >> >> >> 郑 洁锋 <[hidden email]> 于2020年1月16日周四 下午1:18写道: >> >>> MiniCluster有没有相关文档,官方文档里面看到部署模式里面没有相关的内容? >>> 我是通过bin/start-cluster.sh启动的flink standalone集群 >>> >>> >>> ________________________________ >>> [hidden email] >>> >>> 发件人: tison<mailto:[hidden email]> >>> 发送时间: 2020-01-16 12:39 >>> 收件人: user-zh<mailto:[hidden email]> >>> 主题: Re: MiniCluster问题 >>> 你 MiniCluster 要 start 啊(x >>> >>> Best, >>> tison. >>> >>> >>> 郑 洁锋 <[hidden email]> 于2020年1月16日周四 上午11:38写道: >>> >>> > MiniCluster代码执行过程中报错: >>> > >>> > SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". >>> > SLF4J: Defaulting to no-operation (NOP) logger implementation >>> > SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for >>> further details. >>> > Exception in thread "main" java.lang.IllegalStateException: >>> MiniCluster is not yet running. >>> > at >>> org.apache.flink.util.Preconditions.checkState(Preconditions.java:195) >>> > at >>> org.apache.flink.runtime.minicluster.MiniCluster.getHighAvailabilityServices(MiniCluster.java:223) >>> > at >>> org.apache.flink.client.program.MiniClusterClient.<init>(MiniClusterClient.java:61) >>> > at >>> com.dtstack.flink.sql.launcher.ClusterClientFactory.createStandaloneClient(ClusterClientFactory.java:82) >>> > at >>> com.dtstack.flink.sql.launcher.ClusterClientFactory.createClusterClient(ClusterClientFactory.java:69) >>> > at >>> com.dtstack.flink.sql.launcher.LauncherMain.main(LauncherMain.java:99) >>> > >>> > 报错段代码如下: >>> > >>> > Configuration config = >>> GlobalConfiguration.loadConfiguration(flinkConfDir); >>> > MiniClusterConfiguration.Builder configBuilder = new >>> MiniClusterConfiguration.Builder(); >>> > configBuilder.setConfiguration(config); >>> > MiniCluster miniCluster = new MiniCluster(configBuilder.build()); >>> > MiniClusterClient clusterClient = new MiniClusterClient(config, >>> miniCluster); >>> > >>> > 其中flinkConfDir为/opt/flink/conf >>> > >>> > >>> > flink standalone HA集群信息如下: >>> > ------------------------------ >>> > [hidden email] >>> > >>> > >>> > >>> >> |
因为我这边看他是standalone,然后看到flink官方文档里面部署模式也有standalone模式,所以按照那个模式部署后来尝试
________________________________ [hidden email] 发件人: 郑 洁锋<mailto:[hidden email]> 发送时间: 2020-01-16 14:24 收件人: user-zh<mailto:[hidden email]> 主题: Re: Re: MiniCluster问题 这是完整的到启动的代码 public class ClusterClientFactory { public static ClusterClient createClusterClient(Options launcherOptions) throws Exception { String mode = launcherOptions.getMode(); if(mode.equals(ClusterMode.standalone.name())) { return createStandaloneClient(launcherOptions); } else if(mode.equals(ClusterMode.yarn.name())) { return createYarnClient(launcherOptions,mode); } throw new IllegalArgumentException("Unsupported cluster client type: "); } public static ClusterClient createStandaloneClient(Options launcherOptions) throws Exception { String flinkConfDir = launcherOptions.getFlinkconf(); Configuration config = GlobalConfiguration.loadConfiguration(flinkConfDir); MiniClusterConfiguration.Builder configBuilder = new MiniClusterConfiguration.Builder(); configBuilder.setConfiguration(config); MiniCluster miniCluster = new MiniCluster(configBuilder.build()); MiniClusterClient clusterClient = new MiniClusterClient(config, miniCluster); LeaderConnectionInfo connectionInfo = clusterClient.getClusterConnectionInfo(); InetSocketAddress address = AkkaUtils.getInetSocketAddressFromAkkaURL(connectionInfo.getAddress()); config.setString(JobManagerOptions.ADDRESS, address.getAddress().getHostName()); config.setInteger(JobManagerOptions.PORT, address.getPort()); clusterClient.setDetached(true); return clusterClient; } 启动类中: ClusterClient clusterClient = ClusterClientFactory.createClusterClient(launcherOptions); clusterClient.run(program, 1); clusterClient.shutdown(); ________________________________ [hidden email] 发件人: tison<mailto:[hidden email]> 发送时间: 2020-01-16 13:31 收件人: user-zh<mailto:[hidden email]> 主题: Re: Re: MiniCluster问题 MiniCluster miniCluster = new MiniCluster(configBuilder.build()); miniCluster.start(); MiniClusterClient clusterClient = new MiniClusterClient(config, miniCluster) ; Best, tison. tison <[hidden email]> 于2020年1月16日周四 下午1:30写道: > 跟集群无关 > Best, > tison. > > > tison <[hidden email]> 于2020年1月16日周四 下午1:30写道: > >> 1. 这是个内部实现类,文档是一个问题,但最好不要直接依赖 >> >> 2. ?你截图的代码片段在哪?我看是你自己代码创建使用的啊 >> >> Best, >> tison. >> >> >> 郑 洁锋 <[hidden email]> 于2020年1月16日周四 下午1:18写道: >> >>> MiniCluster有没有相关文档,官方文档里面看到部署模式里面没有相关的内容? >>> 我是通过bin/start-cluster.sh启动的flink standalone集群 >>> >>> >>> ________________________________ >>> [hidden email] >>> >>> 发件人: tison<mailto:[hidden email]> >>> 发送时间: 2020-01-16 12:39 >>> 收件人: user-zh<mailto:[hidden email]> >>> 主题: Re: MiniCluster问题 >>> 你 MiniCluster 要 start 啊(x >>> >>> Best, >>> tison. >>> >>> >>> 郑 洁锋 <[hidden email]> 于2020年1月16日周四 上午11:38写道: >>> >>> > MiniCluster代码执行过程中报错: >>> > >>> > SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". >>> > SLF4J: Defaulting to no-operation (NOP) logger implementation >>> > SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for >>> further details. >>> > Exception in thread "main" java.lang.IllegalStateException: >>> MiniCluster is not yet running. >>> > at >>> org.apache.flink.util.Preconditions.checkState(Preconditions.java:195) >>> > at >>> org.apache.flink.runtime.minicluster.MiniCluster.getHighAvailabilityServices(MiniCluster.java:223) >>> > at >>> org.apache.flink.client.program.MiniClusterClient.<init>(MiniClusterClient.java:61) >>> > at >>> com.dtstack.flink.sql.launcher.ClusterClientFactory.createStandaloneClient(ClusterClientFactory.java:82) >>> > at >>> com.dtstack.flink.sql.launcher.ClusterClientFactory.createClusterClient(ClusterClientFactory.java:69) >>> > at >>> com.dtstack.flink.sql.launcher.LauncherMain.main(LauncherMain.java:99) >>> > >>> > 报错段代码如下: >>> > >>> > Configuration config = >>> GlobalConfiguration.loadConfiguration(flinkConfDir); >>> > MiniClusterConfiguration.Builder configBuilder = new >>> MiniClusterConfiguration.Builder(); >>> > configBuilder.setConfiguration(config); >>> > MiniCluster miniCluster = new MiniCluster(configBuilder.build()); >>> > MiniClusterClient clusterClient = new MiniClusterClient(config, >>> miniCluster); >>> > >>> > 其中flinkConfDir为/opt/flink/conf >>> > >>> > >>> > flink standalone HA集群信息如下: >>> > ------------------------------ >>> > [hidden email] >>> > >>> > >>> > >>> >> |
你这完全是把几个概念混在一起了,MiniCluster 就是一个集群,是一个内部的部署模式,跟 standalone
是平行的概念。我看不懂你要干什么,但是就你的问题来说,我上面说了,就是你 MiniCluster new 出来之后没 start。但我不确定这个是不是你要的效果,MiniCluster 和 standalone 是平行的两种东西。 Best, tison. 郑 洁锋 <[hidden email]> 于2020年1月16日周四 下午2:27写道: > 因为我这边看他是standalone,然后看到flink官方文档里面部署模式也有standalone模式,所以按照那个模式部署后来尝试 > > ________________________________ > [hidden email] > > 发件人: 郑 洁锋<mailto:[hidden email]> > 发送时间: 2020-01-16 14:24 > 收件人: user-zh<mailto:[hidden email]> > 主题: Re: Re: MiniCluster问题 > 这是完整的到启动的代码 > > public class ClusterClientFactory { > > public static ClusterClient createClusterClient(Options > launcherOptions) throws Exception { > String mode = launcherOptions.getMode(); > if(mode.equals(ClusterMode.standalone.name())) { > return createStandaloneClient(launcherOptions); > } else if(mode.equals(ClusterMode.yarn.name())) { > return createYarnClient(launcherOptions,mode); > } > throw new IllegalArgumentException("Unsupported cluster client > type: "); > } > > public static ClusterClient createStandaloneClient(Options > launcherOptions) throws Exception { > String flinkConfDir = launcherOptions.getFlinkconf(); > Configuration config = > GlobalConfiguration.loadConfiguration(flinkConfDir); > MiniClusterConfiguration.Builder configBuilder = new > MiniClusterConfiguration.Builder(); > configBuilder.setConfiguration(config); > MiniCluster miniCluster = new MiniCluster(configBuilder.build()); > MiniClusterClient clusterClient = new MiniClusterClient(config, > miniCluster); > LeaderConnectionInfo connectionInfo = > clusterClient.getClusterConnectionInfo(); > InetSocketAddress address = > AkkaUtils.getInetSocketAddressFromAkkaURL(connectionInfo.getAddress()); > config.setString(JobManagerOptions.ADDRESS, > address.getAddress().getHostName()); > config.setInteger(JobManagerOptions.PORT, address.getPort()); > clusterClient.setDetached(true); > return clusterClient; > } > > > 启动类中: > > ClusterClient clusterClient = > ClusterClientFactory.createClusterClient(launcherOptions); > clusterClient.run(program, 1); > clusterClient.shutdown(); > > ________________________________ > [hidden email] > > 发件人: tison<mailto:[hidden email]> > 发送时间: 2020-01-16 13:31 > 收件人: user-zh<mailto:[hidden email]> > 主题: Re: Re: MiniCluster问题 > MiniCluster miniCluster = new MiniCluster(configBuilder.build()); > > miniCluster.start(); > > > MiniClusterClient clusterClient = new MiniClusterClient(config, > miniCluster) > ; > > Best, > tison. > > > tison <[hidden email]> 于2020年1月16日周四 下午1:30写道: > > > 跟集群无关 > > Best, > > tison. > > > > > > tison <[hidden email]> 于2020年1月16日周四 下午1:30写道: > > > >> 1. 这是个内部实现类,文档是一个问题,但最好不要直接依赖 > >> > >> 2. ?你截图的代码片段在哪?我看是你自己代码创建使用的啊 > >> > >> Best, > >> tison. > >> > >> > >> 郑 洁锋 <[hidden email]> 于2020年1月16日周四 下午1:18写道: > >> > >>> MiniCluster有没有相关文档,官方文档里面看到部署模式里面没有相关的内容? > >>> 我是通过bin/start-cluster.sh启动的flink standalone集群 > >>> > >>> > >>> ________________________________ > >>> [hidden email] > >>> > >>> 发件人: tison<mailto:[hidden email]> > >>> 发送时间: 2020-01-16 12:39 > >>> 收件人: user-zh<mailto:[hidden email]> > >>> 主题: Re: MiniCluster问题 > >>> 你 MiniCluster 要 start 啊(x > >>> > >>> Best, > >>> tison. > >>> > >>> > >>> 郑 洁锋 <[hidden email]> 于2020年1月16日周四 上午11:38写道: > >>> > >>> > MiniCluster代码执行过程中报错: > >>> > > >>> > SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". > >>> > SLF4J: Defaulting to no-operation (NOP) logger implementation > >>> > SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for > >>> further details. > >>> > Exception in thread "main" java.lang.IllegalStateException: > >>> MiniCluster is not yet running. > >>> > at > >>> org.apache.flink.util.Preconditions.checkState(Preconditions.java:195) > >>> > at > >>> > org.apache.flink.runtime.minicluster.MiniCluster.getHighAvailabilityServices(MiniCluster.java:223) > >>> > at > >>> > org.apache.flink.client.program.MiniClusterClient.<init>(MiniClusterClient.java:61) > >>> > at > >>> > com.dtstack.flink.sql.launcher.ClusterClientFactory.createStandaloneClient(ClusterClientFactory.java:82) > >>> > at > >>> > com.dtstack.flink.sql.launcher.ClusterClientFactory.createClusterClient(ClusterClientFactory.java:69) > >>> > at > >>> com.dtstack.flink.sql.launcher.LauncherMain.main(LauncherMain.java:99) > >>> > > >>> > 报错段代码如下: > >>> > > >>> > Configuration config = > >>> GlobalConfiguration.loadConfiguration(flinkConfDir); > >>> > MiniClusterConfiguration.Builder configBuilder = new > >>> MiniClusterConfiguration.Builder(); > >>> > configBuilder.setConfiguration(config); > >>> > MiniCluster miniCluster = new MiniCluster(configBuilder.build()); > >>> > MiniClusterClient clusterClient = new MiniClusterClient(config, > >>> miniCluster); > >>> > > >>> > 其中flinkConfDir为/opt/flink/conf > >>> > > >>> > > >>> > flink standalone HA集群信息如下: > >>> > ------------------------------ > >>> > [hidden email] > >>> > > >>> > > >>> > > >>> > >> > |
我是不是可以理解为是local的cluster,本身会去启动一个flink cluster(当前服务器模拟分布式环境),无需单独部署一个flink集群
________________________________ [hidden email] 发件人: tison<mailto:[hidden email]> 发送时间: 2020-01-16 14:29 收件人: user-zh<mailto:[hidden email]> 主题: Re: Re: MiniCluster问题 你这完全是把几个概念混在一起了,MiniCluster 就是一个集群,是一个内部的部署模式,跟 standalone 是平行的概念。我看不懂你要干什么,但是就你的问题来说,我上面说了,就是你 MiniCluster new 出来之后没 start。但我不确定这个是不是你要的效果,MiniCluster 和 standalone 是平行的两种东西。 Best, tison. 郑 洁锋 <[hidden email]> 于2020年1月16日周四 下午2:27写道: > 因为我这边看他是standalone,然后看到flink官方文档里面部署模式也有standalone模式,所以按照那个模式部署后来尝试 > > ________________________________ > [hidden email] > > 发件人: 郑 洁锋<mailto:[hidden email]> > 发送时间: 2020-01-16 14:24 > 收件人: user-zh<mailto:[hidden email]> > 主题: Re: Re: MiniCluster问题 > 这是完整的到启动的代码 > > public class ClusterClientFactory { > > public static ClusterClient createClusterClient(Options > launcherOptions) throws Exception { > String mode = launcherOptions.getMode(); > if(mode.equals(ClusterMode.standalone.name())) { > return createStandaloneClient(launcherOptions); > } else if(mode.equals(ClusterMode.yarn.name())) { > return createYarnClient(launcherOptions,mode); > } > throw new IllegalArgumentException("Unsupported cluster client > type: "); > } > > public static ClusterClient createStandaloneClient(Options > launcherOptions) throws Exception { > String flinkConfDir = launcherOptions.getFlinkconf(); > Configuration config = > GlobalConfiguration.loadConfiguration(flinkConfDir); > MiniClusterConfiguration.Builder configBuilder = new > MiniClusterConfiguration.Builder(); > configBuilder.setConfiguration(config); > MiniCluster miniCluster = new MiniCluster(configBuilder.build()); > MiniClusterClient clusterClient = new MiniClusterClient(config, > miniCluster); > LeaderConnectionInfo connectionInfo = > clusterClient.getClusterConnectionInfo(); > InetSocketAddress address = > AkkaUtils.getInetSocketAddressFromAkkaURL(connectionInfo.getAddress()); > config.setString(JobManagerOptions.ADDRESS, > address.getAddress().getHostName()); > config.setInteger(JobManagerOptions.PORT, address.getPort()); > clusterClient.setDetached(true); > return clusterClient; > } > > > 启动类中: > > ClusterClient clusterClient = > ClusterClientFactory.createClusterClient(launcherOptions); > clusterClient.run(program, 1); > clusterClient.shutdown(); > > ________________________________ > [hidden email] > > 发件人: tison<mailto:[hidden email]> > 发送时间: 2020-01-16 13:31 > 收件人: user-zh<mailto:[hidden email]> > 主题: Re: Re: MiniCluster问题 > MiniCluster miniCluster = new MiniCluster(configBuilder.build()); > > miniCluster.start(); > > > MiniClusterClient clusterClient = new MiniClusterClient(config, > miniCluster) > ; > > Best, > tison. > > > tison <[hidden email]> 于2020年1月16日周四 下午1:30写道: > > > 跟集群无关 > > Best, > > tison. > > > > > > tison <[hidden email]> 于2020年1月16日周四 下午1:30写道: > > > >> 1. 这是个内部实现类,文档是一个问题,但最好不要直接依赖 > >> > >> 2. ?你截图的代码片段在哪?我看是你自己代码创建使用的啊 > >> > >> Best, > >> tison. > >> > >> > >> 郑 洁锋 <[hidden email]> 于2020年1月16日周四 下午1:18写道: > >> > >>> MiniCluster有没有相关文档,官方文档里面看到部署模式里面没有相关的内容? > >>> 我是通过bin/start-cluster.sh启动的flink standalone集群 > >>> > >>> > >>> ________________________________ > >>> [hidden email] > >>> > >>> 发件人: tison<mailto:[hidden email]> > >>> 发送时间: 2020-01-16 12:39 > >>> 收件人: user-zh<mailto:[hidden email]> > >>> 主题: Re: MiniCluster问题 > >>> 你 MiniCluster 要 start 啊(x > >>> > >>> Best, > >>> tison. > >>> > >>> > >>> 郑 洁锋 <[hidden email]> 于2020年1月16日周四 上午11:38写道: > >>> > >>> > MiniCluster代码执行过程中报错: > >>> > > >>> > SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". > >>> > SLF4J: Defaulting to no-operation (NOP) logger implementation > >>> > SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for > >>> further details. > >>> > Exception in thread "main" java.lang.IllegalStateException: > >>> MiniCluster is not yet running. > >>> > at > >>> org.apache.flink.util.Preconditions.checkState(Preconditions.java:195) > >>> > at > >>> > org.apache.flink.runtime.minicluster.MiniCluster.getHighAvailabilityServices(MiniCluster.java:223) > >>> > at > >>> > org.apache.flink.client.program.MiniClusterClient.<init>(MiniClusterClient.java:61) > >>> > at > >>> > com.dtstack.flink.sql.launcher.ClusterClientFactory.createStandaloneClient(ClusterClientFactory.java:82) > >>> > at > >>> > com.dtstack.flink.sql.launcher.ClusterClientFactory.createClusterClient(ClusterClientFactory.java:69) > >>> > at > >>> com.dtstack.flink.sql.launcher.LauncherMain.main(LauncherMain.java:99) > >>> > > >>> > 报错段代码如下: > >>> > > >>> > Configuration config = > >>> GlobalConfiguration.loadConfiguration(flinkConfDir); > >>> > MiniClusterConfiguration.Builder configBuilder = new > >>> MiniClusterConfiguration.Builder(); > >>> > configBuilder.setConfiguration(config); > >>> > MiniCluster miniCluster = new MiniCluster(configBuilder.build()); > >>> > MiniClusterClient clusterClient = new MiniClusterClient(config, > >>> miniCluster); > >>> > > >>> > 其中flinkConfDir为/opt/flink/conf > >>> > > >>> > > >>> > flink standalone HA集群信息如下: > >>> > ------------------------------ > >>> > [hidden email] > >>> > > >>> > > >>> > > >>> > >> > |
是的,MiniCluster 会在同一个进程里起 JM TM,是一个主要用于测试的集群
standalone 的意思是没有接 YARN 这种资源管理框架,TM 由用户自己手动起,是一个可用于生产的集群 Best, tison. 郑 洁锋 <[hidden email]> 于2020年1月16日周四 下午2:39写道: > 我是不是可以理解为是local的cluster,本身会去启动一个flink cluster(当前服务器模拟分布式环境),无需单独部署一个flink集群 > > ________________________________ > [hidden email] > > 发件人: tison<mailto:[hidden email]> > 发送时间: 2020-01-16 14:29 > 收件人: user-zh<mailto:[hidden email]> > 主题: Re: Re: MiniCluster问题 > 你这完全是把几个概念混在一起了,MiniCluster 就是一个集群,是一个内部的部署模式,跟 standalone > 是平行的概念。我看不懂你要干什么,但是就你的问题来说,我上面说了,就是你 MiniCluster new 出来之后没 > start。但我不确定这个是不是你要的效果,MiniCluster 和 standalone 是平行的两种东西。 > > Best, > tison. > > > 郑 洁锋 <[hidden email]> 于2020年1月16日周四 下午2:27写道: > > > 因为我这边看他是standalone,然后看到flink官方文档里面部署模式也有standalone模式,所以按照那个模式部署后来尝试 > > > > ________________________________ > > [hidden email] > > > > 发件人: 郑 洁锋<mailto:[hidden email]> > > 发送时间: 2020-01-16 14:24 > > 收件人: user-zh<mailto:[hidden email]> > > 主题: Re: Re: MiniCluster问题 > > 这是完整的到启动的代码 > > > > public class ClusterClientFactory { > > > > public static ClusterClient createClusterClient(Options > > launcherOptions) throws Exception { > > String mode = launcherOptions.getMode(); > > if(mode.equals(ClusterMode.standalone.name())) { > > return createStandaloneClient(launcherOptions); > > } else if(mode.equals(ClusterMode.yarn.name())) { > > return createYarnClient(launcherOptions,mode); > > } > > throw new IllegalArgumentException("Unsupported cluster client > > type: "); > > } > > > > public static ClusterClient createStandaloneClient(Options > > launcherOptions) throws Exception { > > String flinkConfDir = launcherOptions.getFlinkconf(); > > Configuration config = > > GlobalConfiguration.loadConfiguration(flinkConfDir); > > MiniClusterConfiguration.Builder configBuilder = new > > MiniClusterConfiguration.Builder(); > > configBuilder.setConfiguration(config); > > MiniCluster miniCluster = new MiniCluster(configBuilder.build()); > > MiniClusterClient clusterClient = new MiniClusterClient(config, > > miniCluster); > > LeaderConnectionInfo connectionInfo = > > clusterClient.getClusterConnectionInfo(); > > InetSocketAddress address = > > AkkaUtils.getInetSocketAddressFromAkkaURL(connectionInfo.getAddress()); > > config.setString(JobManagerOptions.ADDRESS, > > address.getAddress().getHostName()); > > config.setInteger(JobManagerOptions.PORT, address.getPort()); > > clusterClient.setDetached(true); > > return clusterClient; > > } > > > > > > 启动类中: > > > > ClusterClient clusterClient = > > ClusterClientFactory.createClusterClient(launcherOptions); > > clusterClient.run(program, 1); > > clusterClient.shutdown(); > > > > ________________________________ > > [hidden email] > > > > 发件人: tison<mailto:[hidden email]> > > 发送时间: 2020-01-16 13:31 > > 收件人: user-zh<mailto:[hidden email]> > > 主题: Re: Re: MiniCluster问题 > > MiniCluster miniCluster = new MiniCluster(configBuilder.build()); > > > > miniCluster.start(); > > > > > > MiniClusterClient clusterClient = new MiniClusterClient(config, > > miniCluster) > > ; > > > > Best, > > tison. > > > > > > tison <[hidden email]> 于2020年1月16日周四 下午1:30写道: > > > > > 跟集群无关 > > > Best, > > > tison. > > > > > > > > > tison <[hidden email]> 于2020年1月16日周四 下午1:30写道: > > > > > >> 1. 这是个内部实现类,文档是一个问题,但最好不要直接依赖 > > >> > > >> 2. ?你截图的代码片段在哪?我看是你自己代码创建使用的啊 > > >> > > >> Best, > > >> tison. > > >> > > >> > > >> 郑 洁锋 <[hidden email]> 于2020年1月16日周四 下午1:18写道: > > >> > > >>> MiniCluster有没有相关文档,官方文档里面看到部署模式里面没有相关的内容? > > >>> 我是通过bin/start-cluster.sh启动的flink standalone集群 > > >>> > > >>> > > >>> ________________________________ > > >>> [hidden email] > > >>> > > >>> 发件人: tison<mailto:[hidden email]> > > >>> 发送时间: 2020-01-16 12:39 > > >>> 收件人: user-zh<mailto:[hidden email]> > > >>> 主题: Re: MiniCluster问题 > > >>> 你 MiniCluster 要 start 啊(x > > >>> > > >>> Best, > > >>> tison. > > >>> > > >>> > > >>> 郑 洁锋 <[hidden email]> 于2020年1月16日周四 上午11:38写道: > > >>> > > >>> > MiniCluster代码执行过程中报错: > > >>> > > > >>> > SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". > > >>> > SLF4J: Defaulting to no-operation (NOP) logger implementation > > >>> > SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for > > >>> further details. > > >>> > Exception in thread "main" java.lang.IllegalStateException: > > >>> MiniCluster is not yet running. > > >>> > at > > >>> > org.apache.flink.util.Preconditions.checkState(Preconditions.java:195) > > >>> > at > > >>> > > > org.apache.flink.runtime.minicluster.MiniCluster.getHighAvailabilityServices(MiniCluster.java:223) > > >>> > at > > >>> > > > org.apache.flink.client.program.MiniClusterClient.<init>(MiniClusterClient.java:61) > > >>> > at > > >>> > > > com.dtstack.flink.sql.launcher.ClusterClientFactory.createStandaloneClient(ClusterClientFactory.java:82) > > >>> > at > > >>> > > > com.dtstack.flink.sql.launcher.ClusterClientFactory.createClusterClient(ClusterClientFactory.java:69) > > >>> > at > > >>> > com.dtstack.flink.sql.launcher.LauncherMain.main(LauncherMain.java:99) > > >>> > > > >>> > 报错段代码如下: > > >>> > > > >>> > Configuration config = > > >>> GlobalConfiguration.loadConfiguration(flinkConfDir); > > >>> > MiniClusterConfiguration.Builder configBuilder = new > > >>> MiniClusterConfiguration.Builder(); > > >>> > configBuilder.setConfiguration(config); > > >>> > MiniCluster miniCluster = new MiniCluster(configBuilder.build()); > > >>> > MiniClusterClient clusterClient = new MiniClusterClient(config, > > >>> miniCluster); > > >>> > > > >>> > 其中flinkConfDir为/opt/flink/conf > > >>> > > > >>> > > > >>> > flink standalone HA集群信息如下: > > >>> > ------------------------------ > > >>> > [hidden email] > > >>> > > > >>> > > > >>> > > > >>> > > >> > > > |
Free forum by Nabble | Edit this page |