How to handle timestamps in Flink SQL

吴志勇
As the title says:
I am writing JSON data to Kafka:
{"id":5,"price":40,"timestamp":1584942626828,"type":"math"}
{"id":2,"price":70,"timestamp":1584942629638,"type":"math"}
{"id":2,"price":70,"timestamp":1584942634951,"type":"math"}

....
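The timestamp field is a 13-digit timestamp (milliseconds since the epoch). How should it be handled in the corresponding SQL table so that it becomes a time type?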
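For reference, a 13-digit value like these is, for present-day dates, a count of milliseconds since 1970-01-01T00:00:00Z. This minimal sketch uses only java.time to show what the sample values decode to. Interpreting them in UTC is an assumption, and the class and method names are hypothetical.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;

// Hypothetical demo class: decodes epoch-millisecond values such as the
// "timestamp" field in the JSON records. UTC is an assumed interpretation.
public class EpochMillisDemo {

    /** Milliseconds since 1970-01-01T00:00:00Z as an Instant. */
    static Instant toInstant(long epochMillis) {
        return Instant.ofEpochMilli(epochMillis);
    }

    /** The same instant as a UTC wall-clock time with no offset attached. */
    static LocalDateTime toUtcLocalDateTime(long epochMillis) {
        return LocalDateTime.ofInstant(toInstant(epochMillis), ZoneOffset.UTC);
    }

    public static void main(String[] args) {
        long[] samples = {1584942626828L, 1584942629638L, 1584942634951L};
        for (long ms : samples) {
            // For present-day dates, 13 digits means milliseconds (10 would mean seconds).
            System.out.println(ms + " -> " + toInstant(ms));
        }
        // first line printed: 1584942626828 -> 2020-03-23T05:50:26.828Z
    }
}
```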
  - name: bookpojo
    type: source-table
    connector: 
      property-version: 1
      type: kafka
      version: "universal"
      topic: pojosource
      startup-mode: earliest-offset
      properties:
        zookeeper.connect: localhost:2181
        bootstrap.servers: localhost:9092
        group.id: testGroup
    format: 
      property-version: 1
      type: json
      schema: "ROW<id INT, type STRING, price INT, timestamp TIMESTAMP>"
    schema:
      - name: id
        data-type: INT
      - name: type
        data-type: STRING
      - name: price
        data-type: INT
      - name: timestamp
        data-type: TIMESTAMP(3)

The configuration above doesn't seem to work.


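I found this statement on the official website:
Strings and time types: values are not trimmed. The literal "null" is also understood. Time types must be formatted according to the Java SQL time format, with millisecond precision. For example: 2018-01-01 for a date, 20:43:59 for a time, and 2018-01-01 20:43:59.999 for a timestamp.
Does the time really have to be a string, and does it have to include milliseconds?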
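To illustrate what "Java SQL time format" means, the example strings quoted from the docs can be parsed with the JDK's java.sql.Date, Time and Timestamp valueOf methods. This is only a sketch of those JDK parsing rules (the demo class name is hypothetical), not of how any Flink format parses its input. Note that a bare epoch-millis number is rejected.

```java
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;

// Hypothetical demo class: parses the documentation's example strings with the
// standard java.sql valueOf methods to show the expected text shapes.
public class JavaSqlFormatDemo {
    public static void main(String[] args) {
        Date date = Date.valueOf("2018-01-01");                      // yyyy-[m]m-[d]d
        Time time = Time.valueOf("20:43:59");                        // hh:mm:ss
        Timestamp ts = Timestamp.valueOf("2018-01-01 20:43:59.999"); // yyyy-[m]m-[d]d hh:mm:ss[.f...]

        System.out.println(date); // 2018-01-01
        System.out.println(time); // 20:43:59
        System.out.println(ts);   // 2018-01-01 20:43:59.999

        // A raw epoch-millis value is not in this format, so valueOf throws.
        try {
            Timestamp.valueOf("1584942626828");
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```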


Thanks.

Re: How to handle timestamps in Flink SQL

Zhenghua Gao
The Kafka source you defined uses JsonRowDeserializationSchema to parse the JSON string and convert it into Flink types [1].
Currently JsonRowDeserializationSchema only supports RFC 3339-compliant time strings [2].
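One rough way to see which strings look like RFC 3339 timestamps is to try parsing them with java.time's ISO_OFFSET_DATE_TIME. This is an approximation for illustration only, not Flink's TimeFormats code, which may be stricter. The class and method names are hypothetical.

```java
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;

// Hypothetical helper: approximates an "is this an RFC 3339 date-time?" check
// with java.time. It is NOT the parser Flink uses.
public class Rfc3339Check {

    static boolean looksLikeRfc3339(String s) {
        try {
            OffsetDateTime.parse(s, DateTimeFormatter.ISO_OFFSET_DATE_TIME);
            return true;
        } catch (DateTimeParseException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String[] inputs = {
            "2020-03-23T05:50:26.828Z",      // UTC designator: accepted
            "2020-03-23T13:50:26.828+08:00", // numeric offset: accepted by this check
            "2020-03-23 05:50:26.828",       // no offset: rejected
            "1584942626828"                  // raw epoch millis, as in the question: rejected
        };
        for (String s : inputs) {
            System.out.println(s + " -> " + looksLikeRfc3339(s));
        }
    }
}
```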

[1] https://github.com/apache/flink/blob/master/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java#L446
[2] https://github.com/apache/flink/blob/master/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/TimeFormats.java#L38

*Best Regards,*
*Zhenghua Gao*


On Mon, Mar 23, 2020 at 4:27 PM 吴志勇 <[hidden email]> wrote:


Re: How to handle timestamps in Flink SQL

Jark
Administrator
I created an issue to track this:
https://issues.apache.org/jira/browse/FLINK-16725

Best,
Jark

On Mon, 23 Mar 2020 at 18:23, Zhenghua Gao <[hidden email]> wrote:


Re: How to handle timestamps in Flink SQL

Leonard Xu
In reply to this post by 吴志勇
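Hi 吴志勇,
Your SQL table definition should be fine. The problem is that Flink's JSON format currently follows the RFC 3339 standard [1], and the timestamp format it recognizes is 'yyyy-MM-dd'T'HH:mm:ss.SSS'Z'; parsing a long into a timestamp is not supported yet. You can convert the timestamp into this format when writing to Kafka: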
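import java.text.*;
import java.util.*;
DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
dateFormat.setTimeZone(TimeZone.getTimeZone("UTC")); // the literal 'Z' means UTC, so format in UTC
Date date = new Date(System.currentTimeMillis());    // in practice, pass the record's epoch millis
String jsonSchemaDate = dateFormat.format(date);
//{"id":5,"price":40,"timestamp":"2020-03-24T18:23:38.123Z","type":"math"}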
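A variant of the same idea using java.time, formatting the record's own epoch-millis value instead of the current time. It assumes UTC (so the literal 'Z' is accurate) and builds the JSON line with String.format purely for illustration; a real producer would more likely use a JSON library. The class and method names are hypothetical.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical producer-side helper: rewrites an epoch-millis timestamp into the
// yyyy-MM-dd'T'HH:mm:ss.SSS'Z' shape before the record is sent to Kafka.
public class RecordTimestampFormatter {

    // Immutable and thread-safe, unlike SimpleDateFormat; formats in UTC.
    private static final DateTimeFormatter RFC3339_MILLIS =
            DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
                             .withZone(ZoneOffset.UTC);

    static String toRfc3339(long epochMillis) {
        return RFC3339_MILLIS.format(Instant.ofEpochMilli(epochMillis));
    }

    // Builds the JSON line by hand for illustration only (no escaping of "type").
    static String toJson(int id, int price, long epochMillis, String type) {
        return String.format("{\"id\":%d,\"price\":%d,\"timestamp\":\"%s\",\"type\":\"%s\"}",
                id, price, toRfc3339(epochMillis), type);
    }

    public static void main(String[] args) {
        System.out.println(toJson(5, 40, 1584942626828L, "math"));
        // {"id":5,"price":40,"timestamp":"2020-03-23T05:50:26.828Z","type":"math"}
    }
}
```

Because DateTimeFormatter is immutable, one shared instance can serve a multi-threaded producer, which a shared SimpleDateFormat cannot do safely.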
The community already has an issue [2] tracking this problem; it will add support for converting a long to a timestamp.

Also, please send questions written in Chinese to the user-zh mailing list; the user mailing list is for English ^_^

Best,
Leonard

[1] https://json-schema.org/understanding-json-schema/reference/string.html#dates-and-times
[2] https://issues.apache.org/jira/browse/FLINK-16725

> On 23 Mar 2020, at 15:35, 吴志勇 <[hidden email]> wrote: