flink on yarn jdk版本问题

classic Classic list List threaded Threaded
19 messages Options
Reply | Threaded
Open this post in threaded view
|

flink on yarn jdk版本问题

zjfplayer@hotmail.com
Hi,
        我们使用了TDH中的yarn(hadoop2.7,jdk1.7),原先使用spark的时候,也遇到过jdk版本最低1.8的情况,我们是通过如下方式解决的(考虑到现场公用TDH集群不能更改):
        spark-submit中可以通过添加参数 --conf "spark.yarn.appMasterEnv.JAVA_HOME=/usr/java/jdk1.8.0_25/" --conf "spark.executorEnv.JAVA_HOME=/usr/java/jdk1.8.0_25/"来指定当前spark任务特定在yarn中执行时的jdk目录

        请问下flink启动&提交任务时(bin/yarn-session.sh&flink run)有没有参数可以指定flink特定使用yarn时的jdk目录,flink中是否有这样的功能?(不影响其他yarn上应用正常运行)



________________________________
[hidden email]
Reply | Threaded
Open this post in threaded view
|

Re: flink on yarn jdk版本问题

Benchao Li
Hi ,

Flink也支持传递环境变量的,也可以尝试一下:
https://ci.apache.org/projects/flink/flink-docs-master/ops/config.html#configuration-runtime-environment-variables

郑 洁锋 <[hidden email]> 于2020年1月15日周三 上午11:34写道:

> Hi,
>
> 我们使用了TDH中的yarn(hadoop2.7,jdk1.7),原先使用spark的时候,也遇到过jdk版本最低1.8的情况,我们是通过如下方式解决的(考虑到现场公用TDH集群不能更改):
>         spark-submit中可以通过添加参数 --conf
> "spark.yarn.appMasterEnv.JAVA_HOME=/usr/java/jdk1.8.0_25/" --conf
> "spark.executorEnv.JAVA_HOME=/usr/java/jdk1.8.0_25/"来指定当前spark任务特定在yarn中执行时的jdk目录
>
>         请问下flink启动&提交任务时(bin/yarn-session.sh&flink
> run)有没有参数可以指定flink特定使用yarn时的jdk目录,flink中是否有这样的功能?(不影响其他yarn上应用正常运行)
>
>
>
> ________________________________
> [hidden email]
>


--

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: [hidden email]; [hidden email]
Reply | Threaded
Open this post in threaded view
|

Re: Re: flink on yarn jdk版本问题

zjfplayer@hotmail.com
Hi,
    非常感谢,可以了,我在flink-conf.yaml中添加了如下配置项即可正常运行 yarn-session.sh了,且Flink Dashboard也能正常查看了
        containerized.master.env.JAVA_HOME: /usr/java/jdk1.8.0_25/
         containerized.taskmanager.env.JAVA_HOME: /usr/java/jdk1.8.0_25/

但是在运行官方例子https://ci.apache.org/projects/flink/flink-docs-release-1.9/zh/getting-started/tutorials/local_setup.html#run-the-example,测试时报错了:

bin/flink run examples/streaming/SocketWindowWordCount.jar  --port 9000
2020-01-15 00:46:25,574 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - Found Yarn properties file under /tmp/.yarn-properties-root.
2020-01-15 00:46:25,574 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - Found Yarn properties file under /tmp/.yarn-properties-root.
2020-01-15 00:46:26,097 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - YARN properties set default parallelism to 4
2020-01-15 00:46:26,097 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - YARN properties set default parallelism to 4
YARN properties set default parallelism to 4
2020-01-15 00:46:26,293 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2020-01-15 00:46:26,293 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2020-01-15 00:46:26,375 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Found application JobManager host name 'tdh-2' and port '8181' from supplied application id 'application_1578968334899_0010'
Starting execution of program
------------------------------------------------------------
 The program finished with the following exception:
org.apache.flink.client.program.ProgramInvocationException: Could not retrieve the execution result. (JobID: aaf4720f0c432de8d91b848838589c62)
	at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:255)
	at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:338)
	at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:60)
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
	at org.apache.flink.streaming.examples.socket.SocketWindowWordCount.main(SocketWindowWordCount.java:92)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
	at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
	at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:274)
	at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:746)
	at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:273)
	at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
	at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010)
	at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
	at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
	at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083)
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
	at org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$8(RestClusterClient.java:382)
	at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
	at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
	at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
	at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:263)
	at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
	at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
	at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929)
	at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.rest.util.RestClientException: [Internal server error., <Exception on server side:
akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/dispatcher#54443530]] after [10000 ms]. Message of type [org.apache.flink.runtime.rpc.messages.LocalFencedMessage]. A typical reason for `AskTimeoutException` is that the recipient actor didn't send a reply.
	at akka.pattern.PromiseActorRef$$anonfun$2.apply(AskSupport.scala:635)
	at akka.pattern.PromiseActorRef$$anonfun$2.apply(AskSupport.scala:635)
	at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:648)
	at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:205)
	at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
	at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
	at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
	at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:328)
	at akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:279)
	at akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:283)
	at akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:235)
	at java.lang.Thread.run(Thread.java:745)
End of exception on server side>]
	at org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:389)
	at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:373)
	at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952)
	at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
	... 4 more

    有尝试过添加配置:
    akka.ask.timeout: 20s
    web.timeout: 20000
    还是一样的报错(超时时间生效了,日志中变更了)。

    而且dashboard也出现了问题:
    这个有人遇到过吗,我搜索还有说是flink版本的问题,但是我已经是最新的1.9.1版本了,这个是还未修复的bug还是其他的问题?


 
发件人: [hidden email]
发送时间: 2020-01-15 12:56
主题: Re: flink on yarn jdk版本问题

郑 洁锋 <[hidden email]> 于2020年1月15日周三 上午11:34写道:
Hi,
        我们使用了TDH中的yarn(hadoop2.7,jdk1.7),原先使用spark的时候,也遇到过jdk版本最低1.8的情况,我们是通过如下方式解决的(考虑到现场公用TDH集群不能更改):
        spark-submit中可以通过添加参数 --conf "spark.yarn.appMasterEnv.JAVA_HOME=/usr/java/jdk1.8.0_25/" --conf "spark.executorEnv.JAVA_HOME=/usr/java/jdk1.8.0_25/"来指定当前spark任务特定在yarn中执行时的jdk目录

        请问下flink启动&提交任务时(bin/yarn-session.sh&flink run)有没有参数可以指定flink特定使用yarn时的jdk目录,flink中是否有这样的功能?(不影响其他yarn上应用正常运行)



________________________________
[hidden email]


--
Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: [hidden email]; [hidden email]
Reply | Threaded
Open this post in threaded view
|

Re: Re: flink on yarn jdk版本问题

tison
玄学问题,升级 JDK 小版本可接,或与类型擦除有关

你可以share一下 JM 侧的日志,应该有作业执行异常

Best,
tison.


郑 洁锋 <[hidden email]> 于2020年1月15日周三 下午2:17写道:

