Using a Kafka ConnectorDescriptor to read JSON-formatted data from Kafka and build a Table:
```
...

schema.field("_proctime", Types.SQL_TIMESTAMP()).proctime()

schema
  .field("_rowtime", Types.SQL_TIMESTAMP())
  .rowtime(
    new Rowtime()
      .timestampsFromField("eventTime")
      .watermarksPeriodicBounded(1000)
  )
```

Question 1: The generated processing-time field _proctime is displayed in the UTC time zone. How can I shift it to UTC+8?

Question 2: How can the event-time field eventTime be a Long? When I write the record {"eventTime": 100000, "id":1,"name":"hb"} to Kafka, I get an error about the type of the eventTime field.
Hi:
1. Yes, at the moment it can only be UTC. If that affects your computation, you could consider shifting the window time in your business logic instead.
2. Long is supported. Is your input perhaps an int? That would cause the error. What is the exact error message?

Best,
Jingsong Lee
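If "shifting the window time" means day-level tumbling windows, the DataStream window assigners have an offset overload that can align window boundaries with UTC+8. A minimal sketch, assuming a one-day window; the sample records and values are only illustrative:

```
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

object WindowOffsetSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // Illustrative (key, epoch-millis, value) records.
    val counts = env
      .fromElements(("a", 1567497600000L, 1L), ("a", 1567501200000L, 2L))
      .assignAscendingTimestamps(_._2)
      .keyBy(_._1)
      // Day-sized tumbling windows shifted by -8 hours, so window boundaries
      // fall on UTC+8 midnight instead of UTC midnight.
      .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
      .sum(2)

    counts.print()
    env.execute("window-offset-sketch")
  }
}
```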
Hi:
You can convert the time to a Long. As for UTC: since you are building a Table, if you query it with SQL you can use a UDF to do the conversion.

Jimmy
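A minimal sketch of such a UDF, assuming the goal is simply to display timestamps as UTC+8 wall-clock time; the class name and the registered function name are made up for illustration:

```
import java.sql.Timestamp
import org.apache.flink.table.functions.ScalarFunction

// Shifts a UTC timestamp forward by 8 hours so it reads as UTC+8 wall-clock time.
class ToUtc8 extends ScalarFunction {
  def eval(ts: Timestamp): Timestamp =
    if (ts == null) null else new Timestamp(ts.getTime + 8 * 60 * 60 * 1000L)
}

// Registration and use against the table registered in the job below:
// tEnv.registerFunction("to_utc8", new ToUtc8)
// tEnv.sqlQuery("select id, name, to_utc8(_proctime) from table_from_kafka")
```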
Kafka input: {"eventTime": 100000, "id":1,"name":"hb"}
Error:
```
Caused by: java.lang.Exception: java.io.IOException: Failed to deserialize JSON object.
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.checkThrowSourceExecutionException(SourceStreamTask.java:212)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.performDefaultAction(SourceStreamTask.java:132)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Failed to deserialize JSON object.
    at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:129)
    at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:72)
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:45)
    at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:146)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:202)
Caused by: java.time.format.DateTimeParseException: Text '100000' could not be parsed at index 0
    at java.time.format.DateTimeFormatter.parseResolved0(DateTimeFormatter.java:1949)
    at java.time.format.DateTimeFormatter.parse(DateTimeFormatter.java:1777)
    at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$createTimestampConverter$1dee6515$1(JsonRowDeserializationSchema.java:334)
    at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:232)
    at org.apache.flink.formats.json.JsonRowDeserializationSchema.convertField(JsonRowDeserializationSchema.java:403)
    at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$assembleRowConverter$77f7700$1(JsonRowDeserializationSchema.java:382)
    at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:232)
    at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:127)
    ... 7 more
```

Kafka input: {"eventTime": "2019-09-02T09:56:16.484Z", "id":1,"name":"hb"}
With this input the result is displayed correctly:
4> (true,1,hb,2019-09-04T06:45:46.846,2019-09-02T09:56:16.484)
Which I find strange.
```
val env = StreamExecutionEnvironment.getExecutionEnvironment
val conf = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val tEnv = StreamTableEnvironment.create(env, conf)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val kafkaIn = new Kafka()
  .version("0.11")
  .topic("hbtest111")
  .property("bootstrap.servers", "192.168.1.160:19092")
  .property("group.id", "test2")

val json = new Json().deriveSchema()

val schema = new Schema()
  .field("id", Types.INT())
  .field("name", Types.STRING())

schema.field("_proctime", Types.SQL_TIMESTAMP()).proctime()

schema
  .field("_rowtime", Types.SQL_TIMESTAMP())
  .rowtime(
    new Rowtime()
      .timestampsFromField("eventTime")
      .watermarksPeriodicBounded(1000)
  )

tEnv.connect(kafkaIn)
  .withFormat(json)
  .withSchema(schema)
  .inAppendMode()
  .registerTableSource("table_from_kafka")

val t = tEnv.sqlQuery("select * from table_from_kafka")
t.printSchema()
t.toRetractStream[Row].print()

tEnv.execute("")
```
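The parse error above comes from the derived format schema: with deriveSchema(), eventTime takes on the SQL_TIMESTAMP type of the rowtime attribute, so the JSON format tries to parse 100000 as an ISO-8601 timestamp string, which only the "2019-09-02T09:56:16.484Z" form satisfies. One possible workaround, sketched below but not verified against this exact setup, is to give the format an explicit schema in which eventTime is a LONG (epoch milliseconds) and keep the rowtime extraction in the table schema:

```
// Hedged sketch: declare the raw JSON field types explicitly instead of deriving
// them from the table schema, so eventTime is read as a plain long value.
import org.apache.flink.api.common.typeinfo.TypeInformation

val json = new Json().schema(
  Types.ROW(
    Array("id", "name", "eventTime"),
    Array[TypeInformation[_]](Types.INT(), Types.STRING(), Types.LONG())
  )
)
```

Whether the connector then maps the long eventTime field onto the _rowtime attribute cleanly may depend on the Flink version in use.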