flink1.9中blinkSQL对定义udf的TIMESTAMP类型报错

classic Classic list List threaded Threaded
6 messages Options
Reply | Threaded
Open this post in threaded view
|

flink1.9中blinkSQL对定义udf的TIMESTAMP类型报错

守护
社区的各位大佬好:


使用场景:flink1.9版本使用flinkSQL创建udf函数使用没有问,当切换到blinkSQL使用这个udf就会报错TIMESTAMP类型错误,udf实现的功能也很简单,就是将时间+8小时,报错信息如下


org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Not support dataType: TIMESTAMP(9)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:593)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
        at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
        at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80)
        at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:122)
        at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:227)
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
        at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010)
        at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
        at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083)
Caused by: java.lang.RuntimeException: Not support dataType: TIMESTAMP(9)
        at org.apache.flink.table.dataformat.DataFormatConverters.getConverterForDataType(DataFormatConverters.java:248)
        at org.apache.flink.table.planner.codegen.CodeGenUtils$.isConverterIdentity(CodeGenUtils.scala:661)
        at org.apache.flink.table.planner.codegen.CodeGenUtils$.genToInternal(CodeGenUtils.scala:669)
        at org.apache.flink.table.planner.codegen.CodeGenUtils$.genToInternal(CodeGenUtils.scala:665)
        at org.apache.flink.table.planner.codegen.CodeGenUtils$.genToInternalIfNeeded(CodeGenUtils.scala:687)
        at org.apache.flink.table.planner.codegen.calls.ScalarFunctionCallGen.generate(ScalarFunctionCallGen.scala:79)
        at org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateCallExpression(ExprCodeGenerator.scala:737)
        at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:451)
        at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:51)
        at org.apache.calcite.rex.RexCall.accept(RexCall.java:191)
        at org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateExpression(ExprCodeGenerator.scala:131)
        at org.apache.flink.table.planner.codegen.CalcCodeGenerator$$anonfun$5.apply(CalcCodeGenerator.scala:150)
        at org.apache.flink.table.planner.codegen.CalcCodeGenerator$$anonfun$5.apply(CalcCodeGenerator.scala:150)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
Reply | Threaded
Open this post in threaded view
|

Re: flink1.9中blinkSQL对定义udf的TIMESTAMP类型报错

JingsongLee
你声明了DataType吗?代码怎么写的?
由于目前只支持精度<=3,所以你得用DataTypes.TIMESTAMP(3)来表示。

Best,
Jingsong Lee


------------------------------------------------------------------
From:守护 <[hidden email]>
Send Time:2019年9月5日(星期四) 11:48
To:user-zh <[hidden email]>
Subject:flink1.9中blinkSQL对定义udf的TIMESTAMP类型报错

社区的各位大佬好:


使用场景:flink1.9版本使用flinkSQL创建udf函数使用没有问,当切换到blinkSQL使用这个udf就会报错TIMESTAMP类型错误,udf实现的功能也很简单,就是将时间+8小时,报错信息如下


org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Not support dataType: TIMESTAMP(9)
&nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:593)
&nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
&nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
&nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80)
&nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:122)
&nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:227)
&nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
&nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010)
&nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)
&nbsp; &nbsp; &nbsp; &nbsp; at java.security.AccessController.doPrivileged(Native Method)
&nbsp; &nbsp; &nbsp; &nbsp; at javax.security.auth.Subject.doAs(Subject.java:422)
&nbsp; &nbsp; &nbsp; &nbsp; at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
&nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
&nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083)
Caused by: java.lang.RuntimeException: Not support dataType: TIMESTAMP(9)
&nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.table.dataformat.DataFormatConverters.getConverterForDataType(DataFormatConverters.java:248)
&nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.table.planner.codegen.CodeGenUtils$.isConverterIdentity(CodeGenUtils.scala:661)
&nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.table.planner.codegen.CodeGenUtils$.genToInternal(CodeGenUtils.scala:669)
&nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.table.planner.codegen.CodeGenUtils$.genToInternal(CodeGenUtils.scala:665)
&nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.table.planner.codegen.CodeGenUtils$.genToInternalIfNeeded(CodeGenUtils.scala:687)
&nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.table.planner.codegen.calls.ScalarFunctionCallGen.generate(ScalarFunctionCallGen.scala:79)
&nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateCallExpression(ExprCodeGenerator.scala:737)
&nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:451)
&nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:51)
&nbsp; &nbsp; &nbsp; &nbsp; at org.apache.calcite.rex.RexCall.accept(RexCall.java:191)
&nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateExpression(ExprCodeGenerator.scala:131)
&nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.table.planner.codegen.CalcCodeGenerator$$anonfun$5.apply(CalcCodeGenerator.scala:150)
&nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.table.planner.codegen.CalcCodeGenerator$$anonfun$5.apply(CalcCodeGenerator.scala:150)
&nbsp; &nbsp; &nbsp; &nbsp; at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
&nbsp; &nbsp; &nbsp; &nbsp; at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
&nbsp; &nbsp; &nbsp; &nbsp; at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
&nbsp; &nbsp; &nbsp; &nbsp; at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
&nbsp; &nbsp; &nbsp; &nbsp; at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
&nbsp; &nbsp; &nbsp; &nbsp; at scala.collection.AbstractTraversable.map(Traversable.scala:104)
Reply | Threaded
Open this post in threaded view
|

