Environment: Flink 1.10, using Flink SQL.

The Kafka input data looks like this:

{"t":"2020-04-01T05:00:00Z", "id":"1", "speed":1.0}
{"t":"2020-04-01T05:05:00Z", "id":"1", "speed":2.0}
{"t":"2020-04-01T05:10:00Z", "id":"1", "speed":3.0}
{"t":"2020-04-01T05:15:00Z", "id":"1", "speed":4.0}
{"t":"2020-04-01T05:20:00Z", "id":"1", "speed":5.0}
{"t":"2020-04-01T05:25:00Z", "id":"1", "speed":6.0}

The SQL is as follows:

INSERT INTO topic_sink
SELECT
  t,
  id,
  speed,
  LAG(speed, 1) OVER w AS speed_1,
  LAG(speed, 2) OVER w AS speed_2
FROM topic_source
WINDOW w AS (
  PARTITION BY id
  ORDER BY t
)

The result I expect is:

{"t":"2020-04-01T05:00:00Z", "id":"1", "speed":1.0, "speed_1":null, "speed_2":null}
{"t":"2020-04-01T05:05:00Z", "id":"1", "speed":2.0, "speed_1":1.0, "speed_2":null}
{"t":"2020-04-01T05:10:00Z", "id":"1", "speed":3.0, "speed_1":2.0, "speed_2":1.0}
{"t":"2020-04-01T05:15:00Z", "id":"1", "speed":4.0, "speed_1":3.0, "speed_2":2.0}
{"t":"2020-04-01T05:20:00Z", "id":"1", "speed":5.0, "speed_1":4.0, "speed_2":3.0}
{"t":"2020-04-01T05:25:00Z", "id":"1", "speed":6.0, "speed_1":5.0, "speed_2":4.0}

The result I actually get is:

{"t":"2020-04-01T05:00:00Z", "id":"1", "speed":1.0, "speed_1":1.0, "speed_2":1.0}
{"t":"2020-04-01T05:05:00Z", "id":"1", "speed":2.0, "speed_1":2.0, "speed_2":2.0}
{"t":"2020-04-01T05:10:00Z", "id":"1", "speed":3.0, "speed_1":3.0, "speed_2":3.0}
{"t":"2020-04-01T05:15:00Z", "id":"1", "speed":4.0, "speed_1":4.0, "speed_2":4.0}
{"t":"2020-04-01T05:20:00Z", "id":"1", "speed":5.0, "speed_1":5.0, "speed_2":5.0}
{"t":"2020-04-01T05:25:00Z", "id":"1", "speed":6.0, "speed_1":6.0, "speed_2":6.0}

Can the LAG function in Flink SQL perform the calculation I expect? If so, how should the SQL be written?

--
Sent from: http://apache-flink.147419.n8.nabble.com/
Hi Robin,
The current implementation of the LAG/LEAD functions does have a bug in streaming mode. That implementation only works in batch mode; it was not really written with the streaming case in mind. So what you are seeing is most likely that it can only return the value of the current row.

I only discovered this problem recently myself, and I have just opened an issue [1] to track it. For now, if you need this kind of functionality, you can write your own UDAF as a workaround.

[1] https://issues.apache.org/jira/browse/FLINK-19449

Robin Zhang <[hidden email]> wrote on Tue, Sep 29, 2020 at 2:04 PM:

> Can the LAG function in Flink SQL perform the calculation I expect? If so, how should the SQL be written?

--
Best,
Benchao Li
Hi Benchao,
Thanks for the reply. It cleared up what had been puzzling me recently.

Best,
Robin
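For readers who want to try the UDAF workaround suggested in the thread, the core of it is an accumulator that remembers the last few values per key. Below is a minimal sketch of that state logic in plain Python, not Flink code. The names `LagAccumulator`, `accumulate`, `get_value`, and `lag_rows` are hypothetical and chosen only to loosely mirror the createAccumulator / accumulate / getValue shape of Flink's `AggregateFunction`. The sketch assumes rows arrive already ordered by `t` within each `id`, which is what the `ORDER BY t` in the over window is meant to guarantee.

```python
from collections import deque
from typing import Deque, Dict, Iterable, List, Optional, Tuple


class LagAccumulator:
    """Hypothetical accumulator: keeps the current value plus the `offset` values before it."""

    def __init__(self, offset: int) -> None:
        if offset < 1:
            raise ValueError("offset must be >= 1")
        self.offset = offset
        # A bounded deque drops the oldest value automatically once it is full.
        self.window: Deque[float] = deque(maxlen=offset + 1)

    def accumulate(self, value: float) -> None:
        """Feed the value of the current row."""
        self.window.append(value)

    def get_value(self) -> Optional[float]:
        """Return the value `offset` rows back, or None if there are not enough rows yet."""
        if len(self.window) <= self.offset:
            return None
        return self.window[0]


def lag_rows(rows: Iterable[dict], offsets: Tuple[int, ...] = (1, 2)) -> List[dict]:
    """Emulate LAG(speed, k) OVER (PARTITION BY id ORDER BY t) for each k in `offsets`.

    Assumption: `rows` is already sorted by t within each id.
    """
    state: Dict[str, Dict[int, LagAccumulator]] = {}  # one accumulator set per id
    out: List[dict] = []
    for row in rows:
        key = row["id"]
        if key not in state:
            state[key] = {k: LagAccumulator(k) for k in offsets}
        enriched = dict(row)
        for k, acc in state[key].items():
            acc.accumulate(row["speed"])
            enriched[f"speed_{k}"] = acc.get_value()
        out.append(enriched)
    return out


# Sample input modeled on the six rows from the question.
sample = [
    {"t": f"2020-04-01T05:{m:02d}:00Z", "id": "1", "speed": s}
    for m, s in zip(range(0, 30, 5), [1.0, 2.0, 3.0, 4.0, 5.0, 6.0])
]
for r in lag_rows(sample):
    print(r)
```

In an actual Flink job the same idea would be written in Java or Scala as a user-defined aggregate function and applied over the window from the question. The bounded buffer keeps the per-key state at `offset + 1` values, regardless of how many rows the stream has produced.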