flink-1.11 DDL issue writing to HDFS: Cannot instantiate user function

kcz wrote:
Standalone deployment; the jars in lib are as follows:
flink-connector-hive_2.11-1.11.0.jar
flink-csv-1.11.0.jar
flink-dist_2.11-1.11.0.jar
flink-hadoop-compatibility_2.11-1.11.0.jar
flink-json-1.11.0.jar
flink-parquet_2.11-1.11.0.jar
flink-shaded-hadoop-2-uber-2.7.2.11-9.0.jar
flink-shaded-zookeeper-3.4.14.jar
flink-sql-connector-kafka_2.12-1.11.0.jar
flink-table_2.11-1.11.0.jar
flink-table-blink_2.11-1.11.0.jar
log4j-1.2-api-2.12.1.jar
log4j-api-2.12.1.jar
log4j-core-2.12.1.jar
log4j-slf4j-impl-2.12.1.jar

The code is as follows (it runs without errors in IDEA):
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
env.setParallelism(1);
env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE);
// allow only one checkpoint at a time
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.setStateBackend(new FsStateBackend(path));

tableEnv.executeSql("CREATE TABLE source_table (\n" +
        "\thost STRING,\n" +
        "\turl STRING,\n" +
        "\tpublic_date STRING\n" +
        ") WITH (\n" +
        "\t'connector.type' = 'kafka',\n" +
        "\t'connector.version' = 'universal',\n" +
        "\t'connector.startup-mode' = 'latest-offset',\n" +
        "\t'connector.topic' = 'test_flink_1.11',\n" +
        "\t'connector.properties.group.id' = 'domain_testGroup',\n" +
        "\t'connector.properties.zookeeper.connect' = '127.0.0.1:2181',\n" +
        "\t'connector.properties.bootstrap.servers' = '127.0.0.1:9092',\n" +
        "\t'update-mode' = 'append',\n" +
        "\t'format.type' = 'json',\n" +
        "\t'format.derive-schema' = 'true'\n" +
        ")");

tableEnv.executeSql("CREATE TABLE fs_table (\n" +
        "  host STRING,\n" +
        "  url STRING,\n" +
        "  public_date STRING\n" +
        ") PARTITIONED BY (public_date) WITH (\n" +
        "  'connector'='filesystem',\n" +
        "  'path'='path',\n" +
        "  'format'='json',\n" +
        "  'sink.partition-commit.delay'='0s',\n" +
        "  'sink.partition-commit.policy.kind'='success-file'\n" +
        ")");

tableEnv.executeSql("INSERT INTO  fs_table SELECT host, url, DATE_FORMAT(public_date, 'yyyy-MM-dd') FROM source_table");
TableResult result = tableEnv.executeSql("SELECT * FROM fs_table ");
result.print();
The error:
org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
    at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:291)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:126)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:453)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassCastException: cannot assign instance of org.apache.commons.collections.map.LinkedMap to field


Second bug: when sinking to HDFS with parquet, the parquet jar is under lib and is marked provided in the pom, but it still fails with an error; I also tried dropping provided from the pom, and it still doesn't work.

kcz replied:
The first bug only needed this setting:
classloader.resolve-order: parent-first
The second bug, with parquet, is still unresolved.
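
For reference, a minimal flink-conf.yaml sketch of that fix (classloader.resolve-order is a standard Flink option whose default is child-first):

    # flink-conf.yaml
    # Resolve classes through Flink's lib/ classpath before the user jar.
    # This typically avoids the ClassCastException above, which arises when
    # org.apache.commons.collections.map.LinkedMap is loaded once by the
    # application classloader and once by the user-code classloader.
    classloader.resolve-order: parent-first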


godfrey he replied:
What is the exception stack trace for the second problem?

kcz replied:
This is the error when parquet is used:
java.lang.NoClassDefFoundError: org/apache/parquet/hadoop/ParquetWriter$Builder
        at java.lang.ClassLoader.defineClass1(Native Method)
        at java.lang.ClassLoader.defineClass(ClassLoader.java:760)
        at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
        at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
        at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        at org.apache.flink.formats.parquet.ParquetFileSystemFormatFactory.createBulkWriterFactory(ParquetFileSystemFormatFactory.java:110)
        at org.apache.flink.table.filesystem.FileSystemTableSink.createWriter(FileSystemTableSink.java:274)
        at org.apache.flink.table.filesystem.FileSystemTableSink.consumeDataStream(FileSystemTableSink.java:154)
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:114)
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:48)
        at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlan(StreamExecLegacySink.scala:48)
        at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:67)
        at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:66)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:66)
        at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:166)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1248)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:694)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:781)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:684)
        at com.HdfsDDL.main(HdfsDDL.java:71)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
        at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
        at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
        at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
        at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
        at org.apache.flink.client.cli.CliFrontend$$Lambda$67/388104475.call(Unknown Source)
        at org.apache.flink.runtime.security.contexts.HadoopSecurityContext$$Lambda$68/1470966439.run(Unknown Source)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1659)
        at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
Caused by: java.lang.ClassNotFoundException: org.apache.parquet.hadoop.ParquetWriter$Builder
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
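
A likely cause: flink-parquet_2.11-1.11.0.jar sits in lib/ and is loaded by the application classloader, so the Parquet classes it links against must be visible on that same classpath; classes bundled only in the user jar are not visible to code defined in lib/. A sketch of one fix, assuming Parquet 1.10.0 (the version Flink 1.11 builds against; verify the exact versions for your build), is to place the Parquet jars in lib/ as well and restart the cluster:

    parquet-hadoop-1.10.0.jar
    parquet-column-1.10.0.jar
    parquet-common-1.10.0.jar
    parquet-encoding-1.10.0.jar
    parquet-format-2.4.0.jar
    parquet-jackson-1.10.0.jar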






