On 2019-09-05 14:24, hb wrote (subject: "Flink 1.9 Blink planner time field question"):

The code defines a Kafka ConnectorDescriptor, reads JSON-formatted data from Kafka, and produces a Table:

```
schema
  .field("_rowtime", Types.SQL_TIMESTAMP())
  .rowtime(
    new Rowtime()
      .timestampsFromField("eventTime")
      .watermarksPeriodicBounded(1000))
```

The Kafka input {"eventTime": 100000, "id":1,"name":"hb"} causes an error, while {"eventTime": "2019-09-02T09:56:16.484Z", "id":1,"name":"hb"} produces the correct result. Why doesn't the eventTime field support numeric input?

Error message:

```
Caused by: java.lang.Exception: java.io.IOException: Failed to deserialize JSON object.
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.checkThrowSourceExecutionException(SourceStreamTask.java:212)
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.performDefaultAction(SourceStreamTask.java:132)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Failed to deserialize JSON object.
	at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:129)
	at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:72)
	at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:45)
	at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:146)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:202)
Caused by: java.time.format.DateTimeParseException: Text '100000' could not be parsed at index 0
	at java.time.format.DateTimeFormatter.parseResolved0(DateTimeFormatter.java:1949)
	at java.time.format.DateTimeFormatter.parse(DateTimeFormatter.java:1777)
	at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$createTimestampConverter$1dee6515$1(JsonRowDeserializationSchema.java:334)
	at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:232)
	at org.apache.flink.formats.json.JsonRowDeserializationSchema.convertField(JsonRowDeserializationSchema.java:403)
	at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$assembleRowConverter$77f7700$1(JsonRowDeserializationSchema.java:382)
	at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:232)
	at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:127)
	... 7 more
```

Source code:

```
val env = StreamExecutionEnvironment.getExecutionEnvironment
val conf = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val tEnv = StreamTableEnvironment.create(env, conf)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val kafkaIn = new Kafka()
  .version("0.11")
  .topic("hbtest111")
  .property("bootstrap.servers", "192.168.1.160:19092")
  .property("group.id", "test2")

val json = new Json().deriveSchema()

val schema = new Schema()
  .field("id", Types.INT())
  .field("name", Types.STRING())

schema.field("_proctime", Types.SQL_TIMESTAMP()).proctime()
schema
  .field("_rowtime", Types.SQL_TIMESTAMP())
  .rowtime(
    new Rowtime()
      .timestampsFromField("eventTime")
      .watermarksPeriodicBounded(1000)
  )

tEnv.connect(kafkaIn)
  .withFormat(json)
  .withSchema(schema)
  .inAppendMode()
  .registerTableSource("table_from_kafka")

val t = tEnv.sqlQuery("select * from table_from_kafka")
t.printSchema()

t.toRetractStream[Row].print()
tEnv.execute("")
```
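The bottom of that trace points at the cause: with deriveSchema(), the JSON format treats the rowtime origin field eventTime as SQL_TIMESTAMP and hands the raw value to a DateTimeFormatter that expects an RFC 3339-style timestamp string. A minimal sketch of the failing conversion, with the format's internal formatter approximated here by the standard ISO_OFFSET_DATE_TIME (an assumption, not the exact formatter Flink uses):

```
import java.time.format.DateTimeFormatter

object TimestampParseSketch extends App {
  // Stand-in for the formatter JsonRowDeserializationSchema applies to
  // SQL_TIMESTAMP fields (an approximation, not the exact internal one).
  val fmt = DateTimeFormatter.ISO_OFFSET_DATE_TIME

  // The ISO-8601 string from the working input parses fine:
  println(fmt.parse("2019-09-02T09:56:16.484Z"))

  // The epoch-millis value arrives as the text "100000", which is not a
  // date/time literal, so this throws:
  // java.time.format.DateTimeParseException: Text '100000' could not be parsed at index 0
  println(fmt.parse("100000"))
}
```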
[hidden email] replied:

Flink presumably cannot convert the long-typed eventTime input into SQL_TIMESTAMP.
On 2019-09-05 15:11, hb replied:

In real applications the most common time field is a Long millisecond timestamp. Is that really not supported?
On 2019-09-06 10:48, Jark Wu (Administrator) replied:

It may be because you declared eventTime as a timestamp type in the schema. Try declaring it as long instead: .field("_rowtime", Types.LONG())
hb replied:

That doesn't work:

```
Caused by: org.apache.flink.table.api.ValidationException: Rowtime attribute '_rowtime' is not of type SQL_TIMESTAMP.
```
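The ValidationException says the rowtime attribute itself has to stay SQL_TIMESTAMP in the table schema, so changing _rowtime to LONG cannot work; what can change is the field's type on the format side. A possible direction, sketched below under the assumption that the Json descriptor's explicit schema(...) method is used instead of deriveSchema(), and untested against this exact setup: declare eventTime as LONG in the format schema, since the ExistingField extractor behind timestampsFromField is documented to accept Long fields as well as Timestamp and ISO-formatted String.

```
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.table.api.Types
import org.apache.flink.table.descriptors.{Json, Rowtime, Schema}

// Format schema declared explicitly, so eventTime is deserialized as a
// JSON number (LONG) rather than as an RFC 3339 timestamp string.
val json = new Json().schema(
  Types.ROW(
    Array("id", "name", "eventTime"),
    Array[TypeInformation[_]](Types.INT(), Types.STRING(), Types.LONG())))

// Table schema keeps _rowtime as SQL_TIMESTAMP (required for rowtime
// attributes) and points the timestamp extractor at the LONG field.
val schema = new Schema()
  .field("id", Types.INT())
  .field("name", Types.STRING())
  .field("_rowtime", Types.SQL_TIMESTAMP())
  .rowtime(
    new Rowtime()
      .timestampsFromField("eventTime")
      .watermarksPeriodicBounded(1000))
```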