hi everyone,
小白通过debezium将pgsql cdc数据同步至kafka之后,使用我们flink的sql client提交测试任务,但当kafka端cdc json数据一开始发送,任务即报错,通过web ui log查看界面发现错误日志如下,还请大佬帮忙分析,谢谢! ====================================分割线====================================== DDL: CREATE TABLE pgsql_person_cdc( id BIGINT, name STRING, age STRING, sex STRING, phone STRING ) WITH ( 'connector' = 'kafka', 'topic' = 'postgres.public.person', 'properties.bootstrap.servers' = 'localhost:9092', 'properties.group.id' = 'pgsql_cdc', 'format' = 'debezium-json', 'debezium-json.schema-include' = 'true' ) CREATE TABLE sql_out ( id BIGINT, name STRING, age STRING, sex STRING, phone STRING ) WITH ( 'connector' = 'print' ) INSERT INTO sql_out SELECT * FROM pgsql_person_cdc; ====================================分割线====================================== 错误日志: java.io.IOException: Corrupt Debezium JSON message '{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"},{"type":"string","optional":true,"field":"age"},{"type":"string","optional":true,"field":"sex"},{"type":"string","optional":true,"field":"phone"}],"optional":true,"name":"postgres.public.person.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"},{"type":"string","optional":true,"field":"age"},{"type":"string","optional":true,"field":"sex"},{"type":"string","optional":true,"field":"phone"}],"optional":true,"name":"postgres.public.person.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"ty
pe":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"postgres.public.person.Envelope"},"payload":{"before":null,"after":{"id":2,"name":"liushimin","age":"24","sex":"man","phone":"155555555"},"source":{"version":"1.2.0.Final","connector":"postgresql","name":"postgres","ts_ms":1595409754151,"snapshot":"false","db":"postgres","schema":"public","table":"person","txId":569,"lsn":23632344,"xmin":null},"op":"u","ts_ms":1595409754270,"transaction":null}}'. 
at org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema.deserialize(DebeziumJsonDeserializationSchema.java:136) ~[flink-json-1.11.0.jar:1.11.0] at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56) ~[flink-connector-kafka-base_2.11-1.11.0.jar:1.11.0] at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181) ~[flink-connector-kafka_2.11-1.11.0.jar:1.11.0] at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141) ~[flink-connector-kafka_2.11-1.11.0.jar:1.11.0] at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755) ~[flink-connector-kafka-base_2.11-1.11.0.jar:1.11.0] at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) ~[flink-dist_2.11-1.11.0.jar:1.11.0] at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) ~[flink-dist_2.11-1.11.0.jar:1.11.0] at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201) ~[flink-dist_2.11-1.11.0.jar:1.11.0] Caused by: java.lang.NullPointerException at org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema.deserialize(DebeziumJsonDeserializationSchema.java:120) ~[flink-json-1.11.0.jar:1.11.0] ... 7 more 2020-07-22 17:22:34,415 INFO org.apache.flink.runtime.taskmanager.Task [] - Freeing task resources for Source: TableSourceScan(table=[[default_catalog, default_database, pgsql_person_cdc]], fields=[id, name, age, sex, phone]) -> Sink: Sink(table=[default_catalog.default_database.sql_out], fields=[id, name, age, sex, phone]) (1/1) (b553cb66df6e47a27e7dae8466b684ab). 
2020-07-22 17:22:34,418 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Un-registering task and sending final execution state FAILED to JobManager for task Source: TableSourceScan(table=[[default_catalog, default_database, pgsql_person_cdc]], fields=[id, name, age, sex, phone]) -> Sink: Sink(table=[default_catalog.default_database.sql_out], fields=[id, name, age, sex, phone]) (1/1) b553cb66df6e47a27e7dae8466b684ab. 2020-07-22 17:22:34,461 INFO org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Free slot TaskSlot(index:0, state:ACTIVE, resource profile: ResourceProfile{cpuCores=1.0000000000000000, taskHeapMemory=384.000mb (402653174 bytes), taskOffHeapMemory=0 bytes, managedMemory=512.000mb (536870920 bytes), networkMemory=128.000mb (134217730 bytes)}, allocationId: 495bb5a0cd877808674b29890b6b8bc0, jobId: 3feda3a191fcb8e0da891b9fda1ee532). 2020-07-22 17:22:34,462 INFO org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Remove job 3feda3a191fcb8e0da891b9fda1ee532 from job leader monitoring. 2020-07-22 17:22:34,462 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Close JobManager connection for job 3feda3a191fcb8e0da891b9fda1ee532. ====================================分割线====================================== best! [hidden email] |
Hello,
代码在为before这条数据设置rowKind时抛了一个NPE,before正常应该是不为null的。 看起来是你的数据问题,一条 update 的changelog, before 为null, 这是不合理的,没有before的数据,是无法处理after的数据的。 如果确认是脏数据,可以开启ignore-parse-errors跳过[1] 祝好 Leonard [1]https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/formats/debezium.html#debezium-json-ignore-parse-errors <https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/formats/debezium.html#debezium-json-ignore-parse-errors> { "payload": { "before": null, "after": { "id": 2, "name": "liushimin", "age": "24", "sex": "man", "phone": "155555555" }, "source": { "version": "1.2.0.Final", "connector": "postgresql", "name": "postgres", "ts_ms": 1595409754151, "snapshot": "false", "db": "postgres", "schema": "public", "table": "person", "txId": 569, "lsn": 23632344, "xmin": null }, "op": "u", "ts_ms": 1595409754270, "transaction": null } } > 在 2020年7月22日,17:34,[hidden email] 写道: > > {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"},{"type":"string","optional":true,"field":"age"},{"type":"string","optional":true,"field":"sex"},{"type":"string","optional":true,"field":"phone"}],"optional":true,"name":"postgres.public.person.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"},{"type":"string","optional":true,"field":"age"},{"type":"string","optional":true,"field":"sex"},{"type":"string","optional":true,"field":"phone"}],"optional":true,"name":"postgres.public.person.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"fals
e","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"postgres.public.person.Envelope"},"payload":{"before":null,"after":{"id":2,"name":"liushimin","age":"24","sex":"man","phone":"155555555"},"source":{"version":"1.2.0.Final","connector":"postgresql","name":"postgres","ts_ms":1595409754151,"snapshot":"false","db":"postgres","schema":"public","table":"person","txId":569,"lsn":23632344,"xmin":null},"op":"u","ts_ms":1595409754270,"transaction":null}} |
Administrator
|
Hi,
这是个已知问题,目前 debezium 同步不同数据库并没有保证一模一样的数据格式,比如同步 PG 的UPDATE消息时候,before 和 after 字段就不是全的。 这个问题会在后面的版本中解决。 Best, Jark On Wed, 22 Jul 2020 at 21:07, Leonard Xu <[hidden email]> wrote: > Hello, > > 代码在为before这条数据设置rowKind时抛了一个NPE,before正常应该是不为null的。 > 看起来是你的数据问题,一条 update 的changelog, before 为null, > 这是不合理的,没有before的数据,是无法处理after的数据的。 > 如果确认是脏数据,可以开启ignore-parse-errors跳过[1] > > 祝好 > Leonard > [1] > https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/formats/debezium.html#debezium-json-ignore-parse-errors > < > https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/formats/debezium.html#debezium-json-ignore-parse-errors > > > > { > "payload": { > "before": null, > "after": { > "id": 2, > "name": "liushimin", > "age": "24", > "sex": "man", > "phone": "155555555" > }, > "source": { > "version": "1.2.0.Final", > "connector": "postgresql", > "name": "postgres", > "ts_ms": 1595409754151, > "snapshot": "false", > "db": "postgres", > "schema": "public", > "table": "person", > "txId": 569, > "lsn": 23632344, > "xmin": null > }, > "op": "u", > "ts_ms": 1595409754270, > "transaction": null > } > } > > > 在 2020年7月22日,17:34,[hidden email] 写道: > > > > > 
{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"},{"type":"string","optional":true,"field":"age"},{"type":"string","optional":true,"field":"sex"},{"type":"string","optional":true,"field":"phone"}],"optional":true,"name":"postgres.public.person.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"},{"type":"string","optional":true,"field":"age"},{"type":"string","optional":true,"field":"sex"},{"type":"string","optional":true,"field":"phone"}],"optional":true,"name":"postgres.public.person.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"postgres.public.person.Envelope"},"payload":{"before":null,"after":{"id":2,"name":"liushimin","age":"24","sex":"man","phone":"155555555"},"source":{"
version":"1.2.0.Final","connector":"postgresql","name":"postgres","ts_ms":1595409754151,"snapshot":"false","db":"postgres","schema":"public","table":"person","txId":569,"lsn":23632344,"xmin":null},"op":"u","ts_ms":1595409754270,"transaction":null}} > > |
感谢二位大佬@Leonard, @Jark的解答!
[hidden email] 发件人: Jark Wu 发送时间: 2020-07-22 23:56 收件人: user-zh 主题: Re: flink 1.11 cdc相关问题 Hi, 这是个已知问题,目前 debezium 同步不同数据库并没有保证一模一样地数据格式,比如同步 PG 的UPDATE消息时候,before 和 after 字段就不是全的。 这个问题会在后面地版本中解决。 Best, Jark On Wed, 22 Jul 2020 at 21:07, Leonard Xu <[hidden email]> wrote: > Hello, > > 代码在为before这条数据设置rowKind时抛了一个NPE,before正常应该是不为null的。 > 看起来是你的数据问题,一条 update 的changelog, before 为null, > 这是不合理的,没有before的数据,是无法处理after的数据的。 > 如果确认是脏数据,可以开启ignore-parse-errors跳过[1] > > 祝好 > Leonard > [1] > https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/formats/debezium.html#debezium-json-ignore-parse-errors > < > https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/formats/debezium.html#debezium-json-ignore-parse-errors > > > > { > "payload": { > "before": null, > "after": { > "id": 2, > "name": "liushimin", > "age": "24", > "sex": "man", > "phone": "155555555" > }, > "source": { > "version": "1.2.0.Final", > "connector": "postgresql", > "name": "postgres", > "ts_ms": 1595409754151, > "snapshot": "false", > "db": "postgres", > "schema": "public", > "table": "person", > "txId": 569, > "lsn": 23632344, > "xmin": null > }, > "op": "u", > "ts_ms": 1595409754270, > "transaction": null > } > } > > > 在 2020年7月22日,17:34,[hidden email] 写道: > > > > > 
{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"},{"type":"string","optional":true,"field":"age"},{"type":"string","optional":true,"field":"sex"},{"type":"string","optional":true,"field":"phone"}],"optional":true,"name":"postgres.public.person.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"},{"type":"string","optional":true,"field":"age"},{"type":"string","optional":true,"field":"sex"},{"type":"string","optional":true,"field":"phone"}],"optional":true,"name":"postgres.public.person.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"postgres.public.person.Envelope"},"payload":{"before":null,"after":{"id":2,"name":"liushimin","age":"24","sex":"man","phone":"155555555"},"source":{"
version":"1.2.0.Final","connector":"postgresql","name":"postgres","ts_ms":1595409754151,"snapshot":"false","db":"postgres","schema":"public","table":"person","txId":569,"lsn":23632344,"xmin":null},"op":"u","ts_ms":1595409754270,"transaction":null}} > > |
Hi amenhub
针对这个问题,我建了个issue来跟踪这个问题[1], 另外你可以在你的PG 里面把表的IDENTITY设置为FULL,这样 debezium 同步的UPDATE数据就会有完整的信息, DB命令是:ALTER TABLE yourTable REPLICA IDENTITY FULL, 可以参考debezium官网文档[2] Best Leonard Xu [1] https://issues.apache.org/jira/browse/FLINK-18700 <https://issues.apache.org/jira/browse/FLINK-18700> [2] https://debezium.io/documentation/reference/1.2/connectors/postgresql.html <https://debezium.io/documentation/reference/1.2/connectors/postgresql.html> > 在 2020年7月23日,09:14,[hidden email] 写道: > > 感谢二位大佬@Leonard, @Jark的解答! > > > > [hidden email] > > 发件人: Jark Wu > 发送时间: 2020-07-22 23:56 > 收件人: user-zh > 主题: Re: flink 1.11 cdc相关问题 > Hi, > > 这是个已知问题,目前 debezium 同步不同数据库并没有保证一模一样地数据格式,比如同步 PG 的UPDATE消息时候,before 和 > after 字段就不是全的。 > 这个问题会在后面地版本中解决。 > > Best, > Jark > > On Wed, 22 Jul 2020 at 21:07, Leonard Xu <[hidden email]> wrote: > >> Hello, >> >> 代码在为before这条数据设置rowKind时抛了一个NPE,before正常应该是不为null的。 >> 看起来是你的数据问题,一条 update 的changelog, before 为null, >> 这是不合理的,没有before的数据,是无法处理after的数据的。 >> 如果确认是脏数据,可以开启ignore-parse-errors跳过[1] >> >> 祝好 >> Leonard >> [1] >> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/formats/debezium.html#debezium-json-ignore-parse-errors >> < >> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/formats/debezium.html#debezium-json-ignore-parse-errors >>> >> >> { >> "payload": { >> "before": null, >> "after": { >> "id": 2, >> "name": "liushimin", >> "age": "24", >> "sex": "man", >> "phone": "155555555" >> }, >> "source": { >> "version": "1.2.0.Final", >> "connector": "postgresql", >> "name": "postgres", >> "ts_ms": 1595409754151, >> "snapshot": "false", >> "db": "postgres", >> "schema": "public", >> "table": "person", >> "txId": 569, >> "lsn": 23632344, >> "xmin": null >> }, >> "op": "u", >> "ts_ms": 1595409754270, >> "transaction": null >> } >> } >> >>> 在 2020年7月22日,17:34,[hidden email] 写道: >>> >>> >> 
{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"},{"type":"string","optional":true,"field":"age"},{"type":"string","optional":true,"field":"sex"},{"type":"string","optional":true,"field":"phone"}],"optional":true,"name":"postgres.public.person.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"},{"type":"string","optional":true,"field":"age"},{"type":"string","optional":true,"field":"sex"},{"type":"string","optional":true,"field":"phone"}],"optional":true,"name":"postgres.public.person.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"postgres.public.person.Envelope"},"payload":{"before":null,"after":{"id":2,"name":"liushimin","age":"24","sex":"man","phone":"155555555"},"source":{"
version":"1.2.0.Final","connector":"postgresql","name":"postgres","ts_ms":1595409754151,"snapshot":"false","db":"postgres","schema":"public","table":"person","txId":569,"lsn":23632344,"xmin":null},"op":"u","ts_ms":1595409754270,"transaction":null}} >> >> |
多谢!已关注~
Best [hidden email] 发件人: Leonard Xu 发送时间: 2020-07-24 16:20 收件人: user-zh 主题: Re: flink 1.11 cdc相关问题 Hi amenhub 针对这个问题,我建了个issue来跟踪这个问题[1], 另外你可以在你的PG 里面把表的IDENTITY设置为FULL,这样 debezium 同步的UPDATE数据就会有完整的信息, DB命令是:ALTER TABLE yourTable REPLICA IDENTITY FULL, 可以参考debezium官网文档[2] Best Leonard Xu [1] https://issues.apache.org/jira/browse/FLINK-18700 <https://issues.apache.org/jira/browse/FLINK-18700> [2] https://debezium.io/documentation/reference/1.2/connectors/postgresql.html <https://debezium.io/documentation/reference/1.2/connectors/postgresql.html> > 在 2020年7月23日,09:14,[hidden email] 写道: > > 感谢二位大佬@Leonard, @Jark的解答! > > > > [hidden email] > > 发件人: Jark Wu > 发送时间: 2020-07-22 23:56 > 收件人: user-zh > 主题: Re: flink 1.11 cdc相关问题 > Hi, > > 这是个已知问题,目前 debezium 同步不同数据库并没有保证一模一样地数据格式,比如同步 PG 的UPDATE消息时候,before 和 > after 字段就不是全的。 > 这个问题会在后面地版本中解决。 > > Best, > Jark > > On Wed, 22 Jul 2020 at 21:07, Leonard Xu <[hidden email]> wrote: > >> Hello, >> >> 代码在为before这条数据设置rowKind时抛了一个NPE,before正常应该是不为null的。 >> 看起来是你的数据问题,一条 update 的changelog, before 为null, >> 这是不合理的,没有before的数据,是无法处理after的数据的。 >> 如果确认是脏数据,可以开启ignore-parse-errors跳过[1] >> >> 祝好 >> Leonard >> [1] >> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/formats/debezium.html#debezium-json-ignore-parse-errors >> < >> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/formats/debezium.html#debezium-json-ignore-parse-errors >>> >> >> { >> "payload": { >> "before": null, >> "after": { >> "id": 2, >> "name": "liushimin", >> "age": "24", >> "sex": "man", >> "phone": "155555555" >> }, >> "source": { >> "version": "1.2.0.Final", >> "connector": "postgresql", >> "name": "postgres", >> "ts_ms": 1595409754151, >> "snapshot": "false", >> "db": "postgres", >> "schema": "public", >> "table": "person", >> "txId": 569, >> "lsn": 23632344, >> "xmin": null >> }, >> "op": "u", >> "ts_ms": 1595409754270, >> "transaction": null >> } >> } >> >>> 在 2020年7月22日,17:34,[hidden email] 写道: >>> >>> >> 
{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"},{"type":"string","optional":true,"field":"age"},{"type":"string","optional":true,"field":"sex"},{"type":"string","optional":true,"field":"phone"}],"optional":true,"name":"postgres.public.person.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"},{"type":"string","optional":true,"field":"age"},{"type":"string","optional":true,"field":"sex"},{"type":"string","optional":true,"field":"phone"}],"optional":true,"name":"postgres.public.person.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"postgres.public.person.Envelope"},"payload":{"before":null,"after":{"id":2,"name":"liushimin","age":"24","sex":"man","phone":"155555555"},"source":{"
version":"1.2.0.Final","connector":"postgresql","name":"postgres","ts_ms":1595409754151,"snapshot":"false","db":"postgres","schema":"public","table":"person","txId":569,"lsn":23632344,"xmin":null},"op":"u","ts_ms":1595409754270,"transaction":null}} >> >> |
Free forum by Nabble | Edit this page |