在 k8s 上面运行 application demo 失败

classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

在 k8s 上面运行 application demo 失败

levi-015
Hi Team,

我在 k8s 上面运行 application 的一个 demo 失败，下面是错误消息。谢谢

java.lang.IllegalStateException: Adaptive Scheduler is required for reactive mode
        at org.apache.flink.util.Preconditions.checkState(Preconditions.java:193) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
        at org.apache.flink.runtime.dispatcher.JobMasterServiceLeadershipRunnerFactory.createJobManagerRunner(JobMasterServiceLeadershipRunnerFactory.java:80) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
        at org.apache.flink.runtime.dispatcher.Dispatcher.createJobManagerRunner(Dispatcher.java:468) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
        at org.apache.flink.runtime.dispatcher.Dispatcher.runJob(Dispatcher.java:399) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
        at org.apache.flink.runtime.dispatcher.Dispatcher.persistAndRunJob(Dispatcher.java:392) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
        at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$waitForTerminatingJob$27(Dispatcher.java:954) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
        at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedConsumer$3(FunctionUtils.java:93) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
        at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670) ~[?:1.8.0_292]
        at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:646) ~[?:1.8.0_292]
        at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456) ~[?:1.8.0_292]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
        at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
        at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
        at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
        at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
        at akka.actor.Actor.aroundReceive(Actor.scala:517) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
        at akka.actor.Actor.aroundReceive$(Actor.scala:515) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
        at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
        at akka.actor.ActorCell.invoke(ActorCell.scala:561) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
        at akka.dispatch.Mailbox.run(Mailbox.scala:225) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
        at akka.dispatch.Mailbox.exec(Mailbox.scala:235) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
        at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
        at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
        at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
        at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
2021-07-09 03:17:32,667 WARN  org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap [] - Application failed unexpectedly:
java.util.concurrent.CompletionException: org.apache.flink.client.deployment.application.ApplicationExecutionException: Could not execute application.


JM 定义

# Kubernetes Job running a Flink JobManager in Application Mode
# (standalone-job). The user jar is expected under /opt/flink/usrlib
# inside the image; the class name is passed via --job-classname.
#
# NOTE(review): the reported "Adaptive Scheduler is required for reactive
# mode" error is raised by the JobManager when scheduler-mode: reactive is
# set in flink-conf.yaml — check the mounted flink-config ConfigMap (not
# shown here); reactive mode does not support DataSet / eager-execution
# jobs. TODO confirm against the ConfigMap contents.
apiVersion: batch/v1
kind: Job
metadata:
  name: flink-jobmanager
spec:
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      imagePullSecrets:
        - name: artifactory-container-registry
      restartPolicy: OnFailure
      containers:
        - name: jobmanager
          image: txo-dswim-esb-docker-local.artifactory.swg-devops.com/diak8scluster/flink-hadoop-app:fvt
          imagePullPolicy: Always
          # Optional extra CLI arguments and job arguments may be appended
          # after the class name, e.g.:
          #   args: ["standalone-job", "--job-classname", "BatchWordCount", "--input", "s3://..."]
          # Do NOT leave literal <...> template placeholders in the list.
          args: ["standalone-job", "--job-classname", "BatchWordCount"]
          ports:
            - containerPort: 6123
              name: rpc
            - containerPort: 6124
              name: blob-server
            - containerPort: 8081
              name: webui
          livenessProbe:
            tcpSocket:
              port: 6123
            initialDelaySeconds: 30
            periodSeconds: 60
          volumeMounts:
            - name: flink-config-volume
              mountPath: /opt/flink/conf
          securityContext:
            runAsUser: 0  # refers to user _flink_ from official flink image, change if necessary
      volumes:
        - name: flink-config-volume
          configMap:
            name: flink-config
            items:
              - key: flink-conf.yaml
                path: flink-conf.yaml
              - key: log4j-console.properties
                path: log4j-console.properties


The Dockerfile

# Stage 1: build the user job jar with Maven (batch mode for CI-friendly logs).
FROM maven:3.6-jdk-8-slim AS builder
WORKDIR /opt/explore-flink
COPY ./flinktest /opt/explore-flink
RUN mvn -B clean install

# Stage 2: Flink runtime image; standalone-job (Application Mode) picks up
# user jars from $FLINK_HOME/usrlib.
FROM flink:1.13.1-scala_2.12

WORKDIR $FLINK_HOME
RUN mkdir -p $FLINK_HOME/usrlib
COPY --from=builder --chown=flink:flink /opt/explore-flink/target/flinktest-1.0-SNAPSHOT-jar-with-dependencies.jar $FLINK_HOME/usrlib/flinktest-1.0-SNAPSHOT-jar-with-dependencies.jar

# Entropy injection for S3 file systems: Flink loads filesystem plugins from
# dedicated subdirectories under $FLINK_HOME/plugins.
RUN mkdir -p $FLINK_HOME/plugins/s3-fs-hadoop $FLINK_HOME/plugins/s3-fs-presto

# --chown so the plugin jars are readable by the flink user without relying
# solely on the final recursive chown (original left them root-owned until
# the last RUN, and that RUN had a stray trailing semicolon).
COPY --chown=flink:flink flink-s3-fs-hadoop-1.13.1.jar $FLINK_HOME/plugins/s3-fs-hadoop/
COPY --chown=flink:flink flink-s3-fs-presto-1.13.1.jar $FLINK_HOME/plugins/s3-fs-presto/
RUN chown -R flink:flink .

Have checked the image where has this jar under folder  /opt/flink/usrlib

flink@38374be9bdf1:~/usrlib$ ls -lrt
total 85660
-rw-r--r-- 1 flink flink 87712073 Jul  6 10:41 flinktest-1.0-SNAPSHOT-jar-with-dependencies.jar
flink@38374be9bdf1:~/usrlib$


The Java class

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

/**
 *  Count each word when provide the source.
 */

public class BatchWordCount {



    public static void main(String[] args) throws Exception {

        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<String> text = env.fromElements("Flink batch demo", "batch demo", "demo");
        DataSet<Tuple2<String, Integer>> ds = text.flatMap(new LineSplitter()).groupBy(0).sum(1);
        ds.print();
    }

    static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {

        public void flatMap(String line, Collector<Tuple2<String, Integer>> collector) throws Exception {

            for (String word : line.split(" ")) {
                collector.collect(new Tuple2<String, Integer>(word,1) );
            }
        }
    }
}
Reply | Threaded
Open this post in threaded view
|

Re: 在 k8s 上面运行 application demo 失败

levi-015
I commented out this line in the Java code and it's working now.

// ds.print();

Thanks!