flink1.11 tablefunction

classic Classic list List threaded Threaded
9 messages Options
Reply | Threaded
Open this post in threaded view
|

flink1.11 tablefunction

Dream-底限
hi
我这面在定义一个表函数,通过继承TableFunction完成操作,但是eval方法中的参数类型都是java基本类型(至少看到的demo都是如此),想问一下eval方法中可以传flink
内部类型吗,比如说我想在eval()方法中传递Row类型要怎么操作,eval(Row row)
Reply | Threaded
Open this post in threaded view
|

Re: flink1.11 tablefunction

godfrey he
可以,定义清楚 getResultType 和 getParameterTypes, 可以参考[1]

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/udfs.html#implementation-guide


Dream-底限 <[hidden email]> 于2020年7月21日周二 下午7:25写道:

> hi
>
> 我这面在定义一个表函数,通过继承TableFunction完成操作,但是eval方法中的参数类型都是java基本类型(至少看到的demo都是如此),想问一下eval方法中可以传flink
> 内部类型吗,比如说我想在eval()方法中传递Row类型要怎么操作,eval(Row row)
>
Reply | Threaded
Open this post in threaded view
|

Re: flink1.11 tablefunction

Dream-底限
hi,
 我想将一个array<row>打散成多行,但是并没有成功

@FunctionHint(input =@DataTypeHint("ARRAY<ROW<rule_id STRING,rule_name
STRING,rule_type_name STRING,`result` INT,in_path BOOLEAN>>") ,output
= @DataTypeHint("ROW<rule_id STRING,rule_name STRING,rule_type_name
STRING,`result` INT,in_path BOOLEAN>"))
public static class FlatRowFunction extends TableFunction<Row> {
    private static final long serialVersionUID = 1L;

    public void eval(Row[] rows) {
        for (Row row : rows) {
            collect(row);
        }
    }
}

异常如下:

org.apache.flink.table.api.ValidationException: SQL validation failed.
From line 1, column 149 to line 1, column 174: No match found for
function signature
flatRow(<RecordType:peek_no_expand(VARCHAR(2147483647) rule_id,
VARCHAR(2147483647) rule_name, VARCHAR(2147483647) rule_type_name,
INTEGER result, BOOLEAN in_path) ARRAY>)

        at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:146)
        at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:108)
        at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:187)
        at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:658)
        at com.akulaku.data.flink.ParserDataTest.parserDataTest(ParserDataTest.java:60)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
        at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
        at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
        at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
        at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
        at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
        at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
        at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
        at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
        at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
        at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
        at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
        at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
        at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
        at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
        at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
        at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
        at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
        at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)
Caused by: org.apache.calcite.runtime.CalciteContextException: From
line 1, column 149 to line 1, column 174: No match found for function
signature flatRow(<RecordType:peek_no_expand(VARCHAR(2147483647)
rule_id, VARCHAR(2147483647) rule_name, VARCHAR(2147483647)
rule_type_name, INTEGER result, BOOLEAN in_path) ARRAY>)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
        at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457)
        at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:839)
        at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824)
        at org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:5089)
        at org.apache.calcite.sql.validate.SqlValidatorImpl.handleUnresolvedFunction(SqlValidatorImpl.java:1882)
        at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:305)
        at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:218)
        at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5858)
        at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5845)
        at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
        at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1800)
        at org.apache.calcite.sql.validate.ProcedureNamespace.validateImpl(ProcedureNamespace.java:57)
        at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
        at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1110)
        at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1084)
        at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3256)
        at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3238)
        at org.apache.calcite.sql.validate.SqlValidatorImpl.validateJoin(SqlValidatorImpl.java:3303)
        at org.apache.flink.table.planner.calcite.FlinkCalciteSqlValidator.validateJoin(FlinkCalciteSqlValidator.java:86)
        at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3247)
        at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3510)
        at org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
        at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
        at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1110)
        at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1084)
        at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232)
        at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1059)
        at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:766)
        at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:141)
        ... 28 more
Caused by: org.apache.calcite.sql.validate.SqlValidatorException: No
match found for function signature
flatRow(<RecordType:peek_no_expand(VARCHAR(2147483647) rule_id,
VARCHAR(2147483647) rule_name, VARCHAR(2147483647) rule_type_name,
INTEGER result, BOOLEAN in_path) ARRAY>)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
        at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457)
        at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:550)
        ... 56 more


godfrey he <[hidden email]> 于2020年7月21日周二 下午7:41写道:

> 可以,定义清楚 getResultType 和 getParameterTypes, 可以参考[1]
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/udfs.html#implementation-guide
>
>
> Dream-底限 <[hidden email]> 于2020年7月21日周二 下午7:25写道:
>
> > hi
> >
> >
> 我这面在定义一个表函数,通过继承TableFunction完成操作,但是eval方法中的参数类型都是java基本类型(至少看到的demo都是如此),想问一下eval方法中可以传flink
> > 内部类型吗,比如说我想在eval()方法中传递Row类型要怎么操作,eval(Row row)
> >
>
Reply | Threaded
Open this post in threaded view
|

Re: flink1.11 tablefunction

Jark
Administrator
Hi,

Row[] 作为 eval 参数,目前还不支持。社区已经有一个 issue 在跟进支持这个功能:
https://issues.apache.org/jira/browse/FLINK-17855


Best,
Jark

On Wed, 22 Jul 2020 at 10:45, Dream-底限 <[hidden email]> wrote:

> hi,
>  我想将一个array<row>打散成多行,但是并没有成功
>
> @FunctionHint(input =@DataTypeHint("ARRAY<ROW<rule_id STRING,rule_name
> STRING,rule_type_name STRING,`result` INT,in_path BOOLEAN>>") ,output
> = @DataTypeHint("ROW<rule_id STRING,rule_name STRING,rule_type_name
> STRING,`result` INT,in_path BOOLEAN>"))
> public static class FlatRowFunction extends TableFunction<Row> {
>     private static final long serialVersionUID = 1L;
>
>     public void eval(Row[] rows) {
>         for (Row row : rows) {
>             collect(row);
>         }
>     }
> }
>
> 异常如下:
>
> org.apache.flink.table.api.ValidationException: SQL validation failed.
> From line 1, column 149 to line 1, column 174: No match found for
> function signature
> flatRow(<RecordType:peek_no_expand(VARCHAR(2147483647) rule_id,
> VARCHAR(2147483647) rule_name, VARCHAR(2147483647) rule_type_name,
> INTEGER result, BOOLEAN in_path) ARRAY>)
>
>         at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:146)
>         at
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:108)
>         at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:187)
>         at
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
>         at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:658)
>         at
> com.akulaku.data.flink.ParserDataTest.parserDataTest(ParserDataTest.java:60)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>         at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:498)
>         at
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>         at
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>         at
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>         at
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>         at
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>         at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>         at
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>         at
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>         at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>         at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>         at
> org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>         at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>         at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>         at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>         at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
>         at
> com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
>         at
> com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
>         at
> com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
>         at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)
> Caused by: org.apache.calcite.runtime.CalciteContextException: From
> line 1, column 149 to line 1, column 174: No match found for function
> signature flatRow(<RecordType:peek_no_expand(VARCHAR(2147483647)
> rule_id, VARCHAR(2147483647) rule_name, VARCHAR(2147483647)
> rule_type_name, INTEGER result, BOOLEAN in_path) ARRAY>)
>         at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
> Method)
>         at
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>         at
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>         at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>         at
> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457)
>         at
> org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:839)
>         at
> org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824)
>         at
> org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:5089)
>         at
> org.apache.calcite.sql.validate.SqlValidatorImpl.handleUnresolvedFunction(SqlValidatorImpl.java:1882)
>         at
> org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:305)
>         at
> org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:218)
>         at
> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5858)
>         at
> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5845)
>         at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
>         at
> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1800)
>         at
> org.apache.calcite.sql.validate.ProcedureNamespace.validateImpl(ProcedureNamespace.java:57)
>         at
> org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
>         at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1110)
>         at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1084)
>         at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3256)
>         at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3238)
>         at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateJoin(SqlValidatorImpl.java:3303)
>         at
> org.apache.flink.table.planner.calcite.FlinkCalciteSqlValidator.validateJoin(FlinkCalciteSqlValidator.java:86)
>         at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3247)
>         at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3510)
>         at
> org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
>         at
> org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
>         at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1110)
>         at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1084)
>         at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232)
>         at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1059)
>         at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:766)
>         at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:141)
>         ... 28 more
> Caused by: org.apache.calcite.sql.validate.SqlValidatorException: No
> match found for function signature
> flatRow(<RecordType:peek_no_expand(VARCHAR(2147483647) rule_id,
> VARCHAR(2147483647) rule_name, VARCHAR(2147483647) rule_type_name,
> INTEGER result, BOOLEAN in_path) ARRAY>)
>         at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
> Method)
>         at
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>         at
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>         at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>         at
> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457)
>         at
> org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:550)
>         ... 56 more
>
>
> godfrey he <[hidden email]> 于2020年7月21日周二 下午7:41写道:
>
> > 可以,定义清楚 getResultType 和 getParameterTypes, 可以参考[1]
> >
> > [1]
> >
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/udfs.html#implementation-guide
> >
> >
> > Dream-底限 <[hidden email]> 于2020年7月21日周二 下午7:25写道:
> >
> > > hi
> > >
> > >
> >
> 我这面在定义一个表函数,通过继承TableFunction完成操作,但是eval方法中的参数类型都是java基本类型(至少看到的demo都是如此),想问一下eval方法中可以传flink
> > > 内部类型吗,比如说我想在eval()方法中传递Row类型要怎么操作,eval(Row row)
> > >
> >
>
Reply | Threaded
Open this post in threaded view
|

