After sinking the results of a Flink SQL sliding-window GROUP BY aggregation to Kafka, the output contains duplicates: every record appears twice, completely identical. What could be causing this?
The aggregation logic:

    Table tableoneHour = tableEnv.sqlQuery(
        "select appname" +
        ",productCode" +
        ",link" +
        ",count(case when nodeName = 'FailTerminateEndEvent' then 1 else null end) as errNum" +
        ",count(case when nodeName = 'EndEvent' and passStatus = 'Accept' then 1 else null end) as passNum " +
        ",count(case when nodeName = 'EndEvent' and passStatus = 'Reject' then 1 else null end) as refNum " +
        ",count(case when nodeName = 'EndEvent' and passStatus <> 'Reject' and passStatus <> 'Accept' then 1 else null end) as processNum " +
        ",sum(case when nodeName = 'EndEvent' then loansum else 0 end) as loansum" +
        ",count(1) as allNum " +
        ",'OneHour' as windowType " +
        ",HOP_END(rowtime, INTERVAL '1' HOUR, INTERVAL '1' HOUR) as inputtime " +
        "from table1_2 WHERE link in ('1','2','5') " +
        "GROUP BY HOP(rowtime, INTERVAL '1' HOUR, INTERVAL '1' HOUR)" +
        ",appname,productCode,link");

Converting the Tables to DataStreams:

    // union the result tables of the different windows together
    Table tablesql = tableHalfHour.unionAll(tableoneHour).unionAll(tableoneDay);
    DataStream<Tuple2<Boolean, Row>> dataStream2 = tableEnv.toRetractStream(tablesql, Row.class);
    DataStream<Tuple2<Boolean, Row>> dataStream7Day = tableEnv.toRetractStream(table7Day, Row.class);

    // serialize each Row to a JSON string
    DataStream<String> reslut1 = dataStream2.map(new MapFunction<Tuple2<Boolean, Row>, String>() {
        @Override
        public String map(Tuple2<Boolean, Row> tuple2) throws Exception {
            // tuple2.f0 (the insert/retract flag of the retract stream) is ignored here,
            // so insert and retract messages are serialized identically
            Map<String, Object> json = new HashMap<>();
            json.put("appname", tuple2.f1.getField(0));
            json.put("productCode", tuple2.f1.getField(1));
            json.put("link", tuple2.f1.getField(2));
            json.put("errNum", tuple2.f1.getField(3));
            json.put("passNum", tuple2.f1.getField(4));
            json.put("refNum", tuple2.f1.getField(5));
            json.put("processNum", tuple2.f1.getField(6));
            json.put("loansum", tuple2.f1.getField(7));
            json.put("allNum", tuple2.f1.getField(8));
            json.put("windowType", tuple2.f1.getField(9));
            json.put("inputtime", tuple2.f1.getField(10));
            return JSON.toJSONString(json);
        }
    });

Sinking the results to Kafka (the definition of reslut2 is not shown in the original post):

    reslut1.addSink(new FlinkKafkaProducer08<>("********", new SimpleStringSchema(), props1));
    reslut2.addSink(new FlinkKafkaProducer08<>("********", new SimpleStringSchema(), props1));

The data written to Kafka contains two completely identical copies of every record.

dpzhoufengdev
[hidden email]
Hi,
You could output both HOP_START and HOP_END to see which window each of the duplicate records belongs to. Also, since the slide equals the size, HOP(rowtime, INTERVAL '1' HOUR, INTERVAL '1' HOUR) can simply be replaced by TUMBLE(rowtime, INTERVAL '1' HOUR).

Best,
Hailong
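A minimal sketch of that suggestion, assuming the table1_2 schema from the question (the select list is shortened, and tableOneHourDebug / windowStart / windowEnd are illustrative names, not from the thread):

    Table tableOneHourDebug = tableEnv.sqlQuery(
        "select appname" +
        ",productCode" +
        ",link" +
        ",count(1) as allNum" +
        // emit both window boundaries so every output row can be traced to its window
        ",HOP_START(rowtime, INTERVAL '1' HOUR, INTERVAL '1' HOUR) as windowStart" +
        ",HOP_END(rowtime, INTERVAL '1' HOUR, INTERVAL '1' HOUR) as windowEnd " +
        "from table1_2 WHERE link in ('1','2','5') " +
        "GROUP BY HOP(rowtime, INTERVAL '1' HOUR, INTERVAL '1' HOUR), appname, productCode, link");

    // equivalent TUMBLE form (slide == size), with TUMBLE_START / TUMBLE_END in the select list:
    //   GROUP BY TUMBLE(rowtime, INTERVAL '1' HOUR), appname, productCode, link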
It does not seem related to the window type: one of my Table computations already uses a TUMBLE window. The main problem now is that every generated record has two completely identical copies, and I have no idea why.
dpzhoufengdev
[hidden email]
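Since the map function in the question ignores the retract flag, one way to narrow this down (a debugging sketch, not from the thread; debugStream is an illustrative name) is to expose the flag and the producing subtask in the output and compare the two identical Kafka records. Group-window aggregations produce append-only results, so the flag should always be true there; if both copies carry true, the duplication is happening downstream of the aggregation (for example, the same stream being written to the topic twice) rather than coming from retraction messages:

    // requires org.apache.flink.api.common.functions.RichMapFunction
    DataStream<String> debugStream = dataStream2.map(new RichMapFunction<Tuple2<Boolean, Row>, String>() {
        @Override
        public String map(Tuple2<Boolean, Row> tuple2) throws Exception {
            Map<String, Object> json = new HashMap<>();
            json.put("retractFlag", tuple2.f0);                                // true = insert, false = retract
            json.put("subtask", getRuntimeContext().getIndexOfThisSubtask());  // which parallel instance emitted it
            json.put("row", tuple2.f1.toString());
            return JSON.toJSONString(json);
        }
    });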