A Flink job subscribes to Kafka messages and sinks them to both HBase and Hive. After sending 42 records to Kafka and then stopping the producer, a query in Hive returns exactly 42 rows, but HBase only shows 30.

Query:

streamTableEnv.executeSql(
  """
    |CREATE TABLE hbase_table (
    |  rowkey VARCHAR,
    |  cf ROW(sex VARCHAR, age INT, created_time VARCHAR)
    |) WITH (
    |  'connector.type' = 'hbase',
    |  'connector.version' = '2.1.0',
    |  'connector.table-name' = 'ods:user_hbase6',
    |  'connector.zookeeper.quorum' = 'cdh1:2181,cdh2:2181,cdh3:2181',
    |  'connector.zookeeper.znode.parent' = '/hbase',
    |  'connector.write.buffer-flush.max-size' = '1mb',
    |  'connector.write.buffer-flush.max-rows' = '1',
    |  'connector.write.buffer-flush.interval' = '0s'
    |)
    |""".stripMargin)

val statementSet = streamTableEnv.createStatementSet()

val insertHbase =
  """
    |insert into hbase_table
    |SELECT
    |  CONCAT(SUBSTRING(MD5(CAST(uid AS VARCHAR)), 0, 6), cast(CEILING(UNIX_TIMESTAMP(created_time)/60) as string), sex) as uid,
    |  ROW(sex, age, created_time) as cf
    |FROM (select uid, sex, age, cast(created_time as VARCHAR) as created_time from kafka_table)
    |""".stripMargin
statementSet.addInsertSql(insertHbase)

val insertHive =
  """
    |INSERT INTO odsCatalog.ods.hive_table
    |SELECT uid, age, DATE_FORMAT(created_time, 'yyyy-MM-dd'), DATE_FORMAT(created_time, 'HH')
    |FROM kafka_table
    |""".stripMargin
statementSet.addInsertSql(insertHive)

statementSet.execute()

Is the parameter 'connector.write.buffer-flush.max-size' = '1mb' the cause? I tried setting it to '0', '10b' and '1kb', but all of them failed with errors like:

Property 'connector.write.buffer-flush.max-size' must be a memory size (in bytes) value but was: 1kb
Property 'connector.write.buffer-flush.max-size' must be a memory size (in bytes) value but was: 10b
Property 'connector.write.buffer-flush.max-size' must be a memory size (in bytes) value but was: 1

Also, the options from the official documentation
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/hbase.html
are not recognized either; they fail with:

Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'hbase-2.1.0' that implements 'org.apache.flink.table.factories.DynamicTableSinkFactory' in the classpath.

I looked at the source of org.apache.flink.table.descriptors.HBaseValidator:

public static final String CONNECTOR_TYPE_VALUE_HBASE = "hbase";
public static final String CONNECTOR_VERSION_VALUE_143 = "2.1.0";
public static final String CONNECTOR_TABLE_NAME = "connector.table-name";
public static final String CONNECTOR_ZK_QUORUM = "connector.zookeeper.quorum";
public static final String CONNECTOR_ZK_NODE_PARENT = "connector.zookeeper.znode.parent";
public static final String CONNECTOR_WRITE_BUFFER_FLUSH_MAX_SIZE = "connector.write.buffer-flush.max-size";
public static final String CONNECTOR_WRITE_BUFFER_FLUSH_MAX_ROWS = "connector.write.buffer-flush.max-rows";
public static final String CONNECTOR_WRITE_BUFFER_FLUSH_INTERVAL = "connector.write.buffer-flush.interval";

The property keys are still the old ones.
Hi, Zhou
> 'connector.write.buffer-flush.max-size' = '1mb',
> 'connector.write.buffer-flush.interval' = '0s'

(1) The only unit connector.write.buffer-flush.max-size accepts is mb; other units are rejected, which is why you see those errors. This option tunes the buffering of the underlying BufferedMutator: it sets how large the buffer may grow before a write is triggered, while connector.write.buffer-flush.interval flushes the buffer on a timer; the two options are meant to be combined as needed. With connector.write.buffer-flush.interval set to 0s there is no timer-based flushing, so the sink only writes once the buffer reaches connector.write.buffer-flush.max-size. If you set connector.write.buffer-flush.interval to 1s you should see the data arrive.

(2) Versions of the HBase connector before 1.11.0 only support 1.4.3, so specifying 2.1.0 fails. Starting with 1.11.0 the supported version is 1.4.x, and the new connector uses 'connector' = 'hbase-1.4' (the HBase 1.4.x client APIs are compatible across that line). The community is also discussing support for HBase 2.x [1].

Best,
Leonard Xu
[1] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Upgrade-HBase-connector-to-2-2-x-tc42657.html#a42674

> On Jul 13, 2020, at 21:09, Zhou Zach <[hidden email]> wrote:
>
> A Flink job subscribes to Kafka messages and sinks them to both HBase and Hive.
> After sending 42 records to Kafka and then stopping the producer, Hive returns exactly 42 rows, but HBase only shows 30.
> [...]
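For reference, a table declaration along the lines Leonard describes might look like the sketch below. It assumes the Flink 1.11 'hbase-1.4' connector and its sink.buffer-flush.* option keys (verify them against the 1.11 HBase connector documentation), reuses the table and ZooKeeper values from the original post, and raises the flush interval to 1s as suggested:

// Sketch only: Flink 1.11-style DDL with the new connector identifier.
// Option names are taken from the 1.11 HBase connector docs and should be
// double-checked before use.
streamTableEnv.executeSql(
  """
    |CREATE TABLE hbase_table (
    |  rowkey VARCHAR,
    |  cf ROW(sex VARCHAR, age INT, created_time VARCHAR),
    |  PRIMARY KEY (rowkey) NOT ENFORCED
    |) WITH (
    |  'connector' = 'hbase-1.4',
    |  'table-name' = 'ods:user_hbase6',
    |  'zookeeper.quorum' = 'cdh1:2181,cdh2:2181,cdh3:2181',
    |  'zookeeper.znode.parent' = '/hbase',
    |  'sink.buffer-flush.max-size' = '1mb',
    |  'sink.buffer-flush.max-rows' = '1',
    |  'sink.buffer-flush.interval' = '1s'
    |)
    |""".stripMargin)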
Hi, Leonard

I set 'connector.write.buffer-flush.interval' = '1s' and restarted the job. Right after the producer started, say after 4 messages, both Hive and HBase had received 4 records. After 48 messages had been sent I stopped the producer, and the result was 19 rows in HBase versus 48 in Hive. If Flink only checks every 1s whether the HBase sink buffer has reached 1mb and writes when it has, that still wouldn't explain why the data in HBase and Hive was in sync at the very beginning. Strange.

At 2020-07-13 21:50:54, "Leonard Xu" <[hidden email]> wrote:
>Hi, Zhou
>
>(1) The only unit connector.write.buffer-flush.max-size accepts is mb [...] If you set connector.write.buffer-flush.interval to 1s you should see the data arrive.
>(2) Versions of the HBase connector before 1.11.0 only support 1.4.3 [...]
>
>Best,
>Leonard Xu
>[...]
Hi,
> On Jul 14, 2020, at 09:52, Zhou Zach <[hidden email]> wrote:
>
>>> | CONCAT(SUBSTRING(MD5(CAST(uid AS VARCHAR)), 0, 6), cast(CEILING(UNIX_TIMESTAMP(created_time)/60) as string), sex) as uid,

Could you check whether the rowkeys produced by this expression contain duplicates?

Best,
Leonard Xu
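One rough way to run this check: group a bounded copy of the test records by the same rowkey expression and look for keys produced by more than one record. The sketch below reuses the column names from the original job; kafka_table_snapshot is a hypothetical batch-queryable copy of the 48 test records.

// Hypothetical duplicate-rowkey check. Note that the rowkey buckets created_time
// to the minute (CEILING(UNIX_TIMESTAMP(created_time)/60)), so two records with
// the same uid, sex and minute collapse onto a single HBase row.
val duplicateRowkeys =
  """
    |SELECT rk, COUNT(*) AS cnt
    |FROM (
    |  SELECT CONCAT(SUBSTRING(MD5(CAST(uid AS VARCHAR)), 0, 6),
    |                cast(CEILING(UNIX_TIMESTAMP(cast(created_time as VARCHAR))/60) as string),
    |                sex) AS rk
    |  FROM kafka_table_snapshot
    |) t
    |GROUP BY rk
    |HAVING COUNT(*) > 1
    |""".stripMargin
streamTableEnv.executeSql(duplicateRowkeys).print()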
Hi Leonard,

It turns out there were indeed duplicate rowkeys, so HBase upserted them. Does Hive Streaming Writing currently only support append mode, with no upsert mode?

At 2020-07-14 09:56:00, "Leonard Xu" <[hidden email]> wrote:
>Hi,
>
>Could you check whether the rowkeys produced by this expression contain duplicates?
>
>Best,
>Leonard Xu
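On the HBase side, one possible way to keep one row per source record is to make the rowkey unique per record rather than per (uid prefix, minute, sex). The sketch below is only an illustration; whether it fits depends on the intended key design. CONCAT_WS is a Flink SQL built-in used here for readability, and the column names are the ones from the original query.

// Sketch of a collision-resistant rowkey: keep the MD5 prefix for key
// distribution, but append the uid and the raw created_time so records from
// the same minute no longer overwrite each other in HBase.
val insertHbaseUniqueKey =
  """
    |insert into hbase_table
    |SELECT
    |  CONCAT_WS('_',
    |            SUBSTRING(MD5(CAST(uid AS VARCHAR)), 0, 6),
    |            CAST(uid AS VARCHAR),
    |            created_time) as rowkey,
    |  ROW(sex, age, created_time) as cf
    |FROM (select uid, sex, age, cast(created_time as VARCHAR) as created_time from kafka_table)
    |""".stripMargin
statementSet.addInsertSql(insertHbaseUniqueKey)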
Hello,
Under the hood it is still the StreamingFileSink, so for now only append is possible.

------------------------------------------------------------------
From: Zhou Zach <[hidden email]>
Sent: Tuesday, July 14, 2020, 10:56
To: user-zh <[hidden email]>
Subject: Re:Re: flink sinking to hbase and hive at the same time, hbase missing records

> Hi Leonard,
> It turns out there were indeed duplicate rowkeys, so HBase upserted them. Does Hive Streaming Writing currently only support append mode, with no upsert mode?
> [...]
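For context, a Hive table written this way in Flink 1.11 is typically declared in the Hive dialect with partition-commit options, roughly as in the sketch below. The partition columns (dt, hr), the uid/age types, and the property values are assumptions matching the two DATE_FORMAT columns in the original INSERT; the actual table in this thread may be defined differently.

import org.apache.flink.table.api.SqlDialect

// Hypothetical DDL for odsCatalog.ods.hive_table. The StreamingFileSink-based
// Hive writer appends new files into these partitions and commits them to the
// metastore; existing rows are never updated, hence append-only behaviour.
streamTableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
streamTableEnv.executeSql(
  """
    |CREATE TABLE ods.hive_table (
    |  uid BIGINT,
    |  age INT
    |) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (
    |  'partition.time-extractor.timestamp-pattern' = '$dt $hr:00:00',
    |  'sink.partition-commit.trigger' = 'partition-time',
    |  'sink.partition-commit.delay' = '1 h',
    |  'sink.partition-commit.policy.kind' = 'metastore,success-file'
    |)
    |""".stripMargin)
streamTableEnv.getConfig.setSqlDialect(SqlDialect.DEFAULT)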
Hi,
Thanks to the community for the helpful answers!

At 2020-07-14 11:00:18, "夏帅" <[hidden email]> wrote:
>Hello,
>Under the hood it is still the StreamingFileSink, so for now only append is possible.
>[...]