flink-connector-clickhouse写入ClickHouse 问题

classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

flink-connector-clickhouse写入ClickHouse 问题

DanielGu
使用了阿里的包,写入clickhouse
阿里云flink-connector-clickhouse写入ClickHouse
<https://help.aliyun.com/document_detail/185696.html?spm=5176.12901015.0.i12901015.2b41525cECNyYW&accounttraceid=1ac9126237284ef9b0a25f666c3030dfxaso>  

测试写入clickhouse ,返回如下,无报错,但并未成功写入,不知从何下手排查,请教各位大佬
+---------------------------------------------+
| default_catalog.default_database.sink_table |
+---------------------------------------------+
|                                          -1 |
+---------------------------------------------+


代码如下
package com.daniel
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.sources._
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.api._
import org.apache.flink.types.Row
import org.apache.flink.table.api.{
  TableEnvironment,
  TableSchema,
  Types,
  ValidationException
}

object StreamingJob {
  def main(args: Array[String]) {
    val SourceCsvPath =
      "/Users/flink-sql-demo/flink-sql-source.csv"

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    env.getConfig.disableClosureCleaner

    val tEnv = StreamTableEnvironment.create(env)

    val csvTableSource = CsvTableSource
      .builder()
      .path(SourceCsvPath)
      .ignoreFirstLine()
      .fieldDelimiter(",")
      .field("name", DataTypes.STRING())
      .field("age", DataTypes.BIGINT())
//      .field("sex", DataTypes.STRING())
//      .field("grade", DataTypes.INT())
      .field("rate", DataTypes.FLOAT())
      .build()

    tEnv.registerTableSource("source", csvTableSource)

    val create_sql =
      s"""
         | CREATE TABLE sink_table (
         |    name VARCHAR
         |) WITH (
         |    'connector' = 'clickhouse',
         |    'url' = 'clickhouse://*****:8080',
         |    'username' = '****',
         |    'password' = '****',
         |    'database-name' = '***',
         |    'table-name' = 'live.d_sink_table',
         |    'sink.batch-size' = '1',
         |    'sink.partition-strategy' = 'hash',
         |    'sink.partition-key' = 'name'
         |)
         |""".stripMargin



    tEnv.executeSql(create_sql);

    val result = tEnv.executeSql(
      "INSERT INTO sink_table SELECT name FROM source"
    )
    result.print()
  }

}






--
Sent from: http://apache-flink.147419.n8.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: flink-connector-clickhouse写入ClickHouse 问题

张锴
换个第三方工具看看 https://github.com/blynkkk/clickhouse4j

<dependency>
    <groupId>cc.blynk.clickhouse</groupId>
    <artifactId>clickhouse4j</artifactId>
    <version>1.4.4</version>
</dependency>


DanielGu <[hidden email]> 于2020年12月28日周一 上午12:22写道:

> 使用了阿里的包,写入clickhouse
> 阿里云flink-connector-clickhouse写入ClickHouse
> <
> https://help.aliyun.com/document_detail/185696.html?spm=5176.12901015.0.i12901015.2b41525cECNyYW&accounttraceid=1ac9126237284ef9b0a25f666c3030dfxaso>
>
>
> 测试写入clickhouse ,返回如下,无报错,但并未成功写入,不知从何下手排查,请教各位大佬
> +---------------------------------------------+
> | default_catalog.default_database.sink_table |
> +---------------------------------------------+
> |                                          -1 |
> +---------------------------------------------+
>
>
> 代码如下
> package com.daniel
> import org.apache.flink.streaming.api.scala._
> import org.apache.flink.table.sources._
> import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
> import org.apache.flink.table.api._
> import org.apache.flink.types.Row
> import org.apache.flink.table.api.{
>   TableEnvironment,
>   TableSchema,
>   Types,
>   ValidationException
> }
>
> object StreamingJob {
>   def main(args: Array[String]) {
>     val SourceCsvPath =
>       "/Users/flink-sql-demo/flink-sql-source.csv"
>
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>
>     env.getConfig.disableClosureCleaner
>
>     val tEnv = StreamTableEnvironment.create(env)
>
>     val csvTableSource = CsvTableSource
>       .builder()
>       .path(SourceCsvPath)
>       .ignoreFirstLine()
>       .fieldDelimiter(",")
>       .field("name", DataTypes.STRING())
>       .field("age", DataTypes.BIGINT())
> //      .field("sex", DataTypes.STRING())
> //      .field("grade", DataTypes.INT())
>       .field("rate", DataTypes.FLOAT())
>       .build()
>
>     tEnv.registerTableSource("source", csvTableSource)
>
>     val create_sql =
>       s"""
>          | CREATE TABLE sink_table (
>          |    name VARCHAR
>          |) WITH (
>          |    'connector' = 'clickhouse',
>          |    'url' = 'clickhouse://*****:8080',
>          |    'username' = '****',
>          |    'password' = '****',
>          |    'database-name' = '***',
>          |    'table-name' = 'live.d_sink_table',
>          |    'sink.batch-size' = '1',
>          |    'sink.partition-strategy' = 'hash',
>          |    'sink.partition-key' = 'name'
>          |)
>          |""".stripMargin
>
>
>
>     tEnv.executeSql(create_sql);
>
>     val result = tEnv.executeSql(
>       "INSERT INTO sink_table SELECT name FROM source"
>     )
>     result.print()
>   }
>
> }
>
>
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>