Basic scenario: data is read from a Kafka source and written to sinktable, with a LEFT JOIN against the DimTable dimension table along the way.
Flink version: 1.12.2

Scenario 1: when sinktable uses 'connector' = 'print' and no primary key is declared, the join runs and output is produced normally.
Scenario 2: when sinktable is pointed at MySQL instead (via the JDBC connector, as in the DDL below), Flink requires a primary key to be declared.
Scenario 3: after adding PRIMARY KEY (uid, platform, root_game_id) NOT ENFORCED to sinktable, the job fails. The key part of the error is:

java.sql.BatchUpdateException: [30000, 2021060816420117201616500303151172567] cannot update pk column UID to expr

Note: the "MySQL" used here is actually Alibaba Cloud ADB (AnalyticDB for MySQL). The table DDL is:

Create Table `v2_dwd_root_game_uid_reg_log` (
    `uid` bigint NOT NULL DEFAULT '0' COMMENT 'registration uid',
    `user_name` varchar NOT NULL DEFAULT '',
    // other columns omitted here
    primary key (`uid`, `platform`, `root_game_id`)
) DISTRIBUTE BY HASH(`uid`) INDEX_ALL='Y' STORAGE_POLICY='HOT' COMMENT='registration log by root-game account';

The SQL for scenario 3 is as follows:

// Kafka source
CREATE TABLE KafkaTable (
    message STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'xxxxxxxxxxx',
    'properties.bootstrap.servers' = 'xxxxxxxxxxx',
    'properties.group.id' = 'xxxxxxxxxxxxx',
    'scan.startup.mode' = 'group-offsets',
    'format' = 'json'
);

// Dimension table
CREATE TABLE DimTable (
    game_id BIGINT,
    root_game_id BIGINT,
    main_game_id BIGINT,
    platform VARCHAR
) WITH (
    'connector' = 'jdbc',
    'url' = 'xxxxxxxxxxxx',
    'table-name' = 'v2_dim_game_id',
    'driver' = 'com.mysql.cj.jdbc.Driver',
    'username' = 'xxxxxxxxx',
    'password' = 'xxxxxxxx',
    'lookup.cache.max-rows' = '5000',
    'lookup.cache.ttl' = '60s',
    'lookup.max-retries' = '3'
);

// MySQL (ADB) sink
CREATE TABLE sinktable (
    uid BIGINT,
    root_game_id BIGINT,
    game_id BIGINT,
    platform VARCHAR,
    // other columns omitted
    PRIMARY KEY (uid, platform, root_game_id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'xxxxxxxxxxxxxx',
    'table-name' = 'v2_dwd_root_game_uid_reg_log',
    'driver' = 'com.mysql.cj.jdbc.Driver',
    'username' = 'xxxxxxxxxxx',
    'password' = 'xxxxxxxxxxxx',
    'sink.buffer-flush.interval' = '5s',
    'sink.buffer-flush.max-rows' = '10'
);

// Insert (joining the dimension table)
INSERT INTO sinktable
SELECT
    IF(IsInvalidValue(k.uid), 0, CAST(k.uid AS BIGINT)) AS uid,
    IF((k.game_id IS NULL), 0, k.game_id) AS game_id,
    d.platform AS platform,
    d.root_game_id AS root_game_id,
    // other columns omitted
FROM KafkaTable,
    LATERAL TABLE(RequestBodyColumnToRow(message, 'uid,game_id(BIGINT),platform')) AS k
LEFT JOIN DimTable AS d
    ON k.game_id = d.game_id AND k.platform = d.platform;
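For comparison, a minimal sketch of the scenario-1 sink that worked without a primary key (the table name sinktable_print is made up here, and the column list is assumed to mirror the JDBC sink above; only the connector differs). The print connector simply logs whatever changelog it receives, so no key is needed:

// Scenario-1 sink (sketch): print connector, no primary key required
CREATE TABLE sinktable_print (
    uid BIGINT,
    root_game_id BIGINT,
    game_id BIGINT,
    platform VARCHAR
) WITH (
    'connector' = 'print'
);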
The full exception stack trace is as follows:
java.sql.BatchUpdateException: [30000, 2021060816420017201616500303151172306] cannot update pk column UID to expr
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at com.mysql.cj.util.Util.handleNewInstance(Util.java:192)
    at com.mysql.cj.util.Util.getInstance(Util.java:167)
    at com.mysql.cj.util.Util.getInstance(Util.java:174)
    at com.mysql.cj.jdbc.exceptions.SQLError.createBatchUpdateException(SQLError.java:224)
    at com.mysql.cj.jdbc.ClientPreparedStatement.executeBatchedInserts(ClientPreparedStatement.java:755)
    at com.mysql.cj.jdbc.ClientPreparedStatement.executeBatchInternal(ClientPreparedStatement.java:426)
    at com.mysql.cj.jdbc.StatementImpl.executeBatch(StatementImpl.java:796)
    at org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatementImpl.executeBatch(FieldNamedPreparedStatementImpl.java:65)
    at org.apache.flink.connector.jdbc.internal.executor.TableSimpleStatementExecutor.executeBatch(TableSimpleStatementExecutor.java:64)
    at org.apache.flink.connector.jdbc.internal.executor.TableBufferReducedStatementExecutor.executeBatch(TableBufferReducedStatementExecutor.java:101)
    at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.attemptFlush(JdbcBatchingOutputFormat.java:216)
    at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:184)
    at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.lambda$open$0(JdbcBatchingOutputFormat.java:128)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset$$$capture(FutureTask.java:308)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
In reply to this post by WeiXubin
I moved the table to a plain MySQL instance and the job runs normally there. After some digging, the failure appears to be caused by the DISTRIBUTE BY HASH(`uid`) clause in the ADB table DDL; that clause exists to spread the data and avoid skew. My reading is this: the join forces a primary key to be declared on the sink, declaring the primary key switches the write path to an upsert stream, and the upsert writes then conflict with the DISTRIBUTE BY definition in ADB. Not sure whether that understanding is correct.
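That reading matches the error text. As a sketch (not taken from the job's logs), once the sink table declares a primary key, Flink's JDBC connector with the MySQL dialect generates an INSERT ... ON DUPLICATE KEY UPDATE statement in which every column, including the primary-key columns, appears in the update list; the column list below is shortened to the fields shown earlier and is an assumption about the exact statement:

-- Upsert statement shape used by the MySQL dialect when the sink has a primary key
INSERT INTO `v2_dwd_root_game_uid_reg_log` (`uid`, `root_game_id`, `game_id`, `platform`)
VALUES (?, ?, ?, ?)
ON DUPLICATE KEY UPDATE
    `uid` = VALUES(`uid`),                     -- updating the pk / distribution-key column is what ADB rejects
    `root_game_id` = VALUES(`root_game_id`),
    `game_id` = VALUES(`game_id`),
    `platform` = VALUES(`platform`);

Plain MySQL accepts this (the "update" of the key columns is effectively a no-op on the matched row), whereas ADB refuses to update a primary-key / distribution-key column such as `uid`, which would explain "cannot update pk column UID to expr". Without a declared primary key the connector stays in append mode and issues plain INSERTs, which is consistent with scenario 1 working against the print sink.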