Flink 1.11.1 throws an error when using the Table API:
Exception in thread "main" org.apache.flink.table.api.TableException: Create BatchTableEnvironment failed.
    at org.apache.flink.table.api.bridge.java.BatchTableEnvironment.create(BatchTableEnvironment.java:517)
    at org.apache.flink.table.api.bridge.java.BatchTableEnvironment.create(BatchTableEnvironment.java:471)
    at yueworld.worldCount.BatchWordCount_tablesql.main(BatchWordCount_tablesql.java:24)
Caused by: java.lang.ClassNotFoundException: org.apache.flink.table.api.bridge.java.internal.BatchTableEnvironmentImpl
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:264)
    at org.apache.flink.table.api.bridge.java.BatchTableEnvironment.create(BatchTableEnvironment.java:509)
    ... 2 more

As far as I can tell all the relevant dependencies are present. Here is the pom file:

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <flink.version>1.11.1</flink.version>
    <mysql.version>5.1.40</mysql.version>
    <scala.binary.version>2.11</scala.binary.version>
    <scala.version>2.11.12</scala.version>
    <java.version>1.8</java.version>
    <maven.compiler.source>${java.version}</maven.compiler.source>
    <maven.compiler.target>${java.version}</maven.compiler.target>
</properties>

<dependencies>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <!-- Develop in Java -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <!-- Use the Blink planner -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <!-- Support for custom message formats (e.g. JSON messages in Kafka) and user-defined functions -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <!-- JDBC connector support; this example uses MySQL -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <!-- Kafka connector support -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-sql-connector-kafka-0.11_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <!-- Messages in Kafka use the JSON format -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <!-- MySQL driver -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>${mysql.version}</version>
    </dependency>

    <!-- Dependencies required to submit a job, e.g. LocalExecutorFactory -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
        <version>1.11.1</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc_2.11</artifactId>
        <version>1.11.1</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.11</artifactId>
        <version>1.11.0</version>
    </dependency>

    <!-- Logging, for easier debugging -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.7</version>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
        <scope>runtime</scope>
    </dependency>
</dependencies>
BatchTableEnvironmentImpl belongs to the old planner, and the flink-table-planner_${scala.binary.version} dependency is missing. The pom only declares flink-table-planner-blink, so that class is not on the classpath.

(In reply to the message from 郭华威 <[hidden email]>, Mon, Aug 10, 2020, 10:21 AM.)
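The stack trace shows that BatchTableEnvironment.create loads the implementation class reflectively through Class.forName, which is why a missing planner jar only surfaces at runtime. A minimal diagnostic sketch, assuming you simply want to confirm whether the class is visible to the application class loader before and after changing the pom; the ClasspathCheck class and the isOnClasspath helper are hypothetical names introduced here and are not part of Flink:

```java
// Diagnostic sketch only: ClasspathCheck and isOnClasspath are illustrative
// names, not part of Flink or of the original program.
final class ClasspathCheck {

    /** Returns true if the named class can be located by this class's class loader. */
    static boolean isOnClasspath(String className) {
        try {
            // initialize=false: only locate the class, do not run its static initializers
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // The class named in the ClassNotFoundException above
        String impl = "org.apache.flink.table.api.bridge.java.internal.BatchTableEnvironmentImpl";
        System.out.println(impl + " on classpath: " + isOnClasspath(impl));
    }
}
```

Run it with the same classpath as the job. If it reports false, the old planner jar is not being picked up.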
In reply to this post by 郭华威
Hi,
Could you share the actual code?
package yueworld.worldCount;

import java.util.ArrayList;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;

/**
 * Word count computed with Flink SQL.
 */
public class BatchWordCount_tablesql {

    public static void main(String[] args) throws Exception {
        // Create the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // Create a table environment
        BatchTableEnvironment btEnv = BatchTableEnvironment.create(env);

        // Build the input data
        String words = "hello flink hello spark hello hbase";
        String[] split = words.split(" ");
        ArrayList<WordCount> list = new ArrayList<>();
        for (String word : split) {
            list.add(new WordCount(word, 1));
        }
        DataSet<WordCount> input = env.fromCollection(list);

        // Convert the DataSet to a Table, specifying the field names
        Table table = btEnv.fromDataSet(input, "word,frequency");
        table.printSchema();

        // Register the Table as a view
        btEnv.createTemporaryView("WordCountTable", table);

        // Run the SQL query
        Table table1 = btEnv.sqlQuery("select word as word,sum(frequency) as frequency from WordCountTable group by word");

        // Convert the query result back to a DataSet
        DataSet<WordCount> wordCountDataSet = btEnv.toDataSet(table1, WordCount.class);
        wordCountDataSet.printToErr();
    }

    public static class WordCount {
        public String word;
        public int frequency;

        public WordCount() {
        }

        public WordCount(String word, int frequency) {
            this.word = word;
            this.frequency = frequency;
        }

        public String getWord() {
            return word;
        }

        public void setWord(String word) {
            this.word = word;
        }

        public int getFrequency() {
            return frequency;
        }

        public void setFrequency(int frequency) {
            this.frequency = frequency;
        }

        @Override
        public String toString() {
            return "WordCount{" + "word='" + word + '\'' + ", frequency=" + frequency + '}';
        }
    }
}

On 2020-08-10 10:48:27, "Shengkai Fang" <[hidden email]> wrote:
> Hi,
> Could you share the actual code?
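As a reference for what the job should print once the dependency problem is resolved, the query `select word, sum(frequency) ... group by word` can be mirrored in plain Java. This is only a sketch of the query's semantics using the JDK stream API, not Flink code; WordCountSketch and countWords are hypothetical names, and the sorted TreeMap is chosen here for a stable print order (Flink itself does not guarantee any output order):

```java
import java.util.Arrays;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Plain-JDK sketch of the GROUP BY word / SUM(frequency) query.
// WordCountSketch and countWords are illustrative names, not Flink API.
final class WordCountSketch {

    /** Splits the text on single spaces and sums a frequency of 1 per occurrence of each word. */
    static Map<String, Integer> countWords(String text) {
        return Arrays.stream(text.split(" "))
                .collect(Collectors.groupingBy(
                        word -> word,                         // GROUP BY word
                        TreeMap::new,                         // sorted keys, for stable output
                        Collectors.summingInt(word -> 1)));   // SUM(frequency), each input row has frequency 1
    }

    public static void main(String[] args) {
        // Same input string as the Flink program
        System.out.println(countWords("hello flink hello spark hello hbase"));
        // prints {flink=1, hbase=1, hello=3, spark=1}
    }
}
```

The Flink job should produce the same four word/frequency pairs, though as WordCount objects and possibly in a different order.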