flink 1.11 cdc相关问题

classic Classic list List threaded Threaded
6 messages Options
Reply | Threaded
Open this post in threaded view
|

flink 1.11 cdc相关问题

amenhub@163.com
hi everyone,

小白通过debezium将pgsql cdc数据同步至kafka之后,使用我们flink的sql client提交测试任务,但当kafka端cdc json数据一开始发送,任务即报错,通过web ui log查看界面发现错误日志如下,还请大佬帮忙分析,谢谢!

====================================分割线======================================
DDL:

    CREATE TABLE pgsql_person_cdc(
id BIGINT,
name STRING,
age STRING,
sex STRING,
phone STRING
    ) WITH (
'connector' = 'kafka',
'topic' = 'postgres.public.person',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'pgsql_cdc',
'format' = 'debezium-json',
'debezium-json.schema-include' = 'true'
    )
    CREATE TABLE sql_out (
  id BIGINT,
name STRING,
age STRING,
sex STRING,
phone STRING
    ) WITH (
        'connector' = 'print'
    )
    INSERT INTO sql_out SELECT * FROM pgsql_person_cdc;

====================================分割线======================================
错误日志:

java.io.IOException: Corrupt Debezium JSON message '{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"},{"type":"string","optional":true,"field":"age"},{"type":"string","optional":true,"field":"sex"},{"type":"string","optional":true,"field":"phone"}],"optional":true,"name":"postgres.public.person.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"},{"type":"string","optional":true,"field":"age"},{"type":"string","optional":true,"field":"sex"},{"type":"string","optional":true,"field":"phone"}],"optional":true,"name":"postgres.public.person.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"postgres.public.person.Envelope"},"payload":{"before":null,"after":{"id":2,"name":"liushimin","ag
e":"24","sex":"man","phone":"155555555"},"source":{"version":"1.2.0.Final","connector":"postgresql","name":"postgres","ts_ms":1595409754151,"snapshot":"false","db":"postgres","schema":"public","table":"person","txId":569,"lsn":23632344,"xmin":null},"op":"u","ts_ms":1595409754270,"transaction":null}}'.
    at org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema.deserialize(DebeziumJsonDeserializationSchema.java:136) ~[flink-json-1.11.0.jar:1.11.0]
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56) ~[flink-connector-kafka-base_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181) ~[flink-connector-kafka_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141) ~[flink-connector-kafka_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755) ~[flink-connector-kafka-base_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
Caused by: java.lang.NullPointerException
    at org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema.deserialize(DebeziumJsonDeserializationSchema.java:120) ~[flink-json-1.11.0.jar:1.11.0]
    ... 7 more
2020-07-22 17:22:34,415 INFO org.apache.flink.runtime.taskmanager.Task [] - Freeing task resources for Source: TableSourceScan(table=[[default_catalog, default_database, pgsql_person_cdc]], fields=[id, name, age, sex, phone]) -> Sink: Sink(table=[default_catalog.default_database.sql_out], fields=[id, name, age, sex, phone]) (1/1) (b553cb66df6e47a27e7dae8466b684ab).
2020-07-22 17:22:34,418 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Un-registering task and sending final execution state FAILED to JobManager for task Source: TableSourceScan(table=[[default_catalog, default_database, pgsql_person_cdc]], fields=[id, name, age, sex, phone]) -> Sink: Sink(table=[default_catalog.default_database.sql_out], fields=[id, name, age, sex, phone]) (1/1) b553cb66df6e47a27e7dae8466b684ab.
2020-07-22 17:22:34,461 INFO org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Free slot TaskSlot(index:0, state:ACTIVE, resource profile: ResourceProfile{cpuCores=1.0000000000000000, taskHeapMemory=384.000mb (402653174 bytes), taskOffHeapMemory=0 bytes, managedMemory=512.000mb (536870920 bytes), networkMemory=128.000mb (134217730 bytes)}, allocationId: 495bb5a0cd877808674b29890b6b8bc0, jobId: 3feda3a191fcb8e0da891b9fda1ee532).
2020-07-22 17:22:34,462 INFO org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Remove job 3feda3a191fcb8e0da891b9fda1ee532 from job leader monitoring.
2020-07-22 17:22:34,462 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Close JobManager connection for job 3feda3a191fcb8e0da891b9fda1ee532.

====================================分割线======================================

best!
   


[hidden email]
Reply | Threaded
Open this post in threaded view
|

Re: flink 1.11 cdc相关问题

Leonard Xu
Hello,

代码在为before这条数据设置rowKind时抛了一个NPE,before正常应该是不为null的。
看起来是你的数据问题:一条 update 的 changelog,before 为 null,这是不合理的;没有 before 的数据,是无法处理 after 的数据的。
如果确认是脏数据,可以开启 'debezium-json.ignore-parse-errors' 选项跳过[1]。

祝好
Leonard
[1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/formats/debezium.html#debezium-json-ignore-parse-errors

{
    "payload": {
        "before": null,
        "after": {
            "id": 2,
            "name": "liushimin",
            "age": "24",
            "sex": "man",
            "phone": "155555555"
        },
        "source": {
            "version": "1.2.0.Final",
            "connector": "postgresql",
            "name": "postgres",
            "ts_ms": 1595409754151,
            "snapshot": "false",
            "db": "postgres",
            "schema": "public",
            "table": "person",
            "txId": 569,
            "lsn": 23632344,
            "xmin": null
        },
        "op": "u",
        "ts_ms": 1595409754270,
        "transaction": null
    }
}

> 在 2020年7月22日,17:34,[hidden email] 写道:
>
> {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"},{"type":"string","optional":true,"field":"age"},{"type":"string","optional":true,"field":"sex"},{"type":"string","optional":true,"field":"phone"}],"optional":true,"name":"postgres.public.person.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"},{"type":"string","optional":true,"field":"age"},{"type":"string","optional":true,"field":"sex"},{"type":"string","optional":true,"field":"phone"}],"optional":true,"name":"postgres.public.person.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"postgres.public.person.Envelope"},"payload":{"before":null,"after":{"id":2,"name":"liushimin","age":"24","sex":"man","phone":"155555555"},"source":
{"version":"1.2.0.Final","connector":"postgresql","name":"postgres","ts_ms":1595409754151,"snapshot":"false","db":"postgres","schema":"public","table":"person","txId":569,"lsn":23632344,"xmin":null},"op":"u","ts_ms":1595409754270,"transaction":null}}

Reply | Threaded
Open this post in threaded view
|

Re: flink 1.11 cdc相关问题

Jark
Administrator
Hi,

这是个已知问题,目前 debezium 同步不同数据库并没有保证一模一样的数据格式,比如同步 PG 的 UPDATE 消息时,before 和 after 字段就不是全的。
这个问题会在后面的版本中解决。

Best,
Jark

On Wed, 22 Jul 2020 at 21:07, Leonard Xu <[hidden email]> wrote:

> Hello,
>
> 代码在为before这条数据设置rowKind时抛了一个NPE,before正常应该是不为null的。
> 看起来是你的数据问题,一条 update 的changelog, before 为null,
> 这是不合理的,没有before的数据,是无法处理after的数据的。
> 如果确认是脏数据,可以开启ignore-parse-errors跳过[1]
>
> 祝好
> Leonard
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/formats/debezium.html#debezium-json-ignore-parse-errors
> <
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/formats/debezium.html#debezium-json-ignore-parse-errors
> >
>
> {
>     "payload": {
>         "before": null,
>         "after": {
>             "id": 2,
>             "name": "liushimin",
>             "age": "24",
>             "sex": "man",
>             "phone": "155555555"
>         },
>         "source": {
>             "version": "1.2.0.Final",
>             "connector": "postgresql",
>             "name": "postgres",
>             "ts_ms": 1595409754151,
>             "snapshot": "false",
>             "db": "postgres",
>             "schema": "public",
>             "table": "person",
>             "txId": 569,
>             "lsn": 23632344,
>             "xmin": null
>         },
>         "op": "u",
>         "ts_ms": 1595409754270,
>         "transaction": null
>     }
> }
>
> > 在 2020年7月22日,17:34,[hidden email] 写道:
> >
> >
> {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"},{"type":"string","optional":true,"field":"age"},{"type":"string","optional":true,"field":"sex"},{"type":"string","optional":true,"field":"phone"}],"optional":true,"name":"postgres.public.person.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"},{"type":"string","optional":true,"field":"age"},{"type":"string","optional":true,"field":"sex"},{"type":"string","optional":true,"field":"phone"}],"optional":true,"name":"postgres.public.person.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"postgres.public.person.Envelope"},"payload":{"before":null,"after":{"id":2,"name":"liushimin","age":"24","sex":"man","phone":"155555555"},"source":
{"version":"1.2.0.Final","connector":"postgresql","name":"postgres","ts_ms":1595409754151,"snapshot":"false","db":"postgres","schema":"public","table":"person","txId":569,"lsn":23632344,"xmin":null},"op":"u","ts_ms":1595409754270,"transaction":null}}
>
>
Reply | Threaded
Open this post in threaded view
|

