error:Window aggregate can only be defined over a time attribute column, but TIMESTAMP(3) encountered.
If I add a watermark to sink_table, the error goes away. I defined the time in the source, and the sink receives that time, so why does this error appear at GROUP BY?

CREATE TABLE source_table (
  sip VARCHAR,
  proctime as proctime()
) WITH (
  'connector.type' = 'kafka',
  'connector.version' = 'universal',
  'connector.startup-mode' = 'latest-offset',
  'connector.topic' = 'skyeye-tcpflow',
  'connector.properties.group.id' = 'testGroup',
  'connector.properties.zookeeper.connect' = 'x.x.x.x:2181',
  'connector.properties.bootstrap.servers' = 'x.x.x.x:9092',
  'update-mode' = 'append',
  'format.type' = 'json',
  'format.derive-schema' = 'true'
);

CREATE TABLE sink_table (
  ip VARCHAR,
  proctime timestamp(3)
) WITH (
  'connector.type' = 'kafka',
  'connector.version' = 'universal',
  'connector.startup-mode' = 'latest-offset',
  'connector.topic' = 'ip_agg',
  'connector.properties.zookeeper.connect' = 'x.x.x.x:2181',
  'connector.properties.bootstrap.servers' = 'x.x.x.x:9092',
  'update-mode' = 'append',
  'format.type' = 'json',
  'format.derive-schema' = 'true'
);

insert into sink_table select sip, proctime from source_table;

select TUMBLE_START(proctime, INTERVAL '10' MINUTE), count(1)
from sink_table
group by TUMBLE(proctime, INTERVAL '10' MINUTE);
Judging from the SQL you provided, you are running the window aggregation directly on sink_table, and sink_table does not define a time attribute.
(Is that a typo? Should the window aggregation be on source_table instead?)

(In reply to 了不起的盖茨比, Thu, May 21, 2020, 9:08 PM.)

--
Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel: +86-15650713730
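A minimal sketch of Benchao's suggestion, assuming the goal is simply a 10-minute count over the records arriving in source_table: window directly on source_table, where proctime is declared as a processing-time attribute by the computed column `proctime as proctime()`. The aliases window_start and cnt are hypothetical.

```sql
-- Window directly on the source table: here proctime is a processing-time
-- attribute, because source_table declares it as a computed column AS PROCTIME().
SELECT
  TUMBLE_START(proctime, INTERVAL '10' MINUTE) AS window_start,
  COUNT(1) AS cnt
FROM source_table
GROUP BY TUMBLE(proctime, INTERVAL '10' MINUTE);
```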
Hi,

- proctime is a virtual column.
- rowtime is a column that holds real data.

It looks like you need to define a rowtime in sink_table, for example:

CREATE TABLE sink_table (
  ip VARCHAR,
  proctime timestamp(3),
  WATERMARK FOR proctime AS proctime
  ....
)

Best,
Jingsong Lee

(In reply to Benchao Li, Thu, May 21, 2020, 9:17 PM.)
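A fuller sketch of Jingsong's suggestion, assuming the same legacy Kafka connector properties as the sink_table in the question (the WITH clause below is copied from it). Declaring a watermark on the stored TIMESTAMP(3) column makes it an event-time (rowtime) attribute when sink_table is read back as a source, so the tumbling window validates. The aliases window_start and cnt are hypothetical.

```sql
-- Re-declare sink_table so that, when it is read back, proctime is an
-- event-time (rowtime) attribute instead of a plain TIMESTAMP(3) column.
CREATE TABLE sink_table (
  ip VARCHAR,
  proctime TIMESTAMP(3),
  -- Watermark equal to the column value: assumes records arrive in timestamp order.
  WATERMARK FOR proctime AS proctime
) WITH (
  'connector.type' = 'kafka',
  'connector.version' = 'universal',
  'connector.startup-mode' = 'latest-offset',
  'connector.topic' = 'ip_agg',
  'connector.properties.zookeeper.connect' = 'x.x.x.x:2181',
  'connector.properties.bootstrap.servers' = 'x.x.x.x:9092',
  'update-mode' = 'append',
  'format.type' = 'json',
  'format.derive-schema' = 'true'
);

-- proctime is now a time attribute, so the window aggregate is accepted.
SELECT
  TUMBLE_START(proctime, INTERVAL '10' MINUTE) AS window_start,
  COUNT(1) AS cnt
FROM sink_table
GROUP BY TUMBLE(proctime, INTERVAL '10' MINUTE);
```

Despite its name, the column is then treated as event time. `WATERMARK FOR proctime AS proctime` allows no out-of-orderness, so if the ip_agg topic has several partitions, records that arrive behind the watermark may be dropped as late; a bounded delay such as `proctime - INTERVAL '5' SECOND` (the 5 seconds is an arbitrary assumption) is a common alternative.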
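My initial idea was: the source table uses `proctime as proctime()`, which gives me a column; that time is then assigned to a timestamp(3) column in the sink table, so I could use it directly in GROUP BY.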
(In reply to Benchao Li, user-zh, Thu, May 21, 2020, 9:17 PM. Subject: Re: flink proctime error)
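The idea above fails because a time attribute is a property of a table declaration or query, not of the stored data: what is written to Kafka is just a TIMESTAMP(3) value, and reading it back through sink_table yields an ordinary timestamp unless that DDL declares it as a time attribute. Within a single query, however, a processing-time attribute does survive a plain projection or rename. A minimal sketch, assuming the only goal is to rename columns before windowing (the subquery alias t and the result aliases are hypothetical):

```sql
-- The time attribute is preserved through a plain projection/rename,
-- so windowing over the subquery is accepted without any watermark.
SELECT
  TUMBLE_START(proctime, INTERVAL '10' MINUTE) AS window_start,
  COUNT(1) AS cnt
FROM (
  SELECT sip AS ip, proctime FROM source_table
) AS t
GROUP BY TUMBLE(proctime, INTERVAL '10' MINUTE);
```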
So the point is that if a virtual column is going to be used in later computations, it needs a watermark. Got it, I tested that case and it works.
(In reply to Jingsong Li, user-zh, Thu, May 21, 2020, 9:22 PM. Subject: Re: flink proctime error)
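Strictly, the watermark does not carry the original processing time over; it turns the stored timestamp into an event-time attribute. If processing time at read time is what the window should use, another option is to declare a fresh processing-time attribute with a computed column, mirroring the pattern source_table already uses. A sketch under that assumption: the table name ip_agg_source, the column name pt, and the group id 'ipAggReader' are hypothetical, and a separate read-only table over the same topic is used so that sink_table itself stays a plain sink.

```sql
-- Hypothetical read-only table over the same 'ip_agg' topic.
CREATE TABLE ip_agg_source (
  ip VARCHAR,
  proctime TIMESTAMP(3),   -- value written by the INSERT; an ordinary timestamp here
  pt AS PROCTIME()         -- new processing-time attribute used for windowing
) WITH (
  'connector.type' = 'kafka',
  'connector.version' = 'universal',
  'connector.startup-mode' = 'latest-offset',
  'connector.topic' = 'ip_agg',
  'connector.properties.group.id' = 'ipAggReader',  -- hypothetical group id
  'connector.properties.zookeeper.connect' = 'x.x.x.x:2181',
  'connector.properties.bootstrap.servers' = 'x.x.x.x:9092',
  'update-mode' = 'append',
  'format.type' = 'json',
  'format.derive-schema' = 'true'
);

-- Windows are based on when records are read from ip_agg, not on the stored value.
SELECT
  TUMBLE_START(pt, INTERVAL '10' MINUTE) AS window_start,
  COUNT(1) AS cnt
FROM ip_agg_source
GROUP BY TUMBLE(pt, INTERVAL '10' MINUTE);
```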
Thanks, everyone. I'll go study the official documentation some more.