Re: flink1.11 tablefunction

Benchao Li-2
Hi,
如果只是想打平array,Flink有个内置的方法,可以参考[1]的”Expanding arrays into a relation“部分

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#joins

Jark Wu <[hidden email]> 于2020年7月22日周三 上午11:17写道:

> Hi,
>
> Row[] 作为 eval 参数,目前还不支持。社区已经有一个 issue 在跟进支持这个功能:
> https://issues.apache.org/jira/browse/FLINK-17855
>
>
> Best,
> Jark
>
> On Wed, 22 Jul 2020 at 10:45, Dream-底限 <[hidden email]> wrote:
>
> > hi,
> >  我想将一个array<row>打散成多行,但是并没有成功
> >
> > @FunctionHint(input =@DataTypeHint("ARRAY<ROW<rule_id STRING,rule_name
> > STRING,rule_type_name STRING,`result` INT,in_path BOOLEAN>>") ,output
> > = @DataTypeHint("ROW<rule_id STRING,rule_name STRING,rule_type_name
> > STRING,`result` INT,in_path BOOLEAN>"))
> > public static class FlatRowFunction extends TableFunction<Row> {
> >     private static final long serialVersionUID = 1L;
> >
> >     public void eval(Row[] rows) {
> >         for (Row row : rows) {
> >             collect(row);
> >         }
> >     }
> > }
> >
> > 异常如下:
> >
> > org.apache.flink.table.api.ValidationException: SQL validation failed.
> > From line 1, column 149 to line 1, column 174: No match found for
> > function signature
> > flatRow(<RecordType:peek_no_expand(VARCHAR(2147483647) rule_id,
> > VARCHAR(2147483647) rule_name, VARCHAR(2147483647) rule_type_name,
> > INTEGER result, BOOLEAN in_path) ARRAY>)
> >
> >         at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
> >
> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:146)
> >         at
> >
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:108)
> >         at
> >
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:187)
> >         at
> >
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
> >         at
> >
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:658)
> >         at
> >
> com.akulaku.data.flink.ParserDataTest.parserDataTest(ParserDataTest.java:60)
> >         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> >         at
> >
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> >         at
> >
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >         at java.lang.reflect.Method.invoke(Method.java:498)
> >         at
> >
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> >         at
> >
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> >         at
> >
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> >         at
> >
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> >         at
> >
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> >         at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
> >         at
> >
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
> >         at
> >
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
> >         at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> >         at
> org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> >         at
> > org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> >         at
> org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
> >         at
> org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
> >         at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
> >         at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
> >         at
> >
> com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
> >         at
> >
> com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
> >         at
> >
> com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
> >         at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)
> > Caused by: org.apache.calcite.runtime.CalciteContextException: From
> > line 1, column 149 to line 1, column 174: No match found for function
> > signature flatRow(<RecordType:peek_no_expand(VARCHAR(2147483647)
> > rule_id, VARCHAR(2147483647) rule_name, VARCHAR(2147483647)
> > rule_type_name, INTEGER result, BOOLEAN in_path) ARRAY>)
> >         at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
> > Method)
> >         at
> >
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> >         at
> >
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> >         at
> java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> >         at
> >
> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457)
> >         at
> > org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:839)
> >         at
> > org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824)
> >         at
> >
> org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:5089)
> >         at
> >
> org.apache.calcite.sql.validate.SqlValidatorImpl.handleUnresolvedFunction(SqlValidatorImpl.java:1882)
> >         at
> > org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:305)
> >         at
> > org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:218)
> >         at
> >
> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5858)
> >         at
> >
> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5845)
> >         at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
> >         at
> >
> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1800)
> >         at
> >
> org.apache.calcite.sql.validate.ProcedureNamespace.validateImpl(ProcedureNamespace.java:57)
> >         at
> >
> org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
> >         at
> >
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1110)
> >         at
> >
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1084)
> >         at
> >
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3256)
> >         at
> >
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3238)
> >         at
> >
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateJoin(SqlValidatorImpl.java:3303)
> >         at
> >
> org.apache.flink.table.planner.calcite.FlinkCalciteSqlValidator.validateJoin(FlinkCalciteSqlValidator.java:86)
> >         at
> >
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3247)
> >         at
> >
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3510)
> >         at
> >
> org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
> >         at
> >
> org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
> >         at
> >
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1110)
> >         at
> >
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1084)
> >         at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232)
> >         at
> >
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1059)
> >         at
> >
> org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:766)
> >         at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
> >
> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:141)
> >         ... 28 more
> > Caused by: org.apache.calcite.sql.validate.SqlValidatorException: No
> > match found for function signature
> > flatRow(<RecordType:peek_no_expand(VARCHAR(2147483647) rule_id,
> > VARCHAR(2147483647) rule_name, VARCHAR(2147483647) rule_type_name,
> > INTEGER result, BOOLEAN in_path) ARRAY>)
> >         at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
> > Method)
> >         at
> >
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> >         at
> >
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> >         at
> java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> >         at
> >
> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457)
> >         at
> > org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:550)
> >         ... 56 more
> >
> >
> > godfrey he <[hidden email]> 于2020年7月21日周二 下午7:41写道:
> >
> > > 可以,定义清楚 getResultType 和 getParameterTypes, 可以参考[1]
> > >
> > > [1]
> > >
> > >
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/udfs.html#implementation-guide
> > >
> > >
> > > Dream-底限 <[hidden email]> 于2020年7月21日周二 下午7:25写道:
> > >
> > > > hi
> > > >
> > > >
> > >
> >
> 我这面在定义一个表函数,通过继承TableFunction完成操作,但是eval方法中的参数类型都是java基本类型(至少看到的demo都是如此),想问一下eval方法中可以传flink
> > > > 内部类型吗,比如说我想在eval()方法中传递Row类型要怎么操作,eval(Row row)
> > > >
> > >
> >
>


--

Best,
Benchao Li
Reply | Threaded
Open this post in threaded view
|

Re: flink1.11 tablefunction

Dream-底限
hi 、Benchao Li
我尝试了将数组打散的方式,但是报了一个莫名其妙的错误,可以帮忙看看嘛

tableEnv.executeSql("CREATE TABLE parser_data_test (\n" +
        "  data ROW<flow_task_id BIGINT,flow_id STRING,flow_version
STRING,path STRING,country_id INT,create_time BIGINT," +
        "spent_time DECIMAL(10,2),features
ROW<`user_ic_no_aku_uid.pdl_cdpd`
STRING,`user_ic_no_aku_uid.pdl_current_unpay` INT," +
        "`user_ic_no_aku_uid.current_overdue_collection`
INT>,rule_results ARRAY<ROW<rule_id STRING,rule_name STRING," +
        "rule_type_name STRING,`result` INT,in_path BOOLEAN>>>,\n" +
        "  createTime BIGINT,\n" +
        "  tindex INT\n" +
        ") WITH (\n" +
        " 'connector' = 'kafka-0.11',\n" +
        " 'topic' = 'parser_data_test',\n" +
        " 'properties.bootstrap.servers' = 'localhost:9092',\n" +
        " 'properties.group.id' = 'testGroup',\n" +
        " 'scan.startup.mode' = 'earliest-offset',\n" +
        " 'format' = 'json',\n" +
        " 'json.fail-on-missing-field' = 'false',\n" +
        " 'json.ignore-parse-errors' = 'true'\n" +
        ")");

