Flink ScalarFunction: overriding getParameterTypes has no effect

rockeycui@163.com

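Hi all,
    I am using a custom UDF in the Table API. After I override getParameterTypes in a ScalarFunction, the syntax/semantic validation does not kick in; instead, the error is only reported after the job has started. (Notably, the same override works as expected in a TableFunction, so I suspect a bug on the ScalarFunction side.)
The ScalarFunction is as follows: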
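import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.table.functions.ScalarFunction;

public class Fun extends ScalarFunction {

    // Accepts any number of arguments of any type and always returns "fun".
    public Object eval(Object... params) {
        return "fun";
    }

    // Declares a single LONG parameter; the expectation is that the planner
    // rejects any call whose argument types do not match this declaration.
    @Override
    public TypeInformation<?>[] getParameterTypes(Class<?>[] signature) {
        return new RowTypeInfo(Types.LONG).getFieldTypes();
    }
}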
The main method:
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
    DataStreamSource<String> stringDataStreamSource = env.fromElements(
            "1001,adc0:x,1000000",
            "1002,adc1:x,1000000",
            "1003,adc2:x,1000000",
            "1004,adc3:x,1000000",
            "1005,adc4:x,1000000",
            "1006,adc5:x,1000000"
    );
    TypeInformation<?>[] types = new TypeInformation<?>[]{Types.LONG, Types.STRING, Types.LONG};
    RowTypeInfo typeInformation = new RowTypeInfo(
            types,
            new String[]{"id", "url", "clickTime"});

    // Mapper elided in the original post; it presumably parses each "id,url,clickTime" line into a Row.
    DataStream<Row> stream = stringDataStreamSource.map(....).returns(typeInformation);

    tableEnv.registerFunction("fun", new Fun());
    tableEnv.registerDataStream("user_click_info", stream, String.join(",", typeInformation.getFieldNames()));

    String sql = " select *,fun(url) from user_click_info";
    Table table = tableEnv.sqlQuery(sql);
    DataStream<Row> result = tableEnv.toAppendStream(table, Row.class);
    result.print();
    table.printSchema();
    tableEnv.execute("test");
}
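As you can see, url is defined as a STRING, while the parameter of Fun is declared as LONG, so a validation error should be raised when the query is planned. What actually happens is the following error at runtime: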
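To make the expected behavior concrete, here is a Flink-free sketch of the kind of plan-time check the post describes: compare each argument's type against the parameter types the UDF declares and report a mismatch before any code is generated. The class and method names (`SignatureCheck`, `check`) are hypothetical, and this is not Flink's actual validation code; it only models "declared LONG, got STRING" with Java classes.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

public class SignatureCheck {

    /**
     * Hypothetical plan-time check: returns an error message if the argument types
     * do not match the parameter types a UDF declares, or Optional.empty() if they do.
     */
    static Optional<String> check(String function, List<Class<?>> declared, List<Class<?>> actual) {
        if (declared.size() != actual.size()) {
            return Optional.of(function + " expects " + declared.size()
                    + " argument(s) but got " + actual.size());
        }
        for (int i = 0; i < declared.size(); i++) {
            // A declared type accepts the argument if the argument's type is the same or a subtype.
            if (!declared.get(i).isAssignableFrom(actual.get(i))) {
                return Optional.of("Argument " + i + " of " + function + ": expected "
                        + declared.get(i).getSimpleName() + " but got "
                        + actual.get(i).getSimpleName());
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // fun(url): Fun declares one LONG parameter, but url is a STRING column.
        List<Class<?>> declared = Arrays.<Class<?>>asList(Long.class);
        List<Class<?>> actual = Arrays.<Class<?>>asList(String.class);
        System.out.println(check("fun", declared, actual).orElse("ok"));
    }
}
```

Running `main` prints `Argument 0 of fun: expected Long but got String`, i.e. the mismatch is caught from the declared types alone, without compiling or running anything.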

Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:626)
at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:117)
at org.apache.flink.table.planner.delegation.StreamExecutor.execute(StreamExecutor.java:46)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:410)
at com.rock.flink19.tablefunction.TableFunctionTest.main(TableFunctionTest.java:77)
Caused by: java.lang.RuntimeException: Could not instantiate generated class 'StreamExecCalc$11'
at org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:67)
at org.apache.flink.table.runtime.operators.CodeGenOperatorFactory.createStreamOperator(CodeGenOperatorFactory.java:47)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:428)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:354)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:418)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:354)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:418)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:354)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:144)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:373)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
at org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:81)
at org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:65)
at org.apache.flink.table.runtime.generated.GeneratedClass.compile(GeneratedClass.java:78)
at org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:65)
... 12 more
Caused by: org.codehaus.commons.compiler.CompileException: Line 101, Column 187: Cannot cast "org.apache.flink.table.dataformat.BinaryString" to "java.lang.Long"
at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:12124)
at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5049)
at org.codehaus.janino.UnitCompiler.access$8600(UnitCompiler.java:215)
at org.codehaus.janino.UnitCompiler$16.visitCast(UnitCompiler.java:4416)
at org.codehaus.janino.UnitCompiler$16.visitCast(UnitCompiler.java:4394)
at org.codehaus.janino.Java$Cast.accept(Java.java:4887)
at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4394)
at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5055)
at org.codehaus.janino.UnitCompiler.access$8100(UnitCompiler.java:215)
at org.codehaus.janino.UnitCompiler$16$1.visitParenthesizedExpression(UnitCompiler.java:4407)
at org.codehaus.janino.UnitCompiler$16$1.visitParenthesizedExpression(UnitCompiler.java:4398)
at org.codehaus.janino.Java$ParenthesizedExpression.accept(Java.java:4913)
at org.codehaus.janino.UnitCompiler$16.visitLvalue(UnitCompiler.java:4398)
at org.codehaus.janino.UnitCompiler$16.visitLvalue(UnitCompiler.java:4394)
at org.codehaus.janino.Java$Lvalue.accept(Java.java:4137)
at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4394)
at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5575)
at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:4714)
at org.codehaus.janino.UnitCompiler.access$8800(UnitCompiler.java:215)
at org.codehaus.janino.UnitCompiler$16.visitConditionalExpression(UnitCompiler.java:4418)
at org.codehaus.janino.UnitCompiler$16.visitConditionalExpression(UnitCompiler.java:4394)
at org.codehaus.janino.Java$ConditionalExpression.accept(Java.java:4504)
at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4394)
at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5575)
at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5535)
at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5505)
at org.codehaus.janino.UnitCompiler.access$9700(UnitCompiler.java:215)
at org.codehaus.janino.UnitCompiler$16.visitNewInitializedArray(UnitCompiler.java:4432)
at org.codehaus.janino.UnitCompiler$16.visitNewInitializedArray(UnitCompiler.java:4394)
at org.codehaus.janino.Java$NewInitializedArray.accept(Java.java:5362)
at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4394)
at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5575)
at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5165)
at org.codehaus.janino.UnitCompiler.access$9100(UnitCompiler.java:215)
at org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4421)
at org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4394)
at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5062)
at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4394)
at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5575)
at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5019)
at org.codehaus.janino.UnitCompiler.access$8600(UnitCompiler.java:215)
at org.codehaus.janino.UnitCompiler$16.visitCast(UnitCompiler.java:4416)
at org.codehaus.janino.UnitCompiler$16.visitCast(UnitCompiler.java:4394)
at org.codehaus.janino.Java$Cast.accept(Java.java:4887)
at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4394)
at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5575)
at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2580)
at org.codehaus.janino.UnitCompiler.access$2700(UnitCompiler.java:215)
at org.codehaus.janino.UnitCompiler$6.visitLocalVariableDeclarationStatement(UnitCompiler.java:1503)
at org.codehaus.janino.UnitCompiler$6.visitLocalVariableDeclarationStatement(UnitCompiler.java:1487)
at org.codehaus.janino.Java$LocalVariableDeclarationStatement.accept(Java.java:3511)
at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487)
at org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1567)
at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3388)
at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1357)
at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1330)
at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:822)
at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:432)
at org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.java:215)
at org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:411)
at org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:406)
at org.codehaus.janino.Java$PackageMemberClassDeclaration.accept(Java.java:1414)
at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:406)
at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:378)
at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:237)
at org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:465)
at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:216)
at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:207)
at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:80)
at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:75)
at org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:78)
... 15 more
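The root cause above is a Janino compile error: the generated class presumably casts a value it already knows to be a `BinaryString` (the internal representation of the STRING column) to `java.lang.Long`, the type declared by getParameterTypes. A plain-Java analogy (hypothetical, no Flink involved): when both static types are known and unrelated, the compiler rejects the cast outright, which is what Janino reports; only a value typed as `Object` would compile and then fail at run time with a `ClassCastException`.

```java
public class CastDemo {

    // Casting from Object compiles, because an Object may hold a Long;
    // a wrong runtime type is only detected when the cast executes.
    static String tryCast(Object value) {
        try {
            Long id = (Long) value;
            return "ok: " + id;
        } catch (ClassCastException e) {
            return "ClassCastException";
        }
    }

    public static void main(String[] args) {
        // String s = "adc0:x"; Long l = (Long) s;  // does not compile: inconvertible types
        System.out.println(tryCast(1001L));    // ok: 1001
        System.out.println(tryCast("adc0:x")); // ClassCastException
    }
}
```

Because the generated code falls into the first (statically rejected) case, the failure only surfaces when the task instantiates the generated operator, which matches where the trace starts (`GeneratedClass.newInstance`).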



Re: Flink ScalarFunction: overriding getParameterTypes has no effect

LakeShen
You don't need to override getParameterTypes.
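A plain-Java sketch of why declaring the parameter types on `eval` itself can help, under the assumption (not confirmed in this thread) that the planner resolves a ScalarFunction call by looking up a matching `eval` method via reflection. A varargs `eval(Object...)` matches any argument, so a resolver has nothing to reject, whereas a typed `eval(Long)` does not accept a String. `VarargsFun`, `TypedFun`, and `findEval` are hypothetical names, and the code does not use Flink.

```java
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.Optional;

public class EvalResolver {

    // Hypothetical stand-ins for the two UDF styles (no Flink dependency).
    public static class VarargsFun {
        public Object eval(Object... params) { return "fun"; }
    }

    public static class TypedFun {
        public String eval(Long id) { return "fun"; }
    }

    /** Returns the first public eval method whose parameters accept the given argument types. */
    static Optional<Method> findEval(Class<?> udf, Class<?>... argTypes) {
        return Arrays.stream(udf.getMethods())
                .filter(m -> m.getName().equals("eval"))
                .filter(m -> accepts(m, argTypes))
                .findFirst();
    }

    private static boolean accepts(Method m, Class<?>[] argTypes) {
        Class<?>[] params = m.getParameterTypes();
        if (m.isVarArgs()) {
            // Fixed parameters must match one by one; every trailing argument
            // must fit the component type of the varargs array.
            int fixed = params.length - 1;
            if (argTypes.length < fixed) return false;
            Class<?> component = params[fixed].getComponentType();
            for (int i = 0; i < argTypes.length; i++) {
                Class<?> expected = i < fixed ? params[i] : component;
                if (!expected.isAssignableFrom(argTypes[i])) return false;
            }
            return true;
        }
        if (params.length != argTypes.length) return false;
        for (int i = 0; i < params.length; i++) {
            if (!params[i].isAssignableFrom(argTypes[i])) return false;
        }
        return true;
    }

    public static void main(String[] args) {
        // fun(url), where url is a STRING column.
        System.out.println(findEval(VarargsFun.class, String.class).isPresent()); // true
        System.out.println(findEval(TypedFun.class, String.class).isPresent());   // false
        System.out.println(findEval(TypedFun.class, Long.class).isPresent());     // true
    }
}
```

Applied to the UDF from the question, this would suggest replacing `Object eval(Object... params)` with a typed method such as `public String eval(Long id)` and dropping the getParameterTypes override, so that a call like `fun(url)` has no matching signature to resolve against.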

On Mon, Nov 11, 2019 at 5:52 PM, [hidden email] <[hidden email]> wrote: