Hi all,
According to the documentation at https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html#start-reading-position, I created kafka_table with the new connector options and the downstream cannot consume any data, while with the old options the downstream does receive data. Is there a pitfall in the new option style?

Old options:

streamTableEnv.executeSql(
  """
    |CREATE TABLE kafka_table (
    |  uid BIGINT,
    |  sex VARCHAR,
    |  age INT,
    |  created_time TIMESTAMP(3),
    |  WATERMARK FOR created_time AS created_time - INTERVAL '3' SECOND
    |) WITH (
    |  'connector.type' = 'kafka',
    |  'connector.version' = 'universal',
    |  'connector.topic' = 'user',
    |  'connector.startup-mode' = 'latest-offset',
    |  'connector.properties.zookeeper.connect' = 'cdh1:2181,cdh2:2181,cdh3:2181',
    |  'connector.properties.bootstrap.servers' = 'cdh1:9092,cdh2:9092,cdh3:9092',
    |  'connector.properties.group.id' = 'user_flink',
    |  'format.type' = 'json',
    |  'format.derive-schema' = 'true'
    |)
    |""".stripMargin)

New options:

streamTableEnv.executeSql(
  """
    |CREATE TABLE kafka_table (
    |  uid BIGINT,
    |  sex VARCHAR,
    |  age INT,
    |  created_time TIMESTAMP(3),
    |  WATERMARK FOR created_time AS created_time - INTERVAL '3' SECOND
    |) WITH (
    |  'connector' = 'kafka',
    |  'topic' = 'user',
    |  'properties.bootstrap.servers' = 'cdh1:9092,cdh2:9092,cdh3:9092',
    |  'properties.group.id' = 'user_flink',
    |  'scan.startup.mode' = 'latest-offset',
    |  'format' = 'json',
    |  'json.fail-on-missing-field' = 'false',
    |  'json.ignore-parse-errors' = 'true'
    |)
    |""".stripMargin)
Hi
By "the downstream cannot consume data", do you mean the current job itself does not consume any data? Normally that should not happen. Could you share a reproducible example?

Best,
Leonard Xu
One sink connector in the current job receives no data. I have found the cause: the root problem is the time field in the Kafka messages; the old and new WITH options simply behave differently on the same field. The Kafka message format is:

{"uid":46,"sex":"female","age":11,"created_time":"2020-07-23T19:53:15.509Z"}

The strange thing is that with created_time declared as TIMESTAMP(3) in the kafka_table DDL, the job runs fine with the old options. With the new options it runs without any exception in IDEA, but once submitted to YARN it fails with:

java.lang.RuntimeException: RowTime field should not be null, please convert it to a non-null long value.
    at org.apache.flink.table.runtime.operators.wmassigners.WatermarkAssignerOperator.processElement(WatermarkAssignerOperator.java:115) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]

Testing locally, the following indeed returns NULL:

TO_TIMESTAMP('2020-07-23T19:53:15.509Z')

If the Kafka producer writes created_time as an integer, or as "2020-07-23 20:36:55.565", the new options work fine. I spent the whole afternoon debugging this; glad I finally found the problem.
Hi
This is an incompatible change of the json format in 1.11 [1]; the goal was to support parsing more timestamp formats. You can set 'json.timestamp-format.standard' to 'ISO-8601', and then no other change should be needed.

Best,
Leonard Xu

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/json.html#json-timestamp-format-standard
Hi,
I changed it as suggested, but it still fails.

Query:

val streamExecutionEnv = StreamExecutionEnvironment.getExecutionEnvironment
streamExecutionEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
streamExecutionEnv.setStateBackend(new RocksDBStateBackend("hdfs://nameservice1/flink/checkpoints"))

val blinkEnvSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val streamTableEnv = StreamTableEnvironment.create(streamExecutionEnv, blinkEnvSettings)
streamTableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE)
streamTableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(20))
streamTableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT, Duration.ofSeconds(900))

streamTableEnv.executeSql(
  """
    |CREATE TABLE kafka_table (
    |  uid BIGINT,
    |  sex VARCHAR,
    |  age INT,
    |  created_time TIMESTAMP(3),
    |  procTime AS PROCTIME(),
    |  WATERMARK FOR created_time AS created_time - INTERVAL '3' SECOND
    |) WITH (
    |  'connector' = 'kafka',
    |  'topic' = 'user',
    |  'properties.bootstrap.servers' = 'cdh1:9092,cdh2:9092,cdh3:9092',
    |  'properties.group.id' = 'user_flink',
    |  'scan.startup.mode' = 'latest-offset',
    |  'format' = 'json',
    |  'json.fail-on-missing-field' = 'false',
    |  'json.ignore-parse-errors' = 'true',
    |  'json.timestamp-format.standard' = 'ISO-8601'
    |)
    |""".stripMargin)

streamTableEnv.executeSql(
  """
    |CREATE TABLE print_table (
    |  uid BIGINT,
    |  sex VARCHAR,
    |  age INT,
    |  created_time TIMESTAMP(3)
    |) WITH (
    |  'connector' = 'print'
    |)
    |""".stripMargin)

streamTableEnv.executeSql(
  """
    |INSERT INTO print_table
    |SELECT uid, sex, age, created_time
    |FROM kafka_table
    |""".stripMargin)

Log / stack trace:

2020-07-24 10:33:32,852 INFO  org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.AppInfoParser [] - Kafka startTimeMs: 1595558012852
2020-07-24 10:33:32,853 INFO  org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.KafkaConsumer [] - [Consumer clientId=consumer-user_flink-12, groupId=user_flink] Subscribed to partition(s): user-0
2020-07-24 10:33:32,853 INFO  org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.KafkaConsumer [] - [Consumer clientId=consumer-user_flink-12, groupId=user_flink] Seeking to offset 36627 for partition user-0
2020-07-24 10:33:32,860 INFO  org.apache.flink.kafka.shaded.org.apache.kafka.clients.Metadata [] - [Consumer clientId=consumer-user_flink-12, groupId=user_flink] Cluster ID: cAT_xBISQNWghT9kR5UuIw
2020-07-24 10:33:32,871 WARN  org.apache.flink.runtime.taskmanager.Task [] - Source: TableSourceScan(table=[[default_catalog, default_database, kafka_table]], fields=[uid, sex, age, created_time]) -> Calc(select=[uid, sex, age, created_time, () AS procTime]) -> WatermarkAssigner(rowtime=[created_time], watermark=[(created_time - 3000:INTERVAL SECOND)]) -> Calc(select=[uid, sex, age, created_time]) -> Sink: Sink(table=[default_catalog.default_database.print_table], fields=[uid, sex, age, created_time]) (2/4) (6b585139c083982beb6997e1ae2041ed) switched from RUNNING to FAILED.
java.lang.RuntimeException: RowTime field should not be null, please convert it to a non-null long value.
    at org.apache.flink.table.runtime.operators.wmassigners.WatermarkAssignerOperator.processElement(WatermarkAssignerOperator.java:115) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
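For what it's worth, the trailing 'Z' seems to be the sticking point: the value is an instant/offset-style timestamp, while a TIMESTAMP(3) column (without time zone) is parsed as a plain local date-time, so the ISO-8601 setting alone may not help. A small standalone illustration of the difference, using only java.time (a sketch, independent of Flink):

import java.time.{Instant, LocalDateTime}
import java.time.format.DateTimeParseException

object TimestampFormatDemo {
  def main(args: Array[String]): Unit = {
    val value = "2020-07-23T19:53:15.509Z"
    // Parses as an instant (ISO-8601 / RFC-3339 with a UTC offset).
    println(Instant.parse(value))
    // Does not parse as a zone-less local date-time, which is what a
    // TIMESTAMP column without time zone corresponds to.
    try LocalDateTime.parse(value)
    catch { case e: DateTimeParseException => println(s"not a local date-time: ${e.getMessage}") }
    // Dropping the 'Z' makes it a valid local date-time.
    println(LocalDateTime.parse("2020-07-23T19:53:15.509"))
  }
}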
Hi
"2020-07-23T19:53:15.509Z” 是 RFC-3339 格式,这个格式是带zone的时间格式,对应的数据类型是 timestamp with local zone,这个应该在1.12里支持了[1] 1.10版本虽然是支持 RFC-3339 格式,但默认解析时区是有问题的,所以在1.11和1.12逐步中纠正了。 在1.11版本中,如果json数据是RFC-3339格式,你可以把这个字段当成string读出来,在计算列中用个UDF自己解析到需要的timestamp。 Best Leonard Xu [1] https://issues.apache.org/jira/browse/FLINK-18296 <https://issues.apache.org/jira/browse/FLINK-18296> > 在 2020年7月24日,10:39,Zhou Zach <[hidden email]> 写道: > > Hi, > > > 按照提示修改了,还是报错的: > > > Query: > > > val streamExecutionEnv = StreamExecutionEnvironment.getExecutionEnvironment > streamExecutionEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) > streamExecutionEnv.setStateBackend(new RocksDBStateBackend("hdfs://nameservice1/flink/checkpoints")) > > val blinkEnvSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build() > val streamTableEnv = StreamTableEnvironment.create(streamExecutionEnv, blinkEnvSettings) > streamTableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE,CheckpointingMode.EXACTLY_ONCE) > streamTableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL,Duration.ofSeconds(20)) > streamTableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT,Duration.ofSeconds(900)) > > > streamTableEnv.executeSql( > """ > | > |CREATE TABLE kafka_table ( > | uid BIGINT, > | sex VARCHAR, > | age INT, > | created_time TIMESTAMP(3), > | procTime AS PROCTIME(), > | WATERMARK FOR created_time as created_time - INTERVAL '3' SECOND > |) WITH ( > | 'connector' = 'kafka', > | 'topic' = 'user', > | 'properties.bootstrap.servers' = 'cdh1:9092,cdh2:9092,cdh3:9092', > | 'properties.group.id' = 'user_flink', > | 'scan.startup.mode' = 'latest-offset', > | 'format' = 'json', > | 'json.fail-on-missing-field' = 'false', > | 'json.ignore-parse-errors' = 'true', > | 'json.timestamp-format.standard' = 'ISO-8601' > |) > |""".stripMargin) > > streamTableEnv.executeSql( > """ > | > |CREATE TABLE print_table > |( > | uid BIGINT, > | sex VARCHAR, > | age INT, > | created_time TIMESTAMP(3) > |) > |WITH ('connector' = 'print') > | > | > |""".stripMargin) > > streamTableEnv.executeSql( > """ > |insert into print_table > |SELECT > | uid,sex,age,created_time > |FROM kafka_table > | > |""".stripMargin) > > > 堆栈: > > > 2020-07-2410:33:32,852INFO org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.AppInfoParser [] - Kafka startTimeMs: 1595558012852 > 2020-07-2410:33:32,853INFO org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.KafkaConsumer [] - [Consumer clientId=consumer-user_flink-12, groupId=user_flink] Subscribed to partition(s): user-0 > 2020-07-2410:33:32,853INFO org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.KafkaConsumer [] - [Consumer clientId=consumer-user_flink-12, groupId=user_flink] Seeking to offset 36627for partition user-0 > 2020-07-2410:33:32,860INFO org.apache.flink.kafka.shaded.org.apache.kafka.clients.Metadata [] - [Consumer clientId=consumer-user_flink-12, groupId=user_flink] ClusterID: cAT_xBISQNWghT9kR5UuIw > 2020-07-2410:33:32,871WARN org.apache.flink.runtime.taskmanager.Task [] - Source: TableSourceScan(table=[[default_catalog, default_database, kafka_table]], fields=[uid, sex, age, created_time]) -> Calc(select=[uid, sex, age, created_time, () AS procTime]) -> WatermarkAssigner(rowtime=[created_time], watermark=[(created_time - 3000:INTERVALSECOND)]) -> Calc(select=[uid, sex, age, created_time]) -> Sink: Sink(table=[default_catalog.default_database.print_table], 
fields=[uid, sex, age, created_time]) (2/4) (6b585139c083982beb6997e1ae2041ed) switched fromRUNNING to FAILED. > java.lang.RuntimeException: RowTime field should not be null, please convert it to a non-nulllong value. > at org.apache.flink.table.runtime.operators.wmassigners.WatermarkAssignerOperator.processElement(WatermarkAssignerOperator.java:115) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0] > > > > > > > > > > > > > > > > > > 在 2020-07-23 21:23:28,"Leonard Xu" <[hidden email]> 写道: >> Hi >> >> 这是1.11里的一个 json format t的不兼容改动[1],目的是支持更多的 timestamp format 的解析,你可以把json-timestamp-format-standard <https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/json.html#json-timestamp-format-standard>设置成 “ISO-8601”,应该就不用改动了。 >> >> >> Best >> Leonard Xu >> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/json.html#json-timestamp-format-standard <https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/json.html#json-timestamp-format-standard> >> >>> 在 2020年7月23日,20:54,Zhou Zach <[hidden email]> 写道: >>> >>> 当前作业有个sink connector消费不到数据,我找到原因了,根本原因是kafka中时间字段的问题,只是with子句新旧参数对相同的字段数据表现了不同的行为,kafka中的消息格式: >>> >>> >>> {"uid":46,"sex":"female","age":11,"created_time":"2020-07-23T19:53:15.509Z"} >>> 奇怪的是,在kafka_table DDL中,created_time 定义为TIMESTAMP(3),with使用老参数是可以成功运行的,with使用新参数,在IDEA中运行没有任何异常,提交到yarn上,会报异常: >>> java.lang.RuntimeException: RowTime field should not be null, please convert it to a non-nulllong value. >>> at org.apache.flink.table.runtime.operators.wmassigners.WatermarkAssignerOperator.processElement(WatermarkAssignerOperator.java:115) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0] >>> >>> >>> 在本地用如下函数测试,结果确实是NULL >>> TO_TIMESTAMP('2020-07-23T19:53:15.509Z') >>> kafka producuer将created_time字段设置为整型,或者 “2020-07-23 20:36:55.565”,with使用新参数是没有问题的。调了一下午,调到怀疑人生,还好发现问题 >>> >>> >>> >>> >>> >>> >>> >>> >>> >>> >>> >>> 在 2020-07-23 20:10:43,"Leonard Xu" <[hidden email]> 写道: >>>> Hi >>>> >>>> 你说的下游消费不到数据,这个下游是指当前作业消费不到数据吗? >>>> >>>> 正常应该不会的,可以提供个可复现代码吗? 
>>>> >>>> 祝好 >>>> Leonard Xu >>>> >>>> >>>>> 在 2020年7月23日,18:13,Zhou Zach <[hidden email]> 写道: >>>>> >>>>> Hi all, >>>>> >>>>> 根据文档https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html#start-reading-position, >>>>> 使用新参数创建kafka_table,下游消费不到数据,使用老参数下游可以消费到数据,是不是新参数的方式有坑啊 >>>>> >>>>> >>>>> 老参数: >>>>> streamTableEnv.executeSql( >>>>> """ >>>>> | >>>>> |CREATE TABLE kafka_table ( >>>>> | uid BIGINT, >>>>> | sex VARCHAR, >>>>> | age INT, >>>>> | created_time TIMESTAMP(3), >>>>> | WATERMARK FOR created_time as created_time - INTERVAL '3' SECOND >>>>> |) WITH ( >>>>> | >>>>> | 'connector.type' = 'kafka', >>>>> | 'connector.version' = 'universal', >>>>> | 'connector.topic' = 'user', >>>>> | 'connector.startup-mode' = 'latest-offset', >>>>> | 'connector.properties.zookeeper.connect' = 'cdh1:2181,cdh2:2181,cdh3:2181', >>>>> | 'connector.properties.bootstrap.servers' = 'cdh1:9092,cdh2:9092,cdh3:9092', >>>>> | 'connector.properties.group.id' = 'user_flink', >>>>> | 'format.type' = 'json', >>>>> | 'format.derive-schema' = 'true' >>>>> | >>>>> |) >>>>> |""".stripMargin) >>>>> >>>>> 新参数: >>>>> >>>>> streamTableEnv.executeSql( >>>>> """ >>>>> | >>>>> |CREATE TABLE kafka_table ( >>>>> | >>>>> | uid BIGINT, >>>>> | sex VARCHAR, >>>>> | age INT, >>>>> | created_time TIMESTAMP(3), >>>>> | WATERMARK FOR created_time as created_time - INTERVAL '3' SECOND >>>>> |) WITH ( >>>>> | 'connector' = 'kafka', >>>>> | 'topic' = 'user', >>>>> | 'properties.bootstrap.servers' = 'cdh1:9092,cdh2:9092,cdh3:9092', >>>>> | 'properties.group.id' = 'user_flink', >>>>> | 'scan.startup.mode' = 'latest-offset', >>>>> | 'format' = 'json', >>>>> | 'json.fail-on-missing-field' = 'false', >>>>> | 'json.ignore-parse-errors' = 'true' >>>>> |) >>>>> |""".stripMargin) >> |
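A rough sketch of that workaround on Flink 1.11 (the function name parse_rfc3339, the class ParseRfc3339, and the computed column name created_ts are made up for illustration; the UTC interpretation may need adjusting to your setup):

import java.sql.Timestamp
import java.time.Instant
import org.apache.flink.table.functions.ScalarFunction

// Hypothetical UDF: parse an RFC-3339 string such as
// "2020-07-23T19:53:15.509Z" into a java.sql.Timestamp (as the UTC instant).
class ParseRfc3339 extends ScalarFunction {
  def eval(s: String): Timestamp =
    if (s == null) null else Timestamp.from(Instant.parse(s))
}

// Registration plus DDL: keep the physical column as STRING under the
// original JSON field name, and derive the rowtime in a computed column.
streamTableEnv.createTemporarySystemFunction("parse_rfc3339", classOf[ParseRfc3339])

streamTableEnv.executeSql(
  """
    |CREATE TABLE kafka_table (
    |  uid BIGINT,
    |  sex VARCHAR,
    |  age INT,
    |  created_time STRING,
    |  created_ts AS CAST(parse_rfc3339(created_time) AS TIMESTAMP(3)),
    |  WATERMARK FOR created_ts AS created_ts - INTERVAL '3' SECOND
    |) WITH (
    |  'connector' = 'kafka',
    |  'topic' = 'user',
    |  'properties.bootstrap.servers' = 'cdh1:9092,cdh2:9092,cdh3:9092',
    |  'properties.group.id' = 'user_flink',
    |  'scan.startup.mode' = 'latest-offset',
    |  'format' = 'json',
    |  'json.fail-on-missing-field' = 'false',
    |  'json.ignore-parse-errors' = 'true'
    |)
    |""".stripMargin)

The CAST to TIMESTAMP(3) is there so that the computed column can serve as the rowtime attribute for the watermark.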
Hi,
Thanks for the detailed explanation!

Zhou Zach