Table table = tableEnv.sqlQuery("select
data.flow_task_id,data.features.`user_ic_no_aku_uid.pdl_current_unpay`,rule_id,tindex
from parser_data_test CROSS JOIN UNNEST(data.rule_results) AS t
(rule_id,rule_name,rule_type_name,`result`,in_path)");

table.printSchema();
tableEnv.toAppendStream(table,
Types.ROW(TypeConversions.fromDataTypeToLegacyInfo(table.getSchema().getFieldDataTypes()))).print();


异常信息:

rg.apache.flink.table.api.ValidationException: SQL validation failed.
From line 0, column 0 to line 1, column 139: Column 'data.data' not
found in table 'parser_data_test'

        at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:146)
        at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:108)
        at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:185)
        at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:664)
        at com.akulaku.data.flink.ParserDataTest.parserDataTest(ParserDataTest.java:63)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
        at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
        at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
        at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
        at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
        at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
        at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
        at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
        at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
        at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
        at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
        at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
        at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
        at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
        at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
        at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
        at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
        at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
        at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)
Caused by: org.apache.calcite.runtime.CalciteContextException: From
line 0, column 0 to line 1, column 139: Column 'data.data' not found
in table 'parser_data_test'
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
        at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457)
        at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:839)
        at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824)
        at org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:5089)
        at org.apache.calcite.sql.validate.DelegatingScope.fullyQualify(DelegatingScope.java:439)
        at org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5991)
        at org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5976)
        at org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:321)
        at org.apache.calcite.sql.validate.SqlValidatorImpl.expand(SqlValidatorImpl.java:5583)
        at org.apache.calcite.sql.validate.SqlValidatorImpl.validateUnnest(SqlValidatorImpl.java:3271)
        at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3253)
        at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3238)
        at org.apache.calcite.sql.validate.SqlValidatorImpl.validateJoin(SqlValidatorImpl.java:3303)
        at org.apache.flink.table.planner.calcite.FlinkCalciteSqlValidator.validateJoin(FlinkCalciteSqlValidator.java:86)
        at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3247)
        at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3510)
        at org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
        at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
        at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1110)
        at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1084)
        at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232)
        at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1059)
        at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:766)
        at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:141)
        ... 28 more
Caused by: org.apache.calcite.sql.validate.SqlValidatorException:
Column 'data.data' not found in table 'parser_data_test'
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
        at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457)
        at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:550)
        ... 51 more


Benchao Li <[hidden email]> 于2020年7月22日周三 下午2:05写道:

> Hi,
> 如果只是想打平array,Flink有个内置的方法,可以参考[1]的”Expanding arrays into a relation“部分
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#joins
>
> Jark Wu <[hidden email]> 于2020年7月22日周三 上午11:17写道:
>
> > Hi,
> >
> > Row[] 作为 eval 参数,目前还不支持。社区已经有一个 issue 在跟进支持这个功能:
> > https://issues.apache.org/jira/browse/FLINK-17855
> >
> >
> > Best,
> > Jark
> >
> > On Wed, 22 Jul 2020 at 10:45, Dream-底限 <[hidden email]> wrote:
> >
> > > hi,
> > >  我想将一个array<row>打散成多行,但是并没有成功
> > >
> > > @FunctionHint(input =@DataTypeHint("ARRAY<ROW<rule_id STRING,rule_name
> > > STRING,rule_type_name STRING,`result` INT,in_path BOOLEAN>>") ,output
> > > = @DataTypeHint("ROW<rule_id STRING,rule_name STRING,rule_type_name
> > > STRING,`result` INT,in_path BOOLEAN>"))
> > > public static class FlatRowFunction extends TableFunction<Row> {
> > >     private static final long serialVersionUID = 1L;
> > >
> > >     public void eval(Row[] rows) {
> > >         for (Row row : rows) {
> > >             collect(row);
> > >         }
> > >     }
> > > }
> > >
> > > 异常如下:
> > >
> > > org.apache.flink.table.api.ValidationException: SQL validation failed.
> > > From line 1, column 149 to line 1, column 174: No match found for
> > > function signature
> > > flatRow(<RecordType:peek_no_expand(VARCHAR(2147483647) rule_id,
> > > VARCHAR(2147483647) rule_name, VARCHAR(2147483647) rule_type_name,
> > > INTEGER result, BOOLEAN in_path) ARRAY>)
> > >
> > >         at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
> > >
> >
> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:146)
> > >         at
> > >
> >
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:108)
> > >         at
> > >
> >
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:187)
> > >         at
> > >
> >
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
> > >         at
> > >
> >
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:658)
> > >         at
> > >
> >
> com.akulaku.data.flink.ParserDataTest.parserDataTest(ParserDataTest.java:60)
> > >         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> > >         at
> > >
> >
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> > >         at
> > >
> >
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> > >         at java.lang.reflect.Method.invoke(Method.java:498)
> > >         at
> > >
> >
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> > >         at
> > >
> >
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> > >         at
> > >
> >
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> > >         at
> > >
> >
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> > >         at
> > >
> >
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> > >         at
> org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
> > >         at
> > >
> >
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
> > >         at
> > >
> >
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
> > >         at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> > >         at
> > org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> > >         at
> > > org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> > >         at
> > org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
> > >         at
> > org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
> > >         at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
> > >         at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
> > >         at
> > >
> >
> com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
> > >         at
> > >
> >
> com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
> > >         at
> > >
> >
> com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
> > >         at
> com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)
> > > Caused by: org.apache.calcite.runtime.CalciteContextException: From
> > > line 1, column 149 to line 1, column 174: No match found for function
> > > signature flatRow(<RecordType:peek_no_expand(VARCHAR(2147483647)
> > > rule_id, VARCHAR(2147483647) rule_name, VARCHAR(2147483647)
> > > rule_type_name, INTEGER result, BOOLEAN in_path) ARRAY>)
> > >         at
> sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
> > > Method)
> > >         at
> > >
> >
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> > >         at
> > >
> >
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> > >         at
> > java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> > >         at
> > >
> >
> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457)
> > >         at
> > > org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:839)
> > >         at
> > > org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824)
> > >         at
> > >
> >
> org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:5089)
> > >         at
> > >
> >
> org.apache.calcite.sql.validate.SqlValidatorImpl.handleUnresolvedFunction(SqlValidatorImpl.java:1882)
> > >         at
> > > org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:305)
> > >         at
> > > org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:218)
> > >         at
> > >
> >
> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5858)
> > >         at
> > >
> >
> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5845)
> > >         at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
> > >         at
> > >
> >
> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1800)
> > >         at
> > >
> >
> org.apache.calcite.sql.validate.ProcedureNamespace.validateImpl(ProcedureNamespace.java:57)
> > >         at
> > >
> >
> org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
> > >         at
> > >
> >
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1110)
> > >         at
> > >
> >
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1084)
> > >         at
> > >
> >
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3256)
> > >         at
> > >
> >
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3238)
> > >         at
> > >
> >
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateJoin(SqlValidatorImpl.java:3303)
> > >         at
> > >
> >
> org.apache.flink.table.planner.calcite.FlinkCalciteSqlValidator.validateJoin(FlinkCalciteSqlValidator.java:86)
> > >         at
> > >
> >
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3247)
> > >         at
> > >
> >
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3510)
> > >         at
> > >
> >
> org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
> > >         at
> > >
> >
> org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
> > >         at
> > >
> >
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1110)
> > >         at
> > >
> >
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1084)
> > >         at
> org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232)
> > >         at
> > >
> >
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1059)
> > >         at
> > >
> >
> org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:766)
> > >         at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
> > >
> >
> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:141)
> > >         ... 28 more
> > > Caused by: org.apache.calcite.sql.validate.SqlValidatorException: No
> > > match found for function signature
> > > flatRow(<RecordType:peek_no_expand(VARCHAR(2147483647) rule_id,
> > > VARCHAR(2147483647) rule_name, VARCHAR(2147483647) rule_type_name,
> > > INTEGER result, BOOLEAN in_path) ARRAY>)
> > >         at
> sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
> > > Method)
> > >         at
> > >
> >
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> > >         at
> > >
> >
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> > >         at
> > java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> > >         at
> > >
> >
> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457)
> > >         at
> > > org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:550)
> > >         ... 56 more
> > >
> > >
> > > godfrey he <[hidden email]> 于2020年7月21日周二 下午7:41写道:
> > >
> > > > 可以,定义清楚 getResultType 和 getParameterTypes, 可以参考[1]
> > > >
> > > > [1]
> > > >
> > > >
> > >
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/udfs.html#implementation-guide
> > > >
> > > >
> > > > Dream-底限 <[hidden email]> 于2020年7月21日周二 下午7:25写道:
> > > >
> > > > > hi
> > > > >
> > > > >
> > > >
> > >
> >
> 我这面在定义一个表函数,通过继承TableFunction完成操作,但是eval方法中的参数类型都是java基本类型(至少看到的demo都是如此),想问一下eval方法中可以传flink
> > > > > 内部类型吗,比如说我想在eval()方法中传递Row类型要怎么操作,eval(Row row)
> > > > >
> > > >
> > >
> >
>
>
> --
>
> Best,
> Benchao Li
>
Reply | Threaded
Open this post in threaded view
|

