Flink 1.11.3 PackagedProgram启动报错

classic Classic list List threaded Threaded
2 messages Options
111
Reply | Threaded
Open this post in threaded view
|

Flink 1.11.3 PackagedProgram启动报错

111
Dear All:
运行环境:flink-1.11.3
运行背景:想要通过PackagedProgram启动Flink Jar(流程模拟flink run)
代码:
|
String[] programArgs = new String[]{};
String jarFilePath = "D:\\workspace\\FlinkJarDemo\\target\\FlinkJarDemo-1.0-SNAPSHOT.jar";
List<URL> classpaths = Collections.singletonList(new URL("file://D:\\workspace\\FlinkJarDemo\\target\\FlinkJarDemo-1.0-SNAPSHOT.jar"));

Configuration configuration = new Configuration();
configuration.set(DeploymentOptions.ATTACHED, true);
configuration.set(DeploymentOptions.SHUTDOWN_IF_ATTACHED, true);
configuration.setString("execution.target", "local");
configuration.setString(CoreOptions.CLASSLOADER_RESOLVE_ORDER, "parent-first");
configuration.setString(CoreOptions.ALWAYS_PARENT_FIRST_LOADER_PATTERNS_ADDITIONAL, "org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner;org.apache.flink.api.java.functions.KeySelector;");

// Get assembler class
String entryPointClass = "a.b.c.Test";
File jarFile = jarFilePath != null ? getJarFile(jarFilePath) : null;

PackagedProgram program = PackagedProgram.newBuilder()
                .setJarFile(jarFile)
                .setUserClassPaths(classpaths)
                .setEntryPointClassName(entryPointClass)
                .setConfiguration(configuration)
//.setSavepointRestoreSettings(runOptions.getSavepointRestoreSettings())
.setArguments(programArgs)
                .build();

ClientUtils.executeProgram(
new DefaultExecutorServiceLoader(),
configuration,
program,
                false,
                false
);
|

运行后提示:

|
Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Could not instantiate outputs in order.
at org.apache.flink.streaming.api.graph.StreamConfig.getOutEdgesInOrder(StreamConfig.java:429)
at org.apache.flink.streaming.runtime.tasks.StreamTask.createRecordWriters(StreamTask.java:1150)
at org.apache.flink.streaming.runtime.tasks.StreamTask.createRecordWriterDelegate(StreamTask.java:1134)
at org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:284)
at org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:271)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.<init>(SourceStreamTask.java:73)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.<init>(SourceStreamTask.java:69)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.flink.runtime.taskmanager.Task.loadAndInstantiateInvokable(Task.java:1372)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:699)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.keySelector of type org.apache.flink.api.java.functions.KeySelector in instance of org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner
at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2302)
at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1432)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2409)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2327)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2185)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1665)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2403)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2327)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2185)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1665)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:501)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:459)
at java.util.ArrayList.readObject(ArrayList.java:799)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1185)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2294)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2185)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1665)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:501)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:459)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
at org.apache.flink.streaming.api.graph.StreamConfig.getOutEdgesInOrder(StreamConfig.java:426)
... 14 more
|




查阅资料说是类加载的问题,已经修改为parent-first模式。
目前猜测是TaskManager端的执行有问题,无法加载到用户jar。
不知道是不是我PackagedProgram使用有问题,望解答。


Best,
xingoo

111
Reply | Threaded
Open this post in threaded view
|

Re:Flink 1.11.3 PackagedProgram启动报错

111
问题解决了,是因为自定义Jar中存在UDF。<br/>而生成的Pipeline未挂载对应的依赖jar,导致任务分发后UDF无法实例化。<br/><br/>解决办法如下:<br/>String[] programArgs = new String[]{};<br/>String jarFilePath = "D:\\workspace\\FlinkJarDemo\\target\\FlinkJarDemo-1.0-SNAPSHOT.jar";<br/>List&lt;URL&gt; classpaths = Collections.emptyList();<br/><br/>Configuration configuration = new Configuration();<br/>configuration.set(DeploymentOptions.ATTACHED, true);<br/>configuration.set(DeploymentOptions.SHUTDOWN_IF_ATTACHED, true);<br/>configuration.setString("execution.target", "local");<br/><br/>// Get assembler class<br/>String entryPointClass = "a.b.c.Test";<br/>File jarFile = jarFilePath != null ? getJarFile(jarFilePath) : null;<br/><br/>PackagedProgram program = PackagedProgram.newBuilder()<br/>        .setJarFile(jarFile)<br/>        .setUserClassPaths(classpaths)<br/>        .setEntryPointClassName(entryPointClass)<br/>        .setConfiguration(configuration)<br/>        //.setSavepointRestoreSettings(runOptions.getSavepointRestoreSettings())<br/>        .setArguments(programArgs)<br/>        .build();<br/><br/>// 为pipeline添加用户jar<br/>ConfigUtils.encodeCollectionToConfig(configuration, PipelineOptions.JARS, program.getJobJarAndDependencies(), URL::toString);<br/><br/>ClientUtils.executeProgram(<br/>        new DefaultExecutorServiceLoader(),<br/>        configuration,<br/>        program,<br/>        false,<br/>        false<br/>);
在 2021-01-19 14:01:58,"xingoo" <[hidden email]> 写道:

>Dear All:
>运行环境:flink-1.11.3
>运行背景:想要通过PackagedProgram启动Flink Jar(流程模拟flink run)
>代码:
>|
>String[] programArgs = new String[]{};
>String jarFilePath = "D:\\workspace\\FlinkJarDemo\\target\\FlinkJarDemo-1.0-SNAPSHOT.jar";
>List<URL> classpaths = Collections.singletonList(new URL("file://D:\\workspace\\FlinkJarDemo\\target\\FlinkJarDemo-1.0-SNAPSHOT.jar"));
>
>Configuration configuration = new Configuration();
>configuration.set(DeploymentOptions.ATTACHED, true);
>configuration.set(DeploymentOptions.SHUTDOWN_IF_ATTACHED, true);
>configuration.setString("execution.target", "local");
>configuration.setString(CoreOptions.CLASSLOADER_RESOLVE_ORDER, "parent-first");
>configuration.setString(CoreOptions.ALWAYS_PARENT_FIRST_LOADER_PATTERNS_ADDITIONAL, "org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner;org.apache.flink.api.java.functions.KeySelector;");
>
>// Get assembler class
>String entryPointClass = "a.b.c.Test";
>File jarFile = jarFilePath != null ? getJarFile(jarFilePath) : null;
>
>PackagedProgram program = PackagedProgram.newBuilder()
>                .setJarFile(jarFile)
>                .setUserClassPaths(classpaths)
>                .setEntryPointClassName(entryPointClass)
>                .setConfiguration(configuration)
>//.setSavepointRestoreSettings(runOptions.getSavepointRestoreSettings())
>.setArguments(programArgs)
>                .build();
>
>ClientUtils.executeProgram(
>new DefaultExecutorServiceLoader(),
>configuration,
>program,
>                false,
>                false
>);
>|
>
>运行后提示:
>
>|
>Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Could not instantiate outputs in order.
>at org.apache.flink.streaming.api.graph.StreamConfig.getOutEdgesInOrder(StreamConfig.java:429)
>at org.apache.flink.streaming.runtime.tasks.StreamTask.createRecordWriters(StreamTask.java:1150)
>at org.apache.flink.streaming.runtime.tasks.StreamTask.createRecordWriterDelegate(StreamTask.java:1134)
>at org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:284)
>at org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:271)
>at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.<init>(SourceStreamTask.java:73)
>at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.<init>(SourceStreamTask.java:69)
>at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>at org.apache.flink.runtime.taskmanager.Task.loadAndInstantiateInvokable(Task.java:1372)
>at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:699)
>at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
>at java.lang.Thread.run(Thread.java:748)
>Caused by: java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.keySelector of type org.apache.flink.api.java.functions.KeySelector in instance of org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner
>at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2302)
>at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1432)
>at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2409)
>at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2327)
>at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2185)
>at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1665)
>at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2403)
>at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2327)
>at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2185)
>at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1665)
>at java.io.ObjectInputStream.readObject(ObjectInputStream.java:501)
>at java.io.ObjectInputStream.readObject(ObjectInputStream.java:459)
>at java.util.ArrayList.readObject(ArrayList.java:799)
>at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>at java.lang.reflect.Method.invoke(Method.java:498)
>at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1185)
>at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2294)
>at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2185)
>at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1665)
>at java.io.ObjectInputStream.readObject(ObjectInputStream.java:501)
>at java.io.ObjectInputStream.readObject(ObjectInputStream.java:459)
>at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
>at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
>at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
>at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
>at org.apache.flink.streaming.api.graph.StreamConfig.getOutEdgesInOrder(StreamConfig.java:426)
>... 14 more
>|
>
>
>
>
>查阅资料说是类加载的问题,已经修改为parent-first模式。
>目前猜测是TaskManager端的执行有问题,无法加载到用户jar。
>不知道是不是我PackagedProgram使用有问题,望解答。
>
>
>Best,
>xingoo
>