A group window expects a time attribute for grouping in a stream environment谢谢

classic Classic list List threaded Threaded
9 messages Options
Reply | Threaded
Open this post in threaded view
|

A group window expects a time attribute for grouping in a stream environment谢谢

Appleyuchi
代码是:
https://paste.ubuntu.com/p/gVGrj2V7ZF/
报错:
A group window expects a time attribute for grouping in a stream environment.
但是代码的数据源中已经有时间属性了.
请问应该怎么修改代码?
谢谢






 
Reply | Threaded
Open this post in threaded view
|

Re: A group window expects a time attribute for grouping in a stream environment谢谢

Jark
Administrator
1). 所谓时间属性,不是指 timestamp 类型的字段,而是一个特殊概念。可以看下文档如果声明时间属性:
https://ci.apache.org/projects/flink/flink-docs-release-1).12/dev/table/streaming/time_attributes.html
2. 你的代码好像也不对。 L45: Table orders = tEnv.from("Orders"); 没看到你有注册过 "Orders"
表。这一行应该执行不成功把。

Best,
Jark

On Wed, 9 Dec 2020 at 15:44, Appleyuchi <[hidden email]> wrote:

> 代码是:
> https://paste.ubuntu.com/p/gVGrj2V7ZF/
> 报错:
> A group window expects a time attribute for grouping in a stream
> environment.
> 但是代码的数据源中已经有时间属性了.
> 请问应该怎么修改代码?
> 谢谢
>
>
>
>
>
>
>
Reply | Threaded
Open this post in threaded view
|

Re: A group window expects a time attribute for grouping in a stream environment谢谢

Jark
Administrator
链接错了。重发下。

1) 所谓时间属性,不是指 timestamp 类型的字段,而是一个特殊概念。可以看下文档如果声明时间属性:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/streaming/time_attributes.html
<https://ci.apache.org/projects/flink/flink-docs-release-1).12/dev/table/streaming/time_attributes.html>
2) 你的代码好像也不对。 L45: Table orders = tEnv.from("Orders"); 没看到你有注册过 "Orders"
表。这一行应该执行不成功把。

Best,
Jark

On Thu, 10 Dec 2020 at 11:09, Jark Wu <[hidden email]> wrote:

> 1). 所谓时间属性,不是指 timestamp 类型的字段,而是一个特殊概念。可以看下文档如果声明时间属性:
> https://ci.apache.org/projects/flink/flink-docs-release-1).12/dev/table/streaming/time_attributes.html
> 2. 你的代码好像也不对。 L45: Table orders = tEnv.from("Orders"); 没看到你有注册过 "Orders"
> 表。这一行应该执行不成功把。
>
> Best,
> Jark
>
> On Wed, 9 Dec 2020 at 15:44, Appleyuchi <[hidden email]> wrote:
>
>> 代码是:
>> https://paste.ubuntu.com/p/gVGrj2V7ZF/
>> 报错:
>> A group window expects a time attribute for grouping in a stream
>> environment.
>> 但是代码的数据源中已经有时间属性了.
>> 请问应该怎么修改代码?
>> 谢谢
>>
>>
>>
>>
>>
>>
>>
>
>
Reply | Threaded
Open this post in threaded view
|

Re: A group window expects a time attribute for grouping in a stream environment谢谢

Leonard Xu
Hi,
补充下昨天线下提供的答疑,在从datastream 转换到Table时,如果希望转换后的Table上能够继续使用watermark,
需要(1)让datastream中包含watermark (2)在table上声明event time 属性. 文档可以参考[1]

给出文档中省略的watermark生成部分code:

        // 老版本
//        Table orders = tEnv.fromDataStream(orderA.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Order>() {
//                @Override
//                public long extractAscendingTimestamp(Order element) {
//                    return element.rowtime;
//                }
//            }), $("user"), $("product"), $("amount"),$("rowtime").rowtime());
        // 新版本
        Table orders = tEnv.fromDataStream(orderA.assignTimestampsAndWatermarks(WatermarkStrategy.forGenerator((ctx) -> new WatermarkGenerator<Order>() {
                @Override
                public void onEvent(Order order, long eventTimestamp, WatermarkOutput watermarkOutput) {
                    watermarkOutput.emitWatermark(new Watermark(eventTimestamp));
                }
                @Override
                public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
                }
            }))
            , $("user"), $("product"), $("amount"),$("rowtime").rowtime());
