Hi, guys
When upgrading to Flink 1.10 rc0, I am confused by the new DataTypes schema definition.
I am reading and consuming records from Kafka with a JSON schema like {"user_id":1866071598998,"recv_time":1547011832729}, and the code snippet is:

    .withSchema(
        new Schema()
            // eventTime
            .field("rowtime", DataTypes.TIMESTAMP(3).bridgedTo(java.sql.Timestamp.class)).rowtime(
                new Rowtime()
                    .timestampsFromField("recv_time")
                    .watermarksPeriodicBounded(1000)
            )
            .field("user_id", DataTypes.STRING())

But I am running into an issue and get the following exception:

    Caused by: java.time.format.DateTimeParseException: Text '1549705104542' could not be parsed at index 0
        at java.time.format.DateTimeFormatter.parseResolved0(DateTimeFormatter.java:1949)
        at java.time.format.DateTimeFormatter.parse(DateTimeFormatter.java:1777)
        at org.apache.flink.formats.json.JsonRowDeserializationSchema.convertToLocalDateTime(JsonRowDeserializationSchema.java:366)
        at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
        at org.apache.flink.formats.json.JsonRowDeserializationSchema.convertField(JsonRowDeserializationSchema.java:439)
        at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$assembleRowConverter$77f7700$1(JsonRowDeserializationSchema.java:418)
        at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
        at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:131)
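The stack trace shows the value reaching a text-based date-time parser (convertToLocalDateTime) while the record carries epoch milliseconds. A minimal sketch of that mismatch using only java.time follows. The class and method names are hypothetical, and ISO_OFFSET_DATE_TIME is only a stand-in: the formatter Flink uses internally may differ.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;

public class EpochMillisParseDemo {

    // Stand-in for a textual timestamp parser (assumption: not Flink's actual formatter).
    static boolean parsesAsIsoText(String text) {
        try {
            DateTimeFormatter.ISO_OFFSET_DATE_TIME.parse(text);
            return true;
        } catch (DateTimeParseException e) {
            // A bare run of digits is not a valid ISO date-time string.
            return false;
        }
    }

    // Interprets the value as milliseconds since the epoch, in UTC.
    static LocalDateTime fromEpochMillis(long millis) {
        return LocalDateTime.ofInstant(Instant.ofEpochMilli(millis), ZoneOffset.UTC);
    }

    public static void main(String[] args) {
        System.out.println(parsesAsIsoText("1549705104542"));
        System.out.println(parsesAsIsoText("2019-02-09T09:38:24.542Z"));
        System.out.println(fromEpochMillis(1549705104542L));
    }
}
```

The same digits are a perfectly valid instant when read as a long, which suggests the problem is in how the field is declared or encoded, not in the data itself.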
Hi,
from which Flink version are you upgrading? There were some changes in 1.9 to how timestamps are parsed in the JSON format.

Your error might be related to those changes:

https://issues.apache.org/jira/browse/FLINK-11727

I hope this helps.

Timo

On 07.02.20 07:57, sunfulin wrote:
> Hi, guys
> When upgrading to Flink 1.10 rc0, I am confused by the new DataTypes schema definition.
> [...]
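If the newer JSON format does expect timestamps as date-time strings, as the parse error suggests, one option is to emit recv_time as such a string on the producer side. Below is a small sketch of that conversion, assuming an ISO-8601 / RFC 3339 style UTC string is acceptable. The class and method names are hypothetical, and the exact pattern Flink accepts should be checked against the linked issue.

```java
import java.time.Instant;
import java.time.format.DateTimeFormatter;

public class RecvTimeToIsoString {

    // Formats epoch milliseconds as an ISO-8601 instant in UTC (with a trailing 'Z').
    static String toIsoUtc(long epochMillis) {
        return DateTimeFormatter.ISO_INSTANT.format(Instant.ofEpochMilli(epochMillis));
    }

    public static void main(String[] args) {
        // recv_time value taken from the sample record in the question.
        System.out.println(toIsoUtc(1547011832729L)); // prints 2019-01-09T05:30:32.729Z
    }
}
```

This only covers the string conversion. Whether to change the producer or to declare the field differently on the Flink side is a separate design choice.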
Hi,
I am using the latest Flink 1.10 rc. When I run the same code on Flink 1.8.2, there is no problem, but on 1.10 the issue occurs. I am confused about the cause.

At 2020-02-11 18:33:50, "Timo Walther" wrote:
>Hi,
>
>from which Flink version are you upgrading? There were some changes in
>1.9 for how to parse timestamps in JSON format.
>[...]