大家好: 我在 Table API 中使用自定义 UDF,其中 ScalarFunction 在重写 getParameterTypes 后,语法语义检查并没有生效,而是在任务启动后才报出错误(重点是 TableFunction 能正常生效,怀疑是 ScalarFunction 这边的 bug)。 ScalarFunction 如下: public class Fun extends ScalarFunction { public Object eval(Object... params) { return "fun"; } @Override public TypeInformation<?>[] getParameterTypes(Class<?>[] signature) { return new RowTypeInfo(Types.LONG).getFieldTypes(); } } main 方法如下: public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); DataStreamSource<String> stringDataStreamSource = env.fromElements( "1001,adc0:x,1000000", "1002,adc1:x,1000000", "1003,adc2:x,1000000", "1004,adc3:x,1000000", "1005,adc4:x,1000000", "1006,adc5:x,1000000" ); TypeInformation[] types = new TypeInformation[]{Types.LONG, Types.STRING, Types.LONG}; RowTypeInfo typeInformation = new RowTypeInfo( types, new String[]{"id", "url", "clickTime"}); DataStream<Row> stream = stringDataStreamSource.map(....).returns(typeInformation); tableEnv.registerFunction("fun", new Fun()); tableEnv.registerDataStream("user_click_info", stream, String.join(",", typeInformation.getFieldNames())); String sql = " select *,fun(url) from user_click_info"; Table table = tableEnv.sqlQuery(sql); DataStream<Row> result = tableEnv.toAppendStream(table, Row.class); result.print(); table.printSchema(); tableEnv.execute("test"); } 可以看到 url 定义为 string 类型,而 Fun 方法中的参数声明为 long 类型,此时应该在语法语义检查阶段报错,实际报错情况如下: Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed. 
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146) at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:626) at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:117) at org.apache.flink.table.planner.delegation.StreamExecutor.execute(StreamExecutor.java:46) at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:410) at com.rock.flink19.tablefunction.TableFunctionTest.main(TableFunctionTest.java:77) Caused by: java.lang.RuntimeException: Could not instantiate generated class 'StreamExecCalc$11' at org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:67) at org.apache.flink.table.runtime.operators.CodeGenOperatorFactory.createStreamOperator(CodeGenOperatorFactory.java:47) at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:428) at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:354) at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:418) at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:354) at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:418) at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:354) at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:144) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:373) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue. 
at org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:81) at org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:65) at org.apache.flink.table.runtime.generated.GeneratedClass.compile(GeneratedClass.java:78) at org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:65) ... 12 more Caused by: org.codehaus.commons.compiler.CompileException: Line 101, Column 187: Cannot cast "org.apache.flink.table.dataformat.BinaryString" to "java.lang.Long" at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:12124) at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5049) at org.codehaus.janino.UnitCompiler.access$8600(UnitCompiler.java:215) at org.codehaus.janino.UnitCompiler$16.visitCast(UnitCompiler.java:4416) at org.codehaus.janino.UnitCompiler$16.visitCast(UnitCompiler.java:4394) at org.codehaus.janino.Java$Cast.accept(Java.java:4887) at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4394) at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5055) at org.codehaus.janino.UnitCompiler.access$8100(UnitCompiler.java:215) at org.codehaus.janino.UnitCompiler$16$1.visitParenthesizedExpression(UnitCompiler.java:4407) at org.codehaus.janino.UnitCompiler$16$1.visitParenthesizedExpression(UnitCompiler.java:4398) at org.codehaus.janino.Java$ParenthesizedExpression.accept(Java.java:4913) at org.codehaus.janino.UnitCompiler$16.visitLvalue(UnitCompiler.java:4398) at org.codehaus.janino.UnitCompiler$16.visitLvalue(UnitCompiler.java:4394) at org.codehaus.janino.Java$Lvalue.accept(Java.java:4137) at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4394) at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5575) at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:4714) at org.codehaus.janino.UnitCompiler.access$8800(UnitCompiler.java:215) at 
org.codehaus.janino.UnitCompiler$16.visitConditionalExpression(UnitCompiler.java:4418) at org.codehaus.janino.UnitCompiler$16.visitConditionalExpression(UnitCompiler.java:4394) at org.codehaus.janino.Java$ConditionalExpression.accept(Java.java:4504) at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4394) at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5575) at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5535) at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5505) at org.codehaus.janino.UnitCompiler.access$9700(UnitCompiler.java:215) at org.codehaus.janino.UnitCompiler$16.visitNewInitializedArray(UnitCompiler.java:4432) at org.codehaus.janino.UnitCompiler$16.visitNewInitializedArray(UnitCompiler.java:4394) at org.codehaus.janino.Java$NewInitializedArray.accept(Java.java:5362) at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4394) at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5575) at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5165) at org.codehaus.janino.UnitCompiler.access$9100(UnitCompiler.java:215) at org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4421) at org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4394) at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5062) at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4394) at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5575) at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5019) at org.codehaus.janino.UnitCompiler.access$8600(UnitCompiler.java:215) at org.codehaus.janino.UnitCompiler$16.visitCast(UnitCompiler.java:4416) at org.codehaus.janino.UnitCompiler$16.visitCast(UnitCompiler.java:4394) at org.codehaus.janino.Java$Cast.accept(Java.java:4887) at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4394) at 
org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5575) at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2580) at org.codehaus.janino.UnitCompiler.access$2700(UnitCompiler.java:215) at org.codehaus.janino.UnitCompiler$6.visitLocalVariableDeclarationStatement(UnitCompiler.java:1503) at org.codehaus.janino.UnitCompiler$6.visitLocalVariableDeclarationStatement(UnitCompiler.java:1487) at org.codehaus.janino.Java$LocalVariableDeclarationStatement.accept(Java.java:3511) at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487) at org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1567) at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3388) at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1357) at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1330) at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:822) at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:432) at org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.java:215) at org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:411) at org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:406) at org.codehaus.janino.Java$PackageMemberClassDeclaration.accept(Java.java:1414) at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:406) at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:378) at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:237) at org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:465) at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:216) at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:207) at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:80) at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:75) at 
org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:78) ... 15 more [hidden email] |
不需要重写 getParameterTypes。
[hidden email] <[hidden email]> 于 2019年11月11日周一 下午5:52写道: > > 大家好: > 我在 tableapi 中使用自定义 UDF ,其中 ScalarFunction 在 重写 getParameterTypes > 后,语法语义检查并没有生效,而是任务启动后报出错误(重点 TableFunction 正常生效,怀疑是 ScalarFunction 这边的 > bug)。 > ScalarFunction 如下: > public class Fun extends ScalarFunction { > > public Object eval(Object... params) { > return "fun"; > } > > @Override > public TypeInformation<?>[] getParameterTypes(Class<?>[] signature) { > return new RowTypeInfo(Types.LONG).getFieldTypes(); > } > } > main方法 > public static void main(String[] args) throws Exception { > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); > DataStreamSource<String> stringDataStreamSource = env.fromElements( > "1001,adc0:x,1000000", > "1002,adc1:x,1000000", > "1003,adc2:x,1000000", > "1004,adc3:x,1000000", > "1005,adc4:x,1000000", > "1006,adc5:x,1000000" > ); > TypeInformation[] types = new TypeInformation[]{Types.LONG, > Types.STRING, Types.LONG}; > RowTypeInfo typeInformation = new RowTypeInfo( > types, > new String[]{"id", "url", "clickTime"}); > > DataStream<Row> stream = > stringDataStreamSource.map(....).returns(typeInformation); > > tableEnv.registerFunction("fun", new Fun()); > tableEnv.registerDataStream("user_click_info", stream, > String.join(",", typeInformation.getFieldNames())); > > String sql = " select *,fun(url) from user_click_info"; > Table table = tableEnv.sqlQuery(sql); > DataStream<Row> result = tableEnv.toAppendStream(table, Row.class); > result.print(); > table.printSchema(); > tableEnv.execute("test"); > } > 可以看到 url 定义为 string 类型,Fun 方法找那个参数声明为 long 类型,此时应该报出语法语义错误,实际报错情况如下: > > Exception in thread "main" > org.apache.flink.runtime.client.JobExecutionException: Job execution failed. 
> at > org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146) > at > org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:626) > at > org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:117) > at > org.apache.flink.table.planner.delegation.StreamExecutor.execute(StreamExecutor.java:46) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:410) > at > com.rock.flink19.tablefunction.TableFunctionTest.main(TableFunctionTest.java:77) > Caused by: java.lang.RuntimeException: Could not instantiate generated > class 'StreamExecCalc$11' > at > org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:67) > at > org.apache.flink.table.runtime.operators.CodeGenOperatorFactory.createStreamOperator(CodeGenOperatorFactory.java:47) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:428) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:354) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:418) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:354) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:418) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:354) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:144) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:373) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530) > at java.lang.Thread.run(Thread.java:748) > Caused by: org.apache.flink.api.common.InvalidProgramException: Table > 
program cannot be compiled. This is a bug. Please file an issue. > at > org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:81) > at > org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:65) > at > org.apache.flink.table.runtime.generated.GeneratedClass.compile(GeneratedClass.java:78) > at > org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:65) > ... 12 more > Caused by: org.codehaus.commons.compiler.CompileException: Line 101, > Column 187: Cannot cast "org.apache.flink.table.dataformat.BinaryString" to > "java.lang.Long" > at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:12124) > at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5049) > at org.codehaus.janino.UnitCompiler.access$8600(UnitCompiler.java:215) > at org.codehaus.janino.UnitCompiler$16.visitCast(UnitCompiler.java:4416) > at org.codehaus.janino.UnitCompiler$16.visitCast(UnitCompiler.java:4394) > at org.codehaus.janino.Java$Cast.accept(Java.java:4887) > at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4394) > at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5055) > at org.codehaus.janino.UnitCompiler.access$8100(UnitCompiler.java:215) > at > org.codehaus.janino.UnitCompiler$16$1.visitParenthesizedExpression(UnitCompiler.java:4407) > at > org.codehaus.janino.UnitCompiler$16$1.visitParenthesizedExpression(UnitCompiler.java:4398) > at org.codehaus.janino.Java$ParenthesizedExpression.accept(Java.java:4913) > at org.codehaus.janino.UnitCompiler$16.visitLvalue(UnitCompiler.java:4398) > at org.codehaus.janino.UnitCompiler$16.visitLvalue(UnitCompiler.java:4394) > at org.codehaus.janino.Java$Lvalue.accept(Java.java:4137) > at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4394) > at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5575) > at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:4714) > at 
org.codehaus.janino.UnitCompiler.access$8800(UnitCompiler.java:215) > at > org.codehaus.janino.UnitCompiler$16.visitConditionalExpression(UnitCompiler.java:4418) > at > org.codehaus.janino.UnitCompiler$16.visitConditionalExpression(UnitCompiler.java:4394) > at org.codehaus.janino.Java$ConditionalExpression.accept(Java.java:4504) > at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4394) > at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5575) > at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5535) > at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5505) > at org.codehaus.janino.UnitCompiler.access$9700(UnitCompiler.java:215) > at > org.codehaus.janino.UnitCompiler$16.visitNewInitializedArray(UnitCompiler.java:4432) > at > org.codehaus.janino.UnitCompiler$16.visitNewInitializedArray(UnitCompiler.java:4394) > at org.codehaus.janino.Java$NewInitializedArray.accept(Java.java:5362) > at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4394) > at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5575) > at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5165) > at org.codehaus.janino.UnitCompiler.access$9100(UnitCompiler.java:215) > at > org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4421) > at > org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4394) > at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5062) > at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4394) > at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5575) > at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5019) > at org.codehaus.janino.UnitCompiler.access$8600(UnitCompiler.java:215) > at org.codehaus.janino.UnitCompiler$16.visitCast(UnitCompiler.java:4416) > at org.codehaus.janino.UnitCompiler$16.visitCast(UnitCompiler.java:4394) > at 
org.codehaus.janino.Java$Cast.accept(Java.java:4887) > at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4394) > at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5575) > at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2580) > at org.codehaus.janino.UnitCompiler.access$2700(UnitCompiler.java:215) > at > org.codehaus.janino.UnitCompiler$6.visitLocalVariableDeclarationStatement(UnitCompiler.java:1503) > at > org.codehaus.janino.UnitCompiler$6.visitLocalVariableDeclarationStatement(UnitCompiler.java:1487) > at > org.codehaus.janino.Java$LocalVariableDeclarationStatement.accept(Java.java:3511) > at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487) > at > org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1567) > at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3388) > at > org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1357) > at > org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1330) > at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:822) > at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:432) > at org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.java:215) > at > org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:411) > at > org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:406) > at > org.codehaus.janino.Java$PackageMemberClassDeclaration.accept(Java.java:1414) > at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:406) > at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:378) > at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:237) > at > org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:465) > at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:216) > at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:207) > at 
org.codehaus.commons.compiler.Cookable.cook(Cookable.java:80) > at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:75) > at > org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:78) > ... 15 more > > > [hidden email] > |
Free forum by Nabble | Edit this page |