如果代码不多,可以直接贴在邮件中哈。


祝好,
Leonard
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/streaming/time_attributes.html#%E5%9C%A8-datastream-%E5%88%B0-table-%E8%BD%AC%E6%8D%A2%E6%97%B6%E5%AE%9A%E4%B9%89-1

> 在 2020年12月10日,11:10,Jark Wu <[hidden email]> 写道:
>
> 链接错了。重发下。
>
> 1) 所谓时间属性,不是指 timestamp 类型的字段,而是一个特殊概念。可以看下文档如果声明时间属性:
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/streaming/time_attributes.html
> <https://ci.apache.org/projects/flink/flink-docs-release-1).12/dev/table/streaming/time_attributes.html>
> 2) 你的代码好像也不对。 L45: Table orders = tEnv.from("Orders"); 没看到你有注册过 "Orders"
> 表。这一行应该执行不成功把。
>
> Best,
> Jark
>
> On Thu, 10 Dec 2020 at 11:09, Jark Wu <[hidden email]> wrote:
>
>> 1). 所谓时间属性,不是指 timestamp 类型的字段,而是一个特殊概念。可以看下文档如果声明时间属性:
>> https://ci.apache.org/projects/flink/flink-docs-release-1).12/dev/table/streaming/time_attributes.html
>> 2. 你的代码好像也不对。 L45: Table orders = tEnv.from("Orders"); 没看到你有注册过 "Orders"
>> 表。这一行应该执行不成功把。
>>
>> Best,
>> Jark
>>
>> On Wed, 9 Dec 2020 at 15:44, Appleyuchi <[hidden email]> wrote:
>>
>>> 代码是:
>>> https://paste.ubuntu.com/p/gVGrj2V7ZF/
>>> 报错:
>>> A group window expects a time attribute for grouping in a stream
>>> environment.
>>> 但是代码的数据源中已经有时间属性了.
>>> 请问应该怎么修改代码?
>>> 谢谢
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>
>>

Reply | Threaded
Open this post in threaded view
|

Re:Re: A group window expects a time attribute for grouping in a stream environment谢谢

Appleyuchi
谢谢两位大佬的回复。
还剩下两个小问题,
①请问输出结果里面的"+"是什么意思,不应该是2017吗?
1> (true,3,+1705471-09-26T16:50,+1705471-09-26T16:55,+1705471-09-26T16:54:59.999,4)
8> (true,1,+1705471-09-26T16:50,+1705471-09-26T16:55,+1705471-09-26T16:54:59.999,6)
4> (true,1,+1705471-09-26T16:50,+1705471-09-26T16:55,+1705471-09-26T16:54:59.999,8)
最后的999是啥意思???


