Hi Team,
我在k8s上面运行application的一个demo 失败, 下面是错误消息。 谢谢 java.lang.IllegalStateException: Adaptive Scheduler is required for reactive mode at org.apache.flink.util.Preconditions.checkState(Preconditions.java:193) ~[flink-dist_2.12-1.13.1.jar:1.13.1] at org.apache.flink.runtime.dispatcher.JobMasterServiceLeadershipRunnerFactory.createJobManagerRunner(JobMasterServiceLeadershipRunnerFactory.java:80) ~[flink-dist_2.12-1.13.1.jar:1.13.1] at org.apache.flink.runtime.dispatcher.Dispatcher.createJobManagerRunner(Dispatcher.java:468) ~[flink-dist_2.12-1.13.1.jar:1.13.1] at org.apache.flink.runtime.dispatcher.Dispatcher.runJob(Dispatcher.java:399) ~[flink-dist_2.12-1.13.1.jar:1.13.1] at org.apache.flink.runtime.dispatcher.Dispatcher.persistAndRunJob(Dispatcher.java:392) ~[flink-dist_2.12-1.13.1.jar:1.13.1] at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$waitForTerminatingJob$27(Dispatcher.java:954) ~[flink-dist_2.12-1.13.1.jar:1.13.1] at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedConsumer$3(FunctionUtils.java:93) ~[flink-dist_2.12-1.13.1.jar:1.13.1] at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670) ~[?:1.8.0_292] at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:646) ~[?:1.8.0_292] at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456) ~[?:1.8.0_292] at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440) ~[flink-dist_2.12-1.13.1.jar:1.13.1] at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208) ~[flink-dist_2.12-1.13.1.jar:1.13.1] at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77) ~[flink-dist_2.12-1.13.1.jar:1.13.1] at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) ~[flink-dist_2.12-1.13.1.jar:1.13.1] at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) ~[flink-dist_2.12-1.13.1.jar:1.13.1] at 
akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) ~[flink-dist_2.12-1.13.1.jar:1.13.1] at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) ~[flink-dist_2.12-1.13.1.jar:1.13.1] at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) ~[flink-dist_2.12-1.13.1.jar:1.13.1] at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) ~[flink-dist_2.12-1.13.1.jar:1.13.1] at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~[flink-dist_2.12-1.13.1.jar:1.13.1] at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) ~[flink-dist_2.12-1.13.1.jar:1.13.1] at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) ~[flink-dist_2.12-1.13.1.jar:1.13.1] at akka.actor.Actor.aroundReceive(Actor.scala:517) ~[flink-dist_2.12-1.13.1.jar:1.13.1] at akka.actor.Actor.aroundReceive$(Actor.scala:515) ~[flink-dist_2.12-1.13.1.jar:1.13.1] at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) ~[flink-dist_2.12-1.13.1.jar:1.13.1] at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) ~[flink-dist_2.12-1.13.1.jar:1.13.1] at akka.actor.ActorCell.invoke(ActorCell.scala:561) ~[flink-dist_2.12-1.13.1.jar:1.13.1] at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) ~[flink-dist_2.12-1.13.1.jar:1.13.1] at akka.dispatch.Mailbox.run(Mailbox.scala:225) ~[flink-dist_2.12-1.13.1.jar:1.13.1] at akka.dispatch.Mailbox.exec(Mailbox.scala:235) ~[flink-dist_2.12-1.13.1.jar:1.13.1] at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) ~[flink-dist_2.12-1.13.1.jar:1.13.1] at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) ~[flink-dist_2.12-1.13.1.jar:1.13.1] at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) ~[flink-dist_2.12-1.13.1.jar:1.13.1] at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) ~[flink-dist_2.12-1.13.1.jar:1.13.1] 2021-07-09 03:17:32,667 WARN 
org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap [] - Application failed unexpectedly: java.util.concurrent.CompletionException: org.apache.flink.client.deployment.application.ApplicationExecutionException: Could not execute application. JM 定义 apiVersion: batch/v1 kind: Job metadata: name: flink-jobmanager spec: template: metadata: labels: app: flink component: jobmanager spec: imagePullSecrets: - name: artifactory-container-registry restartPolicy: OnFailure containers: - name: jobmanager image: txo-dswim-esb-docker-local.artifactory.swg-devops.com/diak8scluster/flink-hadoop-app:fvt imagePullPolicy: Always #args: ["standalone-job", "--job-classname", "org.apache.flink.streaming.examples.wordcount", <optional arguments>, <job arguments>] args: ["standalone-job", "--job-classname", "BatchWordCount", <optional arguments>, <job arguments>] ports: - containerPort: 6123 name: rpc - containerPort: 6124 name: blob-server - containerPort: 8081 name: webui livenessProbe: tcpSocket: port: 6123 initialDelaySeconds: 30 periodSeconds: 60 volumeMounts: - name: flink-config-volume mountPath: /opt/flink/conf securityContext: runAsUser: 0 # refers to user _flink_ from official flink image, change if necessary volumes: - name: flink-config-volume configMap: name: flink-config items: - key: flink-conf.yaml path: flink-conf.yaml - key: log4j-console.properties path: log4j-console.properties The Dockerfile FROM maven:3.6-jdk-8-slim AS builder # get explore-flink job and compile it WORKDIR /opt/explore-flink COPY ./flinktest /opt/explore-flink RUN mvn clean install FROM flink:1.13.1-scala_2.12 WORKDIR $FLINK_HOME RUN mkdir -p $FLINK_HOME/usrlib COPY --from=builder --chown=flink:flink /opt/explore-flink/target/flinktest-1.0-SNAPSHOT-jar-with-dependencies.jar $FLINK_HOME/usrlib/flinktest-1.0-SNAPSHOT-jar-with-dependencies.jar #Entropy injection for S3 file systems RUN mkdir $FLINK_HOME/plugins/s3-fs-hadoop \ && mkdir $FLINK_HOME/plugins/s3-fs-presto COPY 
flink-s3-fs-hadoop-1.13.1.jar $FLINK_HOME/plugins/s3-fs-hadoop COPY flink-s3-fs-presto-1.13.1.jar $FLINK_HOME/plugins/s3-fs-presto RUN chown -R flink:flink .; Have checked the image where has this jar under folder /opt/flink/usrlib flink@38374be9bdf1:~/usrlib$ ls -lrt total 85660 -rw-r--r-- 1 flink flink 87712073 Jul 6 10:41 flinktest-1.0-SNAPSHOT-jar-with-dependencies.jar flink@38374be9bdf1:~/usrlib$ The Java class import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.util.Collector; /** * Count each word when provide the source. */ public class BatchWordCount { public static void main(String[] args) throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); DataSet<String> text = env.fromElements("Flink batch demo", "batch demo", "demo"); DataSet<Tuple2<String, Integer>> ds = text.flatMap(new LineSplitter()).groupBy(0).sum(1); ds.print(); } static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> { public void flatMap(String line, Collector<Tuple2<String, Integer>> collector) throws Exception { for (String word : line.split(" ")) { collector.collect(new Tuple2<String, Integer>(word,1) ); } } } } |
Free forum by Nabble | Edit this page |