FlinkSQL: multiple SELECT GROUP BY aggregations UNIONed for output, vs. multiple SELECTs UNIONed then aggregated with GROUP BY — differences, and a problem.


FlinkSQL: multiple SELECT GROUP BY aggregations UNIONed for output, vs. multiple SELECTs UNIONed then aggregated with GROUP BY — differences, and a problem.

nobleyd
I need, for a given table, to compute statistics over several key groups: key1 (xxx+yyy+ky1), key2 (xxx+yyy+ky2), and so on, where xxx+yyy are fields shared by all groups. I currently see three ways to implement this:
(1) Aggregate each key group separately and insert each result separately.
(2) Aggregate each key group separately, then UNION the results, then insert.
(3) SELECT from the table once per key group, UNION the selections, then aggregate by key, then insert.
In the third approach, the distinct fields ky1, ky2, ... are unified into a single key column via

select 'ky1' as key_name, ky1 as key_value
union
select 'ky2' as key_name, ky2 as key_value

and the final aggregation groups by (xxx+yyy+key_name+key_value).

The problem I'm seeing: in approach 3, the window node never gets a watermark, so the windows never fire.
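
For reference, approach 1 amounts to one independent aggregation and insert per key group, roughly as in this minimal sketch (table and column names follow the full SQL posted below; only two key groups shown):

-- Approach 1, sketched: one self-contained INSERT per key group.
INSERT INTO flink_sdk_stats
SELECT
    DATE_FORMAT(TUMBLE_END(`event_time`, INTERVAL '5' MINUTE), 'yyyyMMddHHmm') AS `time`,
    sid      AS `supply_id`,
    'd77'    AS `field_key`,
    d77      AS `field_value`,
    count(1) AS `pv`
FROM baidu_log_view
GROUP BY sid, d77, TUMBLE(event_time, INTERVAL '5' MINUTE);

INSERT INTO flink_sdk_stats
SELECT
    DATE_FORMAT(TUMBLE_END(`event_time`, INTERVAL '5' MINUTE), 'yyyyMMddHHmm') AS `time`,
    sid      AS `supply_id`,
    'd79'    AS `field_key`,
    d79      AS `field_value`,
    count(1) AS `pv`
FROM baidu_log_view
GROUP BY sid, d79, TUMBLE(event_time, INTERVAL '5' MINUTE);

-- The remaining key groups (d80, d81, d83, d84, d86) follow the same
-- pattern, each as its own INSERT; each becomes an independent job
-- unless they are submitted together via the StatementSet API.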

Re: FlinkSQL: multiple SELECT GROUP BY aggregations UNIONed for output, vs. multiple SELECTs UNIONed then aggregated with GROUP BY — differences, and a problem.

nobleyd
The exact SQL is below.
Approach 2:


INSERT INTO flink_sdk_stats
(
    SELECT
        DATE_FORMAT(TUMBLE_END(`event_time`, INTERVAL '5' MINUTE), 'yyyyMMddHHmm') AS `time`,
        sid      AS `supply_id`,
        'd77'    AS `field_key`,
        d77      AS `field_value`,
        count(1) AS `pv`
    FROM
        baidu_log_view
    GROUP BY
        sid,
        d77,
        TUMBLE(event_time, INTERVAL '5' MINUTE)

    UNION ALL

    SELECT
        DATE_FORMAT(TUMBLE_END(`event_time`, INTERVAL '5' MINUTE), 'yyyyMMddHHmm') AS `time`,
        sid      AS `supply_id`,
        'd79'    AS `field_key`,
        d79      AS `field_value`,
        count(1) AS `pv`
    FROM
        baidu_log_view
    GROUP BY
        sid,
        d79,
        TUMBLE(event_time, INTERVAL '5' MINUTE)

    UNION ALL

    SELECT
        DATE_FORMAT(TUMBLE_END(`event_time`, INTERVAL '5' MINUTE), 'yyyyMMddHHmm') AS `time`,
        sid      AS `supply_id`,
        'd80'    AS `field_key`,
        d80      AS `field_value`,
        count(1) AS `pv`
    FROM
        baidu_log_view
    GROUP BY
        sid,
        d80,
        TUMBLE(event_time, INTERVAL '5' MINUTE)

    UNION ALL

    SELECT
        DATE_FORMAT(TUMBLE_END(`event_time`, INTERVAL '5' MINUTE), 'yyyyMMddHHmm') AS `time`,
        sid      AS `supply_id`,
        'd81'    AS `field_key`,
        d81      AS `field_value`,
        count(1) AS `pv`
    FROM
        baidu_log_view
    GROUP BY
        sid,
        d81,
        TUMBLE(event_time, INTERVAL '5' MINUTE)

    UNION ALL

    SELECT
        DATE_FORMAT(TUMBLE_END(`event_time`, INTERVAL '5' MINUTE), 'yyyyMMddHHmm') AS `time`,
        sid      AS `supply_id`,
        'd83'    AS `field_key`,
        d83      AS `field_value`,
        count(1) AS `pv`
    FROM
        baidu_log_view
    GROUP BY
        sid,
        d83,
        TUMBLE(event_time, INTERVAL '5' MINUTE)

    UNION ALL

    SELECT
        DATE_FORMAT(TUMBLE_END(`event_time`, INTERVAL '5' MINUTE), 'yyyyMMddHHmm') AS `time`,
        sid      AS `supply_id`,
        'd84'    AS `field_key`,
        d84      AS `field_value`,
        count(1) AS `pv`
    FROM
        baidu_log_view
    GROUP BY
        sid,
        d84,
        TUMBLE(event_time, INTERVAL '5' MINUTE)

    UNION ALL

    SELECT
        DATE_FORMAT(TUMBLE_END(`event_time`, INTERVAL '5' MINUTE), 'yyyyMMddHHmm') AS `time`,
        sid      AS `supply_id`,
        'd86'    AS `field_key`,
        d86      AS `field_value`,
        count(1) AS `pv`
    FROM
        baidu_log_view
    GROUP BY
        sid,
        d86,
        TUMBLE(event_time, INTERVAL '5' MINUTE)
);
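
For context, the sink table both statements write into must have a schema along these lines (a hypothetical reconstruction from the INSERT column lists; the real DDL never appears in the thread, and the print connector is only a stand-in):

CREATE TABLE flink_sdk_stats (
    `time`        STRING,   -- window end, formatted yyyyMMddHHmm
    `supply_id`   STRING,
    `field_key`   STRING,   -- 'd77', 'd79', ...
    `field_value` STRING,
    `pv`          BIGINT    -- count(1) per window and key
) WITH (
    'connector' = 'print'   -- stand-in; the real connector is unknown
);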



Approach 3:


INSERT INTO flink_sdk_stats
SELECT
    DATE_FORMAT(TUMBLE_END(`event_time`, INTERVAL '5' MINUTE), 'yyyyMMddHHmm') AS `time`,
    `supply_id`,
    `field_key`,
    `field_value`,
    count(1) AS `pv`
FROM
    (
     SELECT event_time, sid AS `supply_id`, 'd107' AS `field_key`, d107 AS `field_value` FROM baidu_log_view
     UNION ALL
     SELECT event_time, sid AS `supply_id`, 'd77'  AS `field_key`, d77  AS `field_value` FROM baidu_log_view
     UNION ALL
     SELECT event_time, sid AS `supply_id`, 'd79'  AS `field_key`, d79  AS `field_value` FROM baidu_log_view
     UNION ALL
     SELECT event_time, sid AS `supply_id`, 'd80'  AS `field_key`, d80  AS `field_value` FROM baidu_log_view
     UNION ALL
     SELECT event_time, sid AS `supply_id`, 'd81'  AS `field_key`, d81  AS `field_value` FROM baidu_log_view
     UNION ALL
     SELECT event_time, sid AS `supply_id`, 'd83'  AS `field_key`, d83  AS `field_value` FROM baidu_log_view
     UNION ALL
     SELECT event_time, sid AS `supply_id`, 'd84'  AS `field_key`, d84  AS `field_value` FROM baidu_log_view
     UNION ALL
     SELECT event_time, sid AS `supply_id`, 'd86'  AS `field_key`, d86  AS `field_value` FROM baidu_log_view
)
GROUP BY
    `supply_id`, `field_key`, `field_value`, TUMBLE(event_time, INTERVAL '5' MINUTE);
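
Instead of reading operator names off the Web UI (as done further down in this thread), the optimized plan can also be printed directly, which makes the two statements easier to compare. A sketch, assuming a Flink version whose SQL client supports EXPLAIN PLAN FOR (TableEnvironment.explainSql does the same from code); only two branches shown:

EXPLAIN PLAN FOR
SELECT
    DATE_FORMAT(TUMBLE_END(`event_time`, INTERVAL '5' MINUTE), 'yyyyMMddHHmm') AS `time`,
    `supply_id`,
    `field_key`,
    `field_value`,
    count(1) AS `pv`
FROM
    (
     SELECT event_time, sid AS `supply_id`, 'd77' AS `field_key`, d77 AS `field_value` FROM baidu_log_view
     UNION ALL
     SELECT event_time, sid AS `supply_id`, 'd79' AS `field_key`, d79 AS `field_value` FROM baidu_log_view
    )
GROUP BY
    `supply_id`, `field_key`, `field_value`, TUMBLE(event_time, INTERVAL '5' MINUTE);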



Re: FlinkSQL: multiple SELECT GROUP BY aggregations UNIONed for output, vs. multiple SELECTs UNIONed then aggregated with GROUP BY — differences, and a problem.

nobleyd
Approach 2 works fine; in approach 3 the window operator gets no watermark.


Re: FlinkSQL: multiple SELECT GROUP BY aggregations UNIONed for output, vs. multiple SELECTs UNIONed then aggregated with GROUP BY — differences, and a problem.

nobleyd
Does anyone have any insight? New findings from today are below.
I took the source node's name as displayed in the Web UI and split it apart in a text editor. Here is what I found.
Approach 2:
Source: TableSourceScan(table=[[default_catalog, default_database, baidu_log, watermark=[-(TO_TIMESTAMP(FROM_UNIXTIME(/(CASE(IS NOT NULL($1), CAST($1):BIGINT NOT NULL, 0:BIGINT), 1000))), 60000:INTERVAL SECOND)]]], fields=[cid, server_time, d])
 -> (
  Calc(select=[(d ITEM _UTF-16LE'106') AS $f0, ((d ITEM _UTF-16LE'77') IS NOT NULL CASE CAST((d ITEM _UTF-16LE'77')) CASE _UTF-16LE'NULL':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS $f1, Reinterpret(TO_TIMESTAMP(FROM_UNIXTIME(((server_time IS NOT NULL CASE CAST(server_time) CASE 0:BIGINT) / 1000)))) AS event_time]),
  Calc(select=[(d ITEM _UTF-16LE'106') AS $f0, ((d ITEM _UTF-16LE'79') IS NOT NULL CASE CAST((d ITEM _UTF-16LE'79')) CASE _UTF-16LE'NULL':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS $f1, Reinterpret(TO_TIMESTAMP(FROM_UNIXTIME(((server_time IS NOT NULL CASE CAST(server_time) CASE 0:BIGINT) / 1000)))) AS event_time]),
  Calc(select=[(d ITEM _UTF-16LE'106') AS $f0, ((d ITEM _UTF-16LE'80') IS NOT NULL CASE CAST((d ITEM _UTF-16LE'80')) CASE _UTF-16LE'NULL':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS $f1, Reinterpret(TO_TIMESTAMP(FROM_UNIXTIME(((server_time IS NOT NULL CASE CAST(server_time) CASE 0:BIGINT) / 1000)))) AS event_time]),
  Calc(select=[(d ITEM _UTF-16LE'106') AS $f0, ((d ITEM _UTF-16LE'81') IS NOT NULL CASE CAST((d ITEM _UTF-16LE'81')) CASE _UTF-16LE'NULL':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS $f1, Reinterpret(TO_TIMESTAMP(FROM_UNIXTIME(((server_time IS NOT NULL CASE CAST(server_time) CASE 0:BIGINT) / 1000)))) AS event_time]),
  Calc(select=[(d ITEM _UTF-16LE'106') AS $f0, ((d ITEM _UTF-16LE'83') IS NOT NULL CASE CAST((d ITEM _UTF-16LE'83')) CASE _UTF-16LE'NULL':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS $f1, Reinterpret(TO_TIMESTAMP(FROM_UNIXTIME(((server_time IS NOT NULL CASE CAST(server_time) CASE 0:BIGINT) / 1000)))) AS event_time]),
  Calc(select=[(d ITEM _UTF-16LE'106') AS $f0, ((d ITEM _UTF-16LE'84') IS NOT NULL CASE CAST((d ITEM _UTF-16LE'84')) CASE _UTF-16LE'NULL':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS $f1, Reinterpret(TO_TIMESTAMP(FROM_UNIXTIME(((server_time IS NOT NULL CASE CAST(server_time) CASE 0:BIGINT) / 1000)))) AS event_time]),
  Calc(select=[(d ITEM _UTF-16LE'106') AS $f0, ((d ITEM _UTF-16LE'86') IS NOT NULL CASE CAST((d ITEM _UTF-16LE'86')) CASE _UTF-16LE'NULL':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS $f1, Reinterpret(TO_TIMESTAMP(FROM_UNIXTIME(((server_time IS NOT NULL CASE CAST(server_time) CASE 0:BIGINT) / 1000)))) AS event_time])
 )

Approach 3:
Source: TableSourceScan(table=[[default_catalog, default_database, dr1, watermark=[-(TO_TIMESTAMP(FROM_UNIXTIME(/(CASE(IS NOT NULL($1), CAST($1):BIGINT NOT NULL, 0:BIGINT), 1000))), 60000:INTERVAL SECOND)]]], fields=[cid, server_time, d])
 -> (
  Calc(select=[Reinterpret(TO_TIMESTAMP(FROM_UNIXTIME(((server_time IS NOT NULL CASE CAST(server_time) CASE 0:BIGINT) / 1000)))) AS event_time, (d ITEM _UTF-16LE'106') AS supply_id, _UTF-16LE'd107':VARCHAR(4) CHARACTER SET "UTF-16LE" AS field_key, ((d ITEM _UTF-16LE'107') IS NOT NULL CASE CAST((d ITEM _UTF-16LE'107')) CASE _UTF-16LE'NULL':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS field_value]),
  Calc(select=[Reinterpret(TO_TIMESTAMP(FROM_UNIXTIME(((server_time IS NOT NULL CASE CAST(server_time) CASE 0:BIGINT) / 1000)))) AS event_time, (d ITEM _UTF-16LE'106') AS supply_id, _UTF-16LE'd77':VARCHAR(4) CHARACTER SET "UTF-16LE" AS field_key, ((d ITEM _UTF-16LE'77') IS NOT NULL CASE CAST((d ITEM _UTF-16LE'77')) CASE _UTF-16LE'NULL':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS field_value]),
  Calc(select=[Reinterpret(TO_TIMESTAMP(FROM_UNIXTIME(((server_time IS NOT NULL CASE CAST(server_time) CASE 0:BIGINT) / 1000)))) AS event_time, (d ITEM _UTF-16LE'106') AS supply_id, _UTF-16LE'd79':VARCHAR(4) CHARACTER SET "UTF-16LE" AS field_key, ((d ITEM _UTF-16LE'79') IS NOT NULL CASE CAST((d ITEM _UTF-16LE'79')) CASE _UTF-16LE'NULL':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS field_value]),
  Calc(select=[Reinterpret(TO_TIMESTAMP(FROM_UNIXTIME(((server_time IS NOT NULL CASE CAST(server_time) CASE 0:BIGINT) / 1000)))) AS event_time, (d ITEM _UTF-16LE'106') AS supply_id, _UTF-16LE'd80':VARCHAR(4) CHARACTER SET "UTF-16LE" AS field_key, ((d ITEM _UTF-16LE'80') IS NOT NULL CASE CAST((d ITEM _UTF-16LE'80')) CASE _UTF-16LE'NULL':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS field_value]),
  Calc(select=[Reinterpret(TO_TIMESTAMP(FROM_UNIXTIME(((server_time IS NOT NULL CASE CAST(server_time) CASE 0:BIGINT) / 1000)))) AS event_time, (d ITEM _UTF-16LE'106') AS supply_id, _UTF-16LE'd81':VARCHAR(4) CHARACTER SET "UTF-16LE" AS field_key, ((d ITEM _UTF-16LE'81') IS NOT NULL CASE CAST((d ITEM _UTF-16LE'81')) CASE _UTF-16LE'NULL':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS field_value]),
  Calc(select=[Reinterpret(TO_TIMESTAMP(FROM_UNIXTIME(((server_time IS NOT NULL CASE CAST(server_time) CASE 0:BIGINT) / 1000)))) AS event_time, (d ITEM _UTF-16LE'106') AS supply_id, _UTF-16LE'd83':VARCHAR(4) CHARACTER SET "UTF-16LE" AS field_key, ((d ITEM _UTF-16LE'83') IS NOT NULL CASE CAST((d ITEM _UTF-16LE'83')) CASE _UTF-16LE'NULL':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS field_value]),
  Calc(select=[Reinterpret(TO_TIMESTAMP(FROM_UNIXTIME(((server_time IS NOT NULL CASE CAST(server_time) CASE 0:BIGINT) / 1000)))) AS event_time, (d ITEM _UTF-16LE'106') AS supply_id, _UTF-16LE'd84':VARCHAR(4) CHARACTER SET "UTF-16LE" AS field_key, ((d ITEM _UTF-16LE'84') IS NOT NULL CASE CAST((d ITEM _UTF-16LE'84')) CASE _UTF-16LE'NULL':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS field_value]),
  Calc(select=[Reinterpret(TO_TIMESTAMP(FROM_UNIXTIME(((server_time IS NOT NULL CASE CAST(server_time) CASE 0:BIGINT) / 1000)))) AS event_time, (d ITEM _UTF-16LE'106') AS supply_id, _UTF-16LE'd86':VARCHAR(4) CHARACTER SET "UTF-16LE" AS field_key, ((d ITEM _UTF-16LE'86') IS NOT NULL CASE CAST((d ITEM _UTF-16LE'86')) CASE _UTF-16LE'NULL':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS field_value])
 )
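
Reading the Source node back into DDL form, the underlying table and view should look roughly like the following (a hypothetical reconstruction from the plan above; column types and all connector settings are guesses, not the real definitions):

CREATE TABLE baidu_log (
    cid         STRING,
    server_time BIGINT,
    d           MAP<STRING, STRING>,
    -- event time derived from server_time in milliseconds, as in the plan
    event_time AS TO_TIMESTAMP(FROM_UNIXTIME(COALESCE(server_time, 0) / 1000)),
    WATERMARK FOR event_time AS event_time - INTERVAL '60' SECOND
) WITH (
    -- hypothetical connector settings; the real ones are not in the thread
    'connector' = 'kafka',
    'topic' = 'baidu_log',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'flink_sdk_stats',
    'format' = 'json'
);

CREATE VIEW baidu_log_view AS
SELECT
    event_time,
    d['106']                   AS sid,
    COALESCE(d['77'], 'NULL')  AS d77,
    COALESCE(d['79'], 'NULL')  AS d79
    -- d80, d81, d83, d84, d86, d107 follow the same COALESCE pattern
FROM baidu_log;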



Re: FlinkSQL: multiple SELECT GROUP BY aggregations UNIONed for output, vs. multiple SELECTs UNIONed then aggregated with GROUP BY — differences, and a problem.

nobleyd
Comparing the source nodes of the two plans, there is no big difference. The problem shows up when I look at the outputWatermark metrics in the Web UI's metrics tab: in approach 2, parallel instance 0 exposes 8 metrics containing outputWatermark (1 starting with Source, 7 starting with Calc), while in approach 3 there are only 2.


Re: FlinkSQL: multiple SELECT GROUP BY aggregations UNIONed for output, vs. multiple SELECTs UNIONed then aggregated with GROUP BY — differences, and a problem.

nobleyd
To make this easier to discuss, I have written up the complete SQL plus part of my analysis at the following address:

https://www.yuque.com/sixhours-gid0m/eegye3/xrz2mm

Any help is appreciated.
