hi
我使用的flink 1.11.0版本 代码如下 StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment(); TableEnvironment tableEnvironment = StreamTableEnvironment.create(streamEnv); tableEnvironment.executeSql(" " + " CREATE TABLE mySource ( " + " a bigint, " + " b bigint " + " ) WITH ( " + " 'connector.type' = 'kafka', " + " 'connector.version' = 'universal', " + " 'connector.topic' = 'mytesttopic', " + " 'connector.properties.zookeeper.connect' = '172.17.0.2:2181', " + " 'connector.properties.bootstrap.servers' = '172.17.0.2:9092', " + " 'connector.properties.group.id' = 'flink-test-cxy', " + " 'connector.startup-mode' = 'latest-offset', " + " 'format.type' = 'json' " + " ) "); tableEnvironment.executeSql("CREATE TABLE mysqlsink ( " + " id bigint, " + " game_id varchar, " + " PRIMARY KEY (id) NOT ENFORCED " + " ) " + " with ( " + " 'connector.type' = 'jdbc', " + " 'connector.url' = 'jdbc:mysql://47.99.181.86:3306/flinksql?useSSL=false' , " + " 'connector.username' = 'root' , " + " 'connector.password' = 'root', " + " 'connector.table' = 'mysqlsink' , " + " 'connector.driver' = 'com.mysql.cj.jdbc.Driver' , " + " 'connector.write.flush.interval' = '2s', " + " 'connector.write.flush.max-rows' = '300' " + " )"); tableEnvironment.executeSql("insert into mysqlsink (`id`,`game_id`) values (select a,cast(b as varchar) b from mySource)"); 问题一 : 上面的insert语句会出现如下错误 Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Cannot apply '$SCALAR_QUERY' to arguments of type '$SCALAR_QUERY(<RECORDTYPE(BIGINT A, VARCHAR(2147483647) B)>)'. Supported form(s): '$SCALAR_QUERY(<RECORDTYPE(SINGLE FIELD)>)' 问题二 : 如果insert改成 tableEnvironment.executeSql("insert into mysqlsink select a,cast(b as varchar) b from mySource"); 是可以运行的,但是出现唯一主键重复时会报错 Caused by: java.sql.SQLIntegrityConstraintViolationException: Duplicate entry '1' for key 'PRIMARY' |
改成update模式,然后也可以修改唯一主键为自然键
> 在 2020年7月31日,下午4:13,chenxuying <[hidden email]> 写道: > > hi > 我使用的flink 1.11.0版本 > 代码如下 > StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment(); > TableEnvironment tableEnvironment = StreamTableEnvironment.create(streamEnv); > tableEnvironment.executeSql(" " + > " CREATE TABLE mySource ( " + > " a bigint, " + > " b bigint " + > " ) WITH ( " + > " 'connector.type' = 'kafka', " + > " 'connector.version' = 'universal', " + > " 'connector.topic' = 'mytesttopic', " + > " 'connector.properties.zookeeper.connect' = '172.17.0.2:2181', " + > " 'connector.properties.bootstrap.servers' = '172.17.0.2:9092', " + > " 'connector.properties.group.id' = 'flink-test-cxy', " + > " 'connector.startup-mode' = 'latest-offset', " + > " 'format.type' = 'json' " + > " ) "); > tableEnvironment.executeSql("CREATE TABLE mysqlsink ( " + > " id bigint, " + > " game_id varchar, " + > " PRIMARY KEY (id) NOT ENFORCED " + > " ) " + > " with ( " + > " 'connector.type' = 'jdbc', " + > " 'connector.url' = 'jdbc:mysql://47.99.181.86:3306/flinksql?useSSL=false' , " + > " 'connector.username' = 'root' , " + > " 'connector.password' = 'root', " + > " 'connector.table' = 'mysqlsink' , " + > " 'connector.driver' = 'com.mysql.cj.jdbc.Driver' , " + > " 'connector.write.flush.interval' = '2s', " + > " 'connector.write.flush.max-rows' = '300' " + > " )"); > tableEnvironment.executeSql("insert into mysqlsink (`id`,`game_id`) values (select a,cast(b as varchar) b from mySource)"); > > > 问题一 : 上面的insert语句会出现如下错误 > Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Cannot apply '$SCALAR_QUERY' to arguments of type '$SCALAR_QUERY(<RECORDTYPE(BIGINT A, VARCHAR(2147483647) B)>)'. Supported form(s): '$SCALAR_QUERY(<RECORDTYPE(SINGLE FIELD)>)' > > > 问题二 : 如果insert改成 tableEnvironment.executeSql("insert into mysqlsink select a,cast(b as varchar) b from mySource"); 是可以运行的,但是出现唯一主键重复时会报错 > Caused by: java.sql.SQLIntegrityConstraintViolationException: Duplicate entry '1' for key 'PRIMARY' > > > |
In reply to this post by cxydevelop@163.com
Hi, chenxuying
看你还是用的还是 " 'connector.type' = 'jdbc', …. " ,这是老的option,使用老的option参数还是需要根据query推导主键, 需要使用新的属性[1]:" 'connector' = 'jdbc’,…." 才能配合 主键 决定 upsert 模式. Best Leonard [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/connectors/jdbc.html#connector-options <https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/connectors/jdbc.html#connector-options> > 在 2020年7月31日,16:12,chenxuying <[hidden email]> 写道: > > hi > 我使用的flink 1.11.0版本 > 代码如下 > StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment(); > TableEnvironment tableEnvironment = StreamTableEnvironment.create(streamEnv); > tableEnvironment.executeSql(" " + > " CREATE TABLE mySource ( " + > " a bigint, " + > " b bigint " + > " ) WITH ( " + > " 'connector.type' = 'kafka', " + > " 'connector.version' = 'universal', " + > " 'connector.topic' = 'mytesttopic', " + > " 'connector.properties.zookeeper.connect' = '172.17.0.2:2181', " + > " 'connector.properties.bootstrap.servers' = '172.17.0.2:9092', " + > " 'connector.properties.group.id' = 'flink-test-cxy', " + > " 'connector.startup-mode' = 'latest-offset', " + > " 'format.type' = 'json' " + > " ) "); > tableEnvironment.executeSql("CREATE TABLE mysqlsink ( " + > " id bigint, " + > " game_id varchar, " + > " PRIMARY KEY (id) NOT ENFORCED " + > " ) " + > " with ( " + > " 'connector.type' = 'jdbc', " + > " 'connector.url' = 'jdbc:mysql://47.99.181.86:3306/flinksql?useSSL=false' , " + > " 'connector.username' = 'root' , " + > " 'connector.password' = 'root', " + > " 'connector.table' = 'mysqlsink' , " + > " 'connector.driver' = 'com.mysql.cj.jdbc.Driver' , " + > " 'connector.write.flush.interval' = '2s', " + > " 'connector.write.flush.max-rows' = '300' " + > " )"); > tableEnvironment.executeSql("insert into mysqlsink (`id`,`game_id`) values (select a,cast(b as varchar) b from mySource)"); > > > 问题一 : 上面的insert语句会出现如下错误 > Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Cannot apply '$SCALAR_QUERY' to arguments of type '$SCALAR_QUERY(<RECORDTYPE(BIGINT A, VARCHAR(2147483647) B)>)'. Supported form(s): '$SCALAR_QUERY(<RECORDTYPE(SINGLE FIELD)>)' > > > 问题二 : 如果insert改成 tableEnvironment.executeSql("insert into mysqlsink select a,cast(b as varchar) b from mySource"); 是可以运行的,但是出现唯一主键重复时会报错 > Caused by: java.sql.SQLIntegrityConstraintViolationException: Duplicate entry '1' for key 'PRIMARY' > > > |
谢谢回答
使用新属性可以 成功修改记录 , 但是不太明白 "使用老的option参数还是需要根据query推导主键" 这里话是什么意思,需要怎么做 在 2020-07-31 16:46:41,"Leonard Xu" <[hidden email]> 写道: >Hi, chenxuying > >看你还是用的还是 " 'connector.type' = 'jdbc', …. " ,这是老的option,使用老的option参数还是需要根据query推导主键, >需要使用新的属性[1]:" 'connector' = 'jdbc’,…." 才能配合 主键 决定 upsert 模式. > >Best >Leonard >[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/connectors/jdbc.html#connector-options <https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/connectors/jdbc.html#connector-options> > >> 在 2020年7月31日,16:12,chenxuying <[hidden email]> 写道: >> >> hi >> 我使用的flink 1.11.0版本 >> 代码如下 >> StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment(); >> TableEnvironment tableEnvironment = StreamTableEnvironment.create(streamEnv); >> tableEnvironment.executeSql(" " + >> " CREATE TABLE mySource ( " + >> " a bigint, " + >> " b bigint " + >> " ) WITH ( " + >> " 'connector.type' = 'kafka', " + >> " 'connector.version' = 'universal', " + >> " 'connector.topic' = 'mytesttopic', " + >> " 'connector.properties.zookeeper.connect' = '172.17.0.2:2181', " + >> " 'connector.properties.bootstrap.servers' = '172.17.0.2:9092', " + >> " 'connector.properties.group.id' = 'flink-test-cxy', " + >> " 'connector.startup-mode' = 'latest-offset', " + >> " 'format.type' = 'json' " + >> " ) "); >> tableEnvironment.executeSql("CREATE TABLE mysqlsink ( " + >> " id bigint, " + >> " game_id varchar, " + >> " PRIMARY KEY (id) NOT ENFORCED " + >> " ) " + >> " with ( " + >> " 'connector.type' = 'jdbc', " + >> " 'connector.url' = 'jdbc:mysql://47.99.181.86:3306/flinksql?useSSL=false' , " + >> " 'connector.username' = 'root' , " + >> " 'connector.password' = 'root', " + >> " 'connector.table' = 'mysqlsink' , " + >> " 'connector.driver' = 'com.mysql.cj.jdbc.Driver' , " + >> " 'connector.write.flush.interval' = '2s', " + >> " 'connector.write.flush.max-rows' = '300' " + >> " )"); >> tableEnvironment.executeSql("insert into mysqlsink (`id`,`game_id`) values (select a,cast(b as varchar) b from mySource)"); >> >> >> 问题一 : 上面的insert语句会出现如下错误 >> Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Cannot apply '$SCALAR_QUERY' to arguments of type '$SCALAR_QUERY(<RECORDTYPE(BIGINT A, VARCHAR(2147483647) B)>)'. Supported form(s): '$SCALAR_QUERY(<RECORDTYPE(SINGLE FIELD)>)' >> >> >> 问题二 : 如果insert改成 tableEnvironment.executeSql("insert into mysqlsink select a,cast(b as varchar) b from mySource"); 是可以运行的,但是出现唯一主键重复时会报错 >> Caused by: java.sql.SQLIntegrityConstraintViolationException: Duplicate entry '1' for key 'PRIMARY' >> >> >> > |
Hello
> 在 2020年7月31日,21:13,chenxuying <[hidden email]> 写道: > > 但是不太明白 "使用老的option参数还是需要根据query推导主键" 这里话是什么意思,需要怎么做 简单来讲,如果使用的是老版本(1.10)的option参数,代码执行的路径就和1.10版本一样的,1.10版本里是不支持定义 PRIMARY KEY 的, 是通过用户的query来决定写入的模式是upsert 还是 append , 你可以看下1.10的文档关于用query 推导 写入模式的文档[1], 如果已经在用1.11了,1.10的文档可以不用看的。 在1.10里经常出现query 推导不出 key 导致无法做upsert写入的case, 在1.11里通过支持定义 PRIMARY KEY,不会再有类似问题.1.11的文档参考[2]。 祝好 Leonard [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connect.html#jdbc-connector <https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connect.html#jdbc-connector> [2] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/jdbc.html#how-to-create-a-jdbc-table <https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/jdbc.html#how-to-create-a-jdbc-table> |
hi
ok,谢谢,懂了哈哈 在 2020-07-31 21:27:02,"Leonard Xu" <[hidden email]> 写道: >Hello > >> 在 2020年7月31日,21:13,chenxuying <[hidden email]> 写道: >> >> 但是不太明白 "使用老的option参数还是需要根据query推导主键" 这里话是什么意思,需要怎么做 > >简单来讲,如果使用的是老版本(1.10)的option参数,代码执行的路径就和1.10版本一样的,1.10版本里是不支持定义 PRIMARY KEY 的, >是通过用户的query来决定写入的模式是upsert 还是 append , 你可以看下1.10的文档关于用query 推导 写入模式的文档[1], 如果已经在用1.11了,1.10的文档可以不用看的。 > >在1.10里经常出现query 推导不出 key 导致无法做upsert写入的case, 在1.11里通过支持定义 PRIMARY KEY,不会再有类似问题.1.11的文档参考[2]。 > >祝好 >Leonard > >[1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connect.html#jdbc-connector <https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connect.html#jdbc-connector> >[2] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/jdbc.html#how-to-create-a-jdbc-table <https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/jdbc.html#how-to-create-a-jdbc-table> |
Free forum by Nabble | Edit this page |