使用了阿里的包,写入 ClickHouse
阿里云flink-connector-clickhouse写入ClickHouse <https://help.aliyun.com/document_detail/185696.html?spm=5176.12901015.0.i12901015.2b41525cECNyYW&accounttraceid=1ac9126237284ef9b0a25f666c3030dfxaso> 测试写入clickhouse ,返回如下,无报错,但并未成功写入,不知从何下手排查,请教各位大佬 +---------------------------------------------+ | default_catalog.default_database.sink_table | +---------------------------------------------+ | -1 | +---------------------------------------------+ 代码如下 package com.daniel import org.apache.flink.streaming.api.scala._ import org.apache.flink.table.sources._ import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment import org.apache.flink.table.api._ import org.apache.flink.types.Row import org.apache.flink.table.api.{ TableEnvironment, TableSchema, Types, ValidationException } object StreamingJob { def main(args: Array[String]) { val SourceCsvPath = "/Users/flink-sql-demo/flink-sql-source.csv" val env = StreamExecutionEnvironment.getExecutionEnvironment env.getConfig.disableClosureCleaner val tEnv = StreamTableEnvironment.create(env) val csvTableSource = CsvTableSource .builder() .path(SourceCsvPath) .ignoreFirstLine() .fieldDelimiter(",") .field("name", DataTypes.STRING()) .field("age", DataTypes.BIGINT()) // .field("sex", DataTypes.STRING()) // .field("grade", DataTypes.INT()) .field("rate", DataTypes.FLOAT()) .build() tEnv.registerTableSource("source", csvTableSource) val create_sql = s""" | CREATE TABLE sink_table ( | name VARCHAR |) WITH ( | 'connector' = 'clickhouse', | 'url' = 'clickhouse://*****:8080', | 'username' = '****', | 'password' = '****', | 'database-name' = '***', | 'table-name' = 'live.d_sink_table', | 'sink.batch-size' = '1', | 'sink.partition-strategy' = 'hash', | 'sink.partition-key' = 'name' |) |""".stripMargin tEnv.executeSql(create_sql); val result = tEnv.executeSql( "INSERT INTO sink_table SELECT name FROM source" ) result.print() } } -- Sent from: http://apache-flink.147419.n8.nabble.com/ |
换个第三方工具看看 https://github.com/blynkkk/clickhouse4j
<dependency> <groupId>cc.blynk.clickhouse</groupId> <artifactId>clickhouse4j</artifactId> <version>1.4.4</version> </dependency> DanielGu <[hidden email]> 于2020年12月28日周一 上午12:22写道: > 使用了阿里的包,写入clickhouse > 阿里云flink-connector-clickhouse写入ClickHouse > < > https://help.aliyun.com/document_detail/185696.html?spm=5176.12901015.0.i12901015.2b41525cECNyYW&accounttraceid=1ac9126237284ef9b0a25f666c3030dfxaso> > > > 测试写入clickhouse ,返回如下,无报错,但并未成功写入,不知从何下手排查,请教各位大佬 > +---------------------------------------------+ > | default_catalog.default_database.sink_table | > +---------------------------------------------+ > | -1 | > +---------------------------------------------+ > > > 代码如下 > package com.daniel > import org.apache.flink.streaming.api.scala._ > import org.apache.flink.table.sources._ > import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment > import org.apache.flink.table.api._ > import org.apache.flink.types.Row > import org.apache.flink.table.api.{ > TableEnvironment, > TableSchema, > Types, > ValidationException > } > > object StreamingJob { > def main(args: Array[String]) { > val SourceCsvPath = > "/Users/flink-sql-demo/flink-sql-source.csv" > > val env = StreamExecutionEnvironment.getExecutionEnvironment > > env.getConfig.disableClosureCleaner > > val tEnv = StreamTableEnvironment.create(env) > > val csvTableSource = CsvTableSource > .builder() > .path(SourceCsvPath) > .ignoreFirstLine() > .fieldDelimiter(",") > .field("name", DataTypes.STRING()) > .field("age", DataTypes.BIGINT()) > // .field("sex", DataTypes.STRING()) > // .field("grade", DataTypes.INT()) > .field("rate", DataTypes.FLOAT()) > .build() > > tEnv.registerTableSource("source", csvTableSource) > > val create_sql = > s""" > | CREATE TABLE sink_table ( > | name VARCHAR > |) WITH ( > | 'connector' = 'clickhouse', > | 'url' = 'clickhouse://*****:8080', > | 'username' = '****', > | 'password' = '****', > | 'database-name' = '***', > | 'table-name' = 'live.d_sink_table', > | 'sink.batch-size' 
= '1', > | 'sink.partition-strategy' = 'hash', > | 'sink.partition-key' = 'name' > |) > |""".stripMargin > > > > tEnv.executeSql(create_sql); > > val result = tEnv.executeSql( > "INSERT INTO sink_table SELECT name FROM source" > ) > result.print() > } > > } > > > > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ > |
Free forum by Nabble | Edit this page |