How to control the balance when using FlinkKafkaConsumer.setStartFromTimestamp(timestamp)


How to control the balance when using FlinkKafkaConsumer.setStartFromTimestamp(timestamp)

Jary Zhen
Hello everyone,

   First, a brief pipeline introduction:
      env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
      consume multiple Kafka topics
      -> union them
      -> assignTimestampsAndWatermarks
      -> keyBy
      -> window() and so on …
This is a very common way to process data with Flink in a production
environment.
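
For reference, here is a minimal sketch of that pipeline (Flink 1.10-era
DataStream API). The topic names, broker address, group id, record format
("<epochMillis>,<key>,<payload>"), key field and the 5-second
out-of-orderness bound are all placeholder assumptions:

    import java.util.Properties;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

    public class PipelineSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
            props.setProperty("group.id", "pipeline-test");           // placeholder

            FlinkKafkaConsumer<String> consumerA =
                    new FlinkKafkaConsumer<>("topicA", new SimpleStringSchema(), props);
            FlinkKafkaConsumer<String> consumerB =
                    new FlinkKafkaConsumer<>("topicB", new SimpleStringSchema(), props);

            // consume multiple topics -> union them
            DataStream<String> unioned =
                    env.addSource(consumerA).union(env.addSource(consumerB));

            unioned
                // assume records look like "<epochMillis>,<key>,<payload>"
                .assignTimestampsAndWatermarks(
                    new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(5)) {
                        @Override
                        public long extractTimestamp(String record) {
                            return Long.parseLong(record.split(",")[0]);
                        }
                    })
                .keyBy(record -> record.split(",")[1])
                .timeWindow(Time.minutes(1))
                .reduce((a, b) -> a) // placeholder aggregation
                .print();

            env.execute("pipeline-sketch");
        }
    }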
But if I want to test the pipeline above, I need to use
FlinkKafkaConsumer.setStartFromTimestamp(timestamp) to consume 'past' data.
So my question is: how do I control the 'step' balance when one topic
produces 3 records per second while another produces 30000 per second?
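
Starting the replay itself is a single call per consumer; for example, to
re-read the last 24 hours with the consumers from the sketch above:

    // start both consumers 24 hours back; offsets are looked up per
    // partition by timestamp, which is exactly what lets the two topics
    // advance at very different event-time speeds during the replay
    long startTimestamp = System.currentTimeMillis() - 24L * 60 * 60 * 1000;
    consumerA.setStartFromTimestamp(startTimestamp);
    consumerB.setStartFromTimestamp(startTimestamp);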

I don't know if I have described this clearly, so if anything is unclear
please let me know.

Thanks

Re: How to control the balance when using FlinkKafkaConsumer.setStartFromTimestamp(timestamp)

Jary Zhen
Hi Dinesh, thanks for your reply.

  For example, suppose there are two topics: topic A produces 1 record per
second and topic B produces 3600 records per second. If I set the Kafka
consumer config like this:
     max.poll.records: "3600"
     max.poll.interval.ms: "1000"
then I can consume the whole set of records from these two topics every
second in real time.
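
In code these are just the standard Kafka consumer property keys handed to
the FlinkKafkaConsumer (broker address and group id below are placeholders):

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
    props.setProperty("group.id", "replay-test");             // placeholder
    props.setProperty("max.poll.records", "3600");    // cap records per poll
    props.setProperty("max.poll.interval.ms", "1000");
    // props is then passed to new FlinkKafkaConsumer<>(topic, schema, props)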
But if I want to consume data from the last day or earlier by using
FlinkKafkaConsumer.setStartFromTimestamp(timestamp), I will get 3600 records
within one second from *topic A*, which were produced *over an hour* in the
production environment, and at the same time I will get 3600 records within
one second from *topic B*, which were produced *in one second*. So with
*EventTime* semantics, the watermark driven by topic A will always cause the
data from topic B to be treated as 'late data' in the window operator. What
I want is to get 1 record from A and 3600 records from B per second when
using FlinkKafkaConsumer.setStartFromTimestamp(timestamp), so that I can
simulate consuming data as in the real production environment.
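
To make the failure mode concrete, here is a simplified, hypothetical trace
of what a periodic assigner such as BoundedOutOfOrdernessTimestampExtractor
does during the replay (it emits the maximum timestamp seen so far minus the
bound; this is not Flink source code):

    public class LateDataTrace {
        public static void main(String[] args) {
            long t0 = 0L;        // an event time early in the replayed period
            long bound = 5_000L; // 5 s out-of-orderness allowance (assumption)
            long maxTs = Long.MIN_VALUE;

            // within one wall-clock second of replay, topic A's records span
            // an hour of event time while topic B's span only one second
            long[] mixedEventTimes = { t0, t0 + 1_000L, t0 + 3_600_000L };
            for (long ts : mixedEventTimes) {
                maxTs = Math.max(maxTs, ts);
            }
            long watermark = maxTs - bound; // = t0 + 3_595_000

            // later topic-B records still carry event times near t0, far
            // behind the watermark, so the window operator drops them as late
            System.out.println("watermark = " + watermark);
            System.out.println("B record at t0+2s late? "
                    + ((t0 + 2_000L) < watermark));
        }
    }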


Best

On Sat, 23 May 2020 at 23:42, C DINESH <[hidden email]> wrote:

> Hi Jary,
>
> What do you mean by step balance? Could you please provide a concrete
> example?