Re: Re: flink 1.11 cdc相关问题

amenhub@163.com
感谢二位大佬@Leonard, @Jark的解答!



[hidden email]
 
发件人: Jark Wu
发送时间: 2020-07-22 23:56
收件人: user-zh
主题: Re: flink 1.11 cdc相关问题
Hi,
 
这是个已知问题,目前 debezium 同步不同数据库并没有保证一模一样地数据格式,比如同步 PG 的UPDATE消息时候,before 和
after 字段就不是全的。
这个问题会在后面地版本中解决。
 
Best,
Jark
 
On Wed, 22 Jul 2020 at 21:07, Leonard Xu <[hidden email]> wrote:
 

> Hello,
>
> 代码在为before这条数据设置rowKind时抛了一个NPE,before正常应该是不为null的。
> 看起来是你的数据问题,一条 update 的changelog, before 为null,
> 这是不合理的,没有before的数据,是无法处理after的数据的。
> 如果确认是脏数据,可以开启ignore-parse-errors跳过[1]
>
> 祝好
> Leonard
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/formats/debezium.html#debezium-json-ignore-parse-errors
> <
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/formats/debezium.html#debezium-json-ignore-parse-errors
> >
>
> {
>     "payload": {
>         "before": null,
>         "after": {
>             "id": 2,
>             "name": "liushimin",
>             "age": "24",
>             "sex": "man",
>             "phone": "155555555"
>         },
>         "source": {
>             "version": "1.2.0.Final",
>             "connector": "postgresql",
>             "name": "postgres",
>             "ts_ms": 1595409754151,
>             "snapshot": "false",
>             "db": "postgres",
>             "schema": "public",
>             "table": "person",
>             "txId": 569,
>             "lsn": 23632344,
>             "xmin": null
>         },
>         "op": "u",
>         "ts_ms": 1595409754270,
>         "transaction": null
>     }
> }
>
> > 在 2020年7月22日,17:34,[hidden email] 写道:
> >
> >
> {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"},{"type":"string","optional":true,"field":"age"},{"type":"string","optional":true,"field":"sex"},{"type":"string","optional":true,"field":"phone"}],"optional":true,"name":"postgres.public.person.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"},{"type":"string","optional":true,"field":"age"},{"type":"string","optional":true,"field":"sex"},{"type":"string","optional":true,"field":"phone"}],"optional":true,"name":"postgres.public.person.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"postgres.public.person.Envelope"},"payload":{"before":null,"after":{"id":2,"name":"liushimin","age":"24","sex":"man","phone":"155555555"},"source":
{"version":"1.2.0.Final","connector":"postgresql","name":"postgres","ts_ms":1595409754151,"snapshot":"false","db":"postgres","schema":"public","table":"person","txId":569,"lsn":23632344,"xmin":null},"op":"u","ts_ms":1595409754270,"transaction":null}}
>
>
Reply | Threaded
Open this post in threaded view
|