②代码中有两处时间限定
.window(Tumble.over(lit(5).minutes())
$("user"),
$("w").start(),//输出很奇怪
$("w").end(),
$("w").rowtime(),
请问这两处时间限定有什么区别吗?
是不是
前者是"全局范围限定"?
后者是在前者限定的基础上做进一步限定?
如果是的话,end里面是否可以设定时间戳?
----------------------------------------------------------------------
昨天的完整代码是:
https://paste.ubuntu.com/p/9JsFDKC5V8/


谢谢谢谢~!!!











在 2020-12-10 12:02:31,"Leonard Xu" <[hidden email]> 写道:

>Hi,
>补充下昨天线下提供的答疑,在从datastream 转换到Table时,如果希望转换后的Table上能够继续使用watermark,
>需要(1)让datastream中包含watermark (2)在table上声明event time 属性. 文档可以参考[1]
>
>给出文档中省略的watermark生成部分code:
>
>        // 老版本
>//        Table orders = tEnv.fromDataStream(orderA.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Order>() {
>//                @Override
>//                public long extractAscendingTimestamp(Order element) {
>//                    return element.rowtime;
>//                }
>//            }), $("user"), $("product"), $("amount"),$("rowtime").rowtime());
>        // 新版本
>        Table orders = tEnv.fromDataStream(orderA.assignTimestampsAndWatermarks(WatermarkStrategy.forGenerator((ctx) -> new WatermarkGenerator<Order>() {
>                @Override
>                public void onEvent(Order order, long eventTimestamp, WatermarkOutput watermarkOutput) {
>                    watermarkOutput.emitWatermark(new Watermark(eventTimestamp));
>                }
>                @Override
>                public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
>                }
>            }))
>            , $("user"), $("product"), $("amount"),$("rowtime").rowtime());
>如果代码不多,可以直接贴在邮件中哈。
>
>
>祝好,
>Leonard
>[1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/streaming/time_attributes.html#%E5%9C%A8-datastream-%E5%88%B0-table-%E8%BD%AC%E6%8D%A2%E6%97%B6%E5%AE%9A%E4%B9%89-1
>
>> 在 2020年12月10日,11:10,Jark Wu <[hidden email]> 写道:
>>
>> 链接错了。重发下。
>>
>> 1) 所谓时间属性,不是指 timestamp 类型的字段,而是一个特殊概念。可以看下文档如果声明时间属性:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/streaming/time_attributes.html
>> <https://ci.apache.org/projects/flink/flink-docs-release-1).12/dev/table/streaming/time_attributes.html>
>> 2) 你的代码好像也不对。 L45: Table orders = tEnv.from("Orders"); 没看到你有注册过 "Orders"
>> 表。这一行应该执行不成功把。
>>
>> Best,
>> Jark
>>
>> On Thu, 10 Dec 2020 at 11:09, Jark Wu <[hidden email]> wrote:
>>
>>> 1). 所谓时间属性,不是指 timestamp 类型的字段,而是一个特殊概念。可以看下文档如果声明时间属性:
>>> https://ci.apache.org/projects/flink/flink-docs-release-1).12/dev/table/streaming/time_attributes.html
>>> 2. 你的代码好像也不对。 L45: Table orders = tEnv.from("Orders"); 没看到你有注册过 "Orders"
>>> 表。这一行应该执行不成功把。
>>>
>>> Best,
>>> Jark
>>>
>>> On Wed, 9 Dec 2020 at 15:44, Appleyuchi <[hidden email]> wrote:
>>>
>>>> 代码是:
>>>> https://paste.ubuntu.com/p/gVGrj2V7ZF/
>>>> 报错:
>>>> A group window expects a time attribute for grouping in a stream
>>>> environment.
>>>> 但是代码的数据源中已经有时间属性了.
>>>> 请问应该怎么修改代码?
>>>> 谢谢
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>
>>>
>
Reply | Threaded
Open this post in threaded view
|

Re: A group window expects a time attribute for grouping in a stream environment谢谢

Leonard Xu

> ①请问输出结果里面的"+"是什么意思,不应该是2017吗?
时间戳格式而已,可以检查下你的时间数据

> 最后的999是啥意思???
代表窗口结束时间,即timestamp(3)精度下小于window边界的时间

