来自郭华威的邮件

classic Classic list List threaded Threaded
4 messages Options
Reply | Threaded
Open this post in threaded view
|

来自郭华威的邮件

郭华威
flink1.11.1 使用tableApi  报错:
Exception in thread "main" org.apache.flink.table.api.TableException: Create BatchTableEnvironment failed.
at org.apache.flink.table.api.bridge.java.BatchTableEnvironment.create(BatchTableEnvironment.java:517)
at org.apache.flink.table.api.bridge.java.BatchTableEnvironment.create(BatchTableEnvironment.java:471)
at yueworld.worldCount.BatchWordCount_tablesql.main(BatchWordCount_tablesql.java:24)
Caused by: java.lang.ClassNotFoundException: org.apache.flink.table.api.bridge.java.internal.BatchTableEnvironmentImpl
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:264)
at org.apache.flink.table.api.bridge.java.BatchTableEnvironment.create(BatchTableEnvironment.java:509)
... 2 more
但是相关的依赖都有的,下面是pom文件:
<properties>
   <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
   <flink.version>1.11.1</flink.version>
   <mysql.version>5.1.40</mysql.version>
   <scala.binary.version>2.11</scala.binary.version>
   <scala.version>2.11.12</scala.version>
   <java.version>1.8</java.version>
   <maven.compiler.source>${java.version}</maven.compiler.source>
   <maven.compiler.target>${java.version}</maven.compiler.target>
</properties>

<dependencies>

   <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-java</artifactId>
      <version>${flink.version}</version>
   </dependency>

<!-- 利用Java开发 -->
<dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
   </dependency>

<!-- 使用Blink Planner -->
<dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
   </dependency>

<!-- 支持一些自定义的消息格式,比如kafka里面消息格式是json的,或者需要自定义函数支持 -->
<dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-common</artifactId>
      <version>${flink.version}</version>
   </dependency>

<!-- JDBC Connector的支持,本案例会是使用MySQL -->
<dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
   </dependency>

<!-- Kafka Connector的支持-->
<dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-sql-connector-kafka-0.11_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
   </dependency>

<!-- Kafka里面的消息采用Json格式 -->
<dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-json</artifactId>
      <version>${flink.version}</version>
   </dependency>

<!-- MySQL的驱动 -->
<dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>${mysql.version}</version>
   </dependency>

<!--提交作业所必须的依赖,比如:LocalExecutorFactory -->
<dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients_2.11</artifactId>
      <version>1.11.1</version>
   </dependency>

   <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-jdbc_2.11</artifactId>
      <version>1.11.1</version>
   </dependency>

   <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-scala_2.11</artifactId>
      <version>1.11.0</version>
   </dependency>

<!-- 日志方便调试 -->
<dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      <version>1.7.7</version>
      <scope>runtime</scope>
   </dependency>
   <dependency>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
      <version>1.2.17</version>
      <scope>runtime</scope>
   </dependency>
</dependencies>





 
Reply | Threaded
Open this post in threaded view
|

Re: 来自郭华威的邮件

godfrey he
BatchTableEnvironmentImpl 属于 old planner,
缺少 flink-table-planner_${scala.binary.version}.jar 的依赖

郭华威 <[hidden email]> 于2020年8月10日周一 上午10:21写道:

> flink1.11.1 使用tableApi  报错:
> Exception in thread "main" org.apache.flink.table.api.TableException:
> Create BatchTableEnvironment failed.
> at
> org.apache.flink.table.api.bridge.java.BatchTableEnvironment.create(BatchTableEnvironment.java:517)
> at
> org.apache.flink.table.api.bridge.java.BatchTableEnvironment.create(BatchTableEnvironment.java:471)
> at
> yueworld.worldCount.BatchWordCount_tablesql.main(BatchWordCount_tablesql.java:24)
> Caused by: java.lang.ClassNotFoundException:
> org.apache.flink.table.api.bridge.java.internal.BatchTableEnvironmentImpl
> at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:264)
> at
> org.apache.flink.table.api.bridge.java.BatchTableEnvironment.create(BatchTableEnvironment.java:509)
> ... 2 more
> 但是相关的依赖都有的,下面是pom文件:
> <properties>
>    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
>    <flink.version>1.11.1</flink.version>
>    <mysql.version>5.1.40</mysql.version>
>    <scala.binary.version>2.11</scala.binary.version>
>    <scala.version>2.11.12</scala.version>
>    <java.version>1.8</java.version>
>    <maven.compiler.source>${java.version}</maven.compiler.source>
>    <maven.compiler.target>${java.version}</maven.compiler.target>
> </properties>
>
> <dependencies>
>
>    <dependency>
>       <groupId>org.apache.flink</groupId>
>       <artifactId>flink-java</artifactId>
>       <version>${flink.version}</version>
>    </dependency>
>
> <!-- 利用Java开发 -->
> <dependency>
>       <groupId>org.apache.flink</groupId>
>
> <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
>       <version>${flink.version}</version>
>    </dependency>
>
> <!-- 使用Blink Planner -->
> <dependency>
>       <groupId>org.apache.flink</groupId>
>
> <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
>       <version>${flink.version}</version>
>    </dependency>
>
> <!-- 支持一些自定义的消息格式,比如kafka里面消息格式是json的,或者需要自定义函数支持 -->
> <dependency>
>       <groupId>org.apache.flink</groupId>
>       <artifactId>flink-table-common</artifactId>
>       <version>${flink.version}</version>
>    </dependency>
>
> <!-- JDBC Connector的支持,本案例会是使用MySQL -->
> <dependency>
>       <groupId>org.apache.flink</groupId>
>       <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
>       <version>${flink.version}</version>
>    </dependency>
>
> <!-- Kafka Connector的支持-->
> <dependency>
>       <groupId>org.apache.flink</groupId>
>
> <artifactId>flink-sql-connector-kafka-0.11_${scala.binary.version}</artifactId>
>       <version>${flink.version}</version>
>    </dependency>
>
> <!-- Kafka里面的消息采用Json格式 -->
> <dependency>
>       <groupId>org.apache.flink</groupId>
>       <artifactId>flink-json</artifactId>
>       <version>${flink.version}</version>
>    </dependency>
>
> <!-- MySQL的驱动 -->
> <dependency>
>       <groupId>mysql</groupId>
>       <artifactId>mysql-connector-java</artifactId>
>       <version>${mysql.version}</version>
>    </dependency>
>
> <!--提交作业所必须的依赖,比如:LocalExecutorFactory -->
> <dependency>
>       <groupId>org.apache.flink</groupId>
>       <artifactId>flink-clients_2.11</artifactId>
>       <version>1.11.1</version>
>    </dependency>
>
>    <dependency>
>       <groupId>org.apache.flink</groupId>
>       <artifactId>flink-connector-jdbc_2.11</artifactId>
>       <version>1.11.1</version>
>    </dependency>
>
>    <dependency>
>       <groupId>org.apache.flink</groupId>
>       <artifactId>flink-streaming-scala_2.11</artifactId>
>       <version>1.11.0</version>
>    </dependency>
>
> <!-- 日志方便调试 -->
> <dependency>
>       <groupId>org.slf4j</groupId>
>       <artifactId>slf4j-log4j12</artifactId>
>       <version>1.7.7</version>
>       <scope>runtime</scope>
>    </dependency>
>    <dependency>
>       <groupId>log4j</groupId>
>       <artifactId>log4j</artifactId>
>       <version>1.2.17</version>
>       <scope>runtime</scope>
>    </dependency>
> </dependencies>
>
>
>
>
>
>
Reply | Threaded
Open this post in threaded view
|

Re: 来自郭华威的邮件

Shengkai Fang
In reply to this post by 郭华威
hi.
能提供具体的代码?



郭华威 <[hidden email]> 于2020年8月10日周一 上午10:21写道:

> flink1.11.1 使用tableApi  报错:
> Exception in thread "main" org.apache.flink.table.api.TableException:
> Create BatchTableEnvironment failed.
> at
> org.apache.flink.table.api.bridge.java.BatchTableEnvironment.create(BatchTableEnvironment.java:517)
> at
> org.apache.flink.table.api.bridge.java.BatchTableEnvironment.create(BatchTableEnvironment.java:471)
> at
> yueworld.worldCount.BatchWordCount_tablesql.main(BatchWordCount_tablesql.java:24)
> Caused by: java.lang.ClassNotFoundException:
> org.apache.flink.table.api.bridge.java.internal.BatchTableEnvironmentImpl
> at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:264)
> at
> org.apache.flink.table.api.bridge.java.BatchTableEnvironment.create(BatchTableEnvironment.java:509)
> ... 2 more
> 但是相关的依赖都有的,下面是pom文件:
> <properties>
>    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
>    <flink.version>1.11.1</flink.version>
>    <mysql.version>5.1.40</mysql.version>
>    <scala.binary.version>2.11</scala.binary.version>
>    <scala.version>2.11.12</scala.version>
>    <java.version>1.8</java.version>
>    <maven.compiler.source>${java.version}</maven.compiler.source>
>    <maven.compiler.target>${java.version}</maven.compiler.target>
> </properties>
>
> <dependencies>
>
>    <dependency>
>       <groupId>org.apache.flink</groupId>
>       <artifactId>flink-java</artifactId>
>       <version>${flink.version}</version>
>    </dependency>
>
> <!-- 利用Java开发 -->
> <dependency>
>       <groupId>org.apache.flink</groupId>
>
> <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
>       <version>${flink.version}</version>
>    </dependency>
>
> <!-- 使用Blink Planner -->
> <dependency>
>       <groupId>org.apache.flink</groupId>
>
> <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
>       <version>${flink.version}</version>
>    </dependency>
>
> <!-- 支持一些自定义的消息格式,比如kafka里面消息格式是json的,或者需要自定义函数支持 -->
> <dependency>
>       <groupId>org.apache.flink</groupId>
>       <artifactId>flink-table-common</artifactId>
>       <version>${flink.version}</version>
>    </dependency>
>
> <!-- JDBC Connector的支持,本案例会是使用MySQL -->
> <dependency>
>       <groupId>org.apache.flink</groupId>
>       <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
>       <version>${flink.version}</version>
>    </dependency>
>
> <!-- Kafka Connector的支持-->
> <dependency>
>       <groupId>org.apache.flink</groupId>
>
> <artifactId>flink-sql-connector-kafka-0.11_${scala.binary.version}</artifactId>
>       <version>${flink.version}</version>
>    </dependency>
>
> <!-- Kafka里面的消息采用Json格式 -->
> <dependency>
>       <groupId>org.apache.flink</groupId>
>       <artifactId>flink-json</artifactId>
>       <version>${flink.version}</version>
>    </dependency>
>
> <!-- MySQL的驱动 -->
> <dependency>
>       <groupId>mysql</groupId>
>       <artifactId>mysql-connector-java</artifactId>
>       <version>${mysql.version}</version>
>    </dependency>
>
> <!--提交作业所必须的依赖,比如:LocalExecutorFactory -->
> <dependency>
>       <groupId>org.apache.flink</groupId>
>       <artifactId>flink-clients_2.11</artifactId>
>       <version>1.11.1</version>
>    </dependency>
>
>    <dependency>
>       <groupId>org.apache.flink</groupId>
>       <artifactId>flink-connector-jdbc_2.11</artifactId>
>       <version>1.11.1</version>
>    </dependency>
>
>    <dependency>
>       <groupId>org.apache.flink</groupId>
>       <artifactId>flink-streaming-scala_2.11</artifactId>
>       <version>1.11.0</version>
>    </dependency>
>
> <!-- 日志方便调试 -->
> <dependency>
>       <groupId>org.slf4j</groupId>
>       <artifactId>slf4j-log4j12</artifactId>
>       <version>1.7.7</version>
>       <scope>runtime</scope>
>    </dependency>
>    <dependency>
>       <groupId>log4j</groupId>
>       <artifactId>log4j</artifactId>
>       <version>1.2.17</version>
>       <scope>runtime</scope>
>    </dependency>
> </dependencies>
>
>
>
>
>
>
Reply | Threaded
Open this post in threaded view
|

Re:Re: 来自郭华威的邮件

郭华威

/**
 *  flink sql 计算wordCount
 */
public class BatchWordCount_tablesql {
public static void main(String[] args) throws Exception {

// 创建执行环境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// 创建一个tableEnvironment
BatchTableEnvironment btEnv = BatchTableEnvironment.create(env);

// 获取数据
String words = "hello flink hello spark hello hbase";
String[] split = words.split(" ");
ArrayList<WordCount> list = new ArrayList<>();
        for (String word:split) {
            list.add(new WordCount(word,1));
}
        DataSet<WordCount> input = env.fromCollection(list);

// DataSet 转sql,指定字段名称
Table table = btEnv.fromDataSet(input,"word,frequency");
table.printSchema();

// 将table注册为表
btEnv.createTemporaryView("WordCountTable",table);

// 执行sql 查询
Table table1 = btEnv.sqlQuery("select word as word,sum(frequency) as frequency from WordCountTable group by word");

// 将sql查询出来的结果转换为DataSet
DataSet<WordCount> wordCountDataSet = btEnv.toDataSet(table1, WordCount.class);

wordCountDataSet.printToErr();

}

public static class WordCount{
public String word;
        public int frequency;

        public WordCount() {
        }

public WordCount(String word, int frequency) {
this.word = word;
            this.frequency = frequency;
}

public String getWord() {
return word;
}

public void setWord(String word) {
this.word = word;
}

public int getFrequency() {
return frequency;
}

public void setFrequency(int frequency) {
this.frequency = frequency;
}

@Override
public String toString() {
return "WordCount{" +
"word='" + word + '\'' +
", frequency=" + frequency +
'}';
}
    }
}

















在 2020-08-10 10:48:27,"Shengkai Fang" <[hidden email]> 写道:

>hi.
>能提供具体的代码?
>
>
>
>郭华威 <[hidden email]> 于2020年8月10日周一 上午10:21写道:
>
>> flink1.11.1 使用tableApi  报错:
>> Exception in thread "main" org.apache.flink.table.api.TableException:
>> Create BatchTableEnvironment failed.
>> at
>> org.apache.flink.table.api.bridge.java.BatchTableEnvironment.create(BatchTableEnvironment.java:517)
>> at
>> org.apache.flink.table.api.bridge.java.BatchTableEnvironment.create(BatchTableEnvironment.java:471)
>> at
>> yueworld.worldCount.BatchWordCount_tablesql.main(BatchWordCount_tablesql.java:24)
>> Caused by: java.lang.ClassNotFoundException:
>> org.apache.flink.table.api.bridge.java.internal.BatchTableEnvironmentImpl
>> at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>> at java.lang.Class.forName0(Native Method)
>> at java.lang.Class.forName(Class.java:264)
>> at
>> org.apache.flink.table.api.bridge.java.BatchTableEnvironment.create(BatchTableEnvironment.java:509)
>> ... 2 more
>> 但是相关的依赖都有的,下面是pom文件:
>> <properties>
>>    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
>>    <flink.version>1.11.1</flink.version>
>>    <mysql.version>5.1.40</mysql.version>
>>    <scala.binary.version>2.11</scala.binary.version>
>>    <scala.version>2.11.12</scala.version>
>>    <java.version>1.8</java.version>
>>    <maven.compiler.source>${java.version}</maven.compiler.source>
>>    <maven.compiler.target>${java.version}</maven.compiler.target>
>> </properties>
>>
>> <dependencies>
>>
>>    <dependency>
>>       <groupId>org.apache.flink</groupId>
>>       <artifactId>flink-java</artifactId>
>>       <version>${flink.version}</version>
>>    </dependency>
>>
>> <!-- 利用Java开发 -->
>> <dependency>
>>       <groupId>org.apache.flink</groupId>
>>
>> <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
>>       <version>${flink.version}</version>
>>    </dependency>
>>
>> <!-- 使用Blink Planner -->
>> <dependency>
>>       <groupId>org.apache.flink</groupId>
>>
>> <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
>>       <version>${flink.version}</version>
>>    </dependency>
>>
>> <!-- 支持一些自定义的消息格式,比如kafka里面消息格式是json的,或者需要自定义函数支持 -->
>> <dependency>
>>       <groupId>org.apache.flink</groupId>
>>       <artifactId>flink-table-common</artifactId>
>>       <version>${flink.version}</version>
>>    </dependency>
>>
>> <!-- JDBC Connector的支持,本案例会是使用MySQL -->
>> <dependency>
>>       <groupId>org.apache.flink</groupId>
>>       <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
>>       <version>${flink.version}</version>
>>    </dependency>
>>
>> <!-- Kafka Connector的支持-->
>> <dependency>
>>       <groupId>org.apache.flink</groupId>
>>
>> <artifactId>flink-sql-connector-kafka-0.11_${scala.binary.version}</artifactId>
>>       <version>${flink.version}</version>
>>    </dependency>
>>
>> <!-- Kafka里面的消息采用Json格式 -->
>> <dependency>
>>       <groupId>org.apache.flink</groupId>
>>       <artifactId>flink-json</artifactId>
>>       <version>${flink.version}</version>
>>    </dependency>
>>
>> <!-- MySQL的驱动 -->
>> <dependency>
>>       <groupId>mysql</groupId>
>>       <artifactId>mysql-connector-java</artifactId>
>>       <version>${mysql.version}</version>
>>    </dependency>
>>
>> <!--提交作业所必须的依赖,比如:LocalExecutorFactory -->
>> <dependency>
>>       <groupId>org.apache.flink</groupId>
>>       <artifactId>flink-clients_2.11</artifactId>
>>       <version>1.11.1</version>
>>    </dependency>
>>
>>    <dependency>
>>       <groupId>org.apache.flink</groupId>
>>       <artifactId>flink-connector-jdbc_2.11</artifactId>
>>       <version>1.11.1</version>
>>    </dependency>
>>
>>    <dependency>
>>       <groupId>org.apache.flink</groupId>
>>       <artifactId>flink-streaming-scala_2.11</artifactId>
>>       <version>1.11.0</version>
>>    </dependency>
>>
>> <!-- 日志方便调试 -->
>> <dependency>
>>       <groupId>org.slf4j</groupId>
>>       <artifactId>slf4j-log4j12</artifactId>
>>       <version>1.7.7</version>
>>       <scope>runtime</scope>
>>    </dependency>
>>    <dependency>
>>       <groupId>log4j</groupId>
>>       <artifactId>log4j</artifactId>
>>       <version>1.2.17</version>
>>       <scope>runtime</scope>
>>    </dependency>
>> </dependencies>
>>
>>
>>
>>
>>
>>