关于flink中kafka状态及数据回溯问题

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

关于flink中kafka状态及数据回溯问题

1900
目前采用flink on yarn, flink版本是1.7.2,hadoop是2.8.5,kafka是1.0.0


问题一:
消费kafka数据,要保证flink管理kafka的offset,之前看到两种写法,如下


第一种
Properties props = new Properties();
props.put("auto.offset.reset", "latest");
Schema(), props));DataStream<Event> data = env.addSource(new FlinkKafkaConsumer<>("topic", new EventSchema(), props));


第二种
Properties props = new Properties();
Schema(), props).setStartFromLatest());DataStream<Event> data = env.addSource(new FlinkKafkaConsumer<>("topic", new EventSchema(), props).setStartFromLatest());


实际使用下,第一种在异常后,重启能根据offset还原重跑的;第二种是永远都是取最新的offset,请问实际使用中大家是不是都这样用的,采用第一种






问题二:


有些异常情况,想根据kafka中的数据,进行数据的回溯,假如我想把近一天的数据全部回溯,经过API查找,有方法setStartFromTimestamp,调用如下
DataStream<Event> data = env.addSource(new FlinkKafkaConsumer<>("topic", new EventSchema(), props).setStartFromTimestamp(1566797693000));



但实际情况下,并没有达到从那个时间戳开始回溯数据的效果,实际使用情况下,读取了kafka中所有的数据,请问这是用法错误了吗?还是有其他方式处理?