flink1.11.0 sql自定义UDAF包含复合类型时报Incompatible types

classic Classic list List threaded Threaded
5 messages Options
Reply | Threaded
Open this post in threaded view
|

flink1.11.0 sql自定义UDAF包含复合类型时报Incompatible types

junbaozhang
Hi,all
     本人使用flink版本为1.11.0,自定义udaf如下:


public class GetContinuousListenDuration extends AggregateFunction<Row, ContinuousListenDuration> {

    private static final DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm");

    @Override
    @DataTypeHint("ROW<startTime TIMESTAMP(3), duration BIGINT>")
    public Row getValue(ContinuousListenDuration acc) {
        return Row.of(acc.getStartTime(), acc.getDuration());
    }

    @Override
    public ContinuousListenDuration createAccumulator() {
        return new ContinuousListenDuration();
    }

    public void accumulate(ContinuousListenDuration acc, @DataTypeHint("TIMESTAMP(3)") LocalDateTime dt, Boolean isListening) {
        // 此处省略逻辑
    }
}

聚合时以Timestamp(3)、Boolean作为参数,getValue返回类型是ROW<startTime TIMESTAMP(3), duration BIGINT>,函数名定义为get_continuous_listen_duration,调用该函数的sql如下:

insert into
  report.result
select
  id,
  city_code,
  get_continuous_listen_duration(
    dt,
    (order_no is null)
    or (trim(order_no) = '')
  ).startTime as start_time,
  get_continuous_listen_duration(
    dt,
    (order_no is null)
    or (trim(order_no) = '')
  ).duration as duration
from
  (
    select
      o.id,
      o.dt,
      o.order_no,
      r.city_code
    from
      (
        select
          req [1] as id,
          dt,
          proctime,
          req [2] as order_no
        from
          tmp_v
        where
          extra [1] is null
          or extra [1] <> 'false'
      ) o
      JOIN dim.right FOR SYSTEM_TIME AS OF o.proctime AS r ON r.id = o.id
  ) a
group by
  id,
  city_code
having
  get_continuous_listen_duration(
    dt,
    (order_no is null)
    or (trim(order_no) = '')
  ).duration >= 2

运行时发生如下异常:

Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Incompatible types
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[?:1.8.0_171]
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) ~[?:1.8.0_171]
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) ~[?:1.8.0_171]
        at java.lang.reflect.Constructor.newInstance(Constructor.java:423) ~[?:1.8.0_171]
        at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457) ~[flink-table_2.11-1.11.0.jar:1.11.0]
        at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:550) ~[flink-table_2.11-1.11.0.jar:1.11.0]
        at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:839) ~[flink-table_2.11-1.11.0.jar:1.11.0]
        at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824) ~[flink-table_2.11-1.11.0.jar:1.11.0]
        at org.apache.calcite.sql.fun.SqlDotOperator.deriveType(SqlDotOperator.java:101) ~[flink-table_2.11-1.11.0.jar:1.11.0]
        at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5858) ~[flink-table_2.11-1.11.0.jar:1.11.0]
        at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5845) ~[flink-table_2.11-1.11.0.jar:1.11.0]
        at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139) ~[flink-table_2.11-1.11.0.jar:1.11.0]
        at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1800) ~[flink-table_2.11-1.11.0.jar:1.11.0]
        at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1785) ~[flink-table_2.11-1.11.0.jar:1.11.0]
        at org.apache.calcite.sql.type.InferTypes.lambda$static$0(InferTypes.java:46) ~[flink-table_2.11-1.11.0.jar:1.11.0]
        at org.apache.calcite.sql.validate.SqlValidatorImpl.inferUnknownTypes(SqlValidatorImpl.java:1980) ~[flink-table_2.11-1.11.0.jar:1.11.0]
        at org.apache.calcite.sql.validate.SqlValidatorImpl.validateHavingClause(SqlValidatorImpl.java:4214) ~[flink-table_2.11-1.11.0.jar:1.11.0]
        at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3515) ~[flink-table_2.11-1.11.0.jar:1.11.0]
        at org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60) ~[flink-table_2.11-1.11.0.jar:1.11.0]
        at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84) ~[flink-table_2.11-1.11.0.jar:1.11.0]
        at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1110) ~[flink-table_2.11-1.11.0.jar:1.11.0]
        at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1084) ~[flink-table_2.11-1.11.0.jar:1.11.0]
        at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232) ~[flink-table_2.11-1.11.0.jar:1.11.0]
        at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1059) ~[flink-table_2.11-1.11.0.jar:1.11.0]
        at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:766) ~[flink-table_2.11-1.11.0.jar:1.11.0]
        at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:141) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
        at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:108) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
        at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:187) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
        at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:527) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
        at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:204) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
        at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
        at org.apache.flink.table.api.internal.StatementSetImpl.addInsertSql(StatementSetImpl.java:52) ~[flink-table_2.11-1.11.0.jar:1.11.0]
        at com.ververica.flink.table.gateway.operation.MultiSqlOperation.lambda$executeInternal$0(MultiSqlOperation.java:119) ~[flink-sql-gateway-0.1-SNAPSHOT.jar:?]
        at com.ververica.flink.table.gateway.context.ExecutionContext.wrapClassLoader(ExecutionContext.java:223) ~[flink-sql-gateway-0.1-SNAPSHOT.jar:?]
        at com.ververica.flink.table.gateway.operation.MultiSqlOperation.executeInternal(MultiSqlOperation.java:109) ~[flink-sql-gateway-0.1-SNAPSHOT.jar:?]

