Hi Dinesh, thanks for your reply.
For example, there are two topics: topic A produces 1 record per second
and topic B produces 3600 records per second. If I set the Kafka consumer
config like this:
max.poll.records: "3600"
max.poll.interval.ms: "1000"
then I can get all the records from both topics every second, in real
time.
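
As a minimal sketch, the two settings above could be written as a `java.util.Properties` object, which is the form the `FlinkKafkaConsumer` constructor accepts. The class and method names are hypothetical, and every other consumer setting (bootstrap servers, group id, and so on) is left out. Note that Kafka documents `max.poll.interval.ms` as the maximum allowed delay between `poll()` calls, not as a polling period, so the "once per second" reading is an assumption of this example.

```java
import java.util.Properties;

public class ConsumerConfigSketch {

    /** Builds only the two consumer settings quoted above (hypothetical helper). */
    static Properties consumerProperties() {
        Properties props = new Properties();
        // Upper bound on the number of records returned by a single poll() call.
        props.setProperty("max.poll.records", "3600");
        // Maximum allowed delay between poll() calls before the consumer is considered failed.
        props.setProperty("max.poll.interval.ms", "1000");
        return props;
    }

    public static void main(String[] args) {
        // Print the settings so they can be checked by eye.
        consumerProperties().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```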
But if I want to consume data from the previous day or earlier by using
FlinkKafkaConsumer.setStartFromTimestamp(timestamp), I will get 3600
records within one second from *topic A*, which were produced *over an hour*
in the production environment. At the same time, I will get 3600 records
within one second from *topic B*, which were produced *in one second*. So
with *EventTime* semantics, the watermark derived from topic A will always
mark the data from topic B as 'late data' in the window operator. What I
want is 1 record from A and 3600 records from B per second when using
FlinkKafkaConsumer.setStartFromTimestamp(timestamp), so that I can simulate
consuming data as in the real production environment.
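
The effect described above can be reproduced with a small stand-alone simulation that uses no Flink or Kafka classes. It is only a sketch under stated assumptions. Watermarks are generated once, after the union, as "largest timestamp seen minus a fixed bound". A record counts as late when it is older than the current watermark, which simplifies Flink's window lateness rule. The 5-second bound and all class and method names are hypothetical.

```java
public class ReplaySkewDemo {

    /** Event timestamps (ms) of a topic that produced recordsPerSecond records each second. */
    static long[] timestamps(int recordsPerSecond, int count) {
        long[] ts = new long[count];
        for (int i = 0; i < count; i++) {
            ts[i] = (long) i * 1000 / recordsPerSecond;
        }
        return ts;
    }

    /** Tracks a "max timestamp minus bound" watermark and counts records behind it. */
    static final class LateCounter {
        private final long boundMs;
        private long maxTimestamp = Long.MIN_VALUE;
        long late = 0;

        LateCounter(long boundMs) {
            this.boundMs = boundMs;
        }

        void accept(long timestamp) {
            // Simplification: "late" means older than the watermark at arrival time.
            if (maxTimestamp != Long.MIN_VALUE && timestamp < maxTimestamp - boundMs) {
                late++;
            }
            maxTimestamp = Math.max(maxTimestamp, timestamp);
        }
    }

    /** Replay that takes one record from each topic in turn, ignoring event time. */
    static long lateWithCountBalancedReplay(long[] a, long[] b, long boundMs) {
        LateCounter counter = new LateCounter(boundMs);
        for (int i = 0; i < Math.max(a.length, b.length); i++) {
            if (i < a.length) counter.accept(a[i]);
            if (i < b.length) counter.accept(b[i]);
        }
        return counter.late;
    }

    /** Replay that always emits the record with the smaller event timestamp next. */
    static long lateWithTimeOrderedReplay(long[] a, long[] b, long boundMs) {
        LateCounter counter = new LateCounter(boundMs);
        int i = 0;
        int j = 0;
        while (i < a.length || j < b.length) {
            boolean takeA = j >= b.length || (i < a.length && a[i] <= b[j]);
            counter.accept(takeA ? a[i++] : b[j++]);
        }
        return counter.late;
    }

    public static void main(String[] args) {
        long[] topicA = timestamps(1, 3600);    // 3600 records = one hour of topic A
        long[] topicB = timestamps(3600, 3600); // 3600 records = one second of topic B
        long boundMs = 5_000;                   // hypothetical out-of-orderness bound
        System.out.println("count-balanced replay, late records: "
                + lateWithCountBalancedReplay(topicA, topicB, boundMs));
        System.out.println("time-ordered replay, late records:   "
                + lateWithTimeOrderedReplay(topicA, topicB, boundMs));
    }
}
```

In the count-balanced replay, topic A's timestamps race ahead and drag the shared watermark past almost all of topic B's records. Ordering the replay by event time keeps the first second as 1 record from A plus 3600 from B, which is the behavior asked for here.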
Best
On Sat, 23 May 2020 at 23:42, C DINESH <[hidden email]> wrote:
> Hi Jary,
>
> What do you mean by step balance? Could you please provide a concrete
> example?
>
> On Fri, May 22, 2020 at 3:46 PM Jary Zhen <[hidden email]> wrote:
>
>> Hello everyone,
>>
>> First, a brief introduction to the pipeline:
>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>> consume multiple Kafka topics
>> -> union them
>> -> assignTimestampsAndWatermarks
>> -> keyBy
>> -> window() and so on …
>> This is a very common way to use Flink to process data in a production
>> environment.
>> But if I want to test the pipeline above, I need to use
>> FlinkKafkaConsumer.setStartFromTimestamp(timestamp) to consume 'past' data.
>> So my question is how to control the 'step' balance when one topic
>> produces 3 records per second while another topic produces 30000 per second.
>>
>> I don't know if I have described this clearly, so if anything is unclear, please let me know.
>>
>> Thanks
>>
>>