Flink 1.10: when a table declares a JSON array column as STRING, the query returns empty values


Flink 1.10: when a table declares a JSON array column as STRING, the query returns empty values

Jim Chen
Hi,
The problem can be reproduced with the following steps:
kafka topic: test_action
kafka message:
  {"action": [{"actionID": "id001", "actionName": "aaa"}, {"actionID": "id002", "actionName": "bbb"} ] }

Code, Problem2.java:
package com.flink;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

/**
 * When using a user-defined table function, if the column passed in from SQL
 * is of ARRAY type, the eval method receives a Row[].
 * The problem: the data inside that Row[] cannot be read; all elements are NULL.
 *
 * Current workaround idea: declare the ARRAY column as STRING in the table definition.
 * The new problem: the query then returns only empty (null) values.
 *
 * kafka topic: test_action
 *
 * kafka message:
 *   {"action": [{"actionID": "id001", "actionName": "aaa"}, {"actionID": "id002", "actionName": "bbb"} ] }
 */
public class Problem2 {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment bsEnv = StreamTableEnvironment.create(env, envSettings);
        bsEnv.registerFunction("explode3", new ExplodeFunction());

        String ddlSource = "CREATE TABLE actionTable3 (\n" +
                "    action STRING\n" +
                ") WITH (\n" +
                "    'connector.type' = 'kafka',\n" +
                "    'connector.version' = '0.11',\n" +
                "    'connector.topic' = 'test_action',\n" +
                "    'connector.startup-mode' = 'earliest-offset',\n" +
                "    'connector.properties.zookeeper.connect' = 'localhost:2181',\n" +
                "    'connector.properties.bootstrap.servers' = 'localhost:9092',\n" +
                "    'update-mode' = 'append',\n" +
                "    'format.type' = 'json',\n" +
                "    'format.derive-schema' = 'false',\n" +
                "    'format.json-schema' = '{\"type\": \"object\", \"properties\": {\"action\": {\"type\": \"string\"} } }'" +
                ")";
        System.out.println(ddlSource);
        bsEnv.sqlUpdate(ddlSource);

        Table table = bsEnv.sqlQuery("select * from actionTable3");
//        Table table = bsEnv.sqlQuery("select * from actionTable2, LATERAL TABLE(explode3(`action`)) as T(`word`)");
        table.printSchema();
        bsEnv.toAppendStream(table, Row.class)
                .print(); // output is all empty (null)

        bsEnv.execute("ARRAY tableFunction Problem");
    }
}
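
The ExplodeFunction registered above is not included in the message. A minimal sketch of what it might look like for the STRING-typed column is below; this is an assumption, not the original implementation. It parses the raw JSON string with Jackson (assumed to be on the classpath) and emits one row per array element, which matches the single `word` column in the commented-out LATERAL TABLE query.

package com.flink;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.table.functions.TableFunction;

// Hypothetical sketch only: takes the "action" column as a raw JSON string,
// parses it as a JSON array, and emits each element as its own row.
public class ExplodeFunction extends TableFunction<String> {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    public void eval(String actionJson) throws Exception {
        if (actionJson == null || actionJson.isEmpty()) {
            return;
        }
        JsonNode array = MAPPER.readTree(actionJson);
        if (array.isArray()) {
            for (JsonNode element : array) {
                // Emit each array element as its own JSON string.
                collect(element.toString());
            }
        }
    }
}

Note that this only helps once the STRING column actually contains the raw JSON array; as described above, in 1.10 it comes back empty, which is what the reply below addresses.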

Re: Flink 1.10: when a table declares a JSON array column as STRING, the query returns empty values

Benchao Li-2
Hi Jim,

This is a known issue [1]. Could you take a look at that issue and see whether it solves your problem?

[1] https://issues.apache.org/jira/browse/FLINK-18002
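
For reference, the ARRAY-typed declaration that the code comment contrasts with would look roughly like the sketch below in Flink 1.10 DDL, used as a drop-in replacement for ddlSource. This is only an illustration under the assumption that both nested fields are strings; the original post reports that with such a declaration the Row[] elements received by the UDTF were NULL.

        // Hypothetical ARRAY-typed declaration (illustrative sketch, not from the thread).
        String ddlWithArray = "CREATE TABLE actionTable2 (\n" +
                "    action ARRAY<ROW<actionID STRING, actionName STRING>>\n" +
                ") WITH (\n" +
                "    'connector.type' = 'kafka',\n" +
                "    'connector.version' = '0.11',\n" +
                "    'connector.topic' = 'test_action',\n" +
                "    'connector.startup-mode' = 'earliest-offset',\n" +
                "    'connector.properties.zookeeper.connect' = 'localhost:2181',\n" +
                "    'connector.properties.bootstrap.servers' = 'localhost:9092',\n" +
                "    'update-mode' = 'append',\n" +
                "    'format.type' = 'json',\n" +
                "    'format.derive-schema' = 'true'\n" +
                ")";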



--

Best,
Benchao Li