Hello,

I am using Flink 1.11.2 and processing two streams with Flink SQL. Both streams carry database change records and I need to join the latest row from each, so I deduplicate each stream with row_number = 1:

    SELECT [column_list] FROM (
      SELECT [column_list],
        ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
          ORDER BY time_attr [asc|desc]) AS rownum
      FROM table_name)
    WHERE rownum = 1

and then left join the two deduplicated results. While only the left stream changes, everything is fine and the result is as expected. When the right stream first has data, that first row is also as expected. But when the right stream sends another row that changes the data, an unexpected row appears:

    left> (true,3774bca649224249bdbcb8e7c80b52f9,TS202012141551420518,3,2a30402475dd4d89b461d2e457b297f0,NP02,NP020101,NP020202,yangkg001,,2020-12-14 15:59:50,4f28c1211e274bba819cc63680a3b386,null,null,null,null)
    right> (true,3774bca649224249bdbcb8e7c80b52f9,1,0,8,1607932790000)
    left> (false,3774bca649224249bdbcb8e7c80b52f9,TS202012141551420518,3,2a30402475dd4d89b461d2e457b297f0,NP02,NP020101,NP020202,yangkg001,,2020-12-14 15:59:50,4f28c1211e274bba819cc63680a3b386,null,null,null,null)
    left> (true,3774bca649224249bdbcb8e7c80b52f9,TS202012141551420518,3,2a30402475dd4d89b461d2e457b297f0,NP02,NP020101,NP020202,yangkg001,,2020-12-14 15:59:50,4f28c1211e274bba819cc63680a3b386,1,0,8,1607932790000)
    left> (false,3774bca649224249bdbcb8e7c80b52f9,TS202012141551420518,3,2a30402475dd4d89b461d2e457b297f0,NP02,NP020101,NP020202,yangkg001,,2020-12-14 15:59:50,4f28c1211e274bba819cc63680a3b386,1,0,8,1607932790000)
    left> (true,3774bca649224249bdbcb8e7c80b52f9,TS202012141551420518,3,2a30402475dd4d89b461d2e457b297f0,NP02,NP020101,NP020202,yangkg001,yangkg001,null,4f28c1211e274bba819cc63680a3b386,1,0,8,1607932790000)
    right> (false,3774bca649224249bdbcb8e7c80b52f9,1,0,8,1607932790000)
    right> (true,3774bca649224249bdbcb8e7c80b52f9,1,0,1,1607933006000)
    left> (false,3774bca649224249bdbcb8e7c80b52f9,TS202012141551420518,3,2a30402475dd4d89b461d2e457b297f0,NP02,NP020101,NP020202,yangkg001,yangkg001,null,4f28c1211e274bba819cc63680a3b386,1,0,8,1607932790000)
    left> (true,3774bca649224249bdbcb8e7c80b52f9,TS202012141551420518,3,2a30402475dd4d89b461d2e457b297f0,NP02,NP020101,NP020202,yangkg001,yangkg001,null,4f28c1211e274bba819cc63680a3b386,null,null,null,null)
    left> (false,3774bca649224249bdbcb8e7c80b52f9,TS202012141551420518,3,2a30402475dd4d89b461d2e457b297f0,NP02,NP020101,NP020202,yangkg001,yangkg001,null,4f28c1211e274bba819cc63680a3b386,null,null,null,null)
    left> (true,3774bca649224249bdbcb8e7c80b52f9,TS202012141551420518,3,2a30402475dd4d89b461d2e457b297f0,NP02,NP020101,NP020202,yangkg001,yangkg001,null,4f28c1211e274bba819cc63680a3b386,1,0,1,1607933006000)

Line 1: a left-stream row arrives and is emitted as true; the right stream has no data yet, so the right-side columns are null.
Line 2: a right-stream row arrives and is emitted as true (the right stream's result is printed separately).
Line 3: the left (joined) row is retracted.
Line 4: left and right rows are joined and shown correctly.
Line 5: the left data changes, so the row is retracted.
Line 6: the changed data is shown.
Line 7: the right data changes, so the row is retracted.
Line 8: the latest right-stream result is shown.
Line 9: because the right data changed, the left (joined) row is retracted.
Lines 10 and 11: not what I expect.
What I expect is: the right stream changes, line 9 retracts the left (joined) row, and then the latest right-stream row is joined with the left row, so line 12 should follow directly.
So I would like to ask everyone for advice.

    1607998361520> (true,3774bca649224249bdbcb8e7c80b52f9,TS202012141551420518,3,2a30402475dd4d89b461d2e457b297f0,NP02,NP020101,NP020202,yangkg001,yangkg001,null,4f28c1211e274bba819cc63680a3b386,null,null,null,null)
    1607998361520> (false,3774bca649224249bdbcb8e7c80b52f9,TS202012141551420518,3,2a30402475dd4d89b461d2e457b297f0,NP02,NP020101,NP020202,yangkg001,yangkg001,null,4f28c1211e274bba819cc63680a3b386,null,null,null,null)

My SQL statement is as follows:

    String sql = "SELECT a.sheetId sheetId,sheetCode,sheetStatus,sheetType,sheetScene,sheetObject," +
        " sheetPresentation,sheetAcceptor,sheetHandler,updateTime,dealTaskId,provided,satisfied,score,operateTime " +
        " from (SELECT sheetId,sheetCode,sheetStatus,sheetType,sheetScene,sheetObject," +
        " sheetPresentation,sheetAcceptor,sheetHandler,updateTime,dealTaskId" +
        " FROM (SELECT *," +
        " ROW_NUMBER() OVER (PARTITION BY sheetId ORDER BY operateTime desc) AS rownum " +
        " FROM sheetMain)" +
        " WHERE rownum = 1 ) a" +
        " left JOIN " +
        " (select sheetId,provided,satisfied,score,operateTime from (SELECT *," +
        " ROW_NUMBER() OVER (PARTITION BY sheetId ORDER BY operateTime desc) AS rownum " +
        " FROM sheetAnswers)" +
        " WHERE rownum = 1 ) c" +
        " ON a.sheetId = c.sheetId " ;

[hidden email]
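To make lines 7 and 8 of the log easier to read: a "keep only the latest row per key" step has to withdraw the row it emitted earlier before it emits the replacement, which is why the right stream prints a false message followed by a true one. The following is a minimal Java sketch of that behaviour, not Flink's actual operator. The class `LatestRowSketch`, its `Row` type, the key `s1`, and the tie-breaking rule are all hypothetical and exist only for illustration; the scores and timestamps are taken from the log above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of "ROW_NUMBER() ... ORDER BY operateTime DESC, rownum = 1"; NOT Flink code. */
final class LatestRowSketch {
    /** Hypothetical, trimmed-down version of a sheetAnswers row. */
    static final class Row {
        final String sheetId;
        final int score;
        final long operateTime;

        Row(String sheetId, int score, long operateTime) {
            this.sheetId = sheetId;
            this.score = score;
            this.operateTime = operateTime;
        }

        @Override
        public String toString() {
            return sheetId + "," + score + "," + operateTime;
        }
    }

    private final Map<String, Row> latest = new HashMap<>();
    private final List<String> out = new ArrayList<>();

    void accept(Row row) {
        Row prev = latest.get(row.sheetId);
        // An older row would rank below 1, so the visible result does not change.
        // Assumption of this sketch: on equal timestamps the later arrival wins.
        if (prev != null && prev.operateTime > row.operateTime) {
            return;
        }
        if (prev != null) {
            out.add("(false," + prev + ")"); // withdraw the previously emitted row
        }
        latest.put(row.sheetId, row);
        out.add("(true," + row + ")");       // emit the new latest row
    }

    static List<String> demo() {
        LatestRowSketch dedup = new LatestRowSketch();
        dedup.accept(new Row("s1", 8, 1607932790000L));
        dedup.accept(new Row("s1", 1, 1607933006000L));
        return dedup.out;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

In this sketch the second `accept` call yields a retraction of the score-8 row followed by the score-1 row, the same shape as the two `right>` messages in the log.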
---------- Forwarded message ----------
From: hdxg1101300123 <[hidden email]>
Date: 2020-12-15 10:36
Subject: Unexpected data when joining two streams after deduplication
To: user-zh <[hidden email]>
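Here is how I understand it. The final join is a join of two result streams, and both result streams are retract streams, so any change on either side arrives as two messages. For a change on the left side, the first message is the retraction and the second is the changed row. When the right side changes there are also two messages. The false message is joined with the left side and treated as a change of the whole result, so the joined row is retracted and a row is emitted again (with null right-side columns). When the true message arrives it is joined again and again treated as a change of the whole result, so there is another retraction and then the joined row is emitted.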
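The reasoning above can be checked against a small model. The sketch below is a toy left outer join over two retract inputs, written in plain Java; it is not Flink's join operator, and the class `RetractJoinSketch`, its methods, and the placeholder rows `L1`, `R1`, `R2` are hypothetical. It assumes at most one row per key on each side, which is what the deduplication step provides.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy left outer join over two retract streams; NOT Flink's implementation. */
final class RetractJoinSketch {
    // Current row per key on each side (one row per key after deduplication).
    private final Map<String, String> left = new HashMap<>();
    private final Map<String, String> right = new HashMap<>();
    private final List<String> out = new ArrayList<>();

    void onLeft(boolean add, String key, String row) {
        String match = right.get(key); // null when the right side has no row
        if (add) {
            left.put(key, row);
        } else {
            left.remove(key);
        }
        emit(add, row, match);
    }

    void onRight(boolean add, String key, String row) {
        String l = left.get(key);
        if (add) {
            right.put(key, row);
            if (l != null) {
                emit(false, l, null); // withdraw the null-padded row
                emit(true, l, row);   // emit the joined row
            }
        } else {
            right.remove(key);
            if (l != null) {
                emit(false, l, row);  // withdraw the joined row
                emit(true, l, null);  // left row is unmatched again: null-padded row
            }
        }
    }

    private void emit(boolean add, String l, String r) {
        out.add("(" + add + "," + l + "," + r + ")");
    }

    static List<String> demo() {
        RetractJoinSketch join = new RetractJoinSketch();
        join.onLeft(true, "k", "L1");
        join.onRight(true, "k", "R1");
        join.onRight(false, "k", "R1"); // right-side change, part 1: retraction
        join.onRight(true, "k", "R2");  // right-side change, part 2: new row
        return join.out;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

The last four messages produced by `demo()` are a retraction of the joined row, a null-padded row, a retraction of that null-padded row, and the newly joined row. That is the same shape as lines 9 to 12 of the log: because the join handles the right-side retraction and the right-side insertion as two separate events, the left row is briefly unmatched in between.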
My idea is: would it be possible to join only with the right-stream messages that are true?

[hidden email]

From: hdxg1101300123
Sent: 2020-12-15 23:44
To: user-zh
Subject: Fwd: Unexpected data when joining two streams after deduplication
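To illustrate what the idea of reacting only to right-side true messages would mean, here is a toy Java sketch in which the right input is treated as an upsert stream: false messages are skipped and each true message replaces the stored row for its key. This is only a model of the idea, not a Flink feature or setting; the class `UpsertRightSketch`, its methods, and the rows `L1`, `R1`, `R2` are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy left outer join that ignores right-side retractions; NOT Flink code. */
final class UpsertRightSketch {
    private final Map<String, String> left = new HashMap<>();
    private final Map<String, String> right = new HashMap<>();
    private final List<String> out = new ArrayList<>();

    void onLeft(boolean add, String key, String row) {
        String match = right.get(key);
        if (add) {
            left.put(key, row);
        } else {
            left.remove(key);
        }
        emit(add, row, match);
    }

    void onRight(boolean add, String key, String row) {
        if (!add) {
            return; // the idea from the thread: skip right-side false messages
        }
        String l = left.get(key);
        String prev = right.put(key, row); // previous right row, or null
        if (l != null) {
            emit(false, l, prev); // withdraw whatever was emitted before
            emit(true, l, row);   // emit the row joined with the new right value
        }
    }

    private void emit(boolean add, String l, String r) {
        out.add("(" + add + "," + l + "," + r + ")");
    }

    static List<String> demo() {
        UpsertRightSketch join = new UpsertRightSketch();
        join.onLeft(true, "k", "L1");
        join.onRight(true, "k", "R1");
        join.onRight(false, "k", "R1"); // skipped
        join.onRight(true, "k", "R2");
        return join.out;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

In this model the right-side change goes straight from the row joined with `R1` to the row joined with `R2`, with no null-padded row in between. The cost is visible in the code as well: a false message that is a genuine deletion on the right side, with no true message following it, is never propagated, so the join output would keep showing the stale right-side values.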