Loading... |
Reply to author |
Edit post |
Move post |
Delete this post |
Delete this post and replies |
Change post date |
Print post |
Permalink |
Raw mail |
45 posts
|
hi, 我对FlinkKafkaConsumer的实现有点迷惑, 这有两个相同代码的程序:
//--------------------------- val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment Env.setRestartStrategy(RestartStrategies.noRestart()) val consumerProps = new Properties() consumerProps.put("bootstrap.servers", brokers) consumerProps.put("group.id", "test1234") val consumer = new FlinkKafkaConsumer[String](topic,new KafkaStringSchema,consumerProps).setStartFromLatest() Env.addSource(consumer).print() Env.execute()//-----------------------------------我同时启动这两个程序,他们连接相同的集群的topic,group.id也一样,然后我向topic发送一些数据,发现这两个程序都能消费到发送的所有分区的消息,kafka 的consumer group组内应该是有消费隔离的,为什么这里两个程序都能同时消费到全部数据呢?而用KafkaConsumer写两个相同的程序去消费这个topic就可以看到两边程序是没有重复消费同一分区的。我用的是flink1.11flink-connector-kafka_2.11 谢谢 |
Loading... |
Reply to author |
Edit post |
Move post |
Delete this post |
Delete this post and replies |
Change post date |
Print post |
Permalink |
Raw mail |
14 posts
|
Hi op,
在 Flink 消费 Kafka 的过程中, 由 FlinkKafkaConsumer 会从 Kafka 中拿到当前 topic 的所有 partition 信息并分配给个并发消费,这里的 group id 只是用于将当前 partition 的消费 offset commit 到 Kafka,并用这个消费组标识。而使用 KafkaConsumer 消费数据则应用到了 Kafka 的消费组管理, 这是 Kafka 服务端的一个角色。 另外,当启动两个作业用同一个 topic 和 group id 消费 kafka, 如果两个作业会分别以同一个 group id commit offset 到kafka, 如果以 group offset 消费模式启动作业, 则会以最后一次 commit 的 offset 开始消费。 > 在 2020年9月3日,下午3:03,op <[hidden email]> 写道: > > hi, 我对FlinkKafkaConsumer的实现有点迷惑, 这有两个相同代码的程序: > //--------------------------- > val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment > Env.setRestartStrategy(RestartStrategies.noRestart()) > val consumerProps = new Properties() > consumerProps.put("bootstrap.servers", brokers) > consumerProps.put("group.id", "test1234") > > val consumer = new FlinkKafkaConsumer[String](topic,new KafkaStringSchema,consumerProps).setStartFromLatest() > Env.addSource(consumer).print() > Env.execute()//-----------------------------------我同时启动这两个程序,他们连接相同的集群的topic,group.id也一样,然后我向topic发送一些数据,发现这两个程序都能消费到发送的所有分区的消息,kafka 的consumer group组内应该是有消费隔离的,为什么这里两个程序都能同时消费到全部数据呢?而用KafkaConsumer写两个相同的程序去消费这个topic就可以看到两边程序是没有重复消费同一分区的。我用的是flink1.11flink-connector-kafka_2.11 谢谢 ... [show rest of quote] |
Loading... |
Reply to author |
Edit post |
Move post |
Delete this post |
Delete this post and replies |
Change post date |
Print post |
Permalink |
Raw mail |
45 posts
|
谢谢,但我还是不太明白,FlinkKafkaConsumer底层不是用的KafkaConsumer吗,为什么flink不用kafka的消费组管理呢?
------------------ 原始邮件 ------------------ 发件人: "user-zh" <[hidden email]>; 发送时间: 2020年9月3日(星期四) 晚上6:09 收件人: "user-zh"<[hidden email]>; 主题: Re: FlinkKafkaConsumer问题 Hi op, 在 Flink 消费 Kafka 的过程中, 由 FlinkKafkaConsumer 会从 Kafka 中拿到当前 topic 的所有 partition 信息并分配给个并发消费,这里的 group id 只是用于将当前 partition 的消费 offset commit 到 Kafka,并用这个消费组标识。而使用 KafkaConsumer 消费数据则应用到了 Kafka 的消费组管理, 这是 Kafka 服务端的一个角色。 另外,当启动两个作业用同一个 topic 和 group id 消费 kafka, 如果两个作业会分别以同一个 group id commit offset 到kafka, 如果以 group offset 消费模式启动作业, 则会以最后一次 commit 的 offset 开始消费。 > 在 2020年9月3日,下午3:03,op <[hidden email]> 写道: > > &nbsp; &nbsp; hi,&nbsp; &nbsp; 我对FlinkKafkaConsumer的实现有点迷惑,&nbsp; &nbsp; 这有两个相同代码的程序: > //--------------------------- > val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment > Env.setRestartStrategy(RestartStrategies.noRestart()) > val consumerProps = new Properties() > consumerProps.put("bootstrap.servers", brokers) > consumerProps.put("group.id", "test1234") > > val consumer = new FlinkKafkaConsumer[String](topic,new KafkaStringSchema,consumerProps).setStartFromLatest() > Env.addSource(consumer).print() > Env.execute()//-----------------------------------我同时启动这两个程序,他们连接相同的集群的topic,group.id也一样,然后我向topic发送一些数据,发现这两个程序都能消费到发送的所有分区的消息,kafka 的consumer group组内应该是有消费隔离的,为什么这里两个程序都能同时消费到全部数据呢?而用KafkaConsumer写两个相同的程序去消费这个topic就可以看到两边程序是没有重复消费同一分区的。我用的是flink1.11flink-connector-kafka_2.11 谢谢 |
Loading... |
Reply to author |
Edit post |
Move post |
Delete this post |
Delete this post and replies |
Change post date |
Print post |
Permalink |
Raw mail |
32 posts
|
是flink对kafka的消费,是自己管理offset,用low-level api去寻址,而不是用group.id来管理offset这种high-level
api。是这个意思吧。 op <[hidden email]> 于2020年9月4日周五 上午10:25写道: > > 谢谢,但我还是不太明白,FlinkKafkaConsumer底层不是用的KafkaConsumer吗,为什么flink不用kafka的消费组管理呢? > > > ------------------ 原始邮件 ------------------ > 发件人: > "user-zh" > < > [hidden email]>; > 发送时间: 2020年9月3日(星期四) 晚上6:09 > 收件人: "user-zh"<[hidden email]>; > > 主题: Re: FlinkKafkaConsumer问题 > > > > Hi op, > > 在 Flink 消费 Kafka 的过程中, 由 FlinkKafkaConsumer 会从 Kafka 中拿到当前 topic 的所有 > partition 信息并分配给个并发消费,这里的 group id 只是用于将当前 partition 的消费 offset commit 到 > Kafka,并用这个消费组标识。而使用 KafkaConsumer 消费数据则应用到了 Kafka 的消费组管理, 这是 Kafka 服务端的一个角色。 > > 另外,当启动两个作业用同一个 topic 和 group id 消费 kafka, 如果两个作业会分别以同一个 group id commit > offset 到kafka, 如果以 group offset 消费模式启动作业, 则会以最后一次 commit 的 offset 开始消费。 > > > 在 2020年9月3日,下午3:03,op <[hidden email]> 写道: > > > > &nbsp; &nbsp; hi,&nbsp; &nbsp; > 我对FlinkKafkaConsumer的实现有点迷惑,&nbsp; &nbsp; 这有两个相同代码的程序: > > //--------------------------- > > val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment > > Env.setRestartStrategy(RestartStrategies.noRestart()) > > val consumerProps = new Properties() > > consumerProps.put("bootstrap.servers", brokers) > > consumerProps.put("group.id", "test1234") > > > > val consumer = new FlinkKafkaConsumer[String](topic,new > KafkaStringSchema,consumerProps).setStartFromLatest() > > Env.addSource(consumer).print() > > > Env.execute()//-----------------------------------我同时启动这两个程序,他们连接相同的集群的topic, > group.id也一样,然后我向topic发送一些数据,发现这两个程序都能消费到发送的所有分区的消息,kafka 的consumer > group组内应该是有消费隔离的,为什么这里两个程序都能同时消费到全部数据呢?而用KafkaConsumer写两个相同的程序去消费这个topic就可以看到两边程序是没有重复消费同一分区的。我用的是flink1.11flink-connector-kafka_2.11 > 谢谢 ... [show rest of quote]
|
Loading... |
Reply to author |
Edit post |
Move post |
Delete this post |
Delete this post and replies |
Change post date |
Print post |
Permalink |
Raw mail |
14 posts
|
In reply to this post by op
Hi,
为了保证 Flink 程序的 exactly-once,必须由各个 Kafka source 算子维护当前算子所消费的 partition 消费 offset 信息,并在每次checkpoint 时将这些信息写入到 state 中, 在从 checkpoint 恢复中从上次 commit 的位点开始消费,保证 exactly-once. 如果用 Kafka 消费组管理,那么 FlinkKafkaConsumer 内各个并发实例所分配的 partition 将由 Kafka 的消费组管理,且 offset 也由 Kafka 消费组管理者记录,Flink 无法维护这些信息。 > 在 2020年9月4日,上午10:25,op <[hidden email]> 写道: > > 谢谢,但我还是不太明白,FlinkKafkaConsumer底层不是用的KafkaConsumer吗,为什么flink不用kafka的消费组管理呢? > > > ------------------ 原始邮件 ------------------ > 发件人: "user-zh" <[hidden email]>; > 发送时间: 2020年9月3日(星期四) 晚上6:09 > 收件人: "user-zh"<[hidden email]>; > > 主题: Re: FlinkKafkaConsumer问题 > > > > Hi op, > > 在 Flink 消费 Kafka 的过程中, 由 FlinkKafkaConsumer 会从 Kafka 中拿到当前 topic 的所有 partition 信息并分配给个并发消费,这里的 group id 只是用于将当前 partition 的消费 offset commit 到 Kafka,并用这个消费组标识。而使用 KafkaConsumer 消费数据则应用到了 Kafka 的消费组管理, 这是 Kafka 服务端的一个角色。 > > 另外,当启动两个作业用同一个 topic 和 group id 消费 kafka, 如果两个作业会分别以同一个 group id commit offset 到kafka, 如果以 group offset 消费模式启动作业, 则会以最后一次 commit 的 offset 开始消费。 > > > 在 2020年9月3日,下午3:03,op <[hidden email]> 写道: > > > > &nbsp; &nbsp; hi,&nbsp; &nbsp; 我对FlinkKafkaConsumer的实现有点迷惑,&nbsp; &nbsp; 这有两个相同代码的程序: > > //--------------------------- > > val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment > > Env.setRestartStrategy(RestartStrategies.noRestart()) > > val consumerProps = new Properties() > > consumerProps.put("bootstrap.servers", brokers) > > consumerProps.put("group.id", "test1234") > > > > val consumer = new FlinkKafkaConsumer[String](topic,new KafkaStringSchema,consumerProps).setStartFromLatest() > > Env.addSource(consumer).print() > > Env.execute()//-----------------------------------我同时启动这两个程序,他们连接相同的集群的topic,group.id也一样,然后我向topic发送一些数据,发现这两个程序都能消费到发送的所有分区的消息,kafka 的consumer group组内应该是有消费隔离的,为什么这里两个程序都能同时消费到全部数据呢?而用KafkaConsumer写两个相同的程序去消费这个topic就可以看到两边程序是没有重复消费同一分区的。我用的是flink1.11flink-connector-kafka_2.11 谢谢 ... [show rest of quote] |
Loading... |
Reply to author |
Edit post |
Move post |
Delete this post |
Delete this post and replies |
Change post date |
Print post |
Permalink |
Raw mail |
29 posts
|
为了保证exactly-once,flink自己通过barrier来实现checkpoint,包括barrier的传递等等,所以flink在kafkaconsumer的基础之上,封装了一层语义保障。
在 2020/9/4 10:34, Shuiqiang Chen 写道: > Hi, > 为了保证 Flink 程序的 exactly-once,必须由各个 Kafka source 算子维护当前算子所消费的 partition 消费 offset 信息,并在每次checkpoint 时将这些信息写入到 state 中, 在从 checkpoint 恢复中从上次 commit 的位点开始消费,保证 exactly-once. 如果用 Kafka 消费组管理,那么 FlinkKafkaConsumer 内各个并发实例所分配的 partition 将由 Kafka 的消费组管理,且 offset 也由 Kafka 消费组管理者记录,Flink 无法维护这些信息。 > >> 在 2020年9月4日,上午10:25,op <[hidden email]> 写道: >> >> 谢谢,但我还是不太明白,FlinkKafkaConsumer底层不是用的KafkaConsumer吗,为什么flink不用kafka的消费组管理呢? >> >> >> ------------------ 原始邮件 ------------------ >> 发件人: "user-zh" <[hidden email]>; >> 发送时间: 2020年9月3日(星期四) 晚上6:09 >> 收件人: "user-zh"<[hidden email]>; >> >> 主题: Re: FlinkKafkaConsumer问题 >> >> >> >> Hi op, >> >> 在 Flink 消费 Kafka 的过程中, 由 FlinkKafkaConsumer 会从 Kafka 中拿到当前 topic 的所有 partition 信息并分配给个并发消费,这里的 group id 只是用于将当前 partition 的消费 offset commit 到 Kafka,并用这个消费组标识。而使用 KafkaConsumer 消费数据则应用到了 Kafka 的消费组管理, 这是 Kafka 服务端的一个角色。 >> >> 另外,当启动两个作业用同一个 topic 和 group id 消费 kafka, 如果两个作业会分别以同一个 group id commit offset 到kafka, 如果以 group offset 消费模式启动作业, 则会以最后一次 commit 的 offset 开始消费。 >> >> > 在 2020年9月3日,下午3:03,op <[hidden email]> 写道: >> > >> > &nbsp; &nbsp; hi,&nbsp; &nbsp; 我对FlinkKafkaConsumer的实现有点迷惑,&nbsp; &nbsp; 这有两个相同代码的程序: >> > //--------------------------- >> > val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment >> > Env.setRestartStrategy(RestartStrategies.noRestart()) >> > val consumerProps = new Properties() >> > consumerProps.put("bootstrap.servers", brokers) >> > consumerProps.put("group.id", "test1234") >> > >> > val consumer = new FlinkKafkaConsumer[String](topic,new KafkaStringSchema,consumerProps).setStartFromLatest() >> > Env.addSource(consumer).print() >> > Env.execute()//-----------------------------------我同时启动这两个程序,他们连接相同的集群的topic,group.id也一样,然后我向topic发送一些数据,发现这两个程序都能消费到发送的所有分区的消息,kafka 的consumer group组内应该是有消费隔离的,为什么这里两个程序都能同时消费到全部数据呢?而用KafkaConsumer写两个相同的程序去消费这个topic就可以看到两边程序是没有重复消费同一分区的。我用的是flink1.11flink-connector-kafka_2.11 谢谢 ... [show rest of quote] |
Loading... |
Reply to author |
Edit post |
Move post |
Delete this post |
Delete this post and replies |
Change post date |
Print post |
Permalink |
Raw mail |
45 posts
|
大概懂了 感谢
------------------ 原始邮件 ------------------ 发件人: "user-zh" <[hidden email]>; 发送时间: 2020年9月4日(星期五) 中午11:54 收件人: "user-zh"<[hidden email]>;"Shuiqiang Chen"<[hidden email]>; 主题: Re: FlinkKafkaConsumer问题 为了保证exactly-once,flink自己通过barrier来实现checkpoint,包括barrier的传递等等,所以flink在kafkaconsumer的基础之上,封装了一层语义保障。 在 2020/9/4 10:34, Shuiqiang Chen 写道: > Hi, > 为了保证 Flink 程序的 exactly-once,必须由各个 Kafka source 算子维护当前算子所消费的 partition 消费 offset 信息,并在每次checkpoint 时将这些信息写入到 state 中, 在从 checkpoint 恢复中从上次 commit 的位点开始消费,保证 exactly-once. 如果用 Kafka 消费组管理,那么 FlinkKafkaConsumer 内各个并发实例所分配的 partition 将由 Kafka 的消费组管理,且 offset 也由 Kafka 消费组管理者记录,Flink 无法维护这些信息。 > >> 在 2020年9月4日,上午10:25,op <[hidden email]> 写道: >> >> 谢谢,但我还是不太明白,FlinkKafkaConsumer底层不是用的KafkaConsumer吗,为什么flink不用kafka的消费组管理呢?&nbsp; >> >> >> ------------------&nbsp;原始邮件&nbsp;------------------ >> 发件人: "user-zh" <[hidden email]&gt;; >> 发送时间:&nbsp;2020年9月3日(星期四) 晚上6:09 >> 收件人:&nbsp;"user-zh"<[hidden email]&gt;; >> >> 主题:&nbsp;Re: FlinkKafkaConsumer问题 >> >> >> >> Hi op, >> >> 在 Flink 消费 Kafka 的过程中, 由 FlinkKafkaConsumer 会从 Kafka 中拿到当前 topic 的所有 partition 信息并分配给个并发消费,这里的 group id 只是用于将当前 partition 的消费 offset commit 到 Kafka,并用这个消费组标识。而使用 KafkaConsumer 消费数据则应用到了 Kafka 的消费组管理, 这是 Kafka 服务端的一个角色。 >> >> 另外,当启动两个作业用同一个 topic 和 group id 消费 kafka, 如果两个作业会分别以同一个 group id commit offset 到kafka, 如果以 group offset 消费模式启动作业, 则会以最后一次 commit 的 offset 开始消费。 >> >> &gt; 在 2020年9月3日,下午3:03,op <[hidden email]&gt; 写道: >> &gt; >> &gt; &amp;nbsp; &amp;nbsp; hi,&amp;nbsp; &amp;nbsp; 我对FlinkKafkaConsumer的实现有点迷惑,&amp;nbsp; &amp;nbsp; 这有两个相同代码的程序: >> &gt; //--------------------------- >> &gt; val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment >> &gt; Env.setRestartStrategy(RestartStrategies.noRestart()) >> &gt; val consumerProps = new Properties() >> &gt; consumerProps.put("bootstrap.servers", brokers) >> &gt; consumerProps.put("group.id", "test1234") >> &gt; >> &gt; val consumer = new FlinkKafkaConsumer[String](topic,new KafkaStringSchema,consumerProps).setStartFromLatest() >> &gt; Env.addSource(consumer).print() >> &gt; Env.execute()//-----------------------------------我同时启动这两个程序,他们连接相同的集群的topic,group.id也一样,然后我向topic发送一些数据,发现这两个程序都能消费到发送的所有分区的消息,kafka 的consumer group组内应该是有消费隔离的,为什么这里两个程序都能同时消费到全部数据呢?而用KafkaConsumer写两个相同的程序去消费这个topic就可以看到两边程序是没有重复消费同一分区的。我用的是flink1.11flink-connector-kafka_2.11 谢谢 |
Loading... |
Reply to author |
Edit post |
Move post |
Delete this post |
Delete this post and replies |
Change post date |
Print post |
Permalink |
Raw mail |
52 posts
|
hi,这种现象是在开checkpoint才出现的吗,还是没有开启也会?
Best | | a511955993 | | 邮箱:[hidden email] | 签名由 网易邮箱大师 定制 在2020年09月04日 14:11,op 写道: 大概懂了 感谢 ------------------ 原始邮件 ------------------ 发件人: "user-zh" <[hidden email]>; 发送时间: 2020年9月4日(星期五) 中午11:54 收件人: "user-zh"<[hidden email]>;"Shuiqiang Chen"<[hidden email]>; 主题: Re: FlinkKafkaConsumer问题 为了保证exactly-once,flink自己通过barrier来实现checkpoint,包括barrier的传递等等,所以flink在kafkaconsumer的基础之上,封装了一层语义保障。 在 2020/9/4 10:34, Shuiqiang Chen 写道: > Hi, > 为了保证 Flink 程序的 exactly-once,必须由各个 Kafka source 算子维护当前算子所消费的 partition 消费 offset 信息,并在每次checkpoint 时将这些信息写入到 state 中, 在从 checkpoint 恢复中从上次 commit 的位点开始消费,保证 exactly-once. 如果用 Kafka 消费组管理,那么 FlinkKafkaConsumer 内各个并发实例所分配的 partition 将由 Kafka 的消费组管理,且 offset 也由 Kafka 消费组管理者记录,Flink 无法维护这些信息。 > >> 在 2020年9月4日,上午10:25,op <[hidden email]> 写道: >> >> 谢谢,但我还是不太明白,FlinkKafkaConsumer底层不是用的KafkaConsumer吗,为什么flink不用kafka的消费组管理呢?&nbsp; >> >> >> ------------------&nbsp;原始邮件&nbsp;------------------ >> 发件人: "user-zh" <[hidden email]&gt;; >> 发送时间:&nbsp;2020年9月3日(星期四) 晚上6:09 >> 收件人:&nbsp;"user-zh"<[hidden email]&gt;; >> >> 主题:&nbsp;Re: FlinkKafkaConsumer问题 >> >> >> >> Hi op, >> >> 在 Flink 消费 Kafka 的过程中, 由 FlinkKafkaConsumer 会从 Kafka 中拿到当前 topic 的所有 partition 信息并分配给个并发消费,这里的 group id 只是用于将当前 partition 的消费 offset commit 到 Kafka,并用这个消费组标识。而使用 KafkaConsumer 消费数据则应用到了 Kafka 的消费组管理, 这是 Kafka 服务端的一个角色。 >> >> 另外,当启动两个作业用同一个 topic 和 group id 消费 kafka, 如果两个作业会分别以同一个 group id commit offset 到kafka, 如果以 group offset 消费模式启动作业, 则会以最后一次 commit 的 offset 开始消费。 >> >> &gt; 在 2020年9月3日,下午3:03,op <[hidden email]&gt; 写道: >> &gt; >> &gt; &amp;nbsp; &amp;nbsp; hi,&amp;nbsp; &amp;nbsp; 我对FlinkKafkaConsumer的实现有点迷惑,&amp;nbsp; &amp;nbsp; 这有两个相同代码的程序: >> &gt; //--------------------------- >> &gt; val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment >> &gt; Env.setRestartStrategy(RestartStrategies.noRestart()) >> &gt; val consumerProps = new Properties() >> &gt; consumerProps.put("bootstrap.servers", brokers) >> &gt; consumerProps.put("group.id", "test1234") >> &gt; >> &gt; val consumer = new FlinkKafkaConsumer[String](topic,new KafkaStringSchema,consumerProps).setStartFromLatest() >> &gt; Env.addSource(consumer).print() >> &gt; Env.execute()//-----------------------------------我同时启动这两个程序,他们连接相同的集群的topic,group.id也一样,然后我向topic发送一些数据,发现这两个程序都能消费到发送的所有分区的消息,kafka 的consumer group组内应该是有消费隔离的,为什么这里两个程序都能同时消费到全部数据呢?而用KafkaConsumer写两个相同的程序去消费这个topic就可以看到两边程序是没有重复消费同一分区的。我用的是flink1.11flink-connector-kafka_2.11 谢谢 |
Loading... |
Reply to author |
Edit post |
Move post |
Delete this post |
Delete this post and replies |
Change post date |
Print post |
Permalink |
Raw mail |
388 posts
|
这个问题和flink关系不大。Kafka本身就是这么个特点,指定group,如果是订阅方式,会是你想象的那样,分享消息。但,如果是通过assign方式指定了消费哪个分区,则不受到group中消费者共享消息的限制。
SmileSmile <[hidden email]> 于2020年9月5日周六 下午4:51写道: > hi,这种现象是在开checkpoint才出现的吗,还是没有开启也会? > > Best > > > > > | | > a511955993 > | > | > 邮箱:[hidden email] > | > > 签名由 网易邮箱大师 定制 > > 在2020年09月04日 14:11,op 写道: > 大概懂了 感谢 > > > > > ------------------ 原始邮件 ------------------ > 发件人: > "user-zh" > < > [hidden email]>; > 发送时间: 2020年9月4日(星期五) 中午11:54 > 收件人: "user-zh"<[hidden email]>;"Shuiqiang Chen"< > [hidden email]>; > > 主题: Re: FlinkKafkaConsumer问题 > > > > > 为了保证exactly-once,flink自己通过barrier来实现checkpoint,包括barrier的传递等等,所以flink在kafkaconsumer的基础之上,封装了一层语义保障。 > > 在 2020/9/4 10:34, Shuiqiang Chen 写道: > > > Hi, > > 为了保证 Flink 程序的 exactly-once,必须由各个 Kafka source 算子维护当前算子所消费的 > partition 消费 offset 信息,并在每次checkpoint 时将这些信息写入到 state 中, 在从 checkpoint > 恢复中从上次 commit 的位点开始消费,保证 exactly-once. 如果用 Kafka 消费组管理,那么 > FlinkKafkaConsumer 内各个并发实例所分配的 partition 将由 Kafka 的消费组管理,且 offset 也由 Kafka > 消费组管理者记录,Flink 无法维护这些信息。 > > > >> 在 2020年9月4日,上午10:25,op <[hidden email]> 写道: > >> > >> > 谢谢,但我还是不太明白,FlinkKafkaConsumer底层不是用的KafkaConsumer吗,为什么flink不用kafka的消费组管理呢?&nbsp; > >> > >> > >> ------------------&nbsp;原始邮件&nbsp;------------------ > >> > 发件人: > "user-zh" > <[hidden email]&gt;; > >> 发送时间:&nbsp;2020年9月3日(星期四) 晚上6:09 > >> 收件人:&nbsp;"user-zh"<[hidden email]&gt;; > >> > >> 主题:&nbsp;Re: FlinkKafkaConsumer问题 > >> > >> > >> > >> Hi op, > >> > >> 在 Flink 消费 Kafka 的过程中, 由 FlinkKafkaConsumer 会从 Kafka 中拿到当前 topic > 的所有 partition 信息并分配给个并发消费,这里的 group id 只是用于将当前 partition 的消费 offset commit > 到 Kafka,并用这个消费组标识。而使用 KafkaConsumer 消费数据则应用到了 Kafka 的消费组管理, 这是 Kafka > 服务端的一个角色。 > >> > >> 另外,当启动两个作业用同一个 topic 和 group id 消费 kafka, 如果两个作业会分别以同一个 group id > commit offset 到kafka, 如果以 group offset 消费模式启动作业, 则会以最后一次 commit 的 offset > 开始消费。 > >> > >> &gt; 在 2020年9月3日,下午3:03,op <[hidden email]&gt; 写道: > >> &gt; > >> &gt; &amp;nbsp; &amp;nbsp; hi,&amp;nbsp; > &amp;nbsp; 我对FlinkKafkaConsumer的实现有点迷惑,&amp;nbsp; &amp;nbsp; > 这有两个相同代码的程序: > >> &gt; //--------------------------- > >> &gt; val bsEnv = > StreamExecutionEnvironment.getExecutionEnvironment > >> &gt; Env.setRestartStrategy(RestartStrategies.noRestart()) > >> &gt; val consumerProps = new Properties() > >> &gt; consumerProps.put("bootstrap.servers", brokers) > >> &gt; consumerProps.put("group.id", "test1234") > >> &gt; > >> &gt; val consumer = new FlinkKafkaConsumer[String](topic,new > KafkaStringSchema,consumerProps).setStartFromLatest() > >> &gt; Env.addSource(consumer).print() > >> &gt; > Env.execute()//-----------------------------------我同时启动这两个程序,他们连接相同的集群的topic, > group.id也一样,然后我向topic发送一些数据,发现这两个程序都能消费到发送的所有分区的消息,kafka 的consumer > group组内应该是有消费隔离的,为什么这里两个程序都能同时消费到全部数据呢?而用KafkaConsumer写两个相同的程序去消费这个topic就可以看到两边程序是没有重复消费同一分区的。我用的是flink1.11flink-connector-kafka_2.11 > 谢谢 ... [show rest of quote]
|
Free forum by Nabble | Edit this page |