使用StreamTableEnvironment.createTemporarySystemFunction注册UD(T)F异常

classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

使用StreamTableEnvironment.createTemporarySystemFunction注册UD(T)F异常

zz zhang
执行如下代码时抛出异常(堆栈见下文);改用旧方法 StreamTableEnvironment.registerFunction 注册同样的函数则执行正常。
Flink version: 1.11.1

package com.test;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;


public class TestUTDFOk {
    public static class UDTF extends TableFunction<Row> {

        public void eval(String input) {
            Row row = new Row(3);
            row.setField(0, input);
            row.setField(1, input.length());
            row.setField(2, input +  2);
            collect(row);
        }
    }

    public static  class UDF extends ScalarFunction {
        public String eval(Row row, Integer index) {
            try {
                return String.valueOf(row.getField(index));
            } catch (Exception e) {
                throw e;
            }
        }
    }

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(
                env,
EnvironmentSettings.newInstance().useBlinkPlanner().build());
//        tEnv.registerFunction("udtf", new UDTF());
//        tEnv.registerFunction("udf", new UDF());
        tEnv.createTemporarySystemFunction("udtf", new UDTF());
        tEnv.createTemporarySystemFunction("udf", new UDF());

        tEnv.createTemporaryView("source", tEnv.fromValues("a", "b",
"c").as("f0"));
        String sinkDDL = "create table sinkTable ("
                + "f0 String"
                + ", x String"
                + ", y String"
                + ", z String"
                + ") with ("
                + "    'connector.type' = 'filesystem',"
                + "    'format.type' = 'csv',"
                + "    'connector.path' =
'F:\\workspace\\douyu-git\\bd-flink\\core\\logs\\a.csv'"
                + ")";
        String udtfCall = "insert into sinkTable SELECT S.f0"
                + ", udf(f1, 0) as x"
                + ", udf(f1, 1) as y"
                + ", udf(f1, 2) as z"
                + " FROM source as S, LATERAL TABLE(udtf(f0)) as T(f1)";

        tEnv.executeSql(sinkDDL);
        tEnv.executeSql(udtfCall);
    }
}

异常如下:
Exception in thread "main"
org.apache.flink.table.api.ValidationException: SQL validation failed.
An error occurred in the type inference logic of function 'udf'.
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:146)
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:108)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:185)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:525)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:202)
at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:684)
at com.test.TestUTDFOk.main(TestUTDFOk.java:64)
Caused by: org.apache.flink.table.api.ValidationException: An error
occurred in the type inference logic of function 'udf'.
at org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToBridgingSqlFunction(FunctionCatalogOperatorTable.java:165)
at org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToSqlFunction(FunctionCatalogOperatorTable.java:148)
at org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.lambda$lookupOperatorOverloads$0(FunctionCatalogOperatorTable.java:100)
at java.util.Optional.flatMap(Optional.java:241)
at org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.lookupOperatorOverloads(FunctionCatalogOperatorTable.java:99)
at org.apache.calcite.sql.util.ChainedSqlOperatorTable.lookupOperatorOverloads(ChainedSqlOperatorTable.java:73)
at org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1303)
at org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1288)
at org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1318)
at org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1288)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1052)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:766)
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:141)
... 7 more
Caused by: org.apache.flink.table.api.ValidationException: Could not
extract a valid type inference for function class
'com.test.TestUTDFOk$UDF'. Please check for implementation mistakes
and/or provide a corresponding hint.
at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:313)
at org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInference(TypeInferenceExtractor.java:160)
at org.apache.flink.table.types.extraction.TypeInferenceExtractor.forScalarFunction(TypeInferenceExtractor.java:85)
at org.apache.flink.table.functions.ScalarFunction.getTypeInference(ScalarFunction.java:144)
at org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToBridgingSqlFunction(FunctionCatalogOperatorTable.java:162)
... 19 more
Caused by: org.apache.flink.table.api.ValidationException: Error in
extracting a signature to output mapping.
at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:313)
at org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractOutputMapping(FunctionMappingExtractor.java:115)
at org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInferenceOrError(TypeInferenceExtractor.java:170)
at org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInference(TypeInferenceExtractor.java:158)
... 22 more
Caused by: org.apache.flink.table.api.ValidationException: Unable to
extract a type inference from method:
public java.lang.String
com.test.TestUTDFOk$UDF.eval(org.apache.flink.types.Row,java.lang.Integer)
at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:313)
at org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractResultMappings(FunctionMappingExtractor.java:172)
at org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractOutputMapping(FunctionMappingExtractor.java:110)
... 24 more
Caused by: org.apache.flink.table.api.ValidationException: Could not
extract a data type from 'class org.apache.flink.types.Row' in
parameter 0 of method 'eval' in class 'com.test.TestUTDFOk$UDF'.
Please pass the required data type manually or allow RAW types.
at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:313)
at org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrRawWithTemplate(DataTypeExtractor.java:246)
at org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrRaw(DataTypeExtractor.java:224)
at org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeWithClassContext(DataTypeExtractor.java:198)
at org.apache.flink.table.types.extraction.DataTypeExtractor.extractFromMethodParameter(DataTypeExtractor.java:147)
at org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractDataTypeArgument(FunctionMappingExtractor.java:396)
at org.apache.flink.table.types.extraction.FunctionMappingExtractor.lambda$null$10(FunctionMappingExtractor.java:375)
at java.util.Optional.orElseGet(Optional.java:267)
at org.apache.flink.table.types.extraction.FunctionMappingExtractor.lambda$extractArgumentTemplates$11(FunctionMappingExtractor.java:375)
at java.util.stream.IntPipeline$4$1.accept(IntPipeline.java:250)
at java.util.stream.Streams$RangeIntSpliterator.forEachRemaining(Streams.java:110)
at java.util.Spliterator$OfInt.forEachRemaining(Spliterator.java:693)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
at org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractArgumentTemplates(FunctionMappingExtractor.java:376)
at org.apache.flink.table.types.extraction.FunctionMappingExtractor.lambda$createParameterSignatureExtraction$9(FunctionMappingExtractor.java:354)
at org.apache.flink.table.types.extraction.FunctionMappingExtractor.putExtractedResultMappings(FunctionMappingExtractor.java:314)
at org.apache.flink.table.types.extraction.FunctionMappingExtractor.collectMethodMappings(FunctionMappingExtractor.java:252)
at org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractResultMappings(FunctionMappingExtractor.java:164)
... 25 more
Caused by: org.apache.flink.table.api.ValidationException: Cannot
extract a data type from a pure 'org.apache.flink.types.Row' class.
Please use annotations to define field names and field types.
at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:313)
at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:305)
at org.apache.flink.table.types.extraction.DataTypeExtractor.checkForCommonErrors(DataTypeExtractor.java:343)
at org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrError(DataTypeExtractor.java:277)
at org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrRawWithTemplate(DataTypeExtractor.java:238)
... 45 more

Process finished with exit code 1

--
Best,
zz zhang
Reply | Threaded
Open this post in threaded view
|

Re: 使用StreamTableEnvironment.createTemporarySystemFunction注册UD(T)F异常

Benchao Li-2
Hi,

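1.11 中引入的新的 UDF 注册接口(createTemporarySystemFunction)使用的是新的 UDF 类型推断机制,所以会出现上面的问题:新机制无法从不带注解的 Row 类型中推断出字段名和字段类型(见堆栈最后一个 Caused by)。
你可以参考新的 UDF 类型推导文档[1],给 Row 类型的参数和返回值加上 type hint(例如 @DataTypeHint、@FunctionHint)试试。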

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/udfs.html#type-inference

zz zhang <[hidden email]> 于2020年8月7日周五 上午11:00写道:

> 执行如下代码提示异常,改为旧方法StreamTableEnvironment.registerFunction执行正常,
> Flink version: 1.11.1
>
> package com.test;
>
> import
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
> import org.apache.flink.table.functions.ScalarFunction;
> import org.apache.flink.table.functions.TableFunction;
> import org.apache.flink.types.Row;
>
>
> public class TestUTDFOk {
>     public static class UDTF extends TableFunction<Row> {
>
>         public void eval(String input) {
>             Row row = new Row(3);
>             row.setField(0, input);
>             row.setField(1, input.length());
>             row.setField(2, input +  2);
>             collect(row);
>         }
>     }
>
>     public static  class UDF extends ScalarFunction {
>         public String eval(Row row, Integer index) {
>             try {
>                 return String.valueOf(row.getField(index));
>             } catch (Exception e) {
>                 throw e;
>             }
>         }
>     }
>
>     public static void main(String[] args) throws Exception {
>
>         StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
>         StreamTableEnvironment tEnv = StreamTableEnvironment.create(
>                 env,
> EnvironmentSettings.newInstance().useBlinkPlanner().build());
> //        tEnv.registerFunction("udtf", new UDTF());
> //        tEnv.registerFunction("udf", new UDF());
>         tEnv.createTemporarySystemFunction("udtf", new UDTF());
>         tEnv.createTemporarySystemFunction("udf", new UDF());
>
>         tEnv.createTemporaryView("source", tEnv.fromValues("a", "b",
> "c").as("f0"));
>         String sinkDDL = "create table sinkTable ("
>                 + "f0 String"
>                 + ", x String"
>                 + ", y String"
>                 + ", z String"
>                 + ") with ("
>                 + "    'connector.type' = 'filesystem',"
>                 + "    'format.type' = 'csv',"
>                 + "    'connector.path' =
> 'F:\\workspace\\douyu-git\\bd-flink\\core\\logs\\a.csv'"
>                 + ")";
>         String udtfCall = "insert into sinkTable SELECT S.f0"
>                 + ", udf(f1, 0) as x"
>                 + ", udf(f1, 1) as y"
>                 + ", udf(f1, 2) as z"
>                 + " FROM source as S, LATERAL TABLE(udtf(f0)) as T(f1)";
>
>         tEnv.executeSql(sinkDDL);
>         tEnv.executeSql(udtfCall);
>     }
> }
>
> 异常如下:
> Exception in thread "main"
> org.apache.flink.table.api.ValidationException: SQL validation failed.
> An error occurred in the type inference logic of function 'udf'.
> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:146)
> at
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:108)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:185)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:525)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:202)
> at
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:684)
> at com.test.TestUTDFOk.main(TestUTDFOk.java:64)
> Caused by: org.apache.flink.table.api.ValidationException: An error
> occurred in the type inference logic of function 'udf'.
> at
> org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToBridgingSqlFunction(FunctionCatalogOperatorTable.java:165)
> at
> org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToSqlFunction(FunctionCatalogOperatorTable.java:148)
> at
> org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.lambda$lookupOperatorOverloads$0(FunctionCatalogOperatorTable.java:100)
> at java.util.Optional.flatMap(Optional.java:241)
> at
> org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.lookupOperatorOverloads(FunctionCatalogOperatorTable.java:99)
> at
> org.apache.calcite.sql.util.ChainedSqlOperatorTable.lookupOperatorOverloads(ChainedSqlOperatorTable.java:73)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1303)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1288)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1318)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1288)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1052)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:766)
> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:141)
> ... 7 more
> Caused by: org.apache.flink.table.api.ValidationException: Could not
> extract a valid type inference for function class
> 'com.test.TestUTDFOk$UDF'. Please check for implementation mistakes
> and/or provide a corresponding hint.
> at
> org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:313)
> at
> org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInference(TypeInferenceExtractor.java:160)
> at
> org.apache.flink.table.types.extraction.TypeInferenceExtractor.forScalarFunction(TypeInferenceExtractor.java:85)
> at
> org.apache.flink.table.functions.ScalarFunction.getTypeInference(ScalarFunction.java:144)
> at
> org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToBridgingSqlFunction(FunctionCatalogOperatorTable.java:162)
> ... 19 more
> Caused by: org.apache.flink.table.api.ValidationException: Error in
> extracting a signature to output mapping.
> at
> org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:313)
> at
> org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractOutputMapping(FunctionMappingExtractor.java:115)
> at
> org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInferenceOrError(TypeInferenceExtractor.java:170)
> at
> org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInference(TypeInferenceExtractor.java:158)
> ... 22 more
> Caused by: org.apache.flink.table.api.ValidationException: Unable to
> extract a type inference from method:
> public java.lang.String
> com.test.TestUTDFOk$UDF.eval(org.apache.flink.types.Row,java.lang.Integer)
> at
> org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:313)
> at
> org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractResultMappings(FunctionMappingExtractor.java:172)
> at
> org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractOutputMapping(FunctionMappingExtractor.java:110)
> ... 24 more
> Caused by: org.apache.flink.table.api.ValidationException: Could not
> extract a data type from 'class org.apache.flink.types.Row' in
> parameter 0 of method 'eval' in class 'com.test.TestUTDFOk$UDF'.
> Please pass the required data type manually or allow RAW types.
> at
> org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:313)
> at
> org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrRawWithTemplate(DataTypeExtractor.java:246)
> at
> org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrRaw(DataTypeExtractor.java:224)
> at
> org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeWithClassContext(DataTypeExtractor.java:198)
> at
> org.apache.flink.table.types.extraction.DataTypeExtractor.extractFromMethodParameter(DataTypeExtractor.java:147)
> at
> org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractDataTypeArgument(FunctionMappingExtractor.java:396)
> at
> org.apache.flink.table.types.extraction.FunctionMappingExtractor.lambda$null$10(FunctionMappingExtractor.java:375)
> at java.util.Optional.orElseGet(Optional.java:267)
> at
> org.apache.flink.table.types.extraction.FunctionMappingExtractor.lambda$extractArgumentTemplates$11(FunctionMappingExtractor.java:375)
> at java.util.stream.IntPipeline$4$1.accept(IntPipeline.java:250)
> at
> java.util.stream.Streams$RangeIntSpliterator.forEachRemaining(Streams.java:110)
> at java.util.Spliterator$OfInt.forEachRemaining(Spliterator.java:693)
> at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
> at
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
> at
> java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
> at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
> at
> org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractArgumentTemplates(FunctionMappingExtractor.java:376)
> at
> org.apache.flink.table.types.extraction.FunctionMappingExtractor.lambda$createParameterSignatureExtraction$9(FunctionMappingExtractor.java:354)
> at
> org.apache.flink.table.types.extraction.FunctionMappingExtractor.putExtractedResultMappings(FunctionMappingExtractor.java:314)
> at
> org.apache.flink.table.types.extraction.FunctionMappingExtractor.collectMethodMappings(FunctionMappingExtractor.java:252)
> at
> org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractResultMappings(FunctionMappingExtractor.java:164)
> ... 25 more
> Caused by: org.apache.flink.table.api.ValidationException: Cannot
> extract a data type from a pure 'org.apache.flink.types.Row' class.
> Please use annotations to define field names and field types.
> at
> org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:313)
> at
> org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:305)
> at
> org.apache.flink.table.types.extraction.DataTypeExtractor.checkForCommonErrors(DataTypeExtractor.java:343)
> at
> org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrError(DataTypeExtractor.java:277)
> at
> org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrRawWithTemplate(DataTypeExtractor.java:238)
> ... 45 more
>
> Process finished with exit code 1
>
> --
> Best,
> zz zhang
>


--

Best,
Benchao Li