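hi
I am defining a table function by extending TableFunction, but the parameters of the eval method are all Java basic types (at least in every demo I have seen). Can eval take Flink internal types? For example, how would I pass a Row to eval(), as in eval(Row row)?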
Yes. Define getResultType and getParameterTypes explicitly; see [1] for reference.

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/udfs.html#implementation-guide

On Tue, 21 Jul 2020 at 19:25, Dream-底限 <[hidden email]> wrote:
> hi
> I am defining a table function by extending TableFunction, but the parameters of the eval method are all Java basic types (at least in every demo I have seen). Can eval take Flink
> internal types? For example, how would I pass a Row to eval(), as in eval(Row row)?
hi,
I want to expand an array<row> into multiple rows, but it did not work:

@FunctionHint(input = @DataTypeHint("ARRAY<ROW<rule_id STRING,rule_name STRING,rule_type_name STRING,`result` INT,in_path BOOLEAN>>"),
        output = @DataTypeHint("ROW<rule_id STRING,rule_name STRING,rule_type_name STRING,`result` INT,in_path BOOLEAN>"))
public static class FlatRowFunction extends TableFunction<Row> {
    private static final long serialVersionUID = 1L;

    public void eval(Row[] rows) {
        for (Row row : rows) {
            collect(row);
        }
    }
}

The exception is as follows:

org.apache.flink.table.api.ValidationException: SQL validation failed. From line 1, column 149 to line 1, column 174: No match found for function signature flatRow(<RecordType:peek_no_expand(VARCHAR(2147483647) rule_id, VARCHAR(2147483647) rule_name, VARCHAR(2147483647) rule_type_name, INTEGER result, BOOLEAN in_path) ARRAY>)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:146)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:108)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:187)
    at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:658)
    at com.akulaku.data.flink.ParserDataTest.parserDataTest(ParserDataTest.java:60)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
    at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
    at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
    at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
    at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
    at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)
Caused by: org.apache.calcite.runtime.CalciteContextException: From line 1, column 149 to line 1, column 174: No match found for function signature flatRow(<RecordType:peek_no_expand(VARCHAR(2147483647) rule_id, VARCHAR(2147483647) rule_name, VARCHAR(2147483647) rule_type_name, INTEGER result, BOOLEAN in_path) ARRAY>)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457)
    at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:839)
    at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:5089)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.handleUnresolvedFunction(SqlValidatorImpl.java:1882)
    at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:305)
    at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:218)
    at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5858)
    at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5845)
    at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1800)
    at org.apache.calcite.sql.validate.ProcedureNamespace.validateImpl(ProcedureNamespace.java:57)
    at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1110)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1084)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3256)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3238)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateJoin(SqlValidatorImpl.java:3303)
    at org.apache.flink.table.planner.calcite.FlinkCalciteSqlValidator.validateJoin(FlinkCalciteSqlValidator.java:86)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3247)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3510)
    at org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
    at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1110)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1084)
    at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1059)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:766)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:141)
    ... 28 more
Caused by: org.apache.calcite.sql.validate.SqlValidatorException: No match found for function signature flatRow(<RecordType:peek_no_expand(VARCHAR(2147483647) rule_id, VARCHAR(2147483647) rule_name, VARCHAR(2147483647) rule_type_name, INTEGER result, BOOLEAN in_path) ARRAY>)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457)
    at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:550)
    ... 56 more

On Tue, 21 Jul 2020 at 19:41, godfrey he <[hidden email]> wrote:
> Yes. Define getResultType and getParameterTypes explicitly; see [1] for reference.
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/udfs.html#implementation-guide
Hi,
Row[] is not yet supported as an eval parameter. The community already has an issue tracking support for this: https://issues.apache.org/jira/browse/FLINK-17855

Best,
Jark

On Wed, 22 Jul 2020 at 10:45, Dream-底限 <[hidden email]> wrote:
> hi,
> I want to expand an array<row> into multiple rows, but it did not work
> [...]
Hi,
If you only want to flatten the array, Flink has a built-in way to do it; see the "Expanding arrays into a relation" section of [1].

[1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#joins

On Wed, 22 Jul 2020 at 11:17, Jark Wu <[hidden email]> wrote:
> Hi,
>
> Row[] is not yet supported as an eval parameter. The community already has an issue tracking support for this:
> https://issues.apache.org/jira/browse/FLINK-17855
>
> Best,
> Jark

--
Best,
Benchao Li
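For readers following along, here is a minimal sketch of what the "Expanding arrays into a relation" pattern computes. It assumes a hypothetical Orders table with an order_id column and a tags array column; none of these names come from the thread. The Order class and the expand method are plain-Java stand-ins that only model the result shape. They are not Flink API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class UnnestSketch {

    // SQL in the CROSS JOIN UNNEST form; the table and column names
    // (Orders, order_id, tags) are hypothetical.
    static final String QUERY =
        "SELECT order_id, tag FROM Orders CROSS JOIN UNNEST(tags) AS t (tag)";

    // Hypothetical stand-in for one input row: a scalar column plus an array column.
    static final class Order {
        final long orderId;
        final String[] tags;

        Order(long orderId, String... tags) {
            this.orderId = orderId;
            this.tags = tags;
        }
    }

    // Plain-Java model of the join: one output line per (row, array element) pair.
    // A row whose array is empty contributes no output lines.
    static List<String> expand(List<Order> orders) {
        List<String> out = new ArrayList<>();
        for (Order o : orders) {
            for (String tag : o.tags) {
                out.add(o.orderId + "," + tag);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Order> orders = Arrays.asList(new Order(1L, "a", "b"), new Order(2L, "c"));
        System.out.println(QUERY);
        expand(orders).forEach(System.out::println);
    }
}
```

In an actual job, the query string would be passed to tableEnv.sqlQuery(...), and the planner performs the expansion itself, so no user-defined table function is needed for this case.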
hi 、Benchao Li
我尝试了将数组打散的方式,但是报了一个莫名其妙的错误,可以帮忙看看嘛 tableEnv.executeSql("CREATE TABLE parser_data_test (\n" + " data ROW<flow_task_id BIGINT,flow_id STRING,flow_version STRING,path STRING,country_id INT,create_time BIGINT," + "spent_time DECIMAL(10,2),features ROW<`user_ic_no_aku_uid.pdl_cdpd` STRING,`user_ic_no_aku_uid.pdl_current_unpay` INT," + "`user_ic_no_aku_uid.current_overdue_collection` INT>,rule_results ARRAY<ROW<rule_id STRING,rule_name STRING," + "rule_type_name STRING,`result` INT,in_path BOOLEAN>>>,\n" + " createTime BIGINT,\n" + " tindex INT\n" + ") WITH (\n" + " 'connector' = 'kafka-0.11',\n" + " 'topic' = 'parser_data_test',\n" + " 'properties.bootstrap.servers' = 'localhost:9092',\n" + " 'properties.group.id' = 'testGroup',\n" + " 'scan.startup.mode' = 'earliest-offset',\n" + " 'format' = 'json',\n" + " 'json.fail-on-missing-field' = 'false',\n" + " 'json.ignore-parse-errors' = 'true'\n" + ")"); Table table = tableEnv.sqlQuery("select data.flow_task_id,data.features.`user_ic_no_aku_uid.pdl_current_unpay`,rule_id,tindex from parser_data_test CROSS JOIN UNNEST(data.rule_results) AS t (rule_id,rule_name,rule_type_name,`result`,in_path)"); table.printSchema(); tableEnv.toAppendStream(table, Types.ROW(TypeConversions.fromDataTypeToLegacyInfo(table.getSchema().getFieldDataTypes()))).print(); 异常信息: rg.apache.flink.table.api.ValidationException: SQL validation failed. 
From line 0, column 0 to line 1, column 139: Column 'data.data' not found in table 'parser_data_test' at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:146) at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:108) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:185) at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78) at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:664) at com.akulaku.data.flink.ParserDataTest.parserDataTest(ParserDataTest.java:63) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) at 
org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) at org.junit.runners.ParentRunner.run(ParentRunner.java:363) at org.junit.runner.JUnitCore.run(JUnitCore.java:137) at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68) at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33) at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230) at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58) Caused by: org.apache.calcite.runtime.CalciteContextException: From line 0, column 0 to line 1, column 139: Column 'data.data' not found in table 'parser_data_test' at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:423) at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457) at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:839) at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824) at org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:5089) at org.apache.calcite.sql.validate.DelegatingScope.fullyQualify(DelegatingScope.java:439) at org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5991) at org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5976) at org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:321) at org.apache.calcite.sql.validate.SqlValidatorImpl.expand(SqlValidatorImpl.java:5583) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateUnnest(SqlValidatorImpl.java:3271) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3253) at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3238) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateJoin(SqlValidatorImpl.java:3303) at org.apache.flink.table.planner.calcite.FlinkCalciteSqlValidator.validateJoin(FlinkCalciteSqlValidator.java:86) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3247) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3510) at org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60) at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1110) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1084) at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1059) at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:766) at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:141) ... 28 more Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Column 'data.data' not found in table 'parser_data_test' at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:423) at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457) at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:550) ... 
51 more |
This looks like it may be a Calcite bug. CC'ing Danny.
-- Best, Benchao Li |
There is a workaround for now: use a subquery to pull the nested field out of the row first, for example:
select ... from ( select data.rule_results as rule_results, ... ) cross join unnest(rule_results) as t(...)
>> > > > From line 1, column 149 to line 1, column 174: No match found for >> > > > function signature >> > > > flatRow(<RecordType:peek_no_expand(VARCHAR(2147483647) rule_id, >> > > > VARCHAR(2147483647) rule_name, VARCHAR(2147483647) rule_type_name, >> > > > INTEGER result, BOOLEAN in_path) ARRAY>) >> > > > >> > > > at >> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org >> > > > >> > > >> > >> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:146) >> > > > at >> > > > >> > > >> > >> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:108) >> > > > at >> > > > >> > > >> > >> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:187) >> > > > at >> > > > >> > > >> > >> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78) >> > > > at >> > > > >> > > >> > >> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:658) >> > > > at >> > > > >> > > >> > >> com.akulaku.data.flink.ParserDataTest.parserDataTest(ParserDataTest.java:60) >> > > > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native >> Method) >> > > > at >> > > > >> > > >> > >> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) >> > > > at >> > > > >> > > >> > >> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) >> > > > at java.lang.reflect.Method.invoke(Method.java:498) >> > > > at >> > > > >> > > >> > >> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) >> > > > at >> > > > >> > > >> > >> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) >> > > > at >> > > > >> > > >> > >> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) >> > > > at >> > > > >> > > >> > >> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) >> > > > at >> 
> > > >> > > >> > >> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) >> > > > at >> > org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) >> > > > at >> > > > >> > > >> > >> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) >> > > > at >> > > > >> > > >> > >> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) >> > > > at >> org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) >> > > > at >> > > org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) >> > > > at >> > > > org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) >> > > > at >> > > org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) >> > > > at >> > > org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) >> > > > at org.junit.runners.ParentRunner.run(ParentRunner.java:363) >> > > > at org.junit.runner.JUnitCore.run(JUnitCore.java:137) >> > > > at >> > > > >> > > >> > >> com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68) >> > > > at >> > > > >> > > >> > >> com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33) >> > > > at >> > > > >> > > >> > >> com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230) >> > > > at >> > com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58) >> > > > Caused by: org.apache.calcite.runtime.CalciteContextException: From >> > > > line 1, column 149 to line 1, column 174: No match found for >> function >> > > > signature flatRow(<RecordType:peek_no_expand(VARCHAR(2147483647) >> > > > rule_id, VARCHAR(2147483647) rule_name, VARCHAR(2147483647) >> > > > rule_type_name, INTEGER result, BOOLEAN in_path) ARRAY>) >> > > > at >> > sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native >> > > > Method) >> > > > at >> > > > >> > > >> > >> 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) >> > > > at >> > > > >> > > >> > >> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) >> > > > at >> > > java.lang.reflect.Constructor.newInstance(Constructor.java:423) >> > > > at >> > > > >> > > >> > >> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457) >> > > > at >> > > > org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:839) >> > > > at >> > > > org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824) >> > > > at >> > > > >> > > >> > >> org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:5089) >> > > > at >> > > > >> > > >> > >> org.apache.calcite.sql.validate.SqlValidatorImpl.handleUnresolvedFunction(SqlValidatorImpl.java:1882) >> > > > at >> > > > org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:305) >> > > > at >> > > > org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:218) >> > > > at >> > > > >> > > >> > >> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5858) >> > > > at >> > > > >> > > >> > >> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5845) >> > > > at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139) >> > > > at >> > > > >> > > >> > >> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1800) >> > > > at >> > > > >> > > >> > >> org.apache.calcite.sql.validate.ProcedureNamespace.validateImpl(ProcedureNamespace.java:57) >> > > > at >> > > > >> > > >> > >> org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84) >> > > > at >> > > > >> > > >> > >> org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1110) >> > > > at >> > > > >> > > >> > >> 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1084) >> > > > at >> > > > >> > > >> > >> org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3256) >> > > > at >> > > > >> > > >> > >> org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3238) >> > > > at >> > > > >> > > >> > >> org.apache.calcite.sql.validate.SqlValidatorImpl.validateJoin(SqlValidatorImpl.java:3303) >> > > > at >> > > > >> > > >> > >> org.apache.flink.table.planner.calcite.FlinkCalciteSqlValidator.validateJoin(FlinkCalciteSqlValidator.java:86) >> > > > at >> > > > >> > > >> > >> org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3247) >> > > > at >> > > > >> > > >> > >> org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3510) >> > > > at >> > > > >> > > >> > >> org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60) >> > > > at >> > > > >> > > >> > >> org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84) >> > > > at >> > > > >> > > >> > >> org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1110) >> > > > at >> > > > >> > > >> > >> org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1084) >> > > > at >> > org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232) >> > > > at >> > > > >> > > >> > >> org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1059) >> > > > at >> > > > >> > > >> > >> org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:766) >> > > > at >> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org >> > > > >> > > >> > >> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:141) >> > > > ... 
28 more >> > > > Caused by: org.apache.calcite.sql.validate.SqlValidatorException: No >> > > > match found for function signature >> > > > flatRow(<RecordType:peek_no_expand(VARCHAR(2147483647) rule_id, >> > > > VARCHAR(2147483647) rule_name, VARCHAR(2147483647) rule_type_name, >> > > > INTEGER result, BOOLEAN in_path) ARRAY>) >> > > > at >> > sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native >> > > > Method) >> > > > at >> > > > >> > > >> > >> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) >> > > > at >> > > > >> > > >> > >> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) >> > > > at >> > > java.lang.reflect.Constructor.newInstance(Constructor.java:423) >> > > > at >> > > > >> > > >> > >> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457) >> > > > at >> > > > org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:550) >> > > > ... 56 more >> > > > >> > > > >> > > > godfrey he <[hidden email]> 于2020年7月21日周二 下午7:41写道: >> > > > >> > > > > 可以,定义清楚 getResultType 和 getParameterTypes, 可以参考[1] >> > > > > >> > > > > [1] >> > > > > >> > > > > >> > > > >> > > >> > >> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/udfs.html#implementation-guide >> > > > > >> > > > > >> > > > > Dream-底限 <[hidden email]> 于2020年7月21日周二 下午7:25写道: >> > > > > >> > > > > > hi >> > > > > > >> > > > > > >> > > > > >> > > > >> > > >> > >> 我这面在定义一个表函数,通过继承TableFunction完成操作,但是eval方法中的参数类型都是java基本类型(至少看到的demo都是如此),想问一下eval方法中可以传flink >> > > > > > 内部类型吗,比如说我想在eval()方法中传递Row类型要怎么操作,eval(Row row) >> > > > > > >> > > > > >> > > > >> > > >> > >> > >> > -- >> > >> > Best, >> > Benchao Li >> > >> > > > -- > > Best, > Benchao Li > -- Best, Benchao Li |
Hi,

This does indeed look like a bug. After expanding the row in a subquery first, the program runs normally.

Benchao Li <[hidden email]> wrote on Thu, Jul 23, 2020 at 12:52 PM:

> There is a workaround for now: you can expand the row in a subquery first, for example:
>
> select ...
> from (
>   select data.rule_results as rule_results, ...
> ) cross join unnest(rule_results) as t(...)
>
> Benchao Li <[hidden email]> wrote on Thu, Jul 23, 2020 at 12:44 PM:
>
> > I suspect this may be a Calcite bug. CC Danny.
> >
> > Dream-底限 <[hidden email]> wrote on Wed, Jul 22, 2020 at 5:46 PM:
> >
> >> Hi Benchao Li,
> >> I tried the array-flattening approach, but it raised a baffling error. Could you help take a look?
> >>
> >> tableEnv.executeSql("CREATE TABLE parser_data_test (\n" +
> >>         "  data ROW<flow_task_id BIGINT,flow_id STRING,flow_version STRING,path STRING,country_id INT,create_time BIGINT," +
> >>         "spent_time DECIMAL(10,2),features ROW<`user_ic_no_aku_uid.pdl_cdpd` STRING,`user_ic_no_aku_uid.pdl_current_unpay` INT," +
> >>         "`user_ic_no_aku_uid.current_overdue_collection` INT>,rule_results ARRAY<ROW<rule_id STRING,rule_name STRING," +
> >>         "rule_type_name STRING,`result` INT,in_path BOOLEAN>>>,\n" +
> >>         "  createTime BIGINT,\n" +
> >>         "  tindex INT\n" +
> >>         ") WITH (\n" +
> >>         "  'connector' = 'kafka-0.11',\n" +
> >>         "  'topic' = 'parser_data_test',\n" +
> >>         "  'properties.bootstrap.servers' = 'localhost:9092',\n" +
> >>         "  'properties.group.id' = 'testGroup',\n" +
> >>         "  'scan.startup.mode' = 'earliest-offset',\n" +
> >>         "  'format' = 'json',\n" +
> >>         "  'json.fail-on-missing-field' = 'false',\n" +
> >>         "  'json.ignore-parse-errors' = 'true'\n" +
> >>         ")");
> >>
> >> Table table = tableEnv.sqlQuery("select data.flow_task_id,data.features.`user_ic_no_aku_uid.pdl_current_unpay`,rule_id,tindex from parser_data_test CROSS JOIN UNNEST(data.rule_results) AS t (rule_id,rule_name,rule_type_name,`result`,in_path)");
> >>
> >> table.printSchema();
> >> tableEnv.toAppendStream(table,
> >>         Types.ROW(TypeConversions.fromDataTypeToLegacyInfo(table.getSchema().getFieldDataTypes()))).print();
> >>
> >> Exception:
> >>
> >> org.apache.flink.table.api.ValidationException: SQL validation failed. From line 0, column 0 to line 1, column 139: Column 'data.data' not found in table 'parser_data_test'
> >> Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Column 'data.data' not found in table 'parser_data_test' |
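For anyone hitting the same error, the subquery workaround quoted in this thread might be filled in roughly as follows. This is only a sketch and has not been verified against a running Flink job: the class name `UnnestWorkaround`, the method `buildQuery`, and the alias `pdl_current_unpay` are made up for illustration, while the table and column names are taken from the `CREATE TABLE` statement in the thread. It only builds the SQL string, so it runs without any Flink dependency.

```java
import java.util.Arrays;
import java.util.List;

/**
 * Builds the subquery-based UNNEST query as a plain SQL string.
 * Hypothetical helper for illustration only; not part of Flink.
 */
class UnnestWorkaround {

    // Aliases for the fields of each unnested ROW, as declared for rule_results in the thread.
    static final List<String> RULE_COLUMNS =
            Arrays.asList("rule_id", "rule_name", "rule_type_name", "`result`", "in_path");

    static String buildQuery() {
        return String.join("\n",
                "SELECT flow_task_id, pdl_current_unpay, rule_id, tindex",
                "FROM (",
                // The inner query pulls the nested fields up to top-level columns,
                // so UNNEST no longer has to resolve a nested path such as data.rule_results.
                "  SELECT data.flow_task_id AS flow_task_id,",
                "         data.features.`user_ic_no_aku_uid.pdl_current_unpay` AS pdl_current_unpay,",
                "         data.rule_results AS rule_results,",
                "         tindex",
                "  FROM parser_data_test",
                ")",
                "CROSS JOIN UNNEST(rule_results) AS t (" + String.join(", ", RULE_COLUMNS) + ")");
    }

    public static void main(String[] args) {
        // In the original program this string would be passed to tableEnv.sqlQuery(...).
        System.out.println(buildQuery());
    }
}
```

The point of the design is simply that `UNNEST` receives a plain column (`rule_results`) instead of the nested reference `data.rule_results`, which is the expression the validator failed to resolve in the reported error.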