flink版本是1.11.1
创建了一个udf函数如下 public class FromUnixTime extends ScalarFunction { private static final Logger logger = LoggerFactory.getLogger(FromUnixTime.class); public String eval(long unixTime, String timeZone, String format) { try { DateFormat dateFormat = new SimpleDateFormat(format); dateFormat.setTimeZone(TimeZone.getTimeZone(timeZone)); return dateFormat.format(new Date(unixTime)); } catch (Exception e) { logger.error("parse unixTime error, unixTime: " + unixTime + ",timeZone: " + timeZone, ",format:" + format, e); return null; } } public String eval(long unixTime, String timeZone) { return eval(unixTime, timeZone, "yyyy-MM-dd HH:mm:ss"); } } 测试执行如下语句是能正常成功的。 <http://apache-flink.147419.n8.nabble.com/file/t572/lALPDiCptXS2iL3NAUbNBPo_1274_326.jpg> 但是创建view报如下错误,是啥原因? create view xx as select *, UDF_FromUnixTime(exeAt, 'utc', 'yyyy-MM-ddHH:mm' ) as biz_min from timeout_topic_source; 其中exeAt是个long类型的时间 错误如下: org.apache.flink.table.api.ValidationException: SQL validation failed. Invalid function call: default_catalog.default_database.UDF_FromUnixTime(BIGINT, CHAR(3) NOT NULL, CHAR(15) NOT NULL) at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:148) at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:108) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:189) at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78) at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:639) at org.apache.zeppelin.flink.FlinkSqlInterrpeter.callCreateView(FlinkSqlInterrpeter.java:343) at org.apache.zeppelin.flink.FlinkSqlInterrpeter.callCommand(FlinkSqlInterrpeter.java:267) at org.apache.zeppelin.flink.FlinkSqlInterrpeter.runSqlList(FlinkSqlInterrpeter.java:145) at org.apache.zeppelin.flink.FlinkSqlInterrpeter.interpret(FlinkSqlInterrpeter.java:114) at org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:110) at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:776) at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:668) at org.apache.zeppelin.scheduler.Job.run(Job.java:172) at org.apache.zeppelin.scheduler.AbstractScheduler.runJob(AbstractScheduler.java:130) at org.apache.zeppelin.scheduler.ParallelScheduler.lambda$runJobInScheduler$0(ParallelScheduler.java:39) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.flink.table.api.ValidationException: Invalid function call: default_catalog.default_database.UDF_FromUnixTime(BIGINT, CHAR(3) NOT NULL, CHAR(15) NOT NULL) at org.apache.flink.table.types.inference.TypeInferenceUtil.createInvalidCallException(TypeInferenceUtil.java:207) at org.apache.flink.table.planner.functions.inference.TypeInferenceOperandChecker.checkOperandTypes(TypeInferenceOperandChecker.java:93) at org.apache.calcite.sql.SqlOperator.checkOperandTypes(SqlOperator.java:684) at org.apache.calcite.sql.SqlOperator.validateOperands(SqlOperator.java:448) at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:314) at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:218) at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5858) at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5845) at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139) at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1800) at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1785) at org.apache.calcite.sql.SqlAsOperator.deriveType(SqlAsOperator.java:133) at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5858) at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5845) at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139) at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1800) at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1785) at org.apache.calcite.sql.validate.SqlValidatorImpl.expandSelectItem(SqlValidatorImpl.java:481) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelectList(SqlValidatorImpl.java:4255) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3523) at org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60) at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1110) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1084) at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1059) at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:766) at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:143) ... 17 more Caused by: org.apache.flink.table.api.ValidationException: Invalid input arguments. Expected signatures are: default_catalog.default_database.UDF_FromUnixTime(unixTime BIGINT NOT NULL, timeZone STRING) default_catalog.default_database.UDF_FromUnixTime(unixTime BIGINT NOT NULL, timeZone STRING, format STRING) at org.apache.flink.table.types.inference.TypeInferenceUtil.createInvalidInputException(TypeInferenceUtil.java:190) at org.apache.flink.table.planner.functions.inference.TypeInferenceOperandChecker.checkOperandTypesOrError(TypeInferenceOperandChecker.java:131) at org.apache.flink.table.planner.functions.inference.TypeInferenceOperandChecker.checkOperandTypes(TypeInferenceOperandChecker.java:89) ... 43 more Caused by: org.apache.flink.table.api.ValidationException: Invalid input arguments. at org.apache.flink.table.types.inference.TypeInferenceUtil.inferInputTypes(TypeInferenceUtil.java:467) at org.apache.flink.table.types.inference.TypeInferenceUtil.adaptArguments(TypeInferenceUtil.java:123) at org.apache.flink.table.types.inference.TypeInferenceUtil.adaptArguments(TypeInferenceUtil.java:102) at org.apache.flink.table.planner.functions.inference.TypeInferenceOperandChecker.checkOperandTypesOrError(TypeInferenceOperandChecker.java:126) ... 44 more -- Sent from: http://apache-flink.147419.n8.nabble.com/ |
Free forum by Nabble | Edit this page |