My Flink SQL is:
CREATE TABLE kafkaInputTable (
name String,
address String
) WITH (
'connector' = 'kafka',
'topic' = 'test',
'properties.bootstrap.servers' = 'node1:9092,node2:9092,node3:9092',
'format' = 'avro'
);
CREATE TABLE kafkaOutputTable (
name String,
address String
) WITH (
'connector' = 'kafka',
'topic' = 'test1',
'properties.bootstrap.servers' = 'node1:9092,node2:9092,node3:9092',
'format' = 'csv'
);
INSERT INTO kafkaOutputTable SELECT name, address FROM kafkaInputTable;
When I produce Avro-format data to the topic test, I get this error: Failed to deserialize Avro record......ArrayIndexOutOfBoundsException: 6
The Avro schema of the data I produce is:
{
"namespace": "cc.unmi.data",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "address", "type": ["string", "null"]}
]
}
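Could someone tell me what is wrong with the way I wrote this? If the Avro-format table is used as the sink instead, the problem does not occur.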