MiniCluster reports actor initialization exceptions on startup

叶贤勋
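Hi everyone, I have a question:
I am on Flink 1.9.1 and submit a job locally through MiniCluster, but the job never runs. The errors below are logged while miniCluster.start() executes, and the program then hangs at this step: miniCluster.executeJobBlocking(getSimpleJob(numOfTMs * slotsPerTM))
Java code: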

public static void main(String[] args) throws Exception {
    final int numOfTMs = 3;
    final int slotsPerTM = 7;

    final MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder()
        .setNumTaskManagers(numOfTMs)
        .setNumSlotsPerTaskManager(slotsPerTM)
        .setRpcServiceSharing(RpcServiceSharing.SHARED)
        .setConfiguration(getDefaultConfiguration())
        .build();
    final MiniCluster miniCluster = new MiniCluster(cfg);
    try {
        miniCluster.start();
        JobExecutionResult result = miniCluster.executeJobBlocking(getSimpleJob(numOfTMs * slotsPerTM));
        System.out.println(result.toString());
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        miniCluster.close();
    }
}

private static Configuration getDefaultConfiguration() {
    final Configuration configuration = new Configuration();
    configuration.setString(RestOptions.BIND_PORT, "0");

    return configuration;
}

private static JobGraph getSimpleJob(int parallelism) throws IOException {
    final JobVertex task = new JobVertex("Test task");
    task.setParallelism(parallelism);
    task.setMaxParallelism(parallelism);
    task.setInvokableClass(NoOpInvokable.class);

    final JobGraph jg = new JobGraph(new JobID(), "Test Job", task);
    jg.setScheduleMode(ScheduleMode.EAGER);

    final ExecutionConfig executionConfig = new ExecutionConfig();
    executionConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000));
    jg.setExecutionConfig(executionConfig);

    return jg;
}
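
Because the blocking call never returns, a reproduction like this can be easier to work with if the wait is bounded. The following is only a sketch using plain java.util.concurrent; the class BoundedCall and the method runWithTimeout are hypothetical names, and the lambdas stand in for the real miniCluster.executeJobBlocking(...) call. It does not address the underlying error, it only makes the hang visible as a TimeoutException.

```java
import java.time.Duration;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper, not part of Flink.
class BoundedCall {

    /** Runs the task on a daemon thread and waits at most {@code timeout} for its result. */
    static <T> T runWithTimeout(Callable<T> task, Duration timeout)
            throws InterruptedException, ExecutionException, TimeoutException {
        ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "bounded-call");
            t.setDaemon(true); // a stuck task must not keep the JVM alive
            return t;
        });
        Future<T> future = executor.submit(task);
        try {
            return future.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true); // interrupts the worker; the task may ignore the interrupt
            throw e;
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        // Stand-in for a call that completes in time.
        System.out.println(runWithTimeout(() -> "done", Duration.ofSeconds(1)));

        // Stand-in for a call that hangs, such as the blocking job submission above.
        try {
            runWithTimeout(() -> { Thread.sleep(5_000); return "never"; }, Duration.ofMillis(100));
        } catch (TimeoutException e) {
            System.out.println("timed out");
        }
    }
}
```

In the main method above, the job submission would be wrapped as runWithTimeout(() -> miniCluster.executeJobBlocking(jobGraph), someTimeout), where someTimeout is a value chosen by the caller.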
Error output:
02/12/2019 20:49:21.212  INFO [org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils] Actor system started at akka.tcp://flink-metrics@10.242.32.235:65105
02/12/2019 20:49:21.219  INFO [org.apache.flink.runtime.rpc.akka.AkkaRpcService] Starting RPC endpoint for org.apache.flink.runtime.metrics.dump.MetricQueryService at akka://flink-metrics/user/MetricQueryService .
02/12/2019 20:49:21.227 ERROR [org.apache.flink.runtime.akka.StoppingSupervisorWithoutLoggingActorKilledExceptionStrategy] Actor failed with exception. Stopping it now.
akka.actor.ActorInitializationException: akka://flink-metrics/user/MetricQueryService: exception during creation
at akka.actor.ActorInitializationException$.apply(Actor.scala:175)
at akka.actor.ActorCell.create(ActorCell.scala:607)
at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:461)
at akka.actor.ActorCell.systemInvoke(ActorCell.scala:483)
at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:282)
at akka.dispatch.Mailbox.run(Mailbox.scala:223)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: akka.actor.IllegalActorStateException: Actor behavior has not been set with receive(...)
at akka.actor.IllegalActorStateException$.apply(Actor.scala:140)
at akka.actor.AbstractActor.receive(AbstractActor.scala:74)
at akka.actor.ActorCell.newActor(ActorCell.scala:568)
at akka.actor.ActorCell.create(ActorCell.scala:588)
... 7 more
02/12/2019 20:49:21.229  INFO [org.apache.flink.runtime.minicluster.MiniCluster] Starting high-availability services
02/12/2019 20:49:21.245  INFO [org.apache.flink.runtime.blob.BlobServer] Created BLOB server storage directory /var/folders/7b/x2zfnt157ls4f27syvlyjjsc0000gn/T/blobStore-575c98fc-54f2-45f8-a183-96d3880e940b
……
02/12/2019 20:49:21.671  INFO [org.apache.flink.runtime.taskexecutor.TaskManagerServices] Limiting managed memory to 524287 MB, memory will be allocated lazily.
02/12/2019 20:49:21.674 DEBUG [org.apache.flink.runtime.memory.MemoryManager] Initialized MemoryManager with total memory size 549754765312, number of slots 1, page size 32768, memory type HEAP, pre allocate memory false and number of non allocated pages 16777184.
02/12/2019 20:49:21.685  INFO [org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration] Messages have a max timeout of 10000 ms
02/12/2019 20:49:21.696  INFO [org.apache.flink.runtime.rpc.akka.AkkaRpcService] Starting RPC endpoint for org.apache.flink.runtime.taskexecutor.TaskExecutor at akka://flink/user/taskmanager_0 .
02/12/2019 20:49:21.697 ERROR [org.apache.flink.runtime.akka.StoppingSupervisorWithoutLoggingActorKilledExceptionStrategy] Actor failed with exception. Stopping it now.
akka.actor.ActorInitializationException: akka://flink/user/taskmanager_0: exception during creation
at akka.actor.ActorInitializationException$.apply(Actor.scala:175)
at akka.actor.ActorCell.create(ActorCell.scala:607)
at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:461)
at akka.actor.ActorCell.systemInvoke(ActorCell.scala:483)
at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:282)
at akka.dispatch.Mailbox.run(Mailbox.scala:223)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: akka.actor.IllegalActorStateException: Actor behavior has not been set with receive(...)
at akka.actor.IllegalActorStateException$.apply(Actor.scala:140)
at akka.actor.AbstractActor.receive(AbstractActor.scala:74)
at akka.actor.ActorCell.newActor(ActorCell.scala:568)
at akka.actor.ActorCell.create(ActorCell.scala:588)
... 9 more
02/12/2019 20:49:21.735  INFO [org.apache.flink.configuration.Configuration] Config uses fallback configuration key 'rest.port' instead of key 'rest.bind-port'
02/12/2019 20:49:21.774 DEBUG [org.apache.flink.runtime.entrypoint.component.SessionDispatcherResourceManagerComponentFactory] Starting Dispatcher REST endpoint.
02/12/2019 20:49:21.774  INFO [org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint] Starting rest endpoint.
……
02/12/2019 20:49:22.245  INFO [org.apache.flink.runtime.rpc.akka.AkkaRpcService] Starting RPC endpoint for org.apache.flink.runtime.resourcemanager.StandaloneResourceManager at akka://flink/user/resourcemanager .
02/12/2019 20:49:22.246 ERROR [org.apache.flink.runtime.akka.StoppingSupervisorWithoutLoggingActorKilledExceptionStrategy] Actor failed with exception. Stopping it now.
akka.actor.ActorInitializationException: akka://flink/user/resourcemanager: exception during creation
at akka.actor.ActorInitializationException$.apply(Actor.scala:175)
at akka.actor.ActorCell.create(ActorCell.scala:607)
at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:461)
at akka.actor.ActorCell.systemInvoke(ActorCell.scala:483)
at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:282)
at akka.dispatch.Mailbox.run(Mailbox.scala:223)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: akka.actor.IllegalActorStateException: Actor behavior has not been set with receive(...)
at akka.actor.IllegalActorStateException$.apply(Actor.scala:140)
at akka.actor.AbstractActor.receive(AbstractActor.scala:74)
at akka.actor.ActorCell.newActor(ActorCell.scala:568)
at akka.actor.ActorCell.create(ActorCell.scala:588)
... 9 more
02/12/2019 20:49:22.257  INFO [org.apache.flink.runtime.rpc.akka.AkkaRpcService] Starting RPC endpoint for org.apache.flink.runtime.dispatcher.StandaloneDispatcher at akka://flink/user/dispatcher .
02/12/2019 20:49:22.257 ERROR [org.apache.flink.runtime.akka.StoppingSupervisorWithoutLoggingActorKilledExceptionStrategy] Actor failed with exception. Stopping it now.
akka.actor.ActorInitializationException: akka://flink/user/dispatcher: exception during creation
at akka.actor.ActorInitializationException$.apply(Actor.scala:175)
at akka.actor.ActorCell.create(ActorCell.scala:607)
at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:461)
at akka.actor.ActorCell.systemInvoke(ActorCell.scala:483)
at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:282)
at akka.dispatch.Mailbox.run(Mailbox.scala:223)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: akka.actor.IllegalActorStateException: Actor behavior has not been set with receive(...)
at akka.actor.IllegalActorStateException$.apply(Actor.scala:140)
at akka.actor.AbstractActor.receive(AbstractActor.scala:74)
at akka.actor.ActorCell.newActor(ActorCell.scala:568)
at akka.actor.ActorCell.create(ActorCell.scala:588)
... 9 more
02/12/2019 20:49:22.267 DEBUG [org.apache.flink.runtime.entrypoint.component.SessionDispatcherResourceManagerComponentFactory] Starting ResourceManager.
02/12/2019 20:49:22.267 DEBUG [org.apache.flink.runtime.entrypoint.component.SessionDispatcherResourceManagerComponentFactory] Starting Dispatcher.
02/12/2019 20:49:22.268  INFO [org.apache.flink.runtime.minicluster.MiniCluster] Flink Mini Cluster started successfully
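
Every failure in the log originates in akka.actor.AbstractActor.receive. One possible explanation, which the log alone does not confirm, is that the akka.actor.AbstractActor class on the classpath comes from a different Akka artifact than the one the Flink runtime was built against. A minimal sketch for checking where a class is loaded from, using only the JDK; ClassOrigin and originOf are hypothetical names:

```java
import java.security.CodeSource;

// Hypothetical diagnostic helper, not part of Flink or Akka.
class ClassOrigin {

    /** Returns the jar or directory a class was loaded from, or a marker if unknown. */
    static String originOf(Class<?> clazz) {
        CodeSource source = clazz.getProtectionDomain().getCodeSource();
        // Classes loaded by the bootstrap loader (e.g. java.lang.String) have no code source.
        return (source == null || source.getLocation() == null)
                ? "<bootstrap or unknown>"
                : source.getLocation().toString();
    }

    /** Looks the class up by name so this compiles without the library present. */
    static String originOf(String className) {
        try {
            // initialize=false: only locate the class, do not run its static initializers.
            return originOf(Class.forName(className, false, ClassOrigin.class.getClassLoader()));
        } catch (ClassNotFoundException | LinkageError e) {
            return "<not on classpath>";
        }
    }

    public static void main(String[] args) {
        // Class name taken from the stack traces above.
        System.out.println("akka.actor.AbstractActor -> " + originOf("akka.actor.AbstractActor"));
        System.out.println("java.lang.String -> " + originOf("java.lang.String"));
    }
}
```

Run with the same classpath as the failing program, the first line shows which jar supplies the Akka class; that jar name can then be compared with the project's dependency tree.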


Attachment: NoOpInvokable.java (1K)