On Tue, Dec 15, 2020 at 10:48 PM, 赵一旦 <[hidden email]> wrote:

The requirement: for a certain table, compute statistics grouped by several key combinations, key1 (xxx+yyy+ky1), key2 (xxx+yyy+ky2), and so on, where xxx+yyy are the fields shared by all combinations. I can think of the following three implementations.

(1) Aggregate each key combination separately and INSERT each result separately.
(2) Aggregate each key combination separately, UNION the results, then INSERT.
(3) SELECT from the table once per key, UNION the branches first, then aggregate by key and INSERT.

In the third approach, the distinct fields ky1, ky2, ... are unified into a single pair of key columns via

select 'ky1' as key_name, ky1 as key_value
union
select 'ky2' as key_name, ky2 as key_value

and the final aggregation is keyed on (xxx+yyy+key_name+key_value).

The problem I am running into: with approach 3, the window node never receives a watermark, so the windows never fire.
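To make the pattern concrete, here is a minimal end-to-end sketch of approach 3 (src, xxx, yyy, ky1, ky2 are the placeholder names from above, not my real identifiers; the real job uses UNION ALL rather than UNION, since deduplication is not wanted here):

SELECT
    xxx, yyy, key_name, key_value,
    count(1) AS pv
FROM (
    SELECT event_time, xxx, yyy, 'ky1' AS key_name, ky1 AS key_value FROM src
    UNION ALL
    SELECT event_time, xxx, yyy, 'ky2' AS key_name, ky2 AS key_value FROM src
)
GROUP BY
    xxx, yyy, key_name, key_value,
    TUMBLE(event_time, INTERVAL '5' MINUTE);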
On Tue, Dec 15, 2020 at 10:49 PM, 赵一旦 <[hidden email]> wrote:

The exact SQL is below.
Approach 2:

INSERT INTO flink_sdk_stats
(
    SELECT
        DATE_FORMAT(TUMBLE_END(`event_time`, INTERVAL '5' MINUTE), 'yyyyMMddHHmm') AS `time`,
        sid AS `supply_id`,
        'd77' AS `field_key`,
        d77 AS `field_value`,
        count(1) AS `pv`
    FROM baidu_log_view
    GROUP BY sid, d77, TUMBLE(event_time, INTERVAL '5' MINUTE)

    UNION ALL

    SELECT
        DATE_FORMAT(TUMBLE_END(`event_time`, INTERVAL '5' MINUTE), 'yyyyMMddHHmm') AS `time`,
        sid AS `supply_id`,
        'd79' AS `field_key`,
        d79 AS `field_value`,
        count(1) AS `pv`
    FROM baidu_log_view
    GROUP BY sid, d79, TUMBLE(event_time, INTERVAL '5' MINUTE)

    UNION ALL

    SELECT
        DATE_FORMAT(TUMBLE_END(`event_time`, INTERVAL '5' MINUTE), 'yyyyMMddHHmm') AS `time`,
        sid AS `supply_id`,
        'd80' AS `field_key`,
        d80 AS `field_value`,
        count(1) AS `pv`
    FROM baidu_log_view
    GROUP BY sid, d80, TUMBLE(event_time, INTERVAL '5' MINUTE)

    UNION ALL

    SELECT
        DATE_FORMAT(TUMBLE_END(`event_time`, INTERVAL '5' MINUTE), 'yyyyMMddHHmm') AS `time`,
        sid AS `supply_id`,
        'd81' AS `field_key`,
        d81 AS `field_value`,
        count(1) AS `pv`
    FROM baidu_log_view
    GROUP BY sid, d81, TUMBLE(event_time, INTERVAL '5' MINUTE)

    UNION ALL

    SELECT
        DATE_FORMAT(TUMBLE_END(`event_time`, INTERVAL '5' MINUTE), 'yyyyMMddHHmm') AS `time`,
        sid AS `supply_id`,
        'd83' AS `field_key`,
        d83 AS `field_value`,
        count(1) AS `pv`
    FROM baidu_log_view
    GROUP BY sid, d83, TUMBLE(event_time, INTERVAL '5' MINUTE)

    UNION ALL

    SELECT
        DATE_FORMAT(TUMBLE_END(`event_time`, INTERVAL '5' MINUTE), 'yyyyMMddHHmm') AS `time`,
        sid AS `supply_id`,
        'd84' AS `field_key`,
        d84 AS `field_value`,
        count(1) AS `pv`
    FROM baidu_log_view
    GROUP BY sid, d84, TUMBLE(event_time, INTERVAL '5' MINUTE)

    UNION ALL

    SELECT
        DATE_FORMAT(TUMBLE_END(`event_time`, INTERVAL '5' MINUTE), 'yyyyMMddHHmm') AS `time`,
        sid AS `supply_id`,
        'd86' AS `field_key`,
        d86 AS `field_value`,
        count(1) AS `pv`
    FROM baidu_log_view
    GROUP BY sid, d86, TUMBLE(event_time, INTERVAL '5' MINUTE)
);

Approach 3:

INSERT INTO flink_sdk_stats
SELECT
    DATE_FORMAT(TUMBLE_END(`event_time`, INTERVAL '5' MINUTE), 'yyyyMMddHHmm') AS `time`,
    `supply_id`,
    `field_key`,
    `field_value`,
    count(1) AS `pv`
FROM
(
    SELECT event_time, sid AS `supply_id`, 'd107' AS `field_key`, d107 AS `field_value` FROM baidu_log_view
    UNION ALL
    SELECT event_time, sid AS `supply_id`, 'd77' AS `field_key`, d77 AS `field_value` FROM baidu_log_view
    UNION ALL
    SELECT event_time, sid AS `supply_id`, 'd77' AS `field_key`, d77 AS `field_value` FROM baidu_log_view
    UNION ALL
    SELECT event_time, sid AS `supply_id`, 'd79' AS `field_key`, d79 AS `field_value` FROM baidu_log_view
    UNION ALL
    SELECT event_time, sid AS `supply_id`, 'd80' AS `field_key`, d80 AS `field_value` FROM baidu_log_view
    UNION ALL
    SELECT event_time, sid AS `supply_id`, 'd81' AS `field_key`, d81 AS `field_value` FROM baidu_log_view
    UNION ALL
    SELECT event_time, sid AS `supply_id`, 'd83' AS `field_key`, d83 AS `field_value` FROM baidu_log_view
    UNION ALL
    SELECT event_time, sid AS `supply_id`, 'd84' AS `field_key`, d84 AS `field_value` FROM baidu_log_view
    UNION ALL
    SELECT event_time, sid AS `supply_id`, 'd86' AS `field_key`, d86 AS `field_value` FROM baidu_log_view
)
GROUP BY
    `supply_id`, `field_key`, `field_value`, TUMBLE(event_time, INTERVAL '5' MINUTE);
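For reference, the sink is roughly the following (a sketch: the column names and types are inferred from the queries above, and the real connector options are omitted):

CREATE TABLE flink_sdk_stats (
    `time` STRING,
    `supply_id` STRING,
    `field_key` STRING,
    `field_value` STRING,
    `pv` BIGINT
) WITH (
    -- connector options omitted
);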
On Tue, Dec 15, 2020 at 10:50 PM, 赵一旦 <[hidden email]> wrote:

Approach 2 works fine; in approach 3 the window operator gets no watermark.
On Wed, Dec 16, 2020 at 10:41 AM, 赵一旦 <[hidden email]> wrote:

Does anyone have any insight into this? Here is what I found today.
I took the operator name that the Web UI shows for my source node and split it apart in a text editor. This is what it contains.

Approach 2:

Source: TableSourceScan(table=[[default_catalog, default_database, baidu_log, watermark=[-(TO_TIMESTAMP(FROM_UNIXTIME(/(CASE(IS NOT NULL($1), CAST($1):BIGINT NOT NULL, 0:BIGINT), 1000))), 60000:INTERVAL SECOND)]]], fields=[cid, server_time, d])
-> (
Calc(select=[(d ITEM _UTF-16LE'106') AS $f0, ((d ITEM _UTF-16LE'77') IS NOT NULL CASE CAST((d ITEM _UTF-16LE'77')) CASE _UTF-16LE'NULL':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS $f1, Reinterpret(TO_TIMESTAMP(FROM_UNIXTIME(((server_time IS NOT NULL CASE CAST(server_time) CASE 0:BIGINT) / 1000)))) AS event_time]),
Calc(select=[(d ITEM _UTF-16LE'106') AS $f0, ((d ITEM _UTF-16LE'79') IS NOT NULL CASE CAST((d ITEM _UTF-16LE'79')) CASE _UTF-16LE'NULL':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS $f1, Reinterpret(TO_TIMESTAMP(FROM_UNIXTIME(((server_time IS NOT NULL CASE CAST(server_time) CASE 0:BIGINT) / 1000)))) AS event_time]),
Calc(select=[(d ITEM _UTF-16LE'106') AS $f0, ((d ITEM _UTF-16LE'80') IS NOT NULL CASE CAST((d ITEM _UTF-16LE'80')) CASE _UTF-16LE'NULL':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS $f1, Reinterpret(TO_TIMESTAMP(FROM_UNIXTIME(((server_time IS NOT NULL CASE CAST(server_time) CASE 0:BIGINT) / 1000)))) AS event_time]),
Calc(select=[(d ITEM _UTF-16LE'106') AS $f0, ((d ITEM _UTF-16LE'81') IS NOT NULL CASE CAST((d ITEM _UTF-16LE'81')) CASE _UTF-16LE'NULL':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS $f1, Reinterpret(TO_TIMESTAMP(FROM_UNIXTIME(((server_time IS NOT NULL CASE CAST(server_time) CASE 0:BIGINT) / 1000)))) AS event_time]),
Calc(select=[(d ITEM _UTF-16LE'106') AS $f0, ((d ITEM _UTF-16LE'83') IS NOT NULL CASE CAST((d ITEM _UTF-16LE'83')) CASE _UTF-16LE'NULL':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS $f1, Reinterpret(TO_TIMESTAMP(FROM_UNIXTIME(((server_time IS NOT NULL CASE CAST(server_time) CASE 0:BIGINT) / 1000)))) AS event_time]),
Calc(select=[(d ITEM _UTF-16LE'106') AS $f0, ((d ITEM _UTF-16LE'84') IS NOT NULL CASE CAST((d ITEM _UTF-16LE'84')) CASE _UTF-16LE'NULL':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS $f1, Reinterpret(TO_TIMESTAMP(FROM_UNIXTIME(((server_time IS NOT NULL CASE CAST(server_time) CASE 0:BIGINT) / 1000)))) AS event_time]),
Calc(select=[(d ITEM _UTF-16LE'106') AS $f0, ((d ITEM _UTF-16LE'86') IS NOT NULL CASE CAST((d ITEM _UTF-16LE'86')) CASE _UTF-16LE'NULL':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS $f1, Reinterpret(TO_TIMESTAMP(FROM_UNIXTIME(((server_time IS NOT NULL CASE CAST(server_time) CASE 0:BIGINT) / 1000)))) AS event_time])
)

Approach 3:

Source: TableSourceScan(table=[[default_catalog, default_database, dr1, watermark=[-(TO_TIMESTAMP(FROM_UNIXTIME(/(CASE(IS NOT NULL($1), CAST($1):BIGINT NOT NULL, 0:BIGINT), 1000))), 60000:INTERVAL SECOND)]]], fields=[cid, server_time, d])
-> (
Calc(select=[Reinterpret(TO_TIMESTAMP(FROM_UNIXTIME(((server_time IS NOT NULL CASE CAST(server_time) CASE 0:BIGINT) / 1000)))) AS event_time, (d ITEM _UTF-16LE'106') AS supply_id, _UTF-16LE'd107':VARCHAR(4) CHARACTER SET "UTF-16LE" AS field_key, ((d ITEM _UTF-16LE'107') IS NOT NULL CASE CAST((d ITEM _UTF-16LE'107')) CASE _UTF-16LE'NULL':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS field_value]),
Calc(select=[Reinterpret(TO_TIMESTAMP(FROM_UNIXTIME(((server_time IS NOT NULL CASE CAST(server_time) CASE 0:BIGINT) / 1000)))) AS event_time, (d ITEM _UTF-16LE'106') AS supply_id, _UTF-16LE'd77':VARCHAR(4) CHARACTER SET "UTF-16LE" AS field_key, ((d ITEM _UTF-16LE'77') IS NOT NULL CASE CAST((d ITEM _UTF-16LE'77')) CASE _UTF-16LE'NULL':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS field_value]),
Calc(select=[Reinterpret(TO_TIMESTAMP(FROM_UNIXTIME(((server_time IS NOT NULL CASE CAST(server_time) CASE 0:BIGINT) / 1000)))) AS event_time, (d ITEM _UTF-16LE'106') AS supply_id, _UTF-16LE'd79':VARCHAR(4) CHARACTER SET "UTF-16LE" AS field_key, ((d ITEM _UTF-16LE'79') IS NOT NULL CASE CAST((d ITEM _UTF-16LE'79')) CASE _UTF-16LE'NULL':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS field_value]),
Calc(select=[Reinterpret(TO_TIMESTAMP(FROM_UNIXTIME(((server_time IS NOT NULL CASE CAST(server_time) CASE 0:BIGINT) / 1000)))) AS event_time, (d ITEM _UTF-16LE'106') AS supply_id, _UTF-16LE'd80':VARCHAR(4) CHARACTER SET "UTF-16LE" AS field_key, ((d ITEM _UTF-16LE'80') IS NOT NULL CASE CAST((d ITEM _UTF-16LE'80')) CASE _UTF-16LE'NULL':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS field_value]),
Calc(select=[Reinterpret(TO_TIMESTAMP(FROM_UNIXTIME(((server_time IS NOT NULL CASE CAST(server_time) CASE 0:BIGINT) / 1000)))) AS event_time, (d ITEM _UTF-16LE'106') AS supply_id, _UTF-16LE'd81':VARCHAR(4) CHARACTER SET "UTF-16LE" AS field_key, ((d ITEM _UTF-16LE'81') IS NOT NULL CASE CAST((d ITEM _UTF-16LE'81')) CASE _UTF-16LE'NULL':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS field_value]),
Calc(select=[Reinterpret(TO_TIMESTAMP(FROM_UNIXTIME(((server_time IS NOT NULL CASE CAST(server_time) CASE 0:BIGINT) / 1000)))) AS event_time, (d ITEM _UTF-16LE'106') AS supply_id, _UTF-16LE'd83':VARCHAR(4) CHARACTER SET "UTF-16LE" AS field_key, ((d ITEM _UTF-16LE'83') IS NOT NULL CASE CAST((d ITEM _UTF-16LE'83')) CASE _UTF-16LE'NULL':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS field_value]),
Calc(select=[Reinterpret(TO_TIMESTAMP(FROM_UNIXTIME(((server_time IS NOT NULL CASE CAST(server_time) CASE 0:BIGINT) / 1000)))) AS event_time, (d ITEM _UTF-16LE'106') AS supply_id, _UTF-16LE'd84':VARCHAR(4) CHARACTER SET "UTF-16LE" AS field_key, ((d ITEM _UTF-16LE'84') IS NOT NULL CASE CAST((d ITEM _UTF-16LE'84')) CASE _UTF-16LE'NULL':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS field_value]),
Calc(select=[Reinterpret(TO_TIMESTAMP(FROM_UNIXTIME(((server_time IS NOT NULL CASE CAST(server_time) CASE 0:BIGINT) / 1000)))) AS event_time, (d ITEM _UTF-16LE'106') AS supply_id, _UTF-16LE'd86':VARCHAR(4) CHARACTER SET "UTF-16LE" AS field_key, ((d ITEM _UTF-16LE'86') IS NOT NULL CASE CAST((d ITEM _UTF-16LE'86')) CASE _UTF-16LE'NULL':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS field_value])
)
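For reference, the source definition behind these plans is roughly the following. This is a sketch reconstructed from the watermark expression in the plan, so the exact types and NULL handling are my best guess (the CAST in the plan hints that server_time may not actually be declared as BIGINT; in the approach 3 run the source table happened to be registered as dr1, but its shape is the same), and the connector options are omitted:

CREATE TABLE baidu_log (
    cid STRING,
    server_time BIGINT,  -- guessed type; the plan applies a CAST to BIGINT
    d MAP<STRING, STRING>,
    event_time AS TO_TIMESTAMP(FROM_UNIXTIME(COALESCE(server_time, 0) / 1000)),
    WATERMARK FOR event_time AS event_time - INTERVAL '60' SECOND
) WITH (
    -- connector options omitted
);

-- baidu_log_view then derives sid and the dNN columns from the map column d:
CREATE VIEW baidu_log_view AS
SELECT
    event_time,
    d['106'] AS sid,
    COALESCE(d['77'], 'NULL') AS d77  -- d79, d80, ... are analogous
FROM baidu_log;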
On Wed, Dec 16, 2020 at 10:52 AM, 赵一旦 <[hidden email]> wrote:

Judging by the source nodes, the two plans do not differ much. But here is the catch: when I check the outputWatermark metrics under the Web UI metrics tab, parallel subtask 0 of approach 2 exposes 8 metrics containing outputWatermark (1 starting with Source, 7 starting with Calc), while approach 3 exposes only 2.
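By the way, instead of splitting the Web UI operator name apart by hand, the optimized plan can also be printed with an EXPLAIN statement (Flink SQL supports EXPLAIN PLAN FOR on both queries and INSERT statements). For example, a reduced two-branch version of approach 3:

EXPLAIN PLAN FOR
INSERT INTO flink_sdk_stats
SELECT
    DATE_FORMAT(TUMBLE_END(`event_time`, INTERVAL '5' MINUTE), 'yyyyMMddHHmm') AS `time`,
    `supply_id`, `field_key`, `field_value`, count(1) AS `pv`
FROM
(
    SELECT event_time, sid AS `supply_id`, 'd77' AS `field_key`, d77 AS `field_value` FROM baidu_log_view
    UNION ALL
    SELECT event_time, sid AS `supply_id`, 'd79' AS `field_key`, d79 AS `field_value` FROM baidu_log_view
)
GROUP BY
    `supply_id`, `field_key`, `field_value`, TUMBLE(event_time, INTERVAL '5' MINUTE);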
赵一旦 <[hidden email]> wrote:

To make this easier to follow, I have re-posted the complete SQL, along with part of my analysis, at the address below.
https://www.yuque.com/sixhours-gid0m/eegye3/xrz2mm

Any help would be appreciated.