flink-sql消费基于on kafka的flink table,每次select这个flink table相当于是不同group id了吗?

classic Classic list List threaded Threaded
4 messages Options
Reply | Threaded
Open this post in threaded view
|

flink-sql消费基于on kafka的flink table,每次select这个flink table相当于是不同group id了吗?

凌天荣
现有一张表基于kafka的flink table,我同时起两个任务对同一张表(这张表)进行select,得到的是分别每个任务得到一样的数据,也就是说每次select这个表,每次group id都不同吗?
Reply | Threaded
Open this post in threaded view
|

Re: flink-sql消费基于on kafka的flink table,每次select这个flink table相当于是不同group id了吗?

Leonard Xu
Hi

可以看下贴下你Kafka table的option 和 作业的 checkpoint配置吗?
可以确定的是,用的都是同一个group id,。
如果你没有配置 checkpoint,  Flink Kafka consumer 的 enable.auto.commit 默认设置为 false,就不会提交对应group 的offset, 此时你两个作业只是用 group id 确定一个起始消费offset,得到的数据就是一致的。
你可以看看[1][2]里面对这个机制的解释。

Best
Leonard

[1] https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/connectors/kafka.html#%E9%85%8D%E7%BD%AE-kafka-consumer-%E5%BC%80%E5%A7%8B%E6%B6%88%E8%B4%B9%E7%9A%84%E4%BD%8D%E7%BD%AE
[2] https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/connectors/kafka.html#kafka-consumer-%E6%8F%90%E4%BA%A4-offset-%E7%9A%84%E8%A1%8C%E4%B8%BA%E9%85%8D%E7%BD%AE

> 在 2020年9月9日,16:24,凌天荣 <[hidden email]> 写道:
>
> 现有一张表基于kafka的flink table,我同时起两个任务对同一张表(这张表)进行select,得到的是分别每个任务得到一样的数据,也就是说每次select这个表,每次group id都不同吗?

Reply | Threaded
Open this post in threaded view
|

回复: flink-sql消费基于on kafka的flink table,每次select这个flink table相当于是不同group id了吗?

凌天荣
CREATE TABLE ODS_PaymentOrdert (
    orderId INT, 
    memberId INT,
orderAmount DECIMAL(10, 2),
paymentStatus SMALLINT,
orderDate VARCHAR,
payDate VARCHAR,
paymentIP VARCHAR,
orderSrc VARCHAR,
channelType SMALLINT,
productId SMALLINT,
amount SMALLINT,
unit VARCHAR,
paymentChannel SMALLINT,
serviceOrderType SMALLINT,
refundAmount DECIMAL(10, 2),
proctime as PROCTIME(),
primary key(orderId) NOT ENFORCED
) WITH (
  'connector' = 'kafka',
  'properties.group.id' = 'flink-sql',
  'properties.bootstrap.servers' = 'xx.xx.xx.xxx:9092',
  'topic' = 'ODS_PaymentOrdert',  
  'scan.startup.mode' = 'latest-offset',
  'format' = 'canal-json');   
 
这是kafka table option,


这是设置checkpoint了

------------------ 原始邮件 ------------------
发件人: "user-zh" <[hidden email]>;
发送时间: 2020年9月9日(星期三) 晚上9:46
收件人: "user-zh"<[hidden email]>;
主题: Re: flink-sql消费基于on kafka的flink table,每次select这个flink table相当于是不同group id了吗?

Hi

可以看下贴下你Kafka table的option 和 作业的 checkpoint配置吗?
可以确定的是,用的都是同一个group id,。
如果你没有配置 checkpoint,  Flink Kafka consumer 的 enable.auto.commit 默认设置为 false,就不会提交对应group 的offset, 此时你两个作业只是用 group id 确定一个起始消费offset,得到的数据就是一致的。
你可以看看[1][2]里面对这个机制的解释。

Best
Leonard

[1] https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/connectors/kafka.html#%E9%85%8D%E7%BD%AE-kafka-consumer-%E5%BC%80%E5%A7%8B%E6%B6%88%E8%B4%B9%E7%9A%84%E4%BD%8D%E7%BD%AE
[2] https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/connectors/kafka.html#kafka-consumer-%E6%8F%90%E4%BA%A4-offset-%E7%9A%84%E8%A1%8C%E4%B8%BA%E9%85%8D%E7%BD%AE

> 在 2020年9月9日,16:24,凌天荣 <[hidden email]> 写道:
>
> 现有一张表基于kafka的flink table,我同时起两个任务对同一张表(这张表)进行select,得到的是分别每个任务得到一样的数据,也就是说每次select这个表,每次group id都不同吗?


Reply | Threaded
Open this post in threaded view
|

Re: flink-sql消费基于on kafka的flink table,每次select这个flink table相当于是不同group id了吗?

nobleyd
这个问题不在于是否同一个group id,kafka消费分很多模式的。flink默认应该是assign topic
partition的方式,这种方式不受group的影响的。

凌天荣 <[hidden email]> 于2020年9月10日周四 上午10:08写道:

> CREATE TABLE ODS_PaymentOrdert (
>     orderId INT,
>     memberId INT,
> orderAmount DECIMAL(10, 2),
> paymentStatus SMALLINT,
> orderDate VARCHAR,
> payDate VARCHAR,
> paymentIP VARCHAR,
> orderSrc VARCHAR,
> channelType SMALLINT,
> productId SMALLINT,
> amount SMALLINT,
> unit VARCHAR,
> paymentChannel SMALLINT,
> serviceOrderType SMALLINT,
> refundAmount DECIMAL(10, 2),
> proctime as PROCTIME(),
> primary key(orderId) NOT ENFORCED
> ) WITH (
>   'connector' = 'kafka',
>   'properties.group.id' = 'flink-sql',
>   'properties.bootstrap.servers' = 'xx.xx.xx.xxx:9092',
>   'topic' = 'ODS_PaymentOrdert',
>   'scan.startup.mode' = 'latest-offset',
>   'format' = 'canal-json');
>
> 这是kafka table option,
>
>
> 这是设置checkpoint了
>
> ------------------ 原始邮件 ------------------
> *发件人:* "user-zh" <[hidden email]>;
> *发送时间:* 2020年9月9日(星期三) 晚上9:46
> *收件人:* "user-zh"<[hidden email]>;
> *主题:* Re: flink-sql消费基于on kafka的flink table,每次select这个flink
> table相当于是不同group id了吗?
>
> Hi
>
> 可以看下贴下你Kafka table的option 和 作业的 checkpoint配置吗?
> 可以确定的是,用的都是同一个group id,。
> 如果你没有配置 checkpoint,  Flink Kafka consumer 的 enable.auto.commit 默认设置为
> false,就不会提交对应group 的offset, 此时你两个作业只是用 group id 确定一个起始消费offset,得到的数据就是一致的。
> 你可以看看[1][2]里面对这个机制的解释。
>
> Best
> Leonard
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/connectors/kafka.html#%E9%85%8D%E7%BD%AE-kafka-consumer-%E5%BC%80%E5%A7%8B%E6%B6%88%E8%B4%B9%E7%9A%84%E4%BD%8D%E7%BD%AE
> [2]
> https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/connectors/kafka.html#kafka-consumer-%E6%8F%90%E4%BA%A4-offset-%E7%9A%84%E8%A1%8C%E4%B8%BA%E9%85%8D%E7%BD%AE
>
> > 在 2020年9月9日,16:24,凌天荣 <[hidden email]> 写道:
> >
> > 现有一张表基于kafka的flink
> table,我同时起两个任务对同一张表(这张表)进行select,得到的是分别每个任务得到一样的数据,也就是说每次select这个表,每次group
> id都不同吗?
>
>
>