Hi David,
Thanks a lot for the explanation!
Eleanore

Eleanore,
Yes, if you change the implementation in the way the slide suggests, the tests will fail. But it's more interesting to observe the behavior in the console.
The notes that go with that slide explain the situation in more detail (use alt-p or option-p to see the notes). To recap, there are two related effects:
(1) Instead of producing a single result at the end of each window, this alternative implementation produces a result for every event. In other words, it produces a stream that eventually arrives at the same maximum value that timeWindowAll produces.
(2) With timeWindowAll, once the results for a given hour have been produced, Flink frees the state associated with that hour's window. Based on the watermarks, it knows that no more events are expected for that hour, so the state is no longer needed and can be cleared. With maxBy, however, the state for each key (each hour) is kept forever. This is why it is not a good approach: the keyspace is unbounded, and we have no way to intervene and clean up the stale state.
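To make the two effects concrete, here is a small plain-Java model that does not use Flink at all. It is a sketch under stated assumptions: records have the same shape as the Tuple3<Long, Long, Float> produced by SumTipsFunction (end of hour, driverId, total tips), and watermarking is simplified to "an hour's window fires once a record for a later hour arrives, or the input ends". The names HourlyMaxModel, rollingMax and windowedMax are hypothetical, and the input is invented for illustration, not taken from HourlyTipsTest.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

// Plain-Java model of the two strategies; no Flink classes are used.
class HourlyMaxModel {

    // Stand-in for Tuple3<Long, Long, Float>: (end of hour, driverId, total tips).
    static final class HourlyTotal {
        final long hourEnd;
        final long driverId;
        final float tips;

        HourlyTotal(long hourEnd, long driverId, float tips) {
            this.hourEnd = hourEnd;
            this.driverId = driverId;
            this.tips = tips;
        }

        @Override
        public String toString() {
            return "(" + hourEnd + "," + driverId + "," + tips + ")";
        }
    }

    // Models keyBy(hour).maxBy(2): a rolling aggregate that emits the current
    // maximum for the hour on every input and never removes an hour's entry.
    static List<HourlyTotal> rollingMax(List<HourlyTotal> input, Map<Long, HourlyTotal> state) {
        List<HourlyTotal> out = new ArrayList<>();
        for (HourlyTotal t : input) {
            HourlyTotal best = state.get(t.hourEnd);
            if (best == null || t.tips > best.tips) {
                best = t;
                state.put(t.hourEnd, t);
            }
            out.add(best); // emitted even when the maximum did not change
        }
        return out;
    }

    // Models windowAll(1-hour tumbling window).maxBy(2): one result per hour,
    // emitted when the simplified watermark passes that hour, then cleared.
    static List<HourlyTotal> windowedMax(List<HourlyTotal> input, TreeMap<Long, HourlyTotal> state) {
        List<HourlyTotal> out = new ArrayList<>();
        for (HourlyTotal t : input) {
            // Simplification: the watermark is the hour of the latest record.
            fireBefore(t.hourEnd, state, out);
            HourlyTotal best = state.get(t.hourEnd);
            if (best == null || t.tips > best.tips) {
                state.put(t.hourEnd, t);
            }
        }
        fireBefore(Long.MAX_VALUE, state, out); // end of input flushes every window
        return out;
    }

    // Emit, then drop, every window whose end is before the watermark.
    private static void fireBefore(long watermark, TreeMap<Long, HourlyTotal> state, List<HourlyTotal> out) {
        SortedMap<Long, HourlyTotal> complete = state.headMap(watermark);
        out.addAll(complete.values());
        complete.clear(); // also removes the entries from the backing map
    }

    public static void main(String[] args) {
        // Hypothetical input, not the actual HourlyTipsTest data.
        List<HourlyTotal> input = List.of(
                new HourlyTotal(946688400000L, 1L, 6.0f),
                new HourlyTotal(946692000000L, 2L, 20.0f),
                new HourlyTotal(946692000000L, 3L, 5.0f));

        Map<Long, HourlyTotal> rollingState = new HashMap<>();
        TreeMap<Long, HourlyTotal> windowState = new TreeMap<>();
        System.out.println(rollingMax(input, rollingState) + " state size " + rollingState.size());
        System.out.println(windowedMax(input, windowState) + " state size " + windowState.size());
    }
}
```

With this input, rollingMax emits three records, repeating (946692000000,2,20.0) when driver 3's smaller total arrives, and still holds two entries in its state map afterwards; windowedMax emits one record per hour and ends with an empty state map.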
Regards, David
Hi experts,
The logic is:
0. keyBy driverId
1. create a 1-hour window based on event time
2. sum up all the tips for this driver within this 1-hour window
3. create a 1-hour window across all drivers (windowAll)
4. find the max tips
Sample code is shown below:

```java
SingleOutputStreamOperator<Tuple3<Long, Long, Float>> aggregatedTipsPerDriver = fares
        .keyBy(rides -> rides.driverId)
        .window(TumblingEventTimeWindows.of(Time.hours(1)))
        .process(new SumTipsFunction());

// Tuple3: reporting the timestamp for the end of the hour, the driverId, and the total of that driver's tips for that hour
SingleOutputStreamOperator<Tuple3<Long, Long, Float>> hourlyMax = aggregatedTipsPerDriver
        .windowAll(TumblingEventTimeWindows.of(Time.hours(1)))
        .maxBy(2);
```
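The same two-stage logic can be restated as a batch computation over an in-memory list, which makes it easy to check what each stage should produce. This is a sketch under simplifying assumptions: Fare is a hypothetical stand-in for the exercise's fare type with only a driverId, a tip and an epoch-millisecond event time; hours are aligned to the epoch, as TumblingEventTimeWindows does with no offset; and ties keep the first row seen. The names HourlyTipsSketch, sumTipsPerDriverPerHour and maxPerHour are illustrative only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Batch restatement of the hourly-tips logic; no Flink classes are used.
class HourlyTipsSketch {

    static final long HOUR_MS = 3_600_000L;

    // Hypothetical minimal fare: who earned the tip, how much, and when (epoch ms).
    static final class Fare {
        final long driverId;
        final float tip;
        final long eventTime;

        Fare(long driverId, float tip, long eventTime) {
            this.driverId = driverId;
            this.tip = tip;
            this.eventTime = eventTime;
        }
    }

    // Same shape as the Tuple3 in the pipeline: (end of hour, driverId, total tips).
    static final class Row {
        final long hourEnd;
        final long driverId;
        final float total;

        Row(long hourEnd, long driverId, float total) {
            this.hourEnd = hourEnd;
            this.driverId = driverId;
            this.total = total;
        }

        @Override
        public String toString() {
            return "(" + hourEnd + "," + driverId + "," + total + ")";
        }
    }

    // Steps 0-2: group by driver and by 1-hour event-time bucket, then sum the tips.
    static List<Row> sumTipsPerDriverPerHour(List<Fare> fares) {
        // hourEnd -> (driverId -> running sum); TreeMaps keep the output order deterministic.
        TreeMap<Long, TreeMap<Long, Float>> sums = new TreeMap<>();
        for (Fare f : fares) {
            long hourStart = f.eventTime - Math.floorMod(f.eventTime, HOUR_MS);
            sums.computeIfAbsent(hourStart + HOUR_MS, h -> new TreeMap<>())
                .merge(f.driverId, f.tip, Float::sum);
        }
        List<Row> rows = new ArrayList<>();
        for (Map.Entry<Long, TreeMap<Long, Float>> hour : sums.entrySet()) {
            for (Map.Entry<Long, Float> driver : hour.getValue().entrySet()) {
                rows.add(new Row(hour.getKey(), driver.getKey(), driver.getValue()));
            }
        }
        return rows;
    }

    // Steps 3-4: one group per hour across all drivers, keeping the row with the largest total.
    static List<Row> maxPerHour(List<Row> rows) {
        TreeMap<Long, Row> best = new TreeMap<>();
        for (Row r : rows) {
            best.merge(r.hourEnd, r, (current, candidate) -> candidate.total > current.total ? candidate : current);
        }
        return new ArrayList<>(best.values());
    }

    public static void main(String[] args) {
        // Hypothetical fares, not the HourlyTipsTest data.
        List<Fare> fares = List.of(
                new Fare(1L, 1.0f, 946684800000L),  // 2000-01-01T00:00Z
                new Fare(1L, 5.0f, 946686600000L),  // 00:30, same hour
                new Fare(2L, 20.0f, 946688400000L), // 01:00, next hour
                new Fare(3L, 5.0f, 946689000000L)); // 01:10
        List<Row> perDriver = sumTipsPerDriverPerHour(fares);
        System.out.println(perDriver);
        System.out.println(maxPerHour(perDriver));
    }
}
```

Because maxPerHour sees every row for an hour before emitting anything, it produces exactly one row per hour, which is the batch analogue of what windowAll does once the watermark closes each hour.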
The question is on the 4th slide: why can't we key by the hour? If I change the implementation to key by hour and run HourlyTipsTest, testMaxAcrossDrivers fails:

```
// (946688400000,1,6.0) -> for the window with timestamp 946688400000, driverId 1 earned the most tips: 6.0
Expected :[(946688400000,1,6.0), (946692000000,2,20.0)]
Actual   :[(946688400000,1,6.0), (946692000000,2,20.0), (946692000000,2,20.0)]
```
Thanks a lot! Eleanore