flink1.10在通过TableFunction实现行转列时,Row一直是空

classic Classic list List threaded Threaded
3 messages Options
Reply | Threaded
Open this post in threaded view
|

flink1.10在通过TableFunction实现行转列时,Row一直是空

Jim Chen
大家好:
    我的业务场景,是想实现一个行转列的效果。然后通过自定义tableFunction来实现。
在自定义tableFunction的时候,如果SQL中传入的是ARRAY类型,
那么在eval方法接收到的就是Row[],
问题出在,Row[]中的数据获取不到,里面的元素都是NULL

通过下面的步骤和代码可还原车祸场景:
kafka topic: test_action
kafka message:
    {"action": [{"actionID": "id001", "actionName": "aaa"}, {"actionID": "id002", "actionName": "bbb"} ] }

代码1:Problem.java
package com.flink;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

/**
 *
 * 在自定义tableFunction的时候,如果SQL中传入的是ARRAY类型,
 * 那么在eval方法接收到的就是Row[],
 * 问题出在,Row[]中的数据获取不到,里面的元素都是NULL
 *
 * kafka topic: test_action
 *
 * kafka message:
 *   {"action": [{"actionID": "id001", "actionName": "aaa"}, {"actionID": "id002", "actionName": "bbb"} ] }
 */
public class Problem {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment bsEnv = StreamTableEnvironment.create(env, envSettings);
        bsEnv.registerFunction("explode2", new ExplodeFunction());

        String ddlSource = "CREATE TABLE actionTable (\n" +
                "    action ARRAY<\n" +
                "               ROW<" +
                "                   actionID STRING,\n" +
                "                   actionName STRING\n" +
                "                       >\n" +
                "                           >\n" +
                ") WITH (\n" +
                "    'connector.type' = 'kafka',\n" +
                "    'connector.version' = '0.11',\n" +
                "    'connector.topic' = 'test_action',\n" +
                "    'connector.startup-mode' = 'earliest-offset',\n" +
                "    'connector.properties.zookeeper.connect' = 'localhost:2181',\n" +
                "    'connector.properties.bootstrap.servers' = 'localhost:9092',\n" +
                "    'update-mode' = 'append',\n" +
                "    'format.type' = 'json'\n" +
                ")";
        bsEnv.sqlUpdate(ddlSource);

//        Table table = bsEnv.sqlQuery("select `action` from actionTable");
        Table table = bsEnv.sqlQuery("select * from actionTable, LATERAL TABLE(explode2(`action`)) as T(`word`)");
        table.printSchema();
        bsEnv.toAppendStream(table, Row.class)
                .print("==tb==");


        bsEnv.execute("ARRAY tableFunction Problem");
    }
}

代码2:ExplodeFunction.java
package com.flink;

import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

import java.util.ArrayList;
import java.util.Arrays;

public class ExplodeFunction extends TableFunction<Row> {

    public void eval(Row[] values) {
        System.out.println(values.length);
        if (values.length > 0) {
            for (Row row : values) {
                if (row != null) {// 这里debug出来的row总是空
                    ArrayList<Object> list = new ArrayList<>();
                    for (int i = 0; i < row.getArity(); i++) {
                        Object field = row.getField(i);
                        list.add(field);
                    }
                    collector.collect(Row.of(Arrays.toString(list.toArray())));
                }
            }
        }
    }
}

最后贴个debug的图

Reply | Threaded
Open this post in threaded view
|

Re: flink1.10在通过TableFunction实现行转列时,Row一直是空

Jark
Administrator
Hi,

当前还不支持 Row[] 作为参数。目前有一个 issue 在解决这个问题,可以关注下。
https://issues.apache.org/jira/browse/FLINK-17855


Best,
Jark

On Mon, 6 Jul 2020 at 10:19, Jim Chen <[hidden email]> wrote:

> 大家好:
>     我的业务场景,是想实现一个行转列的效果。然后通过自定义tableFunction来实现。
> 在自定义tableFunction的时候,如果SQL中传入的是ARRAY类型,
> 那么在eval方法接收到的就是Row[],
> 问题出在,Row[]中的数据获取不到,里面的元素都是NULL
>
> 通过下面的步骤和代码可还原车祸场景:
> kafka topic: test_action
> kafka message:
>     {"action": [{"actionID": "id001", "actionName": "aaa"}, {"actionID":
> "id002", "actionName": "bbb"} ] }
>
> 代码1:Problem.java
> package com.flink;
>
> import
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.java.StreamTableEnvironment;
> import org.apache.flink.types.Row;
>
> /**
>  *
>  * 在自定义tableFunction的时候,如果SQL中传入的是ARRAY类型,
>  * 那么在eval方法接收到的就是Row[],
>  * 问题出在,Row[]中的数据获取不到,里面的元素都是NULL
>  *
>  * kafka topic: test_action
>  *
>  * kafka message:
>  *   {"action": [{"actionID": "id001", "actionName": "aaa"}, {"actionID":
> "id002", "actionName": "bbb"} ] }
>  */
> public class Problem {
>
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
>         EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
>                 .useBlinkPlanner()
>                 .inStreamingMode()
>                 .build();
>         StreamTableEnvironment bsEnv = StreamTableEnvironment.create(env,
> envSettings);
>         bsEnv.registerFunction("explode2", new ExplodeFunction());
>
>         String ddlSource = "CREATE TABLE actionTable (\n" +
>                 "    action ARRAY<\n" +
>                 "               ROW<" +
>                 "                   actionID STRING,\n" +
>                 "                   actionName STRING\n" +
>                 "                       >\n" +
>                 "                           >\n" +
>                 ") WITH (\n" +
>                 "    'connector.type' = 'kafka',\n" +
>                 "    'connector.version' = '0.11',\n" +
>                 "    'connector.topic' = 'test_action',\n" +
>                 "    'connector.startup-mode' = 'earliest-offset',\n" +
>                 "    'connector.properties.zookeeper.connect' =
> 'localhost:2181',\n" +
>                 "    'connector.properties.bootstrap.servers' =
> 'localhost:9092',\n" +
>                 "    'update-mode' = 'append',\n" +
>                 "    'format.type' = 'json'\n" +
>                 ")";
>         bsEnv.sqlUpdate(ddlSource);
>
> //        Table table = bsEnv.sqlQuery("select `action` from actionTable");
>         Table table = bsEnv.sqlQuery("select * from actionTable, LATERAL
> TABLE(explode2(`action`)) as T(`word`)");
>         table.printSchema();
>         bsEnv.toAppendStream(table, Row.class)
>                 .print("==tb==");
>
>
>         bsEnv.execute("ARRAY tableFunction Problem");
>     }
> }
>
> 代码2:ExplodeFunction.java
> package com.flink;
>
> import org.apache.flink.table.functions.TableFunction;
> import org.apache.flink.types.Row;
>
> import java.util.ArrayList;
> import java.util.Arrays;
>
> public class ExplodeFunction extends TableFunction<Row> {
>
>     public void eval(Row[] values) {
>         System.out.println(values.length);
>         if (values.length > 0) {
>             for (Row row : values) {
>                 if (row != null) {// 这里debug出来的row总是空
>                     ArrayList<Object> list = new ArrayList<>();
>                     for (int i = 0; i < row.getArity(); i++) {
>                         Object field = row.getField(i);
>                         list.add(field);
>                     }
>
> collector.collect(Row.of(Arrays.toString(list.toArray())));
>                 }
>             }
>         }
>     }
> }
>
> 最后贴个debug的图
> [image: image.png]
>
Reply | Threaded
Open this post in threaded view
|

Re: flink1.10在通过TableFunction实现行转列时,Row一直是空

Jim Chen
Hi,
我现在转换思路,就是在定义表的时候,把ARRYA看成STRING,
那么,现在的问题,就是查询出来,都是空。

基于上面的代码环境,新写了一个类
Problem2.java

package com.flink;

import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

/**
 *
 * 在自定义tableFunction的时候,如果SQL中传入的是ARRAY类型,
 * 那么在eval方法接收到的就是Row[],
 * 问题出在,Row[]中的数据获取不到,里面的元素都是NULL
 *
 * 现在思路:就是在定义表的时候,把ARRYA看成STRING,
 * 现在的问题,就是查询出来,都是空
 *
 * kafka topic: test_action
 *
 * kafka message:
 *   {"action": [{"actionID": "id001", "actionName": "aaa"}, {"actionID":
"id002", "actionName": "bbb"} ] }
 */
public class Problem2 {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment bsEnv = StreamTableEnvironment.create(env,
envSettings);
        bsEnv.registerFunction("explode3", new ExplodeFunction());

        String ddlSource = "CREATE TABLE actionTable3 (\n" +
                "    action STRING\n" +
                ") WITH (\n" +
                "    'connector.type' = 'kafka',\n" +
                "    'connector.version' = '0.11',\n" +
                "    'connector.topic' = 'test_action',\n" +
                "    'connector.startup-mode' = 'earliest-offset',\n" +
                "    'connector.properties.zookeeper.connect' =
'localhost:2181',\n" +
                "    'connector.properties.bootstrap.servers' =
'localhost:9092',\n" +
                "    'update-mode' = 'append',\n" +
                "    'format.type' = 'json',\n" +
                "    'format.derive-schema' = 'false',\n" +
                "    'format.json-schema' = '{\"type\": \"object\",
\"properties\": {\"action\": {\"type\": \"string\"} } }'" +
                ")";
        System.out.println(ddlSource);
        bsEnv.sqlUpdate(ddlSource);

        Table table = bsEnv.sqlQuery("select * from actionTable3");
//        Table table = bsEnv.sqlQuery("select * from actionTable2, LATERAL
TABLE(explode3(`action`)) as T(`word`)");
        table.printSchema();
        bsEnv.toAppendStream(table, Row.class)
                .print();// 输出都是空

        bsEnv.execute("ARRAY tableFunction Problem");
    }
}

Jark Wu <[hidden email]> 于2020年7月6日周一 上午10:36写道:

> Hi,
>
> 当前还不支持 Row[] 作为参数。目前有一个 issue 在解决这个问题,可以关注下。
> https://issues.apache.org/jira/browse/FLINK-17855
>
>
> Best,
> Jark
>
> On Mon, 6 Jul 2020 at 10:19, Jim Chen <[hidden email]> wrote:
>
> > 大家好:
> >     我的业务场景,是想实现一个行转列的效果。然后通过自定义tableFunction来实现。
> > 在自定义tableFunction的时候,如果SQL中传入的是ARRAY类型,
> > 那么在eval方法接收到的就是Row[],
> > 问题出在,Row[]中的数据获取不到,里面的元素都是NULL
> >
> > 通过下面的步骤和代码可还原车祸场景:
> > kafka topic: test_action
> > kafka message:
> >     {"action": [{"actionID": "id001", "actionName": "aaa"}, {"actionID":
> > "id002", "actionName": "bbb"} ] }
> >
> > 代码1:Problem.java
> > package com.flink;
> >
> > import
> > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> > import org.apache.flink.table.api.EnvironmentSettings;
> > import org.apache.flink.table.api.Table;
> > import org.apache.flink.table.api.java.StreamTableEnvironment;
> > import org.apache.flink.types.Row;
> >
> > /**
> >  *
> >  * 在自定义tableFunction的时候,如果SQL中传入的是ARRAY类型,
> >  * 那么在eval方法接收到的就是Row[],
> >  * 问题出在,Row[]中的数据获取不到,里面的元素都是NULL
> >  *
> >  * kafka topic: test_action
> >  *
> >  * kafka message:
> >  *   {"action": [{"actionID": "id001", "actionName": "aaa"}, {"actionID":
> > "id002", "actionName": "bbb"} ] }
> >  */
> > public class Problem {
> >
> >     public static void main(String[] args) throws Exception {
> >         StreamExecutionEnvironment env =
> > StreamExecutionEnvironment.getExecutionEnvironment();
> >         EnvironmentSettings envSettings =
> EnvironmentSettings.newInstance()
> >                 .useBlinkPlanner()
> >                 .inStreamingMode()
> >                 .build();
> >         StreamTableEnvironment bsEnv = StreamTableEnvironment.create(env,
> > envSettings);
> >         bsEnv.registerFunction("explode2", new ExplodeFunction());
> >
> >         String ddlSource = "CREATE TABLE actionTable (\n" +
> >                 "    action ARRAY<\n" +
> >                 "               ROW<" +
> >                 "                   actionID STRING,\n" +
> >                 "                   actionName STRING\n" +
> >                 "                       >\n" +
> >                 "                           >\n" +
> >                 ") WITH (\n" +
> >                 "    'connector.type' = 'kafka',\n" +
> >                 "    'connector.version' = '0.11',\n" +
> >                 "    'connector.topic' = 'test_action',\n" +
> >                 "    'connector.startup-mode' = 'earliest-offset',\n" +
> >                 "    'connector.properties.zookeeper.connect' =
> > 'localhost:2181',\n" +
> >                 "    'connector.properties.bootstrap.servers' =
> > 'localhost:9092',\n" +
> >                 "    'update-mode' = 'append',\n" +
> >                 "    'format.type' = 'json'\n" +
> >                 ")";
> >         bsEnv.sqlUpdate(ddlSource);
> >
> > //        Table table = bsEnv.sqlQuery("select `action` from
> actionTable");
> >         Table table = bsEnv.sqlQuery("select * from actionTable, LATERAL
> > TABLE(explode2(`action`)) as T(`word`)");
> >         table.printSchema();
> >         bsEnv.toAppendStream(table, Row.class)
> >                 .print("==tb==");
> >
> >
> >         bsEnv.execute("ARRAY tableFunction Problem");
> >     }
> > }
> >
> > 代码2:ExplodeFunction.java
> > package com.flink;
> >
> > import org.apache.flink.table.functions.TableFunction;
> > import org.apache.flink.types.Row;
> >
> > import java.util.ArrayList;
> > import java.util.Arrays;
> >
> > public class ExplodeFunction extends TableFunction<Row> {
> >
> >     public void eval(Row[] values) {
> >         System.out.println(values.length);
> >         if (values.length > 0) {
> >             for (Row row : values) {
> >                 if (row != null) {// 这里debug出来的row总是空
> >                     ArrayList<Object> list = new ArrayList<>();
> >                     for (int i = 0; i < row.getArity(); i++) {
> >                         Object field = row.getField(i);
> >                         list.add(field);
> >                     }
> >
> > collector.collect(Row.of(Arrays.toString(list.toArray())));
> >                 }
> >             }
> >         }
> >     }
> > }
> >
> > 最后贴个debug的图
> > [image: image.png]
> >
>