回复: Re: flink1.9中blinkSQL对定义udf的TIMESTAMP类型报错

守护
在哪声明DataType,这个要引入什么包吗,求指点,我的udf代码如下:


import org.apache.flink.table.functions.ScalarFunction;
import java.sql.Timestamp;




public class UTC2Local extends ScalarFunction {
&nbsp; &nbsp; public Timestamp eval(Timestamp s) {
&nbsp; &nbsp; &nbsp; &nbsp; long timestamp = s.getTime() +&nbsp;28800000;
&nbsp; &nbsp; &nbsp; &nbsp; return new Timestamp(timestamp);
&nbsp; &nbsp; }


}








------------------&nbsp;原始邮件&nbsp;------------------
发件人:&nbsp;"JingsongLee"<[hidden email]&gt;;
发送时间:&nbsp;2019年9月5日(星期四) 中午11:55
收件人:&nbsp;"user-zh"<[hidden email]&gt;;

主题:&nbsp;  Re: flink1.9中blinkSQL对定义udf的TIMESTAMP类型报错



你声明了DataType吗?代码怎么写的?
由于目前只支持精度<=3,所以你得用DataTypes.TIMESTAMP(3)来表示。

Best,
Jingsong Lee


------------------------------------------------------------------
From:守护 <[hidden email]&gt;
Send Time:2019年9月5日(星期四) 11:48
To:user-zh <[hidden email]&gt;
Subject:flink1.9中blinkSQL对定义udf的TIMESTAMP类型报错

社区的各位大佬好:


使用场景:flink1.9版本使用flinkSQL创建udf函数使用没有问,当切换到blinkSQL使用这个udf就会报错TIMESTAMP类型错误,udf实现的功能也很简单,就是将时间+8小时,报错信息如下


org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Not support dataType: TIMESTAMP(9)
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:593)
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80)
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:122)
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:227)
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010)
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at java.security.AccessController.doPrivileged(Native Method)
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at javax.security.auth.Subject.doAs(Subject.java:422)
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083)
Caused by: java.lang.RuntimeException: Not support dataType: TIMESTAMP(9)
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.table.dataformat.DataFormatConverters.getConverterForDataType(DataFormatConverters.java:248)
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.table.planner.codegen.CodeGenUtils$.isConverterIdentity(CodeGenUtils.scala:661)
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.table.planner.codegen.CodeGenUtils$.genToInternal(CodeGenUtils.scala:669)
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.table.planner.codegen.CodeGenUtils$.genToInternal(CodeGenUtils.scala:665)
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.table.planner.codegen.CodeGenUtils$.genToInternalIfNeeded(CodeGenUtils.scala:687)
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.table.planner.codegen.calls.ScalarFunctionCallGen.generate(ScalarFunctionCallGen.scala:79)
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateCallExpression(ExprCodeGenerator.scala:737)
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:451)
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:51)
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.calcite.rex.RexCall.accept(RexCall.java:191)
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateExpression(ExprCodeGenerator.scala:131)
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.table.planner.codegen.CalcCodeGenerator$$anonfun$5.apply(CalcCodeGenerator.scala:150)
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.table.planner.codegen.CalcCodeGenerator$$anonfun$5.apply(CalcCodeGenerator.scala:150)
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at scala.collection.AbstractTraversable.map(Traversable.scala:104)
Reply | Threaded
Open this post in threaded view
|

