你第二次贴的DDL好像也有些问题,是不是`proctime AS PROCTIME(),`?
[hidden email] <[hidden email]> 于2020年5月18日周一 上午9:48写道: > Sorry, 之前建表语句copy错了,应该是这样: > CREATE TABLE x.log.yanfa_log ( > dt TIMESTAMP(3), > conn_id STRING, > sequence STRING, > trace_id STRING, > span_info STRING, > service_id STRING, > msg_id STRING, > servicename STRING, > ret_code STRING, > duration STRING, > req_body MAP<String,String>, > res_body MAP<STRING,STRING>, > extra_info MAP<STRING,STRING>, > proctime PROCTIME(), > WATERMARK FOR dt AS dt - INTERVAL '60' SECOND > ) WITH ( > 'connector.type' = 'kafka', > 'connector.version' = '0.11', > 'connector.topic' = 'x-log-yanfa_log', > 'connector.properties.bootstrap.servers' = '******:9092', > 'connector.properties.zookeeper.connect' = '******:2181', > 'connector.startup-mode' = 'latest-offset', > 'update-mode' = 'append', > 'format.type' = 'json', > 'format.fail-on-missing-field' = 'true' > ); > 报同样的错误 > > -----邮件原件----- > 发件人: [hidden email] <[hidden email]> > 发送时间: 2020年5月18日 9:45 > 收件人: [hidden email] > 主题: flink sql使用维表关联时报Temporal table join currently only supports 'FOR > SYSTEM_TIME AS OF' left table's proctime field, doesn't support 'PROCTIME()' > > Hi, all: > 本人使用的flink 版本为1.10.0,planner为BlinkPlanner,用LEFT JOIN FOR > SYSTEM_TIME AS OF 语法关联维表: > select TUMBLE_END(l.dt, INTERVAL '30' SECOND) as index_time, > l.extra_info['cityCode'] as city_code, v.vehicle_level as vehicle_level, > CAST(COUNT(DISTINCT req_body['driverId']) as STRING) as index_value from > x.log.yanfa_log AS l LEFT JOIN x.saic_auth_user.t_driver FOR SYSTEM_TIME AS > OF l.proctime AS d ON l.req_body['driverId'] = d.uid LEFT JOIN > x.saic_cms_config.t_vehicle FOR SYSTEM_TIME AS OF l.proctime AS v ON > d.vin=v.vehicle_vin where l.ret_code = '0' and l.servicename = > 'MatchGtw.uploadLocationV4' and l.req_body['appId'] = 'saic_card' GROUP BY > TUMBLE(l.dt, INTERVAL '30' SECOND), l.extra_info['cityCode'], > v.vehicle_level; > 建表语句用了computed columns: > CREATE TABLE x.log.yanfa_log ( > dt TIMESTAMP(3), > conn_id STRING, > sequence STRING, > trace_id STRING, > span_info STRING, > service_id STRING, > msg_id STRING, > servicename STRING, > ret_code STRING, > duration STRING, > req_body MAP<String,String>, > res_body MAP<STRING,STRING>, > extra_info MAP<STRING,STRING>, > proctime TIMESTAMP(3), > WATERMARK FOR dt AS dt - INTERVAL '60' SECOND > ) WITH ( > 'connector.type' = 'kafka', > 'connector.version' = '0.11', > 'connector.topic' = 'x-log-yanfa_log', > 'connector.properties.bootstrap.servers' = '******:9092', > 'connector.properties.zookeeper.connect' = '******:2181', > 'connector.startup-mode' = 'latest-offset', > 'update-mode' = 'append', > 'format.type' = 'json', > 'format.fail-on-missing-field' = 'true' > ); > 报如下异常: > Caused by: org.apache.flink.table.api.TableException: Temporal > table join currently only supports 'FOR SYSTEM_TIME AS OF' left table's > proctime field, doesn't support 'PROCTIME()' > at > org.apache.flink.table.planner.plan.rules.physical.common.CommonLookupJoinRule$class.matches(CommonLookupJoinRule.scala:67) > at > org.apache.flink.table.planner.plan.rules.physical.common.BaseSnapshotOnTableScanRule.matches(CommonLookupJoinRule.scala:118) > at > org.apache.flink.table.planner.plan.rules.physical.common.BaseSnapshotOnTableScanRule.matches(CommonLookupJoinRule.scala:131) > at > org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:263) > at > org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370) > at > org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370) > at > org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370) > at > org.apache.calcite.plan.volcano.VolcanoRuleCall.match(VolcanoRuleCall.java:247) > at > org.apache.calcite.plan.volcano.VolcanoPlanner.fireRules(VolcanoPlanner.java:1534) > at > org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1807) > at > org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846) > at > org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868) > at > org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:90) > at > org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:329) > at > org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1668) > at > org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846) > at > org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868) > at > org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:90) > at > org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:329) > at > org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1668) > at > org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846) > at > org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868) > at > org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:90) > at > org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:329) > at > org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1668) > at > org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846) > at > org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868) > at > org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:90) > at > org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:329) > at > org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1668) > at > org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846) > at > org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868) > at > org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:90) > at > org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:329) > at > org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1668) > at > org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846) > at > org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868) > at > org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:90) > at > org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:329) > at > org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1668) > at > org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846) > at > org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868) > at > org.apache.calcite.plan.volcano.VolcanoPlanner.changeTraits(VolcanoPlanner.java:529) > at > org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:324) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:64) > ... 20 more > > > -- Benchao Li School of Electronics Engineering and Computer Science, Peking University Tel:+86-15650713730 Email: [hidden email]; [hidden email] |
HI,
遇到相同的情况 +1,Flink 1.10.1同样不好使。 Best, Xinghalo |
Hi,
试验了下,proctime不能在建表时创建。需要在select的时候基于PROCTIME()函数生成。 比如: Select …., PROCTIME() AS proctime from xxx; Select * from xxx t1 left join yyy for system_time as of t1.proctime as t2 on t1.id = t2.id; 这样才行。 Best, Xinghalo |
Hi,
经过尝试,select时候调用PROCTIME()函数生成proctime字段是可行的,谢谢。 ________________________________ 发件人: 111 <[hidden email]> 发送时间: 2020年5月18日 16:07 收件人: [hidden email] <[hidden email]> 主题: 回复: flink sql使用维表关联时报Temporal table join currently only supports 'FOR SYSTEM_TIME AS OF' left table's proctime field, doesn't support 'PROCTIME()' Hi, 试验了下,proctime不能在建表时创建。需要在select的时候基于PROCTIME()函数生成。 比如: Select …., PROCTIME() AS proctime from xxx; Select * from xxx t1 left join yyy for system_time as of t1.proctime as t2 on t1.id = t2.id; 这样才行。 Best, Xinghalo
junbaozhang
|
可以的吧,jark大佬的例子http://wuchong.me/blog/2020/02/25/demo-building-real-time-application-with-flink-sql/ <http://wuchong.me/blog/2020/02/25/demo-building-real-time-application-with-flink-sql/>
也是这么用的,我也试过sql client和table api里面都没问题 > 2020年5月18日 下午4:43,[hidden email] 写道: > > Hi, > 经过尝试,select时候调用PROCTIME()函数生成proctime字段是可行的,谢谢。 > ________________________________ > 发件人: 111 <[hidden email]> > 发送时间: 2020年5月18日 16:07 > 收件人: [hidden email] <[hidden email]> > 主题: 回复: flink sql使用维表关联时报Temporal table join currently only supports 'FOR SYSTEM_TIME AS OF' left table's proctime field, doesn't support 'PROCTIME()' > > Hi, > > > 试验了下,proctime不能在建表时创建。需要在select的时候基于PROCTIME()函数生成。 > > > 比如: > Select …., PROCTIME() AS proctime from xxx; > Select * from xxx t1 left join yyy for system_time as of t1.proctime as t2 on t1.id = t2.id; > 这样才行。 > > > Best, > Xinghalo |
我就是按照这里面的语法去做的,试验没有成功,能把你试验的create table和query sql语句贴出来吗?谢谢。
________________________________ 发件人: 祝尚 <[hidden email]> 发送时间: 2020年5月19日 0:02 收件人: [hidden email] <[hidden email]> 主题: Re: flink sql使用维表关联时报Temporal table join currently only supports 'FOR SYSTEM_TIME AS OF' left table's proctime field, doesn't support 'PROCTIME()' 可以的吧,jark大佬的例子http://wuchong.me/blog/2020/02/25/demo-building-real-time-application-with-flink-sql/ <http://wuchong.me/blog/2020/02/25/demo-building-real-time-application-with-flink-sql/> 也是这么用的,我也试过sql client和table api里面都没问题 > 2020年5月18日 下午4:43,[hidden email] 写道: > > Hi, > 经过尝试,select时候调用PROCTIME()函数生成proctime字段是可行的,谢谢。 > ________________________________ > 发件人: 111 <[hidden email]> > 发送时间: 2020年5月18日 16:07 > 收件人: [hidden email] <[hidden email]> > 主题: 回复: flink sql使用维表关联时报Temporal table join currently only supports 'FOR SYSTEM_TIME AS OF' left table's proctime field, doesn't support 'PROCTIME()' > > Hi, > > > 试验了下,proctime不能在建表时创建。需要在select的时候基于PROCTIME()函数生成。 > > > 比如: > Select …., PROCTIME() AS proctime from xxx; > Select * from xxx t1 left join yyy for system_time as of t1.proctime as t2 on t1.id = t2.id; > 这样才行。 > > > Best, > Xinghalo
junbaozhang
|
Hi,
1.10(1.10.1)版本都是支持建表时用计算列声明proctime列的,temporal table也是支持join 和 left join的,我这边之前1.10 release时验证过[1],可以参考 方便把你们完整的sql 贴出来吗? Best, Leonard Xu [1] https://github.com/leonardBang/flink-sql-etl/blob/master/etl-job/src/main/resources/job-sql-1.10/kafka2hbase/KafkaJoinHbaseJoinMysql2Hbase.sql <https://github.com/leonardBang/flink-sql-etl/blob/master/etl-job/src/main/resources/job-sql-1.10/kafka2hbase/KafkaJoinHbaseJoinMysql2Hbase.sql> > 在 2020年5月19日,09:23,[hidden email] 写道: > > 我就是按照这里面的语法去做的,试验没有成功,能把你试验的create table和query sql语句贴出来吗?谢谢。 > ________________________________ > 发件人: 祝尚 <[hidden email]> > 发送时间: 2020年5月19日 0:02 > 收件人: [hidden email] <[hidden email]> > 主题: Re: flink sql使用维表关联时报Temporal table join currently only supports 'FOR SYSTEM_TIME AS OF' left table's proctime field, doesn't support 'PROCTIME()' > > 可以的吧,jark大佬的例子http://wuchong.me/blog/2020/02/25/demo-building-real-time-application-with-flink-sql/ <http://wuchong.me/blog/2020/02/25/demo-building-real-time-application-with-flink-sql/> > 也是这么用的,我也试过sql client和table api里面都没问题 > >> 2020年5月18日 下午4:43,[hidden email] 写道: >> >> Hi, >> 经过尝试,select时候调用PROCTIME()函数生成proctime字段是可行的,谢谢。 >> ________________________________ >> 发件人: 111 <[hidden email]> >> 发送时间: 2020年5月18日 16:07 >> 收件人: [hidden email] <[hidden email]> >> 主题: 回复: flink sql使用维表关联时报Temporal table join currently only supports 'FOR SYSTEM_TIME AS OF' left table's proctime field, doesn't support 'PROCTIME()' >> >> Hi, >> >> >> 试验了下,proctime不能在建表时创建。需要在select的时候基于PROCTIME()函数生成。 >> >> >> 比如: >> Select …., PROCTIME() AS proctime from xxx; >> Select * from xxx t1 left join yyy for system_time as of t1.proctime as t2 on t1.id = t2.id; >> 这样才行。 >> >> >> Best, >> Xinghalo > |
Hi,
建表语句为: CREATE TABLE x.log.yanfa_log ( dt TIMESTAMP(3), conn_id STRING, sequence STRING, trace_id STRING, span_info STRING, service_id STRING, msg_id STRING, servicename STRING, ret_code STRING, duration STRING, req_body MAP<String,String>, res_body MAP<STRING,STRING>, extra_info MAP<STRING,STRING>, proctime AS PROCTIME(), WATERMARK FOR dt AS dt - INTERVAL '60' SECOND ) WITH ( 'connector.type' = 'kafka', 'connector.version' = '0.11', 'connector.topic' = 'x-log-yanfa_log', 'connector.properties.bootstrap.servers' = '*****:9092', 'connector.properties.zookeeper.connect' = '*****:2181', 'connector.startup-mode' = 'latest-offset', 'update-mode' = 'append', 'format.type' = 'json', 'format.fail-on-missing-field' = 'true' ); join sql 可以查看历史记录里面有。 Best, Junbao Zhang ________________________________ 发件人: Leonard Xu <[hidden email]> 发送时间: 2020年5月20日 10:50 收件人: user-zh <[hidden email]> 主题: Re: flink sql使用维表关联时报Temporal table join currently only supports 'FOR SYSTEM_TIME AS OF' left table's proctime field, doesn't support 'PROCTIME()' Hi, 1.10(1.10.1)版本都是支持建表时用计算列声明proctime列的,temporal table也是支持join 和 left join的,我这边之前1.10 release时验证过[1],可以参考 方便把你们完整的sql 贴出来吗? Best, Leonard Xu [1] https://github.com/leonardBang/flink-sql-etl/blob/master/etl-job/src/main/resources/job-sql-1.10/kafka2hbase/KafkaJoinHbaseJoinMysql2Hbase.sql <https://github.com/leonardBang/flink-sql-etl/blob/master/etl-job/src/main/resources/job-sql-1.10/kafka2hbase/KafkaJoinHbaseJoinMysql2Hbase.sql> > 在 2020年5月19日,09:23,[hidden email] 写道: > > 我就是按照这里面的语法去做的,试验没有成功,能把你试验的create table和query sql语句贴出来吗?谢谢。 > ________________________________ > 发件人: 祝尚 <[hidden email]> > 发送时间: 2020年5月19日 0:02 > 收件人: [hidden email] <[hidden email]> > 主题: Re: flink sql使用维表关联时报Temporal table join currently only supports 'FOR SYSTEM_TIME AS OF' left table's proctime field, doesn't support 'PROCTIME()' > > 可以的吧,jark大佬的例子http://wuchong.me/blog/2020/02/25/demo-building-real-time-application-with-flink-sql/ <http://wuchong.me/blog/2020/02/25/demo-building-real-time-application-with-flink-sql/> > 也是这么用的,我也试过sql client和table api里面都没问题 > >> 2020年5月18日 下午4:43,[hidden email] 写道: >> >> Hi, >> 经过尝试,select时候调用PROCTIME()函数生成proctime字段是可行的,谢谢。 >> ________________________________ >> 发件人: 111 <[hidden email]> >> 发送时间: 2020年5月18日 16:07 >> 收件人: [hidden email] <[hidden email]> >> 主题: 回复: flink sql使用维表关联时报Temporal table join currently only supports 'FOR SYSTEM_TIME AS OF' left table's proctime field, doesn't support 'PROCTIME()' >> >> Hi, >> >> >> 试验了下,proctime不能在建表时创建。需要在select的时候基于PROCTIME()函数生成。 >> >> >> 比如: >> Select …., PROCTIME() AS proctime from xxx; >> Select * from xxx t1 left join yyy for system_time as of t1.proctime as t2 on t1.id = t2.id; >> 这样才行。 >> >> >> Best, >> Xinghalo >
junbaozhang
|
Hi,wind
用你的sql没有报类似的问题,请确认下版本是1.10.x吗? 另外不建议表名用 x.log.yanfa_log 包含 “.” 这个关键字符,这和表的全名:catalogName.databaseName.tableName 会冲突,应该在建表时会报catalog x 不存在的问题,没复现proctime field不支持的问题。 Best, Leonard > 在 2020年5月20日,11:01,[hidden email] 写道: > > Hi, > 建表语句为: > CREATE TABLE x.log.yanfa_log ( > dt TIMESTAMP(3), > conn_id STRING, > sequence STRING, > trace_id STRING, > span_info STRING, > service_id STRING, > msg_id STRING, > servicename STRING, > ret_code STRING, > duration STRING, > req_body MAP<String,String>, > res_body MAP<STRING,STRING>, > extra_info MAP<STRING,STRING>, > proctime AS PROCTIME(), > WATERMARK FOR dt AS dt - INTERVAL '60' SECOND > ) WITH ( > 'connector.type' = 'kafka', > 'connector.version' = '0.11', > 'connector.topic' = 'x-log-yanfa_log', > 'connector.properties.bootstrap.servers' = '*****:9092', > 'connector.properties.zookeeper.connect' = '*****:2181', > 'connector.startup-mode' = 'latest-offset', > 'update-mode' = 'append', > 'format.type' = 'json', > 'format.fail-on-missing-field' = 'true' > ); > > join sql 可以查看历史记录里面有。 > > Best, > Junbao Zhang > ________________________________ > 发件人: Leonard Xu <[hidden email]> > 发送时间: 2020年5月20日 10:50 > 收件人: user-zh <[hidden email]> > 主题: Re: flink sql使用维表关联时报Temporal table join currently only supports 'FOR SYSTEM_TIME AS OF' left table's proctime field, doesn't support 'PROCTIME()' > > Hi, > > 1.10(1.10.1)版本都是支持建表时用计算列声明proctime列的,temporal table也是支持join 和 left join的,我这边之前1.10 release时验证过[1],可以参考 > 方便把你们完整的sql 贴出来吗? > > > Best, > Leonard Xu > [1] > https://github.com/leonardBang/flink-sql-etl/blob/master/etl-job/src/main/resources/job-sql-1.10/kafka2hbase/KafkaJoinHbaseJoinMysql2Hbase.sql <https://github.com/leonardBang/flink-sql-etl/blob/master/etl-job/src/main/resources/job-sql-1.10/kafka2hbase/KafkaJoinHbaseJoinMysql2Hbase.sql> > > >> 在 2020年5月19日,09:23,[hidden email] 写道: >> >> 我就是按照这里面的语法去做的,试验没有成功,能把你试验的create table和query sql语句贴出来吗?谢谢。 >> ________________________________ >> 发件人: 祝尚 <[hidden email]> >> 发送时间: 2020年5月19日 0:02 >> 收件人: [hidden email] <[hidden email]> >> 主题: Re: flink sql使用维表关联时报Temporal table join currently only supports 'FOR SYSTEM_TIME AS OF' left table's proctime field, doesn't support 'PROCTIME()' >> >> 可以的吧,jark大佬的例子http://wuchong.me/blog/2020/02/25/demo-building-real-time-application-with-flink-sql/ <http://wuchong.me/blog/2020/02/25/demo-building-real-time-application-with-flink-sql/> >> 也是这么用的,我也试过sql client和table api里面都没问题 >> >>> 2020年5月18日 下午4:43,[hidden email] 写道: >>> >>> Hi, >>> 经过尝试,select时候调用PROCTIME()函数生成proctime字段是可行的,谢谢。 >>> ________________________________ >>> 发件人: 111 <[hidden email]> >>> 发送时间: 2020年5月18日 16:07 >>> 收件人: [hidden email] <[hidden email]> >>> 主题: 回复: flink sql使用维表关联时报Temporal table join currently only supports 'FOR SYSTEM_TIME AS OF' left table's proctime field, doesn't support 'PROCTIME()' >>> >>> Hi, >>> >>> >>> 试验了下,proctime不能在建表时创建。需要在select的时候基于PROCTIME()函数生成。 >>> >>> >>> 比如: >>> Select …., PROCTIME() AS proctime from xxx; >>> Select * from xxx t1 left join yyy for system_time as of t1.proctime as t2 on t1.id = t2.id; >>> 这样才行。 >>> >>> >>> Best, >>> Xinghalo >> > |
Hi,
版本用的是1.10.0,x.log.yanfa_log是正常的表格式,本人demo中用的是hive catalog: Catalog myCatalog = new HiveCatalog("x", "default", "D:\\conf", "1.1.0"); tEnv.registerCatalog("x", myCatalog); Best, Junbao Zhang ________________________________ 发件人: Leonard Xu <[hidden email]> 发送时间: 2020年5月20日 11:51 收件人: user-zh <[hidden email]> 主题: Re: flink sql使用维表关联时报Temporal table join currently only supports 'FOR SYSTEM_TIME AS OF' left table's proctime field, doesn't support 'PROCTIME()' Hi,wind 用你的sql没有报类似的问题,请确认下版本是1.10.x吗? 另外不建议表名用 x.log.yanfa_log 包含 “.” 这个关键字符,这和表的全名:catalogName.databaseName.tableName 会冲突,应该在建表时会报catalog x 不存在的问题,没复现proctime field不支持的问题。 Best, Leonard > 在 2020年5月20日,11:01,[hidden email] 写道: > > Hi, > 建表语句为: > CREATE TABLE x.log.yanfa_log ( > dt TIMESTAMP(3), > conn_id STRING, > sequence STRING, > trace_id STRING, > span_info STRING, > service_id STRING, > msg_id STRING, > servicename STRING, > ret_code STRING, > duration STRING, > req_body MAP<String,String>, > res_body MAP<STRING,STRING>, > extra_info MAP<STRING,STRING>, > proctime AS PROCTIME(), > WATERMARK FOR dt AS dt - INTERVAL '60' SECOND > ) WITH ( > 'connector.type' = 'kafka', > 'connector.version' = '0.11', > 'connector.topic' = 'x-log-yanfa_log', > 'connector.properties.bootstrap.servers' = '*****:9092', > 'connector.properties.zookeeper.connect' = '*****:2181', > 'connector.startup-mode' = 'latest-offset', > 'update-mode' = 'append', > 'format.type' = 'json', > 'format.fail-on-missing-field' = 'true' > ); > > join sql 可以查看历史记录里面有。 > > Best, > Junbao Zhang > ________________________________ > 发件人: Leonard Xu <[hidden email]> > 发送时间: 2020年5月20日 10:50 > 收件人: user-zh <[hidden email]> > 主题: Re: flink sql使用维表关联时报Temporal table join currently only supports 'FOR SYSTEM_TIME AS OF' left table's proctime field, doesn't support 'PROCTIME()' > > Hi, > > 1.10(1.10.1)版本都是支持建表时用计算列声明proctime列的,temporal table也是支持join 和 left join的,我这边之前1.10 release时验证过[1],可以参考 > 方便把你们完整的sql 贴出来吗? > > > Best, > Leonard Xu > [1] > https://github.com/leonardBang/flink-sql-etl/blob/master/etl-job/src/main/resources/job-sql-1.10/kafka2hbase/KafkaJoinHbaseJoinMysql2Hbase.sql <https://github.com/leonardBang/flink-sql-etl/blob/master/etl-job/src/main/resources/job-sql-1.10/kafka2hbase/KafkaJoinHbaseJoinMysql2Hbase.sql> > > >> 在 2020年5月19日,09:23,[hidden email] 写道: >> >> 我就是按照这里面的语法去做的,试验没有成功,能把你试验的create table和query sql语句贴出来吗?谢谢。 >> ________________________________ >> 发件人: 祝尚 <[hidden email]> >> 发送时间: 2020年5月19日 0:02 >> 收件人: [hidden email] <[hidden email]> >> 主题: Re: flink sql使用维表关联时报Temporal table join currently only supports 'FOR SYSTEM_TIME AS OF' left table's proctime field, doesn't support 'PROCTIME()' >> >> 可以的吧,jark大佬的例子http://wuchong.me/blog/2020/02/25/demo-building-real-time-application-with-flink-sql/ <http://wuchong.me/blog/2020/02/25/demo-building-real-time-application-with-flink-sql/> >> 也是这么用的,我也试过sql client和table api里面都没问题 >> >>> 2020年5月18日 下午4:43,[hidden email] 写道: >>> >>> Hi, >>> 经过尝试,select时候调用PROCTIME()函数生成proctime字段是可行的,谢谢。 >>> ________________________________ >>> 发件人: 111 <[hidden email]> >>> 发送时间: 2020年5月18日 16:07 >>> 收件人: [hidden email] <[hidden email]> >>> 主题: 回复: flink sql使用维表关联时报Temporal table join currently only supports 'FOR SYSTEM_TIME AS OF' left table's proctime field, doesn't support 'PROCTIME()' >>> >>> Hi, >>> >>> >>> 试验了下,proctime不能在建表时创建。需要在select的时候基于PROCTIME()函数生成。 >>> >>> >>> 比如: >>> Select …., PROCTIME() AS proctime from xxx; >>> Select * from xxx t1 left join yyy for system_time as of t1.proctime as t2 on t1.id = t2.id; >>> 这样才行。 >>> >>> >>> Best, >>> Xinghalo >> >
junbaozhang
|
Hi Junbao, Xinghalo,
抱歉,现在HiveCatalog保存proctime字段是有bug的,[1]。所以就像你说的,proctime不能在建表时创建。需要在select的时候基于PROCTIME()函数生成,这样来绕过。 正在修复中,你也可以打上patch来试试,或者等下1.11.0或1.10.2的发布。 [1]https://issues.apache.org/jira/browse/FLINK-17189 Best, Jingsong Lee On Wed, May 20, 2020 at 1:58 PM [hidden email] < [hidden email]> wrote: > Hi, > 版本用的是1.10.0,x.log.yanfa_log是正常的表格式,本人demo中用的是hive catalog: > Catalog myCatalog = new HiveCatalog("x", "default", > > "D:\\conf", "1.1.0"); > > tEnv.registerCatalog("x", myCatalog); > > Best, > Junbao Zhang > > ________________________________ > 发件人: Leonard Xu <[hidden email]> > 发送时间: 2020年5月20日 11:51 > 收件人: user-zh <[hidden email]> > 主题: Re: flink sql使用维表关联时报Temporal table join currently only supports 'FOR > SYSTEM_TIME AS OF' left table's proctime field, doesn't support 'PROCTIME()' > > Hi,wind > > 用你的sql没有报类似的问题,请确认下版本是1.10.x吗? > 另外不建议表名用 x.log.yanfa_log 包含 “.” > 这个关键字符,这和表的全名:catalogName.databaseName.tableName 会冲突,应该在建表时会报catalog x > 不存在的问题,没复现proctime field不支持的问题。 > > Best, > Leonard > > > 在 2020年5月20日,11:01,[hidden email] 写道: > > > > Hi, > > 建表语句为: > > CREATE TABLE x.log.yanfa_log ( > > dt TIMESTAMP(3), > > conn_id STRING, > > sequence STRING, > > trace_id STRING, > > span_info STRING, > > service_id STRING, > > msg_id STRING, > > servicename STRING, > > ret_code STRING, > > duration STRING, > > req_body MAP<String,String>, > > res_body MAP<STRING,STRING>, > > extra_info MAP<STRING,STRING>, > > proctime AS PROCTIME(), > > WATERMARK FOR dt AS dt - INTERVAL '60' SECOND > > ) WITH ( > > 'connector.type' = 'kafka', > > 'connector.version' = '0.11', > > 'connector.topic' = 'x-log-yanfa_log', > > 'connector.properties.bootstrap.servers' = '*****:9092', > > 'connector.properties.zookeeper.connect' = '*****:2181', > > 'connector.startup-mode' = 'latest-offset', > > 'update-mode' = 'append', > > 'format.type' = 'json', > > 'format.fail-on-missing-field' = 'true' > > ); > > > > join sql 可以查看历史记录里面有。 > > > > Best, > > Junbao Zhang > > ________________________________ > > 发件人: Leonard Xu <[hidden email]> > > 发送时间: 2020年5月20日 10:50 > > 收件人: user-zh <[hidden email]> > > 主题: Re: flink sql使用维表关联时报Temporal table join currently only supports > 'FOR SYSTEM_TIME AS OF' left table's proctime field, doesn't support > 'PROCTIME()' > > > > Hi, > > > > 1.10(1.10.1)版本都是支持建表时用计算列声明proctime列的,temporal table也是支持join 和 left > join的,我这边之前1.10 release时验证过[1],可以参考 > > 方便把你们完整的sql 贴出来吗? > > > > > > Best, > > Leonard Xu > > [1] > > > https://github.com/leonardBang/flink-sql-etl/blob/master/etl-job/src/main/resources/job-sql-1.10/kafka2hbase/KafkaJoinHbaseJoinMysql2Hbase.sql > < > https://github.com/leonardBang/flink-sql-etl/blob/master/etl-job/src/main/resources/job-sql-1.10/kafka2hbase/KafkaJoinHbaseJoinMysql2Hbase.sql > > > > > > > >> 在 2020年5月19日,09:23,[hidden email] 写道: > >> > >> 我就是按照这里面的语法去做的,试验没有成功,能把你试验的create table和query sql语句贴出来吗?谢谢。 > >> ________________________________ > >> 发件人: 祝尚 <[hidden email]> > >> 发送时间: 2020年5月19日 0:02 > >> 收件人: [hidden email] <[hidden email]> > >> 主题: Re: flink sql使用维表关联时报Temporal table join currently only supports > 'FOR SYSTEM_TIME AS OF' left table's proctime field, doesn't support > 'PROCTIME()' > >> > >> 可以的吧,jark大佬的例子 > http://wuchong.me/blog/2020/02/25/demo-building-real-time-application-with-flink-sql/ > < > http://wuchong.me/blog/2020/02/25/demo-building-real-time-application-with-flink-sql/ > > > >> 也是这么用的,我也试过sql client和table api里面都没问题 > >> > >>> 2020年5月18日 下午4:43,[hidden email] 写道: > >>> > >>> Hi, > >>> 经过尝试,select时候调用PROCTIME()函数生成proctime字段是可行的,谢谢。 > >>> ________________________________ > >>> 发件人: 111 <[hidden email]> > >>> 发送时间: 2020年5月18日 16:07 > >>> 收件人: [hidden email] <[hidden email]> > >>> 主题: 回复: flink sql使用维表关联时报Temporal table join currently only supports > 'FOR SYSTEM_TIME AS OF' left table's proctime field, doesn't support > 'PROCTIME()' > >>> > >>> Hi, > >>> > >>> > >>> 试验了下,proctime不能在建表时创建。需要在select的时候基于PROCTIME()函数生成。 > >>> > >>> > >>> 比如: > >>> Select …., PROCTIME() AS proctime from xxx; > >>> Select * from xxx t1 left join yyy for system_time as of t1.proctime > as t2 on t1.id = t2.id; > >>> 这样才行。 > >>> > >>> > >>> Best, > >>> Xinghalo > >> > > > > -- Best, Jingsong Lee |
In reply to this post by junbaozhang
Hi,
如劲松所说,这是 hiveCatalog的一个bug,正常是支持的,1.11中会修复 Best, Leonard > 在 2020年5月20日,13:57,[hidden email] 写道: > > Hi, > 版本用的是1.10.0,x.log.yanfa_log是正常的表格式,本人demo中用的是hive catalog: > Catalog myCatalog = new HiveCatalog("x", "default", > > "D:\\conf", "1.1.0"); > > tEnv.registerCatalog("x", myCatalog); > > Best, > Junbao Zhang > > ________________________________ > 发件人: Leonard Xu <[hidden email]> > 发送时间: 2020年5月20日 11:51 > 收件人: user-zh <[hidden email]> > 主题: Re: flink sql使用维表关联时报Temporal table join currently only supports 'FOR SYSTEM_TIME AS OF' left table's proctime field, doesn't support 'PROCTIME()' > > Hi,wind > > 用你的sql没有报类似的问题,请确认下版本是1.10.x吗? > 另外不建议表名用 x.log.yanfa_log 包含 “.” 这个关键字符,这和表的全名:catalogName.databaseName.tableName 会冲突,应该在建表时会报catalog x 不存在的问题,没复现proctime field不支持的问题。 > > Best, > Leonard > >> 在 2020年5月20日,11:01,[hidden email] 写道: >> >> Hi, >> 建表语句为: >> CREATE TABLE x.log.yanfa_log ( >> dt TIMESTAMP(3), >> conn_id STRING, >> sequence STRING, >> trace_id STRING, >> span_info STRING, >> service_id STRING, >> msg_id STRING, >> servicename STRING, >> ret_code STRING, >> duration STRING, >> req_body MAP<String,String>, >> res_body MAP<STRING,STRING>, >> extra_info MAP<STRING,STRING>, >> proctime AS PROCTIME(), >> WATERMARK FOR dt AS dt - INTERVAL '60' SECOND >> ) WITH ( >> 'connector.type' = 'kafka', >> 'connector.version' = '0.11', >> 'connector.topic' = 'x-log-yanfa_log', >> 'connector.properties.bootstrap.servers' = '*****:9092', >> 'connector.properties.zookeeper.connect' = '*****:2181', >> 'connector.startup-mode' = 'latest-offset', >> 'update-mode' = 'append', >> 'format.type' = 'json', >> 'format.fail-on-missing-field' = 'true' >> ); >> >> join sql 可以查看历史记录里面有。 >> >> Best, >> Junbao Zhang >> ________________________________ >> 发件人: Leonard Xu <[hidden email]> >> 发送时间: 2020年5月20日 10:50 >> 收件人: user-zh <[hidden email]> >> 主题: Re: flink sql使用维表关联时报Temporal table join currently only supports 'FOR SYSTEM_TIME AS OF' left table's proctime field, doesn't support 'PROCTIME()' >> >> Hi, >> >> 1.10(1.10.1)版本都是支持建表时用计算列声明proctime列的,temporal table也是支持join 和 left join的,我这边之前1.10 release时验证过[1],可以参考 >> 方便把你们完整的sql 贴出来吗? >> >> >> Best, >> Leonard Xu >> [1] >> https://github.com/leonardBang/flink-sql-etl/blob/master/etl-job/src/main/resources/job-sql-1.10/kafka2hbase/KafkaJoinHbaseJoinMysql2Hbase.sql <https://github.com/leonardBang/flink-sql-etl/blob/master/etl-job/src/main/resources/job-sql-1.10/kafka2hbase/KafkaJoinHbaseJoinMysql2Hbase.sql> >> >> >>> 在 2020年5月19日,09:23,[hidden email] 写道: >>> >>> 我就是按照这里面的语法去做的,试验没有成功,能把你试验的create table和query sql语句贴出来吗?谢谢。 >>> ________________________________ >>> 发件人: 祝尚 <[hidden email]> >>> 发送时间: 2020年5月19日 0:02 >>> 收件人: [hidden email] <[hidden email]> >>> 主题: Re: flink sql使用维表关联时报Temporal table join currently only supports 'FOR SYSTEM_TIME AS OF' left table's proctime field, doesn't support 'PROCTIME()' >>> >>> 可以的吧,jark大佬的例子http://wuchong.me/blog/2020/02/25/demo-building-real-time-application-with-flink-sql/ <http://wuchong.me/blog/2020/02/25/demo-building-real-time-application-with-flink-sql/> >>> 也是这么用的,我也试过sql client和table api里面都没问题 >>> >>>> 2020年5月18日 下午4:43,[hidden email] 写道: >>>> >>>> Hi, >>>> 经过尝试,select时候调用PROCTIME()函数生成proctime字段是可行的,谢谢。 >>>> ________________________________ >>>> 发件人: 111 <[hidden email]> >>>> 发送时间: 2020年5月18日 16:07 >>>> 收件人: [hidden email] <[hidden email]> >>>> 主题: 回复: flink sql使用维表关联时报Temporal table join currently only supports 'FOR SYSTEM_TIME AS OF' left table's proctime field, doesn't support 'PROCTIME()' >>>> >>>> Hi, >>>> >>>> >>>> 试验了下,proctime不能在建表时创建。需要在select的时候基于PROCTIME()函数生成。 >>>> >>>> >>>> 比如: >>>> Select …., PROCTIME() AS proctime from xxx; >>>> Select * from xxx t1 left join yyy for system_time as of t1.proctime as t2 on t1.id = t2.id; >>>> 这样才行。 >>>> >>>> >>>> Best, >>>> Xinghalo >>> >> > |
Free forum by Nabble | Edit this page |