flink proctime error

classic Classic list List threaded Threaded
6 messages Options
kcz
Reply | Threaded
Open this post in threaded view
|

flink proctime error

kcz
error:Window aggregate can only be defined over a time attribute column, but TIMESTAMP(3) encountered.
如果在sink_table 添加watermark那么就不会报错,我在source定义了时间,sink接收了时间,为什么group by时候会出现那个error?
CREATE TABLE source_table (
        sip VARCHAR,
        proctime as proctime()
) WITH (
        'connector.type' = 'kafka',
        'connector.version' = 'universal',
        'connector.startup-mode' = 'latest-offset',
        'connector.topic' = 'skyeye-tcpflow',
        'connector.properties.group.id' = 'testGroup',
        'connector.properties.zookeeper.connect' = 'x.x.x.x:2181',
        'connector.properties.bootstrap.servers' = 'x.x.x.x:9092',
        'update-mode' = 'append',
        'format.type' = 'json',
        'format.derive-schema' = 'true'
);


CREATE TABLE sink_table (
        ip VARCHAR,
        proctime timestamp(3)


) WITH (
        'connector.type' = 'kafka',
        'connector.version' = 'universal',
        'connector.startup-mode' = 'latest-offset',
        'connector.topic' = 'ip_agg',
        'connector.properties.zookeeper.connect' = 'x.x.x.x:2181',
        'connector.properties.bootstrap.servers' = 'x.x.x.x:9092',
        'update-mode' = 'append',
        'format.type' = 'json',
        'format.derive-schema' = 'true'
);

insert into sink_kafka select sip,proctime from source_kafka;



select TUMBLE_START(proctime, INTERVAL '10' MINUTE),count(1) from sink_table group by TUMBLE(proctime, INTERVAL '10' MINUTE);
Reply | Threaded
Open this post in threaded view
|

Re: flink proctime error

Benchao Li
看你提供的SQL来讲,你是直接在sink_table上做了一个窗口计算,而sink_table并没有定义时间属性。
(是不是笔误,应该是在source_table上做窗口计算?)

了不起的盖茨比 <[hidden email]> 于2020年5月21日周四 下午9:08写道:

> error:Window aggregate can only be defined over a time attribute column,
> but TIMESTAMP(3) encountered.
> 如果在sink_table 添加watermark那么就不会报错,我在source定义了时间,sink接收了时间,为什么group
> by时候会出现那个error?
> CREATE TABLE source_table (
>         sip VARCHAR,
>         proctime as proctime()
> ) WITH (
>         'connector.type' = 'kafka',
>         'connector.version' = 'universal',
>         'connector.startup-mode' = 'latest-offset',
>         'connector.topic' = 'skyeye-tcpflow',
>         'connector.properties.group.id' = 'testGroup',
>         'connector.properties.zookeeper.connect' = 'x.x.x.x:2181',
>         'connector.properties.bootstrap.servers' = 'x.x.x.x:9092',
>         'update-mode' = 'append',
>         'format.type' = 'json',
>         'format.derive-schema' = 'true'
> );
>
>
> CREATE TABLE sink_table (
>         ip VARCHAR,
>         proctime timestamp(3)
>
>
> ) WITH (
>         'connector.type' = 'kafka',
>         'connector.version' = 'universal',
>         'connector.startup-mode' = 'latest-offset',
>         'connector.topic' = 'ip_agg',
>         'connector.properties.zookeeper.connect' = 'x.x.x.x:2181',
>         'connector.properties.bootstrap.servers' = 'x.x.x.x:9092',
>         'update-mode' = 'append',
>         'format.type' = 'json',
>         'format.derive-schema' = 'true'
> );
>
> insert into sink_kafka select sip,proctime from source_kafka;
>
>
>
> select TUMBLE_START(proctime, INTERVAL '10' MINUTE),count(1) from
> sink_table group by TUMBLE(proctime, INTERVAL '10' MINUTE);



