The code runs fine in my local IDE and produces normal output.
After packaging it into a fat jar and submitting it to the yarn-session, it fails with:

Caused by: org.apache.kafka.common.config.ConfigException: Invalid value org.apache.flink.kafka010.shaded.org.apache.kafka.common.serialization.ByteArrayDeserializer for configuration key.deserializer: Class org.apache.flink.kafka010.shaded.org.apache.kafka.common.serialization.ByteArrayDeserializer could not be found.

Could someone advise what is causing this?

The lib directory contains:
flink-dist_2.11-1.9.1.jar
flink-sql-connector-kafka-0.10_2.11-1.9.0.jar
flink-sql-connector-kafka_2.11-1.9.0.jar
log4j-1.2.17.jar
flink-json-1.9.0-sql-jar.jar
flink-sql-connector-kafka-0.11_2.11-1.9.0.jar
flink-table_2.11-1.9.1.jar
slf4j-log4j12-1.7.15.jar
flink-shaded-hadoop-2-uber-2.6.5-7.0.jar
flink-sql-connector-kafka-0.9_2.11-1.9.0.jar
flink-table-blink_2.11-1.9.1.jar

The code:
```
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.scala.{StreamTableEnvironment, _}
import org.apache.flink.types.Row

object StreamingTable2 extends App {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  // Blink planner, streaming mode
  val settings: EnvironmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
  val tEnv: StreamTableEnvironment = StreamTableEnvironment.create(env, settings)
  env.setParallelism(2)

  // Kafka 0.11 source table, JSON format, using the Flink 1.9 indexed
  // connector.properties.N.key/value style for the Kafka client properties
  val sourceDDL1 =
    """create table kafka_json_source(
         `timestamp` BIGINT,
         id int,
         name varchar
       ) with (
         'connector.type' = 'kafka',
         'connector.version' = '0.11',
         'connector.topic' = 'hbtest2',
         'connector.startup-mode' = 'earliest-offset',
         'connector.properties.0.key' = 'bootstrap.servers',
         'connector.properties.0.value' = '192.168.1.160:19092',
         'connector.properties.1.key' = 'group.id',
         'connector.properties.1.value' = 'groupId1',
         'connector.properties.2.key' = 'zookeeper.connect',
         'connector.properties.2.value' = '192.168.1.160:2181',
         'update-mode' = 'append',
         'format.type' = 'json',
         'format.derive-schema' = 'true'
       )
    """

  tEnv.sqlUpdate(sourceDDL1)
  tEnv.sqlQuery("select * from kafka_json_source").toAppendStream[Row].print()
  env.execute("table-example2")
}
```
Hi:

This looks like a jar conflict. Your lib directory has four Flink Kafka connector jars; try removing the ones you don't need and see if that helps.

Chen Hao

------------------ Original message ------------------
From: "hb" <[hidden email]>
Sent: Tuesday, 2019-10-29 14:41
To: "user-zh" <[hidden email]>
Subject: flink 1.9.1 Kafka table read problem
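As a concrete sketch of that cleanup (illustrative only; which connector jar to keep depends on the version the job actually uses, here 0.11 per the DDL):

```
# Illustrative: keep a single Kafka SQL connector in lib/ (0.11, matching
# 'connector.version' = '0.11' in the DDL), move the others aside, then
# restart the yarn-session so the new classpath takes effect.
cd "$FLINK_HOME/lib"
mkdir -p ../lib-disabled
mv flink-sql-connector-kafka-0.9_2.11-1.9.0.jar \
   flink-sql-connector-kafka-0.10_2.11-1.9.0.jar \
   flink-sql-connector-kafka_2.11-1.9.0.jar \
   ../lib-disabled/
```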
I specified the 0.11 Kafka version. Earlier the lib directory contained only the 0.11 jar, and it still reported this error.

On 2019-10-29 13:47:34, "如影随形" <[hidden email]> wrote:
> Hi:
> This looks like a jar conflict. Your lib directory has four Flink Kafka connector jars; try removing the ones you don't need and see if that helps.
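Since the error names a class shaded under the 0.10 connector's namespace even though the DDL asks for 0.11, one way to narrow it down is to check which archives on the classpath actually carry classes under that shaded prefix, including the fat jar itself. A rough sketch (my-job-fat.jar is a placeholder for the real fat-jar name):

```
# Sketch: list every jar that bundles classes under the 0.10 connector's
# shaded namespace, including the fat jar ("my-job-fat.jar" is a placeholder).
for j in my-job-fat.jar "$FLINK_HOME"/lib/*.jar; do
  if jar tf "$j" 2>/dev/null | grep -q '^org/apache/flink/kafka010/shaded/'; then
    echo "0.10-shaded Kafka classes found in: $j"
  fi
done
```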
Hi:

Could you paste your Maven pom file so we can take a look?

Chen Hao

------------------ Original message ------------------
From: "hb" <[hidden email]>
Sent: Tuesday, 2019-10-29 14:53
To: "user-zh" <[hidden email]>
Subject: Re: flink 1.9.1 Kafka table read problem

> I specified the 0.11 Kafka version. Earlier the lib directory contained only the 0.11 jar, and it still reported this error.
The pom file:
```
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.hb</groupId>
    <artifactId>flink</artifactId>
    <packaging>pom</packaging>
    <version>1.9.1-SNAPSHOT</version>

    <properties>
        <java.version>1.8</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.9.1</flink.version>
        <scala.binary.version>2.11</scala.binary.version>
        <scala.version>2.11.12</scala.version>
        <elasticsearch.hadoop>6.2.3</elasticsearch.hadoop>
        <jcommander.version>1.72</jcommander.version>
        <gson.version>2.6.2</gson.version>
        <kafka.version>0.11.0.2</kafka.version>
        <fastjson.version>1.2.46</fastjson.version>
        <flink-connector-kafka>1.9.1</flink-connector-kafka>
        <log4j.version>1.2.17</log4j.version>
        <mysql-connector-java.version>5.1.42</mysql-connector-java.version>
        <net.dongliu.requests.version>4.18.1</net.dongliu.requests.version>
        <maven-compiler-plugin.version>3.6.0</maven-compiler-plugin.version>
        <flink.scope.type>compile</flink.scope.type>
        <!--<flink.scope.type>provided</flink.scope.type>-->
        <scope.type>compile</scope.type>
    </properties>

    <dependencies>
        <!-- flink -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>${flink.version}</version>
            <scope>${flink.scope.type}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>${flink.version}</version>
            <scope>${flink.scope.type}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.11</artifactId>
            <version>${flink.version}</version>
            <scope>${flink.scope.type}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>${flink.version}</version>
            <scope>${flink.scope.type}</scope>
        </dependency>

        <!-- table -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
            <version>${flink.version}</version>
            <!-- <scope>provided</scope> -->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_2.11</artifactId>
            <version>${flink.version}</version>
            <!-- <scope>provided</scope> -->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_2.11</artifactId>
            <version>${flink.version}</version>
            <!-- <scope>provided</scope> -->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-runtime-blink_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.11</artifactId>
            <version>${flink.version}</version>
            <!-- <scope>provided</scope> -->
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
            <version>${flink-connector-kafka}</version>
        </dependency>
        <!-- dependency needed for the Kafka DDL -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- flink-end -->

        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch-hadoop</artifactId>
            <version>${elasticsearch.hadoop}</version>
            <scope>${scope.type}</scope>
        </dependency>

        <!-- scala -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
            <scope>${scope.type}</scope>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-reflect</artifactId>
            <version>${scala.version}</version>
            <scope>${scope.type}</scope>
        </dependency>

        <!-- kafka -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>${kafka.version}</version>
            <!--<exclusions>-->
            <!--    <exclusion>-->
            <!--        <artifactId>lz4</artifactId>-->
            <!--        <groupId>net.jpountz.lz4</groupId>-->
            <!--    </exclusion>-->
            <!--</exclusions>-->
        </dependency>

        <!-- commander -->
        <dependency>
            <groupId>com.beust</groupId>
            <artifactId>jcommander</artifactId>
            <version>${jcommander.version}</version>
        </dependency>
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>${gson.version}</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>${fastjson.version}</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>${log4j.version}</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>${mysql-connector-java.version}</version>
        </dependency>
        <dependency>
            <groupId>net.dongliu</groupId>
            <artifactId>requests</artifactId>
            <version>${net.dongliu.requests.version}</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>${maven-compiler-plugin.version}</version>
                <configuration>
                    <source>${java.version}</source>
                    <target>${java.version}</target>
                    <encoding>${project.build.sourceEncoding}</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.3.2</version>
                <executions>
                    <execution>
                        <id>scala-compile-first</id>
                        <phase>process-resources</phase>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>scala-test-compile-first</id>
                        <phase>process-test-resources</phase>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>attach-scaladocs</id>
                        <phase>verify</phase>
                        <goals>
                            <goal>doc-jar</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                        <exclude>defaults.yaml</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
```

On 2019-10-29 14:05:51, "如影随形" <[hidden email]> wrote:
> Hi:
> Could you paste your Maven pom file so we can take a look?
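One thing worth checking against this pom (a guess from the symptoms, not a verified fix): the Kafka connector and table dependencies are built into the fat jar at compile scope while the cluster's lib/ also ships sql-connector-kafka jars, so two copies of the connector classes end up on the classpath. The commented-out provided value for flink.scope.type points in the usual direction: let the cluster provide the connector and keep it out of the fat jar. A minimal sketch:

```
<!-- Sketch, assuming the cluster's lib/ keeps flink-sql-connector-kafka-0.11:
     provided scope keeps the connector (and its shaded Kafka classes) out of
     the fat jar, so only one copy exists at runtime. -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
    <version>${flink-connector-kafka}</version>
    <scope>provided</scope>
</dependency>
```

Two related things also stand out: the DDL pins 'connector.version' = '0.11', yet both flink-connector-kafka-0.11_2.11 and the universal flink-connector-kafka_2.11 are bundled; and the shade configuration has no ServicesResourceTransformer, so the connectors' META-INF/services entries can overwrite each other when the fat jar is merged. Either could leave the wrong Kafka table factory on the classpath.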