Hi all,
I'm using Flink 1.10.0 and created a table mapping a Kafka topic under a Hive catalog:

CREATE TABLE x.log.yanfa_log (
  dt TIMESTAMP(3),
  conn_id STRING,
  sequence STRING,
  trace_id STRING,
  span_info STRING,
  service_id STRING,
  msg_id STRING,
  servicename STRING,
  ret_code STRING,
  duration STRING,
  req_body MAP<STRING,STRING>,
  res_body MAP<STRING,STRING>,
  extra_info MAP<STRING,STRING>,
  WATERMARK FOR dt AS dt - INTERVAL '60' SECOND
) WITH (
  'connector.type' = 'kafka',
  'connector.version' = '0.11',
  'connector.topic' = 'x-log-yanfa_log',
  'connector.properties.bootstrap.servers' = '******:9092',
  'connector.properties.zookeeper.connect' = '******:2181',
  'connector.properties.group.id' = 'testGroup',
  'connector.startup-mode' = 'group-offsets',
  'update-mode' = 'append',
  'format.type' = 'json',
  'format.fail-on-missing-field' = 'true'
);

The program consuming table x.log.yanfa_log is:

Catalog myCatalog = new HiveCatalog("x", "default", "D:\\conf", "1.1.0");
tEnv.registerCatalog("x", myCatalog);
Table rs = tEnv.sqlQuery("select * from x.log.yanfa_log");
tEnv.toAppendStream(rs, Row.class).print();

I then started two jobs from this same program, and both printed identical results. My question: isn't each partition of a Kafka topic supposed to be consumed by at most one consumer within a group? Why do the two jobs output the same results?
junbaozhang
Hi,
Flink's Kafka connector is implemented with Kafka's low-level API: it fetches the current partition metadata itself and decides on its own which subtask reads which partition. So if there are two jobs, they are completely independent and have no awareness of each other. (The one exception: if you configure the same group.id for both, their committed offsets may overwrite each other.) The behavior you are describing is that of Kafka's high-level API.

--
Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel: +86-15650713730
Email: [hidden email]; [hidden email]
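To make the distinction concrete, here is a minimal sketch, not from the original thread, using the plain kafka-clients API (poll(Duration) requires kafka-clients 2.0+; the broker address and class name are placeholders). subscribe() goes through the group coordinator, so each partition is handed to at most one member of the group; assign(), which is effectively what Flink's connector does, bypasses group management entirely, so every process that self-assigns reads every partition:

import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class AssignVsSubscribe {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.put("group.id", "testGroup");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        // High-level path: subscribe() joins the consumer group (the join
        // actually happens on the first poll()), and the group coordinator
        // hands each partition to at most one member of the group.
        KafkaConsumer<String, String> highLevel = new KafkaConsumer<>(props);
        highLevel.subscribe(Collections.singletonList("x-log-yanfa_log"));

        // Low-level path (what Flink's connector effectively does): fetch the
        // partition list yourself and assign() it directly. This bypasses the
        // group coordinator, so a second process doing the same thing also
        // reads every partition -- which is why both jobs printed identical
        // results. The shared group.id only affects committed offsets.
        KafkaConsumer<String, String> lowLevel = new KafkaConsumer<>(props);
        List<TopicPartition> partitions = lowLevel.partitionsFor("x-log-yanfa_log")
                .stream()
                .map(p -> new TopicPartition(p.topic(), p.partition()))
                .collect(Collectors.toList());
        lowLevel.assign(partitions);

        ConsumerRecords<String, String> records = lowLevel.poll(Duration.ofSeconds(1));
        for (ConsumerRecord<String, String> r : records) {
            System.out.println(r.partition() + "@" + r.offset() + ": " + r.value());
        }

        highLevel.close();
        lowLevel.close();
    }
}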