Flink DataTypes json parse exception

sunfulin
Hi, guys
When upgrading to Flink 1.10 rc0, I am confused by the new DataTypes schema definition.
I am consuming records from Kafka with a JSON schema like {"user_id":1866071598998,"recv_time":1547011832729}, and the code snippet is:

.withSchema(
    new Schema()
        // eventTime
        .field("rowtime", DataTypes.TIMESTAMP(3).bridgedTo(java.sql.Timestamp.class))
            .rowtime(
                new Rowtime()
                    .timestampsFromField("recv_time")
                    .watermarksPeriodicBounded(1000)
            )
        .field("user_id", DataTypes.STRING())
)

But I am running into an issue and get the following exception:

Caused by: java.time.format.DateTimeParseException: Text '1549705104542' could not be parsed at index 0
at java.time.format.DateTimeFormatter.parseResolved0(DateTimeFormatter.java:1949)
at java.time.format.DateTimeFormatter.parse(DateTimeFormatter.java:1777)
at org.apache.flink.formats.json.JsonRowDeserializationSchema.convertToLocalDateTime(JsonRowDeserializationSchema.java:366)
at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
at org.apache.flink.formats.json.JsonRowDeserializationSchema.convertField(JsonRowDeserializationSchema.java:439)
at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$assembleRowConverter$77f7700$1(JsonRowDeserializationSchema.java:418)
at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:131)
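
The trace shows the raw value '1549705104542' being handed to a java.time formatter. Below is a minimal sketch in plain Java (no Flink dependency) of why a string-based date-time parser rejects an epoch-millisecond value, and how such a value could be converted instead. The class name `EpochMillisDemo`, its two helper methods, the use of `ISO_LOCAL_DATE_TIME` as a stand-in for whatever formatter Flink uses internally, and the choice of UTC are all assumptions made for illustration; this is not Flink's actual implementation.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;

final class EpochMillisDemo {

    // Stand-in formatter (assumption): Flink's JSON format uses its own
    // formatter, which is not reproduced here.
    private static final DateTimeFormatter FORMAT = DateTimeFormatter.ISO_LOCAL_DATE_TIME;

    /** Returns true if the text parses as an ISO local date-time string. */
    static boolean parsesAsIsoDateTime(String text) {
        try {
            LocalDateTime.parse(text, FORMAT);
            return true;
        } catch (DateTimeParseException e) {
            return false;
        }
    }

    /** Interprets the text as epoch milliseconds and converts it to a UTC LocalDateTime. */
    static LocalDateTime fromEpochMillis(String text) {
        long millis = Long.parseLong(text);
        return LocalDateTime.ofInstant(Instant.ofEpochMilli(millis), ZoneOffset.UTC);
    }

    public static void main(String[] args) {
        String raw = "1549705104542"; // value taken from the exception message
        System.out.println("parses as ISO date-time: " + parsesAsIsoDateTime(raw));
        System.out.println("as epoch millis (UTC):   " + fromEpochMillis(raw));
    }
}
```

The sketch only illustrates the mismatch between a numeric epoch value and a string-based timestamp parser. Whether the field should be declared as a numeric type or the producer should emit string timestamps depends on the job.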

Re: Flink DataTypes json parse exception

Timo Walther
Hi,

Which Flink version are you upgrading from? There were some changes in
1.9 to how timestamps are parsed in the JSON format.

Your error might be related to those changes:

https://issues.apache.org/jira/browse/FLINK-11727

I hope this helps.

Timo


On 07.02.20 07:57, sunfulin wrote:

> [...]

Re: Re: Flink DataTypes json parse exception

sunfulin
Hi,
I am using the latest Flink 1.10 rc. When I run the same code on Flink 1.8.2, there is no problem, but on 1.10 the issue occurs.
I am confused about the cause.

At 2020-02-11 18:33:50, "Timo Walther" <[hidden email]> wrote:

> [...]