--

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: [hidden email]; [hidden email]
Reply | Threaded
Open this post in threaded view
|

Re: flink proctime error

Jingsong Li
Hi,

- proctime是虚拟的一个列。
- rowtime是有真实数据的列。

看起来你需要在sink_table里定义rowtime,比如像这样:
CREATE TABLE sink_table (
        ip VARCHAR,
        proctime timestamp(3),
        WATERMARK FOR proctime AS proctime
        ....
)

Best,
Jingsong Lee

On Thu, May 21, 2020 at 9:17 PM Benchao Li <[hidden email]> wrote:

> 看你提供的SQL来讲,你是直接在sink_table上做了一个窗口计算,而sink_table并没有定义时间属性。
> (是不是笔误,应该是在source_table上做窗口计算?)
>
> 了不起的盖茨比 <[hidden email]> 于2020年5月21日周四 下午9:08写道:
>
> > error:Window aggregate can only be defined over a time attribute column,
> > but TIMESTAMP(3) encountered.
> > 如果在sink_table 添加watermark那么就不会报错,我在source定义了时间,sink接收了时间,为什么group
> > by时候会出现那个error?
> > CREATE TABLE source_table (
> >         sip VARCHAR,
> >         proctime as proctime()
> > ) WITH (
> >         'connector.type' = 'kafka',
> >         'connector.version' = 'universal',
> >         'connector.startup-mode' = 'latest-offset',
> >         'connector.topic' = 'skyeye-tcpflow',
> >         'connector.properties.group.id' = 'testGroup',
> >         'connector.properties.zookeeper.connect' = 'x.x.x.x:2181',
> >         'connector.properties.bootstrap.servers' = 'x.x.x.x:9092',
> >         'update-mode' = 'append',
> >         'format.type' = 'json',
> >         'format.derive-schema' = 'true'
> > );
> >
> >
> > CREATE TABLE sink_table (
> >         ip VARCHAR,
> >         proctime timestamp(3)
> >
> >
> > ) WITH (
> >         'connector.type' = 'kafka',
> >         'connector.version' = 'universal',
> >         'connector.startup-mode' = 'latest-offset',
> >         'connector.topic' = 'ip_agg',
> >         'connector.properties.zookeeper.connect' = 'x.x.x.x:2181',
> >         'connector.properties.bootstrap.servers' = 'x.x.x.x:9092',
> >         'update-mode' = 'append',
> >         'format.type' = 'json',
> >         'format.derive-schema' = 'true'
> > );
> >
> > insert into sink_kafka select sip,proctime from source_kafka;
> >
> >
> >
> > select TUMBLE_START(proctime, INTERVAL '10' MINUTE),count(1) from
> > sink_table group by TUMBLE(proctime, INTERVAL '10' MINUTE);
>
>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: [hidden email]; [hidden email]
>


--
Best, Jingsong Lee
kcz
Reply | Threaded
Open this post in threaded view
|

Re: flink proctime error

kcz
In reply to this post by kcz
我一开始想的是source表采用proctime as proctime() 这样有了一个列,然后这个时间赋值给sink表的一个timestamp(3)列,group时候直接就可以用了。





------------------ Original ------------------
From: Benchao Li <[hidden email]>
Date: Thu,May 21,2020 9:17 PM
To: user-zh <[hidden email]>
Subject: Re: flink proctime error



看你提供的SQL来讲,你是直接在sink_table上做了一个窗口计算,而sink_table并没有定义时间属性。
(是不是笔误,应该是在source_table上做窗口计算?)

了不起的盖茨比 <[hidden email]> 于2020年5月21日周四 下午9:08写道:

> error:Window aggregate can only be defined over a time attribute column,
> but TIMESTAMP(3) encountered.
> 如果在sink_table 添加watermark那么就不会报错,我在source定义了时间,sink接收了时间,为什么group
> by时候会出现那个error?
> CREATE TABLE source_table (
>         sip VARCHAR,
>         proctime as proctime()
> ) WITH (
>         'connector.type' = 'kafka',
>         'connector.version' = 'universal',
>         'connector.startup-mode' = 'latest-offset',
>         'connector.topic' = 'skyeye-tcpflow',
>         'connector.properties.group.id' = 'testGroup',
>         'connector.properties.zookeeper.connect' = 'x.x.x.x:2181',
>         'connector.properties.bootstrap.servers' = 'x.x.x.x:9092',
>         'update-mode' = 'append',
>         'format.type' = 'json',
>         'format.derive-schema' = 'true'
> );
>
>
> CREATE TABLE sink_table (
>         ip VARCHAR,
>         proctime timestamp(3)
>
>
> ) WITH (
>         'connector.type' = 'kafka',
>         'connector.version' = 'universal',
>         'connector.startup-mode' = 'latest-offset',
>         'connector.topic' = 'ip_agg',
>         'connector.properties.zookeeper.connect' = 'x.x.x.x:2181',
>         'connector.properties.bootstrap.servers' = 'x.x.x.x:9092',
>         'update-mode' = 'append',
>         'format.type' = 'json',
>         'format.derive-schema' = 'true'
> );
>
> insert into sink_kafka select sip,proctime from source_kafka;
>
>
>
> select TUMBLE_START(proctime, INTERVAL '10' MINUTE),count(1) from
> sink_table group by TUMBLE(proctime, INTERVAL '10' MINUTE);



--

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: [hidden email]; [hidden email]
kcz
Reply | Threaded
Open this post in threaded view
|

Re: flink proctime error

kcz
In reply to this post by kcz
意思是虚拟出来的列,如果后面计算要用,需要watermark一下,嗯嗯,这个情况测试了,是可以用的。





------------------ Original ------------------
From: Jingsong Li <[hidden email]>
Date: Thu,May 21,2020 9:22 PM
To: user-zh <[hidden email]>
Subject: Re: flink proctime error



Hi,

- proctime是虚拟的一个列。
- rowtime是有真实数据的列。

看起来你需要在sink_table里定义rowtime,比如像这样:
CREATE TABLE sink_table (
        ip VARCHAR,
        proctime timestamp(3),
        WATERMARK FOR proctime AS proctime
        ....
)

Best,
Jingsong Lee

On Thu, May 21, 2020 at 9:17 PM Benchao Li <[hidden email]> wrote:

> 看你提供的SQL来讲,你是直接在sink_table上做了一个窗口计算,而sink_table并没有定义时间属性。
> (是不是笔误,应该是在source_table上做窗口计算?)
>
> 了不起的盖茨比 <[hidden email]> 于2020年5月21日周四 下午9:08写道:
>
> > error:Window aggregate can only be defined over a time attribute column,
> > but TIMESTAMP(3) encountered.
> > 如果在sink_table 添加watermark那么就不会报错,我在source定义了时间,sink接收了时间,为什么group
> > by时候会出现那个error?
> > CREATE TABLE source_table (
> >         sip VARCHAR,
> >         proctime as proctime()
> > ) WITH (
> >         'connector.type' = 'kafka',
> >         'connector.version' = 'universal',
> >         'connector.startup-mode' = 'latest-offset',
> >         'connector.topic' = 'skyeye-tcpflow',
> >         'connector.properties.group.id' = 'testGroup',
> >         'connector.properties.zookeeper.connect' = 'x.x.x.x:2181',
> >         'connector.properties.bootstrap.servers' = 'x.x.x.x:9092',
> >         'update-mode' = 'append',
> >         'format.type' = 'json',
> >         'format.derive-schema' = 'true'
> > );
> >
> >
> > CREATE TABLE sink_table (
> >         ip VARCHAR,
> >         proctime timestamp(3)
> >
> >
> > ) WITH (
> >         'connector.type' = 'kafka',
> >         'connector.version' = 'universal',
> >         'connector.startup-mode' = 'latest-offset',
> >         'connector.topic' = 'ip_agg',
> >         'connector.properties.zookeeper.connect' = 'x.x.x.x:2181',
> >         'connector.properties.bootstrap.servers' = 'x.x.x.x:9092',
> >         'update-mode' = 'append',
> >         'format.type' = 'json',
> >         'format.derive-schema' = 'true'
> > );
> >
> > insert into sink_kafka select sip,proctime from source_kafka;
> >
> >
> >
> > select TUMBLE_START(proctime, INTERVAL '10' MINUTE),count(1) from
> > sink_table group by TUMBLE(proctime, INTERVAL '10' MINUTE);
>
>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: [hidden email]; [hidden email]
>