Re: flink1.11 tablefunction

Benchao Li-2
我感觉这可能是calcite的bug,CC Danny老师

Dream-底限 <[hidden email]> 于2020年7月22日周三 下午5:46写道:

> hi 、Benchao Li
> 我尝试了将数组打散的方式,但是报了一个莫名其妙的错误,可以帮忙看看嘛
>
> tableEnv.executeSql("CREATE TABLE parser_data_test (\n" +
>         "  data ROW<flow_task_id BIGINT,flow_id STRING,flow_version
> STRING,path STRING,country_id INT,create_time BIGINT," +
>         "spent_time DECIMAL(10,2),features
> ROW<`user_ic_no_aku_uid.pdl_cdpd`
> STRING,`user_ic_no_aku_uid.pdl_current_unpay` INT," +
>         "`user_ic_no_aku_uid.current_overdue_collection`
> INT>,rule_results ARRAY<ROW<rule_id STRING,rule_name STRING," +
>         "rule_type_name STRING,`result` INT,in_path BOOLEAN>>>,\n" +
>         "  createTime BIGINT,\n" +
>         "  tindex INT\n" +
>         ") WITH (\n" +
>         " 'connector' = 'kafka-0.11',\n" +
>         " 'topic' = 'parser_data_test',\n" +
>         " 'properties.bootstrap.servers' = 'localhost:9092',\n" +
>         " 'properties.group.id' = 'testGroup',\n" +
>         " 'scan.startup.mode' = 'earliest-offset',\n" +
>         " 'format' = 'json',\n" +
>         " 'json.fail-on-missing-field' = 'false',\n" +
>         " 'json.ignore-parse-errors' = 'true'\n" +
>         ")");
>
> Table table = tableEnv.sqlQuery("select
>
> data.flow_task_id,data.features.`user_ic_no_aku_uid.pdl_current_unpay`,rule_id,tindex
> from parser_data_test CROSS JOIN UNNEST(data.rule_results) AS t
> (rule_id,rule_name,rule_type_name,`result`,in_path)");
>
> table.printSchema();
> tableEnv.toAppendStream(table,
>
> Types.ROW(TypeConversions.fromDataTypeToLegacyInfo(table.getSchema().getFieldDataTypes()))).print();
>
>
> 异常信息:
>
> rg.apache.flink.table.api.ValidationException: SQL validation failed.
> From line 0, column 0 to line 1, column 139: Column 'data.data' not
> found in table 'parser_data_test'
>
>         at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:146)
>         at
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:108)
>         at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:185)
>         at
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
>         at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:664)
>         at
> com.akulaku.data.flink.ParserDataTest.parserDataTest(ParserDataTest.java:63)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>         at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:498)
>         at
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>         at
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>         at
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>         at
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>         at
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>         at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>         at
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>         at
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>         at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>         at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>         at
> org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>         at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>         at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>         at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>         at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
>         at
> com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
>         at
> com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
>         at
> com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
>         at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)
> Caused by: org.apache.calcite.runtime.CalciteContextException: From
> line 0, column 0 to line 1, column 139: Column 'data.data' not found
> in table 'parser_data_test'
>         at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
> Method)
>         at
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>         at
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>         at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>         at
> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457)
>         at
> org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:839)
>         at
> org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824)
>         at
> org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:5089)
>         at
> org.apache.calcite.sql.validate.DelegatingScope.fullyQualify(DelegatingScope.java:439)
>         at
> org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5991)
>         at
> org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5976)
>         at
> org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:321)
>         at
> org.apache.calcite.sql.validate.SqlValidatorImpl.expand(SqlValidatorImpl.java:5583)
>         at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateUnnest(SqlValidatorImpl.java:3271)
>         at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3253)
>         at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3238)
>         at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateJoin(SqlValidatorImpl.java:3303)
>         at
> org.apache.flink.table.planner.calcite.FlinkCalciteSqlValidator.validateJoin(FlinkCalciteSqlValidator.java:86)
>         at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3247)
>         at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3510)
>         at
> org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
>         at
> org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
>         at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1110)
>         at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1084)
>         at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232)
>         at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1059)
>         at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:766)
>         at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:141)
>         ... 28 more
> Caused by: org.apache.calcite.sql.validate.SqlValidatorException:
> Column 'data.data' not found in table 'parser_data_test'
>         at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
> Method)
>         at
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>         at
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>         at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>         at
> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457)
>         at
> org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:550)
>         ... 51 more
>
>
> Benchao Li <[hidden email]> 于2020年7月22日周三 下午2:05写道:
>
> > Hi,
> > 如果只是想打平array,Flink有个内置的方法,可以参考[1]的”Expanding arrays into a relation“部分
> >
> > [1]
> >
> >
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#joins
> >
> > Jark Wu <[hidden email]> 于2020年7月22日周三 上午11:17写道:
> >
> > > Hi,
> > >
> > > Row[] 作为 eval 参数,目前还不支持。社区已经有一个 issue 在跟进支持这个功能:
> > > https://issues.apache.org/jira/browse/FLINK-17855
> > >
> > >
> > > Best,
> > > Jark
> > >
> > > On Wed, 22 Jul 2020 at 10:45, Dream-底限 <[hidden email]> wrote:
> > >
> > > > hi,
> > > >  我想将一个array<row>打散成多行,但是并没有成功
> > > >
> > > > @FunctionHint(input =@DataTypeHint("ARRAY<ROW<rule_id
> STRING,rule_name
> > > > STRING,rule_type_name STRING,`result` INT,in_path BOOLEAN>>") ,output
> > > > = @DataTypeHint("ROW<rule_id STRING,rule_name STRING,rule_type_name
> > > > STRING,`result` INT,in_path BOOLEAN>"))
> > > > public static class FlatRowFunction extends TableFunction<Row> {
> > > >     private static final long serialVersionUID = 1L;
> > > >
> > > >     public void eval(Row[] rows) {
> > > >         for (Row row : rows) {
> > > >             collect(row);
> > > >         }
> > > >     }
> > > > }
> > > >
> > > > 异常如下:
> > > >
> > > > org.apache.flink.table.api.ValidationException: SQL validation
> failed.
> > > > From line 1, column 149 to line 1, column 174: No match found for
> > > > function signature
> > > > flatRow(<RecordType:peek_no_expand(VARCHAR(2147483647) rule_id,
> > > > VARCHAR(2147483647) rule_name, VARCHAR(2147483647) rule_type_name,
> > > > INTEGER result, BOOLEAN in_path) ARRAY>)
> > > >
> > > >         at
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
> > > >
> > >
> >
> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:146)
> > > >         at
> > > >
> > >
> >
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:108)
> > > >         at
> > > >
> > >
> >
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:187)
> > > >         at
> > > >
> > >
> >
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
> > > >         at
> > > >
> > >
> >
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:658)
> > > >         at
> > > >
> > >
> >
> com.akulaku.data.flink.ParserDataTest.parserDataTest(ParserDataTest.java:60)
> > > >         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native
> Method)
> > > >         at
> > > >
> > >
> >
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> > > >         at
> > > >
> > >
> >
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> > > >         at java.lang.reflect.Method.invoke(Method.java:498)
> > > >         at
> > > >
> > >
> >
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> > > >         at
> > > >
> > >
> >
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> > > >         at
> > > >
> > >
> >
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> > > >         at
> > > >
> > >
> >
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> > > >         at
> > > >
> > >
> >
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> > > >         at
> > org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
> > > >         at
> > > >
> > >
> >
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
> > > >         at
> > > >
> > >
> >
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
> > > >         at
> org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> > > >         at
> > > org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> > > >         at
> > > > org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> > > >         at
> > > org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
> > > >         at
> > > org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
> > > >         at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
> > > >         at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
> > > >         at
> > > >
> > >
> >
> com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
> > > >         at
> > > >
> > >
> >
> com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
> > > >         at
> > > >
> > >
> >
> com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
> > > >         at
> > com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)
> > > > Caused by: org.apache.calcite.runtime.CalciteContextException: From
> > > > line 1, column 149 to line 1, column 174: No match found for function
> > > > signature flatRow(<RecordType:peek_no_expand(VARCHAR(2147483647)
> > > > rule_id, VARCHAR(2147483647) rule_name, VARCHAR(2147483647)
> > > > rule_type_name, INTEGER result, BOOLEAN in_path) ARRAY>)
> > > >         at
> > sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
> > > > Method)
> > > >         at
> > > >
> > >
> >
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> > > >         at
> > > >
> > >
> >
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> > > >         at
> > > java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> > > >         at
> > > >
> > >
> >
> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457)
> > > >         at
> > > > org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:839)
> > > >         at
> > > > org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824)
> > > >         at
> > > >
> > >
> >
> org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:5089)
> > > >         at
> > > >
> > >
> >
> org.apache.calcite.sql.validate.SqlValidatorImpl.handleUnresolvedFunction(SqlValidatorImpl.java:1882)
> > > >         at
> > > > org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:305)
> > > >         at
> > > > org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:218)
> > > >         at
> > > >
> > >
> >
> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5858)
> > > >         at
> > > >
> > >
> >
> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5845)
> > > >         at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
> > > >         at
> > > >
> > >
> >
> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1800)
> > > >         at
> > > >
> > >
> >
> org.apache.calcite.sql.validate.ProcedureNamespace.validateImpl(ProcedureNamespace.java:57)
> > > >         at
> > > >
> > >
> >
> org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
> > > >         at
> > > >
> > >
> >
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1110)
> > > >         at
> > > >
> > >
> >
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1084)
> > > >         at
> > > >
> > >
> >
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3256)
> > > >         at
> > > >
> > >
> >
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3238)
> > > >         at
> > > >
> > >
> >
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateJoin(SqlValidatorImpl.java:3303)
> > > >         at
> > > >
> > >
> >
> org.apache.flink.table.planner.calcite.FlinkCalciteSqlValidator.validateJoin(FlinkCalciteSqlValidator.java:86)
> > > >         at
> > > >
> > >
> >
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3247)
> > > >         at
> > > >
> > >
> >
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3510)
> > > >         at
> > > >
> > >
> >
> org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
> > > >         at
> > > >
> > >
> >
> org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
> > > >         at
> > > >
> > >
> >
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1110)
> > > >         at
> > > >
> > >
> >
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1084)
> > > >         at
> > org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232)
> > > >         at
> > > >
> > >
> >
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1059)
> > > >         at
> > > >
> > >
> >
> org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:766)
> > > >         at
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
> > > >
> > >
> >
> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:141)
> > > >         ... 28 more
> > > > Caused by: org.apache.calcite.sql.validate.SqlValidatorException: No
> > > > match found for function signature
> > > > flatRow(<RecordType:peek_no_expand(VARCHAR(2147483647) rule_id,
> > > > VARCHAR(2147483647) rule_name, VARCHAR(2147483647) rule_type_name,
> > > > INTEGER result, BOOLEAN in_path) ARRAY>)
> > > >         at
> > sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
> > > > Method)
> > > >         at
> > > >
> > >
> >
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> > > >         at
> > > >
> > >
> >
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> > > >         at
> > > java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> > > >         at
> > > >
> > >
> >
> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457)
> > > >         at
> > > > org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:550)
> > > >         ... 56 more
> > > >
> > > >
> > > > godfrey he <[hidden email]> 于2020年7月21日周二 下午7:41写道:
> > > >
> > > > > 可以,定义清楚 getResultType 和 getParameterTypes, 可以参考[1]
> > > > >
> > > > > [1]
> > > > >
> > > > >
> > > >
> > >
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/udfs.html#implementation-guide
> > > > >
> > > > >
> > > > > Dream-底限 <[hidden email]> 于2020年7月21日周二 下午7:25写道:
> > > > >
> > > > > > hi
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> 我这面在定义一个表函数,通过继承TableFunction完成操作,但是eval方法中的参数类型都是java基本类型(至少看到的demo都是如此),想问一下eval方法中可以传flink
> > > > > > 内部类型吗,比如说我想在eval()方法中传递Row类型要怎么操作,eval(Row row)
> > > > > >
> > > > >
> > > >
> > >
> >
> >
> > --
> >
> > Best,
> > Benchao Li
> >
>