>
>
> ②代码中有两处时间限定
> .window(Tumble.over(lit(5).minutes())
> $("user"),
> $("w").start(),//输出很奇怪
> $("w").end(),
> $("w").rowtime(),
> 请问这两处时间限定有什么区别吗?
> 是不是
> 前者是"全局范围限定"?
> 后者是在前者限定的基础上做进一步限定?
> 如果是的话,end里面是否可以设定时间戳?
> ---------------------
没看出你说的两处时间限定,你是不是理解偏了,你可以先看下关于窗口和时间的文档[1]。

祝好,
Leonard
[1] https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/stream/operators/windows.html#tumbling-windows <https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/stream/operators/windows.html#tumbling-windows>
[2] https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/event_time.html <https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/event_time.html>







> -------------------------------------------------
> 昨天的完整代码是:
> https://paste.ubuntu.com/p/9JsFDKC5V8/
>
>
> 谢谢谢谢~!!!
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-12-10 12:02:31,"Leonard Xu" <[hidden email]> 写道:
>> Hi,
>> 补充下昨天线下提供的答疑,在从datastream 转换到Table时,如果希望转换后的Table上能够继续使用watermark,
>> 需要(1)让datastream中包含watermark (2)在table上声明event time 属性. 文档可以参考[1]
>>
>> 给出文档中省略的watermark生成部分code:
>>
>>       // 老版本
>> //        Table orders = tEnv.fromDataStream(orderA.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Order>() {
>> //                @Override
>> //                public long extractAscendingTimestamp(Order element) {
>> //                    return element.rowtime;
>> //                }
>> //            }), $("user"), $("product"), $("amount"),$("rowtime").rowtime());
>>       // 新版本
>>       Table orders = tEnv.fromDataStream(orderA.assignTimestampsAndWatermarks(WatermarkStrategy.forGenerator((ctx) -> new WatermarkGenerator<Order>() {
>>               @Override
>>               public void onEvent(Order order, long eventTimestamp, WatermarkOutput watermarkOutput) {
>>                   watermarkOutput.emitWatermark(new Watermark(eventTimestamp));
>>               }
>>               @Override
>>               public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
>>               }
>>           }))
>>           , $("user"), $("product"), $("amount"),$("rowtime").rowtime());
>> 如果代码不多,可以直接贴在邮件中哈。
>>
>>
>> 祝好,
>> Leonard
>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/streaming/time_attributes.html#%E5%9C%A8-datastream-%E5%88%B0-table-%E8%BD%AC%E6%8D%A2%E6%97%B6%E5%AE%9A%E4%B9%89-1
>>
>>> 在 2020年12月10日,11:10,Jark Wu <[hidden email]> 写道:
>>>
>>> 链接错了。重发下。
>>>
>>> 1) 所谓时间属性,不是指 timestamp 类型的字段,而是一个特殊概念。可以看下文档如果声明时间属性:
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/streaming/time_attributes.html
>>> <https://ci.apache.org/projects/flink/flink-docs-release-1).12/dev/table/streaming/time_attributes.html>
>>> 2) 你的代码好像也不对。 L45: Table orders = tEnv.from("Orders"); 没看到你有注册过 "Orders"
>>> 表。这一行应该执行不成功把。
>>>
>>> Best,
>>> Jark
>>>
>>> On Thu, 10 Dec 2020 at 11:09, Jark Wu <[hidden email]> wrote:
>>>
>>>> 1). 所谓时间属性,不是指 timestamp 类型的字段,而是一个特殊概念。可以看下文档如果声明时间属性:
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1).12/dev/table/streaming/time_attributes.html
>>>> 2. 你的代码好像也不对。 L45: Table orders = tEnv.from("Orders"); 没看到你有注册过 "Orders"
>>>> 表。这一行应该执行不成功把。
>>>>
>>>> Best,
>>>> Jark
>>>>
>>>> On Wed, 9 Dec 2020 at 15:44, Appleyuchi <[hidden email]> wrote:
>>>>
>>>>> 代码是:
>>>>> https://paste.ubuntu.com/p/gVGrj2V7ZF/
>>>>> 报错:
>>>>> A group window expects a time attribute for grouping in a stream
>>>>> environment.
>>>>> 但是代码的数据源中已经有时间属性了.
>>>>> 请问应该怎么修改代码?
>>>>> 谢谢
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>

Reply | Threaded
Open this post in threaded view
|

Re:Re: A group window expects a time attribute for grouping in a stream environment谢谢

Appleyuchi
您好!


问题①

请问这个输出结果前面的"+"是什么意思?

8> (true,1,+1705471-09-26T16:50,+1705471-09-26T16:55,+1705471-09-26T16:54:59.999,6)

1> (true,3,+1705471-09-26T16:50,+1705471-09-26T16:55,+1705471-09-26T16:54:59.999,4)

4> (true,1,+1705471-09-26T16:50,+1705471-09-26T16:55,+1705471-09-26T16:54:59.999,8)

谢谢


-------------
在 2020-12-10 17:13:20,"Leonard Xu" <[hidden email]> 写道:

>
>> ①请问输出结果里面的"+"是什么意思,不应该是2017吗?
>时间戳格式而已,可以检查下你的时间数据
>
>> 最后的999是啥意思???
>代表窗口结束时间,即timestamp(3)精度下小于window边界的时间
>
>>
>>
>> ②代码中有两处时间限定
>> .window(Tumble.over(lit(5).minutes())
>> $("user"),
>> $("w").start(),//输出很奇怪
>> $("w").end(),
>> $("w").rowtime(),
>> 请问这两处时间限定有什么区别吗?
>> 是不是
>> 前者是"全局范围限定"?
>> 后者是在前者限定的基础上做进一步限定?
>> 如果是的话,end里面是否可以设定时间戳?
>> ---------------------
>没看出你说的两处时间限定,你是不是理解偏了,你可以先看下关于窗口和时间的文档[1]。
>
>祝好,
>Leonard
>[1] https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/stream/operators/windows.html#tumbling-windows <https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/stream/operators/windows.html#tumbling-windows>
>[2] https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/event_time.html <https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/event_time.html>
>
>
>
>
>
>
>
>> -------------------------------------------------
>> 昨天的完整代码是:
>> https://paste.ubuntu.com/p/9JsFDKC5V8/
>>
>>
>> 谢谢谢谢~!!!
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> 在 2020-12-10 12:02:31,"Leonard Xu" <[hidden email]> 写道:
>>> Hi,
>>> 补充下昨天线下提供的答疑,在从datastream 转换到Table时,如果希望转换后的Table上能够继续使用watermark,
>>> 需要(1)让datastream中包含watermark (2)在table上声明event time 属性. 文档可以参考[1]
>>>
>>> 给出文档中省略的watermark生成部分code:
>>>
>>>       // 老版本
>>> //        Table orders = tEnv.fromDataStream(orderA.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Order>() {
>>> //                @Override
>>> //                public long extractAscendingTimestamp(Order element) {
>>> //                    return element.rowtime;
>>> //                }
>>> //            }), $("user"), $("product"), $("amount"),$("rowtime").rowtime());
>>>       // 新版本
>>>       Table orders = tEnv.fromDataStream(orderA.assignTimestampsAndWatermarks(WatermarkStrategy.forGenerator((ctx) -> new WatermarkGenerator<Order>() {
>>>               @Override
>>>               public void onEvent(Order order, long eventTimestamp, WatermarkOutput watermarkOutput) {
>>>                   watermarkOutput.emitWatermark(new Watermark(eventTimestamp));
>>>               }
>>>               @Override
>>>               public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
>>>               }
>>>           }))
>>>           , $("user"), $("product"), $("amount"),$("rowtime").rowtime());
>>> 如果代码不多,可以直接贴在邮件中哈。
>>>
>>>
>>> 祝好,
>>> Leonard
>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/streaming/time_attributes.html#%E5%9C%A8-datastream-%E5%88%B0-table-%E8%BD%AC%E6%8D%A2%E6%97%B6%E5%AE%9A%E4%B9%89-1
>>>
>>>> 在 2020年12月10日,11:10,Jark Wu <[hidden email]> 写道:
>>>>
>>>> 链接错了。重发下。
>>>>
>>>> 1) 所谓时间属性,不是指 timestamp 类型的字段,而是一个特殊概念。可以看下文档如果声明时间属性:
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/streaming/time_attributes.html
>>>> <https://ci.apache.org/projects/flink/flink-docs-release-1).12/dev/table/streaming/time_attributes.html>
>>>> 2) 你的代码好像也不对。 L45: Table orders = tEnv.from("Orders"); 没看到你有注册过 "Orders"
>>>> 表。这一行应该执行不成功把。
>>>>
>>>> Best,
>>>> Jark
>>>>
>>>> On Thu, 10 Dec 2020 at 11:09, Jark Wu <[hidden email]> wrote:
>>>>
>>>>> 1). 所谓时间属性,不是指 timestamp 类型的字段,而是一个特殊概念。可以看下文档如果声明时间属性:
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1).12/dev/table/streaming/time_attributes.html
>>>>> 2. 你的代码好像也不对。 L45: Table orders = tEnv.from("Orders"); 没看到你有注册过 "Orders"
>>>>> 表。这一行应该执行不成功把。
>>>>>
>>>>> Best,
>>>>> Jark
>>>>>
>>>>> On Wed, 9 Dec 2020 at 15:44, Appleyuchi <[hidden email]> wrote:
>>>>>
>>>>>> 代码是:
>>>>>> https://paste.ubuntu.com/p/gVGrj2V7ZF/
>>>>>> 报错:
>>>>>> A group window expects a time attribute for grouping in a stream
>>>>>> environment.
>>>>>> 但是代码的数据源中已经有时间属性了.
>>>>>> 请问应该怎么修改代码?
>>>>>> 谢谢
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>
>
Reply | Threaded
Open this post in threaded view
|

Re:Re:Re: A group window expects a time attribute for grouping in a stream environment谢谢

Appleyuchi
好的,我再检查下。。。。

