> Hi,
>
>     非常感谢,可以了,我在flink-conf.yaml中添加了如下配置项即可正常运行 yarn-session.sh了,且Flink Dashboard也能正常查看了
>
>         containerized.master.env.JAVA_HOME: /usr/java/jdk1.8.0_25/
>
>          containerized.taskmanager.env.JAVA_HOME: /usr/java/jdk1.8.0_25/
>
>
> 但是在运行官方例子https://ci.apache.org/projects/flink/flink-docs-release-1.9/zh/getting-started/tutorials/local_setup.html#run-the-example,测试时报错了:
>
>
> bin/flink run examples/streaming/SocketWindowWordCount.jar  --port 9000
> 2020-01-15 00:46:25,574 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - Found Yarn properties file under /tmp/.yarn-properties-root.
> 2020-01-15 00:46:25,574 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - Found Yarn properties file under /tmp/.yarn-properties-root.
> 2020-01-15 00:46:26,097 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - YARN properties set default parallelism to 4
> 2020-01-15 00:46:26,097 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - YARN properties set default parallelism to 4
> YARN properties set default parallelism to 4
> 2020-01-15 00:46:26,293 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
> 2020-01-15 00:46:26,293 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
> 2020-01-15 00:46:26,375 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Found application JobManager host name 'tdh-2' and port '8181' from supplied application id 'application_1578968334899_0010'
> Starting execution of program
> ------------------------------------------------------------
>  The program finished with the following exception:
> org.apache.flink.client.program.ProgramInvocationException: Could not retrieve the execution result. (JobID: aaf4720f0c432de8d91b848838589c62)
> at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:255)
> at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:338)
> at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:60)
> at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
> at org.apache.flink.streaming.examples.socket.SocketWindowWordCount.main(SocketWindowWordCount.java:92)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
> at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
> at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:274)
> at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:746)
> at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:273)
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
> at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010)
> at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
> at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083)
> Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
> at org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$8(RestClusterClient.java:382)
> at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
> at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
> at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
> at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:263)
> at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
> at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
> at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
> at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929)
> at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.runtime.rest.util.RestClientException: [Internal server error., <Exception on server side:
> akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/dispatcher#54443530]] after [10000 ms]. Message of type [org.apache.flink.runtime.rpc.messages.LocalFencedMessage]. A typical reason for `AskTimeoutException` is that the recipient actor didn't send a reply.
> at akka.pattern.PromiseActorRef$$anonfun$2.apply(AskSupport.scala:635)
> at akka.pattern.PromiseActorRef$$anonfun$2.apply(AskSupport.scala:635)
> at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:648)
> at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:205)
> at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
> at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
> at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
> at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:328)
> at akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:279)
> at akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:283)
> at akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:235)
> at java.lang.Thread.run(Thread.java:745)
> End of exception on server side>]
> at org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:389)
> at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:373)
> at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952)
> at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
> ... 4 more
>
>
>     有尝试过添加配置:
>     akka.ask.timeout: 20s
>     web.timeout: 20000
>     还是一样的报错(超时时间生效了,日志中变更了)。
>
>     而且dashboard也出现了问题:
>     这个有人遇到过吗,我搜索还有说是flink版本的问题,但是我已经是最新的1.9.1版本了,这个是还未修复的bug还是其他的问题?
>
> ------------------------------
> [hidden email]
>
>
> *发件人:* Benchao Li <[hidden email]>
> *发送时间:* 2020-01-15 12:56
> *收件人:* user-zh <[hidden email]>; zjfplayer
> <[hidden email]>
> *主题:* Re: flink on yarn jdk版本问题
> Hi ,
>
> Flink也支持传递环境变量的,也可以尝试一下:
> https://ci.apache.org/projects/flink/flink-docs-master/ops/config.html#configuration-runtime-environment-variables
>
> 郑 洁锋 <[hidden email]> 于2020年1月15日周三 上午11:34写道:
>
>> Hi,
>>
>> 我们使用了TDH中的yarn(hadoop2.7,jdk1.7),原先使用spark的时候,也遇到过jdk版本最低1.8的情况,我们是通过如下方式解决的(考虑到现场公用TDH集群不能更改):
>>         spark-submit中可以通过添加参数 --conf
>> "spark.yarn.appMasterEnv.JAVA_HOME=/usr/java/jdk1.8.0_25/" --conf
>> "spark.executorEnv.JAVA_HOME=/usr/java/jdk1.8.0_25/"来指定当前spark任务特定在yarn中执行时的jdk目录
>>
>>         请问下flink启动&提交任务时(bin/yarn-session.sh&flink
>> run)有没有参数可以指定flink特定使用yarn时的jdk目录,flink中是否有这样的功能?(不影响其他yarn上应用正常运行)
>>
>>
>>
>> ________________________________
>> [hidden email]
>>
>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: [hidden email]; [hidden email]
>
>
Reply | Threaded
Open this post in threaded view
|

Re: Re: flink on yarn jdk版本问题

zjfplayer@hotmail.com
果然是,yarn上的jdk版本升级后就可以了。。

________________________________
[hidden email]

发件人: tison<mailto:[hidden email]>
发送时间: 2020-01-15 14:22
收件人: user-zh<mailto:[hidden email]>
抄送: Benchao Li<mailto:[hidden email]>
主题: Re: Re: flink on yarn jdk版本问题
玄学问题,升级 JDK 小版本可接,或与类型擦除有关

你可以share一下 JM 侧的日志,应该有作业执行异常

Best,
tison.


郑 洁锋 <[hidden email]> 于2020年1月15日周三 下午2:17写道:

> Hi,
>
>     非常感谢,可以了,我在flink-conf.yaml中添加了如下配置项即可正常运行 yarn-session.sh了,且Flink Dashboard也能正常查看了
>
>         containerized.master.env.JAVA_HOME: /usr/java/jdk1.8.0_25/
>
>          containerized.taskmanager.env.JAVA_HOME: /usr/java/jdk1.8.0_25/
>
>
> 但是在运行官方例子https://ci.apache.org/projects/flink/flink-docs-release-1.9/zh/getting-started/tutorials/local_setup.html#run-the-example,测试时报错了:
>
>
> bin/flink run examples/streaming/SocketWindowWordCount.jar  --port 9000
> 2020-01-15 00:46:25,574 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - Found Yarn properties file under /tmp/.yarn-properties-root.
> 2020-01-15 00:46:25,574 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - Found Yarn properties file under /tmp/.yarn-properties-root.
> 2020-01-15 00:46:26,097 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - YARN properties set default parallelism to 4
> 2020-01-15 00:46:26,097 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - YARN properties set default parallelism to 4
> YARN properties set default parallelism to 4
> 2020-01-15 00:46:26,293 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
> 2020-01-15 00:46:26,293 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
> 2020-01-15 00:46:26,375 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Found application JobManager host name 'tdh-2' and port '8181' from supplied application id 'application_1578968334899_0010'
> Starting execution of program
> ------------------------------------------------------------
>  The program finished with the following exception:
> org.apache.flink.client.program.ProgramInvocationException: Could not retrieve the execution result. (JobID: aaf4720f0c432de8d91b848838589c62)
> at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:255)
> at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:338)
> at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:60)
> at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
> at org.apache.flink.streaming.examples.socket.SocketWindowWordCount.main(SocketWindowWordCount.java:92)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
> at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
> at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:274)
> at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:746)
> at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:273)
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
> at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010)
> at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
> at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083)
> Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
> at org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$8(RestClusterClient.java:382)
> at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
> at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
> at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
> at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:263)
> at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
> at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
> at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
> at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929)
> at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.runtime.rest.util.RestClientException: [Internal server error., <Exception on server side:
> akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/dispatcher#54443530]] after [10000 ms]. Message of type [org.apache.flink.runtime.rpc.messages.LocalFencedMessage]. A typical reason for `AskTimeoutException` is that the recipient actor didn't send a reply.
> at akka.pattern.PromiseActorRef$$anonfun$2.apply(AskSupport.scala:635)
> at akka.pattern.PromiseActorRef$$anonfun$2.apply(AskSupport.scala:635)
> at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:648)
> at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:205)
> at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
> at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
> at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
> at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:328)
> at akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:279)
> at akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:283)
> at akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:235)
> at java.lang.Thread.run(Thread.java:745)
> End of exception on server side>]
> at org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:389)
> at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:373)
> at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952)
> at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
> ... 4 more
>
>
>     有尝试过添加配置:
>     akka.ask.timeout: 20s
>     web.timeout: 20000
>     还是一样的报错(超时时间生效了,日志中变更了)。
>
>     而且dashboard也出现了问题:
>     这个有人遇到过吗,我搜索还有说是flink版本的问题,但是我已经是最新的1.9.1版本了,这个是还未修复的bug还是其他的问题?
>
> ------------------------------
> [hidden email]
>
>
> *发件人:* Benchao Li <[hidden email]>
> *发送时间:* 2020-01-15 12:56
> *收件人:* user-zh <[hidden email]>; zjfplayer
> <[hidden email]>
> *主题:* Re: flink on yarn jdk版本问题
> Hi ,
>
> Flink也支持传递环境变量的,也可以尝试一下:
> https://ci.apache.org/projects/flink/flink-docs-master/ops/config.html#configuration-runtime-environment-variables
>
> 郑 洁锋 <[hidden email]> 于2020年1月15日周三 上午11:34写道:
>
>> Hi,
>>
>> 我们使用了TDH中的yarn(hadoop2.7,jdk1.7),原先使用spark的时候,也遇到过jdk版本最低1.8的情况,我们是通过如下方式解决的(考虑到现场公用TDH集群不能更改):
>>         spark-submit中可以通过添加参数 --conf
>> "spark.yarn.appMasterEnv.JAVA_HOME=/usr/java/jdk1.8.0_25/" --conf
>> "spark.executorEnv.JAVA_HOME=/usr/java/jdk1.8.0_25/"来指定当前spark任务特定在yarn中执行时的jdk目录
>>
>>         请问下flink启动&提交任务时(bin/yarn-session.sh&flink
>> run)有没有参数可以指定flink特定使用yarn时的jdk目录,flink中是否有这样的功能?(不影响其他yarn上应用正常运行)
>>
>>
>>
>> ________________________________
>> [hidden email]
>>
>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: [hidden email]; [hidden email]
>
>
Reply | Threaded
Open this post in threaded view
|

MiniCluster问题

zjfplayer@hotmail.com
MiniCluster代码执行过程中报错:
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Exception in thread "main" java.lang.IllegalStateException: MiniCluster is not yet running.
        at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
        at org.apache.flink.runtime.minicluster.MiniCluster.getHighAvailabilityServices(MiniCluster.java:223)
        at org.apache.flink.client.program.MiniClusterClient.<init>(MiniClusterClient.java:61)
        at com.dtstack.flink.sql.launcher.ClusterClientFactory.createStandaloneClient(ClusterClientFactory.java:82)
        at com.dtstack.flink.sql.launcher.ClusterClientFactory.createClusterClient(ClusterClientFactory.java:69)
        at com.dtstack.flink.sql.launcher.LauncherMain.main(LauncherMain.java:99)
