flink-CDC client 一对多问题

classic Classic list List threaded Threaded
7 messages Options
Reply | Threaded
Open this post in threaded view
|

flink-CDC client 一对多问题

Li,Qian(DXM,PB)
请问:

我在使用Flink CDC SQL CLI的时候,想将关联的两张表的一对多关系 映射成ARRAY[ROW(userBankNo,createTime)]的形式,要怎么映射呢?
  表aa
  id, userId
  表 bb
  userId,userBankNo,createTime

select  * from aa as a left join bb as b where a.userId=b.userId

谢谢!

Reply | Threaded
Open this post in threaded view
|

Re: flink-CDC client 一对多问题

Jark
Administrator
你可以通过 groupby collect 来将一对多的关系聚合起来,代码类似如下:

select userId, collect(userBankTime)
from (
  select  userId, concat(userBankNo, '_', createTime) as userBankTime
  from aa as a left join bb as b where a.userId=b.userId
) group by userId;


Best,
Jark

On Mon, 21 Sep 2020 at 12:20, Li,Qian(DXM,PB) <[hidden email]> wrote:

> 请问:
>
> 我在使用Flink CDC SQL CLI的时候,想将关联的两张表的一对多关系
> 映射成ARRAY[ROW(userBankNo,createTime)]的形式,要怎么映射呢?
>   表aa
>   id, userId
>   表 bb
>   userId,userBankNo,createTime
>
> select  * from aa as a left join bb as b where a.userId=b.userId
>
> 谢谢!
>
>
Reply | Threaded
Open this post in threaded view
|

Re: flink-CDC client 一对多问题

silence-2
In reply to this post by Li,Qian(DXM,PB)
可以写一个group_array的udaf
select  *  from aa as a left join (
        select userId,group_array(row(userId, userBankNo, userBankNo)) from bb
group by userId
) as b where a.userId=b.userId




--
Sent from: http://apache-flink.147419.n8.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: [DKIM Failure] Re: flink-CDC client 一对多问题

Li,Qian(DXM,PB)
In reply to this post by Jark
Fink sql cdc client 好多sql函数都不支持呀,比如CONCAT(a,b,c) 或者 GROUP_CONCAT(a,b)

是我还需要引入什么包么?如果支持的话,上面说的这种格式就可以用GROUP_CONCAT 很好的解决了,

另外Fink sql cdc client只支持部分sql操作么?具体支持那些函数,有相关的官方文档么?


在 2020/9/21 下午1:42,“Jark Wu”<[hidden email]> 写入:

    你可以通过 groupby collect 来将一对多的关系聚合起来,代码类似如下:
   
    select userId, collect(userBankTime)
    from (
      select  userId, concat(userBankNo, '_', createTime) as userBankTime
      from aa as a left join bb as b where a.userId=b.userId
    ) group by userId;
   
   
    Best,
    Jark
   
    On Mon, 21 Sep 2020 at 12:20, Li,Qian(DXM,PB) <[hidden email]> wrote:
   
    > 请问:
    >
    > 我在使用Flink CDC SQL CLI的时候,想将关联的两张表的一对多关系
    > 映射成ARRAY[ROW(userBankNo,createTime)]的形式,要怎么映射呢?
    >   表aa
    >   id, userId
    >   表 bb
    >   userId,userBankNo,createTime
    >
    > select  * from aa as a left join bb as b where a.userId=b.userId
    >
    > 谢谢!
    >
    >
   

Reply | Threaded
Open this post in threaded view
|

Re: [DKIM Failure] Re: flink-CDC client 一对多问题

Jark
Administrator
1. concat 是内置函数,可以直接用。
2. 内置函数中没有 group_concat,不过有个类似功能的 listagg
3. 内置函数,不需要额外引入包,可以直接使用。内置函数列表请查看官方文档 [1]
4. 没有所谓的 flink sql cdc client, 只有 flink sql client,cdc 是其支持的一个功能。
5. 不清楚你说的只支持部分 sql 操作是指哪些 sql 操作不支持? 具体支持的 sql 操作,请查看官网[2].

Best,
Jark

[1]:
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/systemFunctions.html
[2]:
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/queries.html

On Mon, 21 Sep 2020 at 14:59, Li,Qian(DXM,PB) <[hidden email]> wrote:

