Question: why do different Flink SQL jobs consuming the same Kafka table (with group.id specified) output the same data?

junbaozhang
Hi all,
        I am using Flink 1.10.0 and created a table under a Hive catalog that maps to a Kafka topic:
        CREATE TABLE x.log.yanfa_log (
    dt TIMESTAMP(3),
    conn_id STRING,
    sequence STRING,
    trace_id STRING,
    span_info STRING,
    service_id STRING,
    msg_id STRING,
    servicename STRING,
    ret_code STRING,
    duration STRING,
    req_body MAP<STRING,STRING>,
    res_body MAP<STRING,STRING>,
    extra_info MAP<STRING,STRING>,
    WATERMARK FOR dt AS dt - INTERVAL '60' SECOND
) WITH (
    'connector.type' = 'kafka',
    'connector.version' = '0.11',
    'connector.topic' = 'x-log-yanfa_log',
    'connector.properties.bootstrap.servers' = '******:9092',
    'connector.properties.zookeeper.connect' = '******:2181',
    'connector.properties.group.id' = 'testGroup',
    'connector.startup-mode' = 'group-offsets',
    'update-mode' = 'append',
    'format.type' = 'json',
    'format.fail-on-missing-field' = 'true'
);
The program that consumes the table x.log.yanfa_log is as follows:
// Environment setup (not shown in the original snippet, but required for it to run):
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(
        env, EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());
Catalog myCatalog = new HiveCatalog("x", "default", "D:\\conf", "1.1.0");
tEnv.registerCatalog("x", myCatalog);
Table rs = tEnv.sqlQuery("select * from x.log.yanfa_log");
tEnv.toAppendStream(rs, Row.class).print();
env.execute();

        Then I started 2 jobs from this same program, and both printed the same results. My question: isn't each partition of a Kafka topic supposed to be consumed by at most one consumer within a consumer group? Why do the 2 jobs output the same data?
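
For reference, the behavior the question assumes, at most one consumer per partition within a group, is what Kafka's high-level consumer API gives you via subscribe(). A minimal sketch of that model (placeholder broker address; topic and group.id taken from the DDL above; assumes a recent kafka-clients version):

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class SubscribeSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // placeholder broker
        props.put("group.id", "testGroup");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // subscribe() hands partition assignment to the broker-side group coordinator:
        // if a second process joins with the same group.id, the topic's partitions are
        // split between the two, so each record is delivered to only one of them.
        consumer.subscribe(Collections.singletonList("x-log-yanfa_log"));
        while (true) {
            for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofSeconds(1))) {
                System.out.println(record.partition() + " -> " + record.value());
            }
        }
    }
}

Starting a second copy of this process with the same group.id triggers a rebalance, so the two copies split the partitions instead of both reading everything.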
Re: Question: why do different Flink SQL jobs consuming the same Kafka table (with group.id specified) output the same data?

Benchao Li
Hi,

Flink's Kafka connector is implemented on the Kafka low-level API: it fetches the current partition metadata itself and decides on its own which subtask reads which partition.
So if you run two jobs, they are completely independent and are not aware of each other. (The only caveat is that if you configure the same group.id, the offsets they commit may overwrite each other.)
The model you are describing is the Kafka high-level API.
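
To make the difference concrete, below is a rough sketch of the low-level assignment pattern described above. This is not the connector's actual code, only the same Kafka API style, with a placeholder broker address and the topic/group.id from the DDL in the question:

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

public class AssignSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // placeholder broker
        props.put("group.id", "testGroup");  // used only when committing offsets, not for assignment
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        // Discover all partitions of the topic and assign them manually.
        // assign() bypasses the group coordinator's rebalancing, so every process
        // running this code reads every partition, regardless of group.id;
        // this is why two independent Flink jobs each see the full topic.
        List<TopicPartition> partitions = new ArrayList<>();
        for (PartitionInfo p : consumer.partitionsFor("x-log-yanfa_log")) {
            partitions.add(new TopicPartition(p.topic(), p.partition()));
        }
        consumer.assign(partitions);

        while (true) {
            for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofSeconds(1))) {
                System.out.println(record.partition() + " -> " + record.value());
            }
        }
    }
}

Because assign() never registers the consumer with the group coordinator, two jobs running this logic with the same group.id still each read all partitions; the group.id only matters when offsets are committed, which is why those commits can overwrite each other.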

--

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730