hi,我在flinksql中使用 select * from a join b on a.ip <b.startip and a.ip >b.endip 报了一个错误 不支持此功能 类似这种有没有类似between的函数可以使用
|
方便补充一下以下信息么?
1. 你使用的Flink的版本? 2. 使用的planner,是blink planner还是old planner? 3. 用的是streaming mode还是batch mode? 4. 具体的报错信息是什么? 小屁孩 <[hidden email]> 于2020年6月9日周二 下午4:26写道: > hi,我在flinksql中使用 select * from a join b on a.ip <b.startip and a.ip > >b.endip 报了一个错误 不支持此功能 类似这种有没有类似between的函数可以使用 |
hi,我使用的是
1 flink1.9.0 2 oldplanner <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-api-scala_2.11</artifactId> <version>1.9.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner_2.11</artifactId> <version>1.9.0</version> </dependency> 3 streaming mode 4. 代码类似如下 val sqlStream = env.createInput(jdbcInput) tnv.registerDataStream("sqlStream",sqlStream,'netstruct_id,'start_ip,'end_ip) tnv.registerDataStream("OMstream",value,'ip) // val table = tnv.sqlQuery("select * from OMstream as a left join sqlStream as b on a.ip >b.start_ip and a.ip<b.end_ip") val table = tnv.sqlQuery("select b.netstruct_id from OMstream as a left join sqlStream as b on a.ip > b.start_ip and a.ip <b.end_ip ") val resRow = table.toRetractStream[Row] 5 报错信息如下 Exception in thread "main" org.apache.flink.table.api.TableException: Cannot generate a valid execution plan for the given query: LogicalProject(netstruct_id=[$1]) LogicalJoin(condition=[AND(>($0, $2), <($0, $3))], joinType=[left]) FlinkLogicalDataStreamScan(id=[1], fields=[ip]) FlinkLogicalDataStreamScan(id=[2], fields=[netstruct_id, start_ip, end_ip]) This exception indicates that the query uses an unsupported SQL feature. Please check the documentation for the set of currently supported SQL features. at org.apache.flink.table.plan.Optimizer.runVolcanoPlanner(Optimizer.scala:245) at org.apache.flink.table.plan.Optimizer.optimizeLogicalPlan(Optimizer.scala:160) at org.apache.flink.table.plan.StreamOptimizer.optimize(StreamOptimizer.scala:66) at org.apache.flink.table.planner.StreamPlanner.translateToType(StreamPlanner.scala:410) at org.apache.flink.table.planner.StreamPlanner.org$apache$flink$table$planner$StreamPlanner$$translate(StreamPlanner.scala:182) at org.apache.flink.table.planner.StreamPlanner$$anonfun$translate$1.apply(StreamPlanner.scala:127) at org.apache.flink.table.planner.StreamPlanner$$anonfun$translate$1.apply(StreamPlanner.scala:127) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.Iterator$class.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at org.apache.flink.table.planner.StreamPlanner.translate(StreamPlanner.scala:127) at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.scala:201) at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.toRetractStream(StreamTableEnvironmentImpl.scala:124) at org.apache.flink.table.api.scala.TableConversions.toRetractStream(TableConversions.scala:146) at com.jwell56.linkstarck.LInkStream$.main(LInkStream.scala:37) at com.jwell56.linkstarck.LInkStream.main(LInkStream.scala) 6 我也尝试使用了 select b.netstruct_id from OMstream as a left join sqlStream as b on a.ip > b.start_ip 同样是单个大小比较也是不可以的 谢谢! ------------------ 原始邮件 ------------------ 发件人: "Benchao Li"<[hidden email]>; 发送时间: 2020年6月9日(星期二) 下午4:37 收件人: "user-zh"<[hidden email]>; 主题: Re: 关于flinksql between问题 方便补充一下以下信息么? 1. 你使用的Flink的版本? 2. 使用的planner,是blink planner还是old planner? 3. 用的是streaming mode还是batch mode? 4. 具体的报错信息是什么? 小屁孩 <[hidden email]> 于2020年6月9日周二 下午4:26写道: > hi,我在flinksql中使用 select * from a join b on a.ip <b.startip and a.ip > &gt;b.endip 报了一个错误 不支持此功能 类似这种有没有类似between的函数可以使用 |
我在1.10中用 useBlinkPlanner是可以的,用useOldPlanner是不可以的! 会报你下面的错误: Exception in thread "main" org.apache.flink.table.api.TableException: Cannot generate a valid execution plan for the given query: LogicalProject(num=[$0]) LogicalJoin(condition=[AND(>($0, $1), <($0, $2))], joinType=[inner]) FlinkLogicalDataStreamScan(id=[1], fields=[num]) FlinkLogicalDataStreamScan(id=[2], fields=[startNum, endNum]) This exception indicates that the query uses an unsupported SQL feature. 发件人: 小屁孩 发送时间: 2020-06-09 17:41 收件人: user-zh 主题: 回复: 关于flinksql between问题 hi,我使用的是 1 flink1.9.0 2 oldplanner <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-api-scala_2.11</artifactId> <version>1.9.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner_2.11</artifactId> <version>1.9.0</version> </dependency> 3 streaming mode 4. 代码类似如下 val sqlStream = env.createInput(jdbcInput) tnv.registerDataStream("sqlStream",sqlStream,'netstruct_id,'start_ip,'end_ip) tnv.registerDataStream("OMstream",value,'ip) // val table = tnv.sqlQuery("select * from OMstream as a left join sqlStream as b on a.ip >b.start_ip and a.ip<b.end_ip") val table = tnv.sqlQuery("select b.netstruct_id from OMstream as a left join sqlStream as b on a.ip > b.start_ip and a.ip <b.end_ip ") val resRow = table.toRetractStream[Row] 5 报错信息如下 Exception in thread "main" org.apache.flink.table.api.TableException: Cannot generate a valid execution plan for the given query: LogicalProject(netstruct_id=[$1]) LogicalJoin(condition=[AND(>($0, $2), <($0, $3))], joinType=[left]) FlinkLogicalDataStreamScan(id=[1], fields=[ip]) FlinkLogicalDataStreamScan(id=[2], fields=[netstruct_id, start_ip, end_ip]) This exception indicates that the query uses an unsupported SQL feature. Please check the documentation for the set of currently supported SQL features. at org.apache.flink.table.plan.Optimizer.runVolcanoPlanner(Optimizer.scala:245) at org.apache.flink.table.plan.Optimizer.optimizeLogicalPlan(Optimizer.scala:160) at org.apache.flink.table.plan.StreamOptimizer.optimize(StreamOptimizer.scala:66) at org.apache.flink.table.planner.StreamPlanner.translateToType(StreamPlanner.scala:410) at org.apache.flink.table.planner.StreamPlanner.org$apache$flink$table$planner$StreamPlanner$$translate(StreamPlanner.scala:182) at org.apache.flink.table.planner.StreamPlanner$$anonfun$translate$1.apply(StreamPlanner.scala:127) at org.apache.flink.table.planner.StreamPlanner$$anonfun$translate$1.apply(StreamPlanner.scala:127) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.Iterator$class.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at org.apache.flink.table.planner.StreamPlanner.translate(StreamPlanner.scala:127) at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.scala:201) at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.toRetractStream(StreamTableEnvironmentImpl.scala:124) at org.apache.flink.table.api.scala.TableConversions.toRetractStream(TableConversions.scala:146) at com.jwell56.linkstarck.LInkStream$.main(LInkStream.scala:37) at com.jwell56.linkstarck.LInkStream.main(LInkStream.scala) 6 我也尝试使用了 select b.netstruct_id from OMstream as a left join sqlStream as b on a.ip > b.start_ip 同样是单个大小比较也是不可以的 谢谢! ------------------ 原始邮件 ------------------ 发件人: "Benchao Li"<[hidden email]>; 发送时间: 2020年6月9日(星期二) 下午4:37 收件人: "user-zh"<[hidden email]>; 主题: Re: 关于flinksql between问题 方便补充一下以下信息么? 1. 你使用的Flink的版本? 2. 使用的planner,是blink planner还是old planner? 3. 用的是streaming mode还是batch mode? 4. 具体的报错信息是什么? 小屁孩 <[hidden email]> 于2020年6月9日周二 下午4:26写道: > hi,我在flinksql中使用 select * from a join b on a.ip <b.startip and a.ip > &gt;b.endip 报了一个错误 不支持此功能 类似这种有没有类似between的函数可以使用 |
hi,感谢指导 已经可以实现,可以再问一下 如果我join的是一个mysql维表 我是自定义的source定时更新mysql表这样跟流表关联是可以的吗?
------------------ 原始邮件 ------------------ 发件人: "[hidden email]"<[hidden email]>; 发送时间: 2020年6月9日(星期二) 晚上6:35 收件人: "user-zh"<[hidden email]>; 主题: 回复: 回复: 关于flinksql between问题 我在1.10中用 useBlinkPlanner是可以的,用useOldPlanner是不可以的! 会报你下面的错误: Exception in thread "main" org.apache.flink.table.api.TableException: Cannot generate a valid execution plan for the given query: LogicalProject(num=[$0]) LogicalJoin(condition=[AND(>($0, $1), <($0, $2))], joinType=[inner]) FlinkLogicalDataStreamScan(id=[1], fields=[num]) FlinkLogicalDataStreamScan(id=[2], fields=[startNum, endNum]) This exception indicates that the query uses an unsupported SQL feature. 发件人: 小屁孩 发送时间: 2020-06-09 17:41 收件人: user-zh 主题: 回复: 关于flinksql between问题 hi,我使用的是&nbsp; 1 flink1.9.0 2 oldplanner <dependency&gt; <groupId&gt;org.apache.flink</groupId&gt; <artifactId&gt;flink-table-api-scala_2.11</artifactId&gt; <version&gt;1.9.0</version&gt; </dependency&gt; <dependency&gt; <groupId&gt;org.apache.flink</groupId&gt; <artifactId&gt;flink-table-planner_2.11</artifactId&gt; <version&gt;1.9.0</version&gt; </dependency&gt; 3 streaming mode 4. 代码类似如下 &nbsp; &nbsp; val sqlStream = env.createInput(jdbcInput) &nbsp; &nbsp; tnv.registerDataStream("sqlStream",sqlStream,'netstruct_id,'start_ip,'end_ip) &nbsp; &nbsp; tnv.registerDataStream("OMstream",value,'ip) //&nbsp; &nbsp; val table = tnv.sqlQuery("select * from&nbsp; OMstream as&nbsp; a left join sqlStream as&nbsp; b on a.ip &gt;b.start_ip and a.ip<b.end_ip") &nbsp; &nbsp; val table = tnv.sqlQuery("select b.netstruct_id from&nbsp; OMstream as&nbsp; a left join sqlStream as b on a.ip &gt; b.start_ip and a.ip <b.end_ip ") &nbsp; &nbsp; val resRow = table.toRetractStream[Row] 5 报错信息如下 Exception in thread "main" org.apache.flink.table.api.TableException: Cannot generate a valid execution plan for the given query:&nbsp; LogicalProject(netstruct_id=[$1]) &nbsp; LogicalJoin(condition=[AND(&gt;($0, $2), <($0, $3))], joinType=[left]) &nbsp; &nbsp; FlinkLogicalDataStreamScan(id=[1], fields=[ip]) &nbsp; &nbsp; FlinkLogicalDataStreamScan(id=[2], fields=[netstruct_id, start_ip, end_ip]) This exception indicates that the query uses an unsupported SQL feature. Please check the documentation for the set of currently supported SQL features. at org.apache.flink.table.plan.Optimizer.runVolcanoPlanner(Optimizer.scala:245) at org.apache.flink.table.plan.Optimizer.optimizeLogicalPlan(Optimizer.scala:160) at org.apache.flink.table.plan.StreamOptimizer.optimize(StreamOptimizer.scala:66) at org.apache.flink.table.planner.StreamPlanner.translateToType(StreamPlanner.scala:410) at org.apache.flink.table.planner.StreamPlanner.org$apache$flink$table$planner$StreamPlanner$$translate(StreamPlanner.scala:182) at org.apache.flink.table.planner.StreamPlanner$$anonfun$translate$1.apply(StreamPlanner.scala:127) at org.apache.flink.table.planner.StreamPlanner$$anonfun$translate$1.apply(StreamPlanner.scala:127) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.Iterator$class.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at org.apache.flink.table.planner.StreamPlanner.translate(StreamPlanner.scala:127) at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.scala:201) at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.toRetractStream(StreamTableEnvironmentImpl.scala:124) at org.apache.flink.table.api.scala.TableConversions.toRetractStream(TableConversions.scala:146) at com.jwell56.linkstarck.LInkStream$.main(LInkStream.scala:37) at com.jwell56.linkstarck.LInkStream.main(LInkStream.scala) 6 我也尝试使用了&nbsp; select b.netstruct_id from&nbsp; OMstream as&nbsp; a left join sqlStream as b on a.ip &gt; b.start_ip 同样是单个大小比较也是不可以的&nbsp; 谢谢! ------------------&nbsp;原始邮件&nbsp;------------------ 发件人:&nbsp;"Benchao Li"<[hidden email]&gt;; 发送时间:&nbsp;2020年6月9日(星期二) 下午4:37 收件人:&nbsp;"user-zh"<[hidden email]&gt;; 主题:&nbsp;Re: 关于flinksql between问题 方便补充一下以下信息么? 1. 你使用的Flink的版本? 2. 使用的planner,是blink planner还是old planner? 3. 用的是streaming mode还是batch mode? 4. 具体的报错信息是什么? 小屁孩 <[hidden email]&gt; 于2020年6月9日周二 下午4:26写道: &gt; hi,我在flinksql中使用 select * from a join b on a.ip <b.startip and a.ip &gt; &amp;gt;b.endip 报了一个错误 不支持此功能 类似这种有没有类似between的函数可以使用 |
你的意思是你的mysql维表是自定义的,然后是定期更新的维表内容是么?只要你实现的是LookupSource,应该是没问题的。
内部实现你可以自己控制。 小屁孩 <[hidden email]> 于2020年6月10日周三 上午10:46写道: > hi,感谢指导 已经可以实现,可以再问一下 如果我join的是一个mysql维表 > 我是自定义的source定时更新mysql表这样跟流表关联是可以的吗? > > > > > ------------------ 原始邮件 ------------------ > 发件人: "[hidden email]"<[hidden email]>; > 发送时间: 2020年6月9日(星期二) 晚上6:35 > 收件人: "user-zh"<[hidden email]>; > > 主题: 回复: 回复: 关于flinksql between问题 > > > > > 我在1.10中用 useBlinkPlanner是可以的,用useOldPlanner是不可以的! > > 会报你下面的错误: > Exception in thread "main" > org.apache.flink.table.api.TableException: Cannot generate a valid > execution plan for the given query: > > LogicalProject(num=[$0]) > LogicalJoin(condition=[AND(>($0, $1), <($0, $2))], > joinType=[inner]) > FlinkLogicalDataStreamScan(id=[1], fields=[num]) > FlinkLogicalDataStreamScan(id=[2], fields=[startNum, > endNum]) > > This exception indicates that the query uses an unsupported SQL feature. > > > > > > 发件人: 小屁孩 > 发送时间: 2020-06-09 17:41 > 收件人: user-zh > 主题: 回复: 关于flinksql between问题 > hi,我使用的是&nbsp; > 1 flink1.9.0 > 2 oldplanner > <dependency&gt; > <groupId&gt;org.apache.flink</groupId&gt; > <artifactId&gt;flink-table-api-scala_2.11</artifactId&gt; > <version&gt;1.9.0</version&gt; > </dependency&gt; > > > <dependency&gt; > <groupId&gt;org.apache.flink</groupId&gt; > <artifactId&gt;flink-table-planner_2.11</artifactId&gt; > <version&gt;1.9.0</version&gt; > </dependency&gt; > > 3 streaming mode > 4. 代码类似如下 > &nbsp; &nbsp; val sqlStream = env.createInput(jdbcInput) > &nbsp; &nbsp; > tnv.registerDataStream("sqlStream",sqlStream,'netstruct_id,'start_ip,'end_ip) > &nbsp; &nbsp; tnv.registerDataStream("OMstream",value,'ip) > //&nbsp; &nbsp; val table = tnv.sqlQuery("select * from&nbsp; > OMstream as&nbsp; a left join sqlStream as&nbsp; b on a.ip > &gt;b.start_ip and a.ip<b.end_ip") > &nbsp; &nbsp; val table = tnv.sqlQuery("select b.netstruct_id > from&nbsp; OMstream as&nbsp; a left join sqlStream as b on a.ip > &gt; b.start_ip and a.ip <b.end_ip ") > &nbsp; &nbsp; val resRow = table.toRetractStream[Row] > > 5 报错信息如下 > Exception in thread "main" org.apache.flink.table.api.TableException: > Cannot generate a valid execution plan for the given query:&nbsp; > > > LogicalProject(netstruct_id=[$1]) > &nbsp; LogicalJoin(condition=[AND(&gt;($0, $2), <($0, $3))], > joinType=[left]) > &nbsp; &nbsp; FlinkLogicalDataStreamScan(id=[1], fields=[ip]) > &nbsp; &nbsp; FlinkLogicalDataStreamScan(id=[2], > fields=[netstruct_id, start_ip, end_ip]) > > > This exception indicates that the query uses an unsupported SQL feature. > Please check the documentation for the set of currently supported SQL > features. > at > org.apache.flink.table.plan.Optimizer.runVolcanoPlanner(Optimizer.scala:245) > at > org.apache.flink.table.plan.Optimizer.optimizeLogicalPlan(Optimizer.scala:160) > at > org.apache.flink.table.plan.StreamOptimizer.optimize(StreamOptimizer.scala:66) > at > org.apache.flink.table.planner.StreamPlanner.translateToType(StreamPlanner.scala:410) > at org.apache.flink.table.planner.StreamPlanner.org > $apache$flink$table$planner$StreamPlanner$$translate(StreamPlanner.scala:182) > at > org.apache.flink.table.planner.StreamPlanner$$anonfun$translate$1.apply(StreamPlanner.scala:127) > at > org.apache.flink.table.planner.StreamPlanner$$anonfun$translate$1.apply(StreamPlanner.scala:127) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at scala.collection.Iterator$class.foreach(Iterator.scala:891) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) > at scala.collection.AbstractTraversable.map(Traversable.scala:104) > at > org.apache.flink.table.planner.StreamPlanner.translate(StreamPlanner.scala:127) > at > org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.scala:201) > at > org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.toRetractStream(StreamTableEnvironmentImpl.scala:124) > at > org.apache.flink.table.api.scala.TableConversions.toRetractStream(TableConversions.scala:146) > at com.jwell56.linkstarck.LInkStream$.main(LInkStream.scala:37) > at com.jwell56.linkstarck.LInkStream.main(LInkStream.scala) > > > > > > 6 我也尝试使用了&nbsp; > select b.netstruct_id from&nbsp; OMstream as&nbsp; a left join > sqlStream as b on a.ip &gt; b.start_ip > 同样是单个大小比较也是不可以的&nbsp; > > > 谢谢! > > > > > ------------------&nbsp;原始邮件&nbsp;------------------ > 发件人:&nbsp;"Benchao Li"<[hidden email]&gt;; > 发送时间:&nbsp;2020年6月9日(星期二) 下午4:37 > 收件人:&nbsp;"user-zh"<[hidden email]&gt;; > > 主题:&nbsp;Re: 关于flinksql between问题 > > > > 方便补充一下以下信息么? > 1. 你使用的Flink的版本? > 2. 使用的planner,是blink planner还是old planner? > 3. 用的是streaming mode还是batch mode? > 4. 具体的报错信息是什么? > > 小屁孩 <[hidden email]&gt; 于2020年6月9日周二 下午4:26写道: > > &gt; hi,我在flinksql中使用 select * from a join b on a.ip <b.startip and > a.ip > &gt; &amp;gt;b.endip 报了一个错误 不支持此功能 类似这种有没有类似between的函数可以使用 |
In reply to this post by 小屁孩
Hi,
看你描述的想要的是自定义source(左表), 需要同一张mysql 维表做join,如果是这样的话,你的sql看起来有点问题,目前的写法是regular join, 维表join的语法[1]: SELECT o.amout, o.currency, r.rate, o.amount * r.rate FROM Orders AS o JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r ON r.currency = o.currency 另外JDBC connector是实现了LookupSource的,也就是支持做维表,维表的更新的 connector.lookup.cache.ttl 参数控制维表中cache的过期时间,不知道是否满足你的需求。 Best, Leonard Xu [1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/table/sql/queries.html#joins <https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/table/sql/queries.html#joins> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/table/connect.html#jdbc-connector <https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/table/connect.html#jdbc-connector> > 在 2020年6月10日,10:43,小屁孩 <[hidden email]> 写道: > > hi,感谢指导 已经可以实现,可以再问一下 如果我join的是一个mysql维表 我是自定义的source定时更新mysql表这样跟流表关联是可以的吗? > > > > > ------------------ 原始邮件 ------------------ > 发件人: "[hidden email]"<[hidden email]>; > 发送时间: 2020年6月9日(星期二) 晚上6:35 > 收件人: "user-zh"<[hidden email]>; > > 主题: 回复: 回复: 关于flinksql between问题 > > > > > 我在1.10中用 useBlinkPlanner是可以的,用useOldPlanner是不可以的! > > 会报你下面的错误: > Exception in thread "main" org.apache.flink.table.api.TableException: Cannot generate a valid execution plan for the given query: > > LogicalProject(num=[$0]) > LogicalJoin(condition=[AND(>($0, $1), <($0, $2))], joinType=[inner]) > FlinkLogicalDataStreamScan(id=[1], fields=[num]) > FlinkLogicalDataStreamScan(id=[2], fields=[startNum, endNum]) > > This exception indicates that the query uses an unsupported SQL feature. > > > > > > 发件人: 小屁孩 > 发送时间: 2020-06-09 17:41 > 收件人: user-zh > 主题: 回复: 关于flinksql between问题 > hi,我使用的是&nbsp; > 1 flink1.9.0 > 2 oldplanner > <dependency&gt; > <groupId&gt;org.apache.flink</groupId&gt; > <artifactId&gt;flink-table-api-scala_2.11</artifactId&gt; > <version&gt;1.9.0</version&gt; > </dependency&gt; > > > <dependency&gt; > <groupId&gt;org.apache.flink</groupId&gt; > <artifactId&gt;flink-table-planner_2.11</artifactId&gt; > <version&gt;1.9.0</version&gt; > </dependency&gt; > > 3 streaming mode > 4. 代码类似如下 > &nbsp; &nbsp; val sqlStream = env.createInput(jdbcInput) > &nbsp; &nbsp; tnv.registerDataStream("sqlStream",sqlStream,'netstruct_id,'start_ip,'end_ip) > &nbsp; &nbsp; tnv.registerDataStream("OMstream",value,'ip) > //&nbsp; &nbsp; val table = tnv.sqlQuery("select * from&nbsp; OMstream as&nbsp; a left join sqlStream as&nbsp; b on a.ip &gt;b.start_ip and a.ip<b.end_ip") > &nbsp; &nbsp; val table = tnv.sqlQuery("select b.netstruct_id from&nbsp; OMstream as&nbsp; a left join sqlStream as b on a.ip &gt; b.start_ip and a.ip <b.end_ip ") > &nbsp; &nbsp; val resRow = table.toRetractStream[Row] > > 5 报错信息如下 > Exception in thread "main" org.apache.flink.table.api.TableException: Cannot generate a valid execution plan for the given query:&nbsp; > > > LogicalProject(netstruct_id=[$1]) > &nbsp; LogicalJoin(condition=[AND(&gt;($0, $2), <($0, $3))], joinType=[left]) > &nbsp; &nbsp; FlinkLogicalDataStreamScan(id=[1], fields=[ip]) > &nbsp; &nbsp; FlinkLogicalDataStreamScan(id=[2], fields=[netstruct_id, start_ip, end_ip]) > > > This exception indicates that the query uses an unsupported SQL feature. > Please check the documentation for the set of currently supported SQL features. > at org.apache.flink.table.plan.Optimizer.runVolcanoPlanner(Optimizer.scala:245) > at org.apache.flink.table.plan.Optimizer.optimizeLogicalPlan(Optimizer.scala:160) > at org.apache.flink.table.plan.StreamOptimizer.optimize(StreamOptimizer.scala:66) > at org.apache.flink.table.planner.StreamPlanner.translateToType(StreamPlanner.scala:410) > at org.apache.flink.table.planner.StreamPlanner.org$apache$flink$table$planner$StreamPlanner$$translate(StreamPlanner.scala:182) > at org.apache.flink.table.planner.StreamPlanner$$anonfun$translate$1.apply(StreamPlanner.scala:127) > at org.apache.flink.table.planner.StreamPlanner$$anonfun$translate$1.apply(StreamPlanner.scala:127) > at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at scala.collection.Iterator$class.foreach(Iterator.scala:891) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) > at scala.collection.AbstractTraversable.map(Traversable.scala:104) > at org.apache.flink.table.planner.StreamPlanner.translate(StreamPlanner.scala:127) > at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.scala:201) > at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.toRetractStream(StreamTableEnvironmentImpl.scala:124) > at org.apache.flink.table.api.scala.TableConversions.toRetractStream(TableConversions.scala:146) > at com.jwell56.linkstarck.LInkStream$.main(LInkStream.scala:37) > at com.jwell56.linkstarck.LInkStream.main(LInkStream.scala) > > > > > > 6 我也尝试使用了&nbsp; > select b.netstruct_id from&nbsp; OMstream as&nbsp; a left join sqlStream as b on a.ip &gt; b.start_ip > 同样是单个大小比较也是不可以的&nbsp; > > > 谢谢! > > > > > ------------------&nbsp;原始邮件&nbsp;------------------ > 发件人:&nbsp;"Benchao Li"<[hidden email]&gt;; > 发送时间:&nbsp;2020年6月9日(星期二) 下午4:37 > 收件人:&nbsp;"user-zh"<[hidden email]&gt;; > > 主题:&nbsp;Re: 关于flinksql between问题 > > > > 方便补充一下以下信息么? > 1. 你使用的Flink的版本? > 2. 使用的planner,是blink planner还是old planner? > 3. 用的是streaming mode还是batch mode? > 4. 具体的报错信息是什么? > > 小屁孩 <[hidden email]&gt; 于2020年6月9日周二 下午4:26写道: > > &gt; hi,我在flinksql中使用 select * from a join b on a.ip <b.startip and a.ip > &gt; &amp;gt;b.endip 报了一个错误 不支持此功能 类似这种有没有类似between的函数可以使用 |
非常感谢,使用的flink1.10.0 中流转成表是是否有字段的数目的限制 我把流转成表 我的流是一个封装的实体类
tnv.registerDataStream("sqlStream",mysqlinst,'start_ip,'end_ip,'area_id,'unit_id,'device_id) tnv.registerDataStream("OMstream",value,'original_network_id,'asset_id,'types,'d_ip,'d_port,'s_ip,'s_port,'devip,'url,'common_des,'operation_des,'raw_log,'severity,'happen_time,'create_time,'s_ip_num, 'd_ip_num,'method,'asset_area_id,'device_id,'s_mac,'d_mac,'scope,'dcope,'s_asset_id,'d_asset_id,'asset_unit_id,'area_id,'unit_id,'enterprise_id) tnv.registerFunction("ip_to_num",IPtoNum) 在转成表时 如下错误 log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info. Exception in thread "main" org.apache.flink.table.api.ValidationException: Too many fields referenced from an atomic type. at org.apache.flink.table.typeutils.FieldInfoUtils.extractFieldInfoFromAtomicType(FieldInfoUtils.java:388) at org.apache.flink.table.typeutils.FieldInfoUtils.extractFieldInformation(FieldInfoUtils.java:259) at org.apache.flink.table.typeutils.FieldInfoUtils.getFieldsInfo(FieldInfoUtils.java:227) at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl$$anonfun$1.apply(StreamTableEnvironmentImpl.scala:237) at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl$$anonfun$1.apply(StreamTableEnvironmentImpl.scala:236) at scala.Option.map(Option.scala:146) at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.asQueryOperation(StreamTableEnvironmentImpl.scala:236) at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.fromDataStream(StreamTableEnvironmentImpl.scala:81) at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.registerDataStream(StreamTableEnvironmentImpl.scala:94) at com.jwell56.networksyslog.jobs.jobsource.MysqlLogToES$.main(MysqlLogToES.scala:77) at com.jwell56.networksyslog.jobs.jobsource.MysqlLogToES.main(MysqlLogToES.scala) ------------------ 原始邮件 ------------------ 发件人: "Leonard Xu"<[hidden email]>; 发送时间: 2020年6月10日(星期三) 中午1:16 收件人: "user-zh"<[hidden email]>; 主题: Re: 关于flinksql between问题 Hi, 看你描述的想要的是自定义source(左表), 需要同一张mysql 维表做join,如果是这样的话,你的sql看起来有点问题,目前的写法是regular join, 维表join的语法[1]: SELECT o.amout, o.currency, r.rate, o.amount * r.rate FROM Orders AS o JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r ON r.currency = o.currency 另外JDBC connector是实现了LookupSource的,也就是支持做维表,维表的更新的 connector.lookup.cache.ttl 参数控制维表中cache的过期时间,不知道是否满足你的需求。 Best, Leonard Xu [1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/table/sql/queries.html#joins <https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/table/sql/queries.html#joins> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/table/connect.html#jdbc-connector <https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/table/connect.html#jdbc-connector> > 在 2020年6月10日,10:43,小屁孩 <[hidden email]> 写道: > > hi,感谢指导 已经可以实现,可以再问一下 如果我join的是一个mysql维表 我是自定义的source定时更新mysql表这样跟流表关联是可以的吗? > > > > > ------------------&nbsp;原始邮件&nbsp;------------------ > 发件人:&nbsp;"[hidden email]"<[hidden email]&gt;; > 发送时间:&nbsp;2020年6月9日(星期二) 晚上6:35 > 收件人:&nbsp;"user-zh"<[hidden email]&gt;; > > 主题:&nbsp;回复: 回复: 关于flinksql between问题 > > > > > &nbsp; 我在1.10中用 useBlinkPlanner是可以的,用useOldPlanner是不可以的! > &nbsp; > 会报你下面的错误: > &nbsp; Exception in thread "main" org.apache.flink.table.api.TableException: Cannot generate a valid execution plan for the given query: > > LogicalProject(num=[$0]) > &nbsp; LogicalJoin(condition=[AND(&gt;($0, $1), <($0, $2))], joinType=[inner]) > &nbsp;&nbsp;&nbsp; FlinkLogicalDataStreamScan(id=[1], fields=[num]) > &nbsp;&nbsp;&nbsp; FlinkLogicalDataStreamScan(id=[2], fields=[startNum, endNum]) > > This exception indicates that the query uses an unsupported SQL feature. > > > > > &nbsp; > 发件人: 小屁孩 > 发送时间: 2020-06-09 17:41 > 收件人: user-zh > 主题: 回复: 关于flinksql between问题 > hi,我使用的是&amp;nbsp; > 1 flink1.9.0 > 2 oldplanner > <dependency&amp;gt; > <groupId&amp;gt;org.apache.flink</groupId&amp;gt; > <artifactId&amp;gt;flink-table-api-scala_2.11</artifactId&amp;gt; > <version&amp;gt;1.9.0</version&amp;gt; > </dependency&amp;gt; > &nbsp; > &nbsp; > <dependency&amp;gt; > <groupId&amp;gt;org.apache.flink</groupId&amp;gt; > <artifactId&amp;gt;flink-table-planner_2.11</artifactId&amp;gt; > <version&amp;gt;1.9.0</version&amp;gt; > </dependency&amp;gt; > &nbsp; > 3 streaming mode > 4. 代码类似如下 > &amp;nbsp; &amp;nbsp; val sqlStream = env.createInput(jdbcInput) > &amp;nbsp; &amp;nbsp; tnv.registerDataStream("sqlStream",sqlStream,'netstruct_id,'start_ip,'end_ip) > &amp;nbsp; &amp;nbsp; tnv.registerDataStream("OMstream",value,'ip) > //&amp;nbsp; &amp;nbsp; val table = tnv.sqlQuery("select * from&amp;nbsp; OMstream as&amp;nbsp; a left join sqlStream as&amp;nbsp; b on a.ip &amp;gt;b.start_ip and a.ip<b.end_ip") > &amp;nbsp; &amp;nbsp; val table = tnv.sqlQuery("select b.netstruct_id from&amp;nbsp; OMstream as&amp;nbsp; a left join sqlStream as b on a.ip &amp;gt; b.start_ip and a.ip <b.end_ip ") > &amp;nbsp; &amp;nbsp; val resRow = table.toRetractStream[Row] > &nbsp; > 5 报错信息如下 > Exception in thread "main" org.apache.flink.table.api.TableException: Cannot generate a valid execution plan for the given query:&amp;nbsp; > &nbsp; > &nbsp; > LogicalProject(netstruct_id=[$1]) > &amp;nbsp; LogicalJoin(condition=[AND(&amp;gt;($0, $2), <($0, $3))], joinType=[left]) > &amp;nbsp; &amp;nbsp; FlinkLogicalDataStreamScan(id=[1], fields=[ip]) > &amp;nbsp; &amp;nbsp; FlinkLogicalDataStreamScan(id=[2], fields=[netstruct_id, start_ip, end_ip]) > &nbsp; > &nbsp; > This exception indicates that the query uses an unsupported SQL feature. > Please check the documentation for the set of currently supported SQL features. > at org.apache.flink.table.plan.Optimizer.runVolcanoPlanner(Optimizer.scala:245) > at org.apache.flink.table.plan.Optimizer.optimizeLogicalPlan(Optimizer.scala:160) > at org.apache.flink.table.plan.StreamOptimizer.optimize(StreamOptimizer.scala:66) > at org.apache.flink.table.planner.StreamPlanner.translateToType(StreamPlanner.scala:410) > at org.apache.flink.table.planner.StreamPlanner.org$apache$flink$table$planner$StreamPlanner$$translate(StreamPlanner.scala:182) > at org.apache.flink.table.planner.StreamPlanner$$anonfun$translate$1.apply(StreamPlanner.scala:127) > at org.apache.flink.table.planner.StreamPlanner$$anonfun$translate$1.apply(StreamPlanner.scala:127) > at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at scala.collection.Iterator$class.foreach(Iterator.scala:891) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) > at scala.collection.AbstractTraversable.map(Traversable.scala:104) > at org.apache.flink.table.planner.StreamPlanner.translate(StreamPlanner.scala:127) > at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.scala:201) > at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.toRetractStream(StreamTableEnvironmentImpl.scala:124) > at org.apache.flink.table.api.scala.TableConversions.toRetractStream(TableConversions.scala:146) > at com.jwell56.linkstarck.LInkStream$.main(LInkStream.scala:37) > at com.jwell56.linkstarck.LInkStream.main(LInkStream.scala) > &nbsp; > &nbsp; > &nbsp; > &nbsp; > &nbsp; > 6 我也尝试使用了&amp;nbsp; > select b.netstruct_id from&amp;nbsp; OMstream as&amp;nbsp; a left join sqlStream as b on a.ip &amp;gt; b.start_ip > 同样是单个大小比较也是不可以的&amp;nbsp; > &nbsp; > &nbsp; > 谢谢! > &nbsp; > &nbsp; > &nbsp; > &nbsp; > ------------------&amp;nbsp;原始邮件&amp;nbsp;------------------ > 发件人:&amp;nbsp;"Benchao Li"<[hidden email]&amp;gt;; > 发送时间:&amp;nbsp;2020年6月9日(星期二) 下午4:37 > 收件人:&amp;nbsp;"user-zh"<[hidden email]&amp;gt;; > &nbsp; > 主题:&amp;nbsp;Re: 关于flinksql between问题 > &nbsp; > &nbsp; > &nbsp; > 方便补充一下以下信息么? > 1. 你使用的Flink的版本? > 2. 使用的planner,是blink planner还是old planner? > 3. 用的是streaming mode还是batch mode? > 4. 具体的报错信息是什么? > &nbsp; > 小屁孩 <[hidden email]&amp;gt; 于2020年6月9日周二 下午4:26写道: > &nbsp; > &amp;gt; hi,我在flinksql中使用 select * from a join b on a.ip <b.startip and a.ip > &amp;gt; &amp;amp;gt;b.endip 报了一个错误 不支持此功能 类似这种有没有类似between的函数可以使用 |
应该是你的value或者mysqlinst值中数据字段和你定义的字段不一致造成的! 发件人: 小屁孩 发送时间: 2020-06-10 15:25 收件人: user-zh 主题: 回复: 关于flinksql between问题 非常感谢,使用的flink1.10.0 中流转成表是是否有字段的数目的限制 我把流转成表 我的流是一个封装的实体类 tnv.registerDataStream("sqlStream",mysqlinst,'start_ip,'end_ip,'area_id,'unit_id,'device_id) tnv.registerDataStream("OMstream",value,'original_network_id,'asset_id,'types,'d_ip,'d_port,'s_ip,'s_port,'devip,'url,'common_des,'operation_des,'raw_log,'severity,'happen_time,'create_time,'s_ip_num, 'd_ip_num,'method,'asset_area_id,'device_id,'s_mac,'d_mac,'scope,'dcope,'s_asset_id,'d_asset_id,'asset_unit_id,'area_id,'unit_id,'enterprise_id) tnv.registerFunction("ip_to_num",IPtoNum) 在转成表时 如下错误 log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info. Exception in thread "main" org.apache.flink.table.api.ValidationException: Too many fields referenced from an atomic type. at org.apache.flink.table.typeutils.FieldInfoUtils.extractFieldInfoFromAtomicType(FieldInfoUtils.java:388) at org.apache.flink.table.typeutils.FieldInfoUtils.extractFieldInformation(FieldInfoUtils.java:259) at org.apache.flink.table.typeutils.FieldInfoUtils.getFieldsInfo(FieldInfoUtils.java:227) at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl$$anonfun$1.apply(StreamTableEnvironmentImpl.scala:237) at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl$$anonfun$1.apply(StreamTableEnvironmentImpl.scala:236) at scala.Option.map(Option.scala:146) at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.asQueryOperation(StreamTableEnvironmentImpl.scala:236) at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.fromDataStream(StreamTableEnvironmentImpl.scala:81) at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.registerDataStream(StreamTableEnvironmentImpl.scala:94) at com.jwell56.networksyslog.jobs.jobsource.MysqlLogToES$.main(MysqlLogToES.scala:77) at com.jwell56.networksyslog.jobs.jobsource.MysqlLogToES.main(MysqlLogToES.scala) ------------------ 原始邮件 ------------------ 发件人: "Leonard Xu"<[hidden email]>; 发送时间: 2020年6月10日(星期三) 中午1:16 收件人: "user-zh"<[hidden email]>; 主题: Re: 关于flinksql between问题 Hi, 看你描述的想要的是自定义source(左表), 需要同一张mysql 维表做join,如果是这样的话,你的sql看起来有点问题,目前的写法是regular join, 维表join的语法[1]: SELECT o.amout, o.currency, r.rate, o.amount * r.rate FROM Orders AS o JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r ON r.currency = o.currency 另外JDBC connector是实现了LookupSource的,也就是支持做维表,维表的更新的 connector.lookup.cache.ttl 参数控制维表中cache的过期时间,不知道是否满足你的需求。 Best, Leonard Xu [1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/table/sql/queries.html#joins <https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/table/sql/queries.html#joins> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/table/connect.html#jdbc-connector <https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/table/connect.html#jdbc-connector> > 在 2020年6月10日,10:43,小屁孩 <[hidden email]> 写道: > > hi,感谢指导 已经可以实现,可以再问一下 如果我join的是一个mysql维表 我是自定义的source定时更新mysql表这样跟流表关联是可以的吗? > > > > > ------------------&nbsp;原始邮件&nbsp;------------------ > 发件人:&nbsp;"[hidden email]"<[hidden email]&gt;; > 发送时间:&nbsp;2020年6月9日(星期二) 晚上6:35 > 收件人:&nbsp;"user-zh"<[hidden email]&gt;; > > 主题:&nbsp;回复: 回复: 关于flinksql between问题 > > > > > &nbsp; 我在1.10中用 useBlinkPlanner是可以的,用useOldPlanner是不可以的! > &nbsp; > 会报你下面的错误: > &nbsp; Exception in thread "main" org.apache.flink.table.api.TableException: Cannot generate a valid execution plan for the given query: > > LogicalProject(num=[$0]) > &nbsp; LogicalJoin(condition=[AND(&gt;($0, $1), <($0, $2))], joinType=[inner]) > &nbsp;&nbsp;&nbsp; FlinkLogicalDataStreamScan(id=[1], fields=[num]) > &nbsp;&nbsp;&nbsp; FlinkLogicalDataStreamScan(id=[2], fields=[startNum, endNum]) > > This exception indicates that the query uses an unsupported SQL feature. > > > > > &nbsp; > 发件人: 小屁孩 > 发送时间: 2020-06-09 17:41 > 收件人: user-zh > 主题: 回复: 关于flinksql between问题 > hi,我使用的是&amp;nbsp; > 1 flink1.9.0 > 2 oldplanner > <dependency&amp;gt; > <groupId&amp;gt;org.apache.flink</groupId&amp;gt; > <artifactId&amp;gt;flink-table-api-scala_2.11</artifactId&amp;gt; > <version&amp;gt;1.9.0</version&amp;gt; > </dependency&amp;gt; > &nbsp; > &nbsp; > <dependency&amp;gt; > <groupId&amp;gt;org.apache.flink</groupId&amp;gt; > <artifactId&amp;gt;flink-table-planner_2.11</artifactId&amp;gt; > <version&amp;gt;1.9.0</version&amp;gt; > </dependency&amp;gt; > &nbsp; > 3 streaming mode > 4. 代码类似如下 > &amp;nbsp; &amp;nbsp; val sqlStream = env.createInput(jdbcInput) > &amp;nbsp; &amp;nbsp; tnv.registerDataStream("sqlStream",sqlStream,'netstruct_id,'start_ip,'end_ip) > &amp;nbsp; &amp;nbsp; tnv.registerDataStream("OMstream",value,'ip) > //&amp;nbsp; &amp;nbsp; val table = tnv.sqlQuery("select * from&amp;nbsp; OMstream as&amp;nbsp; a left join sqlStream as&amp;nbsp; b on a.ip &amp;gt;b.start_ip and a.ip<b.end_ip") > &amp;nbsp; &amp;nbsp; val table = tnv.sqlQuery("select b.netstruct_id from&amp;nbsp; OMstream as&amp;nbsp; a left join sqlStream as b on a.ip &amp;gt; b.start_ip and a.ip <b.end_ip ") > &amp;nbsp; &amp;nbsp; val resRow = table.toRetractStream[Row] > &nbsp; > 5 报错信息如下 > Exception in thread "main" org.apache.flink.table.api.TableException: Cannot generate a valid execution plan for the given query:&amp;nbsp; > &nbsp; > &nbsp; > LogicalProject(netstruct_id=[$1]) > &amp;nbsp; LogicalJoin(condition=[AND(&amp;gt;($0, $2), <($0, $3))], joinType=[left]) > &amp;nbsp; &amp;nbsp; FlinkLogicalDataStreamScan(id=[1], fields=[ip]) > &amp;nbsp; &amp;nbsp; FlinkLogicalDataStreamScan(id=[2], fields=[netstruct_id, start_ip, end_ip]) > &nbsp; > &nbsp; > This exception indicates that the query uses an unsupported SQL feature. > Please check the documentation for the set of currently supported SQL features. > at org.apache.flink.table.plan.Optimizer.runVolcanoPlanner(Optimizer.scala:245) > at org.apache.flink.table.plan.Optimizer.optimizeLogicalPlan(Optimizer.scala:160) > at org.apache.flink.table.plan.StreamOptimizer.optimize(StreamOptimizer.scala:66) > at org.apache.flink.table.planner.StreamPlanner.translateToType(StreamPlanner.scala:410) > at org.apache.flink.table.planner.StreamPlanner.org$apache$flink$table$planner$StreamPlanner$$translate(StreamPlanner.scala:182) > at org.apache.flink.table.planner.StreamPlanner$$anonfun$translate$1.apply(StreamPlanner.scala:127) > at org.apache.flink.table.planner.StreamPlanner$$anonfun$translate$1.apply(StreamPlanner.scala:127) > at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at scala.collection.Iterator$class.foreach(Iterator.scala:891) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) > at scala.collection.AbstractTraversable.map(Traversable.scala:104) > at org.apache.flink.table.planner.StreamPlanner.translate(StreamPlanner.scala:127) > at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.scala:201) > at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.toRetractStream(StreamTableEnvironmentImpl.scala:124) > at org.apache.flink.table.api.scala.TableConversions.toRetractStream(TableConversions.scala:146) > at com.jwell56.linkstarck.LInkStream$.main(LInkStream.scala:37) > at com.jwell56.linkstarck.LInkStream.main(LInkStream.scala) > &nbsp; > &nbsp; > &nbsp; > &nbsp; > &nbsp; > 6 我也尝试使用了&amp;nbsp; > select b.netstruct_id from&amp;nbsp; OMstream as&amp;nbsp; a left join sqlStream as b on a.ip &amp;gt; b.start_ip > 同样是单个大小比较也是不可以的&amp;nbsp; > &nbsp; > &nbsp; > 谢谢! > &nbsp; > &nbsp; > &nbsp; > &nbsp; > ------------------&amp;nbsp;原始邮件&amp;nbsp;------------------ > 发件人:&amp;nbsp;"Benchao Li"<[hidden email]&amp;gt;; > 发送时间:&amp;nbsp;2020年6月9日(星期二) 下午4:37 > 收件人:&amp;nbsp;"user-zh"<[hidden email]&amp;gt;; > &nbsp; > 主题:&amp;nbsp;Re: 关于flinksql between问题 > &nbsp; > &nbsp; > &nbsp; > 方便补充一下以下信息么? > 1. 你使用的Flink的版本? > 2. 使用的planner,是blink planner还是old planner? > 3. 用的是streaming mode还是batch mode? > 4. 具体的报错信息是什么? > &nbsp; > 小屁孩 <[hidden email]&amp;gt; 于2020年6月9日周二 下午4:26写道: > &nbsp; > &amp;gt; hi,我在flinksql中使用 select * from a join b on a.ip <b.startip and a.ip > &amp;gt; &amp;amp;gt;b.endip 报了一个错误 不支持此功能 类似这种有没有类似between的函数可以使用 |
In reply to this post by 小屁孩
>
> 非常感谢,使用的flink1.10.0 中流转成表是是否有字段的数目的限制 我把流转成表 我的流是一个封装的实体类 转换的时候没有这个字段数目的限制的,另外看你的字段也不是很多,一般业务上几百个字段都正常的,你检查下你字段的对应关系 祝好, Leonard Xu > tnv.registerDataStream("sqlStream",mysqlinst,'start_ip,'end_ip,'area_id,'unit_id,'device_id) > tnv.registerDataStream("OMstream",value,'original_network_id,'asset_id,'types,'d_ip,'d_port,'s_ip,'s_port,'devip,'url,'common_des,'operation_des,'raw_log,'severity,'happen_time,'create_time,'s_ip_num, > 'd_ip_num,'method,'asset_area_id,'device_id,'s_mac,'d_mac,'scope,'dcope,'s_asset_id,'d_asset_id,'asset_unit_id,'area_id,'unit_id,'enterprise_id) > tnv.registerFunction("ip_to_num",IPtoNum) > > 在转成表时 如下错误 > log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig <http://logging.apache.org/log4j/1.2/faq.html#noconfig> for more info. > Exception in thread "main" org.apache.flink.table.api.ValidationException: Too many fields referenced from an atomic type. > at org.apache.flink.table.typeutils.FieldInfoUtils.extractFieldInfoFromAtomicType(FieldInfoUtils.java:388) > at org.apache.flink.table.typeutils.FieldInfoUtils.extractFieldInformation(FieldInfoUtils.java:259) > at org.apache.flink.table.typeutils.FieldInfoUtils.getFieldsInfo(FieldInfoUtils.java:227) > at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl$$anonfun$1.apply(StreamTableEnvironmentImpl.scala:237) > at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl$$anonfun$1.apply(StreamTableEnvironmentImpl.scala:236) > at scala.Option.map(Option.scala:146) > at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.asQueryOperation(StreamTableEnvironmentImpl.scala:236) > at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.fromDataStream(StreamTableEnvironmentImpl.scala:81) > at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.registerDataStream(StreamTableEnvironmentImpl.scala:94) > at com.jwell56.networksyslog.jobs.jobsource.MysqlLogToES$.main(MysqlLogToES.scala:77) > at com.jwell56.networksyslog.jobs.jobsource.MysqlLogToES.main(MysqlLogToES.scala) > > > > > > ------------------ 原始邮件 ------------------ > 发件人: "Leonard Xu"<[hidden email] <mailto:[hidden email]>>; > 发送时间: 2020年6月10日(星期三) 中午1:16 > 收件人: "user-zh"<[hidden email] <mailto:[hidden email]>>; > > 主题: Re: 关于flinksql between问题 > > > > Hi, > > 看你描述的想要的是自定义source(左表), 需要同一张mysql 维表做join,如果是这样的话,你的sql看起来有点问题,目前的写法是regular join, 维表join的语法[1]: > > SELECT > o.amout, o.currency, r.rate, o.amount * r.rate > FROM > Orders AS o > JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r > ON r.currency = o.currency > 另外JDBC connector是实现了LookupSource的,也就是支持做维表,维表的更新的 connector.lookup.cache.ttl 参数控制维表中cache的过期时间,不知道是否满足你的需求。 > > Best, > Leonard Xu > > [1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/table/sql/queries.html#joins <https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/table/sql/queries.html#joins><https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/table/sql/queries.html#joins> <https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/table/sql/queries.html#joins>>; > [2] https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/table/connect.html#jdbc-connector <https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/table/connect.html#jdbc-connector> <https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/table/connect.html#jdbc-connector> <https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/table/connect.html#jdbc-connector>>; > > > 在 2020年6月10日,10:43,小屁孩 <[hidden email] <mailto:[hidden email]>> 写道: > > > > hi,感谢指导 已经可以实现,可以再问一下 如果我join的是一个mysql维表 我是自定义的source定时更新mysql表这样跟流表关联是可以的吗? > > > > > > > > > > ------------------&nbsp;原始邮件&nbsp;------------------ > > 发件人:&nbsp;"[hidden email] <mailto:[hidden email]>"<[hidden email] <mailto:[hidden email]>&gt;; > > 发送时间:&nbsp;2020年6月9日(星期二) 晚上6:35 > > 收件人:&nbsp;"user-zh"<[hidden email] <mailto:[hidden email]>&gt;; > > > > 主题:&nbsp;回复: 回复: 关于flinksql between问题 > > > > > > > > > > &nbsp; 我在1.10中用 useBlinkPlanner是可以的,用useOldPlanner是不可以的! > > &nbsp; > > 会报你下面的错误: > > &nbsp; Exception in thread "main" org.apache.flink.table.api.TableException: Cannot generate a valid execution plan for the given query: > > > > LogicalProject(num=[$0]) > > &nbsp; LogicalJoin(condition=[AND(&gt;($0, $1), <($0, $2))], joinType=[inner]) > > &nbsp;&nbsp;&nbsp; FlinkLogicalDataStreamScan(id=[1], fields=[num]) > > &nbsp;&nbsp;&nbsp; FlinkLogicalDataStreamScan(id=[2], fields=[startNum, endNum]) > > > > This exception indicates that the query uses an unsupported SQL feature. > > > > > > > > > > &nbsp; > > 发件人: 小屁孩 > > 发送时间: 2020-06-09 17:41 > > 收件人: user-zh > > 主题: 回复: 关于flinksql between问题 > > hi,我使用的是&amp;nbsp; > > 1 flink1.9.0 > > 2 oldplanner > > <dependency&amp;gt; > > <groupId&amp;gt;org.apache.flink</groupId&amp;gt; > > <artifactId&amp;gt;flink-table-api-scala_2.11</artifactId&amp;gt; > > <version&amp;gt;1.9.0</version&amp;gt; > > </dependency&amp;gt; > > &nbsp; > > &nbsp; > > <dependency&amp;gt; > > <groupId&amp;gt;org.apache.flink</groupId&amp;gt; > > <artifactId&amp;gt;flink-table-planner_2.11</artifactId&amp;gt; > > <version&amp;gt;1.9.0</version&amp;gt; > > </dependency&amp;gt; > > &nbsp; > > 3 streaming mode > > 4. 代码类似如下 > > &amp;nbsp; &amp;nbsp; val sqlStream = env.createInput(jdbcInput) > > &amp;nbsp; &amp;nbsp; tnv.registerDataStream("sqlStream",sqlStream,'netstruct_id,'start_ip,'end_ip) > > &amp;nbsp; &amp;nbsp; tnv.registerDataStream("OMstream",value,'ip) > > //&amp;nbsp; &amp;nbsp; val table = tnv.sqlQuery("select * from&amp;nbsp; OMstream as&amp;nbsp; a left join sqlStream as&amp;nbsp; b on a.ip &amp;gt;b.start_ip and a.ip<b.end_ip") > > &amp;nbsp; &amp;nbsp; val table = tnv.sqlQuery("select b.netstruct_id from&amp;nbsp; OMstream as&amp;nbsp; a left join sqlStream as b on a.ip &amp;gt; b.start_ip and a.ip <b.end_ip ") > > &amp;nbsp; &amp;nbsp; val resRow = table.toRetractStream[Row] > > &nbsp; > > 5 报错信息如下 > > Exception in thread "main" org.apache.flink.table.api.TableException: Cannot generate a valid execution plan for the given query:&amp;nbsp; > > &nbsp; > > &nbsp; > > LogicalProject(netstruct_id=[$1]) > > &amp;nbsp; LogicalJoin(condition=[AND(&amp;gt;($0, $2), <($0, $3))], joinType=[left]) > > &amp;nbsp; &amp;nbsp; FlinkLogicalDataStreamScan(id=[1], fields=[ip]) > > &amp;nbsp; &amp;nbsp; FlinkLogicalDataStreamScan(id=[2], fields=[netstruct_id, start_ip, end_ip]) > > &nbsp; > > &nbsp; > > This exception indicates that the query uses an unsupported SQL feature. > > Please check the documentation for the set of currently supported SQL features. > > at org.apache.flink.table.plan.Optimizer.runVolcanoPlanner(Optimizer.scala:245) > > at org.apache.flink.table.plan.Optimizer.optimizeLogicalPlan(Optimizer.scala:160) > > at org.apache.flink.table.plan.StreamOptimizer.optimize(StreamOptimizer.scala:66) > > at org.apache.flink.table.planner.StreamPlanner.translateToType(StreamPlanner.scala:410) > > at org.apache.flink.table.planner.StreamPlanner.org$apache$flink$table$planner$StreamPlanner$$translate(StreamPlanner.scala:182) > > at org.apache.flink.table.planner.StreamPlanner$$anonfun$translate$1.apply(StreamPlanner.scala:127) > > at org.apache.flink.table.planner.StreamPlanner$$anonfun$translate$1.apply(StreamPlanner.scala:127) > > at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > > at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > > at scala.collection.Iterator$class.foreach(Iterator.scala:891) > > at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) > > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > > at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) > > at scala.collection.AbstractTraversable.map(Traversable.scala:104) > > at org.apache.flink.table.planner.StreamPlanner.translate(StreamPlanner.scala:127) > > at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.scala:201) > > at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.toRetractStream(StreamTableEnvironmentImpl.scala:124) > > at org.apache.flink.table.api.scala.TableConversions.toRetractStream(TableConversions.scala:146) > > at com.jwell56.linkstarck.LInkStream$.main(LInkStream.scala:37) > > at com.jwell56.linkstarck.LInkStream.main(LInkStream.scala) > > &nbsp; > > &nbsp; > > &nbsp; > > &nbsp; > > &nbsp; > > 6 我也尝试使用了&amp;nbsp; > > select b.netstruct_id from&amp;nbsp; OMstream as&amp;nbsp; a left join sqlStream as b on a.ip &amp;gt; b.start_ip > > 同样是单个大小比较也是不可以的&amp;nbsp; > > &nbsp; > > &nbsp; > > 谢谢! > > &nbsp; > > &nbsp; > > &nbsp; > > &nbsp; > > ------------------&amp;nbsp;原始邮件&amp;nbsp;------------------ > > 发件人:&amp;nbsp;"Benchao Li"<[hidden email] <mailto:[hidden email]>&amp;gt;; > > 发送时间:&amp;nbsp;2020年6月9日(星期二) 下午4:37 > > 收件人:&amp;nbsp;"user-zh"<[hidden email] <mailto:[hidden email]>&amp;gt;; > > &nbsp; > > 主题:&amp;nbsp;Re: 关于flinksql between问题 > > &nbsp; > > &nbsp; > > &nbsp; > > 方便补充一下以下信息么? > > 1. 你使用的Flink的版本? > > 2. 使用的planner,是blink planner还是old planner? > > 3. 用的是streaming mode还是batch mode? > > 4. 具体的报错信息是什么? > > &nbsp; > > 小屁孩 <[hidden email]&amp;gt; 于2020年6月9日周二 下午4:26写道: > > &nbsp; > > &amp;gt; hi,我在flinksql中使用 select * from a join b on a.ip <b.startip and a.ip > > &amp;gt; &amp;amp;gt;b.endip 报了一个错误 不支持此功能 类似这种有没有类似between的函数可以使用 |
感谢 !是我字段对应不对,一张流表join一张mysql表 是否支持group by 如果支持他的工作原理又是怎么样的呢?目前我的业务场景:流表 A(id,ip) mysql表B(startip,endip,area_id)
关联条件是A.ip between(B.startip,B.endIp) 可能关联出多个area_id 然后取出最小的area_id 我目前的做法就是使用sql between完之后再根据ID 取出最小的 代码大概如下: val table = tnv.sqlQuery("select a.*,b.area_id as s_area_id,b.unit_id as s_unit_id,(ip_to_num(b.end_ip)-ip_to_num(b.start_ip)) as scoped from OMstream as a left join sqlStream as b on ip_to_num(a.s_ip) > ip_to_num(b.start_ip) and ip_to_num(a.s_ip) <ip_to_num(b.end_ip) and a.device_id=b.device_id") val value2: DataStream[(Boolean, Row)] = table.toRetractStream[Row] val value3 = value2.filter(_._1) //目前是数据到了这步之后 后面的数据没有输出 val value4 =value3.map(x => { val mes =info(x._2.getField(0).toString, x._2.getField(1).toString, x._2.getField(2).toString, x._2.getField(3).toString, x._2.getField(4).toString, x._2.getField(5).toString, x._2.getField(6).toString, x._2.getField(7).toString, x._2.getField(8).toString, x._2.getField(9).toString, x._2.getField(10).toString, x._2.getField(11).toString.toInt, x._2.getField(12).toString, x._2.getField(13).toString, x._2.getField(14).toString, x._2.getField(15).toString, x._2.getField(16).toString, x._2.getField(17).toString, x._2.getField(18).toString.toInt, x._2.getField(19).toString, x._2.getField(20).toString, x._2.getField(21).toString, x._2.getField(22).toString, x._2.getField(23).toString, x._2.getField(24).toString, x._2.getField(25).toString, x._2.getField(26).toString.toInt, x._2.getField(27).toString.toInt, x._2.getField(28).toString.toInt) mes }) //这步只是对数据row转成case class value4.print //数据打印不出来 value4.keyBy("rowkey").timeWindow(Time.seconds(2)).minBy("scoped") ------------------ 原始邮件 ------------------ 发件人: "Leonard Xu"<[hidden email]>; 发送时间: 2020年6月10日(星期三) 晚上8:28 收件人: "user-zh"<[hidden email]>; 主题: Re: 关于flinksql between问题 > > 非常感谢,使用的flink1.10.0 中流转成表是是否有字段的数目的限制 我把流转成表 我的流是一个封装的实体类 转换的时候没有这个字段数目的限制的,另外看你的字段也不是很多,一般业务上几百个字段都正常的,你检查下你字段的对应关系 祝好, Leonard Xu > tnv.registerDataStream("sqlStream",mysqlinst,'start_ip,'end_ip,'area_id,'unit_id,'device_id) > &nbsp; &nbsp; tnv.registerDataStream("OMstream",value,'original_network_id,'asset_id,'types,'d_ip,'d_port,'s_ip,'s_port,'devip,'url,'common_des,'operation_des,'raw_log,'severity,'happen_time,'create_time,'s_ip_num, > &nbsp; 'd_ip_num,'method,'asset_area_id,'device_id,'s_mac,'d_mac,'scope,'dcope,'s_asset_id,'d_asset_id,'asset_unit_id,'area_id,'unit_id,'enterprise_id) > &nbsp; &nbsp; tnv.registerFunction("ip_to_num",IPtoNum) > > &nbsp;在转成表时 如下错误 > log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig <http://logging.apache.org/log4j/1.2/faq.html#noconfig> for more info. > Exception in thread "main" org.apache.flink.table.api.ValidationException: Too many fields referenced from an atomic type. > at org.apache.flink.table.typeutils.FieldInfoUtils.extractFieldInfoFromAtomicType(FieldInfoUtils.java:388) > at org.apache.flink.table.typeutils.FieldInfoUtils.extractFieldInformation(FieldInfoUtils.java:259) > at org.apache.flink.table.typeutils.FieldInfoUtils.getFieldsInfo(FieldInfoUtils.java:227) > at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl$$anonfun$1.apply(StreamTableEnvironmentImpl.scala:237) > at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl$$anonfun$1.apply(StreamTableEnvironmentImpl.scala:236) > at scala.Option.map(Option.scala:146) > at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.asQueryOperation(StreamTableEnvironmentImpl.scala:236) > at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.fromDataStream(StreamTableEnvironmentImpl.scala:81) > at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.registerDataStream(StreamTableEnvironmentImpl.scala:94) > at com.jwell56.networksyslog.jobs.jobsource.MysqlLogToES$.main(MysqlLogToES.scala:77) > at com.jwell56.networksyslog.jobs.jobsource.MysqlLogToES.main(MysqlLogToES.scala) > > > > > > ------------------&nbsp;原始邮件&nbsp;------------------ > 发件人:&nbsp;"Leonard Xu"<[hidden email] <mailto:[hidden email]>&gt;; > 发送时间:&nbsp;2020年6月10日(星期三) 中午1:16 > 收件人:&nbsp;"user-zh"<[hidden email] <mailto:[hidden email]>&gt;; > > 主题:&nbsp;Re: 关于flinksql between问题 > > > > Hi, > > 看你描述的想要的是自定义source(左表),&nbsp; 需要同一张mysql 维表做join,如果是这样的话,你的sql看起来有点问题,目前的写法是regular join, 维表join的语法[1]: > > SELECT > &nbsp; o.amout, o.currency, r.rate, o.amount * r.rate > FROM > &nbsp; Orders AS o > &nbsp; JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r > &nbsp; ON r.currency = o.currency > 另外JDBC connector是实现了LookupSource的,也就是支持做维表,维表的更新的 connector.lookup.cache.ttl 参数控制维表中cache的过期时间,不知道是否满足你的需求。 > > Best, > Leonard Xu > > [1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/table/sql/queries.html#joins <https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/table/sql/queries.html#joins><https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/table/sql/queries.html#joins&gt <https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/table/sql/queries.html#joins&gt>; > [2] https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/table/connect.html#jdbc-connector <https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/table/connect.html#jdbc-connector> <https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/table/connect.html#jdbc-connector&gt <https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/table/connect.html#jdbc-connector&gt>; > > &gt; 在 2020年6月10日,10:43,小屁孩 <[hidden email] <mailto:[hidden email]>&gt; 写道: > &gt; > &gt; hi,感谢指导 已经可以实现,可以再问一下 如果我join的是一个mysql维表 我是自定义的source定时更新mysql表这样跟流表关联是可以的吗? > &gt; > &gt; > &gt; > &gt; > &gt; ------------------&amp;nbsp;原始邮件&amp;nbsp;------------------ > &gt; 发件人:&amp;nbsp;"[hidden email] <mailto:[hidden email]>"<[hidden email] <mailto:[hidden email]>&amp;gt;; > &gt; 发送时间:&amp;nbsp;2020年6月9日(星期二) 晚上6:35 > &gt; 收件人:&amp;nbsp;"user-zh"<[hidden email] <mailto:[hidden email]>&amp;gt;; > &gt; > &gt; 主题:&amp;nbsp;回复: 回复: 关于flinksql between问题 > &gt; > &gt; > &gt; > &gt; > &gt; &amp;nbsp; 我在1.10中用 useBlinkPlanner是可以的,用useOldPlanner是不可以的! > &gt; &amp;nbsp; > &gt; 会报你下面的错误: > &gt; &amp;nbsp; Exception in thread "main" org.apache.flink.table.api.TableException: Cannot generate a valid execution plan for the given query: > &gt; > &gt; LogicalProject(num=[$0]) > &gt; &amp;nbsp; LogicalJoin(condition=[AND(&amp;gt;($0, $1), <($0, $2))], joinType=[inner]) > &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; FlinkLogicalDataStreamScan(id=[1], fields=[num]) > &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; FlinkLogicalDataStreamScan(id=[2], fields=[startNum, endNum]) > &gt; > &gt; This exception indicates that the query uses an unsupported SQL feature. > &gt; > &gt; > &gt; > &gt; > &gt; &amp;nbsp; > &gt; 发件人: 小屁孩 > &gt; 发送时间: 2020-06-09 17:41 > &gt; 收件人: user-zh > &gt; 主题: 回复: 关于flinksql between问题 > &gt; hi,我使用的是&amp;amp;nbsp; > &gt; 1 flink1.9.0 > &gt; 2 oldplanner > &gt; <dependency&amp;amp;gt; > &gt; <groupId&amp;amp;gt;org.apache.flink</groupId&amp;amp;gt; > &gt; <artifactId&amp;amp;gt;flink-table-api-scala_2.11</artifactId&amp;amp;gt; > &gt; <version&amp;amp;gt;1.9.0</version&amp;amp;gt; > &gt; </dependency&amp;amp;gt; > &gt; &amp;nbsp; > &gt; &amp;nbsp; > &gt; <dependency&amp;amp;gt; > &gt; <groupId&amp;amp;gt;org.apache.flink</groupId&amp;amp;gt; > &gt; <artifactId&amp;amp;gt;flink-table-planner_2.11</artifactId&amp;amp;gt; > &gt; <version&amp;amp;gt;1.9.0</version&amp;amp;gt; > &gt; </dependency&amp;amp;gt; > &gt; &amp;nbsp; > &gt; 3 streaming mode > &gt; 4. 代码类似如下 > &gt; &amp;amp;nbsp; &amp;amp;nbsp; val sqlStream = env.createInput(jdbcInput) > &gt; &amp;amp;nbsp; &amp;amp;nbsp; tnv.registerDataStream("sqlStream",sqlStream,'netstruct_id,'start_ip,'end_ip) > &gt; &amp;amp;nbsp; &amp;amp;nbsp; tnv.registerDataStream("OMstream",value,'ip) > &gt; //&amp;amp;nbsp; &amp;amp;nbsp; val table = tnv.sqlQuery("select * from&amp;amp;nbsp; OMstream as&amp;amp;nbsp; a left join sqlStream as&amp;amp;nbsp; b on a.ip &amp;amp;gt;b.start_ip and a.ip<b.end_ip") > &gt; &amp;amp;nbsp; &amp;amp;nbsp; val table = tnv.sqlQuery("select b.netstruct_id from&amp;amp;nbsp; OMstream as&amp;amp;nbsp; a left join sqlStream as b on a.ip &amp;amp;gt; b.start_ip and a.ip <b.end_ip ") > &gt; &amp;amp;nbsp; &amp;amp;nbsp; val resRow = table.toRetractStream[Row] > &gt; &amp;nbsp; > &gt; 5 报错信息如下 > &gt; Exception in thread "main" org.apache.flink.table.api.TableException: Cannot generate a valid execution plan for the given query:&amp;amp;nbsp; > &gt; &amp;nbsp; > &gt; &amp;nbsp; > &gt; LogicalProject(netstruct_id=[$1]) > &gt; &amp;amp;nbsp; LogicalJoin(condition=[AND(&amp;amp;gt;($0, $2), <($0, $3))], joinType=[left]) > &gt; &amp;amp;nbsp; &amp;amp;nbsp; FlinkLogicalDataStreamScan(id=[1], fields=[ip]) > &gt; &amp;amp;nbsp; &amp;amp;nbsp; FlinkLogicalDataStreamScan(id=[2], fields=[netstruct_id, start_ip, end_ip]) > &gt; &amp;nbsp; > &gt; &amp;nbsp; > &gt; This exception indicates that the query uses an unsupported SQL feature. > &gt; Please check the documentation for the set of currently supported SQL features. > &gt; at org.apache.flink.table.plan.Optimizer.runVolcanoPlanner(Optimizer.scala:245) > &gt; at org.apache.flink.table.plan.Optimizer.optimizeLogicalPlan(Optimizer.scala:160) > &gt; at org.apache.flink.table.plan.StreamOptimizer.optimize(StreamOptimizer.scala:66) > &gt; at org.apache.flink.table.planner.StreamPlanner.translateToType(StreamPlanner.scala:410) > &gt; at org.apache.flink.table.planner.StreamPlanner.org$apache$flink$table$planner$StreamPlanner$$translate(StreamPlanner.scala:182) > &gt; at org.apache.flink.table.planner.StreamPlanner$$anonfun$translate$1.apply(StreamPlanner.scala:127) > &gt; at org.apache.flink.table.planner.StreamPlanner$$anonfun$translate$1.apply(StreamPlanner.scala:127) > &gt; at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > &gt; at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > &gt; at scala.collection.Iterator$class.foreach(Iterator.scala:891) > &gt; at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) > &gt; at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > &gt; at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > &gt; at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) > &gt; at scala.collection.AbstractTraversable.map(Traversable.scala:104) > &gt; at org.apache.flink.table.planner.StreamPlanner.translate(StreamPlanner.scala:127) > &gt; at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.scala:201) > &gt; at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.toRetractStream(StreamTableEnvironmentImpl.scala:124) > &gt; at org.apache.flink.table.api.scala.TableConversions.toRetractStream(TableConversions.scala:146) > &gt; at com.jwell56.linkstarck.LInkStream$.main(LInkStream.scala:37) > &gt; at com.jwell56.linkstarck.LInkStream.main(LInkStream.scala) > &gt; &amp;nbsp; > &gt; &amp;nbsp; > &gt; &amp;nbsp; > &gt; &amp;nbsp; > &gt; &amp;nbsp; > &gt; 6 我也尝试使用了&amp;amp;nbsp; > &gt; select b.netstruct_id from&amp;amp;nbsp; OMstream as&amp;amp;nbsp; a left join sqlStream as b on a.ip &amp;amp;gt; b.start_ip > &gt; 同样是单个大小比较也是不可以的&amp;amp;nbsp; > &gt; &amp;nbsp; > &gt; &amp;nbsp; > &gt; 谢谢! > &gt; &amp;nbsp; > &gt; &amp;nbsp; > &gt; &amp;nbsp; > &gt; &amp;nbsp; > &gt; ------------------&amp;amp;nbsp;原始邮件&amp;amp;nbsp;------------------ > &gt; 发件人:&amp;amp;nbsp;"Benchao Li"<[hidden email] <mailto:[hidden email]>&amp;amp;gt;; > &gt; 发送时间:&amp;amp;nbsp;2020年6月9日(星期二) 下午4:37 > &gt; 收件人:&amp;amp;nbsp;"user-zh"<[hidden email] <mailto:[hidden email]>&amp;amp;gt;; > &gt; &amp;nbsp; > &gt; 主题:&amp;amp;nbsp;Re: 关于flinksql between问题 > &gt; &amp;nbsp; > &gt; &amp;nbsp; > &gt; &amp;nbsp; > &gt; 方便补充一下以下信息么? > &gt; 1. 你使用的Flink的版本? > &gt; 2. 使用的planner,是blink planner还是old planner? > &gt; 3. 用的是streaming mode还是batch mode? > &gt; 4. 具体的报错信息是什么? > &gt; &amp;nbsp; > &gt; 小屁孩 <[hidden email]&amp;amp;gt; 于2020年6月9日周二 下午4:26写道: > &gt; &amp;nbsp; > &gt; &amp;amp;gt; hi,我在flinksql中使用 select * from a join b on a.ip <b.startip and a.ip > &gt; &amp;amp;gt; &amp;amp;amp;gt;b.endip 报了一个错误 不支持此功能 类似这种有没有类似between的函数可以使用 |
Free forum by Nabble | Edit this page |