Standalone cluster.

Jars in lib/:
flink-connector-hive_2.11-1.11.0.jar
flink-json-1.11.0.jar
flink-sql-connector-kafka_2.12-1.11.0.jar
log4j-api-2.12.1.jar
flink-csv-1.11.0.jar
flink-parquet_2.11-1.11.0.jar
flink-table_2.11-1.11.0.jar
log4j-core-2.12.1.jar
flink-dist_2.11-1.11.0.jar
flink-shaded-hadoop-2-uber-2.7.2.11-9.0.jar
flink-table-blink_2.11-1.11.0.jar
log4j-slf4j-impl-2.12.1.jar
flink-hadoop-compatibility_2.11-1.11.0.jar
flink-shaded-zookeeper-3.4.14.jar
log4j-1.2-api-2.12.1.jar

The code below runs without errors inside IDEA:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
env.setParallelism(1);
env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE);
// allow only one checkpoint at a time
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.setStateBackend(new FsStateBackend(path));

tableEnv.executeSql("CREATE TABLE source_table (\n" +
        "\thost STRING,\n" +
        "\turl STRING,\n" +
        "\tpublic_date STRING\n" +
        ") WITH (\n" +
        "\t'connector.type' = 'kafka',\n" +
        "\t'connector.version' = 'universal',\n" +
        "\t'connector.startup-mode' = 'latest-offset',\n" +
        "\t'connector.topic' = 'test_flink_1.11',\n" +
        "\t'connector.properties.group.id' = 'domain_testGroup',\n" +
        "\t'connector.properties.zookeeper.connect' = '127.0.0.1:2181',\n" +
        "\t'connector.properties.bootstrap.servers' = '127.0.0.1:9092',\n" +
        "\t'update-mode' = 'append',\n" +
        "\t'format.type' = 'json',\n" +
        "\t'format.derive-schema' = 'true'\n" +
        ")");

tableEnv.executeSql("CREATE TABLE fs_table (\n" +
        "  host STRING,\n" +
        "  url STRING,\n" +
        "  public_date STRING\n" +
        ") PARTITIONED BY (public_date) WITH (\n" +
        "  'connector'='filesystem',\n" +
        "  'path'='path',\n" +
        "  'format'='json',\n" +
        "  'sink.partition-commit.delay'='0s',\n" +
        "  'sink.partition-commit.policy.kind'='success-file'\n" +
        ")");

tableEnv.executeSql("INSERT INTO fs_table SELECT host, url, DATE_FORMAT(public_date, 'yyyy-MM-dd') FROM source_table");
TableResult result = tableEnv.executeSql("SELECT * FROM fs_table");
result.print();

On the standalone cluster it fails with:

org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
    at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:291)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:126)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:453)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassCastException: cannot assign instance of org.apache.commons.collections.map.LinkedMap to field

Second bug: when sinking to HDFS with the parquet format, the parquet jar is in lib/ and the pom marks it as provided, yet the job still fails with an error. I also tried dropping the provided scope in the pom; that did not help either.
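(For the parquet case, the sink DDL is along these lines; this is only a trimmed sketch, and the HDFS path below is a placeholder, not the real one:

CREATE TABLE fs_parquet_table (
  host STRING,
  url STRING,
  public_date STRING
) PARTITIONED BY (public_date) WITH (
  -- placeholder path, not the one used in the failing job
  'connector' = 'filesystem',
  'path' = 'hdfs:///tmp/fs_parquet_table',
  'format' = 'parquet',
  'sink.partition-commit.delay' = '0s',
  'sink.partition-commit.policy.kind' = 'success-file'
)
)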
The first bug turned out to only need this setting:

classloader.resolve-order: parent-first

The second bug, with the parquet sink, is still unresolved.
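For anyone hitting the same thing, this goes into conf/flink-conf.yaml on the standalone cluster (sketch below; the default resolve order is child-first, and the cluster needs a restart to pick up the change):

# conf/flink-conf.yaml -- workaround for the LinkedMap ClassCastException:
# parent-first resolution lets the jars in lib/ win over classes bundled in the job jar
classloader.resolve-order: parent-first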
What is the exception stack trace for the second problem?
Here is the error when using parquet:
java.lang.NoClassDefFoundError: org/apache/parquet/hadoop/ParquetWriter$Builder
    at java.lang.ClassLoader.defineClass1(Native Method)
    at java.lang.ClassLoader.defineClass(ClassLoader.java:760)
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
    at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at org.apache.flink.formats.parquet.ParquetFileSystemFormatFactory.createBulkWriterFactory(ParquetFileSystemFormatFactory.java:110)
    at org.apache.flink.table.filesystem.FileSystemTableSink.createWriter(FileSystemTableSink.java:274)
    at org.apache.flink.table.filesystem.FileSystemTableSink.consumeDataStream(FileSystemTableSink.java:154)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:114)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:48)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlan(StreamExecLegacySink.scala:48)
    at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:67)
    at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:66)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:66)
    at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:166)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1248)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:694)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:781)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:684)
    at com.HdfsDDL.main(HdfsDDL.java:71)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
    at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
    at org.apache.flink.client.cli.CliFrontend$$Lambda$67/388104475.call(Unknown Source)
    at org.apache.flink.runtime.security.contexts.HadoopSecurityContext$$Lambda$68/1470966439.run(Unknown Source)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1659)
    at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
Caused by: java.lang.ClassNotFoundException: org.apache.parquet.hadoop.ParquetWriter$Builder
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
"\t'connector.type' = 'kafka',\n" + > "\t'connector.version' = 'universal',\n" + > "\t'connector.startup-mode' = 'latest-offset',\n" + > "\t'connector.topic' = 'test_flink_1.11',\n" + > "\t'connector.properties.group.id' = 'domain_testGroup',\n" + > "\t'connector.properties.zookeeper.connect' = '127.0.0.1:2181',\n" > + > "\t'connector.properties.bootstrap.servers' = '127.0.0.1:9092',\n" > + > "\t'update-mode' = 'append',\n" + > "\t'format.type' = 'json',\n" + > "\t'format.derive-schema' = 'true'\n" + > ")"); > > tableEnv.executeSql("CREATE TABLE fs_table (\n" + > " host STRING,\n" + > " url STRING,\n" + > " public_date STRING\n" + > ") PARTITIONED BY (public_date) WITH (\n" + > " 'connector'='filesystem',\n" + > " 'path'='path',\n" + > " 'format'='json',\n" + > " 'sink.partition-commit.delay'='0s',\n" + > " 'sink.partition-commit.policy.kind'='success-file'\n" + > ")"); > > tableEnv.executeSql("INSERT INTO fs_table SELECT host, url, > DATE_FORMAT(public_date, 'yyyy-MM-dd') FROM source_table"); > TableResult result = tableEnv.executeSql("SELECT * FROM fs_table "); > result.print(); > 报错如下 > org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot > instantiate user function. > &nbsp;&nbsp;&nbsp; at > org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:291) > &nbsp;&nbsp;&nbsp; at > org.apache.flink.streaming.runtime.tasks.OperatorChain.<init&gt;(OperatorChain.java:126) > &nbsp;&nbsp;&nbsp; at > org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:453) > &nbsp;&nbsp;&nbsp; at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522) > &nbsp;&nbsp;&nbsp; at > org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) > &nbsp;&nbsp;&nbsp; at > org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) > &nbsp;&nbsp;&nbsp; at java.lang.Thread.run(Thread.java:745) > Caused by: java.lang.ClassCastException: cannot assign instance of > org.apache.commons.collections.map.LinkedMap to field > > > 第二个bug > sink到hdfs时候,采用parquet时候,lib下面有parquet包,pom里面是provided,但是会提示这个error,也试过pom里面不是provided,还是不OK |