报错段代码如下:
Configuration config = GlobalConfiguration.loadConfiguration(flinkConfDir);
MiniClusterConfiguration.Builder configBuilder = new MiniClusterConfiguration.Builder();
configBuilder.setConfiguration(config);
MiniCluster miniCluster = new MiniCluster(configBuilder.build());
MiniClusterClient clusterClient = new MiniClusterClient(config, miniCluster);
其中flinkConfDir为/opt/flink/conf


flink standalone HA集群信息如下:


Reply | Threaded
Open this post in threaded view
|

Re: MiniCluster问题

tison
你 MiniCluster 要 start 啊(x

Best,
tison.


郑 洁锋 <[hidden email]> 于2020年1月16日周四 上午11:38写道:

> MiniCluster代码执行过程中报错:
>
> SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
> SLF4J: Defaulting to no-operation (NOP) logger implementation
> SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
> Exception in thread "main" java.lang.IllegalStateException: MiniCluster is not yet running.
>         at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
>         at org.apache.flink.runtime.minicluster.MiniCluster.getHighAvailabilityServices(MiniCluster.java:223)
>         at org.apache.flink.client.program.MiniClusterClient.<init>(MiniClusterClient.java:61)
>         at com.dtstack.flink.sql.launcher.ClusterClientFactory.createStandaloneClient(ClusterClientFactory.java:82)
>         at com.dtstack.flink.sql.launcher.ClusterClientFactory.createClusterClient(ClusterClientFactory.java:69)
>         at com.dtstack.flink.sql.launcher.LauncherMain.main(LauncherMain.java:99)
>
> 报错段代码如下:
>
> Configuration config = GlobalConfiguration.loadConfiguration(flinkConfDir);
> MiniClusterConfiguration.Builder configBuilder = new MiniClusterConfiguration.Builder();
> configBuilder.setConfiguration(config);
> MiniCluster miniCluster = new MiniCluster(configBuilder.build());
> MiniClusterClient clusterClient = new MiniClusterClient(config, miniCluster);
>
> 其中flinkConfDir为/opt/flink/conf
>
>
> flink standalone HA集群信息如下:
> ------------------------------
> [hidden email]
>
>
>
Reply | Threaded
Open this post in threaded view
|

Re: Re: MiniCluster问题

zjfplayer@hotmail.com
MiniCluster有没有相关文档,官方文档里面看到部署模式里面没有相关的内容?
我是通过bin/start-cluster.sh启动的flink standalone集群


________________________________
[hidden email]

发件人: tison<mailto:[hidden email]>
发送时间: 2020-01-16 12:39
收件人: user-zh<mailto:[hidden email]>
主题: Re: MiniCluster问题
你 MiniCluster 要 start 啊(x

Best,
tison.


郑 洁锋 <[hidden email]> 于2020年1月16日周四 上午11:38写道:

> MiniCluster代码执行过程中报错:
>
> SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
> SLF4J: Defaulting to no-operation (NOP) logger implementation
> SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
> Exception in thread "main" java.lang.IllegalStateException: MiniCluster is not yet running.
>         at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
>         at org.apache.flink.runtime.minicluster.MiniCluster.getHighAvailabilityServices(MiniCluster.java:223)
>         at org.apache.flink.client.program.MiniClusterClient.<init>(MiniClusterClient.java:61)
>         at com.dtstack.flink.sql.launcher.ClusterClientFactory.createStandaloneClient(ClusterClientFactory.java:82)
>         at com.dtstack.flink.sql.launcher.ClusterClientFactory.createClusterClient(ClusterClientFactory.java:69)
>         at com.dtstack.flink.sql.launcher.LauncherMain.main(LauncherMain.java:99)
>
> 报错段代码如下:
>
> Configuration config = GlobalConfiguration.loadConfiguration(flinkConfDir);
> MiniClusterConfiguration.Builder configBuilder = new MiniClusterConfiguration.Builder();
> configBuilder.setConfiguration(config);
> MiniCluster miniCluster = new MiniCluster(configBuilder.build());
> MiniClusterClient clusterClient = new MiniClusterClient(config, miniCluster);
>
> 其中flinkConfDir为/opt/flink/conf
>
>
> flink standalone HA集群信息如下:
> ------------------------------
> [hidden email]
>
>
>
Reply | Threaded
Open this post in threaded view
|

Re: Re: MiniCluster问题

tison
1. 这是个内部实现类,文档是一个问题,但最好不要直接依赖

2. ?你截图的代码片段在哪?我看是你自己代码创建使用的啊

Best,
tison.


郑 洁锋 <[hidden email]> 于2020年1月16日周四 下午1:18写道:

> MiniCluster有没有相关文档,官方文档里面看到部署模式里面没有相关的内容?
> 我是通过bin/start-cluster.sh启动的flink standalone集群
>
>
> ________________________________
> [hidden email]
>
> 发件人: tison<mailto:[hidden email]>
> 发送时间: 2020-01-16 12:39
> 收件人: user-zh<mailto:[hidden email]>
> 主题: Re: MiniCluster问题
> 你 MiniCluster 要 start 啊(x
>
> Best,
> tison.
>
>
> 郑 洁锋 <[hidden email]> 于2020年1月16日周四 上午11:38写道:
>
> > MiniCluster代码执行过程中报错:
> >
> > SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
> > SLF4J: Defaulting to no-operation (NOP) logger implementation
> > SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for
> further details.
> > Exception in thread "main" java.lang.IllegalStateException: MiniCluster
> is not yet running.
> >         at
> org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
> >         at
> org.apache.flink.runtime.minicluster.MiniCluster.getHighAvailabilityServices(MiniCluster.java:223)
> >         at
> org.apache.flink.client.program.MiniClusterClient.<init>(MiniClusterClient.java:61)
> >         at
> com.dtstack.flink.sql.launcher.ClusterClientFactory.createStandaloneClient(ClusterClientFactory.java:82)
> >         at
> com.dtstack.flink.sql.launcher.ClusterClientFactory.createClusterClient(ClusterClientFactory.java:69)
> >         at
> com.dtstack.flink.sql.launcher.LauncherMain.main(LauncherMain.java:99)
> >
> > 报错段代码如下:
> >
> > Configuration config =
> GlobalConfiguration.loadConfiguration(flinkConfDir);
> > MiniClusterConfiguration.Builder configBuilder = new
> MiniClusterConfiguration.Builder();
> > configBuilder.setConfiguration(config);
> > MiniCluster miniCluster = new MiniCluster(configBuilder.build());
> > MiniClusterClient clusterClient = new MiniClusterClient(config,
> miniCluster);
> >
> > 其中flinkConfDir为/opt/flink/conf
> >
> >
> > flink standalone HA集群信息如下:
> > ------------------------------
> > [hidden email]
> >
> >
> >
>
Reply | Threaded
Open this post in threaded view
|

Re: Re: MiniCluster问题

tison
跟集群无关
Best,
tison.


tison <[hidden email]> 于2020年1月16日周四 下午1:30写道:

> 1. 这是个内部实现类,文档是一个问题,但最好不要直接依赖
>
> 2. ?你截图的代码片段在哪?我看是你自己代码创建使用的啊
>
> Best,
> tison.
>
>
> 郑 洁锋 <[hidden email]> 于2020年1月16日周四 下午1:18写道:
>
>> MiniCluster有没有相关文档,官方文档里面看到部署模式里面没有相关的内容?
>> 我是通过bin/start-cluster.sh启动的flink standalone集群
>>
>>
>> ________________________________
>> [hidden email]
>>
>> 发件人: tison<mailto:[hidden email]>
>> 发送时间: 2020-01-16 12:39
>> 收件人: user-zh<mailto:[hidden email]>
>> 主题: Re: MiniCluster问题
>> 你 MiniCluster 要 start 啊(x
>>
>> Best,
>> tison.
>>
>>
>> 郑 洁锋 <[hidden email]> 于2020年1月16日周四 上午11:38写道:
>>
>> > MiniCluster代码执行过程中报错:
>> >
>> > SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
>> > SLF4J: Defaulting to no-operation (NOP) logger implementation
>> > SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for
>> further details.
>> > Exception in thread "main" java.lang.IllegalStateException: MiniCluster
>> is not yet running.
>> >         at
>> org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
>> >         at
>> org.apache.flink.runtime.minicluster.MiniCluster.getHighAvailabilityServices(MiniCluster.java:223)
>> >         at
>> org.apache.flink.client.program.MiniClusterClient.<init>(MiniClusterClient.java:61)
>> >         at
>> com.dtstack.flink.sql.launcher.ClusterClientFactory.createStandaloneClient(ClusterClientFactory.java:82)
>> >         at
>> com.dtstack.flink.sql.launcher.ClusterClientFactory.createClusterClient(ClusterClientFactory.java:69)
>> >         at
>> com.dtstack.flink.sql.launcher.LauncherMain.main(LauncherMain.java:99)
>> >
>> > 报错段代码如下:
>> >
>> > Configuration config =
>> GlobalConfiguration.loadConfiguration(flinkConfDir);
>> > MiniClusterConfiguration.Builder configBuilder = new
>> MiniClusterConfiguration.Builder();
>> > configBuilder.setConfiguration(config);
>> > MiniCluster miniCluster = new MiniCluster(configBuilder.build());
>> > MiniClusterClient clusterClient = new MiniClusterClient(config,
>> miniCluster);
>> >
>> > 其中flinkConfDir为/opt/flink/conf
>> >
>> >
>> > flink standalone HA集群信息如下:
>> > ------------------------------
>> > [hidden email]
>> >
>> >
>> >
>>
>
Reply | Threaded
Open this post in threaded view
|

Re: Re: MiniCluster问题

tison
MiniCluster miniCluster = new MiniCluster(configBuilder.build());

miniCluster.start();


MiniClusterClient clusterClient = new MiniClusterClient(config, miniCluster)
;

Best,
tison.


tison <[hidden email]> 于2020年1月16日周四 下午1:30写道:

> 跟集群无关
> Best,
> tison.
>
>
> tison <[hidden email]> 于2020年1月16日周四 下午1:30写道:
>
>> 1. 这是个内部实现类,文档是一个问题,但最好不要直接依赖
>>
>> 2. ?你截图的代码片段在哪?我看是你自己代码创建使用的啊
>>
>> Best,
>> tison.
>>
>>
>> 郑 洁锋 <[hidden email]> 于2020年1月16日周四 下午1:18写道:
>>
>>> MiniCluster有没有相关文档,官方文档里面看到部署模式里面没有相关的内容?
>>> 我是通过bin/start-cluster.sh启动的flink standalone集群
>>>
>>>
>>> ________________________________
>>> [hidden email]
>>>
>>> 发件人: tison<mailto:[hidden email]>
>>> 发送时间: 2020-01-16 12:39
>>> 收件人: user-zh<mailto:[hidden email]>
>>> 主题: Re: MiniCluster问题
>>> 你 MiniCluster 要 start 啊(x
>>>
>>> Best,
>>> tison.
>>>
>>>
>>> 郑 洁锋 <[hidden email]> 于2020年1月16日周四 上午11:38写道:
>>>
>>> > MiniCluster代码执行过程中报错:
>>> >
>>> > SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
>>> > SLF4J: Defaulting to no-operation (NOP) logger implementation
>>> > SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for
>>> further details.
>>> > Exception in thread "main" java.lang.IllegalStateException:
>>> MiniCluster is not yet running.
>>> >         at
>>> org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
>>> >         at
>>> org.apache.flink.runtime.minicluster.MiniCluster.getHighAvailabilityServices(MiniCluster.java:223)
>>> >         at
>>> org.apache.flink.client.program.MiniClusterClient.<init>(MiniClusterClient.java:61)
>>> >         at
>>> com.dtstack.flink.sql.launcher.ClusterClientFactory.createStandaloneClient(ClusterClientFactory.java:82)
>>> >         at
>>> com.dtstack.flink.sql.launcher.ClusterClientFactory.createClusterClient(ClusterClientFactory.java:69)
>>> >         at
>>> com.dtstack.flink.sql.launcher.LauncherMain.main(LauncherMain.java:99)
>>> >
>>> > 报错段代码如下:
>>> >
>>> > Configuration config =
>>> GlobalConfiguration.loadConfiguration(flinkConfDir);
>>> > MiniClusterConfiguration.Builder configBuilder = new
>>> MiniClusterConfiguration.Builder();
>>> > configBuilder.setConfiguration(config);
>>> > MiniCluster miniCluster = new MiniCluster(configBuilder.build());
>>> > MiniClusterClient clusterClient = new MiniClusterClient(config,
>>> miniCluster);
>>> >
>>> > 其中flinkConfDir为/opt/flink/conf
>>> >
>>> >
>>> > flink standalone HA集群信息如下:
>>> > ------------------------------
>>> > [hidden email]
>>> >
>>> >
>>> >
>>>
>>
Reply | Threaded
Open this post in threaded view
|

Re: MiniCluster问题

Eleanore Jin
In reply to this post by tison
Hi

可以参考org.apache.flink.streaming.api.environment.LocalStreamEnvironment::
execute

public JobExecutionResult execute(String jobName) throws Exception {
  // transform the streaming program into a JobGraph
  StreamGraph streamGraph = getStreamGraph();
  streamGraph.setJobName(jobName);

  JobGraph jobGraph = streamGraph.getJobGraph();
  jobGraph.setAllowQueuedScheduling(true);

  Configuration configuration = new Configuration();
  configuration.addAll(jobGraph.getJobConfiguration());
  configuration.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "0");

  // add (and override) the settings with what the user defined
  configuration.addAll(this.configuration);

  if (!configuration.contains(RestOptions.BIND_PORT)) {
   configuration.setString(RestOptions.BIND_PORT, "0");
  }

  int numSlotsPerTaskManager =
configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS,
jobGraph.getMaximumParallelism());

  MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder()
   .setConfiguration(configuration)
   .setNumSlotsPerTaskManager(numSlotsPerTaskManager)
   .build();

  if (LOG.isInfoEnabled()) {
   LOG.info("Running job on local embedded Flink mini cluster");
  }

  MiniCluster miniCluster = new MiniCluster(cfg);

  try {
   miniCluster.start();
   configuration.setInteger(RestOptions.PORT,
miniCluster.getRestAddress().get().getPort());

   return miniCluster.executeJobBlocking(jobGraph);
  }
  finally {
   transformations.clear();
   miniCluster.close();
  }
 }
}

Best,
Eleanore

On Wed, Jan 15, 2020 at 8:40 PM tison <[hidden email]> wrote:

> 你 MiniCluster 要 start 啊(x
>
> Best,
> tison.
>
>
> 郑 洁锋 <[hidden email]> 于2020年1月16日周四 上午11:38写道:
>
> > MiniCluster代码执行过程中报错:
> >
> > SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
> > SLF4J: Defaulting to no-operation (NOP) logger implementation
> > SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for
> further details.
> > Exception in thread "main" java.lang.IllegalStateException: MiniCluster
> is not yet running.
> >         at
> org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
> >         at
> org.apache.flink.runtime.minicluster.MiniCluster.getHighAvailabilityServices(MiniCluster.java:223)
> >         at
> org.apache.flink.client.program.MiniClusterClient.<init>(MiniClusterClient.java:61)
> >         at
> com.dtstack.flink.sql.launcher.ClusterClientFactory.createStandaloneClient(ClusterClientFactory.java:82)
> >         at
> com.dtstack.flink.sql.launcher.ClusterClientFactory.createClusterClient(ClusterClientFactory.java:69)
> >         at
> com.dtstack.flink.sql.launcher.LauncherMain.main(LauncherMain.java:99)
> >
> > 报错段代码如下:
> >
> > Configuration config =
> GlobalConfiguration.loadConfiguration(flinkConfDir);
> > MiniClusterConfiguration.Builder configBuilder = new
> MiniClusterConfiguration.Builder();
> > configBuilder.setConfiguration(config);
> > MiniCluster miniCluster = new MiniCluster(configBuilder.build());
> > MiniClusterClient clusterClient = new MiniClusterClient(config,
> miniCluster);
> >
> > 其中flinkConfDir为/opt/flink/conf
> >
> >
> > flink standalone HA集群信息如下:
> > ------------------------------
> > [hidden email]
> >
> >
> >
>
Reply | Threaded
Open this post in threaded view
|

Re: Re: MiniCluster问题

tison
In reply to this post by tison
而且不对啊,你起了 standalone 集群还要 MiniCluster 干嘛...

Best,
tison.


tison <[hidden email]> 于2020年1月16日周四 下午1:31写道:

> MiniCluster miniCluster = new MiniCluster(configBuilder.build());
>
> miniCluster.start();
>
>
> MiniClusterClient clusterClient = new MiniClusterClient(config,
> miniCluster);
>
> Best,
> tison.
>
>
> tison <[hidden email]> 于2020年1月16日周四 下午1:30写道:
>
>> 跟集群无关
>> Best,
>> tison.
>>
>>
>> tison <[hidden email]> 于2020年1月16日周四 下午1:30写道:
>>
>>> 1. 这是个内部实现类,文档是一个问题,但最好不要直接依赖
>>>
>>> 2. ?你截图的代码片段在哪?我看是你自己代码创建使用的啊
>>>
>>> Best,
>>> tison.
>>>
>>>
>>> 郑 洁锋 <[hidden email]> 于2020年1月16日周四 下午1:18写道:
>>>
>>>> MiniCluster有没有相关文档,官方文档里面看到部署模式里面没有相关的内容?
>>>> 我是通过bin/start-cluster.sh启动的flink standalone集群
>>>>
>>>>
>>>> ________________________________
>>>> [hidden email]
>>>>
>>>> 发件人: tison<mailto:[hidden email]>
>>>> 发送时间: 2020-01-16 12:39
>>>> 收件人: user-zh<mailto:[hidden email]>
>>>> 主题: Re: MiniCluster问题
>>>> 你 MiniCluster 要 start 啊(x
>>>>
>>>> Best,
>>>> tison.
>>>>
>>>>
>>>> 郑 洁锋 <[hidden email]> 于2020年1月16日周四 上午11:38写道:
>>>>
>>>> > MiniCluster代码执行过程中报错:
>>>> >
>>>> > SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
>>>> > SLF4J: Defaulting to no-operation (NOP) logger implementation
>>>> > SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for
>>>> further details.
>>>> > Exception in thread "main" java.lang.IllegalStateException:
>>>> MiniCluster is not yet running.
>>>> >         at
>>>> org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
>>>> >         at
>>>> org.apache.flink.runtime.minicluster.MiniCluster.getHighAvailabilityServices(MiniCluster.java:223)
>>>> >         at
>>>> org.apache.flink.client.program.MiniClusterClient.<init>(MiniClusterClient.java:61)
>>>> >         at
>>>> com.dtstack.flink.sql.launcher.ClusterClientFactory.createStandaloneClient(ClusterClientFactory.java:82)
>>>> >         at
>>>> com.dtstack.flink.sql.launcher.ClusterClientFactory.createClusterClient(ClusterClientFactory.java:69)
>>>> >         at
>>>> com.dtstack.flink.sql.launcher.LauncherMain.main(LauncherMain.java:99)
>>>> >
>>>> > 报错段代码如下:
>>>> >
>>>> > Configuration config =
>>>> GlobalConfiguration.loadConfiguration(flinkConfDir);
>>>> > MiniClusterConfiguration.Builder configBuilder = new
>>>> MiniClusterConfiguration.Builder();
>>>> > configBuilder.setConfiguration(config);
>>>> > MiniCluster miniCluster = new MiniCluster(configBuilder.build());
>>>> > MiniClusterClient clusterClient = new MiniClusterClient(config,
>>>> miniCluster);
>>>> >
>>>> > 其中flinkConfDir为/opt/flink/conf
>>>> >
>>>> >
>>>> > flink standalone HA集群信息如下:
>>>> > ------------------------------
>>>> > [hidden email]
>>>> >
>>>> >
>>>> >
>>>>
>>>
Reply | Threaded
Open this post in threaded view
|

Re: Re: MiniCluster问题

zjfplayer@hotmail.com
In reply to this post by tison
我们这边是在尝试使用github上的袋鼠云的flinkStreamSQL项目,其中的flink standalone模式,报了这个错误
________________________________
[hidden email]

发件人: tison<mailto:[hidden email]>
发送时间: 2020-01-16 13:30
收件人: user-zh<mailto:[hidden email]>
主题: Re: Re: MiniCluster问题
1. 这是个内部实现类,文档是一个问题,但最好不要直接依赖

2. ?你截图的代码片段在哪?我看是你自己代码创建使用的啊

Best,
tison.


郑 洁锋 <[hidden email]> 于2020年1月16日周四 下午1:18写道:

> MiniCluster有没有相关文档,官方文档里面看到部署模式里面没有相关的内容?
> 我是通过bin/start-cluster.sh启动的flink standalone集群
>
>
> ________________________________
> [hidden email]
>
> 发件人: tison<mailto:[hidden email]>
> 发送时间: 2020-01-16 12:39
> 收件人: user-zh<mailto:[hidden email]>
> 主题: Re: MiniCluster问题
> 你 MiniCluster 要 start 啊(x
>
> Best,
> tison.
>
>
> 郑 洁锋 <[hidden email]> 于2020年1月16日周四 上午11:38写道:
>
> > MiniCluster代码执行过程中报错:
> >
> > SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
> > SLF4J: Defaulting to no-operation (NOP) logger implementation
> > SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for
> further details.
> > Exception in thread "main" java.lang.IllegalStateException: MiniCluster
> is not yet running.
> >         at
> org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
> >         at
> org.apache.flink.runtime.minicluster.MiniCluster.getHighAvailabilityServices(MiniCluster.java:223)
> >         at
> org.apache.flink.client.program.MiniClusterClient.<init>(MiniClusterClient.java:61)
> >         at
> com.dtstack.flink.sql.launcher.ClusterClientFactory.createStandaloneClient(ClusterClientFactory.java:82)
> >         at
> com.dtstack.flink.sql.launcher.ClusterClientFactory.createClusterClient(ClusterClientFactory.java:69)
> >         at
> com.dtstack.flink.sql.launcher.LauncherMain.main(LauncherMain.java:99)
> >
> > 报错段代码如下:
> >
> > Configuration config =
> GlobalConfiguration.loadConfiguration(flinkConfDir);
> > MiniClusterConfiguration.Builder configBuilder = new
> MiniClusterConfiguration.Builder();
> > configBuilder.setConfiguration(config);
> > MiniCluster miniCluster = new MiniCluster(configBuilder.build());
> > MiniClusterClient clusterClient = new MiniClusterClient(config,
> miniCluster);
> >
> > 其中flinkConfDir为/opt/flink/conf
> >
> >
> > flink standalone HA集群信息如下:
> > ------------------------------
> > [hidden email]
> >
> >
> >
>
Reply | Threaded
Open this post in threaded view
|

Re: Re: MiniCluster问题

zjfplayer@hotmail.com
In reply to this post by tison
这是完整的到启动的代码

public class ClusterClientFactory {

    public static ClusterClient createClusterClient(Options launcherOptions) throws Exception {
        String mode = launcherOptions.getMode();
        if(mode.equals(ClusterMode.standalone.name())) {
            return createStandaloneClient(launcherOptions);
        } else if(mode.equals(ClusterMode.yarn.name())) {
            return createYarnClient(launcherOptions,mode);
        }
        throw new IllegalArgumentException("Unsupported cluster client type: ");
    }

    public static ClusterClient createStandaloneClient(Options launcherOptions) throws Exception {
        String flinkConfDir = launcherOptions.getFlinkconf();
        Configuration config = GlobalConfiguration.loadConfiguration(flinkConfDir);
        MiniClusterConfiguration.Builder configBuilder = new MiniClusterConfiguration.Builder();
        configBuilder.setConfiguration(config);
        MiniCluster miniCluster = new MiniCluster(configBuilder.build());
        MiniClusterClient clusterClient = new MiniClusterClient(config, miniCluster);
        LeaderConnectionInfo connectionInfo = clusterClient.getClusterConnectionInfo();
        InetSocketAddress address = AkkaUtils.getInetSocketAddressFromAkkaURL(connectionInfo.getAddress());
        config.setString(JobManagerOptions.ADDRESS, address.getAddress().getHostName());
        config.setInteger(JobManagerOptions.PORT, address.getPort());
        clusterClient.setDetached(true);
        return clusterClient;
    }


启动类中:

ClusterClient clusterClient = ClusterClientFactory.createClusterClient(launcherOptions);
clusterClient.run(program, 1);
clusterClient.shutdown();

________________________________
[hidden email]

发件人: tison<mailto:[hidden email]>
发送时间: 2020-01-16 13:31
收件人: user-zh<mailto:[hidden email]>
主题: Re: Re: MiniCluster问题
MiniCluster miniCluster = new MiniCluster(configBuilder.build());

miniCluster.start();


MiniClusterClient clusterClient = new MiniClusterClient(config, miniCluster)
;

Best,
tison.


tison <[hidden email]> 于2020年1月16日周四 下午1:30写道:

> 跟集群无关
> Best,
> tison.
>
>
> tison <[hidden email]> 于2020年1月16日周四 下午1:30写道:
>
>> 1. 这是个内部实现类,文档是一个问题,但最好不要直接依赖
>>
>> 2. ?你截图的代码片段在哪?我看是你自己代码创建使用的啊
>>
>> Best,
>> tison.
>>
>>
>> 郑 洁锋 <[hidden email]> 于2020年1月16日周四 下午1:18写道:
>>
>>> MiniCluster有没有相关文档,官方文档里面看到部署模式里面没有相关的内容?
>>> 我是通过bin/start-cluster.sh启动的flink standalone集群
>>>
>>>
>>> ________________________________
>>> [hidden email]
>>>
>>> 发件人: tison<mailto:[hidden email]>
>>> 发送时间: 2020-01-16 12:39
>>> 收件人: user-zh<mailto:[hidden email]>
>>> 主题: Re: MiniCluster问题
>>> 你 MiniCluster 要 start 啊(x
>>>
>>> Best,
>>> tison.
>>>
>>>
>>> 郑 洁锋 <[hidden email]> 于2020年1月16日周四 上午11:38写道:
>>>
>>> > MiniCluster代码执行过程中报错:
>>> >
>>> > SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
>>> > SLF4J: Defaulting to no-operation (NOP) logger implementation
>>> > SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for
>>> further details.
>>> > Exception in thread "main" java.lang.IllegalStateException:
>>> MiniCluster is not yet running.
>>> >         at
>>> org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
>>> >         at
>>> org.apache.flink.runtime.minicluster.MiniCluster.getHighAvailabilityServices(MiniCluster.java:223)
>>> >         at
>>> org.apache.flink.client.program.MiniClusterClient.<init>(MiniClusterClient.java:61)
>>> >         at
>>> com.dtstack.flink.sql.launcher.ClusterClientFactory.createStandaloneClient(ClusterClientFactory.java:82)
>>> >         at
>>> com.dtstack.flink.sql.launcher.ClusterClientFactory.createClusterClient(ClusterClientFactory.java:69)
>>> >         at
>>> com.dtstack.flink.sql.launcher.LauncherMain.main(LauncherMain.java:99)
>>> >
>>> > 报错段代码如下:
>>> >
>>> > Configuration config =
>>> GlobalConfiguration.loadConfiguration(flinkConfDir);
>>> > MiniClusterConfiguration.Builder configBuilder = new
>>> MiniClusterConfiguration.Builder();
>>> > configBuilder.setConfiguration(config);
>>> > MiniCluster miniCluster = new MiniCluster(configBuilder.build());
>>> > MiniClusterClient clusterClient = new MiniClusterClient(config,
>>> miniCluster);
>>> >
>>> > 其中flinkConfDir为/opt/flink/conf
>>> >
>>> >
>>> > flink standalone HA集群信息如下:
>>> > ------------------------------
>>> > [hidden email]
>>> >
>>> >
>>> >
>>>
>>
Reply | Threaded
Open this post in threaded view
|

Re: Re: MiniCluster问题

zjfplayer@hotmail.com
因为我这边看他是standalone,然后看到flink官方文档里面部署模式也有standalone模式,所以按照那个模式部署后来尝试

________________________________
[hidden email]

发件人: 郑 洁锋<mailto:[hidden email]>
发送时间: 2020-01-16 14:24
收件人: user-zh<mailto:[hidden email]>
主题: Re: Re: MiniCluster问题
这是完整的到启动的代码

public class ClusterClientFactory {

    public static ClusterClient createClusterClient(Options launcherOptions) throws Exception {
        String mode = launcherOptions.getMode();
        if(mode.equals(ClusterMode.standalone.name())) {
            return createStandaloneClient(launcherOptions);
        } else if(mode.equals(ClusterMode.yarn.name())) {
            return createYarnClient(launcherOptions,mode);
        }
        throw new IllegalArgumentException("Unsupported cluster client type: ");
    }

    public static ClusterClient createStandaloneClient(Options launcherOptions) throws Exception {
        String flinkConfDir = launcherOptions.getFlinkconf();
        Configuration config = GlobalConfiguration.loadConfiguration(flinkConfDir);
        MiniClusterConfiguration.Builder configBuilder = new MiniClusterConfiguration.Builder();
        configBuilder.setConfiguration(config);
        MiniCluster miniCluster = new MiniCluster(configBuilder.build());
        MiniClusterClient clusterClient = new MiniClusterClient(config, miniCluster);
        LeaderConnectionInfo connectionInfo = clusterClient.getClusterConnectionInfo();
        InetSocketAddress address = AkkaUtils.getInetSocketAddressFromAkkaURL(connectionInfo.getAddress());
        config.setString(JobManagerOptions.ADDRESS, address.getAddress().getHostName());
        config.setInteger(JobManagerOptions.PORT, address.getPort());
        clusterClient.setDetached(true);
        return clusterClient;
    }


启动类中:

ClusterClient clusterClient = ClusterClientFactory.createClusterClient(launcherOptions);
clusterClient.run(program, 1);
clusterClient.shutdown();

________________________________
[hidden email]

发件人: tison<mailto:[hidden email]>
发送时间: 2020-01-16 13:31
收件人: user-zh<mailto:[hidden email]>
主题: Re: Re: MiniCluster问题
MiniCluster miniCluster = new MiniCluster(configBuilder.build());

miniCluster.start();


MiniClusterClient clusterClient = new MiniClusterClient(config, miniCluster)
;

Best,
tison.


tison <[hidden email]> 于2020年1月16日周四 下午1:30写道:

> 跟集群无关
> Best,
> tison.
>
>
> tison <[hidden email]> 于2020年1月16日周四 下午1:30写道:
>
>> 1. 这是个内部实现类,文档是一个问题,但最好不要直接依赖
>>
>> 2. ?你截图的代码片段在哪?我看是你自己代码创建使用的啊
>>
>> Best,
>> tison.
>>
>>
>> 郑 洁锋 <[hidden email]> 于2020年1月16日周四 下午1:18写道:
>>
>>> MiniCluster有没有相关文档,官方文档里面看到部署模式里面没有相关的内容?
>>> 我是通过bin/start-cluster.sh启动的flink standalone集群
>>>
>>>
>>> ________________________________
>>> [hidden email]
>>>
>>> 发件人: tison<mailto:[hidden email]>
>>> 发送时间: 2020-01-16 12:39
>>> 收件人: user-zh<mailto:[hidden email]>
>>> 主题: Re: MiniCluster问题
>>> 你 MiniCluster 要 start 啊(x
>>>
>>> Best,
>>> tison.
>>>
>>>
>>> 郑 洁锋 <[hidden email]> 于2020年1月16日周四 上午11:38写道:
>>>
>>> > MiniCluster代码执行过程中报错:
>>> >
>>> > SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
>>> > SLF4J: Defaulting to no-operation (NOP) logger implementation
>>> > SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for
>>> further details.
>>> > Exception in thread "main" java.lang.IllegalStateException:
>>> MiniCluster is not yet running.
>>> >         at
>>> org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
>>> >         at
>>> org.apache.flink.runtime.minicluster.MiniCluster.getHighAvailabilityServices(MiniCluster.java:223)
>>> >         at
>>> org.apache.flink.client.program.MiniClusterClient.<init>(MiniClusterClient.java:61)
>>> >         at
>>> com.dtstack.flink.sql.launcher.ClusterClientFactory.createStandaloneClient(ClusterClientFactory.java:82)
>>> >         at
>>> com.dtstack.flink.sql.launcher.ClusterClientFactory.createClusterClient(ClusterClientFactory.java:69)
>>> >         at
>>> com.dtstack.flink.sql.launcher.LauncherMain.main(LauncherMain.java:99)
>>> >
>>> > 报错段代码如下:
>>> >
>>> > Configuration config =
>>> GlobalConfiguration.loadConfiguration(flinkConfDir);
>>> > MiniClusterConfiguration.Builder configBuilder = new
>>> MiniClusterConfiguration.Builder();
>>> > configBuilder.setConfiguration(config);
>>> > MiniCluster miniCluster = new MiniCluster(configBuilder.build());
>>> > MiniClusterClient clusterClient = new MiniClusterClient(config,
>>> miniCluster);
>>> >
>>> > 其中flinkConfDir为/opt/flink/conf
>>> >
>>> >
>>> > flink standalone HA集群信息如下:
>>> > ------------------------------
>>> > [hidden email]
>>> >
>>> >
>>> >
>>>
>>
Reply | Threaded
Open this post in threaded view
|

Re: Re: MiniCluster问题

tison
你这完全是把几个概念混在一起了,MiniCluster 就是一个集群,是一个内部的部署模式,跟 standalone
是平行的概念。我看不懂你要干什么,但是就你的问题来说,我上面说了,就是你 MiniCluster new 出来之后没
start。但我不确定这个是不是你要的效果,MiniCluster 和 standalone 是平行的两种东西。

Best,
tison.


郑 洁锋 <[hidden email]> 于2020年1月16日周四 下午2:27写道:

> 因为我这边看他是standalone,然后看到flink官方文档里面部署模式也有standalone模式,所以按照那个模式部署后来尝试
>
> ________________________________
> [hidden email]
>
> 发件人: 郑 洁锋<mailto:[hidden email]>
> 发送时间: 2020-01-16 14:24
> 收件人: user-zh<mailto:[hidden email]>
> 主题: Re: Re: MiniCluster问题
> 这是完整的到启动的代码
>
> public class ClusterClientFactory {
>
>     public static ClusterClient createClusterClient(Options
> launcherOptions) throws Exception {
>         String mode = launcherOptions.getMode();
>         if(mode.equals(ClusterMode.standalone.name())) {
>             return createStandaloneClient(launcherOptions);
>         } else if(mode.equals(ClusterMode.yarn.name())) {
>             return createYarnClient(launcherOptions,mode);
>         }
>         throw new IllegalArgumentException("Unsupported cluster client
> type: ");
>     }
>
>     public static ClusterClient createStandaloneClient(Options
> launcherOptions) throws Exception {
>         String flinkConfDir = launcherOptions.getFlinkconf();
>         Configuration config =
> GlobalConfiguration.loadConfiguration(flinkConfDir);
>         MiniClusterConfiguration.Builder configBuilder = new
> MiniClusterConfiguration.Builder();
>         configBuilder.setConfiguration(config);
>         MiniCluster miniCluster = new MiniCluster(configBuilder.build());
>         MiniClusterClient clusterClient = new MiniClusterClient(config,
> miniCluster);
>         LeaderConnectionInfo connectionInfo =
> clusterClient.getClusterConnectionInfo();
>         InetSocketAddress address =
> AkkaUtils.getInetSocketAddressFromAkkaURL(connectionInfo.getAddress());
>         config.setString(JobManagerOptions.ADDRESS,
> address.getAddress().getHostName());
>         config.setInteger(JobManagerOptions.PORT, address.getPort());
>         clusterClient.setDetached(true);
>         return clusterClient;
>     }
>
>
> 启动类中:
>
> ClusterClient clusterClient =
> ClusterClientFactory.createClusterClient(launcherOptions);
> clusterClient.run(program, 1);
> clusterClient.shutdown();
>
> ________________________________
> [hidden email]
>
> 发件人: tison<mailto:[hidden email]>
> 发送时间: 2020-01-16 13:31
> 收件人: user-zh<mailto:[hidden email]>
> 主题: Re: Re: MiniCluster问题
> MiniCluster miniCluster = new MiniCluster(configBuilder.build());
>
> miniCluster.start();
>
>
> MiniClusterClient clusterClient = new MiniClusterClient(config,
> miniCluster)
> ;
>
> Best,
> tison.
>
>
> tison <[hidden email]> 于2020年1月16日周四 下午1:30写道:
>
> > 跟集群无关
> > Best,
> > tison.
> >
> >
> > tison <[hidden email]> 于2020年1月16日周四 下午1:30写道:
> >
> >> 1. 这是个内部实现类,文档是一个问题,但最好不要直接依赖
> >>
> >> 2. ?你截图的代码片段在哪?我看是你自己代码创建使用的啊
> >>
> >> Best,
> >> tison.
> >>
> >>
> >> 郑 洁锋 <[hidden email]> 于2020年1月16日周四 下午1:18写道:
> >>
> >>> MiniCluster有没有相关文档,官方文档里面看到部署模式里面没有相关的内容?
> >>> 我是通过bin/start-cluster.sh启动的flink standalone集群
> >>>
> >>>
> >>> ________________________________
> >>> [hidden email]
> >>>
> >>> 发件人: tison<mailto:[hidden email]>
> >>> 发送时间: 2020-01-16 12:39
> >>> 收件人: user-zh<mailto:[hidden email]>
> >>> 主题: Re: MiniCluster问题
> >>> 你 MiniCluster 要 start 啊(x
> >>>
> >>> Best,
> >>> tison.
> >>>
> >>>
> >>> 郑 洁锋 <[hidden email]> 于2020年1月16日周四 上午11:38写道:
> >>>
> >>> > MiniCluster代码执行过程中报错:
> >>> >
> >>> > SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
> >>> > SLF4J: Defaulting to no-operation (NOP) logger implementation
> >>> > SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for
> >>> further details.
> >>> > Exception in thread "main" java.lang.IllegalStateException:
> >>> MiniCluster is not yet running.
> >>> >         at
> >>> org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
> >>> >         at
> >>>
> org.apache.flink.runtime.minicluster.MiniCluster.getHighAvailabilityServices(MiniCluster.java:223)
> >>> >         at
> >>>
> org.apache.flink.client.program.MiniClusterClient.<init>(MiniClusterClient.java:61)
> >>> >         at
> >>>
> com.dtstack.flink.sql.launcher.ClusterClientFactory.createStandaloneClient(ClusterClientFactory.java:82)
> >>> >         at
> >>>
> com.dtstack.flink.sql.launcher.ClusterClientFactory.createClusterClient(ClusterClientFactory.java:69)
> >>> >         at
> >>> com.dtstack.flink.sql.launcher.LauncherMain.main(LauncherMain.java:99)
> >>> >
> >>> > 报错段代码如下:
> >>> >
> >>> > Configuration config =
> >>> GlobalConfiguration.loadConfiguration(flinkConfDir);
> >>> > MiniClusterConfiguration.Builder configBuilder = new
> >>> MiniClusterConfiguration.Builder();
> >>> > configBuilder.setConfiguration(config);
> >>> > MiniCluster miniCluster = new MiniCluster(configBuilder.build());
> >>> > MiniClusterClient clusterClient = new MiniClusterClient(config,
> >>> miniCluster);
> >>> >
> >>> > 其中flinkConfDir为/opt/flink/conf
> >>> >
> >>> >
> >>> > flink standalone HA集群信息如下:
> >>> > ------------------------------
> >>> > [hidden email]
> >>> >
> >>> >
> >>> >
> >>>
> >>
>
Reply | Threaded
Open this post in threaded view
|

Re: Re: MiniCluster问题

zjfplayer@hotmail.com
我是不是可以理解为是local的cluster,本身会去启动一个flink cluster(当前服务器模拟分布式环境),无需单独部署一个flink集群

________________________________
[hidden email]

发件人: tison<mailto:[hidden email]>
发送时间: 2020-01-16 14:29
收件人: user-zh<mailto:[hidden email]>
主题: Re: Re: MiniCluster问题
你这完全是把几个概念混在一起了,MiniCluster 就是一个集群,是一个内部的部署模式,跟 standalone
是平行的概念。我看不懂你要干什么,但是就你的问题来说,我上面说了,就是你 MiniCluster new 出来之后没
start。但我不确定这个是不是你要的效果,MiniCluster 和 standalone 是平行的两种东西。

Best,
tison.


郑 洁锋 <[hidden email]> 于2020年1月16日周四 下午2:27写道:

> 因为我这边看他是standalone,然后看到flink官方文档里面部署模式也有standalone模式,所以按照那个模式部署后来尝试
>
> ________________________________
> [hidden email]
>
> 发件人: 郑 洁锋<mailto:[hidden email]>
> 发送时间: 2020-01-16 14:24
> 收件人: user-zh<mailto:[hidden email]>
> 主题: Re: Re: MiniCluster问题
> 这是完整的到启动的代码
>
> public class ClusterClientFactory {
>
>     public static ClusterClient createClusterClient(Options
> launcherOptions) throws Exception {
>         String mode = launcherOptions.getMode();
>         if(mode.equals(ClusterMode.standalone.name())) {
>             return createStandaloneClient(launcherOptions);
>         } else if(mode.equals(ClusterMode.yarn.name())) {
>             return createYarnClient(launcherOptions,mode);
>         }
>         throw new IllegalArgumentException("Unsupported cluster client
> type: ");
>     }
>
>     public static ClusterClient createStandaloneClient(Options
> launcherOptions) throws Exception {
>         String flinkConfDir = launcherOptions.getFlinkconf();
>         Configuration config =
> GlobalConfiguration.loadConfiguration(flinkConfDir);
>         MiniClusterConfiguration.Builder configBuilder = new
> MiniClusterConfiguration.Builder();
>         configBuilder.setConfiguration(config);
>         MiniCluster miniCluster = new MiniCluster(configBuilder.build());
>         MiniClusterClient clusterClient = new MiniClusterClient(config,
> miniCluster);
>         LeaderConnectionInfo connectionInfo =
> clusterClient.getClusterConnectionInfo();
>         InetSocketAddress address =
> AkkaUtils.getInetSocketAddressFromAkkaURL(connectionInfo.getAddress());
>         config.setString(JobManagerOptions.ADDRESS,
> address.getAddress().getHostName());
>         config.setInteger(JobManagerOptions.PORT, address.getPort());
>         clusterClient.setDetached(true);
>         return clusterClient;
>     }
>
>
> 启动类中:
>
> ClusterClient clusterClient =
> ClusterClientFactory.createClusterClient(launcherOptions);
> clusterClient.run(program, 1);
> clusterClient.shutdown();
>
> ________________________________
> [hidden email]
>
> 发件人: tison<mailto:[hidden email]>
> 发送时间: 2020-01-16 13:31
> 收件人: user-zh<mailto:[hidden email]>
> 主题: Re: Re: MiniCluster问题
> MiniCluster miniCluster = new MiniCluster(configBuilder.build());
>
> miniCluster.start();
>
>
> MiniClusterClient clusterClient = new MiniClusterClient(config,
> miniCluster)
> ;
>
> Best,
> tison.
>
>
> tison <[hidden email]> 于2020年1月16日周四 下午1:30写道:
>
> > 跟集群无关
> > Best,
> > tison.
> >
> >
> > tison <[hidden email]> 于2020年1月16日周四 下午1:30写道:
> >
> >> 1. 这是个内部实现类,文档是一个问题,但最好不要直接依赖
> >>
> >> 2. ?你截图的代码片段在哪?我看是你自己代码创建使用的啊
> >>
> >> Best,
> >> tison.
> >>
> >>
> >> 郑 洁锋 <[hidden email]> 于2020年1月16日周四 下午1:18写道:
> >>
> >>> MiniCluster有没有相关文档,官方文档里面看到部署模式里面没有相关的内容?
> >>> 我是通过bin/start-cluster.sh启动的flink standalone集群
> >>>
> >>>
> >>> ________________________________
> >>> [hidden email]
> >>>
> >>> 发件人: tison<mailto:[hidden email]>
> >>> 发送时间: 2020-01-16 12:39
> >>> 收件人: user-zh<mailto:[hidden email]>
> >>> 主题: Re: MiniCluster问题
> >>> 你 MiniCluster 要 start 啊(x
> >>>
> >>> Best,
> >>> tison.
> >>>
> >>>
> >>> 郑 洁锋 <[hidden email]> 于2020年1月16日周四 上午11:38写道:
> >>>
> >>> > MiniCluster代码执行过程中报错:
> >>> >
> >>> > SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
> >>> > SLF4J: Defaulting to no-operation (NOP) logger implementation
> >>> > SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for
> >>> further details.
> >>> > Exception in thread "main" java.lang.IllegalStateException:
> >>> MiniCluster is not yet running.
> >>> >         at
> >>> org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
> >>> >         at
> >>>
> org.apache.flink.runtime.minicluster.MiniCluster.getHighAvailabilityServices(MiniCluster.java:223)
> >>> >         at
> >>>
> org.apache.flink.client.program.MiniClusterClient.<init>(MiniClusterClient.java:61)
> >>> >         at
> >>>
> com.dtstack.flink.sql.launcher.ClusterClientFactory.createStandaloneClient(ClusterClientFactory.java:82)
> >>> >         at
> >>>
> com.dtstack.flink.sql.launcher.ClusterClientFactory.createClusterClient(ClusterClientFactory.java:69)
> >>> >         at
> >>> com.dtstack.flink.sql.launcher.LauncherMain.main(LauncherMain.java:99)
> >>> >
> >>> > 报错段代码如下:
> >>> >
> >>> > Configuration config =
> >>> GlobalConfiguration.loadConfiguration(flinkConfDir);
> >>> > MiniClusterConfiguration.Builder configBuilder = new
> >>> MiniClusterConfiguration.Builder();
> >>> > configBuilder.setConfiguration(config);
> >>> > MiniCluster miniCluster = new MiniCluster(configBuilder.build());
> >>> > MiniClusterClient clusterClient = new MiniClusterClient(config,
> >>> miniCluster);
> >>> >
> >>> > 其中flinkConfDir为/opt/flink/conf
> >>> >
> >>> >
> >>> > flink standalone HA集群信息如下:
> >>> > ------------------------------
> >>> > [hidden email]
> >>> >
> >>> >
> >>> >
> >>>
> >>
>
Reply | Threaded
Open this post in threaded view
|

Re: Re: MiniCluster问题

tison
是的,MiniCluster 会在同一个进程里起 JM TM,是一个主要用于测试的集群

standalone 的意思是没有接 YARN 这种资源管理框架,TM 由用户自己手动起,是一个可用于生产的集群

Best,
tison.


郑 洁锋 <[hidden email]> 于2020年1月16日周四 下午2:39写道:

> 我是不是可以理解为是local的cluster,本身会去启动一个flink cluster(当前服务器模拟分布式环境),无需单独部署一个flink集群
>
> ________________________________
> [hidden email]
>
> 发件人: tison<mailto:[hidden email]>
> 发送时间: 2020-01-16 14:29
> 收件人: user-zh<mailto:[hidden email]>
> 主题: Re: Re: MiniCluster问题
> 你这完全是把几个概念混在一起了,MiniCluster 就是一个集群,是一个内部的部署模式,跟 standalone
> 是平行的概念。我看不懂你要干什么,但是就你的问题来说,我上面说了,就是你 MiniCluster new 出来之后没
> start。但我不确定这个是不是你要的效果,MiniCluster 和 standalone 是平行的两种东西。
>
> Best,
> tison.
>
>
> 郑 洁锋 <[hidden email]> 于2020年1月16日周四 下午2:27写道:
>
> > 因为我这边看他是standalone,然后看到flink官方文档里面部署模式也有standalone模式,所以按照那个模式部署后来尝试
> >
> > ________________________________
> > [hidden email]
> >
> > 发件人: 郑 洁锋<mailto:[hidden email]>
> > 发送时间: 2020-01-16 14:24
> > 收件人: user-zh<mailto:[hidden email]>
> > 主题: Re: Re: MiniCluster问题
> > 这是完整的到启动的代码
> >
> > public class ClusterClientFactory {
> >
> >     public static ClusterClient createClusterClient(Options
> > launcherOptions) throws Exception {
> >         String mode = launcherOptions.getMode();
> >         if(mode.equals(ClusterMode.standalone.name())) {
> >             return createStandaloneClient(launcherOptions);
> >         } else if(mode.equals(ClusterMode.yarn.name())) {
> >             return createYarnClient(launcherOptions,mode);
> >         }
> >         throw new IllegalArgumentException("Unsupported cluster client
> > type: ");
> >     }
> >
> >     public static ClusterClient createStandaloneClient(Options
> > launcherOptions) throws Exception {
> >         String flinkConfDir = launcherOptions.getFlinkconf();
> >         Configuration config =
> > GlobalConfiguration.loadConfiguration(flinkConfDir);
> >         MiniClusterConfiguration.Builder configBuilder = new
> > MiniClusterConfiguration.Builder();
> >         configBuilder.setConfiguration(config);
> >         MiniCluster miniCluster = new MiniCluster(configBuilder.build());
> >         MiniClusterClient clusterClient = new MiniClusterClient(config,
> > miniCluster);
> >         LeaderConnectionInfo connectionInfo =
> > clusterClient.getClusterConnectionInfo();
> >         InetSocketAddress address =
> > AkkaUtils.getInetSocketAddressFromAkkaURL(connectionInfo.getAddress());
> >         config.setString(JobManagerOptions.ADDRESS,
> > address.getAddress().getHostName());
> >         config.setInteger(JobManagerOptions.PORT, address.getPort());
> >         clusterClient.setDetached(true);
> >         return clusterClient;
> >     }
> >
> >
> > 启动类中:
> >
> > ClusterClient clusterClient =
> > ClusterClientFactory.createClusterClient(launcherOptions);
> > clusterClient.run(program, 1);
> > clusterClient.shutdown();
> >
> > ________________________________
> > [hidden email]
> >
> > 发件人: tison<mailto:[hidden email]>
> > 发送时间: 2020-01-16 13:31
> > 收件人: user-zh<mailto:[hidden email]>
> > 主题: Re: Re: MiniCluster问题
> > MiniCluster miniCluster = new MiniCluster(configBuilder.build());
> >
> > miniCluster.start();
> >
> >
> > MiniClusterClient clusterClient = new MiniClusterClient(config,
> > miniCluster)
> > ;
> >
> > Best,
> > tison.
> >
> >
> > tison <[hidden email]> 于2020年1月16日周四 下午1:30写道:
> >
> > > 跟集群无关
> > > Best,
> > > tison.
> > >
> > >
> > > tison <[hidden email]> 于2020年1月16日周四 下午1:30写道:
> > >
> > >> 1. 这是个内部实现类,文档是一个问题,但最好不要直接依赖
> > >>
> > >> 2. ?你截图的代码片段在哪?我看是你自己代码创建使用的啊
> > >>
> > >> Best,
> > >> tison.
> > >>
> > >>
> > >> 郑 洁锋 <[hidden email]> 于2020年1月16日周四 下午1:18写道:
> > >>
> > >>> MiniCluster有没有相关文档,官方文档里面看到部署模式里面没有相关的内容?
> > >>> 我是通过bin/start-cluster.sh启动的flink standalone集群
> > >>>
> > >>>
> > >>> ________________________________
> > >>> [hidden email]
> > >>>
> > >>> 发件人: tison<mailto:[hidden email]>
> > >>> 发送时间: 2020-01-16 12:39
> > >>> 收件人: user-zh<mailto:[hidden email]>
> > >>> 主题: Re: MiniCluster问题
> > >>> 你 MiniCluster 要 start 啊(x
> > >>>
> > >>> Best,
> > >>> tison.
> > >>>
> > >>>
> > >>> 郑 洁锋 <[hidden email]> 于2020年1月16日周四 上午11:38写道:
> > >>>
> > >>> > MiniCluster代码执行过程中报错:
> > >>> >
> > >>> > SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
> > >>> > SLF4J: Defaulting to no-operation (NOP) logger implementation
> > >>> > SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for
> > >>> further details.
> > >>> > Exception in thread "main" java.lang.IllegalStateException:
> > >>> MiniCluster is not yet running.
> > >>> >         at
> > >>>
> org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
> > >>> >         at
> > >>>
> >
> org.apache.flink.runtime.minicluster.MiniCluster.getHighAvailabilityServices(MiniCluster.java:223)
> > >>> >         at
> > >>>
> >
> org.apache.flink.client.program.MiniClusterClient.<init>(MiniClusterClient.java:61)
> > >>> >         at
> > >>>
> >
> com.dtstack.flink.sql.launcher.ClusterClientFactory.createStandaloneClient(ClusterClientFactory.java:82)
> > >>> >         at
> > >>>
> >
> com.dtstack.flink.sql.launcher.ClusterClientFactory.createClusterClient(ClusterClientFactory.java:69)
> > >>> >         at
> > >>>
> com.dtstack.flink.sql.launcher.LauncherMain.main(LauncherMain.java:99)
> > >>> >
> > >>> > 报错段代码如下:
> > >>> >
> > >>> > Configuration config =
> > >>> GlobalConfiguration.loadConfiguration(flinkConfDir);
> > >>> > MiniClusterConfiguration.Builder configBuilder = new
> > >>> MiniClusterConfiguration.Builder();
> > >>> > configBuilder.setConfiguration(config);
> > >>> > MiniCluster miniCluster = new MiniCluster(configBuilder.build());
> > >>> > MiniClusterClient clusterClient = new MiniClusterClient(config,
> > >>> miniCluster);
> > >>> >
> > >>> > 其中flinkConfDir为/opt/flink/conf
> > >>> >
> > >>> >
> > >>> > flink standalone HA集群信息如下:
> > >>> > ------------------------------
> > >>> > [hidden email]
> > >>> >
> > >>> >
> > >>> >
> > >>>
> > >>
> >
>