Dear All:
Environment: flink-1.11.3
Background: I want to launch a Flink jar through PackagedProgram (mimicking what flink run does).

Code:
|
String[] programArgs = new String[]{};
String jarFilePath = "D:\\workspace\\FlinkJarDemo\\target\\FlinkJarDemo-1.0-SNAPSHOT.jar";
List<URL> classpaths = Collections.singletonList(new URL("file://D:\\workspace\\FlinkJarDemo\\target\\FlinkJarDemo-1.0-SNAPSHOT.jar"));

Configuration configuration = new Configuration();
configuration.set(DeploymentOptions.ATTACHED, true);
configuration.set(DeploymentOptions.SHUTDOWN_IF_ATTACHED, true);
configuration.setString("execution.target", "local");
configuration.setString(CoreOptions.CLASSLOADER_RESOLVE_ORDER, "parent-first");
configuration.setString(CoreOptions.ALWAYS_PARENT_FIRST_LOADER_PATTERNS_ADDITIONAL, "org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner;org.apache.flink.api.java.functions.KeySelector;");

// Get assembler class
String entryPointClass = "a.b.c.Test";
File jarFile = jarFilePath != null ? getJarFile(jarFilePath) : null;

PackagedProgram program = PackagedProgram.newBuilder()
    .setJarFile(jarFile)
    .setUserClassPaths(classpaths)
    .setEntryPointClassName(entryPointClass)
    .setConfiguration(configuration)
    //.setSavepointRestoreSettings(runOptions.getSavepointRestoreSettings())
    .setArguments(programArgs)
    .build();

ClientUtils.executeProgram(
    new DefaultExecutorServiceLoader(),
    configuration,
    program,
    false,
    false
);
|

Running it reports:
|
Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Could not instantiate outputs in order.
    at org.apache.flink.streaming.api.graph.StreamConfig.getOutEdgesInOrder(StreamConfig.java:429)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.createRecordWriters(StreamTask.java:1150)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.createRecordWriterDelegate(StreamTask.java:1134)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:284)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:271)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.<init>(SourceStreamTask.java:73)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.<init>(SourceStreamTask.java:69)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at org.apache.flink.runtime.taskmanager.Task.loadAndInstantiateInvokable(Task.java:1372)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:699)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.keySelector of type org.apache.flink.api.java.functions.KeySelector in instance of org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner
    at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2302)
    at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1432)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2409)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2327)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2185)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1665)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2403)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2327)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2185)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1665)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:501)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:459)
    at java.util.ArrayList.readObject(ArrayList.java:799)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1185)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2294)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2185)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1665)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:501)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:459)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
    at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
    at org.apache.flink.streaming.api.graph.StreamConfig.getOutEdgesInOrder(StreamConfig.java:426)
    ... 14 more
|

What I have read suggests this is a class-loading problem, so I have already switched to parent-first mode.
My current guess is that execution on the TaskManager side is failing because it cannot load the user jar.
I am not sure whether I am using PackagedProgram incorrectly; any help would be appreciated.

Best,
xingoo
The problem is solved. The cause was that the custom jar contains a UDF, but the generated Pipeline did not have the corresponding dependency jar attached, so the UDF could not be instantiated once the job had been distributed.

The fix is as follows:

String[] programArgs = new String[]{};
String jarFilePath = "D:\\workspace\\FlinkJarDemo\\target\\FlinkJarDemo-1.0-SNAPSHOT.jar";
List<URL> classpaths = Collections.emptyList();

Configuration configuration = new Configuration();
configuration.set(DeploymentOptions.ATTACHED, true);
configuration.set(DeploymentOptions.SHUTDOWN_IF_ATTACHED, true);
configuration.setString("execution.target", "local");

// Get assembler class
String entryPointClass = "a.b.c.Test";
File jarFile = jarFilePath != null ? getJarFile(jarFilePath) : null;

PackagedProgram program = PackagedProgram.newBuilder()
    .setJarFile(jarFile)
    .setUserClassPaths(classpaths)
    .setEntryPointClassName(entryPointClass)
    .setConfiguration(configuration)
    //.setSavepointRestoreSettings(runOptions.getSavepointRestoreSettings())
    .setArguments(programArgs)
    .build();

// Attach the user jar to the pipeline
ConfigUtils.encodeCollectionToConfig(configuration, PipelineOptions.JARS, program.getJobJarAndDependencies(), URL::toString);

ClientUtils.executeProgram(
    new DefaultExecutorServiceLoader(),
    configuration,
    program,
    false,
    false
);
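
For readers who want to see what the added line contributes, here is a minimal stand-alone sketch (not Flink code, and not necessarily how Flink implements it internally). It assumes that PipelineOptions.JARS is backed by the `pipeline.jars` key and that the `URL::toString` mapper above simply turns each jar URL into a string entry of that list. The class name `PipelineJarsSketch` and the helpers `toJarUrl` and `encodeJars` are hypothetical names introduced only for illustration. The sketch also derives the URL from a `File` rather than from a hand-written `file://` string.

```java
import java.io.File;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

/** Stand-alone illustration using only the JDK; it does not depend on Flink. */
public class PipelineJarsSketch {

    /** Assumed configuration key behind PipelineOptions.JARS. */
    public static final String PIPELINE_JARS_KEY = "pipeline.jars";

    /** Hypothetical helper: build a file URL from a local path via File#toURI. */
    public static URL toJarUrl(String jarFilePath) {
        try {
            return new File(jarFilePath).getAbsoluteFile().toURI().toURL();
        } catch (MalformedURLException e) {
            throw new IllegalArgumentException("Not a usable jar path: " + jarFilePath, e);
        }
    }

    /** Hypothetical helper: the same URL -> String mapping as the URL::toString mapper. */
    public static List<String> encodeJars(Collection<URL> jars) {
        return jars.stream().map(URL::toString).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        URL jar = toJarUrl("target/FlinkJarDemo-1.0-SNAPSHOT.jar");
        List<String> encoded = encodeJars(Collections.singletonList(jar));
        // Prints the key together with the list of jar URL strings.
        System.out.println(PIPELINE_JARS_KEY + " = " + encoded);
    }
}
```

In the real program the list comes from program.getJobJarAndDependencies(), so the entry-point jar and any jars nested inside it end up in the configuration that is handed to the executor.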
On 2021-01-19 14:01:58, "xingoo" <[hidden email]> wrote: