Flink SQL: sliding-window (GROUP BY HOP) aggregation results are duplicated after sinking to Kafka

Flink SQL: sliding-window (GROUP BY HOP) aggregation results are duplicated after sinking to Kafka

dpzhoufengdev
After a Flink SQL sliding-window (GROUP BY HOP) aggregation is sunk to Kafka, the output contains duplicates: every record appears exactly twice, completely identical. What could cause this?
The aggregation logic:

Table tableoneHour = tableEnv.sqlQuery(
        "select appname" +
        ",productCode" +
        ",link" +
        ",count(case when nodeName = 'FailTerminateEndEvent' then 1 else null end) as errNum" +
        ",count(case when nodeName = 'EndEvent' and passStatus = 'Accept' then 1 else null end) as passNum" +
        ",count(case when nodeName = 'EndEvent' and passStatus = 'Reject' then 1 else null end) as refNum" +
        ",count(case when nodeName = 'EndEvent' and passStatus <> 'Reject' and passStatus <> 'Accept' then 1 else null end) as processNum" +
        ",sum(case when nodeName = 'EndEvent' then loansum else 0 end) as loansum" +
        ",count(1) as allNum" +
        ",'OneHour' as windowType" +
        ",HOP_END(rowtime, INTERVAL '1' HOUR, INTERVAL '1' HOUR) as inputtime " +
        "from table1_2 WHERE link in ('1','2','5') " +
        "GROUP BY HOP(rowtime, INTERVAL '1' HOUR, INTERVAL '1' HOUR), appname, productCode, link");


Converting the Table to a DataStream:


// union the results of the different window computations together
Table tablesql = tableHalfHour.unionAll(tableoneHour).unionAll(tableoneDay);
DataStream<Tuple2<Boolean, Row>> dataStream2 = tableEnv.toRetractStream(tablesql, Row.class);
DataStream<Tuple2<Boolean, Row>> dataStream7Day = tableEnv.toRetractStream(table7Day, Row.class);

// convert each Table row into a JSON string
DataStream<String> reslut1 = dataStream2.map(new MapFunction<Tuple2<Boolean, Row>, String>() {
    @Override
    public String map(Tuple2<Boolean, Row> tuple2) throws Exception {
        // note: tuple2.f0 (the insert/retract flag from toRetractStream) is ignored here
        Map<String, Object> json = new HashMap<>();
        json.put("appname", tuple2.f1.getField(0));
        json.put("productCode", tuple2.f1.getField(1));
        json.put("link", tuple2.f1.getField(2));
        json.put("errNum", tuple2.f1.getField(3));
        json.put("passNum", tuple2.f1.getField(4));
        json.put("refNum", tuple2.f1.getField(5));
        json.put("processNum", tuple2.f1.getField(6));
        json.put("loansum", tuple2.f1.getField(7));
        json.put("allNum", tuple2.f1.getField(8));
        json.put("windowType", tuple2.f1.getField(9));
        json.put("inputtime", tuple2.f1.getField(10));
        return JSON.toJSONString(json);
    }
});
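Note that toRetractStream attaches a Boolean flag in f0 (true = insert, false = retraction), which the map above ignores, so a retraction would be serialized exactly like a normal record. Below is a sketch of dropping retractions before the map, assuming (not confirmed here) that this is where the duplicates come from; FilterFunction is org.apache.flink.api.common.functions.FilterFunction:

// keep only insert messages; retractions arrive with f0 == false
DataStream<Tuple2<Boolean, Row>> insertsOnly = dataStream2.filter(new FilterFunction<Tuple2<Boolean, Row>>() {
    @Override
    public boolean filter(Tuple2<Boolean, Row> tuple2) throws Exception {
        return tuple2.f0;
    }
});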






Sinking the results to Kafka:
reslut1.addSink(new FlinkKafkaProducer08<>("********", new SimpleStringSchema(), props1));
reslut2.addSink(new FlinkKafkaProducer08<>("********", new SimpleStringSchema(), props1));


The data sunk to Kafka contains two completely identical copies of every record.




Re: Flink SQL: sliding-window (GROUP BY HOP) aggregation results are duplicated after sinking to Kafka

hailongwang
Hi,
You could output both HOP_START and HOP_END to see which window each of the duplicated rows belongs to.
Also, since the slide equals the window size, HOP(rowtime, INTERVAL '1' HOUR, INTERVAL '1' HOUR) can simply be replaced with TUMBLE(rowtime, INTERVAL '1' HOUR).
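A minimal sketch of that suggestion against the query above (same table1_2 schema; the alias inputstarttime is only added here for debugging):

Table tableoneHour = tableEnv.sqlQuery(
        "select appname, productCode, link" +
        // ... same aggregate columns as in the original query ...
        ",TUMBLE_START(rowtime, INTERVAL '1' HOUR) as inputstarttime" +  // window start, to see which window a duplicate belongs to
        ",TUMBLE_END(rowtime, INTERVAL '1' HOUR) as inputtime " +
        "from table1_2 WHERE link in ('1','2','5') " +
        "GROUP BY TUMBLE(rowtime, INTERVAL '1' HOUR), appname, productCode, link");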




Best,
Hailong

Re: Flink SQL: sliding-window (GROUP BY HOP) aggregation results are duplicated after sinking to Kafka

dpzhoufengdev
It does not seem related to the window type either: one of my Tables is computed with a TUMBLE window and shows the same problem. The main issue now is that every produced record exists in two completely identical copies, and I don't know why.
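One way to narrow this down is to log the retract flag next to each row before it reaches the sink: if a duplicate pair arrives as (false, row) followed by (true, row) it is a retract/update pair, while two (true, row) entries would point at the records genuinely being produced twice. A sketch, assuming the same dataStream2 as above:

dataStream2.map(new MapFunction<Tuple2<Boolean, Row>, String>() {
    @Override
    public String map(Tuple2<Boolean, Row> tuple2) throws Exception {
        // f0 = true for an insert, false for a retraction
        return "flag=" + tuple2.f0 + " row=" + tuple2.f1;
    }
}).print();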