我的问题是这样定义udf有什么问题吗?
junbaozhang
Reply | Threaded
Open this post in threaded view
|

回复: flink1.11.0 sql自定义UDAF包含复合类型时报Incompatible types

junbaozhang
补充一下,sql中dt是timestamp(3)类型,同时是watermark
________________________________
发件人: [hidden email] <[hidden email]>
发送时间: 2020年11月4日 17:29
收件人: [hidden email] <[hidden email]>
主题: flink1.11.0 sql自定义UDAF包含复合类型时报Incompatible types

Hi,all
     本人使用flink版本为1.11.0,自定义udaf如下:


public class GetContinuousListenDuration extends AggregateFunction<Row, ContinuousListenDuration> {

    private static final DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm");

    @Override
    @DataTypeHint("ROW<startTime TIMESTAMP(3), duration BIGINT>")
    public Row getValue(ContinuousListenDuration acc) {
        return Row.of(acc.getStartTime(), acc.getDuration());
    }

    @Override
    public ContinuousListenDuration createAccumulator() {
        return new ContinuousListenDuration();
    }

    public void accumulate(ContinuousListenDuration acc, @DataTypeHint("TIMESTAMP(3)") LocalDateTime dt, Boolean isListening) {
        // 此处省略逻辑
    }
}

聚合时以Timestamp(3)、Boolean作为参数,getValue返回类型是ROW<startTime TIMESTAMP(3), duration BIGINT>,函数名定义为get_continuous_listen_duration,调用该函数的sql如下:

insert into
  report.result
select
  id,
  city_code,
  get_continuous_listen_duration(
    dt,
    (order_no is null)
    or (trim(order_no) = '')
  ).startTime as start_time,
  get_continuous_listen_duration(
    dt,
    (order_no is null)
    or (trim(order_no) = '')
  ).duration as duration
from
  (
    select
      o.id,
      o.dt,
      o.order_no,
      r.city_code
    from
      (
        select
          req [1] as id,
          dt,
          proctime,
          req [2] as order_no
        from
          tmp_v
        where
          extra [1] is null
          or extra [1] <> 'false'
      ) o
      JOIN dim.right FOR SYSTEM_TIME AS OF o.proctime AS r ON r.id = o.id
  ) a
group by
  id,
  city_code
having
  get_continuous_listen_duration(
    dt,
    (order_no is null)
    or (trim(order_no) = '')
  ).duration >= 2

运行时发生如下异常:

Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Incompatible types
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[?:1.8.0_171]
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) ~[?:1.8.0_171]
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) ~[?:1.8.0_171]
        at java.lang.reflect.Constructor.newInstance(Constructor.java:423) ~[?:1.8.0_171]
        at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457) ~[flink-table_2.11-1.11.0.jar:1.11.0]
        at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:550) ~[flink-table_2.11-1.11.0.jar:1.11.0]
        at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:839) ~[flink-table_2.11-1.11.0.jar:1.11.0]
        at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824) ~[flink-table_2.11-1.11.0.jar:1.11.0]
        at org.apache.calcite.sql.fun.SqlDotOperator.deriveType(SqlDotOperator.java:101) ~[flink-table_2.11-1.11.0.jar:1.11.0]
        at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5858) ~[flink-table_2.11-1.11.0.jar:1.11.0]
        at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5845) ~[flink-table_2.11-1.11.0.jar:1.11.0]
        at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139) ~[flink-table_2.11-1.11.0.jar:1.11.0]
        at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1800) ~[flink-table_2.11-1.11.0.jar:1.11.0]
        at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1785) ~[flink-table_2.11-1.11.0.jar:1.11.0]
        at org.apache.calcite.sql.type.InferTypes.lambda$static$0(InferTypes.java:46) ~[flink-table_2.11-1.11.0.jar:1.11.0]
        at org.apache.calcite.sql.validate.SqlValidatorImpl.inferUnknownTypes(SqlValidatorImpl.java:1980) ~[flink-table_2.11-1.11.0.jar:1.11.0]
        at org.apache.calcite.sql.validate.SqlValidatorImpl.validateHavingClause(SqlValidatorImpl.java:4214) ~[flink-table_2.11-1.11.0.jar:1.11.0]
        at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3515) ~[flink-table_2.11-1.11.0.jar:1.11.0]
        at org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60) ~[flink-table_2.11-1.11.0.jar:1.11.0]
        at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84) ~[flink-table_2.11-1.11.0.jar:1.11.0]
        at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1110) ~[flink-table_2.11-1.11.0.jar:1.11.0]
        at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1084) ~[flink-table_2.11-1.11.0.jar:1.11.0]
        at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232) ~[flink-table_2.11-1.11.0.jar:1.11.0]
        at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1059) ~[flink-table_2.11-1.11.0.jar:1.11.0]
        at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:766) ~[flink-table_2.11-1.11.0.jar:1.11.0]
        at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:141) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
        at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:108) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
        at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:187) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
        at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:527) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
        at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:204) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
        at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
        at org.apache.flink.table.api.internal.StatementSetImpl.addInsertSql(StatementSetImpl.java:52) ~[flink-table_2.11-1.11.0.jar:1.11.0]
        at com.ververica.flink.table.gateway.operation.MultiSqlOperation.lambda$executeInternal$0(MultiSqlOperation.java:119) ~[flink-sql-gateway-0.1-SNAPSHOT.jar:?]
        at com.ververica.flink.table.gateway.context.ExecutionContext.wrapClassLoader(ExecutionContext.java:223) ~[flink-sql-gateway-0.1-SNAPSHOT.jar:?]
        at com.ververica.flink.table.gateway.operation.MultiSqlOperation.executeInternal(MultiSqlOperation.java:109) ~[flink-sql-gateway-0.1-SNAPSHOT.jar:?]

我的问题是这样定义udf有什么问题吗?
junbaozhang
Reply | Threaded
Open this post in threaded view
|

退订订阅

wangleigis@163.com



退订




--

祝:工作顺利,完事如意!


Reply | Threaded
Open this post in threaded view
|

Re:退订订阅

hailongwang
Hi wangleigis,


退订需发邮件到 [hidden email] 更多详细情况可以参考[1] [1] https://flink.apache.org/community.html#mailing-lists


Best,
Hailong Wang

在 2020-11-04 17:59:45,"wangleigis" <[hidden email]> 写道:

>
>
>
>退订
>
>
>
>
>--
>
>祝:工作顺利,完事如意!
>
>
Reply | Threaded
Open this post in threaded view
|

Re:flink1.11.0 sql自定义UDAF包含复合类型时报Incompatible types

hailongwang
In reply to this post by junbaozhang
Hi wind,


 从这行报错堆栈来看:` at org.apache.calcite.sql.fun.SqlDotOperator.deriveType(SqlDotOperator.java:101) ` ,
 应该是在对 row.startTime 或者 row. duration validate 阶段,推断类型时识别出不兼容类型,可以检测下用法有没有错误。


Best,
Hailong Wang




在 2020-11-04 16:29:37,"[hidden email]" <[hidden email]> 写道:

>Hi,all
>     本人使用flink版本为1.11.0,自定义udaf如下:
>
>
>public class GetContinuousListenDuration extends AggregateFunction<Row, ContinuousListenDuration> {
>
>    private static final DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm");
>
>    @Override
>    @DataTypeHint("ROW<startTime TIMESTAMP(3), duration BIGINT>")
>    public Row getValue(ContinuousListenDuration acc) {
>        return Row.of(acc.getStartTime(), acc.getDuration());
>    }
>
>    @Override
>    public ContinuousListenDuration createAccumulator() {
>        return new ContinuousListenDuration();
>    }
>
>    public void accumulate(ContinuousListenDuration acc, @DataTypeHint("TIMESTAMP(3)") LocalDateTime dt, Boolean isListening) {
>        // 此处省略逻辑
>    }
>}
>
>聚合时以Timestamp(3)、Boolean作为参数,getValue返回类型是ROW<startTime TIMESTAMP(3), duration BIGINT>,函数名定义为get_continuous_listen_duration,调用该函数的sql如下:
>
>insert into
>  report.result
>select
>  id,
>  city_code,
>  get_continuous_listen_duration(
>    dt,
>    (order_no is null)
>    or (trim(order_no) = '')
>  ).startTime as start_time,
>  get_continuous_listen_duration(
>    dt,
>    (order_no is null)
>    or (trim(order_no) = '')
>  ).duration as duration
>from
>  (
>    select
>      o.id,
>      o.dt,
>      o.order_no,
>      r.city_code
>    from
>      (
>        select
>          req [1] as id,
>          dt,
>          proctime,
>          req [2] as order_no
>        from
>          tmp_v
>        where
>          extra [1] is null
>          or extra [1] <> 'false'
>      ) o
>      JOIN dim.right FOR SYSTEM_TIME AS OF o.proctime AS r ON r.id = o.id
>  ) a
>group by
>  id,
>  city_code
>having
>  get_continuous_listen_duration(
>    dt,
>    (order_no is null)
>    or (trim(order_no) = '')
>  ).duration >= 2
>
>运行时发生如下异常:
>
>Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Incompatible types
>        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[?:1.8.0_171]
>        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) ~[?:1.8.0_171]
>        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) ~[?:1.8.0_171]
>        at java.lang.reflect.Constructor.newInstance(Constructor.java:423) ~[?:1.8.0_171]
>        at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457) ~[flink-table_2.11-1.11.0.jar:1.11.0]
>        at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:550) ~[flink-table_2.11-1.11.0.jar:1.11.0]
>        at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:839) ~[flink-table_2.11-1.11.0.jar:1.11.0]
>        at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824) ~[flink-table_2.11-1.11.0.jar:1.11.0]
>        at org.apache.calcite.sql.fun.SqlDotOperator.deriveType(SqlDotOperator.java:101) ~[flink-table_2.11-1.11.0.jar:1.11.0]
>        at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5858) ~[flink-table_2.11-1.11.0.jar:1.11.0]
>        at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5845) ~[flink-table_2.11-1.11.0.jar:1.11.0]
>        at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139) ~[flink-table_2.11-1.11.0.jar:1.11.0]
>        at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1800) ~[flink-table_2.11-1.11.0.jar:1.11.0]
>        at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1785) ~[flink-table_2.11-1.11.0.jar:1.11.0]
>        at org.apache.calcite.sql.type.InferTypes.lambda$static$0(InferTypes.java:46) ~[flink-table_2.11-1.11.0.jar:1.11.0]
>        at org.apache.calcite.sql.validate.SqlValidatorImpl.inferUnknownTypes(SqlValidatorImpl.java:1980) ~[flink-table_2.11-1.11.0.jar:1.11.0]
>        at org.apache.calcite.sql.validate.SqlValidatorImpl.validateHavingClause(SqlValidatorImpl.java:4214) ~[flink-table_2.11-1.11.0.jar:1.11.0]
>        at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3515) ~[flink-table_2.11-1.11.0.jar:1.11.0]
>        at org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60) ~[flink-table_2.11-1.11.0.jar:1.11.0]
>        at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84) ~[flink-table_2.11-1.11.0.jar:1.11.0]
>        at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1110) ~[flink-table_2.11-1.11.0.jar:1.11.0]
>        at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1084) ~[flink-table_2.11-1.11.0.jar:1.11.0]
>        at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232) ~[flink-table_2.11-1.11.0.jar:1.11.0]
>        at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1059) ~[flink-table_2.11-1.11.0.jar:1.11.0]
>        at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:766) ~[flink-table_2.11-1.11.0.jar:1.11.0]
>        at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:141) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
>        at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:108) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
>        at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:187) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
>        at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:527) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
>        at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:204) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
>        at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
>        at org.apache.flink.table.api.internal.StatementSetImpl.addInsertSql(StatementSetImpl.java:52) ~[flink-table_2.11-1.11.0.jar:1.11.0]
>        at com.ververica.flink.table.gateway.operation.MultiSqlOperation.lambda$executeInternal$0(MultiSqlOperation.java:119) ~[flink-sql-gateway-0.1-SNAPSHOT.jar:?]
>        at com.ververica.flink.table.gateway.context.ExecutionContext.wrapClassLoader(ExecutionContext.java:223) ~[flink-sql-gateway-0.1-SNAPSHOT.jar:?]
>        at com.ververica.flink.table.gateway.operation.MultiSqlOperation.executeInternal(MultiSqlOperation.java:109) ~[flink-sql-gateway-0.1-SNAPSHOT.jar:?]
>
>我的问题是这样定义udf有什么问题吗?