--

Best,
Benchao Li
Reply | Threaded
Open this post in threaded view
|

Re: flink1.11 tablefunction

Benchao Li-2
现在有一个work around,就是你可以用子查询先把row展开,比如:
select ...
from (
  select data.rule_results as rule_results, ...
) cross join unnest(rule_results) as t(...)

Benchao Li <[hidden email]> 于2020年7月23日周四 下午12:44写道:

> 我感觉这可能是calcite的bug,CC Danny老师
>
> Dream-底限 <[hidden email]> 于2020年7月22日周三 下午5:46写道:
>
>> hi 、Benchao Li
>> 我尝试了将数组打散的方式,但是报了一个莫名其妙的错误,可以帮忙看看嘛
>>
>> tableEnv.executeSql("CREATE TABLE parser_data_test (\n" +
>>         "  data ROW<flow_task_id BIGINT,flow_id STRING,flow_version
>> STRING,path STRING,country_id INT,create_time BIGINT," +
>>         "spent_time DECIMAL(10,2),features
>> ROW<`user_ic_no_aku_uid.pdl_cdpd`
>> STRING,`user_ic_no_aku_uid.pdl_current_unpay` INT," +
>>         "`user_ic_no_aku_uid.current_overdue_collection`
>> INT>,rule_results ARRAY<ROW<rule_id STRING,rule_name STRING," +
>>         "rule_type_name STRING,`result` INT,in_path BOOLEAN>>>,\n" +
>>         "  createTime BIGINT,\n" +
>>         "  tindex INT\n" +
>>         ") WITH (\n" +
>>         " 'connector' = 'kafka-0.11',\n" +
>>         " 'topic' = 'parser_data_test',\n" +
>>         " 'properties.bootstrap.servers' = 'localhost:9092',\n" +
>>         " 'properties.group.id' = 'testGroup',\n" +
>>         " 'scan.startup.mode' = 'earliest-offset',\n" +
>>         " 'format' = 'json',\n" +
>>         " 'json.fail-on-missing-field' = 'false',\n" +
>>         " 'json.ignore-parse-errors' = 'true'\n" +
>>         ")");
>>
>> Table table = tableEnv.sqlQuery("select
>>
>> data.flow_task_id,data.features.`user_ic_no_aku_uid.pdl_current_unpay`,rule_id,tindex
>> from parser_data_test CROSS JOIN UNNEST(data.rule_results) AS t
>> (rule_id,rule_name,rule_type_name,`result`,in_path)");
>>
>> table.printSchema();
>> tableEnv.toAppendStream(table,
>>
>> Types.ROW(TypeConversions.fromDataTypeToLegacyInfo(table.getSchema().getFieldDataTypes()))).print();
>>
>>
>> 异常信息:
>>
>> rg.apache.flink.table.api.ValidationException: SQL validation failed.
>> From line 0, column 0 to line 1, column 139: Column 'data.data' not
>> found in table 'parser_data_test'
>>
>>         at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
>> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:146)
>>         at
>> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:108)
>>         at
>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:185)
>>         at
>> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
>>         at
>> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:664)
>>         at
>> com.akulaku.data.flink.ParserDataTest.parserDataTest(ParserDataTest.java:63)
>>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>         at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>         at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>         at java.lang.reflect.Method.invoke(Method.java:498)
>>         at
>> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>>         at
>> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>>         at
>> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>>         at
>> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>>         at
>> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>>         at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>>         at
>> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>>         at
>> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>>         at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>>         at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>>         at
>> org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>>         at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>>         at
>> org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>>         at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>>         at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
>>         at
>> com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
>>         at
>> com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
>>         at
>> com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
>>         at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)
>> Caused by: org.apache.calcite.runtime.CalciteContextException: From
>> line 0, column 0 to line 1, column 139: Column 'data.data' not found
>> in table 'parser_data_test'
>>         at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
>> Method)
>>         at
>> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>>         at
>> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>>         at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>>         at
>> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457)
>>         at
>> org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:839)
>>         at
>> org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824)
>>         at
>> org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:5089)
>>         at
>> org.apache.calcite.sql.validate.DelegatingScope.fullyQualify(DelegatingScope.java:439)
>>         at
>> org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5991)
>>         at
>> org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5976)
>>         at
>> org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:321)
>>         at
>> org.apache.calcite.sql.validate.SqlValidatorImpl.expand(SqlValidatorImpl.java:5583)
>>         at
>> org.apache.calcite.sql.validate.SqlValidatorImpl.validateUnnest(SqlValidatorImpl.java:3271)
>>         at
>> org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3253)
>>         at
>> org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3238)
>>         at
>> org.apache.calcite.sql.validate.SqlValidatorImpl.validateJoin(SqlValidatorImpl.java:3303)
>>         at
>> org.apache.flink.table.planner.calcite.FlinkCalciteSqlValidator.validateJoin(FlinkCalciteSqlValidator.java:86)
>>         at
>> org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3247)
>>         at
>> org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3510)
>>         at
>> org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
>>         at
>> org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
>>         at
>> org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1110)
>>         at
>> org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1084)
>>         at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232)
>>         at
>> org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1059)
>>         at
>> org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:766)
>>         at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
>> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:141)
>>         ... 28 more
>> Caused by: org.apache.calcite.sql.validate.SqlValidatorException:
>> Column 'data.data' not found in table 'parser_data_test'
>>         at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
>> Method)
>>         at
>> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>>         at
>> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>>         at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>>         at
>> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457)
>>         at
>> org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:550)
>>         ... 51 more
>>
>>
>> Benchao Li <[hidden email]> 于2020年7月22日周三 下午2:05写道:
>>
>> > Hi,
>> > 如果只是想打平array,Flink有个内置的方法,可以参考[1]的”Expanding arrays into a relation“部分
>> >
>> > [1]
>> >
>> >
>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#joins
>> >
>> > Jark Wu <[hidden email]> 于2020年7月22日周三 上午11:17写道:
>> >
>> > > Hi,
>> > >
>> > > Row[] 作为 eval 参数,目前还不支持。社区已经有一个 issue 在跟进支持这个功能:
>> > > https://issues.apache.org/jira/browse/FLINK-17855
>> > >
>> > >
>> > > Best,
>> > > Jark
>> > >
>> > > On Wed, 22 Jul 2020 at 10:45, Dream-底限 <[hidden email]> wrote:
>> > >
>> > > > hi,
>> > > >  我想将一个array<row>打散成多行,但是并没有成功
>> > > >
>> > > > @FunctionHint(input =@DataTypeHint("ARRAY<ROW<rule_id
>> STRING,rule_name
>> > > > STRING,rule_type_name STRING,`result` INT,in_path BOOLEAN>>")
>> ,output
>> > > > = @DataTypeHint("ROW<rule_id STRING,rule_name STRING,rule_type_name
>> > > > STRING,`result` INT,in_path BOOLEAN>"))
>> > > > public static class FlatRowFunction extends TableFunction<Row> {
>> > > >     private static final long serialVersionUID = 1L;
>> > > >
>> > > >     public void eval(Row[] rows) {
>> > > >         for (Row row : rows) {
>> > > >             collect(row);
>> > > >         }
>> > > >     }
>> > > > }
>> > > >
>> > > > 异常如下:
>> > > >
>> > > > org.apache.flink.table.api.ValidationException: SQL validation
>> failed.
>> > > > From line 1, column 149 to line 1, column 174: No match found for
>> > > > function signature
>> > > > flatRow(<RecordType:peek_no_expand(VARCHAR(2147483647) rule_id,
>> > > > VARCHAR(2147483647) rule_name, VARCHAR(2147483647) rule_type_name,
>> > > > INTEGER result, BOOLEAN in_path) ARRAY>)
>> > > >
>> > > >         at
>> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
>> > > >
>> > >
>> >
>> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:146)
>> > > >         at
>> > > >
>> > >
>> >
>> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:108)
>> > > >         at
>> > > >
>> > >
>> >
>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:187)
>> > > >         at
>> > > >
>> > >
>> >
>> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
>> > > >         at
>> > > >
>> > >
>> >
>> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:658)
>> > > >         at
>> > > >
>> > >
>> >
>> com.akulaku.data.flink.ParserDataTest.parserDataTest(ParserDataTest.java:60)
>> > > >         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native
>> Method)
>> > > >         at
>> > > >
>> > >
>> >
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> > > >         at
>> > > >
>> > >
>> >
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> > > >         at java.lang.reflect.Method.invoke(Method.java:498)
>> > > >         at
>> > > >
>> > >
>> >
>> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>> > > >         at
>> > > >
>> > >
>> >
>> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>> > > >         at
>> > > >
>> > >
>> >
>> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>> > > >         at
>> > > >
>> > >
>> >
>> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>> > > >         at
>> > > >
>> > >
>> >
>> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>> > > >         at
>> > org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>> > > >         at
>> > > >
>> > >
>> >
>> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>> > > >         at
>> > > >
>> > >
>> >
>> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>> > > >         at
>> org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>> > > >         at
>> > > org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>> > > >         at
>> > > > org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>> > > >         at
>> > > org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>> > > >         at
>> > > org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>> > > >         at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>> > > >         at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
>> > > >         at
>> > > >
>> > >
>> >
>> com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
>> > > >         at
>> > > >
>> > >
>> >
>> com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
>> > > >         at
>> > > >
>> > >
>> >
>> com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
>> > > >         at
>> > com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)
>> > > > Caused by: org.apache.calcite.runtime.CalciteContextException: From
>> > > > line 1, column 149 to line 1, column 174: No match found for
>> function
>> > > > signature flatRow(<RecordType:peek_no_expand(VARCHAR(2147483647)
>> > > > rule_id, VARCHAR(2147483647) rule_name, VARCHAR(2147483647)
>> > > > rule_type_name, INTEGER result, BOOLEAN in_path) ARRAY>)
>> > > >         at
>> > sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
>> > > > Method)
>> > > >         at
>> > > >
>> > >
>> >
>> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>> > > >         at
>> > > >
>> > >
>> >
>> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>> > > >         at
>> > > java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>> > > >         at
>> > > >
>> > >
>> >
>> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457)
>> > > >         at
>> > > > org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:839)
>> > > >         at
>> > > > org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824)
>> > > >         at
>> > > >
>> > >
>> >
>> org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:5089)
>> > > >         at
>> > > >
>> > >
>> >
>> org.apache.calcite.sql.validate.SqlValidatorImpl.handleUnresolvedFunction(SqlValidatorImpl.java:1882)
>> > > >         at
>> > > > org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:305)
>> > > >         at
>> > > > org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:218)
>> > > >         at
>> > > >
>> > >
>> >
>> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5858)
>> > > >         at
>> > > >
>> > >
>> >
>> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5845)
>> > > >         at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
>> > > >         at
>> > > >
>> > >
>> >
>> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1800)
>> > > >         at
>> > > >
>> > >
>> >
>> org.apache.calcite.sql.validate.ProcedureNamespace.validateImpl(ProcedureNamespace.java:57)
>> > > >         at
>> > > >
>> > >
>> >
>> org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
>> > > >         at
>> > > >
>> > >
>> >
>> org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1110)
>> > > >         at
>> > > >
>> > >
>> >
>> org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1084)
>> > > >         at
>> > > >
>> > >
>> >
>> org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3256)
>> > > >         at
>> > > >
>> > >
>> >
>> org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3238)
>> > > >         at
>> > > >
>> > >
>> >
>> org.apache.calcite.sql.validate.SqlValidatorImpl.validateJoin(SqlValidatorImpl.java:3303)
>> > > >         at
>> > > >
>> > >
>> >
>> org.apache.flink.table.planner.calcite.FlinkCalciteSqlValidator.validateJoin(FlinkCalciteSqlValidator.java:86)
>> > > >         at
>> > > >
>> > >
>> >
>> org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3247)
>> > > >         at
>> > > >
>> > >
>> >
>> org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3510)
>> > > >         at
>> > > >
>> > >
>> >
>> org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
>> > > >         at
>> > > >
>> > >
>> >
>> org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
>> > > >         at
>> > > >
>> > >
>> >
>> org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1110)
>> > > >         at
>> > > >
>> > >
>> >
>> org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1084)
>> > > >         at
>> > org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232)
>> > > >         at
>> > > >
>> > >
>> >
>> org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1059)
>> > > >         at
>> > > >
>> > >
>> >
>> org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:766)
>> > > >         at
>> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
>> > > >
>> > >
>> >
>> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:141)
>> > > >         ... 28 more
>> > > > Caused by: org.apache.calcite.sql.validate.SqlValidatorException: No
>> > > > match found for function signature
>> > > > flatRow(<RecordType:peek_no_expand(VARCHAR(2147483647) rule_id,
>> > > > VARCHAR(2147483647) rule_name, VARCHAR(2147483647) rule_type_name,
>> > > > INTEGER result, BOOLEAN in_path) ARRAY>)
>> > > >         at
>> > sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
>> > > > Method)
>> > > >         at
>> > > >
>> > >
>> >
>> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>> > > >         at
>> > > >
>> > >
>> >
>> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>> > > >         at
>> > > java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>> > > >         at
>> > > >
>> > >
>> >
>> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457)
>> > > >         at
>> > > > org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:550)
>> > > >         ... 56 more
>> > > >
>> > > >
>> > > > godfrey he <[hidden email]> 于2020年7月21日周二 下午7:41写道:
>> > > >
>> > > > > 可以,定义清楚 getResultType 和 getParameterTypes, 可以参考[1]
>> > > > >
>> > > > > [1]
>> > > > >
>> > > > >
>> > > >
>> > >
>> >
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/udfs.html#implementation-guide
>> > > > >
>> > > > >
>> > > > > Dream-底限 <[hidden email]> 于2020年7月21日周二 下午7:25写道:
>> > > > >
>> > > > > > hi
>> > > > > >
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>> 我这面在定义一个表函数,通过继承TableFunction完成操作,但是eval方法中的参数类型都是java基本类型(至少看到的demo都是如此),想问一下eval方法中可以传flink
>> > > > > > 内部类型吗,比如说我想在eval()方法中传递Row类型要怎么操作,eval(Row row)
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>> >
>> > --
>> >
>> > Best,
>> > Benchao Li
>> >
>>
>
>
> --
>
> Best,
> Benchao Li
>