> Fink sql cdc client 好多sql函数都不支持呀,比如CONCAT(a,b,c) 或者 GROUP_CONCAT(a,b)
>
> 是我还需要引入什么包么?如果支持的话,上面说的这种格式就可以用GROUP_CONCAT 很好的解决了,
>
> 另外Fink sql cdc client只支持部分sql操作么?具体支持那些函数,有相关的官方文档么?
>
>
> 在 2020/9/21 下午1:42,“Jark Wu”<[hidden email]> 写入:
>
>     你可以通过 groupby collect 来将一对多的关系聚合起来,代码类似如下:
>
>     select userId, collect(userBankTime)
>     from (
>       select  userId, concat(userBankNo, '_', createTime) as userBankTime
>       from aa as a left join bb as b where a.userId=b.userId
>     ) group by userId;
>
>
>     Best,
>     Jark
>
>     On Mon, 21 Sep 2020 at 12:20, Li,Qian(DXM,PB) <[hidden email]>
> wrote:
>
>     > 请问:
>     >
>     > 我在使用Flink CDC SQL CLI的时候,想将关联的两张表的一对多关系
>     > 映射成ARRAY[ROW(userBankNo,createTime)]的形式,要怎么映射呢?
>     >   表aa
>     >   id, userId
>     >   表 bb
>     >   userId,userBankNo,createTime
>     >
>     > select  * from aa as a left join bb as b where a.userId=b.userId
>     >
>     > 谢谢!
>     >
>     >
>
>
>
Reply | Threaded
Open this post in threaded view
|

Re: [DKIM Failure] Re: [DKIM Failure] Re: flink-CDC client 一对多问题

Li,Qian(DXM,PB)
你好:我按照上述方法进行测试,往es插入数据的时候一直报这个问题,是哪里有问题么?

create view uu AS select collect(userBankTime) as userBank ,name from ( select  name, row(name,description) as userBankTime from tt_haha) group by name; // 创建视图,描述userBank类型是MULTITYPE

Flink SQL> desc uu;
root
 |-- userBank: MULTISET<ROW<`EXPR$0` STRING, `EXPR$1` STRING> NOT NULL> NOT NULL
 |-- name: STRING

CREATE TABLE user_log_sink_10 (
>     name STRING,
>     maps MULTISET<ROW<name STRING,description STRING>>,
>     PRIMARY KEY (name) NOT ENFORCED
> ) WITH (
>     'connector' = 'elasticsearch-6',
>     'hosts' = 'xxxx',
>     'index' = 'enriched_orders',
>     'document-type' = 'user'
> );

insert into user_log_sink_10 select name, collect(userBankTime) as maps from (select name, row(name,description) as userBankTime from tt_haha) group by name;

[INFO] Submitting SQL update statement to the cluster...
[ERROR] Could not execute SQL statement. Reason:
java.lang.ClassCastException: org.apache.flink.table.types.logical.MultisetType cannot be cast to org.apache.flink.table.types.logical.MapType

还有select name, collect(userBankTime) as maps from (select name, row(name,description) as userBankTime from tt_haha) group by name 出来的结果是
test123        {test123,123123=1}
123123        {123123,ff=1, 123123,kk=2}
这种的,在flink sql中是什么格式呢?

|--------------------|

在 2020/9/21 下午3:05,“Jark Wu”<[hidden email]> 写入:

    1. concat 是内置函数,可以直接用。
    2. 内置函数中没有 group_concat,不过有个类似功能的 listagg
    3. 内置函数,不需要额外引入包,可以直接使用。内置函数列表请查看官方文档 [1]
    4. 没有所谓的 flink sql cdc client, 只有 flink sql client,cdc 是其支持的一个功能。
    5. 不清楚你说的只支持部分 sql 操作是指哪些 sql 操作不支持? 具体支持的 sql 操作,请查看官网[2].
   
    Best,
    Jark
   
    [1]:
    https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/systemFunctions.html
    [2]:
    https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/queries.html
   
    On Mon, 21 Sep 2020 at 14:59, Li,Qian(DXM,PB) <[hidden email]> wrote:
   
    > Fink sql cdc client 好多sql函数都不支持呀,比如CONCAT(a,b,c) 或者 GROUP_CONCAT(a,b)
    >
    > 是我还需要引入什么包么?如果支持的话,上面说的这种格式就可以用GROUP_CONCAT 很好的解决了,
    >
    > 另外Fink sql cdc client只支持部分sql操作么?具体支持那些函数,有相关的官方文档么?
    >
    >
    > 在 2020/9/21 下午1:42,“Jark Wu”<[hidden email]> 写入:
    >
    >     你可以通过 groupby collect 来将一对多的关系聚合起来,代码类似如下:
    >
    >     select userId, collect(userBankTime)
    >     from (
    >       select  userId, concat(userBankNo, '_', createTime) as userBankTime
    >       from aa as a left join bb as b where a.userId=b.userId
    >     ) group by userId;
    >
    >
    >     Best,
    >     Jark
    >
    >     On Mon, 21 Sep 2020 at 12:20, Li,Qian(DXM,PB) <[hidden email]>
    > wrote:
    >
    >     > 请问:
    >     >
    >     > 我在使用Flink CDC SQL CLI的时候,想将关联的两张表的一对多关系
    >     > 映射成ARRAY[ROW(userBankNo,createTime)]的形式,要怎么映射呢?
    >     >   表aa
    >     >   id, userId
    >     >   表 bb
    >     >   userId,userBankNo,createTime
    >     >
    >     > select  * from aa as a left join bb as b where a.userId=b.userId
    >     >
    >     > 谢谢!
    >     >
    >     >
    >
    >
    >
   

