flink array 查询解析问题

classic Classic list List threaded Threaded
2 messages Options
kcz
Reply | Threaded
Open this post in threaded view
|

flink array 查询解析问题

kcz
1.定义ddl解析array字段时候,假如select 那个字段可以解析出。2.当我去定义自己函数时候,会出现null,flink直接跳过解析array那个函数了吗?
CREATE TABLE sourceTable (
        event_time_line array<ROW (
                `rule_name` VARCHAR,
                `count` VARCHAR
        )&gt;
) WITH (
        'connector.type' = 'kafka',
        'connector.version' = 'universal',
        'connector.startup-mode' = 'earliest-offset',
        'connector.topic' = 'topic_test_1',
        'connector.properties.zookeeper.connect' = 'localhost:2181',
        'connector.properties.bootstrap.servers' = 'localhost:9092',
        'update-mode' = 'append',
        'format.type' = 'json',
        'format.derive-schema' = 'true'
);
--可以查出数据
select event_time_line from sourceTable ;
--当我定义自己函数时候,参数value没有传过去,但是数组的size长度过去了
select type_change(event_time_line) from sourceTable ;


public class TypeChange extends ScalarFunction {
    /**
     * 为null,但是数组有长度
     * @param rows
     * @return
     */
    public String eval(Row [] rows){
        return JSONObject.toJSONString(rows);
    }

}
Reply | Threaded
Open this post in threaded view
|

Re: flink array 查询解析问题

Benchao Li
Hi,

你的UDF应该要显示指定一下参数的类型,覆盖ScalarFunction的getParameterTypes方法。
因为UDF对于复杂类型的推导能力有限,这种复杂类型可以显示指定参数类型。

出发 <[hidden email]> 于2020年4月14日周二 下午3:37写道:

> 1.定义ddl解析array字段时候,假如select
> 那个字段可以解析出。2.当我去定义自己函数时候,会出现null,flink直接跳过解析array那个函数了吗?
> CREATE TABLE sourceTable (
>         event_time_line array<ROW (
>                 `rule_name` VARCHAR,
>                 `count` VARCHAR
>         )&gt;
> ) WITH (
>         'connector.type' = 'kafka',
>         'connector.version' = 'universal',
>         'connector.startup-mode' = 'earliest-offset',
>         'connector.topic' = 'topic_test_1',
>         'connector.properties.zookeeper.connect' = 'localhost:2181',
>         'connector.properties.bootstrap.servers' = 'localhost:9092',
>         'update-mode' = 'append',
>         'format.type' = 'json',
>         'format.derive-schema' = 'true'
> );
> --可以查出数据
> select event_time_line from sourceTable ;
> --当我定义自己函数时候,参数value没有传过去,但是数组的size长度过去了
> select type_change(event_time_line) from sourceTable ;
>
>
> public class TypeChange extends ScalarFunction {
>     /**
>      * 为null,但是数组有长度
>      * @param rows
>      * @return
>      */
>     public String eval(Row [] rows){
>         return JSONObject.toJSONString(rows);
>     }
>
> }



--

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: [hidden email]; [hidden email]