Hi,
flink-connector-jdbc_2.11-1.11.1.jar 有添加在flink/lib下,只能保证在作业执行的时候,可以找到对应的class,在客户端提交的时候,会编译作业,从报错看,是客户端编译作业的时候找不到对应的class。 可以试试这里的方法:https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/dependency_management.html#java-dependency > 在 2021年2月7日,上午10:50,陈康 <[hidden email]> 写道: > > 请教大佬们: > 在执行pyflink过程中,报错 Could not read the user code wrapper: > org.apache.flink.connector.jdbc.table.JdbcRowDataInputFormat > .. > Caused by: java.lang.ClassNotFoundException: > org.apache.flink.connector.jdbc.table.JdbcRowDataInputFormat > > flink-connector-jdbc_2.11-1.11.1.jar 有添加在flink/lib下 > > 有了解的大佬嘛、谢谢~ > > ============= > > [hadoop@hadoop01 lib]$ pwd > /opt/module/flink-1.11.1/lib > [hadoop@hadoop01 lib]$ ll > total 228100 > *-rw-r--r-- 1 hadoop hadoop 196528 2021/02/07 10:26:21 > flink-connector-jdbc_2.11-1.11.1.jar* > -rw-r--r-- 1 hadoop hadoop 90782 2020/07/15 17:24:31 > flink-csv-1.11.1.jar > -rw-r--r-- 1 hadoop hadoop 108350618 2020/07/15 17:30:13 > flink-dist_2.11-1.11.1.jar > -rw-r--r-- 1 hadoop hadoop 94865 2020/07/15 17:24:11 > flink-json-1.11.1.jar > -rw-r--r-- 1 hadoop hadoop 41507566 2021/02/05 17:05:01 > flink-shaded-hadoop-2-uber-2.7.5-9.0.jar > -rw-r--r-- 1 hadoop hadoop 7712156 2020/06/18 10:42:32 > flink-shaded-zookeeper-3.4.14.jar > -rw-r--r-- 1 hadoop hadoop 33327194 2020/07/15 17:28:46 > flink-table_2.11-1.11.1.jar > -rw-r--r-- 1 hadoop hadoop 37331759 2020/07/15 17:28:54 > flink-table-blink_2.11-1.11.1.jar > -rw-r--r-- 1 hadoop hadoop 1889063 2021/01/13 17:03:03 > kafka-clients-2.1.0.jar > -rw-r--r-- 1 hadoop hadoop 67114 2020/04/20 20:47:31 > log4j-1.2-api-2.12.1.jar > -rw-r--r-- 1 hadoop hadoop 276771 2020/04/20 20:47:31 > log4j-api-2.12.1.jar > -rw-r--r-- 1 hadoop hadoop 1674433 2020/04/20 20:47:32 > log4j-core-2.12.1.jar > -rw-r--r-- 1 hadoop hadoop 23518 2020/04/20 20:47:31 > log4j-slf4j-impl-2.12.1.jar > -rw-r--r-- 1 hadoop hadoop 1007502 2020/12/25 10:51:20 > mysql-connector-java-5.1.47.jar > > > > -------------- > > *[hadoop@hadoop01 pyflink]$ /opt/module/flink-1.11.1/bin/flink run -py > NtPyFlink.py* > SLF4J: Class path contains multiple SLF4J bindings. > SLF4J: Found binding in > [jar:file:/opt/module/flink-1.11.1/lib/log4j-slf4j-impl-2.12.1.jar!/org/slf4j/impl/StaticLoggerBinder.class] > SLF4J: Found binding in > [jar:file:/opt/module/hadoop/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class] > SLF4J: Found binding in > [jar:file:/opt/module/hbase/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class] > SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an > explanation. > SLF4J: Actual binding is of type > [org.apache.logging.slf4j.Log4jLoggerFactory] > > Traceback (most recent call last): > File "NtPyFlink.py", line 300, in <module> > t_env.execute('重连参数预测') > File > "/opt/module/flink-1.11.1/opt/python/pyflink.zip/pyflink/table/table_environment.py", > line 1057, in execute > File > "/opt/module/flink-1.11.1/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", > line 1286, in __call__ > File > "/opt/module/flink-1.11.1/opt/python/pyflink.zip/pyflink/util/exceptions.py", > line 147, in deco > File > "/opt/module/flink-1.11.1/opt/python/py4j-0.10.8.1-src.zip/py4j/protocol.py", > line 328, in get_return_value > py4j.protocol.Py4JJavaError: An error occurred while calling o6.execute. > : org.apache.flink.util.FlinkException: Failed to execute job '重连参数预测'. > at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1823) > at > org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:128) > at > org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76) > at > org.apache.flink.table.planner.delegation.ExecutorBase.execute(ExecutorBase.java:52) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1214) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) > at > org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) > at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282) > at > org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) > at > org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79) > at > org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238) > at java.lang.Thread.run(Thread.java:748) > Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to > submit JobGraph. > at > org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$7(RestClusterClient.java:366) > at > java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870) > at > java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852) > at > java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) > at > java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) > at > org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:292) > at > java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) > at > java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) > at > java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) > at > java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561) > at > java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929) > at > java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > ... 1 more > Caused by: org.apache.flink.runtime.rest.util.RestClientException: [Internal > server error., <Exception on server side: > org.apache.flink.runtime.client.JobSubmissionException: Failed to submit > job. > at > org.apache.flink.runtime.dispatcher.Dispatcher.lambda$internalSubmitJob$3(Dispatcher.java:344) > at > java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822) > at > java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797) > at > java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44) > at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not > instantiate JobManager. > at > org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$6(Dispatcher.java:398) > at > java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590) > ... 6 more > Caused by: org.apache.flink.runtime.client.JobExecutionException: Cannot > initialize task 'Source: TableSourceScan(table=[[default_catalog, > default_database, source, project=[colA, colB, colC, colD, colE, colF, colG, > colH]]], fields=[colA, colB, colC, colD, colE, colF, colG, colH]) -> > StreamExecPythonCalc -> Sink: > Sink(table=[default_catalog.default_database.print_table], fields=[colA, > colB, colC, colD, colE, colF, colG, colH, p])': Loading the input/output > formats failed: > at > org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:216) > at > org.apache.flink.runtime.scheduler.SchedulerBase.createExecutionGraph(SchedulerBase.java:269) > at > org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:242) > at > org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:229) > at > org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:119) > at > org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:103) > at > org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:284) > at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:272) > at > org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:98) > at > org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:40) > at > org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init>(JobManagerRunnerImpl.java:140) > at > org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:84) > at > org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$6(Dispatcher.java:388) > ... 7 more > Caused by: java.lang.Exception: Loading the input/output formats failed: > at > org.apache.flink.runtime.jobgraph.InputOutputFormatVertex.initInputOutputformatContainer(InputOutputFormatVertex.java:155) > at > org.apache.flink.runtime.jobgraph.InputOutputFormatVertex.initializeOnMaster(InputOutputFormatVertex.java:59) > at > org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:212) > ... 19 more > Caused by: java.lang.RuntimeException: Deserializing the input/output > formats failed: Could not read the user code wrapper: > org.apache.flink.connector.jdbc.table.JdbcRowDataInputFormat > at > org.apache.flink.runtime.jobgraph.InputOutputFormatContainer.<init>(InputOutputFormatContainer.java:68) > at > org.apache.flink.runtime.jobgraph.InputOutputFormatVertex.initInputOutputformatContainer(InputOutputFormatVertex.java:152) > ... 21 more > Caused by: > org.apache.flink.runtime.operators.util.CorruptConfigurationException: Could > not read the user code wrapper: > org.apache.flink.connector.jdbc.table.JdbcRowDataInputFormat > at > org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:290) > at > org.apache.flink.runtime.jobgraph.InputOutputFormatContainer.<init>(InputOutputFormatContainer.java:66) > ... 22 more > Caused by: java.lang.ClassNotFoundException: > org.apache.flink.connector.jdbc.table.JdbcRowDataInputFormat > at java.net.URLClassLoader.findClass(URLClassLoader.java:381) > at java.lang.ClassLoader.loadClass(ClassLoader.java:424) > at > org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:61) > at > org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:65) > at > org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48) > at java.lang.ClassLoader.loadClass(ClassLoader.java:357) > at java.lang.Class.forName0(Native Method) > at java.lang.Class.forName(Class.java:348) > at > org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78) > at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1826) > at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1713) > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2000) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535) > at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245) > at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169) > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535) > at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422) > at java.util.HashMap.readObject(HashMap.java:1404) > at sun.reflect.GeneratedMethodAccessor8.invoke(Unknown Source) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058) > at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2136) > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535) > at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245) > at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169) > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535) > at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245) > at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169) > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535) > at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422) > at > org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576) > at > org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562) > at > org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550) > at > org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511) > at > org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:288) > ... 23 more > > End of exception on server side>] > at > org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:390) > at > org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:374) > at > java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952) > at > java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926) > ... 4 more > > org.apache.flink.client.program.ProgramAbortException > at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:95) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288) > at > org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198) > at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149) > at > org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699) > at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232) > at > org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916) > at > org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992) > at java.security.AccessController.doPrivileged(Native Method) > at javax.security.auth.Subject.doAs(Subject.java:422) > at > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754) > at > org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) > at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992) > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ |
感谢回复、jar添加到lib下没重启服务、 自己S 13了;
不过又在PyFlink 应用UDF(在SQL应用udf函数)过程中遇到如下问题;把udf函数去掉,pyflink 又可以执行...... 请问有遇到过嘛?谢谢~ ----------- Caused by: java.lang.RuntimeException: Failed to create stage bundle factory! INFO:root:Initializing python harness: /opt/python36/lib/python3.6/site-packages/pyflink/fn_execution/beam/beam_boot.py --id=1-1 --logging_endpoint=localhost:32926 --artifact_endpoint=localhost:27046 --provision_endpoint=localhost:39820 --control_endpoint=localhost:31631 Failed to set up logging handler, continuing without. Traceback (most recent call last): File "/opt/python36/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker_main.py", line 94, in main fn_log_handler = FnApiLogRecordHandler(logging_service_descriptor) File "/opt/python36/lib/python3.6/site-packages/apache_beam/runners/worker/log_handler.py", line 75, in __init__ grpc.channel_ready_future(ch).result(timeout=60) File "/opt/python36/lib/python3.6/site-packages/grpc/_utilities.py", line 140, in result self._block(timeout) File "/opt/python36/lib/python3.6/site-packages/grpc/_utilities.py", line 86, in _block raise grpc.FutureTimeoutError() grpc.FutureTimeoutError WARNING:apache_beam.runners.worker.sdk_worker_main:No session file found: /tmp/staged/pickled_main_session. Functions defined in __main__ (interactive session) may fail. WARNING:apache_beam.options.pipeline_options:Discarding unparseable args: ['--options_id=0.0', '--app_name=AbstractPythonFunctionRunner'] ERROR:apache_beam.runners.worker.sdk_worker_main:Python sdk harness failed: Traceback (most recent call last): File "/opt/python36/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker_main.py", line 158, in main sdk_pipeline_options.view_as(ProfilingOptions))).run() File "/opt/python36/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 162, in __init__ grpc.channel_ready_future(self._control_channel).result(timeout=60) File "/opt/python36/lib/python3.6/site-packages/grpc/_utilities.py", line 140, in result self._block(timeout) File "/opt/python36/lib/python3.6/site-packages/grpc/_utilities.py", line 86, in _block raise grpc.FutureTimeoutError() grpc.FutureTimeoutError Traceback (most recent call last): File "/opt/python36/lib/python3.6/runpy.py", line 193, in _run_module_as_main "__main__", mod_spec) File "/opt/python36/lib/python3.6/runpy.py", line 85, in _run_code exec(code, run_globals) File "/opt/python36/lib/python3.6/site-packages/pyflink/fn_execution/beam/beam_sdk_worker_main.py", line 29, in <module> apache_beam.runners.worker.sdk_worker_main.main(sys.argv) File "/opt/python36/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker_main.py", line 158, in main sdk_pipeline_options.view_as(ProfilingOptions))).run() File "/opt/python36/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 162, in __init__ grpc.channel_ready_future(self._control_channel).result(timeout=60) File "/opt/python36/lib/python3.6/site-packages/grpc/_utilities.py", line 140, in result self._block(timeout) File "/opt/python36/lib/python3.6/site-packages/grpc/_utilities.py", line 86, in _block raise grpc.FutureTimeoutError() grpc.FutureTimeoutError ----------- t_env.sql_query(""" SELECT colA , colB , colC , colD , colE , colF , colG , colH , train_and_predict(colA, colB, colC, colD, colE, colF, colG, colH) P (去掉可以执行、加上报上面的错 0.0) FROM source """).insert_into("print_table") ========= -- Sent from: http://apache-flink.147419.n8.nabble.com/ |
Free forum by Nabble | Edit this page |