FlinkKafka Consumer can't dynamic discover the partition update

classic Classic list List threaded Threaded
3 messages Options
Reply | Threaded
Open this post in threaded view
|

FlinkKafka Consumer can't dynamic discover the partition update

张云云
When start the job, occurs WARN log like below:

WARN  org.apache.kafka.clients.consumer.ConsumerConfig  - The
configuration 'flink.partition-discovery.interval-millis' was supplied
but isn't a known config.




And I try to change the kafka partion with command, partition number from 3
to 4

./kafka-topics.sh --alter --zookeeper 10.0.10.21:15311 --topic
STRUCTED_LOG --partitions 4

it dosen't work.



How can I do with this problem. Thanks a lot
Reply | Threaded
Open this post in threaded view
|

Re: FlinkKafka Consumer can't dynamic discover the partition update

LiangbinZhang
Hi,张云云
    1. flink.partition-discovery.interval-millis
是kafka的一个配置参数,不知道你是不是通过kafkaProp设置的
    2. 通过shell查看topic分区是否顺利增加,并且有数据写入。

Best,
Robin


张云云 wrote

> When start the job, occurs WARN log like below:
>
> WARN  org.apache.kafka.clients.consumer.ConsumerConfig  - The
> configuration 'flink.partition-discovery.interval-millis' was supplied
> but isn't a known config.
>
>
>
>
> And I try to change the kafka partion with command, partition number from
> 3
> to 4
>
> ./kafka-topics.sh --alter --zookeeper 10.0.10.21:15311 --topic
> STRUCTED_LOG --partitions 4
>
> it dosen't work.
>
>
>
> How can I do with this problem. Thanks a lot





--
Sent from: http://apache-flink.147419.n8.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: FlinkKafka Consumer can't dynamic discover the partition update

张云云
感谢回复!

我是这样配置的

Properties properties = new Properties();
properties.setProperty("bootstrap.servers",
    ConfigureManager.getString(CmConfigConstants.SOURCE_KAFKA_SERVERS,
null, kafkaName));
properties.setProperty("group.id",
    ConfigureManager.getString(CmConfigConstants.SOURCE_KAFKA_GROUPID,
null, kafkaName));
properties.setProperty("flink.partition-discovery.interval-millis", "5000");

String[] topics = ConfigureManager
    .getArrayString(CmConfigConstants.SOURCE_KAFKA_TOPICS, ",", null,
kafkaName);
Preconditions.checkNotNull(topics, "topics can't be null");
FlinkKafkaConsumer011<String> kafkaConsumer = new FlinkKafkaConsumer011<>(
    Arrays.asList(topics),
    new SimpleStringSchema(), properties);


增加分区之后没有效果,程序日志中没有任何检测信息

数据是有写入的,不过我还没确认那个分区有没有写入,后面我看下


On Thu, Feb 18, 2021 at 2:38 PM Robin Zhang <[hidden email]>
wrote:

> Hi,张云云
>     1. flink.partition-discovery.interval-millis
> 是kafka的一个配置参数,不知道你是不是通过kafkaProp设置的
>     2. 通过shell查看topic分区是否顺利增加,并且有数据写入。
>
> Best,
> Robin
>
>
> 张云云 wrote
> > When start the job, occurs WARN log like below:
> >
> > WARN  org.apache.kafka.clients.consumer.ConsumerConfig  - The
> > configuration 'flink.partition-discovery.interval-millis' was supplied
> > but isn't a known config.
> >
> >
> >
> >
> > And I try to change the kafka partion with command, partition number from
> > 3
> > to 4
> >
> > ./kafka-topics.sh --alter --zookeeper 10.0.10.21:15311 --topic
> > STRUCTED_LOG --partitions 4
> >
> > it dosen't work.
> >
> >
> >
> > How can I do with this problem. Thanks a lot
>
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>