A question about BETWEEN in Flink SQL

A question about BETWEEN in Flink SQL

小屁孩
Hi, in Flink SQL I ran `select * from a join b on a.ip < b.startip and a.ip > b.endip` and got an error saying the feature is not supported. Is there a BETWEEN-like function that works for this kind of condition?
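For reference, SQL's BETWEEN is only shorthand for a pair of comparisons, so rewriting the predicate would not by itself change what the planner supports; a hypothetical equivalent form of the range condition (table and column names taken from the question above):

```sql
-- a.ip BETWEEN b.startip AND b.endip
-- is shorthand for: a.ip >= b.startip AND a.ip <= b.endip
SELECT *
FROM a
JOIN b ON a.ip BETWEEN b.startip AND b.endip
```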
Re: A question about BETWEEN in Flink SQL

Benchao Li-2
Could you add the following information?
1. Which Flink version are you using?
2. Which planner: the blink planner or the old planner?
3. Streaming mode or batch mode?
4. What is the exact error message?

Re: A question about BETWEEN in Flink SQL

小屁孩
Hi, I am using:
1. Flink 1.9.0
2. the old planner

                <dependency>
                        <groupId>org.apache.flink</groupId>
                        <artifactId>flink-table-api-scala_2.11</artifactId>
                        <version>1.9.0</version>
                </dependency>

                <dependency>
                        <groupId>org.apache.flink</groupId>
                        <artifactId>flink-table-planner_2.11</artifactId>
                        <version>1.9.0</version>
                </dependency>

3. streaming mode
4. The code looks roughly like this:

    val sqlStream = env.createInput(jdbcInput)
    tnv.registerDataStream("sqlStream", sqlStream, 'netstruct_id, 'start_ip, 'end_ip)
    tnv.registerDataStream("OMstream", value, 'ip)
    // val table = tnv.sqlQuery("select * from OMstream as a left join sqlStream as b on a.ip > b.start_ip and a.ip < b.end_ip")
    val table = tnv.sqlQuery("select b.netstruct_id from OMstream as a left join sqlStream as b on a.ip > b.start_ip and a.ip < b.end_ip")
    val resRow = table.toRetractStream[Row]

5 报错信息如下
Exception in thread "main" org.apache.flink.table.api.TableException: Cannot generate a valid execution plan for the given query:&nbsp;


LogicalProject(netstruct_id=[$1])
&nbsp; LogicalJoin(condition=[AND(&gt;($0, $2), <($0, $3))], joinType=[left])
&nbsp; &nbsp; FlinkLogicalDataStreamScan(id=[1], fields=[ip])
&nbsp; &nbsp; FlinkLogicalDataStreamScan(id=[2], fields=[netstruct_id, start_ip, end_ip])


This exception indicates that the query uses an unsupported SQL feature.
Please check the documentation for the set of currently supported SQL features.
        at org.apache.flink.table.plan.Optimizer.runVolcanoPlanner(Optimizer.scala:245)
        at org.apache.flink.table.plan.Optimizer.optimizeLogicalPlan(Optimizer.scala:160)
        at org.apache.flink.table.plan.StreamOptimizer.optimize(StreamOptimizer.scala:66)
        at org.apache.flink.table.planner.StreamPlanner.translateToType(StreamPlanner.scala:410)
        at org.apache.flink.table.planner.StreamPlanner.org$apache$flink$table$planner$StreamPlanner$$translate(StreamPlanner.scala:182)
        at org.apache.flink.table.planner.StreamPlanner$$anonfun$translate$1.apply(StreamPlanner.scala:127)
        at org.apache.flink.table.planner.StreamPlanner$$anonfun$translate$1.apply(StreamPlanner.scala:127)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at org.apache.flink.table.planner.StreamPlanner.translate(StreamPlanner.scala:127)
        at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.scala:201)
        at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.toRetractStream(StreamTableEnvironmentImpl.scala:124)
        at org.apache.flink.table.api.scala.TableConversions.toRetractStream(TableConversions.scala:146)
        at com.jwell56.linkstarck.LInkStream$.main(LInkStream.scala:37)
        at com.jwell56.linkstarck.LInkStream.main(LInkStream.scala)





6. I also tried
select b.netstruct_id from OMstream as a left join sqlStream as b on a.ip > b.start_ip
A single comparison on its own is not supported either.

Thanks!




Re: Re: A question about BETWEEN in Flink SQL

wangweiguang@stevegame.cn

With Flink 1.10, useBlinkPlanner works for me; useOldPlanner does not, and it reports the error below:

  Exception in thread "main" org.apache.flink.table.api.TableException: Cannot generate a valid execution plan for the given query:

LogicalProject(num=[$0])
  LogicalJoin(condition=[AND(>($0, $1), <($0, $2))], joinType=[inner])
    FlinkLogicalDataStreamScan(id=[1], fields=[num])
    FlinkLogicalDataStreamScan(id=[2], fields=[startNum, endNum])

This exception indicates that the query uses an unsupported SQL feature.
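As noted above, switching to the blink planner avoids the error. A minimal sketch of how the planner could be selected on Flink 1.10 (assuming the flink-table-planner-blink dependency is on the classpath; `env` stands in for the job's existing StreamExecutionEnvironment):

```scala
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.scala.StreamTableEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Request the blink planner explicitly; in 1.10 the default
// StreamTableEnvironment.create(env) still uses the old planner.
val settings = EnvironmentSettings.newInstance()
  .useBlinkPlanner()
  .inStreamingMode()
  .build()

val tnv = StreamTableEnvironment.create(env, settings)
```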




 
Re: Re: A question about BETWEEN in Flink SQL

小屁孩
Hi, thanks for the guidance; it works now. May I ask one more thing: if what I join is a MySQL dimension table, and I use a custom source that periodically re-reads the MySQL table, can that be joined with the stream table?




Re: Re: A question about BETWEEN in Flink SQL

Benchao Li-2
You mean your MySQL dimension table is custom-built and its contents are refreshed periodically? As long as what you implement is a LookupSource, that should be fine; you control the internal implementation yourself.


Re: A question about BETWEEN in Flink SQL

Leonard Xu
Hi,

From your description, you have a custom source (the left table) that needs to join a MySQL dimension table. If so, your SQL looks a bit off: what you wrote is a regular join, while the dimension-table join syntax is [1]:

SELECT
  o.amount, o.currency, r.rate, o.amount * r.rate
FROM
  Orders AS o
  JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r
  ON r.currency = o.currency

Also, the JDBC connector implements LookupSource, i.e. it can serve as a dimension table [2], and the connector.lookup.cache.ttl option controls how long cached dimension rows live. That may already cover your need.

Best,
Leonard Xu

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/table/sql/queries.html#joins
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/table/connect.html#jdbc-connector
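A sketch of what a JDBC lookup (dimension) table with a cache TTL could look like in Flink 1.10 DDL; the connection URL, credentials, and table/column names are made up for illustration:

```sql
CREATE TABLE LatestRates (
  currency VARCHAR,
  rate DOUBLE
) WITH (
  'connector.type' = 'jdbc',
  'connector.url' = 'jdbc:mysql://localhost:3306/mydb',
  'connector.table' = 'rates',
  'connector.username' = 'user',
  'connector.password' = 'pass',
  -- lookup cache: rows are cached per key and expire after the TTL,
  -- so updates in MySQL become visible after at most this interval
  'connector.lookup.cache.max-rows' = '5000',
  'connector.lookup.cache.ttl' = '10min'
)
```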



Re: A question about BETWEEN in Flink SQL

小屁孩
Thank you very much. With Flink 1.10.0, is there a limit on the number of fields when converting a stream to a table? My stream element is a wrapped entity class:

    tnv.registerDataStream("sqlStream", mysqlinst, 'start_ip, 'end_ip, 'area_id, 'unit_id, 'device_id)
    tnv.registerDataStream("OMstream", value, 'original_network_id, 'asset_id, 'types, 'd_ip, 'd_port, 's_ip, 's_port, 'devip, 'url, 'common_des, 'operation_des, 'raw_log, 'severity, 'happen_time, 'create_time, 's_ip_num,
      'd_ip_num, 'method, 'asset_area_id, 'device_id, 's_mac, 'd_mac, 'scope, 'dcope, 's_asset_id, 'd_asset_id, 'asset_unit_id, 'area_id, 'unit_id, 'enterprise_id)
    tnv.registerFunction("ip_to_num", IPtoNum)

During the conversion it fails with the following error:

log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Exception in thread "main" org.apache.flink.table.api.ValidationException: Too many fields referenced from an atomic type.
        at org.apache.flink.table.typeutils.FieldInfoUtils.extractFieldInfoFromAtomicType(FieldInfoUtils.java:388)
        at org.apache.flink.table.typeutils.FieldInfoUtils.extractFieldInformation(FieldInfoUtils.java:259)
        at org.apache.flink.table.typeutils.FieldInfoUtils.getFieldsInfo(FieldInfoUtils.java:227)
        at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl$$anonfun$1.apply(StreamTableEnvironmentImpl.scala:237)
        at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl$$anonfun$1.apply(StreamTableEnvironmentImpl.scala:236)
        at scala.Option.map(Option.scala:146)
        at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.asQueryOperation(StreamTableEnvironmentImpl.scala:236)
        at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.fromDataStream(StreamTableEnvironmentImpl.scala:81)
        at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.registerDataStream(StreamTableEnvironmentImpl.scala:94)
        at com.jwell56.networksyslog.jobs.jobsource.MysqlLogToES$.main(MysqlLogToES.scala:77)
        at com.jwell56.networksyslog.jobs.jobsource.MysqlLogToES.main(MysqlLogToES.scala)
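This error is not about a field-count limit as such: the message suggests Flink is analyzing the stream's element type as an atomic (generic) type, from which at most one field name can be derived, rather than as a composite type whose fields can be listed by name. If the element were a Scala case class (or a conforming POJO), the per-field registration should work. A minimal sketch under that assumption; the class and field selection here are hypothetical:

```scala
// A case class is analyzed as a composite type (CaseClassTypeInfo),
// so each constructor field can be exposed as a table column.
case class OmEvent(
  original_network_id: String,
  asset_id: String,
  d_ip: String,
  s_ip: String,
  happen_time: Long
  // ... remaining fields with names matching the registration
)
```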





------------------&nbsp;原始邮件&nbsp;------------------
发件人:&nbsp;"Leonard Xu"<[hidden email]&gt;;
发送时间:&nbsp;2020年6月10日(星期三) 中午1:16
收件人:&nbsp;"user-zh"<[hidden email]&gt;;

主题:&nbsp;Re: 关于flinksql between问题



Hi,

看你描述的想要的是自定义source(左表),&nbsp; 需要同一张mysql 维表做join,如果是这样的话,你的sql看起来有点问题,目前的写法是regular join, 维表join的语法[1]:

SELECT
&nbsp; o.amout, o.currency, r.rate, o.amount * r.rate
FROM
&nbsp; Orders AS o
&nbsp; JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r
&nbsp; ON r.currency = o.currency
另外JDBC connector是实现了LookupSource的,也就是支持做维表,维表的更新的 connector.lookup.cache.ttl 参数控制维表中cache的过期时间,不知道是否满足你的需求。

Best,
Leonard Xu

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/table/sql/queries.html#joins <https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/table/sql/queries.html#joins&gt;
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/table/connect.html#jdbc-connector <https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/table/connect.html#jdbc-connector&gt;

> On 2020-06-10, at 10:43, 小屁孩 <[hidden email]> wrote:
>
> Hi, thanks for the guidance, it works now. May I ask one more thing: if what I join is a MySQL dimension table, and I use a custom source to periodically refresh that MySQL table, can it be joined with the stream table that way?

Re: Re: About the Flink SQL BETWEEN question

wangweiguang@stevegame.cn

It is most likely caused by the fields in your `value` or `mysqlinst` data not matching the fields you declared!

 
From: 小屁孩
Sent: 2020-06-10 15:25
To: user-zh
Subject: Re: About the Flink SQL BETWEEN question

Many thanks. With Flink 1.10.0, is there a limit on the number of fields when converting a stream into a table? The stream I am converting is wrapped in an entity class:

    tnv.registerDataStream("sqlStream", mysqlinst, 'start_ip, 'end_ip, 'area_id, 'unit_id, 'device_id)
    tnv.registerDataStream("OMstream", value, 'original_network_id, 'asset_id, 'types, 'd_ip, 'd_port, 's_ip, 's_port, 'devip, 'url, 'common_des, 'operation_des, 'raw_log, 'severity, 'happen_time, 'create_time, 's_ip_num,
      'd_ip_num, 'method, 'asset_area_id, 'device_id, 's_mac, 'd_mac, 'scope, 'dcope, 's_asset_id, 'd_asset_id, 'asset_unit_id, 'area_id, 'unit_id, 'enterprise_id)
    tnv.registerFunction("ip_to_num", IPtoNum)

When converting to a table, I get the following error:

log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Exception in thread "main" org.apache.flink.table.api.ValidationException: Too many fields referenced from an atomic type.
at org.apache.flink.table.typeutils.FieldInfoUtils.extractFieldInfoFromAtomicType(FieldInfoUtils.java:388)
at org.apache.flink.table.typeutils.FieldInfoUtils.extractFieldInformation(FieldInfoUtils.java:259)
at org.apache.flink.table.typeutils.FieldInfoUtils.getFieldsInfo(FieldInfoUtils.java:227)
at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl$$anonfun$1.apply(StreamTableEnvironmentImpl.scala:237)
at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl$$anonfun$1.apply(StreamTableEnvironmentImpl.scala:236)
at scala.Option.map(Option.scala:146)
at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.asQueryOperation(StreamTableEnvironmentImpl.scala:236)
at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.fromDataStream(StreamTableEnvironmentImpl.scala:81)
at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.registerDataStream(StreamTableEnvironmentImpl.scala:94)
at com.jwell56.networksyslog.jobs.jobsource.MysqlLogToES$.main(MysqlLogToES.scala:77)
at com.jwell56.networksyslog.jobs.jobsource.MysqlLogToES.main(MysqlLogToES.scala)

Re: About the Flink SQL BETWEEN question

Leonard Xu
In reply to this post by 小屁孩
>
> Many thanks. With Flink 1.10.0, is there a limit on the number of fields when converting a stream into a table? The stream I am converting is wrapped in an entity class.

There is no such limit on the number of fields in the conversion, and the fields you listed are not that many anyway; several hundred fields is normal in practice. Please check the correspondence between your data fields and the declared ones.

Best regards,
Leonard Xu




Re: About the Flink SQL BETWEEN question

小屁孩

Thanks! It was indeed my field mapping that was wrong. When joining a stream table with a MySQL table, is GROUP BY supported, and if so, how does it work? My current business scenario: stream table A(id, ip), MySQL table B(startip, endip, area_id). The join condition is A.ip BETWEEN B.startip AND B.endip; this may match several area_id values, and I then need to take the smallest area_id.
My current approach is to run the SQL with the BETWEEN-style join first, and then take the minimum per id afterwards.
The code looks roughly like this:

    val table = tnv.sqlQuery("select a.*, b.area_id as s_area_id, b.unit_id as s_unit_id, (ip_to_num(b.end_ip) - ip_to_num(b.start_ip)) as scoped from OMstream as a left join sqlStream as b on ip_to_num(a.s_ip) > ip_to_num(b.start_ip) and ip_to_num(a.s_ip) < ip_to_num(b.end_ip) and a.device_id = b.device_id")
    val value2: DataStream[(Boolean, Row)] = table.toRetractStream[Row]
    val value3 = value2.filter(_._1)  // once the data reaches this step, nothing downstream is emitted
    val value4 = value3.map(x => {
      val mes = info(x._2.getField(0).toString, x._2.getField(1).toString, x._2.getField(2).toString, x._2.getField(3).toString, x._2.getField(4).toString, x._2.getField(5).toString, x._2.getField(6).toString, x._2.getField(7).toString, x._2.getField(8).toString, x._2.getField(9).toString,
        x._2.getField(10).toString, x._2.getField(11).toString.toInt, x._2.getField(12).toString, x._2.getField(13).toString, x._2.getField(14).toString, x._2.getField(15).toString, x._2.getField(16).toString, x._2.getField(17).toString, x._2.getField(18).toString.toInt, x._2.getField(19).toString, x._2.getField(20).toString, x._2.getField(21).toString,
        x._2.getField(22).toString, x._2.getField(23).toString, x._2.getField(24).toString, x._2.getField(25).toString, x._2.getField(26).toString.toInt, x._2.getField(27).toString.toInt, x._2.getField(28).toString.toInt)
      mes
    })  // this step only converts the Row into a case class
    value4.print  // the data never gets printed here
    value4.keyBy("rowkey").timeWindow(Time.seconds(2)).minBy("scoped")
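As a sketch of expressing "take the smallest area_id per record" directly in SQL on the blink planner (assuming id uniquely identifies a row of stream table A, and reusing the ip_to_num UDF from this thread), the Top-N pattern with ROW_NUMBER() could look like:

```sql
SELECT id, ip, area_id
FROM (
  SELECT a.id, a.ip, b.area_id,
         ROW_NUMBER() OVER (PARTITION BY a.id ORDER BY b.area_id ASC) AS rn
  FROM OMstream AS a
  LEFT JOIN sqlStream AS b
    ON ip_to_num(a.ip) BETWEEN ip_to_num(b.start_ip) AND ip_to_num(b.end_ip)
) WHERE rn = 1
```

The outer filter rn = 1 keeps only the match with the smallest area_id for each id; the blink planner supports this Top-N pattern on streams and emits retractions as better matches arrive.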



------------------&nbsp;原始邮件&nbsp;------------------
发件人:&nbsp;"Leonard Xu"<[hidden email]&gt;;
发送时间:&nbsp;2020年6月10日(星期三) 晚上8:28
收件人:&nbsp;"user-zh"<[hidden email]&gt;;

主题:&nbsp;Re: 关于flinksql between问题



&gt;
&gt; 非常感谢,使用的flink1.10.0 中流转成表是是否有字段的数目的限制 我把流转成表 我的流是一个封装的实体类

转换的时候没有这个字段数目的限制的,另外看你的字段也不是很多,一般业务上几百个字段都正常的,你检查下你字段的对应关系

祝好,
Leonard Xu


&gt; tnv.registerDataStream("sqlStream",mysqlinst,'start_ip,'end_ip,'area_id,'unit_id,'device_id)
&gt; &amp;nbsp; &amp;nbsp; tnv.registerDataStream("OMstream",value,'original_network_id,'asset_id,'types,'d_ip,'d_port,'s_ip,'s_port,'devip,'url,'common_des,'operation_des,'raw_log,'severity,'happen_time,'create_time,'s_ip_num,
&gt; &amp;nbsp; 'd_ip_num,'method,'asset_area_id,'device_id,'s_mac,'d_mac,'scope,'dcope,'s_asset_id,'d_asset_id,'asset_unit_id,'area_id,'unit_id,'enterprise_id)
&gt; &amp;nbsp; &amp;nbsp; tnv.registerFunction("ip_to_num",IPtoNum)
&gt;
&gt; &amp;nbsp;在转成表时 如下错误
&gt; log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig <http://logging.apache.org/log4j/1.2/faq.html#noconfig&gt; for more info.
&gt; Exception in thread "main" org.apache.flink.table.api.ValidationException: Too many fields referenced from an atomic type.
&gt; at org.apache.flink.table.typeutils.FieldInfoUtils.extractFieldInfoFromAtomicType(FieldInfoUtils.java:388)
&gt; at org.apache.flink.table.typeutils.FieldInfoUtils.extractFieldInformation(FieldInfoUtils.java:259)
&gt; at org.apache.flink.table.typeutils.FieldInfoUtils.getFieldsInfo(FieldInfoUtils.java:227)
&gt; at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl$$anonfun$1.apply(StreamTableEnvironmentImpl.scala:237)
&gt; at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl$$anonfun$1.apply(StreamTableEnvironmentImpl.scala:236)
&gt; at scala.Option.map(Option.scala:146)
&gt; at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.asQueryOperation(StreamTableEnvironmentImpl.scala:236)
&gt; at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.fromDataStream(StreamTableEnvironmentImpl.scala:81)
&gt; at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.registerDataStream(StreamTableEnvironmentImpl.scala:94)
&gt; at com.jwell56.networksyslog.jobs.jobsource.MysqlLogToES$.main(MysqlLogToES.scala:77)
&gt; at com.jwell56.networksyslog.jobs.jobsource.MysqlLogToES.main(MysqlLogToES.scala)
&gt;
&gt;
&gt;
&gt;
&gt;
&gt; ------------------&amp;nbsp;原始邮件&amp;nbsp;------------------
&gt; 发件人:&amp;nbsp;"Leonard Xu"<[hidden email] <mailto:[hidden email]&gt;&amp;gt;;
&gt; 发送时间:&amp;nbsp;2020年6月10日(星期三) 中午1:16
&gt; 收件人:&amp;nbsp;"user-zh"<[hidden email] <mailto:[hidden email]&gt;&amp;gt;;
&gt;
&gt; 主题:&amp;nbsp;Re: 关于flinksql between问题
&gt;
&gt;
&gt;
&gt; Hi,
&gt;
&gt; 看你描述的想要的是自定义source(左表),&amp;nbsp; 需要同一张mysql 维表做join,如果是这样的话,你的sql看起来有点问题,目前的写法是regular join, 维表join的语法[1]:
&gt;
&gt; SELECT
&gt; &amp;nbsp; o.amout, o.currency, r.rate, o.amount * r.rate
&gt; FROM
&gt; &amp;nbsp; Orders AS o
&gt; &amp;nbsp; JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r
&gt; &amp;nbsp; ON r.currency = o.currency
&gt; 另外JDBC connector是实现了LookupSource的,也就是支持做维表,维表的更新的 connector.lookup.cache.ttl 参数控制维表中cache的过期时间,不知道是否满足你的需求。
&gt;
&gt; Best,
&gt; Leonard Xu
&gt;
&gt; [1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/table/sql/queries.html#joins <https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/table/sql/queries.html#joins&gt;<https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/table/sql/queries.html#joins&amp;gt <https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/table/sql/queries.html#joins&amp;gt&gt;;
&gt; [2] https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/table/connect.html#jdbc-connector <https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/table/connect.html#jdbc-connector&gt; <https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/table/connect.html#jdbc-connector&amp;gt <https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/table/connect.html#jdbc-connector&amp;gt&gt;;
&gt;
&gt; &gt; On Jun 10, 2020, at 10:43, 小屁孩 <[hidden email] <mailto:[hidden email]>> wrote:
&gt; &gt;
&gt; &gt; Hi, thanks for the guidance; it works now. One more question: if what I join is a MySQL dimension table, and I use a custom source that periodically re-reads the MySQL table, can that be joined with the stream table?
&gt; &gt;
&gt; &gt;
&gt; &gt; ------------------ Original message ------------------
&gt; &gt; From: "[hidden email]" <[hidden email]>
&gt; &gt; Sent: Tuesday, June 9, 2020 6:35 PM
&gt; &gt; To: "user-zh" <[hidden email]>
&gt; &gt;
&gt; &gt; Subject: Re: Re: 关于flinksql between问题
&gt; &amp;gt;
&gt; &gt;
&gt; &gt;
&gt; &gt; With Flink 1.10 it works with useBlinkPlanner, but not with useOldPlanner!
&gt; &gt;
&gt; &gt; The old planner reports the error below:
&gt; &gt; Exception in thread "main" org.apache.flink.table.api.TableException: Cannot generate a valid execution plan for the given query:
&gt; &gt;
&gt; &gt; LogicalProject(num=[$0])
&gt; &gt;   LogicalJoin(condition=[AND(>($0, $1), <($0, $2))], joinType=[inner])
&gt; &gt;     FlinkLogicalDataStreamScan(id=[1], fields=[num])
&gt; &gt;     FlinkLogicalDataStreamScan(id=[2], fields=[startNum, endNum])
&gt; &gt;
&gt; &gt; This exception indicates that the query uses an unsupported SQL feature.
&gt; &gt;
&gt; &gt; From: 小屁孩
&gt; &gt; Sent: 2020-06-09 17:41
&gt; &gt; To: user-zh
&gt; &gt; Subject: Re: 关于flinksql between问题
&gt; &gt; Hi, I am using:
&gt; &gt; 1. Flink 1.9.0
&gt; &gt; 2. the old planner:
&gt; &gt; <dependency>
&gt; &gt;   <groupId>org.apache.flink</groupId>
&gt; &gt;   <artifactId>flink-table-api-scala_2.11</artifactId>
&gt; &gt;   <version>1.9.0</version>
&gt; &gt; </dependency>
&gt; &gt;
&gt; &gt; <dependency>
&gt; &gt;   <groupId>org.apache.flink</groupId>
&gt; &gt;   <artifactId>flink-table-planner_2.11</artifactId>
&gt; &gt;   <version>1.9.0</version>
&gt; &gt; </dependency>
&gt; &gt;
&gt; &gt; 3. streaming mode
&gt; &gt; 4. code similar to the following:
&gt; &gt;     val sqlStream = env.createInput(jdbcInput)
&gt; &gt;     tnv.registerDataStream("sqlStream", sqlStream, 'netstruct_id, 'start_ip, 'end_ip)
&gt; &gt;     tnv.registerDataStream("OMstream", value, 'ip)
&gt; &gt; //  val table = tnv.sqlQuery("select * from OMstream as a left join sqlStream as b on a.ip > b.start_ip and a.ip < b.end_ip")
&gt; &gt;     val table = tnv.sqlQuery("select b.netstruct_id from OMstream as a left join sqlStream as b on a.ip > b.start_ip and a.ip < b.end_ip")
&gt; &gt;     val resRow = table.toRetractStream[Row]
&gt; &gt;
&gt; &gt; 5. The error message:
&gt; &gt; Exception in thread "main" org.apache.flink.table.api.TableException: Cannot generate a valid execution plan for the given query:
&gt; &amp;gt; &amp;amp;nbsp;
&gt; &amp;gt; &amp;amp;nbsp;
&gt; &amp;gt; LogicalProject(netstruct_id=[$1])
&gt; &amp;gt; &amp;amp;amp;nbsp; LogicalJoin(condition=[AND(&amp;amp;amp;gt;($0, $2), <($0, $3))], joinType=[left])
&gt; &amp;gt; &amp;amp;amp;nbsp; &amp;amp;amp;nbsp; FlinkLogicalDataStreamScan(id=[1], fields=[ip])
&gt; &amp;gt; &amp;amp;amp;nbsp; &amp;amp;amp;nbsp; FlinkLogicalDataStreamScan(id=[2], fields=[netstruct_id, start_ip, end_ip])
&gt; &amp;gt; &amp;amp;nbsp;
&gt; &amp;gt; &amp;amp;nbsp;
&gt; &amp;gt; This exception indicates that the query uses an unsupported SQL feature.
&gt; &amp;gt; Please check the documentation for the set of currently supported SQL features.
&gt; &gt; at org.apache.flink.table.plan.Optimizer.runVolcanoPlanner(Optimizer.scala:245)
&gt; &gt; at org.apache.flink.table.plan.Optimizer.optimizeLogicalPlan(Optimizer.scala:160)
&gt; &gt; at org.apache.flink.table.plan.StreamOptimizer.optimize(StreamOptimizer.scala:66)
&gt; &gt; at org.apache.flink.table.planner.StreamPlanner.translateToType(StreamPlanner.scala:410)
&gt; &gt; at org.apache.flink.table.planner.StreamPlanner.org$apache$flink$table$planner$StreamPlanner$$translate(StreamPlanner.scala:182)
&gt; &gt; at org.apache.flink.table.planner.StreamPlanner$$anonfun$translate$1.apply(StreamPlanner.scala:127)
&gt; &gt; at org.apache.flink.table.planner.StreamPlanner$$anonfun$translate$1.apply(StreamPlanner.scala:127)
&gt; &gt; at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
&gt; &gt; at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
&gt; &gt; at scala.collection.Iterator$class.foreach(Iterator.scala:891)
&gt; &gt; at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
&gt; &gt; at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
&gt; &gt; at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
&gt; &gt; at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
&gt; &gt; at scala.collection.AbstractTraversable.map(Traversable.scala:104)
&gt; &gt; at org.apache.flink.table.planner.StreamPlanner.translate(StreamPlanner.scala:127)
&gt; &gt; at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.scala:201)
&gt; &gt; at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.toRetractStream(StreamTableEnvironmentImpl.scala:124)
&gt; &gt; at org.apache.flink.table.api.scala.TableConversions.toRetractStream(TableConversions.scala:146)
&gt; &gt; at com.jwell56.linkstarck.LInkStream$.main(LInkStream.scala:37)
&gt; &gt; at com.jwell56.linkstarck.LInkStream.main(LInkStream.scala)
&gt; &amp;gt; &amp;amp;nbsp;
&gt; &amp;gt; &amp;amp;nbsp;
&gt; &amp;gt; &amp;amp;nbsp;
&gt; &amp;gt; &amp;amp;nbsp;
&gt; &amp;gt; &amp;amp;nbsp;
&gt; &amp;gt; 6 我也尝试使用了&amp;amp;amp;nbsp;
&gt; &amp;gt; select b.netstruct_id from&amp;amp;amp;nbsp; OMstream as&amp;amp;amp;nbsp; a left join sqlStream as b on a.ip &amp;amp;amp;gt; b.start_ip
&gt; &amp;gt; 同样是单个大小比较也是不可以的&amp;amp;amp;nbsp;
&gt; &amp;gt; &amp;amp;nbsp;
&gt; &amp;gt; &amp;amp;nbsp;
&gt; &amp;gt; 谢谢!
&gt; &amp;gt; &amp;amp;nbsp;
&gt; &amp;gt; &amp;amp;nbsp;
&gt; &amp;gt; &amp;amp;nbsp;
&gt; &amp;gt; &amp;amp;nbsp;
&gt; &gt; ------------------ Original message ------------------
&gt; &gt; From: "Benchao Li" <[hidden email] <mailto:[hidden email]>>
&gt; &gt; Sent: Tuesday, June 9, 2020 4:37 PM
&gt; &gt; To: "user-zh" <[hidden email] <mailto:[hidden email]>>
&gt; &gt;
&gt; &gt; Subject: Re: 关于flinksql between问题
&gt; &gt;
&gt; &gt; Could you share a bit more information?
&gt; &gt; 1. Which Flink version are you using?
&gt; &gt; 2. Which planner, the blink planner or the old planner?
&gt; &gt; 3. Streaming mode or batch mode?
&gt; &gt; 4. What is the exact error message?
&gt; &gt;
&gt; &gt; 小屁孩 <[hidden email]> wrote on Tue, Jun 9, 2020 at 4:26 PM:
&gt; &gt;
&gt; &gt; > hi, in Flink SQL I ran: select * from a join b on a.ip < b.startip and a.ip
&gt; &gt; > > b.endip and got an error saying the feature is not supported. Is there a BETWEEN-like function that works here?
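For reference, a sketch of the predicate rewrite asked about here: standard SQL BETWEEN is inclusive shorthand for a pair of comparisons, so the two predicates below are equivalent. As the rest of the thread shows, the failure comes from the old planner rejecting non-equi stream joins, not from the comparison syntax itself; the column names are taken from the example in the thread.

```sql
-- These two WHERE/ON predicates are equivalent (BETWEEN is inclusive):
a.ip BETWEEN b.start_ip AND b.end_ip
a.ip >= b.start_ip AND a.ip <= b.end_ip
```

With strict bounds, as in the original query, `a.ip > b.start_ip AND a.ip < b.end_ip` has no BETWEEN shorthand.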