在 2020-12-10 17:44:38,"Appleyuchi" <[hidden email]> 写道:

>您好!
>
>
>问题①
>
>请问这个输出结果前面的"+"是什么意思?
>
>8> (true,1,+1705471-09-26T16:50,+1705471-09-26T16:55,+1705471-09-26T16:54:59.999,6)
>
>1> (true,3,+1705471-09-26T16:50,+1705471-09-26T16:55,+1705471-09-26T16:54:59.999,4)
>
>4> (true,1,+1705471-09-26T16:50,+1705471-09-26T16:55,+1705471-09-26T16:54:59.999,8)
>
>谢谢
>
>
>-------------
>在 2020-12-10 17:13:20,"Leonard Xu" <[hidden email]> 写道:
>>
>>> ①请问输出结果里面的"+"是什么意思,不应该是2017吗?
>>时间戳格式而已,可以检查下你的时间数据
>>
>>> 最后的999是啥意思???
>>代表窗口结束时间,即timestamp(3)精度下小于window边界的时间
>>
>>>
>>>
>>> ②代码中有两处时间限定
>>> .window(Tumble.over(lit(5).minutes())
>>> $("user"),
>>> $("w").start(),//输出很奇怪
>>> $("w").end(),
>>> $("w").rowtime(),
>>> 请问这两处时间限定有什么区别吗?
>>> 是不是
>>> 前者是"全局范围限定"?
>>> 后者是在前者限定的基础上做进一步限定?
>>> 如果是的话,end里面是否可以设定时间戳?
>>> ---------------------
>>没看出你说的两处时间限定,你是不是理解偏了,你可以先看下关于窗口和时间的文档[1]。
>>
>>祝好,
>>Leonard
>>[1] https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/stream/operators/windows.html#tumbling-windows <https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/stream/operators/windows.html#tumbling-windows>
>>[2] https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/event_time.html <https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/event_time.html>
>>
>>
>>
>>
>>
>>
>>
>>> -------------------------------------------------
>>> 昨天的完整代码是:
>>> https://paste.ubuntu.com/p/9JsFDKC5V8/
>>>
>>>
>>> 谢谢谢谢~!!!
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> 在 2020-12-10 12:02:31,"Leonard Xu" <[hidden email]> 写道:
>>>> Hi,
>>>> 补充下昨天线下提供的答疑,在从datastream 转换到Table时,如果希望转换后的Table上能够继续使用watermark,
>>>> 需要(1)让datastream中包含watermark (2)在table上声明event time 属性. 文档可以参考[1]
>>>>
>>>> 给出文档中省略的watermark生成部分code:
>>>>
>>>>       // 老版本
>>>> //        Table orders = tEnv.fromDataStream(orderA.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Order>() {
>>>> //                @Override
>>>> //                public long extractAscendingTimestamp(Order element) {
>>>> //                    return element.rowtime;
>>>> //                }
>>>> //            }), $("user"), $("product"), $("amount"),$("rowtime").rowtime());
>>>>       // 新版本
>>>>       Table orders = tEnv.fromDataStream(orderA.assignTimestampsAndWatermarks(WatermarkStrategy.forGenerator((ctx) -> new WatermarkGenerator<Order>() {
>>>>               @Override
>>>>               public void onEvent(Order order, long eventTimestamp, WatermarkOutput watermarkOutput) {
>>>>                   watermarkOutput.emitWatermark(new Watermark(eventTimestamp));
>>>>               }
>>>>               @Override
>>>>               public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
>>>>               }
>>>>           }))
>>>>           , $("user"), $("product"), $("amount"),$("rowtime").rowtime());
>>>> 如果代码不多,可以直接贴在邮件中哈。
>>>>
>>>>
>>>> 祝好,
>>>> Leonard
>>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/streaming/time_attributes.html#%E5%9C%A8-datastream-%E5%88%B0-table-%E8%BD%AC%E6%8D%A2%E6%97%B6%E5%AE%9A%E4%B9%89-1
>>>>
>>>>> 在 2020年12月10日,11:10,Jark Wu <[hidden email]> 写道:
>>>>>
>>>>> 链接错了。重发下。
>>>>>
>>>>> 1) 所谓时间属性,不是指 timestamp 类型的字段,而是一个特殊概念。可以看下文档如果声明时间属性:
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/streaming/time_attributes.html
>>>>> <https://ci.apache.org/projects/flink/flink-docs-release-1).12/dev/table/streaming/time_attributes.html>
>>>>> 2) 你的代码好像也不对。 L45: Table orders = tEnv.from("Orders"); 没看到你有注册过 "Orders"
>>>>> 表。这一行应该执行不成功把。
>>>>>
>>>>> Best,
>>>>> Jark
>>>>>
>>>>> On Thu, 10 Dec 2020 at 11:09, Jark Wu <[hidden email]> wrote:
>>>>>
>>>>>> 1). 所谓时间属性,不是指 timestamp 类型的字段,而是一个特殊概念。可以看下文档如果声明时间属性:
>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1).12/dev/table/streaming/time_attributes.html
>>>>>> 2. 你的代码好像也不对。 L45: Table orders = tEnv.from("Orders"); 没看到你有注册过 "Orders"
>>>>>> 表。这一行应该执行不成功把。
>>>>>>
>>>>>> Best,
>>>>>> Jark
>>>>>>
>>>>>> On Wed, 9 Dec 2020 at 15:44, Appleyuchi <[hidden email]> wrote:
>>>>>>
>>>>>>> 代码是:
>>>>>>> https://paste.ubuntu.com/p/gVGrj2V7ZF/
>>>>>>> 报错:
>>>>>>> A group window expects a time attribute for grouping in a stream
>>>>>>> environment.
>>>>>>> 但是代码的数据源中已经有时间属性了.
>>>>>>> 请问应该怎么修改代码?
>>>>>>> 谢谢
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>
>>
Reply | Threaded
Open this post in threaded view
|