Re: flink 1.11 cdc相关问题

Leonard Xu
Hi amenhub

针对这个问题,我建了个 issue 来跟踪[1]。
另外,你可以在你的 PG 里面把表的 REPLICA IDENTITY 设置为 FULL,这样 debezium 同步的 UPDATE 数据就会有完整的信息。
DB 命令是:ALTER TABLE yourTable REPLICA IDENTITY FULL; 可以参考 debezium 官网文档[2]。

Best
Leonard Xu
[1] https://issues.apache.org/jira/browse/FLINK-18700
[2] https://debezium.io/documentation/reference/1.2/connectors/postgresql.html

> 在 2020年7月23日,09:14,[hidden email] 写道:
>
> 感谢二位大佬@Leonard, @Jark的解答!
>
>
>
> [hidden email]
>
> 发件人: Jark Wu
> 发送时间: 2020-07-22 23:56
> 收件人: user-zh
> 主题: Re: flink 1.11 cdc相关问题
> Hi,
>
> 这是个已知问题,目前 debezium 同步不同数据库并没有保证一模一样地数据格式,比如同步 PG 的UPDATE消息时候,before 和
> after 字段就不是全的。
> 这个问题会在后面地版本中解决。
>
> Best,
> Jark
>
> On Wed, 22 Jul 2020 at 21:07, Leonard Xu <[hidden email]> wrote:
>
>> Hello,
>>
>> 代码在为before这条数据设置rowKind时抛了一个NPE,before正常应该是不为null的。
>> 看起来是你的数据问题,一条 update 的changelog, before 为null,
>> 这是不合理的,没有before的数据,是无法处理after的数据的。
>> 如果确认是脏数据,可以开启ignore-parse-errors跳过[1]
>>
>> 祝好
>> Leonard
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/formats/debezium.html#debezium-json-ignore-parse-errors
>> <
>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/formats/debezium.html#debezium-json-ignore-parse-errors
>>>
>>
>> {
>>    "payload": {
>>        "before": null,
>>        "after": {
>>            "id": 2,
>>            "name": "liushimin",
>>            "age": "24",
>>            "sex": "man",
>>            "phone": "155555555"
>>        },
>>        "source": {
>>            "version": "1.2.0.Final",
>>            "connector": "postgresql",
>>            "name": "postgres",
>>            "ts_ms": 1595409754151,
>>            "snapshot": "false",
>>            "db": "postgres",
>>            "schema": "public",
>>            "table": "person",
>>            "txId": 569,
>>            "lsn": 23632344,
>>            "xmin": null
>>        },
>>        "op": "u",
>>        "ts_ms": 1595409754270,
>>        "transaction": null
>>    }
>> }
>>
>>> 在 2020年7月22日,17:34,[hidden email] 写道:
>>>
>>>
>> {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"},{"type":"string","optional":true,"field":"age"},{"type":"string","optional":true,"field":"sex"},{"type":"string","optional":true,"field":"phone"}],"optional":true,"name":"postgres.public.person.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"},{"type":"string","optional":true,"field":"age"},{"type":"string","optional":true,"field":"sex"},{"type":"string","optional":true,"field":"phone"}],"optional":true,"name":"postgres.public.person.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"postgres.public.person.Envelope"},"payload":{"before":null,"after":{"id":2,"name":"liushimin","age":"24","sex":"man","phone":"155555555"},"source"
:{"version":"1.2.0.Final","connector":"postgresql","name":"postgres","ts_ms":1595409754151,"snapshot":"false","db":"postgres","schema":"public","table":"person","txId":569,"lsn":23632344,"xmin":null},"op":"u","ts_ms":1595409754270,"transaction":null}}
>>
>>

