I have a Flink SQL CDC job that consumes canal-json formatted MySQL binlog data from Kafka and writes it to downstream storage. The problem: columns are added to the upstream source table dynamically, while my SQL table definition is static, so the job fails whenever the upstream schema changes. In Flink SQL, is it possible to declare only the columns that are actually used for a given data source? If so, how should it be done? Right now the job throws an error like the one below, meaning the underlying table contains 43 columns but my DDL declares only 4 of them. Which formats support declaring a partial set of columns?
21/06/02 18:54:22 [Source: TableSourceScan(table=[[default_catalog, default_database, charge_log]], fields=[id, charge_id, trace_id, app_id]) -> Sink: Sink(table=[default_catalog.default_database.print_table], fields=[id, charge_id, trace_id, app_id]) (3/12)#0] WARN taskmanager.Task: Source: TableSourceScan(table=[[default_catalog, default_database, charge_log]], fields=[id, charge_id, trace_id, app_id]) -> Sink: Sink(table=[default_catalog.default_database.print_table], fields=[id, charge_id, trace_id, app_id]) (3/12)#0 (8810adcd7960cb22a6954c985ba49d0d) switched from RUNNING to FAILED.
java.lang.IllegalArgumentException: Row arity: 43, but serializer arity: 4
at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:124)
at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:48)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:69)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
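
For reference, if the table is read with the plain kafka connector and 'format' = 'canal-json', declaring only a subset of the physical columns should normally work: JSON-based formats (json, canal-json, debezium-json) resolve fields by name during deserialization and silently skip JSON fields that are not declared in the DDL. Below is a minimal sketch of such a partial DDL; the topic name, broker address, and group id are placeholders, and only 4 of the 43 upstream columns are declared:

CREATE TABLE charge_log (
  id BIGINT,
  charge_id STRING,
  trace_id STRING,
  app_id STRING
) WITH (
  'connector' = 'kafka',
  -- topic, broker address, and group id below are placeholders
  'topic' = 'charge_log_binlog',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'flink-charge-log',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'canal-json',
  -- skip malformed records instead of failing the job
  'canal-json.ignore-parse-errors' = 'true'
);

The "Row arity: 43, but serializer arity: 4" message indicates the source operator is emitting the full 43-column physical row while the downstream serializer expects only the 4 declared columns, so the mismatch appears to come from the source/connector side rather than from the format's by-name field lookup.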