Re:Re:Re:Re: A group window expects a time attribute for grouping in a stream environment谢谢

Appleyuchi
还是看不懂这个结果,
请问哪个文档有讲这种结果的是怎么回事吗?
例如:
+1705471-09-26T16:50



4> (true,1,+1705471-09-26T16:50,+1705471-09-26T16:55,+1705471-09-26T16:54:59.999,8)

1> (true,3,+1705471-09-26T16:50,+1705471-09-26T16:55,+1705471-09-26T16:54:59.999,4)

8> (true,1,+1705471-09-26T16:50,+1705471-09-26T16:55,+1705471-09-26T16:54:59.999,6)


谢谢您~

在 2020-12-10 17:56:42,"Appleyuchi" <[hidden email]> 写道:
>好的,我再检查下。。。。

>
>在 2020-12-10 17:44:38,"Appleyuchi" <[hidden email]> 写道:
>>您好!
>>
>>
>>问题①
>>
>>请问这个输出结果前面的"+"是什么意思?
>>
>>8> (true,1,+1705471-09-26T16:50,+1705471-09-26T16:55,+1705471-09-26T16:54:59.999,6)
>>
>>1> (true,3,+1705471-09-26T16:50,+1705471-09-26T16:55,+1705471-09-26T16:54:59.999,4)
>>
>>4> (true,1,+1705471-09-26T16:50,+1705471-09-26T16:55,+1705471-09-26T16:54:59.999,8)
>>
>>谢谢
>>
>>
>>-------------
>>在 2020-12-10 17:13:20,"Leonard Xu" <[hidden email]> 写道:
>>>
>>>> ①请问输出结果里面的"+"是什么意思,不应该是2017吗?
>>>时间戳格式而已,可以检查下你的时间数据
>>>
>>>> 最后的999是啥意思???
>>>代表窗口结束时间,即timestamp(3)精度下小于window边界的时间
>>>
>>>>
>>>>
>>>> ②代码中有两处时间限定
>>>> .window(Tumble.over(lit(5).minutes())
>>>> $("user"),
>>>> $("w").start(),//输出很奇怪
>>>> $("w").end(),
>>>> $("w").rowtime(),
>>>> 请问这两处时间限定有什么区别吗?
>>>> 是不是
>>>> 前者是"全局范围限定"?
>>>> 后者是在前者限定的基础上做进一步限定?
>>>> 如果是的话,end里面是否可以设定时间戳?
>>>> ---------------------
>>>没看出你说的两处时间限定,你是不是理解偏了,你可以先看下关于窗口和时间的文档[1]。
>>>
>>>祝好,
>>>Leonard
>>>[1] https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/stream/operators/windows.html#tumbling-windows <https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/stream/operators/windows.html#tumbling-windows>
>>>[2] https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/event_time.html <https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/event_time.html>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>> -------------------------------------------------
>>>> 昨天的完整代码是:
>>>> https://paste.ubuntu.com/p/9JsFDKC5V8/
>>>>
>>>>
>>>> 谢谢谢谢~!!!
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> 在 2020-12-10 12:02:31,"Leonard Xu" <[hidden email]> 写道:
>>>>> Hi,
>>>>> 补充下昨天线下提供的答疑,在从datastream 转换到Table时,如果希望转换后的Table上能够继续使用watermark,
>>>>> 需要(1)让datastream中包含watermark (2)在table上声明event time 属性. 文档可以参考[1]
>>>>>
>>>>> 给出文档中省略的watermark生成部分code:
>>>>>
>>>>>       // 老版本
>>>>> //        Table orders = tEnv.fromDataStream(orderA.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Order>() {
>>>>> //                @Override
>>>>> //                public long extractAscendingTimestamp(Order element) {
>>>>> //                    return element.rowtime;
>>>>> //                }
>>>>> //            }), $("user"), $("product"), $("amount"),$("rowtime").rowtime());
>>>>>       // 新版本
>>>>>       Table orders = tEnv.fromDataStream(orderA.assignTimestampsAndWatermarks(WatermarkStrategy.forGenerator((ctx) -> new WatermarkGenerator<Order>() {
>>>>>               @Override
>>>>>               public void onEvent(Order order, long eventTimestamp, WatermarkOutput watermarkOutput) {
>>>>>                   watermarkOutput.emitWatermark(new Watermark(eventTimestamp));
>>>>>               }
>>>>>               @Override
>>>>>               public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
>>>>>               }
>>>>>           }))
>>>>>           , $("user"), $("product"), $("amount"),$("rowtime").rowtime());
>>>>> 如果代码不多,可以直接贴在邮件中哈。
>>>>>
>>>>>
>>>>> 祝好,
>>>>> Leonard
>>>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/streaming/time_attributes.html#%E5%9C%A8-datastream-%E5%88%B0-table-%E8%BD%AC%E6%8D%A2%E6%97%B6%E5%AE%9A%E4%B9%89-1
>>>>>
>>>>>> 在 2020年12月10日,11:10,Jark Wu <[hidden email]> 写道:
>>>>>>
>>>>>> 链接错了。重发下。
>>>>>>
>>>>>> 1) 所谓时间属性,不是指 timestamp 类型的字段,而是一个特殊概念。可以看下文档如果声明时间属性:
>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/streaming/time_attributes.html
>>>>>> <https://ci.apache.org/projects/flink/flink-docs-release-1).12/dev/table/streaming/time_attributes.html>
>>>>>> 2) 你的代码好像也不对。 L45: Table orders = tEnv.from("Orders"); 没看到你有注册过 "Orders"
>>>>>> 表。这一行应该执行不成功把。
>>>>>>
>>>>>> Best,
>>>>>> Jark
>>>>>>
>>>>>> On Thu, 10 Dec 2020 at 11:09, Jark Wu <[hidden email]> wrote:
>>>>>>
>>>>>>> 1). 所谓时间属性,不是指 timestamp 类型的字段,而是一个特殊概念。可以看下文档如果声明时间属性:
>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1).12/dev/table/streaming/time_attributes.html
>>>>>>> 2. 你的代码好像也不对。 L45: Table orders = tEnv.from("Orders"); 没看到你有注册过 "Orders"
>>>>>>> 表。这一行应该执行不成功把。
>>>>>>>
>>>>>>> Best,
>>>>>>> Jark
>>>>>>>
>>>>>>> On Wed, 9 Dec 2020 at 15:44, Appleyuchi <[hidden email]> wrote:
>>>>>>>
>>>>>>>> 代码是:
>>>>>>>> https://paste.ubuntu.com/p/gVGrj2V7ZF/
>>>>>>>> 报错:
>>>>>>>> A group window expects a time attribute for grouping in a stream
>>>>>>>> environment.
>>>>>>>> 但是代码的数据源中已经有时间属性了.
>>>>>>>> 请问应该怎么修改代码?
>>>>>>>> 谢谢
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>
>>>