Reply | Threaded
Open this post in threaded view
|

回复: Re: flink 1.11 cdc相关问题

amenhub@163.com
多谢!已关注~


Best


[hidden email]
 
发件人: Leonard Xu
发送时间: 2020-07-24 16:20
收件人: user-zh
主题: Re: flink 1.11 cdc相关问题
Hi amenhub
 
针对这个问题,我建了个issue来跟踪这个问题[1],
另外你可以在你的PG 里面把表的IDENTITY设置为FULL,这样 debezium 同步的UPDATE数据就会有完整的信息,
DB命令是:ALTER TABLE yourTable REPLICA IDENTITY FULL, 可以参考debezium官网文档[2]
 
Best
Leonard Xu
[1] https://issues.apache.org/jira/browse/FLINK-18700 <https://issues.apache.org/jira/browse/FLINK-18700>
[2] https://debezium.io/documentation/reference/1.2/connectors/postgresql.html <https://debezium.io/documentation/reference/1.2/connectors/postgresql.html>
 

> 在 2020年7月23日,09:14,[hidden email] 写道:
>
> 感谢二位大佬@Leonard, @Jark的解答!
>
>
>
> [hidden email]
>
> 发件人: Jark Wu
> 发送时间: 2020-07-22 23:56
> 收件人: user-zh
> 主题: Re: flink 1.11 cdc相关问题
> Hi,
>
> 这是个已知问题,目前 debezium 同步不同数据库并没有保证一模一样地数据格式,比如同步 PG 的UPDATE消息时候,before 和
> after 字段就不是全的。
> 这个问题会在后面地版本中解决。
>
> Best,
> Jark
>
> On Wed, 22 Jul 2020 at 21:07, Leonard Xu <[hidden email]> wrote:
>
>> Hello,
>>
>> 代码在为before这条数据设置rowKind时抛了一个NPE,before正常应该是不为null的。
>> 看起来是你的数据问题,一条 update 的changelog, before 为null,
>> 这是不合理的,没有before的数据,是无法处理after的数据的。
>> 如果确认是脏数据,可以开启ignore-parse-errors跳过[1]
>>
>> 祝好
>> Leonard
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/formats/debezium.html#debezium-json-ignore-parse-errors
>> <
>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/formats/debezium.html#debezium-json-ignore-parse-errors
>>>
>>
>> {
>>    "payload": {
>>        "before": null,
>>        "after": {
>>            "id": 2,
>>            "name": "liushimin",
>>            "age": "24",
>>            "sex": "man",
>>            "phone": "155555555"
>>        },
>>        "source": {
>>            "version": "1.2.0.Final",
>>            "connector": "postgresql",
>>            "name": "postgres",
>>            "ts_ms": 1595409754151,
>>            "snapshot": "false",
>>            "db": "postgres",
>>            "schema": "public",
>>            "table": "person",
>>            "txId": 569,
>>            "lsn": 23632344,
>>            "xmin": null
>>        },
>>        "op": "u",
>>        "ts_ms": 1595409754270,
>>        "transaction": null
>>    }
>> }
>>
>>> 在 2020年7月22日,17:34,[hidden email] 写道:
>>>
>>>
>> {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"},{"type":"string","optional":true,"field":"age"},{"type":"string","optional":true,"field":"sex"},{"type":"string","optional":true,"field":"phone"}],"optional":true,"name":"postgres.public.person.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"},{"type":"string","optional":true,"field":"age"},{"type":"string","optional":true,"field":"sex"},{"type":"string","optional":true,"field":"phone"}],"optional":true,"name":"postgres.public.person.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"postgres.public.person.Envelope"},"payload":{"before":null,"after":{"id":2,"name":"liushimin","age":"24","sex":"man","phone":"155555555"},"source"
:{"version":"1.2.0.Final","connector":"postgresql","name":"postgres","ts_ms":1595409754151,"snapshot":"false","db":"postgres","schema":"public","table":"person","txId":569,"lsn":23632344,"xmin":null},"op":"u","ts_ms":1595409754270,"transaction":null}}
>>
>>