Hi,
维表创建的DDL跟普通的source没有区别,主要是在使用的时候,需要使用维表join专有的语法。 SELECT o.amout, o.currency, r.rate, o.amount * r.rateFROM Orders AS o JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r ON r.currency = o.currency [hidden email] <[hidden email]> 于2020年4月15日周三 下午7:48写道: > hi 大家 > 想问下flink-1.10-sql支持维表DDL吗,看社区文档好像mysql和hbase支持,但是需要什么字段显示声明为创建的表是维表呀? > > > > [hidden email] > -- Benchao Li School of Electronics Engineering and Computer Science, Peking University Tel:+86-15650713730 Email: [hidden email]; [hidden email] |
In reply to this post by guaishushu1103@163.com
JDBC connector 支持作为维表,DDL无需特殊字段指定。部分可选的参数可以控制temporary join行为[1]。
用作维表join时,需要使用特殊的join语法 [2] [1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#jdbc-connector [2] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html#joins *Best Regards,* *Zhenghua Gao* On Wed, Apr 15, 2020 at 7:48 PM [hidden email] < [hidden email]> wrote: > hi 大家 > 想问下flink-1.10-sql支持维表DDL吗,看社区文档好像mysql和hbase支持,但是需要什么字段显示声明为创建的表是维表呀? > > > > [hidden email] > |
Hi,
现在直接使用DDL声明可以正常使用吗?我这边使用的时候发现,FOR SYSTEM_TIME AS OF o.proctime 会报类型不匹配问题…timestamp(3)和time attribute 不匹配. 所以现在只能使用connector内部提供的Lookup function手动注册成 table function,使用lateral table xxx的语法来使用。 Best, Xinghalo |
这个原因是维表join的时候需要使用的时间是*有处理时间属性*[1] 的。
[1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/time_attributes.html 111 <[hidden email]> 于2020年4月15日周三 下午9:08写道: > Hi, > 现在直接使用DDL声明可以正常使用吗?我这边使用的时候发现,FOR SYSTEM_TIME AS OF o.proctime > 会报类型不匹配问题…timestamp(3)和time attribute 不匹配. > > > 所以现在只能使用connector内部提供的Lookup function手动注册成 table function,使用lateral table > xxx的语法来使用。 > > > Best, > Xinghalo > > -- Benchao Li School of Electronics Engineering and Computer Science, Peking University Tel:+86-15650713730 Email: [hidden email]; [hidden email] |
Hi,
我的时间字段就是proctime()产生的...因为当时有个time关键字的bug,所以按照这个confluence进行了修正。 后来使用时间字段的时候,就出了现在的问题。 https://issues.apache.org/jira/browse/FLINK-16068 Best, Xinghalo 在2020年04月15日 21:21,Benchao Li<[hidden email]> 写道: 这个原因是维表join的时候需要使用的时间是*有处理时间属性*[1] 的。 [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/time_attributes.html 111 <[hidden email]> 于2020年4月15日周三 下午9:08写道: Hi, 现在直接使用DDL声明可以正常使用吗?我这边使用的时候发现,FOR SYSTEM_TIME AS OF o.proctime 会报类型不匹配问题…timestamp(3)和time attribute 不匹配. 所以现在只能使用connector内部提供的Lookup function手动注册成 table function,使用lateral table xxx的语法来使用。 Best, Xinghalo -- Benchao Li School of Electronics Engineering and Computer Science, Peking University Tel:+86-15650713730 Email: [hidden email]; [hidden email] |
Hi,
更正一下,我的问题跟这个类似,遇到的问题也在评论中: https://issues.apache.org/jira/browse/FLINK-16345?jql=text%20~%20%22Caused%20by%3A%20java.lang.AssertionError%3A%20Conversion%20to%20relational%20algebra%20failed%20to%20preserve%20datatypes%3A%22 Best, Xinghalo 在2020年04月16日 08:18,111<[hidden email]> 写道: Hi, 我的时间字段就是proctime()产生的...因为当时有个time关键字的bug,所以按照这个confluence进行了修正。 后来使用时间字段的时候,就出了现在的问题。 https://issues.apache.org/jira/browse/FLINK-16068 Best, Xinghalo 在2020年04月15日 21:21,Benchao Li<[hidden email]> 写道: 这个原因是维表join的时候需要使用的时间是*有处理时间属性*[1] 的。 [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/time_attributes.html 111 <[hidden email]> 于2020年4月15日周三 下午9:08写道: Hi, 现在直接使用DDL声明可以正常使用吗?我这边使用的时候发现,FOR SYSTEM_TIME AS OF o.proctime 会报类型不匹配问题…timestamp(3)和time attribute 不匹配. 所以现在只能使用connector内部提供的Lookup function手动注册成 table function,使用lateral table xxx的语法来使用。 Best, Xinghalo -- Benchao Li School of Electronics Engineering and Computer Science, Peking University Tel:+86-15650713730 Email: [hidden email]; [hidden email] |
Hi,
你提到的这两个issue都是在1.10.1版本中才会修复,但是现在还没有release1.10.1版本。 你现在是用release-1.10 branch编译的么? 此外,是否方便也贴一下完整的DDL以及query呢? 111 <[hidden email]> 于2020年4月16日周四 上午8:22写道: > Hi, > 更正一下,我的问题跟这个类似,遇到的问题也在评论中: > > https://issues.apache.org/jira/browse/FLINK-16345?jql=text%20~%20%22Caused%20by%3A%20java.lang.AssertionError%3A%20Conversion%20to%20relational%20algebra%20failed%20to%20preserve%20datatypes%3A%22 > Best, > Xinghalo > > > 在2020年04月16日 08:18,111<[hidden email]> 写道: > Hi, > 我的时间字段就是proctime()产生的...因为当时有个time关键字的bug,所以按照这个confluence进行了修正。 > 后来使用时间字段的时候,就出了现在的问题。 > https://issues.apache.org/jira/browse/FLINK-16068 > Best, > Xinghalo > > > 在2020年04月15日 21:21,Benchao Li<[hidden email]> 写道: > 这个原因是维表join的时候需要使用的时间是*有处理时间属性*[1] 的。 > > [1] > > https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/time_attributes.html > > 111 <[hidden email]> 于2020年4月15日周三 下午9:08写道: > > Hi, > 现在直接使用DDL声明可以正常使用吗?我这边使用的时候发现,FOR SYSTEM_TIME AS OF o.proctime > 会报类型不匹配问题…timestamp(3)和time attribute 不匹配. > > > 所以现在只能使用connector内部提供的Lookup function手动注册成 table function,使用lateral table > xxx的语法来使用。 > > > Best, > Xinghalo > > > > -- > > Benchao Li > School of Electronics Engineering and Computer Science, Peking University > Tel:+86-15650713730 > Email: [hidden email]; [hidden email] > -- Benchao Li School of Electronics Engineering and Computer Science, Peking University Tel:+86-15650713730 Email: [hidden email]; [hidden email] |
Hi,
基于1.10 源码按照jira里面的PR修改不行么? 跟hbase的ddl关系应该不大,就发一个kafka的吧。 //代码占位符 Flink SQL> CREATE TABLE kafka_test1 ( //代码占位符 Flink SQL> CREATE TABLE kafka_test1 ( > id varchar, > a varchar, > b int, > ts as PROCTIME() > ) WITH ( > 'connector.type' = 'kafka', > 'connector.version' = '0.11', > 'connector.topic' = 'test', > 'connector.properties.zookeeper.connect' = 'localnode2:2181', > 'connector.properties.bootstrap.servers' = 'localnode2:9092', > 'connector.properties.group.id' = 'testGroup', > 'connector.startup-mode' = 'latest-offset', > 'format.type' = 'json' > ) > ; Flink SQL> select a.*,b.* from kafka_test1 a join hbase_test1 FOR SYSTEM_TIME AS OF a.ts as b on a.id = b.rowkey; 异常信息: //代码占位符 [ERROR] Could not execute SQL statement. Reason: java.lang.AssertionError: Conversion to relational algebra failed to preserve datatypes: validated type: RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" id, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" a, INTEGER b, TIMESTAMP(3) NOT NULL ts, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" rowkey, RecordType:peek_no_expand(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" a, INTEGER b) f) NOT NULL converted type: RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" id, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" a, INTEGER b, TIME ATTRIBUTE(PROCTIME) NOT NULL ts, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" rowkey, RecordType:peek_no_expand(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" a, INTEGER b) f) NOT NULL rel: LogicalProject(id=[$0], a=[$1], b=[$2], ts=[$3], rowkey=[$4], f=[$5]) LogicalCorrelate(correlation=[$cor1], joinType=[inner], requiredColumns=[{0, 3}]) LogicalProject(id=[$0], a=[$1], b=[$2], ts=[PROCTIME()]) LogicalTableScan(table=[[tgou, collie, kafka_test1, source: [Kafka011TableSource(id, a, b)]]]) LogicalFilter(condition=[=($cor1.id, $0)]) LogicalSnapshot(period=[$cor1.ts]) LogicalTableScan(table=[[tgou, collie, hbase_test1, source: [HBaseTableSource[schema=[rowkey, f], projectFields=null]]]]) Best, Xinghalo |
https://issues.apache.org/jira/browse/FLINK-16068
https://issues.apache.org/jira/browse/FLINK-16345 上面这两个issue的修改都加到了1.10上了么?如果是的话,那这可能是还有其他的bug。 如果你可以在1.10和或者master分支的最新代码上复现这个问题的话,可以建一个issue来跟踪下这个问题。 111 <[hidden email]> 于2020年4月16日周四 上午10:46写道: > Hi, > 基于1.10 源码按照jira里面的PR修改不行么? > 跟hbase的ddl关系应该不大,就发一个kafka的吧。 > > > //代码占位符 > Flink SQL> CREATE TABLE kafka_test1 ( > //代码占位符 > Flink SQL> CREATE TABLE kafka_test1 ( > > id varchar, > > a varchar, > > b int, > > ts as PROCTIME() > > ) WITH ( > > 'connector.type' = 'kafka', > > 'connector.version' = '0.11', > > 'connector.topic' = 'test', > > 'connector.properties.zookeeper.connect' = 'localnode2:2181', > > 'connector.properties.bootstrap.servers' = 'localnode2:9092', > > 'connector.properties.group.id' = 'testGroup', > > 'connector.startup-mode' = 'latest-offset', > > 'format.type' = 'json' > > ) > > ; > [INFO] Table has been created. > > > Flink SQL> select a.*,b.* from kafka_test1 a join hbase_test1 FOR > SYSTEM_TIME AS OF a.ts as b on a.id = b.rowkey; > > > 异常信息: > //代码占位符 > [ERROR] Could not execute SQL statement. Reason: > java.lang.AssertionError: Conversion to relational algebra failed to > preserve datatypes: > validated type: > RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" id, > VARCHAR(2147483647) CHARACTER SET "UTF-16LE" a, INTEGER b, TIMESTAMP(3) NOT > NULL ts, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" rowkey, > RecordType:peek_no_expand(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" a, > INTEGER b) f) NOT NULL > converted type: > RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" id, > VARCHAR(2147483647) CHARACTER SET "UTF-16LE" a, INTEGER b, TIME > ATTRIBUTE(PROCTIME) NOT NULL ts, VARCHAR(2147483647) CHARACTER SET > "UTF-16LE" rowkey, RecordType:peek_no_expand(VARCHAR(2147483647) CHARACTER > SET "UTF-16LE" a, INTEGER b) f) NOT NULL > rel: > LogicalProject(id=[$0], a=[$1], b=[$2], ts=[$3], rowkey=[$4], f=[$5]) > LogicalCorrelate(correlation=[$cor1], joinType=[inner], > requiredColumns=[{0, 3}]) > LogicalProject(id=[$0], a=[$1], b=[$2], ts=[PROCTIME()]) > LogicalTableScan(table=[[tgou, collie, kafka_test1, source: > [Kafka011TableSource(id, a, b)]]]) > LogicalFilter(condition=[=($cor1.id, $0)]) > LogicalSnapshot(period=[$cor1.ts]) > LogicalTableScan(table=[[tgou, collie, hbase_test1, source: > [HBaseTableSource[schema=[rowkey, f], projectFields=null]]]]) > > > Best, > Xinghalo -- Benchao Li School of Electronics Engineering and Computer Science, Peking University Tel:+86-15650713730 Email: [hidden email]; [hidden email] |
Hi,
是的,我都有修改..... 那我去jira里面重新开个issue? 另外,1.10.1或者1.11大概什么时间发布呢?我已经合并了很多PR,现在的版本有点乱了。 Best, Xinghalo |
1.10.1最近正在准备发布,还有几个blocker的issue,应该快了。
1.11的话,应该还比较久,现在都还没有feature freeze。 如果你可以在master上复现这个问题的话,可以建一个issue。 111 <[hidden email]> 于2020年4月16日周四 上午11:32写道: > Hi, > 是的,我都有修改..... > 那我去jira里面重新开个issue? > > > 另外,1.10.1或者1.11大概什么时间发布呢?我已经合并了很多PR,现在的版本有点乱了。 > Best, > Xinghalo -- Benchao Li School of Electronics Engineering and Computer Science, Peking University Tel:+86-15650713730 Email: [hidden email]; [hidden email] |
|
Hi,
Flikn 1.10.1还没有正式发布,暂时还没有地方可以直接下载。可以从源码直接编译一下~ [hidden email] <[hidden email]> 于2020年4月16日周四 下午3:04写道: > > -- Benchao Li School of Electronics Engineering and Computer Science, Peking University Tel:+86-15650713730 Email: [hidden email]; [hidden email] |
目前社区已经在讨论 release-1.10.1 RC [1] 的发布
[1] http://mail-archives.apache.org/mod_mbox/flink-dev/202004.mbox/%3CCAM7-19K0YsejvZpfVJrvEX6_DOJ7sUViEn9nB-5zfhX8P28_9A%40mail.gmail.com%3E Best, Godfrey Benchao Li <[hidden email]> 于2020年4月16日周四 下午3:06写道: > Hi, > Flikn 1.10.1还没有正式发布,暂时还没有地方可以直接下载。可以从源码直接编译一下~ > > [hidden email] <[hidden email]> 于2020年4月16日周四 下午3:04写道: > > > > > > > -- > > Benchao Li > School of Electronics Engineering and Computer Science, Peking University > Tel:+86-15650713730 > Email: [hidden email]; [hidden email] > |
1.10.1还剩余最后一个blocker [1],解决之后将创建Release Candidate并启动投票,预计还需要1-2周时间,感谢关注。
Best Regards, Yu [1] https://issues.apache.org/jira/browse/FLINK-16662 On Thu, 16 Apr 2020 at 17:24, godfrey he <[hidden email]> wrote: > 目前社区已经在讨论 release-1.10.1 RC [1] 的发布 > > [1] > > http://mail-archives.apache.org/mod_mbox/flink-dev/202004.mbox/%3CCAM7-19K0YsejvZpfVJrvEX6_DOJ7sUViEn9nB-5zfhX8P28_9A%40mail.gmail.com%3E > > Best, > Godfrey > > Benchao Li <[hidden email]> 于2020年4月16日周四 下午3:06写道: > > > Hi, > > Flikn 1.10.1还没有正式发布,暂时还没有地方可以直接下载。可以从源码直接编译一下~ > > > > [hidden email] <[hidden email]> 于2020年4月16日周四 > 下午3:04写道: > > > > > > > > > > > > -- > > > > Benchao Li > > School of Electronics Engineering and Computer Science, Peking University > > Tel:+86-15650713730 > > Email: [hidden email]; [hidden email] > > > |
Free forum by Nabble | Edit this page |