Reply | Threaded
Open this post in threaded view
|

Re: [DKIM Failure] Re: [DKIM Failure] Re: flink-CDC client 一对多问题

Jark
Administrator
1. 你用的是什么版本呢?我在最新的版本上试了下,没有报错呢。试下最新版本?
2. {123123,ff=1, 123123,kk=2} 就是 MULTISET 打印出来的结构哈, 就是 {key1=cnt, key2=cnt}
其中 123123,ff 是 row#toString。

Best,
Jark

On Tue, 22 Sep 2020 at 19:28, Li,Qian(DXM,PB) <[hidden email]> wrote:

> 你好:我按照上述方法进行测试,往es插入数据的时候一直报这个问题,是哪里有问题么?
>
> create view uu AS select collect(userBankTime) as userBank ,name from (
> select  name, row(name,description) as userBankTime from tt_haha) group by
> name; // 创建视图,描述userBank类型是MULTITYPE
>
> Flink SQL> desc uu;
> root
>  |-- userBank: MULTISET<ROW<`EXPR$0` STRING, `EXPR$1` STRING> NOT NULL>
> NOT NULL
>  |-- name: STRING
>
> CREATE TABLE user_log_sink_10 (
> >     name STRING,
> >     maps MULTISET<ROW<name STRING,description STRING>>,
> >     PRIMARY KEY (name) NOT ENFORCED
> > ) WITH (
> >     'connector' = 'elasticsearch-6',
> >     'hosts' = 'xxxx',
> >     'index' = 'enriched_orders',
> >     'document-type' = 'user'
> > );
>
> insert into user_log_sink_10 select name, collect(userBankTime) as maps
> from (select name, row(name,description) as userBankTime from tt_haha)
> group by name;
>
> [INFO] Submitting SQL update statement to the cluster...
> [ERROR] Could not execute SQL statement. Reason:
> java.lang.ClassCastException:
> org.apache.flink.table.types.logical.MultisetType cannot be cast to
> org.apache.flink.table.types.logical.MapType
>
> 还有select name, collect(userBankTime) as maps from (select name,
> row(name,description) as userBankTime from tt_haha) group by name 出来的结果是
> test123        {test123,123123=1}
> 123123        {123123,ff=1, 123123,kk=2}
> 这种的,在flink sql中是什么格式呢?
>
> |--------------------|
>
> 在 2020/9/21 下午3:05,“Jark Wu”<[hidden email]> 写入:
>
>     1. concat 是内置函数,可以直接用。
>     2. 内置函数中没有 group_concat,不过有个类似功能的 listagg
>     3. 内置函数,不需要额外引入包,可以直接使用。内置函数列表请查看官方文档 [1]
>     4. 没有所谓的 flink sql cdc client, 只有 flink sql client,cdc 是其支持的一个功能。
>     5. 不清楚你说的只支持部分 sql 操作是指哪些 sql 操作不支持? 具体支持的 sql 操作,请查看官网[2].
>
>     Best,
>     Jark
>
>     [1]:
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/systemFunctions.html
>     [2]:
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/queries.html
>
>     On Mon, 21 Sep 2020 at 14:59, Li,Qian(DXM,PB) <[hidden email]>
> wrote:
>
>     > Fink sql cdc client 好多sql函数都不支持呀,比如CONCAT(a,b,c) 或者 GROUP_CONCAT(a,b)
>     >
>     > 是我还需要引入什么包么?如果支持的话,上面说的这种格式就可以用GROUP_CONCAT 很好的解决了,
>     >
>     > 另外Fink sql cdc client只支持部分sql操作么?具体支持那些函数,有相关的官方文档么?
>     >
>     >
>     > 在 2020/9/21 下午1:42,“Jark Wu”<[hidden email]> 写入:
>     >
>     >     你可以通过 groupby collect 来将一对多的关系聚合起来,代码类似如下:
>     >
>     >     select userId, collect(userBankTime)
>     >     from (
>     >       select  userId, concat(userBankNo, '_', createTime) as
> userBankTime
>     >       from aa as a left join bb as b where a.userId=b.userId
>     >     ) group by userId;
>     >
>     >
>     >     Best,
>     >     Jark
>     >
>     >     On Mon, 21 Sep 2020 at 12:20, Li,Qian(DXM,PB) <
> [hidden email]>
>     > wrote:
>     >
>     >     > 请问:
>     >     >
>     >     > 我在使用Flink CDC SQL CLI的时候,想将关联的两张表的一对多关系
>     >     > 映射成ARRAY[ROW(userBankNo,createTime)]的形式,要怎么映射呢?
>     >     >   表aa
>     >     >   id, userId
>     >     >   表 bb
>     >     >   userId,userBankNo,createTime
>     >     >
>     >     > select  * from aa as a left join bb as b where
> a.userId=b.userId
>     >     >
>     >     > 谢谢!
>     >     >
>     >     >
>     >
>     >
>     >
>
>
>