执行如下代码时提示异常,改为旧方法 StreamTableEnvironment.registerFunction 后执行正常。
Flink version: 1.11.1 package com.test; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.functions.ScalarFunction; import org.apache.flink.table.functions.TableFunction; import org.apache.flink.types.Row; public class TestUTDFOk { public static class UDTF extends TableFunction<Row> { public void eval(String input) { Row row = new Row(3); row.setField(0, input); row.setField(1, input.length()); row.setField(2, input + 2); collect(row); } } public static class UDF extends ScalarFunction { public String eval(Row row, Integer index) { try { return String.valueOf(row.getField(index)); } catch (Exception e) { throw e; } } } public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tEnv = StreamTableEnvironment.create( env, EnvironmentSettings.newInstance().useBlinkPlanner().build()); // tEnv.registerFunction("udtf", new UDTF()); // tEnv.registerFunction("udf", new UDF()); tEnv.createTemporarySystemFunction("udtf", new UDTF()); tEnv.createTemporarySystemFunction("udf", new UDF()); tEnv.createTemporaryView("source", tEnv.fromValues("a", "b", "c").as("f0")); String sinkDDL = "create table sinkTable (" + "f0 String" + ", x String" + ", y String" + ", z String" + ") with (" + " 'connector.type' = 'filesystem'," + " 'format.type' = 'csv'," + " 'connector.path' = 'F:\\workspace\\douyu-git\\bd-flink\\core\\logs\\a.csv'" + ")"; String udtfCall = "insert into sinkTable SELECT S.f0" + ", udf(f1, 0) as x" + ", udf(f1, 1) as y" + ", udf(f1, 2) as z" + " FROM source as S, LATERAL TABLE(udtf(f0)) as T(f1)"; tEnv.executeSql(sinkDDL); tEnv.executeSql(udtfCall); } } 异常如下: Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed. 
An error occurred in the type inference logic of function 'udf'. at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:146) at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:108) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:185) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:525) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:202) at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:684) at com.test.TestUTDFOk.main(TestUTDFOk.java:64) Caused by: org.apache.flink.table.api.ValidationException: An error occurred in the type inference logic of function 'udf'. 
at org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToBridgingSqlFunction(FunctionCatalogOperatorTable.java:165) at org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToSqlFunction(FunctionCatalogOperatorTable.java:148) at org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.lambda$lookupOperatorOverloads$0(FunctionCatalogOperatorTable.java:100) at java.util.Optional.flatMap(Optional.java:241) at org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.lookupOperatorOverloads(FunctionCatalogOperatorTable.java:99) at org.apache.calcite.sql.util.ChainedSqlOperatorTable.lookupOperatorOverloads(ChainedSqlOperatorTable.java:73) at org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1303) at org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1288) at org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1318) at org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1288) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1052) at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:766) at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:141) ... 7 more Caused by: org.apache.flink.table.api.ValidationException: Could not extract a valid type inference for function class 'com.test.TestUTDFOk$UDF'. Please check for implementation mistakes and/or provide a corresponding hint. 
at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:313) at org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInference(TypeInferenceExtractor.java:160) at org.apache.flink.table.types.extraction.TypeInferenceExtractor.forScalarFunction(TypeInferenceExtractor.java:85) at org.apache.flink.table.functions.ScalarFunction.getTypeInference(ScalarFunction.java:144) at org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToBridgingSqlFunction(FunctionCatalogOperatorTable.java:162) ... 19 more Caused by: org.apache.flink.table.api.ValidationException: Error in extracting a signature to output mapping. at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:313) at org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractOutputMapping(FunctionMappingExtractor.java:115) at org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInferenceOrError(TypeInferenceExtractor.java:170) at org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInference(TypeInferenceExtractor.java:158) ... 22 more Caused by: org.apache.flink.table.api.ValidationException: Unable to extract a type inference from method: public java.lang.String com.test.TestUTDFOk$UDF.eval(org.apache.flink.types.Row,java.lang.Integer) at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:313) at org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractResultMappings(FunctionMappingExtractor.java:172) at org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractOutputMapping(FunctionMappingExtractor.java:110) ... 24 more Caused by: org.apache.flink.table.api.ValidationException: Could not extract a data type from 'class org.apache.flink.types.Row' in parameter 0 of method 'eval' in class 'com.test.TestUTDFOk$UDF'. 
Please pass the required data type manually or allow RAW types. at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:313) at org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrRawWithTemplate(DataTypeExtractor.java:246) at org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrRaw(DataTypeExtractor.java:224) at org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeWithClassContext(DataTypeExtractor.java:198) at org.apache.flink.table.types.extraction.DataTypeExtractor.extractFromMethodParameter(DataTypeExtractor.java:147) at org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractDataTypeArgument(FunctionMappingExtractor.java:396) at org.apache.flink.table.types.extraction.FunctionMappingExtractor.lambda$null$10(FunctionMappingExtractor.java:375) at java.util.Optional.orElseGet(Optional.java:267) at org.apache.flink.table.types.extraction.FunctionMappingExtractor.lambda$extractArgumentTemplates$11(FunctionMappingExtractor.java:375) at java.util.stream.IntPipeline$4$1.accept(IntPipeline.java:250) at java.util.stream.Streams$RangeIntSpliterator.forEachRemaining(Streams.java:110) at java.util.Spliterator$OfInt.forEachRemaining(Spliterator.java:693) at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481) at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471) at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499) at org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractArgumentTemplates(FunctionMappingExtractor.java:376) at org.apache.flink.table.types.extraction.FunctionMappingExtractor.lambda$createParameterSignatureExtraction$9(FunctionMappingExtractor.java:354) at 
org.apache.flink.table.types.extraction.FunctionMappingExtractor.putExtractedResultMappings(FunctionMappingExtractor.java:314) at org.apache.flink.table.types.extraction.FunctionMappingExtractor.collectMethodMappings(FunctionMappingExtractor.java:252) at org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractResultMappings(FunctionMappingExtractor.java:164) ... 25 more Caused by: org.apache.flink.table.api.ValidationException: Cannot extract a data type from a pure 'org.apache.flink.types.Row' class. Please use annotations to define field names and field types. at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:313) at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:305) at org.apache.flink.table.types.extraction.DataTypeExtractor.checkForCommonErrors(DataTypeExtractor.java:343) at org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrError(DataTypeExtractor.java:277) at org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrRawWithTemplate(DataTypeExtractor.java:238) ... 45 more Process finished with exit code 1 -- Best, zz zhang |
Hi,
1.11中引入的新的udf注册接口,使用的是新的udf类型推断机制,所以会有上面的问题。 你可以参考新的udf类型推导文档[1] 来写一下type hint试试 [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/udfs.html#type-inference zz zhang <[hidden email]> 于2020年8月7日周五 上午11:00写道: > 执行如下代码提示异常,改为旧方法StreamTableEnvironment.registerFunction执行正常, > Flink version: 1.11.1 > > package com.test; > > import > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; > import org.apache.flink.table.api.EnvironmentSettings; > import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; > import org.apache.flink.table.functions.ScalarFunction; > import org.apache.flink.table.functions.TableFunction; > import org.apache.flink.types.Row; > > > public class TestUTDFOk { > public static class UDTF extends TableFunction<Row> { > > public void eval(String input) { > Row row = new Row(3); > row.setField(0, input); > row.setField(1, input.length()); > row.setField(2, input + 2); > collect(row); > } > } > > public static class UDF extends ScalarFunction { > public String eval(Row row, Integer index) { > try { > return String.valueOf(row.getField(index)); > } catch (Exception e) { > throw e; > } > } > } > > public static void main(String[] args) throws Exception { > > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > StreamTableEnvironment tEnv = StreamTableEnvironment.create( > env, > EnvironmentSettings.newInstance().useBlinkPlanner().build()); > // tEnv.registerFunction("udtf", new UDTF()); > // tEnv.registerFunction("udf", new UDF()); > tEnv.createTemporarySystemFunction("udtf", new UDTF()); > tEnv.createTemporarySystemFunction("udf", new UDF()); > > tEnv.createTemporaryView("source", tEnv.fromValues("a", "b", > "c").as("f0")); > String sinkDDL = "create table sinkTable (" > + "f0 String" > + ", x String" > + ", y String" > + ", z String" > + ") with (" > + " 'connector.type' = 'filesystem'," > + " 'format.type' = 'csv'," > + " 'connector.path' = > 
'F:\\workspace\\douyu-git\\bd-flink\\core\\logs\\a.csv'" > + ")"; > String udtfCall = "insert into sinkTable SELECT S.f0" > + ", udf(f1, 0) as x" > + ", udf(f1, 1) as y" > + ", udf(f1, 2) as z" > + " FROM source as S, LATERAL TABLE(udtf(f0)) as T(f1)"; > > tEnv.executeSql(sinkDDL); > tEnv.executeSql(udtfCall); > } > } > > 异常如下: > Exception in thread "main" > org.apache.flink.table.api.ValidationException: SQL validation failed. > An error occurred in the type inference logic of function 'udf'. > at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org > $apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:146) > at > org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:108) > at > org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:185) > at > org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:525) > at > org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:202) > at > org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:684) > at com.test.TestUTDFOk.main(TestUTDFOk.java:64) > Caused by: org.apache.flink.table.api.ValidationException: An error > occurred in the type inference logic of function 'udf'. 
> at > org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToBridgingSqlFunction(FunctionCatalogOperatorTable.java:165) > at > org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToSqlFunction(FunctionCatalogOperatorTable.java:148) > at > org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.lambda$lookupOperatorOverloads$0(FunctionCatalogOperatorTable.java:100) > at java.util.Optional.flatMap(Optional.java:241) > at > org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.lookupOperatorOverloads(FunctionCatalogOperatorTable.java:99) > at > org.apache.calcite.sql.util.ChainedSqlOperatorTable.lookupOperatorOverloads(ChainedSqlOperatorTable.java:73) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1303) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1288) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1318) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1288) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1052) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:766) > at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org > $apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:141) > ... 7 more > Caused by: org.apache.flink.table.api.ValidationException: Could not > extract a valid type inference for function class > 'com.test.TestUTDFOk$UDF'. Please check for implementation mistakes > and/or provide a corresponding hint. 
> at > org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:313) > at > org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInference(TypeInferenceExtractor.java:160) > at > org.apache.flink.table.types.extraction.TypeInferenceExtractor.forScalarFunction(TypeInferenceExtractor.java:85) > at > org.apache.flink.table.functions.ScalarFunction.getTypeInference(ScalarFunction.java:144) > at > org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToBridgingSqlFunction(FunctionCatalogOperatorTable.java:162) > ... 19 more > Caused by: org.apache.flink.table.api.ValidationException: Error in > extracting a signature to output mapping. > at > org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:313) > at > org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractOutputMapping(FunctionMappingExtractor.java:115) > at > org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInferenceOrError(TypeInferenceExtractor.java:170) > at > org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInference(TypeInferenceExtractor.java:158) > ... 22 more > Caused by: org.apache.flink.table.api.ValidationException: Unable to > extract a type inference from method: > public java.lang.String > com.test.TestUTDFOk$UDF.eval(org.apache.flink.types.Row,java.lang.Integer) > at > org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:313) > at > org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractResultMappings(FunctionMappingExtractor.java:172) > at > org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractOutputMapping(FunctionMappingExtractor.java:110) > ... 
24 more > Caused by: org.apache.flink.table.api.ValidationException: Could not > extract a data type from 'class org.apache.flink.types.Row' in > parameter 0 of method 'eval' in class 'com.test.TestUTDFOk$UDF'. > Please pass the required data type manually or allow RAW types. > at > org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:313) > at > org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrRawWithTemplate(DataTypeExtractor.java:246) > at > org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrRaw(DataTypeExtractor.java:224) > at > org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeWithClassContext(DataTypeExtractor.java:198) > at > org.apache.flink.table.types.extraction.DataTypeExtractor.extractFromMethodParameter(DataTypeExtractor.java:147) > at > org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractDataTypeArgument(FunctionMappingExtractor.java:396) > at > org.apache.flink.table.types.extraction.FunctionMappingExtractor.lambda$null$10(FunctionMappingExtractor.java:375) > at java.util.Optional.orElseGet(Optional.java:267) > at > org.apache.flink.table.types.extraction.FunctionMappingExtractor.lambda$extractArgumentTemplates$11(FunctionMappingExtractor.java:375) > at java.util.stream.IntPipeline$4$1.accept(IntPipeline.java:250) > at > java.util.stream.Streams$RangeIntSpliterator.forEachRemaining(Streams.java:110) > at java.util.Spliterator$OfInt.forEachRemaining(Spliterator.java:693) > at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481) > at > java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471) > at > java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) > at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) > at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499) > at > 
org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractArgumentTemplates(FunctionMappingExtractor.java:376) > at > org.apache.flink.table.types.extraction.FunctionMappingExtractor.lambda$createParameterSignatureExtraction$9(FunctionMappingExtractor.java:354) > at > org.apache.flink.table.types.extraction.FunctionMappingExtractor.putExtractedResultMappings(FunctionMappingExtractor.java:314) > at > org.apache.flink.table.types.extraction.FunctionMappingExtractor.collectMethodMappings(FunctionMappingExtractor.java:252) > at > org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractResultMappings(FunctionMappingExtractor.java:164) > ... 25 more > Caused by: org.apache.flink.table.api.ValidationException: Cannot > extract a data type from a pure 'org.apache.flink.types.Row' class. > Please use annotations to define field names and field types. > at > org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:313) > at > org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:305) > at > org.apache.flink.table.types.extraction.DataTypeExtractor.checkForCommonErrors(DataTypeExtractor.java:343) > at > org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrError(DataTypeExtractor.java:277) > at > org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrRawWithTemplate(DataTypeExtractor.java:238) > ... 45 more > > Process finished with exit code 1 > > -- > Best, > zz zhang > -- Best, Benchao Li |
Free forum by Nabble | Edit this page |