--

Best,
Benchao Li
Reply | Threaded
Open this post in threaded view
|

Re: flink1.11 tablefunction

Dream-底限
hi
这貌似确实是一个bug,先用子查询打开后程序就可以运行正常了

Benchao Li <[hidden email]> 于2020年7月23日周四 下午12:52写道:

> 现在有一个work around,就是你可以用子查询先把row展开,比如:
> select ...
> from (
>   select data.rule_results as rule_results, ...
> ) cross join unnest(rule_results) as t(...)
>
> Benchao Li <[hidden email]> 于2020年7月23日周四 下午12:44写道:
>
> > 我感觉这可能是calcite的bug,CC Danny老师
> >
> > Dream-底限 <[hidden email]> 于2020年7月22日周三 下午5:46写道:
> >
> >> hi 、Benchao Li
> >> 我尝试了将数组打散的方式,但是报了一个莫名其妙的错误,可以帮忙看看嘛
> >>
> >> tableEnv.executeSql("CREATE TABLE parser_data_test (\n" +
> >>         "  data ROW<flow_task_id BIGINT,flow_id STRING,flow_version
> >> STRING,path STRING,country_id INT,create_time BIGINT," +
> >>         "spent_time DECIMAL(10,2),features
> >> ROW<`user_ic_no_aku_uid.pdl_cdpd`
> >> STRING,`user_ic_no_aku_uid.pdl_current_unpay` INT," +
> >>         "`user_ic_no_aku_uid.current_overdue_collection`
> >> INT>,rule_results ARRAY<ROW<rule_id STRING,rule_name STRING," +
> >>         "rule_type_name STRING,`result` INT,in_path BOOLEAN>>>,\n" +
> >>         "  createTime BIGINT,\n" +
> >>         "  tindex INT\n" +
> >>         ") WITH (\n" +
> >>         " 'connector' = 'kafka-0.11',\n" +
> >>         " 'topic' = 'parser_data_test',\n" +
> >>         " 'properties.bootstrap.servers' = 'localhost:9092',\n" +
> >>         " 'properties.group.id' = 'testGroup',\n" +
> >>         " 'scan.startup.mode' = 'earliest-offset',\n" +
> >>         " 'format' = 'json',\n" +
> >>         " 'json.fail-on-missing-field' = 'false',\n" +
> >>         " 'json.ignore-parse-errors' = 'true'\n" +
> >>         ")");
> >>
> >> Table table = tableEnv.sqlQuery("select
> >>
> >>
> data.flow_task_id,data.features.`user_ic_no_aku_uid.pdl_current_unpay`,rule_id,tindex
> >> from parser_data_test CROSS JOIN UNNEST(data.rule_results) AS t
> >> (rule_id,rule_name,rule_type_name,`result`,in_path)");
> >>
> >> table.printSchema();
> >> tableEnv.toAppendStream(table,
> >>
> >>
> Types.ROW(TypeConversions.fromDataTypeToLegacyInfo(table.getSchema().getFieldDataTypes()))).print();
> >>
> >>
> >> 异常信息:
> >>
> >> rg.apache.flink.table.api.ValidationException: SQL validation failed.
> >> From line 0, column 0 to line 1, column 139: Column 'data.data' not
> >> found in table 'parser_data_test'
> >>
> >>         at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
> >>
> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:146)
> >>         at
> >>
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:108)
> >>         at
> >>
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:185)
> >>         at
> >>
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
> >>         at
> >>
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:664)
> >>         at
> >>
> com.akulaku.data.flink.ParserDataTest.parserDataTest(ParserDataTest.java:63)
> >>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> >>         at
> >>
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> >>         at
> >>
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >>         at java.lang.reflect.Method.invoke(Method.java:498)
> >>         at
> >>
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> >>         at
> >>
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> >>         at
> >>
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> >>         at
> >>
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> >>         at
> >>
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> >>         at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
> >>         at
> >>
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
> >>         at
> >>
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
> >>         at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> >>         at
> org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> >>         at
> >> org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> >>         at
> org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
> >>         at
> >> org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
> >>         at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
> >>         at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
> >>         at
> >>
> com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
> >>         at
> >>
> com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
> >>         at
> >>
> com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
> >>         at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)
> >> Caused by: org.apache.calcite.runtime.CalciteContextException: From
> >> line 0, column 0 to line 1, column 139: Column 'data.data' not found
> >> in table 'parser_data_test'
> >>         at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
> >> Method)
> >>         at
> >>
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> >>         at
> >>
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> >>         at
> java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> >>         at
> >>
> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457)
> >>         at
> >> org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:839)
> >>         at
> >> org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824)
> >>         at
> >>
> org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:5089)
> >>         at
> >>
> org.apache.calcite.sql.validate.DelegatingScope.fullyQualify(DelegatingScope.java:439)
> >>         at
> >>
> org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5991)
> >>         at
> >>
> org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5976)
> >>         at
> >> org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:321)
> >>         at
> >>
> org.apache.calcite.sql.validate.SqlValidatorImpl.expand(SqlValidatorImpl.java:5583)
> >>         at
> >>
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateUnnest(SqlValidatorImpl.java:3271)
> >>         at
> >>
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3253)
> >>         at
> >>
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3238)
> >>         at
> >>
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateJoin(SqlValidatorImpl.java:3303)
> >>         at
> >>
> org.apache.flink.table.planner.calcite.FlinkCalciteSqlValidator.validateJoin(FlinkCalciteSqlValidator.java:86)
> >>         at
> >>
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3247)
> >>         at
> >>
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3510)
> >>         at
> >>
> org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
> >>         at
> >>
> org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
> >>         at
> >>
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1110)
> >>         at
> >>
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1084)
> >>         at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232)
> >>         at
> >>
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1059)
> >>         at
> >>
> org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:766)
> >>         at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
> >>
> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:141)
> >>         ... 28 more
> >> Caused by: org.apache.calcite.sql.validate.SqlValidatorException:
> >> Column 'data.data' not found in table 'parser_data_test'
> >>         at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
> >> Method)
> >>         at
> >>
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> >>         at
> >>
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> >>         at
> java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> >>         at
> >>
> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457)
> >>         at
> >> org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:550)
> >>         ... 51 more
> >>
> >>
> >> Benchao Li <[hidden email]> 于2020年7月22日周三 下午2:05写道:
> >>
> >> > Hi,
> >> > 如果只是想打平array,Flink有个内置的方法,可以参考[1]的”Expanding arrays into a relation“部分
> >> >
> >> > [1]
> >> >
> >> >
> >>
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#joins
> >> >
> >> > Jark Wu <[hidden email]> 于2020年7月22日周三 上午11:17写道:
> >> >
> >> > > Hi,
> >> > >
> >> > > Row[] 作为 eval 参数,目前还不支持。社区已经有一个 issue 在跟进支持这个功能:
> >> > > https://issues.apache.org/jira/browse/FLINK-17855
> >> > >
> >> > >
> >> > > Best,
> >> > > Jark
> >> > >
> >> > > On Wed, 22 Jul 2020 at 10:45, Dream-底限 <[hidden email]> wrote:
> >> > >
> >> > > > hi,
> >> > > >  我想将一个array<row>打散成多行,但是并没有成功
> >> > > >
> >> > > > @FunctionHint(input =@DataTypeHint("ARRAY<ROW<rule_id
> >> STRING,rule_name
> >> > > > STRING,rule_type_name STRING,`result` INT,in_path BOOLEAN>>")
> >> ,output
> >> > > > = @DataTypeHint("ROW<rule_id STRING,rule_name
> STRING,rule_type_name
> >> > > > STRING,`result` INT,in_path BOOLEAN>"))
> >> > > > public static class FlatRowFunction extends TableFunction<Row> {
> >> > > >     private static final long serialVersionUID = 1L;
> >> > > >
> >> > > >     public void eval(Row[] rows) {
> >> > > >         for (Row row : rows) {
> >> > > >             collect(row);
> >> > > >         }
> >> > > >     }
> >> > > > }
> >> > > >
> >> > > > 异常如下:
> >> > > >
> >> > > > org.apache.flink.table.api.ValidationException: SQL validation
> >> failed.
> >> > > > From line 1, column 149 to line 1, column 174: No match found for
> >> > > > function signature
> >> > > > flatRow(<RecordType:peek_no_expand(VARCHAR(2147483647) rule_id,
> >> > > > VARCHAR(2147483647) rule_name, VARCHAR(2147483647) rule_type_name,
> >> > > > INTEGER result, BOOLEAN in_path) ARRAY>)
> >> > > >
> >> > > >         at
> >> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
> >> > > >
> >> > >
> >> >
> >>
> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:146)
> >> > > >         at
> >> > > >
> >> > >
> >> >
> >>
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:108)
> >> > > >         at
> >> > > >
> >> > >
> >> >
> >>
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:187)
> >> > > >         at
> >> > > >
> >> > >
> >> >
> >>
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
> >> > > >         at
> >> > > >
> >> > >
> >> >
> >>
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:658)
> >> > > >         at
> >> > > >
> >> > >
> >> >
> >>
> com.akulaku.data.flink.ParserDataTest.parserDataTest(ParserDataTest.java:60)
> >> > > >         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native
> >> Method)
> >> > > >         at
> >> > > >
> >> > >
> >> >
> >>
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> >> > > >         at
> >> > > >
> >> > >
> >> >
> >>
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >> > > >         at java.lang.reflect.Method.invoke(Method.java:498)
> >> > > >         at
> >> > > >
> >> > >
> >> >
> >>
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> >> > > >         at
> >> > > >
> >> > >
> >> >
> >>
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> >> > > >         at
> >> > > >
> >> > >
> >> >
> >>
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> >> > > >         at
> >> > > >
> >> > >
> >> >
> >>
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> >> > > >         at
> >> > > >
> >> > >
> >> >
> >>
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> >> > > >         at
> >> > org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
> >> > > >         at
> >> > > >
> >> > >
> >> >
> >>
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
> >> > > >         at
> >> > > >
> >> > >
> >> >
> >>
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
> >> > > >         at
> >> org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> >> > > >         at
> >> > > org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> >> > > >         at
> >> > > > org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> >> > > >         at
> >> > > org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
> >> > > >         at
> >> > > org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
> >> > > >         at
> org.junit.runners.ParentRunner.run(ParentRunner.java:363)
> >> > > >         at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
> >> > > >         at
> >> > > >
> >> > >
> >> >
> >>
> com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
> >> > > >         at
> >> > > >
> >> > >
> >> >
> >>
> com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
> >> > > >         at
> >> > > >
> >> > >
> >> >
> >>
> com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
> >> > > >         at
> >> > com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)
> >> > > > Caused by: org.apache.calcite.runtime.CalciteContextException:
> From
> >> > > > line 1, column 149 to line 1, column 174: No match found for
> >> function
> >> > > > signature flatRow(<RecordType:peek_no_expand(VARCHAR(2147483647)
> >> > > > rule_id, VARCHAR(2147483647) rule_name, VARCHAR(2147483647)
> >> > > > rule_type_name, INTEGER result, BOOLEAN in_path) ARRAY>)
> >> > > >         at
> >> > sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
> >> > > > Method)
> >> > > >         at
> >> > > >
> >> > >
> >> >
> >>
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> >> > > >         at
> >> > > >
> >> > >
> >> >
> >>
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> >> > > >         at
> >> > > java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> >> > > >         at
> >> > > >
> >> > >
> >> >
> >>
> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457)
> >> > > >         at
> >> > > >
> org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:839)
> >> > > >         at
> >> > > >
> org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824)
> >> > > >         at
> >> > > >
> >> > >
> >> >
> >>
> org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:5089)
> >> > > >         at
> >> > > >
> >> > >
> >> >
> >>
> org.apache.calcite.sql.validate.SqlValidatorImpl.handleUnresolvedFunction(SqlValidatorImpl.java:1882)
> >> > > >         at
> >> > > >
> org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:305)
> >> > > >         at
> >> > > >
> org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:218)
> >> > > >         at
> >> > > >
> >> > >
> >> >
> >>
> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5858)
> >> > > >         at
> >> > > >
> >> > >
> >> >
> >>
> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5845)
> >> > > >         at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
> >> > > >         at
> >> > > >
> >> > >
> >> >
> >>
> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1800)
> >> > > >         at
> >> > > >
> >> > >
> >> >
> >>
> org.apache.calcite.sql.validate.ProcedureNamespace.validateImpl(ProcedureNamespace.java:57)
> >> > > >         at
> >> > > >
> >> > >
> >> >
> >>
> org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
> >> > > >         at
> >> > > >
> >> > >
> >> >
> >>
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1110)
> >> > > >         at
> >> > > >
> >> > >
> >> >
> >>
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1084)
> >> > > >         at
> >> > > >
> >> > >
> >> >
> >>
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3256)
> >> > > >         at
> >> > > >
> >> > >
> >> >
> >>
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3238)
> >> > > >         at
> >> > > >
> >> > >
> >> >
> >>
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateJoin(SqlValidatorImpl.java:3303)
> >> > > >         at
> >> > > >
> >> > >
> >> >
> >>
> org.apache.flink.table.planner.calcite.FlinkCalciteSqlValidator.validateJoin(FlinkCalciteSqlValidator.java:86)
> >> > > >         at
> >> > > >
> >> > >
> >> >
> >>
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3247)
> >> > > >         at
> >> > > >
> >> > >
> >> >
> >>
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3510)
> >> > > >         at
> >> > > >
> >> > >
> >> >
> >>
> org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
> >> > > >         at
> >> > > >
> >> > >
> >> >
> >>
> org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
> >> > > >         at
> >> > > >
> >> > >
> >> >
> >>
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1110)
> >> > > >         at
> >> > > >
> >> > >
> >> >
> >>
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1084)
> >> > > >         at
> >> > org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232)
> >> > > >         at
> >> > > >
> >> > >
> >> >
> >>
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1059)
> >> > > >         at
> >> > > >
> >> > >
> >> >
> >>
> org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:766)
> >> > > >         at
> >> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
> >> > > >
> >> > >
> >> >
> >>
> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:141)
> >> > > >         ... 28 more
> >> > > > Caused by: org.apache.calcite.sql.validate.SqlValidatorException:
> No
> >> > > > match found for function signature
> >> > > > flatRow(<RecordType:peek_no_expand(VARCHAR(2147483647) rule_id,
> >> > > > VARCHAR(2147483647) rule_name, VARCHAR(2147483647) rule_type_name,
> >> > > > INTEGER result, BOOLEAN in_path) ARRAY>)
> >> > > >         at
> >> > sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
> >> > > > Method)
> >> > > >         at
> >> > > >
> >> > >
> >> >
> >>
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> >> > > >         at
> >> > > >
> >> > >
> >> >
> >>
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> >> > > >         at
> >> > > java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> >> > > >         at
> >> > > >
> >> > >
> >> >
> >>
> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457)
> >> > > >         at
> >> > > > org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:550)
> >> > > >         ... 56 more
> >> > > >
> >> > > >
> >> > > > godfrey he <[hidden email]> 于2020年7月21日周二 下午7:41写道:
> >> > > >
> >> > > > > 可以,定义清楚 getResultType 和 getParameterTypes, 可以参考[1]
> >> > > > >
> >> > > > > [1]
> >> > > > >
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/udfs.html#implementation-guide
> >> > > > >
> >> > > > >
> >> > > > > Dream-底限 <[hidden email]> 于2020年7月21日周二 下午7:25写道:
> >> > > > >
> >> > > > > > hi
> >> > > > > >
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> 我这面在定义一个表函数,通过继承TableFunction完成操作,但是eval方法中的参数类型都是java基本类型(至少看到的demo都是如此),想问一下eval方法中可以传flink
> >> > > > > > 内部类型吗,比如说我想在eval()方法中传递Row类型要怎么操作,eval(Row row)
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >> >
> >> >
> >> > --
> >> >
> >> > Best,
> >> > Benchao Li
> >> >
> >>
> >
> >
> > --
> >
> > Best,
> > Benchao Li
> >
>
>
> --
>
> Best,
> Benchao Li
>