flink sql 1.11 kafka source: no data consumed downstream with the new WITH-clause options


flink sql 1.11 kafka source: no data consumed downstream with the new WITH-clause options

Zhou Zach
Hi all,

According to the documentation at https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html#start-reading-position,
I created kafka_table with the new options and the downstream consumes no data, while with the old options the downstream does get data. Is there a pitfall in the new options?


Old options:
    streamTableEnv.executeSql(
      """
        |
        |CREATE TABLE kafka_table (
        |    uid BIGINT,
        |    sex VARCHAR,
        |    age INT,
        |    created_time TIMESTAMP(3),
        |    WATERMARK FOR created_time as created_time - INTERVAL '3' SECOND
        |) WITH (
        |
        |     'connector.type' = 'kafka',
        |    'connector.version' = 'universal',
        |    'connector.topic' = 'user',
        |    'connector.startup-mode' = 'latest-offset',
        |    'connector.properties.zookeeper.connect' = 'cdh1:2181,cdh2:2181,cdh3:2181',
        |    'connector.properties.bootstrap.servers' = 'cdh1:9092,cdh2:9092,cdh3:9092',
        |    'connector.properties.group.id' = 'user_flink',
        |    'format.type' = 'json',
        |    'format.derive-schema' = 'true'
        |
        |)
        |""".stripMargin)

New options:

    streamTableEnv.executeSql(
      """
        |
        |CREATE TABLE kafka_table (
        |
        |    uid BIGINT,
        |    sex VARCHAR,
        |    age INT,
        |    created_time TIMESTAMP(3),
        |    WATERMARK FOR created_time as created_time - INTERVAL '3' SECOND
        |) WITH (
        |    'connector' = 'kafka',
        |     'topic' = 'user',
        |    'properties.bootstrap.servers' = 'cdh1:9092,cdh2:9092,cdh3:9092',
        |    'properties.group.id' = 'user_flink',
        |    'scan.startup.mode' = 'latest-offset',
        |    'format' = 'json',
        |    'json.fail-on-missing-field' = 'false',
        |    'json.ignore-parse-errors' = 'true'
        |)
        |""".stripMargin)

Re: flink sql 1.11 kafka source: no data consumed downstream with the new WITH-clause options

Leonard Xu
Hi

By "the downstream cannot consume data", do you mean the current job itself is not consuming any data?

Normally that should not happen. Could you share code that reproduces it?

Best,
Leonard Xu



Re: Re: flink sql 1.11 kafka source: no data consumed downstream with the new WITH-clause options

Zhou Zach
One sink connector in the current job was not receiving data. I found the cause: the root issue is the time field in the Kafka messages; the old and new WITH-clause options just behave differently on the same field data. The Kafka message format is:


{"uid":46,"sex":"female","age":11,"created_time":"2020-07-23T19:53:15.509Z"}
Strangely, with created_time defined as TIMESTAMP(3) in the kafka_table DDL, the job runs fine with the old options. With the new options it runs without any exception in IDEA, but when submitted to YARN it throws:
java.lang.RuntimeException: RowTime field should not be null, please convert it to a non-null long value.
    at org.apache.flink.table.runtime.operators.wmassigners.WatermarkAssignerOperator.processElement(WatermarkAssignerOperator.java:115) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]


Testing locally with the following function, the result is indeed NULL:
TO_TIMESTAMP('2020-07-23T19:53:15.509Z')
If the Kafka producer sets created_time to an integer, or to "2020-07-23 20:36:55.565", the new options work fine. I spent the whole afternoon debugging this, to the point of questioning my life choices, but at least the problem is found.
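For reference, a self-contained version of that local check might look like the sketch below. It is only an illustration: the object name, column aliases, and the single-row VALUES dummy table are assumptions, and it assumes a plain Flink 1.11 blink-planner setup.

    import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

    // Minimal sketch: reproduces the NULL described above in a standalone program.
    // TO_TIMESTAMP expects a SQL-style 'yyyy-MM-dd HH:mm:ss[.SSS]' string by default,
    // so the RFC-3339 value with the 'T' separator and trailing 'Z' fails to parse.
    object ToTimestampCheck {
      def main(args: Array[String]): Unit = {
        val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
        val tableEnv = TableEnvironment.create(settings)

        tableEnv.executeSql(
          """
            |SELECT
            |    TO_TIMESTAMP('2020-07-23T19:53:15.509Z') AS rfc_3339_value,  -- expected: NULL
            |    TO_TIMESTAMP('2020-07-23 20:36:55.565')  AS sql_style_value  -- expected: 2020-07-23 20:36:55.565
            |FROM (VALUES (1)) AS t(x)
            |""".stripMargin).print()
      }
    }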

Re: flink sql 1.11 kafka source: no data consumed downstream with the new WITH-clause options

Leonard Xu
Hi

This is an incompatible change in the 1.11 JSON format [1], made to support parsing more timestamp formats. If you set 'json.timestamp-format.standard' [1] to 'ISO-8601', you should not need to change anything else.


Best
Leonard Xu
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/json.html#json-timestamp-format-standard


Re: Re: flink sql 1.11 kafka source: no data consumed downstream with the new WITH-clause options

Zhou Zach
Hi,


I made the change as suggested, but it still fails:


Query:


    val streamExecutionEnv = StreamExecutionEnvironment.getExecutionEnvironment
    streamExecutionEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    streamExecutionEnv.setStateBackend(new RocksDBStateBackend("hdfs://nameservice1/flink/checkpoints"))

    val blinkEnvSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val streamTableEnv = StreamTableEnvironment.create(streamExecutionEnv, blinkEnvSettings)
    streamTableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE)
    streamTableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(20))
    streamTableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT, Duration.ofSeconds(900))


    streamTableEnv.executeSql(
      """
        |
        |CREATE TABLE kafka_table (
        |    uid BIGINT,
        |    sex VARCHAR,
        |    age INT,
        |    created_time TIMESTAMP(3),
        |    procTime AS PROCTIME(),
        |    WATERMARK FOR created_time as created_time - INTERVAL '3' SECOND
        |) WITH (
        |    'connector' = 'kafka',
        |    'topic' = 'user',
        |    'properties.bootstrap.servers' = 'cdh1:9092,cdh2:9092,cdh3:9092',
        |    'properties.group.id' = 'user_flink',
        |    'scan.startup.mode' = 'latest-offset',
        |    'format' = 'json',
        |    'json.fail-on-missing-field' = 'false',
        |    'json.ignore-parse-errors' = 'true',
        |    'json.timestamp-format.standard' = 'ISO-8601'
        |)
        |""".stripMargin)

    streamTableEnv.executeSql(
      """
        |
        |CREATE TABLE print_table
        |(
        |    uid BIGINT,
        |    sex VARCHAR,
        |    age INT,
        |    created_time TIMESTAMP(3)
        |)
        |WITH ('connector' = 'print')
        |
        |
        |""".stripMargin)

    streamTableEnv.executeSql(
      """
        |insert into print_table
        |SELECT
        |   uid,sex,age,created_time
        |FROM  kafka_table
        |
        |""".stripMargin)


Stack trace:


2020-07-24 10:33:32,852 INFO  org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.AppInfoParser [] - Kafka startTimeMs: 1595558012852
2020-07-24 10:33:32,853 INFO  org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.KafkaConsumer [] - [Consumer clientId=consumer-user_flink-12, groupId=user_flink] Subscribed to partition(s): user-0
2020-07-24 10:33:32,853 INFO  org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.KafkaConsumer [] - [Consumer clientId=consumer-user_flink-12, groupId=user_flink] Seeking to offset 36627 for partition user-0
2020-07-24 10:33:32,860 INFO  org.apache.flink.kafka.shaded.org.apache.kafka.clients.Metadata [] - [Consumer clientId=consumer-user_flink-12, groupId=user_flink] Cluster ID: cAT_xBISQNWghT9kR5UuIw
2020-07-24 10:33:32,871 WARN  org.apache.flink.runtime.taskmanager.Task                    [] - Source: TableSourceScan(table=[[default_catalog, default_database, kafka_table]], fields=[uid, sex, age, created_time]) -> Calc(select=[uid, sex, age, created_time, () AS procTime]) -> WatermarkAssigner(rowtime=[created_time], watermark=[(created_time - 3000:INTERVAL SECOND)]) -> Calc(select=[uid, sex, age, created_time]) -> Sink: Sink(table=[default_catalog.default_database.print_table], fields=[uid, sex, age, created_time]) (2/4) (6b585139c083982beb6997e1ae2041ed) switched from RUNNING to FAILED.
java.lang.RuntimeException: RowTime field should not be null, please convert it to a non-null long value.
    at org.apache.flink.table.runtime.operators.wmassigners.WatermarkAssignerOperator.processElement(WatermarkAssignerOperator.java:115) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]


Re: flink sql 1.11 kafka source: no data consumed downstream with the new WITH-clause options

Leonard Xu
Hi

"2020-07-23T19:53:15.509Z” 是 RFC-3339 格式,这个格式是带zone的时间格式,对应的数据类型是 timestamp with local zone,这个应该在1.12里支持了[1]
1.10版本虽然是支持 RFC-3339 格式,但默认解析时区是有问题的,所以在1.11和1.12逐步中纠正了。

在1.11版本中,如果json数据是RFC-3339格式,你可以把这个字段当成string读出来,在计算列中用个UDF自己解析到需要的timestamp。

Best
Leonard Xu
[1] https://issues.apache.org/jira/browse/FLINK-18296
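
As a rough illustration of that workaround, a minimal sketch could look like the following. Everything here is an assumption for illustration rather than something from this thread: the function name ParseRfc3339 / parse_rfc3339, the computed column name ts, the choice of UTC as the target zone, and the trimmed-down WITH options.

    import java.time.{Instant, LocalDateTime, ZoneId}

    import org.apache.flink.table.annotation.DataTypeHint
    import org.apache.flink.table.functions.ScalarFunction

    // Scalar UDF: parse the RFC-3339 string into a TIMESTAMP(3) value.
    // The conversion zone (UTC here) is a deliberate assumption; use whatever
    // zone the event times should be interpreted in.
    class ParseRfc3339 extends ScalarFunction {
      @DataTypeHint("TIMESTAMP(3)")
      def eval(s: String): LocalDateTime =
        if (s == null) null
        else LocalDateTime.ofInstant(Instant.parse(s), ZoneId.of("UTC")) // Instant.parse accepts the trailing 'Z'
    }

    // Register the function, read created_time as STRING, and derive the rowtime in a computed column.
    streamTableEnv.createTemporarySystemFunction("parse_rfc3339", classOf[ParseRfc3339])

    streamTableEnv.executeSql(
      """
        |CREATE TABLE kafka_table (
        |    uid BIGINT,
        |    sex VARCHAR,
        |    age INT,
        |    created_time STRING,
        |    ts AS parse_rfc3339(created_time),
        |    WATERMARK FOR ts AS ts - INTERVAL '3' SECOND
        |) WITH (
        |    'connector' = 'kafka',
        |    'topic' = 'user',
        |    'properties.bootstrap.servers' = 'cdh1:9092,cdh2:9092,cdh3:9092',
        |    'properties.group.id' = 'user_flink',
        |    'scan.startup.mode' = 'latest-offset',
        |    'format' = 'json',
        |    'json.ignore-parse-errors' = 'true'
        |)
        |""".stripMargin)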



Re: flink sql 1.11 kafka source: no data consumed downstream with the new WITH-clause options

Zhou Zach
Hi,
Thanks for the detailed explanation!