--
Best, Jingsong Lee
kcz
Reply | Threaded
Open this post in threaded view
|

Re: flink proctime error

kcz
In reply to this post by kcz
谢谢各位大佬,我再去官网学学。





------------------ Original ------------------
From: Jingsong Li <[hidden email]>
Date: Thu,May 21,2020 9:22 PM
To: user-zh <[hidden email]>
Subject: Re: flink proctime error



Hi,

- proctime是虚拟的一个列。
- rowtime是有真实数据的列。

看起来你需要在sink_table里定义rowtime,比如像这样:
CREATE TABLE sink_table (
        ip VARCHAR,
        proctime timestamp(3),
        WATERMARK FOR proctime AS proctime
        ....
)

Best,
Jingsong Lee

On Thu, May 21, 2020 at 9:17 PM Benchao Li <[hidden email]> wrote:

> 看你提供的SQL来讲,你是直接在sink_table上做了一个窗口计算,而sink_table并没有定义时间属性。
> (是不是笔误,应该是在source_table上做窗口计算?)
>
> 了不起的盖茨比 <[hidden email]> 于2020年5月21日周四 下午9:08写道:
>
> > error:Window aggregate can only be defined over a time attribute column,
> > but TIMESTAMP(3) encountered.
> > 如果在sink_table 添加watermark那么就不会报错,我在source定义了时间,sink接收了时间,为什么group
> > by时候会出现那个error?
> > CREATE TABLE source_table (
> >         sip VARCHAR,
> >         proctime as proctime()
> > ) WITH (
> >         'connector.type' = 'kafka',
> >         'connector.version' = 'universal',
> >         'connector.startup-mode' = 'latest-offset',
> >         'connector.topic' = 'skyeye-tcpflow',
> >         'connector.properties.group.id' = 'testGroup',
> >         'connector.properties.zookeeper.connect' = 'x.x.x.x:2181',
> >         'connector.properties.bootstrap.servers' = 'x.x.x.x:9092',
> >         'update-mode' = 'append',
> >         'format.type' = 'json',
> >         'format.derive-schema' = 'true'
> > );
> >
> >
> > CREATE TABLE sink_table (
> >         ip VARCHAR,
> >         proctime timestamp(3)
> >
> >
> > ) WITH (
> >         'connector.type' = 'kafka',
> >         'connector.version' = 'universal',
> >         'connector.startup-mode' = 'latest-offset',
> >         'connector.topic' = 'ip_agg',
> >         'connector.properties.zookeeper.connect' = 'x.x.x.x:2181',
> >         'connector.properties.bootstrap.servers' = 'x.x.x.x:9092',
> >         'update-mode' = 'append',
> >         'format.type' = 'json',
> >         'format.derive-schema' = 'true'
> > );
> >
> > insert into sink_kafka select sip,proctime from source_kafka;
> >
> >
> >
> > select TUMBLE_START(proctime, INTERVAL '10' MINUTE),count(1) from
> > sink_table group by TUMBLE(proctime, INTERVAL '10' MINUTE);
>
>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: [hidden email]; [hidden email]
>


--
Best, Jingsong Lee