We have a sql that compose a row with a table’s columns. The simplified sql is like:
INSERT INTO flink_log_sink SELECT b.id, Row(b.app_id, b.message) FROM flink_log_source a join flink_log_side b on a.id = b.id; When we submit the sql to Flink, the sql cannot be parsed, with the following error message: org.apache.flink.table.api.SqlParserException: SQL parse failed. Encountered "." at line 11, column 8. Was expecting one of: ")" ... "," ... at org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:56) at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:76) at cn.imdada.bi.dfl2.core.operation.InsertIntoOperation.execute(InsertIntoOperation.java:35) at cn.imdada.bi.dfl2.core.Main.execute(Main.java:172) at cn.imdada.bi.dfl2.core.Main.main(Main.java:125) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:316) at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198) at org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:153) at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80) at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:112) at cn.imdada.bi.dfl2.launcher.yarn.YarnPerJobSubmitter.submit(YarnPerJobSubmitter.java:37) at cn.imdada.bi.dfl2.launcher.LauncherMain.main(LauncherMain.java:127) Caused by: org.apache.calcite.sql.parser.SqlParseException: Encountered "." at line 11, column 8. Was expecting one of: ")" ... "," ... at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.convertException(FlinkSqlParserImpl.java:442) at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.normalizeException(FlinkSqlParserImpl.java:205) at org.apache.calcite.sql.parser.SqlParser.handleException(SqlParser.java:140) at org.apache.calcite.sql.parser.SqlParser.parseQuery(SqlParser.java:155) at org.apache.calcite.sql.parser.SqlParser.parseStmt(SqlParser.java:180) at org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:54) ... 15 more Caused by: org.apache.flink.sql.parser.impl.ParseException: Encountered "." at line 11, column 8. Was expecting one of: ")" ... "," ... at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.generateParseException(FlinkSqlParserImpl.java:39525) at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.jj_consume_token(FlinkSqlParserImpl.java:39336) at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.ParenthesizedSimpleIdentifierList(FlinkSqlParserImpl.java:24247) at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.Expression3(FlinkSqlParserImpl.java:19024) at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.Expression2b(FlinkSqlParserImpl.java:18680) at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.Expression2(FlinkSqlParserImpl.java:18721) at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.Expression(FlinkSqlParserImpl.java:18652) at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SelectExpression(FlinkSqlParserImpl.java:11656) at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SelectItem(FlinkSqlParserImpl.java:10508) at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SelectList(FlinkSqlParserImpl.java:10495) at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlSelect(FlinkSqlParserImpl.java:7115) at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.LeafQuery(FlinkSqlParserImpl.java:684) at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.LeafQueryOrExpr(FlinkSqlParserImpl.java:18635) at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.QueryOrExpr(FlinkSqlParserImpl.java:18089) at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.OrderedQueryOrExpr(FlinkSqlParserImpl.java:558) at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.RichSqlInsert(FlinkSqlParserImpl.java:5709) at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmt(FlinkSqlParserImpl.java:3342) at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmtEof(FlinkSqlParserImpl.java:3882) at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.parseSqlStmtEof(FlinkSqlParserImpl.java:253) at org.apache.calcite.sql.parser.SqlParser.parseQuery(SqlParser.java:153) ... 17 more Is this a bug or the expected behavior? If this is the expected behavior, what can we do to avoid it? PS: I tried to create a view to represent the join result, and inserted the view into the sink table. Unfortunately, it didn’t work neither. | | 马阳阳 | | [hidden email] | 签名由网易邮箱大师定制 |
The error message when using a view is as the following:
org.apache.flink.table.api.TableException: Cannot generate a valid execution plan for the given query: FlinkLogicalSink(table=[default_catalog.default_database.flink_log_sink], fields=[id, EXPR$1]) +- FlinkLogicalCalc(select=[id, ROW(family.app_id, family.message) AS EXPR$1]) +- FlinkLogicalJoin(condition=[true], joinType=[inner]) :- FlinkLogicalCalc(select=[app_id], where=[IS NOT NULL(id)]) : +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, flink_log_source]], fields=[app_id, id, log_time, message]) +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, flink_log_side]], fields=[id, family]) This exception indicates that the query uses an unsupported SQL feature. Please check the documentation for the set of currently supported SQL features. at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:72) at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62) at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58) at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) at scala.collection.Iterator$class.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157) at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57) at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163) at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:79) at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77) at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:286) at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:165) at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1275) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:680) at cn.imdada.bi.dfl2.core.operation.InsertIntoOperation.execute(InsertIntoOperation.java:43) at cn.imdada.bi.dfl2.core.Main.execute(Main.java:172) at cn.imdada.bi.dfl2.core.Main.main(Main.java:125) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:316) at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198) at org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:153) at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80) at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:112) at cn.imdada.bi.dfl2.launcher.yarn.YarnPerJobSubmitter.submit(YarnPerJobSubmitter.java:37) at cn.imdada.bi.dfl2.launcher.LauncherMain.main(LauncherMain.java:127) Caused by: org.apache.calcite.plan.RelOptPlanner$CannotPlanException: There are not enough rules to produce a node with desired properties: convention=STREAM_PHYSICAL, FlinkRelDistributionTraitDef=any, MiniBatchIntervalTraitDef=None: 0, ModifyKindSetTraitDef=[NONE], UpdateKindTraitDef=[NONE]. Missing conversion is FlinkLogicalTableSourceScan[convention: LOGICAL -> STREAM_PHYSICAL, FlinkRelDistributionTraitDef: any -> single] There is 1 empty subset: rel#311:RelSubset#14.STREAM_PHYSICAL.single.None: 0.[NONE].[NONE], the relevant part of the original plan is as follows 181:FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, flink_log_side]], fields=[id, family]) Root: rel#303:RelSubset#17.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE] Original rel: FlinkLogicalSink(subset=[rel#175:RelSubset#6.LOGICAL.any.None: 0.[NONE].[NONE]], table=[default_catalog.default_database.flink_log_sink], fields=[id, EXPR$1]): rowcount = 9.0E15, cumulative cost = {9.0E15 rows, 9.0E15 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 198 FlinkLogicalCalc(subset=[rel#197:RelSubset#5.LOGICAL.any.None: 0.[NONE].[NONE]], select=[id, ROW(family.app_id, family.message) AS EXPR$1]): rowcount = 9.0E15, cumulative cost = {9.0E15 rows, 9.0E15 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 235 FlinkLogicalJoin(subset=[rel#212:RelSubset#9.LOGICAL.any.None: 0.[NONE].[NONE]], condition=[true], joinType=[inner]): rowcount = 9.0E15, cumulative cost = {9.0E7 rows, 1.9E8 cpu, 1.18E9 io, 0.0 network, 0.0 memory}, id = 211 FlinkLogicalCalc(subset=[rel#210:RelSubset#7.LOGICAL.any.None: 0.[NONE].[NONE]], select=[app_id], where=[IS NOT NULL(id)]): rowcount = 9.0E7, cumulative cost = {9.0E7 rows, 0.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 237 FlinkLogicalTableSourceScan(subset=[rel#178:RelSubset#0.LOGICAL.any.None: 0.[NONE].[NONE]], table=[[default_catalog, default_database, flink_log_source]], fields=[app_id, id, log_time, message]): rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 1.0E8 cpu, 4.4E9 io, 0.0 network, 0.0 memory}, id = 177 FlinkLogicalTableSourceScan(subset=[rel#182:RelSubset#3.LOGICAL.any.None: 0.[NONE].[NONE]], table=[[default_catalog, default_database, flink_log_side]], fields=[id, family]): rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 1.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 memory}, id = 181 Sets: Set#12, type: RecordType(VARCHAR(2147483647) app_id, VARCHAR(2147483647) id, BIGINT log_time, VARCHAR(2147483647) message) rel#292:RelSubset#12.LOGICAL.any.None: 0.[NONE].[NONE], best=rel#177 rel#177:FlinkLogicalTableSourceScan.LOGICAL.any.None: 0.[NONE].[NONE](table=[default_catalog, default_database, flink_log_source],fields=app_id, id, log_time, message), rowcount=1.0E8, cumulative cost={1.0E8 rows, 1.0E8 cpu, 4.4E9 io, 0.0 network, 0.0 memory} rel#306:RelSubset#12.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE], best=rel#305 rel#305:StreamExecTableSourceScan.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE](table=[default_catalog, default_database, flink_log_source],fields=app_id, id, log_time, message), rowcount=1.0E8, cumulative cost={1.0E8 rows, 1.0E8 cpu, 4.4E9 io, 0.0 network, 0.0 memory} Set#13, type: RecordType(VARCHAR(2147483647) app_id) rel#294:RelSubset#13.LOGICAL.any.None: 0.[NONE].[NONE], best=rel#293 rel#293:FlinkLogicalCalc.LOGICAL.any.None: 0.[NONE].[NONE](input=RelSubset#292,select=app_id,where=IS NOT NULL(id)), rowcount=9.0E7, cumulative cost={1.9E8 rows, 1.0E8 cpu, 4.4E9 io, 0.0 network, 0.0 memory} rel#308:RelSubset#13.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE], best=rel#307 rel#307:StreamExecCalc.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE](input=RelSubset#306,select=app_id,where=IS NOT NULL(id)), rowcount=9.0E7, cumulative cost={1.9E8 rows, 1.0E8 cpu, 4.4E9 io, 0.0 network, 0.0 memory} rel#310:AbstractConverter.STREAM_PHYSICAL.single.None: 0.[NONE].[NONE](input=RelSubset#308,convention=STREAM_PHYSICAL,FlinkRelDistributionTraitDef=single,MiniBatchIntervalTraitDef=None: 0,ModifyKindSetTraitDef=[NONE],UpdateKindTraitDef=[NONE]), rowcount=9.0E7, cumulative cost={inf} rel#318:StreamExecExchange.STREAM_PHYSICAL.single.None: 0.[NONE].[NONE](input=RelSubset#308,distribution=single), rowcount=9.0E7, cumulative cost={2.8E8 rows, 1.459E10 cpu, 4.4E9 io, 1.08E9 network, 0.0 memory} rel#309:RelSubset#13.STREAM_PHYSICAL.single.None: 0.[NONE].[NONE], best=rel#318 rel#310:AbstractConverter.STREAM_PHYSICAL.single.None: 0.[NONE].[NONE](input=RelSubset#308,convention=STREAM_PHYSICAL,FlinkRelDistributionTraitDef=single,MiniBatchIntervalTraitDef=None: 0,ModifyKindSetTraitDef=[NONE],UpdateKindTraitDef=[NONE]), rowcount=9.0E7, cumulative cost={inf} rel#318:StreamExecExchange.STREAM_PHYSICAL.single.None: 0.[NONE].[NONE](input=RelSubset#308,distribution=single), rowcount=9.0E7, cumulative cost={2.8E8 rows, 1.459E10 cpu, 4.4E9 io, 1.08E9 network, 0.0 memory} Set#14, type: RecordType(VARCHAR(2147483647) id, RecordType:peek_no_expand(VARCHAR(2147483647) app_id, VARCHAR(2147483647) message) family) rel#295:RelSubset#14.LOGICAL.any.None: 0.[NONE].[NONE], best=rel#181 rel#181:FlinkLogicalTableSourceScan.LOGICAL.any.None: 0.[NONE].[NONE](table=[default_catalog, default_database, flink_log_side],fields=id, family), rowcount=1.0E8, cumulative cost={1.0E8 rows, 1.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 memory} rel#311:RelSubset#14.STREAM_PHYSICAL.single.None: 0.[NONE].[NONE], best=null Set#15, type: RecordType(VARCHAR(2147483647) app_id, VARCHAR(2147483647) id, RecordType:peek_no_expand(VARCHAR(2147483647) app_id, VARCHAR(2147483647) message) family) rel#297:RelSubset#15.LOGICAL.any.None: 0.[NONE].[NONE], best=rel#296 rel#296:FlinkLogicalJoin.LOGICAL.any.None: 0.[NONE].[NONE](left=RelSubset#294,right=RelSubset#295,condition=true,joinType=inner), rowcount=9.0E15, cumulative cost={3.8E8 rows, 3.9E8 cpu, 9.18E9 io, 0.0 network, 0.0 memory} rel#313:RelSubset#15.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE], best=null rel#312:StreamExecJoin.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE](left=RelSubset#309,right=RelSubset#311,joinType=InnerJoin,where=true,select=app_id, id, family,leftInputSpec=NoUniqueKey,rightInputSpec=HasUniqueKey), rowcount=9.0E15, cumulative cost={inf} Set#16, type: RecordType(VARCHAR(2147483647) id, RecordType(VARCHAR(2147483647) EXPR$0, VARCHAR(2147483647) EXPR$1) EXPR$1) rel#299:RelSubset#16.LOGICAL.any.None: 0.[NONE].[NONE], best=rel#298 rel#298:FlinkLogicalCalc.LOGICAL.any.None: 0.[NONE].[NONE](input=RelSubset#297,select=id, ROW(family.app_id, family.message) AS EXPR$1), rowcount=9.0E15, cumulative cost={9.00000038E15 rows, 9.00000039E15 cpu, 9.18E9 io, 0.0 network, 0.0 memory} rel#315:RelSubset#16.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE], best=null rel#314:StreamExecCalc.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE](input=RelSubset#313,select=id, ROW(family.app_id, family.message) AS EXPR$1), rowcount=9.0E15, cumulative cost={inf} Set#17, type: RecordType(VARCHAR(2147483647) id, RecordType(VARCHAR(2147483647) EXPR$0, VARCHAR(2147483647) EXPR$1) EXPR$1) rel#302:RelSubset#17.LOGICAL.any.None: 0.[NONE].[NONE], best=rel#301 rel#301:FlinkLogicalSink.LOGICAL.any.None: 0.[NONE].[NONE](input=RelSubset#299,table=default_catalog.default_database.flink_log_sink,fields=id, EXPR$1), rowcount=9.0E15, cumulative cost={1.800000038E16 rows, 1.800000039E16 cpu, 9.18E9 io, 0.0 network, 0.0 memory} rel#303:RelSubset#17.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE], best=null rel#304:AbstractConverter.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE](input=RelSubset#302,convention=STREAM_PHYSICAL,FlinkRelDistributionTraitDef=any,MiniBatchIntervalTraitDef=None: 0,ModifyKindSetTraitDef=[NONE],UpdateKindTraitDef=[NONE]), rowcount=9.0E15, cumulative cost={inf} rel#316:StreamExecSink.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE](input=RelSubset#315,table=default_catalog.default_database.flink_log_sink,fields=id, EXPR$1), rowcount=9.0E15, cumulative cost={inf} Graphviz: digraph G { root [style=filled,label="Root"]; subgraph cluster12{ label="Set 12 RecordType(VARCHAR(2147483647) app_id, VARCHAR(2147483647) id, BIGINT log_time, VARCHAR(2147483647) message)"; rel177 [label="rel#177:FlinkLogicalTableSourceScan\ntable=[default_catalog, default_database, flink_log_source],fields=app_id, id, log_time, message\nrows=1.0E8, cost={1.0E8 rows, 1.0E8 cpu, 4.4E9 io, 0.0 network, 0.0 memory}",color=blue,shape=box] rel305 [label="rel#305:StreamExecTableSourceScan\ntable=[default_catalog, default_database, flink_log_source],fields=app_id, id, log_time, message\nrows=1.0E8, cost={1.0E8 rows, 1.0E8 cpu, 4.4E9 io, 0.0 network, 0.0 memory}",color=blue,shape=box] subset292 [label="rel#292:RelSubset#12.LOGICAL.any.None: 0.[NONE].[NONE]"] subset306 [label="rel#306:RelSubset#12.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE]"] } subgraph cluster13{ label="Set 13 RecordType(VARCHAR(2147483647) app_id)"; rel293 [label="rel#293:FlinkLogicalCalc\ninput=RelSubset#292,select=app_id,where=IS NOT NULL(id)\nrows=9.0E7, cost={1.9E8 rows, 1.0E8 cpu, 4.4E9 io, 0.0 network, 0.0 memory}",color=blue,shape=box] rel307 [label="rel#307:StreamExecCalc\ninput=RelSubset#306,select=app_id,where=IS NOT NULL(id)\nrows=9.0E7, cost={1.9E8 rows, 1.0E8 cpu, 4.4E9 io, 0.0 network, 0.0 memory}",color=blue,shape=box] rel310 [label="rel#310:AbstractConverter\ninput=RelSubset#308,convention=STREAM_PHYSICAL,FlinkRelDistributionTraitDef=single,MiniBatchIntervalTraitDef=None: 0,ModifyKindSetTraitDef=[NONE],UpdateKindTraitDef=[NONE]\nrows=9.0E7, cost={inf}",shape=box] rel318 [label="rel#318:StreamExecExchange\ninput=RelSubset#308,distribution=single\nrows=9.0E7, cost={2.8E8 rows, 1.459E10 cpu, 4.4E9 io, 1.08E9 network, 0.0 memory}",color=blue,shape=box] subset294 [label="rel#294:RelSubset#13.LOGICAL.any.None: 0.[NONE].[NONE]"] subset308 [label="rel#308:RelSubset#13.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE]"] subset309 [label="rel#309:RelSubset#13.STREAM_PHYSICAL.single.None: 0.[NONE].[NONE]"] subset308 -> subset309; } subgraph cluster14{ label="Set 14 RecordType(VARCHAR(2147483647) id, RecordType:peek_no_expand(VARCHAR(2147483647) app_id, VARCHAR(2147483647) message) family)"; rel181 [label="rel#181:FlinkLogicalTableSourceScan\ntable=[default_catalog, default_database, flink_log_side],fields=id, family\nrows=1.0E8, cost={1.0E8 rows, 1.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 memory}",color=blue,shape=box] subset295 [label="rel#295:RelSubset#14.LOGICAL.any.None: 0.[NONE].[NONE]"] subset311 [label="rel#311:RelSubset#14.STREAM_PHYSICAL.single.None: 0.[NONE].[NONE]",color=red] } subgraph cluster15{ label="Set 15 RecordType(VARCHAR(2147483647) app_id, VARCHAR(2147483647) id, RecordType:peek_no_expand(VARCHAR(2147483647) app_id, VARCHAR(2147483647) message) family)"; rel296 [label="rel#296:FlinkLogicalJoin\nleft=RelSubset#294,right=RelSubset#295,condition=true,joinType=inner\nrows=9.0E15, cost={3.8E8 rows, 3.9E8 cpu, 9.18E9 io, 0.0 network, 0.0 memory}",color=blue,shape=box] rel312 [label="rel#312:StreamExecJoin\nleft=RelSubset#309,right=RelSubset#311,joinType=InnerJoin,where=true,select=app_id, id, family,leftInputSpec=NoUniqueKey,rightInputSpec=HasUniqueKey\nrows=9.0E15, cost={inf}",shape=box] subset297 [label="rel#297:RelSubset#15.LOGICAL.any.None: 0.[NONE].[NONE]"] subset313 [label="rel#313:RelSubset#15.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE]"] } subgraph cluster16{ label="Set 16 RecordType(VARCHAR(2147483647) id, RecordType(VARCHAR(2147483647) EXPR$0, VARCHAR(2147483647) EXPR$1) EXPR$1)"; rel298 [label="rel#298:FlinkLogicalCalc\ninput=RelSubset#297,select=id, ROW(family.app_id, family.message) AS EXPR$1\nrows=9.0E15, cost={9.00000038E15 rows, 9.00000039E15 cpu, 9.18E9 io, 0.0 network, 0.0 memory}",color=blue,shape=box] rel314 [label="rel#314:StreamExecCalc\ninput=RelSubset#313,select=id, ROW(family.app_id, family.message) AS EXPR$1\nrows=9.0E15, cost={inf}",shape=box] subset299 [label="rel#299:RelSubset#16.LOGICAL.any.None: 0.[NONE].[NONE]"] subset315 [label="rel#315:RelSubset#16.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE]"] } subgraph cluster17{ label="Set 17 RecordType(VARCHAR(2147483647) id, RecordType(VARCHAR(2147483647) EXPR$0, VARCHAR(2147483647) EXPR$1) EXPR$1)"; rel301 [label="rel#301:FlinkLogicalSink\ninput=RelSubset#299,table=default_catalog.default_database.flink_log_sink,fields=id, EXPR$1\nrows=9.0E15, cost={1.800000038E16 rows, 1.800000039E16 cpu, 9.18E9 io, 0.0 network, 0.0 memory}",color=blue,shape=box] rel304 [label="rel#304:AbstractConverter\ninput=RelSubset#302,convention=STREAM_PHYSICAL,FlinkRelDistributionTraitDef=any,MiniBatchIntervalTraitDef=None: 0,ModifyKindSetTraitDef=[NONE],UpdateKindTraitDef=[NONE]\nrows=9.0E15, cost={inf}",shape=box] rel316 [label="rel#316:StreamExecSink\ninput=RelSubset#315,table=default_catalog.default_database.flink_log_sink,fields=id, EXPR$1\nrows=9.0E15, cost={inf}",shape=box] subset302 [label="rel#302:RelSubset#17.LOGICAL.any.None: 0.[NONE].[NONE]"] subset303 [label="rel#303:RelSubset#17.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE]"] } root -> subset303; subset292 -> rel177[color=blue]; subset306 -> rel305[color=blue]; subset294 -> rel293[color=blue]; rel293 -> subset292[color=blue]; subset308 -> rel307[color=blue]; rel307 -> subset306[color=blue]; subset309 -> rel310; rel310 -> subset308; subset309 -> rel318[color=blue]; rel318 -> subset308[color=blue]; subset295 -> rel181[color=blue]; subset297 -> rel296[color=blue]; rel296 -> subset294[color=blue,label="0"]; rel296 -> subset295[color=blue,label="1"]; subset313 -> rel312; rel312 -> subset309[label="0"]; rel312 -> subset311[label="1"]; subset299 -> rel298[color=blue]; rel298 -> subset297[color=blue]; subset315 -> rel314; rel314 -> subset313; subset302 -> rel301[color=blue]; rel301 -> subset299[color=blue]; subset303 -> rel304; rel304 -> subset302; subset303 -> rel316; rel316 -> subset315; } at org.apache.calcite.plan.volcano.RelSubset$CheapestPlanReplacer.visit(RelSubset.java:742) at org.apache.calcite.plan.volcano.RelSubset.buildCheapestPlan(RelSubset.java:365) at org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:520) at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:312) at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:64) ... 32 more | | 马阳阳 | | [hidden email] | 签名由网易邮箱大师定制 On 01/11/2021 11:04,马阳阳<[hidden email]> wrote: We have a sql that compose a row with a table’s columns. The simplified sql is like: INSERT INTO flink_log_sink SELECT b.id, Row(b.app_id, b.message) FROM flink_log_source a join flink_log_side b on a.id = b.id; When we submit the sql to Flink, the sql cannot be parsed, with the following error message: org.apache.flink.table.api.SqlParserException: SQL parse failed. Encountered "." at line 11, column 8. Was expecting one of: ")" ... "," ... at org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:56) at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:76) at cn.imdada.bi.dfl2.core.operation.InsertIntoOperation.execute(InsertIntoOperation.java:35) at cn.imdada.bi.dfl2.core.Main.execute(Main.java:172) at cn.imdada.bi.dfl2.core.Main.main(Main.java:125) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:316) at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198) at org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:153) at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80) at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:112) at cn.imdada.bi.dfl2.launcher.yarn.YarnPerJobSubmitter.submit(YarnPerJobSubmitter.java:37) at cn.imdada.bi.dfl2.launcher.LauncherMain.main(LauncherMain.java:127) Caused by: org.apache.calcite.sql.parser.SqlParseException: Encountered "." at line 11, column 8. Was expecting one of: ")" ... "," ... at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.convertException(FlinkSqlParserImpl.java:442) at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.normalizeException(FlinkSqlParserImpl.java:205) at org.apache.calcite.sql.parser.SqlParser.handleException(SqlParser.java:140) at org.apache.calcite.sql.parser.SqlParser.parseQuery(SqlParser.java:155) at org.apache.calcite.sql.parser.SqlParser.parseStmt(SqlParser.java:180) at org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:54) ... 15 more Caused by: org.apache.flink.sql.parser.impl.ParseException: Encountered "." at line 11, column 8. Was expecting one of: ")" ... "," ... at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.generateParseException(FlinkSqlParserImpl.java:39525) at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.jj_consume_token(FlinkSqlParserImpl.java:39336) at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.ParenthesizedSimpleIdentifierList(FlinkSqlParserImpl.java:24247) at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.Expression3(FlinkSqlParserImpl.java:19024) at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.Expression2b(FlinkSqlParserImpl.java:18680) at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.Expression2(FlinkSqlParserImpl.java:18721) at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.Expression(FlinkSqlParserImpl.java:18652) at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SelectExpression(FlinkSqlParserImpl.java:11656) at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SelectItem(FlinkSqlParserImpl.java:10508) at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SelectList(FlinkSqlParserImpl.java:10495) at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlSelect(FlinkSqlParserImpl.java:7115) at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.LeafQuery(FlinkSqlParserImpl.java:684) at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.LeafQueryOrExpr(FlinkSqlParserImpl.java:18635) at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.QueryOrExpr(FlinkSqlParserImpl.java:18089) at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.OrderedQueryOrExpr(FlinkSqlParserImpl.java:558) at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.RichSqlInsert(FlinkSqlParserImpl.java:5709) at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmt(FlinkSqlParserImpl.java:3342) at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmtEof(FlinkSqlParserImpl.java:3882) at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.parseSqlStmtEof(FlinkSqlParserImpl.java:253) at org.apache.calcite.sql.parser.SqlParser.parseQuery(SqlParser.java:153) ... 17 more Is this a bug or the expected behavior? If this is the expected behavior, what can we do to avoid it? PS: I tried to create a view to represent the join result, and inserted the view into the sink table. Unfortunately, it didn’t work neither. | | 马阳阳 | | [hidden email] | 签名由网易邮箱大师定制 |
In reply to this post by 马阳阳
使用ROW里面的 表别名.字段名 会出现解析错误,我之前使用hbase也有这个问题,我一般是在查询里面包一层子查询
| | 刘海 | | [hidden email] | 签名由网易邮箱大师定制 On 1/11/2021 11:04,马阳阳<[hidden email]> wrote: We have a sql that compose a row with a table’s columns. The simplified sql is like: INSERT INTO flink_log_sink SELECT b.id, Row(b.app_id, b.message) FROM flink_log_source a join flink_log_side b on a.id = b.id; When we submit the sql to Flink, the sql cannot be parsed, with the following error message: org.apache.flink.table.api.SqlParserException: SQL parse failed. Encountered "." at line 11, column 8. Was expecting one of: ")" ... "," ... at org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:56) at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:76) at cn.imdada.bi.dfl2.core.operation.InsertIntoOperation.execute(InsertIntoOperation.java:35) at cn.imdada.bi.dfl2.core.Main.execute(Main.java:172) at cn.imdada.bi.dfl2.core.Main.main(Main.java:125) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:316) at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198) at org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:153) at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80) at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:112) at cn.imdada.bi.dfl2.launcher.yarn.YarnPerJobSubmitter.submit(YarnPerJobSubmitter.java:37) at cn.imdada.bi.dfl2.launcher.LauncherMain.main(LauncherMain.java:127) Caused by: org.apache.calcite.sql.parser.SqlParseException: Encountered "." at line 11, column 8. Was expecting one of: ")" ... "," ... at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.convertException(FlinkSqlParserImpl.java:442) at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.normalizeException(FlinkSqlParserImpl.java:205) at org.apache.calcite.sql.parser.SqlParser.handleException(SqlParser.java:140) at org.apache.calcite.sql.parser.SqlParser.parseQuery(SqlParser.java:155) at org.apache.calcite.sql.parser.SqlParser.parseStmt(SqlParser.java:180) at org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:54) ... 15 more Caused by: org.apache.flink.sql.parser.impl.ParseException: Encountered "." at line 11, column 8. Was expecting one of: ")" ... "," ... at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.generateParseException(FlinkSqlParserImpl.java:39525) at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.jj_consume_token(FlinkSqlParserImpl.java:39336) at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.ParenthesizedSimpleIdentifierList(FlinkSqlParserImpl.java:24247) at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.Expression3(FlinkSqlParserImpl.java:19024) at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.Expression2b(FlinkSqlParserImpl.java:18680) at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.Expression2(FlinkSqlParserImpl.java:18721) at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.Expression(FlinkSqlParserImpl.java:18652) at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SelectExpression(FlinkSqlParserImpl.java:11656) at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SelectItem(FlinkSqlParserImpl.java:10508) at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SelectList(FlinkSqlParserImpl.java:10495) at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlSelect(FlinkSqlParserImpl.java:7115) at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.LeafQuery(FlinkSqlParserImpl.java:684) at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.LeafQueryOrExpr(FlinkSqlParserImpl.java:18635) at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.QueryOrExpr(FlinkSqlParserImpl.java:18089) at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.OrderedQueryOrExpr(FlinkSqlParserImpl.java:558) at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.RichSqlInsert(FlinkSqlParserImpl.java:5709) at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmt(FlinkSqlParserImpl.java:3342) at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmtEof(FlinkSqlParserImpl.java:3882) at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.parseSqlStmtEof(FlinkSqlParserImpl.java:253) at org.apache.calcite.sql.parser.SqlParser.parseQuery(SqlParser.java:153) ... 17 more Is this a bug or the expected behavior? If this is the expected behavior, what can we do to avoid it? PS: I tried to create a view to represent the join result, and inserted the view into the sink table. Unfortunately, it didn’t work neither. | | 马阳阳 | | [hidden email] | 签名由网易邮箱大师定制 |
Administrator
|
已知问题,后续版本会修复,作为临时解决办法,可以使用直接这样去构造 (b.app_id, b.message),不用添加 ROW 关键字。
On Mon, 11 Jan 2021 at 11:17, 刘海 <[hidden email]> wrote: > 使用ROW里面的 表别名.字段名 会出现解析错误,我之前使用hbase也有这个问题,我一般是在查询里面包一层子查询 > > > | | > 刘海 > | > | > [hidden email] > | > 签名由网易邮箱大师定制 > On 1/11/2021 11:04,马阳阳<[hidden email]> wrote: > We have a sql that compose a row with a table’s columns. The simplified > sql is like: > INSERT INTO flink_log_sink > SELECT > b.id, > Row(b.app_id, b.message) > FROM flink_log_source a > join flink_log_side b > on a.id = b.id; > > > When we submit the sql to Flink, the sql cannot be parsed, with the > following error message: > org.apache.flink.table.api.SqlParserException: SQL parse failed. > Encountered "." at line 11, column 8. > Was expecting one of: > ")" ... > "," ... > > at > org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:56) > at > org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:76) > at > cn.imdada.bi.dfl2.core.operation.InsertIntoOperation.execute(InsertIntoOperation.java:35) > at cn.imdada.bi.dfl2.core.Main.execute(Main.java:172) > at cn.imdada.bi.dfl2.core.Main.main(Main.java:125) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:316) > at > org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198) > at > org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:153) > at > org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80) > at > org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:112) > at > cn.imdada.bi.dfl2.launcher.yarn.YarnPerJobSubmitter.submit(YarnPerJobSubmitter.java:37) > at cn.imdada.bi.dfl2.launcher.LauncherMain.main(LauncherMain.java:127) > Caused by: org.apache.calcite.sql.parser.SqlParseException: Encountered > "." at line 11, column 8. > Was expecting one of: > ")" ... > "," ... > > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.convertException(FlinkSqlParserImpl.java:442) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.normalizeException(FlinkSqlParserImpl.java:205) > at > org.apache.calcite.sql.parser.SqlParser.handleException(SqlParser.java:140) > at org.apache.calcite.sql.parser.SqlParser.parseQuery(SqlParser.java:155) > at org.apache.calcite.sql.parser.SqlParser.parseStmt(SqlParser.java:180) > at > org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:54) > ... 15 more > Caused by: org.apache.flink.sql.parser.impl.ParseException: Encountered > "." at line 11, column 8. > Was expecting one of: > ")" ... > "," ... > > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.generateParseException(FlinkSqlParserImpl.java:39525) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.jj_consume_token(FlinkSqlParserImpl.java:39336) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.ParenthesizedSimpleIdentifierList(FlinkSqlParserImpl.java:24247) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.Expression3(FlinkSqlParserImpl.java:19024) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.Expression2b(FlinkSqlParserImpl.java:18680) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.Expression2(FlinkSqlParserImpl.java:18721) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.Expression(FlinkSqlParserImpl.java:18652) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SelectExpression(FlinkSqlParserImpl.java:11656) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SelectItem(FlinkSqlParserImpl.java:10508) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SelectList(FlinkSqlParserImpl.java:10495) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlSelect(FlinkSqlParserImpl.java:7115) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.LeafQuery(FlinkSqlParserImpl.java:684) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.LeafQueryOrExpr(FlinkSqlParserImpl.java:18635) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.QueryOrExpr(FlinkSqlParserImpl.java:18089) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.OrderedQueryOrExpr(FlinkSqlParserImpl.java:558) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.RichSqlInsert(FlinkSqlParserImpl.java:5709) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmt(FlinkSqlParserImpl.java:3342) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmtEof(FlinkSqlParserImpl.java:3882) > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.parseSqlStmtEof(FlinkSqlParserImpl.java:253) > at org.apache.calcite.sql.parser.SqlParser.parseQuery(SqlParser.java:153) > ... 17 more > > > Is this a bug or the expected behavior? If this is the expected behavior, > what can we do to avoid it? > > > PS: > I tried to create a view to represent the join result, and inserted the > view into the sink table. Unfortunately, it didn’t work neither. > | | > 马阳阳 > | > | > [hidden email] > | > 签名由网易邮箱大师定制 > > |
Free forum by Nabble | Edit this page |