Re: Re: flink1.9中blinkSQL对定义udf的TIMESTAMP类型报错

JingsongLee
override getResultType方法,返回Types.SQL_TIMESTAMP.
这样应该可以绕过。
1.10会修复这个问题。

Best,
Jingsong Lee


------------------------------------------------------------------
From:守护 <[hidden email]>
Send Time:2019年9月5日(星期四) 12:11
To:[hidden email] JingsongLee <[hidden email]>; user-zh <[hidden email]>
Subject:回复: Re: flink1.9中blinkSQL对定义udf的TIMESTAMP类型报错

在哪声明DataType,这个要引入什么包吗,求指点,我的udf代码如下:

import org.apache.flink.table.functions.ScalarFunction;
import java.sql.Timestamp;


public class UTC2Local extends ScalarFunction {
    public Timestamp eval(Timestamp s) {
        long timestamp = s.getTime() + 28800000;
        return new Timestamp(timestamp);
    }

}



------------------ 原始邮件 ------------------
发件人: "JingsongLee"<[hidden email]>;
发送时间: 2019年9月5日(星期四) 中午11:55
收件人: "user-zh"<[hidden email]>;
主题:  Re: flink1.9中blinkSQL对定义udf的TIMESTAMP类型报错

你声明了DataType吗?代码怎么写的?
由于目前只支持精度<=3,所以你得用DataTypes.TIMESTAMP(3)来表示。

Best,
Jingsong Lee


------------------------------------------------------------------
From:守护 <[hidden email]>
Send Time:2019年9月5日(星期四) 11:48
To:user-zh <[hidden email]>
Subject:flink1.9中blinkSQL对定义udf的TIMESTAMP类型报错

社区的各位大佬好:


使用场景:flink1.9版本使用flinkSQL创建udf函数使用没有问,当切换到blinkSQL使用这个udf就会报错TIMESTAMP类型错误,udf实现的功能也很简单,就是将时间+8小时,报错信息如下


org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Not support dataType: TIMESTAMP(9)
&nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:593)
&nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
&nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
&nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80)
&nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:122)
&nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:227)
&nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
&nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010)
&nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)
&nbsp; &nbsp; &nbsp; &nbsp; at java.security.AccessController.doPrivileged(Native Method)
&nbsp; &nbsp; &nbsp; &nbsp; at javax.security.auth.Subject.doAs(Subject.java:422)
&nbsp; &nbsp; &nbsp; &nbsp; at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
&nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
&nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083)
Caused by: java.lang.RuntimeException: Not support dataType: TIMESTAMP(9)
&nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.table.dataformat.DataFormatConverters.getConverterForDataType(DataFormatConverters.java:248)
&nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.table.planner.codegen.CodeGenUtils$.isConverterIdentity(CodeGenUtils.scala:661)
&nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.table.planner.codegen.CodeGenUtils$.genToInternal(CodeGenUtils.scala:669)
&nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.table.planner.codegen.CodeGenUtils$.genToInternal(CodeGenUtils.scala:665)
&nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.table.planner.codegen.CodeGenUtils$.genToInternalIfNeeded(CodeGenUtils.scala:687)
&nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.table.planner.codegen.calls.ScalarFunctionCallGen.generate(ScalarFunctionCallGen.scala:79)
&nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateCallExpression(ExprCodeGenerator.scala:737)
&nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:451)
&nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:51)
&nbsp; &nbsp; &nbsp; &nbsp; at org.apache.calcite.rex.RexCall.accept(RexCall.java:191)
&nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateExpression(ExprCodeGenerator.scala:131)
&nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.table.planner.codegen.CalcCodeGenerator$$anonfun$5.apply(CalcCodeGenerator.scala:150)
&nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.table.planner.codegen.CalcCodeGenerator$$anonfun$5.apply(CalcCodeGenerator.scala:150)
&nbsp; &nbsp; &nbsp; &nbsp; at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
&nbsp; &nbsp; &nbsp; &nbsp; at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
&nbsp; &nbsp; &nbsp; &nbsp; at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
&nbsp; &nbsp; &nbsp; &nbsp; at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
&nbsp; &nbsp; &nbsp; &nbsp; at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
&nbsp; &nbsp; &nbsp; &nbsp; at scala.collection.AbstractTraversable.map(Traversable.scala:104)
Reply | Threaded
Open this post in threaded view
|

