Flink SQL 1.10.0 support for Hive GenericUDF (re-edited because the earlier images did not display on the mailing list)


Chief
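Hi all,
        I am migrating jobs that used to run offline on Hive to Flink SQL 1.10.0. The documentation says Flink supports Hive GenericUDFs, but I found that a few of them fail in Flink. Is something wrong with my setup?
        One example is current_timestamp().
        In Hive, the following statement runs normally:
select from_unixtime(unix_timestamp(current_timestamp()),'yyyyMMddHHmmss');
      Submitting the same statement as a Flink SQL job fails with: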
org.apache.flink.table.api.ValidationException: SQL validation failed. java.lang.reflect.InvocationTargetException
        at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:130)
        at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:105)
        at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:127)
        at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:342)
        at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:142)
        at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484)
        at com.trusfort.twinkling.sql.util.SqlSubmit.callInsertInto(SqlSubmit.java:55)
        at com.trusfort.twinkling.sql.util.SqlSubmit.callCommand(SqlSubmit.java:27)
        at com.trusfort.twinkling.sql.batch.template.BatchSlidingWindowSqlTemplate.main(BatchSlidingWindowSqlTemplate.java:60)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
        at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
        at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
        at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
        at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
        at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
Caused by: java.lang.RuntimeException: java.lang.reflect.InvocationTargetException
        at org.apache.flink.table.planner.functions.utils.HiveFunctionUtils.invokeGetResultType(HiveFunctionUtils.java:77)
        at org.apache.flink.table.planner.functions.utils.HiveScalarSqlFunction.lambda$createReturnTypeInference$0(HiveScalarSqlFunction.java:83)
        at org.apache.calcite.sql.SqlOperator.inferReturnType(SqlOperator.java:470)
        at org.apache.calcite.sql.SqlOperator.validateOperands(SqlOperator.java:437)
        at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:303)
        at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:219)
        at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5600)
        at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5587)
        at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
        at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1691)
        at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1676)
        at org.apache.calcite.sql.SqlOperator.constructArgTypeList(SqlOperator.java:593)
        at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:237)
        at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:219)
        at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5600)
        at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5587)
        at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
        at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1691)
        at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1676)
        at org.apache.calcite.sql.type.InferTypes.lambda$static$0(InferTypes.java:46)
        at org.apache.calcite.sql.validate.SqlValidatorImpl.inferUnknownTypes(SqlValidatorImpl.java:1865)
        at org.apache.calcite.sql.validate.SqlValidatorImpl.inferUnknownTypes(SqlValidatorImpl.java:1853)
        at org.apache.calcite.sql.validate.SqlValidatorImpl.expandSelectItem(SqlValidatorImpl.java:477)
        at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelectList(SqlValidatorImpl.java:4105)
        at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3389)
        at org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
        at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
        at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1008)
        at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:968)
        at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:216)
        at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:943)
        at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:650)
        at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:126)
        ... 25 more
Caused by: java.lang.reflect.InvocationTargetException
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.table.planner.functions.utils.HiveFunctionUtils.invokeGetResultType(HiveFunctionUtils.java:73)
        ... 57 more
Caused by: java.lang.NullPointerException
        at org.apache.hadoop.hive.ql.udf.generic.GenericUDFUnixTimeStamp.initializeInput(GenericUDFUnixTimeStamp.java:50)
        at org.apache.hadoop.hive.ql.udf.generic.GenericUDFToUnixTimeStamp.initialize(GenericUDFToUnixTimeStamp.java:67)
        at org.apache.hadoop.hive.ql.udf.generic.GenericUDF.initializeAndFoldConstants(GenericUDF.java:139)
        at org.apache.flink.table.functions.hive.HiveGenericUDF.getHiveResultType(HiveGenericUDF.java:100)
        ... 62 more



The statement is identical in both cases, so why does it fail in Flink? Any pointers would be appreciated.
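
The trace above nests several "Caused by" layers (ValidationException, RuntimeException, InvocationTargetException), and the informative one is the innermost NullPointerException raised in GenericUDFUnixTimeStamp.initializeInput. As a minimal sketch of how a submitting program could surface that innermost cause directly, the helper below walks the getCause() chain. The class and method names are hypothetical, and the exception chain in main is simulated rather than produced by Flink.

```java
import java.lang.reflect.InvocationTargetException;

public class RootCauseSketch {

    // Follow getCause() until the innermost throwable is reached.
    static Throwable rootCause(Throwable t) {
        Throwable current = t;
        while (current.getCause() != null && current.getCause() != current) {
            current = current.getCause();
        }
        return current;
    }

    public static void main(String[] args) {
        // Simulated chain shaped like the one in the trace:
        // RuntimeException -> InvocationTargetException -> NullPointerException
        Throwable npe = new NullPointerException("simulated");
        Throwable wrapped = new RuntimeException(new InvocationTargetException(npe));

        System.out.println(rootCause(wrapped).getClass().getName());
    }
}
```

Logging the result of rootCause next to the full trace makes it easier to see at a glance that the failure originates inside the Hive UDF's initialization rather than in the SQL text itself.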

Re: Flink SQL 1.10.0 support for Hive GenericUDF (re-edited because the earlier images did not display on the mailing list)

Jingsong Li
 Hi,

The GenericUDFUnixTimeStamp UDF relies on Hive's SessionState, and our hive-integration currently lacks support for that.
Flink also provides this function, so you could use Flink's own function for now.
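
When switching the statement to another function, it helps to know exactly what string the original Hive expression is supposed to produce. The following is a minimal sketch in plain Java (java.time only, no Flink or Hive dependency) of the same computation: take epoch seconds and format them as yyyyMMddHHmmss. The class and method names are hypothetical, and the time zone is passed explicitly here as an assumption, whereas Hive's from_unixtime uses the system time zone.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class TimestampFormatSketch {

    // Same pattern string as in the Hive statement.
    private static final DateTimeFormatter COMPACT =
            DateTimeFormatter.ofPattern("yyyyMMddHHmmss");

    // Format epoch seconds in the given zone, e.g. 0 in UTC -> "19700101000000".
    static String formatEpochSeconds(long epochSeconds, ZoneId zone) {
        return COMPACT.withZone(zone).format(Instant.ofEpochSecond(epochSeconds));
    }

    public static void main(String[] args) {
        // Current time, analogous to unix_timestamp(current_timestamp()).
        long nowSeconds = Instant.now().getEpochSecond();
        System.out.println(formatEpochSeconds(nowSeconds, ZoneId.systemDefault()));

        // Fixed input for a reproducible check.
        System.out.println(formatEpochSeconds(0L, ZoneOffset.UTC));
    }
}
```

Comparing this output with the result of the rewritten Flink query is one way to confirm that the replacement function uses the same pattern semantics and time zone.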

I have created an issue for this [1] and will try to get it fixed in 1.10.1.

[1] https://issues.apache.org/jira/browse/FLINK-16688

Best,
Jingsong Lee

On Fri, Mar 20, 2020 at 11:33 AM Chief <[hidden email]> wrote:

> [original message quoted in full above]


