As the subject says:
I am writing JSON data to Kafka:

{"id":5,"price":40,"timestamp":1584942626828,"type":"math"}
{"id":2,"price":70,"timestamp":1584942629638,"type":"math"}
{"id":2,"price":70,"timestamp":1584942634951,"type":"math"}
....

The timestamp field is a 13-digit (millisecond) timestamp. How should the corresponding SQL table handle it so that it becomes a time type?

- name: bookpojo
  type: source-table
  connector:
    property-version: 1
    type: kafka
    version: "universal"
    topic: pojosource
    startup-mode: earliest-offset
    properties:
      zookeeper.connect: localhost:2181
      bootstrap.servers: localhost:9092
      group.id: testGroup
  format:
    property-version: 1
    type: json
    schema: "ROW<id INT, type STRING, price INT, timestamp TIMESTAMP>"
  schema:
    - name: id
      data-type: INT
    - name: type
      data-type: STRING
    - name: price
      data-type: INT
    - name: timestamp
      data-type: TIMESTAMP(3)

The configuration above seems to have a problem.

I found this statement in the official documentation:
"String and time types: values are not trimmed. The literal "null" is also understood. Time types must be formatted according to the Java SQL time format with millisecond precision. For example: 2018-01-01 for a date, 20:43:59 for a time, and 2018-01-01 20:43:59.999 for a timestamp."
Does the time really have to be a string, and does it have to include milliseconds?

Thanks.
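For reference, a 13-digit value such as 1584942626828 is a count of milliseconds since the Unix epoch (1970-01-01T00:00:00Z). The following is a minimal Java sketch, assuming the values are epoch milliseconds and that UTC is the intended zone, of rendering one in the "yyyy-MM-dd HH:mm:ss.SSS" style of the documentation example quoted in the question. The class and method names are illustrative only.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class EpochMillisToSqlTimestamp {
    // Pattern matching the documentation's example "2018-01-01 20:43:59.999".
    private static final DateTimeFormatter SQL_TIMESTAMP =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");

    // Interprets a 13-digit epoch-millisecond value as a UTC wall-clock time
    // (assumption: the producer's values are UTC-based epoch millis).
    static String toSqlTimestamp(long epochMillis) {
        LocalDateTime utc = LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMillis), ZoneOffset.UTC);
        return SQL_TIMESTAMP.format(utc);
    }

    public static void main(String[] args) {
        System.out.println(toSqlTimestamp(1584942626828L)); // 2020-03-23 05:50:26.828
    }
}
```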
The Kafka source you defined uses JsonRowDeserializationSchema to parse the JSON strings and convert them into Flink types [1]. At present, JsonRowDeserializationSchema only supports RFC 3339-compliant time strings [2].

[1] https://github.com/apache/flink/blob/master/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java#L446
[2] https://github.com/apache/flink/blob/master/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/TimeFormats.java#L38

Best Regards,
Zhenghua Gao
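To illustrate what "RFC 3339-compliant" means in practice, here is a Java sketch that approximates the timestamp formatter in the TimeFormats class linked as [2]. It is a reconstruction for illustration, not Flink's actual code: it assumes the format is a date, a literal 'T', HH:mm:ss, an optional fraction of up to nine digits, and a literal 'Z'. Check the linked source for the exact definition. The class and method names are hypothetical.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
import java.time.format.DateTimeParseException;
import java.time.temporal.ChronoField;

public class Rfc3339TimestampCheck {
    // Approximation of an RFC 3339 timestamp parser: date, 'T', time,
    // optional fractional seconds (0 to 9 digits), then a literal 'Z'.
    static final DateTimeFormatter RFC3339_TIMESTAMP = new DateTimeFormatterBuilder()
            .append(DateTimeFormatter.ISO_LOCAL_DATE)
            .appendLiteral('T')
            .appendPattern("HH:mm:ss")
            .appendFraction(ChronoField.NANO_OF_SECOND, 0, 9, true)
            .appendLiteral('Z')
            .toFormatter();

    // Returns true if the text parses with the formatter above.
    static boolean accepts(String text) {
        try {
            LocalDateTime.parse(text, RFC3339_TIMESTAMP);
            return true;
        } catch (DateTimeParseException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(accepts("2020-03-24T18:23:38.123Z")); // true
        System.out.println(accepts("2018-01-01 20:43:59.999"));  // false: space instead of 'T', no 'Z'
        System.out.println(accepts("1584942626828"));            // false: raw epoch millis
    }
}
```

Under this approximation, neither the documentation's "2018-01-01 20:43:59.999" style nor a raw 13-digit number is accepted, which matches the behavior described in the question.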
I created FLINK-16725 to track this issue:
https://issues.apache.org/jira/browse/FLINK-16725

Best,
Jark
Hi 吴志勇,

Your SQL table definition should be fine. The problem is that Flink's JSON format currently follows the RFC 3339 standard [1], and the timestamp format it recognizes is yyyy-MM-dd'T'HH:mm:ss.SSS'Z'. Parsing a long into a timestamp is not supported yet, so you can convert the timestamp to this format when writing to Kafka:

    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.TimeZone;

    SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
    dateFormat.setTimeZone(TimeZone.getTimeZone("UTC")); // the literal 'Z' means UTC
    Date date = new Date(System.currentTimeMillis());
    String jsonSchemaDate = dateFormat.format(date);
    // {"id":5,"price":40,"timestamp":"2020-03-24T18:23:38.123Z","type":"math"}

The community already has an issue [2] tracking this problem; it will add support for converting a long to a timestamp.

Also, if you are asking in Chinese, please post to the user-zh mailing list; please write in English on the user list ^_^

Best,
Leonard

[1] https://json-schema.org/understanding-json-schema/reference/string.html#dates-and-times
[2] https://issues.apache.org/jira/browse/FLINK-16725
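Leonard's conversion uses SimpleDateFormat, which is not thread-safe. As an alternative sketch (a swapped-in technique, not the approach from the thread), the same conversion can be written with java.time. It assumes the 13-digit values are UTC epoch milliseconds; toJson is a hypothetical helper that rewrites one of the sample records with the timestamp as an RFC 3339 string, and all names are illustrative.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Locale;

public class RecordTimestampFormatter {
    // Immutable and thread-safe, unlike SimpleDateFormat. Formatting in UTC
    // keeps the trailing 'Z' (which denotes UTC) truthful.
    private static final DateTimeFormatter RFC3339_MILLIS =
            DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'").withZone(ZoneOffset.UTC);

    static String toRfc3339(long epochMillis) {
        return RFC3339_MILLIS.format(Instant.ofEpochMilli(epochMillis));
    }

    // Hypothetical helper that builds one record by hand; it assumes `type`
    // contains no characters that need JSON escaping.
    static String toJson(int id, int price, long epochMillis, String type) {
        return String.format(Locale.ROOT,
                "{\"id\":%d,\"price\":%d,\"timestamp\":\"%s\",\"type\":\"%s\"}",
                id, price, toRfc3339(epochMillis), type);
    }

    public static void main(String[] args) {
        System.out.println(toJson(5, 40, 1584942626828L, "math"));
        // {"id":5,"price":40,"timestamp":"2020-03-23T05:50:26.828Z","type":"math"}
    }
}
```

A single static DateTimeFormatter can be shared by all producer threads, whereas SimpleDateFormat would need one instance per thread (or external synchronization).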