回复: Re: Re: flink1.9中blinkSQL对定义udf的TIMESTAMP类型报错

守护
能给一个使用的例子吗,我这有点不知道怎么做啊




------------------&nbsp;原始邮件&nbsp;------------------
发件人:&nbsp;"[hidden email] Jingso"<[hidden email]&gt;;
发送时间:&nbsp;2019年9月5日(星期四) 下午2:09
收件人:&nbsp;"守护"<[hidden email]&gt;;"user-zh"<[hidden email]&gt;;

主题:&nbsp;  Re: Re: flink1.9中blinkSQL对定义udf的TIMESTAMP类型报错



override getResultType方法,返回Types.SQL_TIMESTAMP.
这样应该可以绕过。
1.10会修复这个问题。


Best,
Jingsong Lee


------------------------------------------------------------------
From:守护 <[hidden email]&gt;
Send Time:2019年9月5日(星期四) 12:11
To:[hidden email] JingsongLee <[hidden email]&gt;; user-zh <[hidden email]&gt;
Subject:回复:  Re: flink1.9中blinkSQL对定义udf的TIMESTAMP类型报错


在哪声明DataType,这个要引入什么包吗,求指点,我的udf代码如下:


import org.apache.flink.table.functions.ScalarFunction;
import java.sql.Timestamp;




public class UTC2Local extends ScalarFunction {
&nbsp; &nbsp; public Timestamp eval(Timestamp s) {
&nbsp; &nbsp; &nbsp; &nbsp; long timestamp = s.getTime() + 28800000;
&nbsp; &nbsp; &nbsp; &nbsp; return new Timestamp(timestamp);
&nbsp; &nbsp; }


}








------------------ 原始邮件 ------------------
发件人:&nbsp;"JingsongLee"<[hidden email]&gt;;
发送时间:&nbsp;2019年9月5日(星期四) 中午11:55
收件人:&nbsp;"user-zh"<[hidden email]&gt;;

主题:&nbsp;  Re: flink1.9中blinkSQL对定义udf的TIMESTAMP类型报错



你声明了DataType吗?代码怎么写的?
由于目前只支持精度<=3,所以你得用DataTypes.TIMESTAMP(3)来表示。

Best,
Jingsong Lee


------------------------------------------------------------------
From:守护 <[hidden email]&gt;
Send Time:2019年9月5日(星期四) 11:48
To:user-zh <[hidden email]&gt;
Subject:flink1.9中blinkSQL对定义udf的TIMESTAMP类型报错

社区的各位大佬好:


使用场景:flink1.9版本使用flinkSQL创建udf函数使用没有问,当切换到blinkSQL使用这个udf就会报错TIMESTAMP类型错误,udf实现的功能也很简单,就是将时间+8小时,报错信息如下


org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Not support dataType: TIMESTAMP(9)
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:593)
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80)
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:122)
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:227)
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010)
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at java.security.AccessController.doPrivileged(Native Method)
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at javax.security.auth.Subject.doAs(Subject.java:422)
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083)
Caused by: java.lang.RuntimeException: Not support dataType: TIMESTAMP(9)
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.table.dataformat.DataFormatConverters.getConverterForDataType(DataFormatConverters.java:248)
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.table.planner.codegen.CodeGenUtils$.isConverterIdentity(CodeGenUtils.scala:661)
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.table.planner.codegen.CodeGenUtils$.genToInternal(CodeGenUtils.scala:669)
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.table.planner.codegen.CodeGenUtils$.genToInternal(CodeGenUtils.scala:665)
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.table.planner.codegen.CodeGenUtils$.genToInternalIfNeeded(CodeGenUtils.scala:687)
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.table.planner.codegen.calls.ScalarFunctionCallGen.generate(ScalarFunctionCallGen.scala:79)
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateCallExpression(ExprCodeGenerator.scala:737)
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:451)
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:51)
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.calcite.rex.RexCall.accept(RexCall.java:191)
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateExpression(ExprCodeGenerator.scala:131)
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.table.planner.codegen.CalcCodeGenerator$$anonfun$5.apply(CalcCodeGenerator.scala:150)
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.table.planner.codegen.CalcCodeGenerator$$anonfun$5.apply(CalcCodeGenerator.scala:150)
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at scala.collection.AbstractTraversable.map(Traversable.scala:104)
Reply | Threaded
Open this post in threaded view
|

回复: Re: Re: flink1.9中blinkSQL对定义udf的TIMESTAMP类型报错

守护
In reply to this post by JingsongLee
感谢大佬们的回复,现在问题已经解决,就是通过override getResultType方法


public TypeInformation<?&gt; getResultType(Class<?&gt;[] signature) {
return Types.SQL_TIMESTAMP;
}







------------------&nbsp;原始邮件&nbsp;------------------
发件人:&nbsp;"JingsongLee"<[hidden email]&gt;;
发送时间:&nbsp;2019年9月5日(星期四) 下午2:09
收件人:&nbsp;"守护"<[hidden email]&gt;;"user-zh"<[hidden email]&gt;;

主题:&nbsp;  Re: Re: flink1.9中blinkSQL对定义udf的TIMESTAMP类型报错



override getResultType方法,返回Types.SQL_TIMESTAMP.
这样应该可以绕过。
1.10会修复这个问题。

Best,
Jingsong Lee


------------------------------------------------------------------
From:守护 <[hidden email]&gt;
Send Time:2019年9月5日(星期四) 12:11
To:[hidden email] JingsongLee <[hidden email]&gt;; user-zh <[hidden email]&gt;
Subject:回复: Re: flink1.9中blinkSQL对定义udf的TIMESTAMP类型报错

在哪声明DataType,这个要引入什么包吗,求指点,我的udf代码如下:

import org.apache.flink.table.functions.ScalarFunction;
import java.sql.Timestamp;


public class UTC2Local extends ScalarFunction {
&nbsp;&nbsp;&nbsp; public Timestamp eval(Timestamp s) {
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; long timestamp = s.getTime() + 28800000;
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; return new Timestamp(timestamp);
&nbsp;&nbsp;&nbsp; }

}



------------------ 原始邮件 ------------------
发件人: "JingsongLee"<[hidden email]&gt;;
发送时间: 2019年9月5日(星期四) 中午11:55
收件人: "user-zh"<[hidden email]&gt;;
主题:&nbsp; Re: flink1.9中blinkSQL对定义udf的TIMESTAMP类型报错

你声明了DataType吗?代码怎么写的?
由于目前只支持精度<=3,所以你得用DataTypes.TIMESTAMP(3)来表示。

Best,
Jingsong Lee


------------------------------------------------------------------
From:守护 <[hidden email]&gt;
Send Time:2019年9月5日(星期四) 11:48
To:user-zh <[hidden email]&gt;
Subject:flink1.9中blinkSQL对定义udf的TIMESTAMP类型报错

社区的各位大佬好:


使用场景:flink1.9版本使用flinkSQL创建udf函数使用没有问,当切换到blinkSQL使用这个udf就会报错TIMESTAMP类型错误,udf实现的功能也很简单,就是将时间+8小时,报错信息如下


org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Not support dataType: TIMESTAMP(9)
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:593)
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80)
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:122)
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:227)
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010)
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at java.security.AccessController.doPrivileged(Native Method)
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at javax.security.auth.Subject.doAs(Subject.java:422)
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083)
Caused by: java.lang.RuntimeException: Not support dataType: TIMESTAMP(9)
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.table.dataformat.DataFormatConverters.getConverterForDataType(DataFormatConverters.java:248)
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.table.planner.codegen.CodeGenUtils$.isConverterIdentity(CodeGenUtils.scala:661)
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.table.planner.codegen.CodeGenUtils$.genToInternal(CodeGenUtils.scala:669)
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.table.planner.codegen.CodeGenUtils$.genToInternal(CodeGenUtils.scala:665)
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.table.planner.codegen.CodeGenUtils$.genToInternalIfNeeded(CodeGenUtils.scala:687)
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.table.planner.codegen.calls.ScalarFunctionCallGen.generate(ScalarFunctionCallGen.scala:79)
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateCallExpression(ExprCodeGenerator.scala:737)
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:451)
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:51)
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.calcite.rex.RexCall.accept(RexCall.java:191)
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateExpression(ExprCodeGenerator.scala:131)
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.table.planner.codegen.CalcCodeGenerator$$anonfun$5.apply(CalcCodeGenerator.scala:150)
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.table.planner.codegen.CalcCodeGenerator$$anonfun$5.apply(CalcCodeGenerator.scala:150)
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at scala.collection.AbstractTraversable.map(Traversable.scala:104)