我在SQL关联后把结果写入mysql出现 No value specified for parameter 1错误? 我的版本是1.10.0,代码如下 JDBCUpsertTableSink build = JDBCUpsertTableSink.builder() .setTableSchema(results.getSchema()) .setOptions(JDBCOptions.builder() .setDBUrl("。。。。MultiQueries=true&useUnicode=true&characterEncoding=UTF-8") .setDriverName("com.mysql.jdbc.Driver") .setUsername("jczx_cjch") .setPassword("jczx_cjch2") .setTableName("xkf_join_result") .build()) .setFlushIntervalMills(1000) .setFlushMaxSize(100) .setMaxRetryTimes(3) .build(); DataStream<Tuple2<Boolean, Row>> retract = bsTableEnv.toRetractStream(results, Row.class); retract.print(); build.emitDataStream(retract); 就会出现如下错误 java.sql.SQLException: No value specified for parameter 1 at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:965) at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:898) at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:887) at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:861) at com.mysql.jdbc.PreparedStatement.checkAllParametersSet(PreparedStatement.java:2211) at com.mysql.jdbc.PreparedStatement.fillSendPacket(PreparedStatement.java:2191) at com.mysql.jdbc.PreparedStatement.fillSendPacket(PreparedStatement.java:2121) at com.mysql.jdbc.PreparedStatement.execute(PreparedStatement.java:1162) at org.apache.flink.api.java.io.jdbc.writer.UpsertWriter.executeBatch(UpsertWriter.java:118) at org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.flush(JDBCUpsertOutputFormat.java:159) at org.apache.flink.api.java.io.jdbc.JDBCUpsertSinkFunction.snapshotState(JDBCUpsertSinkFunction.java:56) at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118) at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:402) at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1420) at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1354) 我的输出数据是(true,2020-04-22 21:34:00,2020-04-22 21:34:15,20200422213541465568468)是这样的 我查看源码发现 先调用JDBCUpsertOutputFormat类的writeRecord方法给UpsertWriter类的成员变量map中添加元素 @Override public synchronized void writeRecord(Tuple2<Boolean, Row> tuple2) throws IOException { checkFlushException(); try { jdbcWriter.addRecord(tuple2); batchCount++; if (batchCount >= flushMaxSize) { flush(); } } catch (Exception e) { throw new RuntimeException("Writing records to JDBC failed.", e); } } 之后调用flush()方法,调用UpsertWriter类执行executeBatch方法 public synchronized void flush() throws Exception { checkFlushException(); for (int i = 1; i <= maxRetryTimes; i++) { try { jdbcWriter.executeBatch(); batchCount = 0; break; } catch (SQLException e) { LOG.error("JDBC executeBatch error, retry times = {}", i, e); if (i >= maxRetryTimes) { throw e; } Thread.sleep(1000 * i); } } } 然后会调用UpsertWriter类 实现JDBCWriter类在executeBatch方法中先判断map是否为空,然后循环map;之后判断2元组第一个元素的true调用内部类处理元素,否则删除数据;但是我每次的数据只有一条就是map的大小是1,且2元组的第一个元素值是true,循环结束执行 deleteStatement.executeBatch();方法就会出错,因为删除的语句站位符还没有填充; @Override public void executeBatch() throws SQLException { if (keyToRows.size() > 0) { for (Map.Entry<Row, Tuple2<Boolean, Row>> entry : keyToRows.entrySet()) { Row pk = entry.getKey(); Tuple2<Boolean, Row> tuple = entry.getValue(); if (tuple.f0) { processOneRowInBatch(pk, tuple.f1); } else { setRecordToStatement(deleteStatement, pkTypes, pk); deleteStatement.addBatch(); } } internalExecuteBatch(); deleteStatement.executeBatch(); keyToRows.clear(); } } |
Hi,
- JDBC是upsert sink,所以你需要toUpsertStream,而不是toRetractStream,建议你用完整的DDL来插入mysql的表。 - 这个异常看起来是JDBC的bug,你可以建个JIRA来跟踪吗? Best, Jingsong Lee On Wed, Apr 22, 2020 at 9:58 PM 1101300123 <[hidden email]> wrote: > > > 我在SQL关联后把结果写入mysql出现 No value specified for parameter 1错误? > 我的版本是1.10.0,代码如下 > JDBCUpsertTableSink build = JDBCUpsertTableSink.builder() > .setTableSchema(results.getSchema()) > .setOptions(JDBCOptions.builder() > > .setDBUrl("。。。。MultiQueries=true&useUnicode=true&characterEncoding=UTF-8") > .setDriverName("com.mysql.jdbc.Driver") > .setUsername("jczx_cjch") > .setPassword("jczx_cjch2") > .setTableName("xkf_join_result") > .build()) > .setFlushIntervalMills(1000) > .setFlushMaxSize(100) > .setMaxRetryTimes(3) > .build(); > > > DataStream<Tuple2<Boolean, Row>> retract = > bsTableEnv.toRetractStream(results, Row.class); > retract.print(); > build.emitDataStream(retract); > > > > > 就会出现如下错误 > java.sql.SQLException: No value specified for parameter 1 > at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:965) > at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:898) > at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:887) > at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:861) > at > com.mysql.jdbc.PreparedStatement.checkAllParametersSet(PreparedStatement.java:2211) > at > com.mysql.jdbc.PreparedStatement.fillSendPacket(PreparedStatement.java:2191) > at > com.mysql.jdbc.PreparedStatement.fillSendPacket(PreparedStatement.java:2121) > at com.mysql.jdbc.PreparedStatement.execute(PreparedStatement.java:1162) > at org.apache.flink.api.java.io > .jdbc.writer.UpsertWriter.executeBatch(UpsertWriter.java:118) > at org.apache.flink.api.java.io > .jdbc.JDBCUpsertOutputFormat.flush(JDBCUpsertOutputFormat.java:159) > at org.apache.flink.api.java.io > .jdbc.JDBCUpsertSinkFunction.snapshotState(JDBCUpsertSinkFunction.java:56) > at > org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118) > at > org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99) > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:402) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1420) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1354) > > > > > 我的输出数据是(true,2020-04-22 21:34:00,2020-04-22 > 21:34:15,20200422213541465568468)是这样的 > 我查看源码发现 > 先调用JDBCUpsertOutputFormat类的writeRecord方法给UpsertWriter类的成员变量map中添加元素 > @Override > public synchronized void writeRecord(Tuple2<Boolean, Row> tuple2) throws > IOException { > checkFlushException(); > try { > jdbcWriter.addRecord(tuple2); > batchCount++; > if (batchCount >= flushMaxSize) { > flush(); > } > } catch (Exception e) { > throw new RuntimeException("Writing records to JDBC failed.", e); > } > } > 之后调用flush()方法,调用UpsertWriter类执行executeBatch方法 > public synchronized void flush() throws Exception { > checkFlushException(); > for (int i = 1; i <= maxRetryTimes; i++) { > try { > jdbcWriter.executeBatch(); > batchCount = 0; > break; > } catch (SQLException e) { > LOG.error("JDBC executeBatch error, retry times = {}", i, e); > if (i >= maxRetryTimes) { > throw e; > } > Thread.sleep(1000 * i); > } > } > } > > > 然后会调用UpsertWriter类 > 实现JDBCWriter类在executeBatch方法中先判断map是否为空,然后循环map;之后判断2元组第一个元素的true调用内部类处理元素,否则删除数据;但是我每次的数据只有一条就是map的大小是1,且2元组的第一个元素值是true,循环结束执行 > deleteStatement.executeBatch();方法就会出错,因为删除的语句站位符还没有填充; > > > @Override > public void executeBatch() throws SQLException { > if (keyToRows.size() > 0) { > for (Map.Entry<Row, Tuple2<Boolean, Row>> entry : keyToRows.entrySet()) > { > Row pk = entry.getKey(); > Tuple2<Boolean, Row> tuple = entry.getValue(); > if (tuple.f0) { > processOneRowInBatch(pk, tuple.f1); > } else { > setRecordToStatement(deleteStatement, pkTypes, pk); > deleteStatement.addBatch(); > } > } > internalExecuteBatch(); > deleteStatement.executeBatch(); > keyToRows.clear(); > } > } -- Best, Jingsong Lee |
好的,我先换了看看,之后建jira
在2020年4月22日 22:38,Jingsong Li<[hidden email]> 写道: Hi, - JDBC是upsert sink,所以你需要toUpsertStream,而不是toRetractStream,建议你用完整的DDL来插入mysql的表。 - 这个异常看起来是JDBC的bug,你可以建个JIRA来跟踪吗? Best, Jingsong Lee On Wed, Apr 22, 2020 at 9:58 PM 1101300123 <[hidden email]> wrote: 我在SQL关联后把结果写入mysql出现 No value specified for parameter 1错误? 我的版本是1.10.0,代码如下 JDBCUpsertTableSink build = JDBCUpsertTableSink.builder() .setTableSchema(results.getSchema()) .setOptions(JDBCOptions.builder() .setDBUrl("。。。。MultiQueries=true&useUnicode=true&characterEncoding=UTF-8") .setDriverName("com.mysql.jdbc.Driver") .setUsername("jczx_cjch") .setPassword("jczx_cjch2") .setTableName("xkf_join_result") .build()) .setFlushIntervalMills(1000) .setFlushMaxSize(100) .setMaxRetryTimes(3) .build(); DataStream<Tuple2<Boolean, Row>> retract = bsTableEnv.toRetractStream(results, Row.class); retract.print(); build.emitDataStream(retract); 就会出现如下错误 java.sql.SQLException: No value specified for parameter 1 at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:965) at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:898) at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:887) at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:861) at com.mysql.jdbc.PreparedStatement.checkAllParametersSet(PreparedStatement.java:2211) at com.mysql.jdbc.PreparedStatement.fillSendPacket(PreparedStatement.java:2191) at com.mysql.jdbc.PreparedStatement.fillSendPacket(PreparedStatement.java:2121) at com.mysql.jdbc.PreparedStatement.execute(PreparedStatement.java:1162) at org.apache.flink.api.java.io .jdbc.writer.UpsertWriter.executeBatch(UpsertWriter.java:118) at org.apache.flink.api.java.io .jdbc.JDBCUpsertOutputFormat.flush(JDBCUpsertOutputFormat.java:159) at org.apache.flink.api.java.io .jdbc.JDBCUpsertSinkFunction.snapshotState(JDBCUpsertSinkFunction.java:56) at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118) at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:402) at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1420) at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1354) 我的输出数据是(true,2020-04-22 21:34:00,2020-04-22 21:34:15,20200422213541465568468)是这样的 我查看源码发现 先调用JDBCUpsertOutputFormat类的writeRecord方法给UpsertWriter类的成员变量map中添加元素 @Override public synchronized void writeRecord(Tuple2<Boolean, Row> tuple2) throws IOException { checkFlushException(); try { jdbcWriter.addRecord(tuple2); batchCount++; if (batchCount >= flushMaxSize) { flush(); } } catch (Exception e) { throw new RuntimeException("Writing records to JDBC failed.", e); } } 之后调用flush()方法,调用UpsertWriter类执行executeBatch方法 public synchronized void flush() throws Exception { checkFlushException(); for (int i = 1; i <= maxRetryTimes; i++) { try { jdbcWriter.executeBatch(); batchCount = 0; break; } catch (SQLException e) { LOG.error("JDBC executeBatch error, retry times = {}", i, e); if (i >= maxRetryTimes) { throw e; } Thread.sleep(1000 * i); } } } 然后会调用UpsertWriter类 实现JDBCWriter类在executeBatch方法中先判断map是否为空,然后循环map;之后判断2元组第一个元素的true调用内部类处理元素,否则删除数据;但是我每次的数据只有一条就是map的大小是1,且2元组的第一个元素值是true,循环结束执行 deleteStatement.executeBatch();方法就会出错,因为删除的语句站位符还没有填充; @Override public void executeBatch() throws SQLException { if (keyToRows.size() > 0) { for (Map.Entry<Row, Tuple2<Boolean, Row>> entry : keyToRows.entrySet()) { Row pk = entry.getKey(); Tuple2<Boolean, Row> tuple = entry.getValue(); if (tuple.f0) { processOneRowInBatch(pk, tuple.f1); } else { setRecordToStatement(deleteStatement, pkTypes, pk); deleteStatement.addBatch(); } } internalExecuteBatch(); deleteStatement.executeBatch(); keyToRows.clear(); } } -- Best, Jingsong Lee |
In reply to this post by 1101300123
赞详细的分析!
没能复现你说的问题,最后一步的分析应该有点小问题,我看下了jdbc mysql的实现 com/mysql/jdbc/PreparedStatement.java#executeBatchInternal() 1289行 是会判断batchedArgs数组的大小后会直接返回的,应该不会执行,你可以进一步调试确认下 ``` if (this.batchedArgs == null || this.batchedArgs.size() == 0) { return new long[0]; } ``` 祝好, Leonard Xu > 在 2020年4月22日,21:58,1101300123 <[hidden email]> 写道: > > > > 我在SQL关联后把结果写入mysql出现 No value specified for parameter 1错误? > 我的版本是1.10.0,代码如下 > JDBCUpsertTableSink build = JDBCUpsertTableSink.builder() > .setTableSchema(results.getSchema()) > .setOptions(JDBCOptions.builder() > .setDBUrl("。。。。MultiQueries=true&useUnicode=true&characterEncoding=UTF-8") > .setDriverName("com.mysql.jdbc.Driver") > .setUsername("jczx_cjch") > .setPassword("jczx_cjch2") > .setTableName("xkf_join_result") > .build()) > .setFlushIntervalMills(1000) > .setFlushMaxSize(100) > .setMaxRetryTimes(3) > .build(); > > > DataStream<Tuple2<Boolean, Row>> retract = bsTableEnv.toRetractStream(results, Row.class); > retract.print(); > build.emitDataStream(retract); > > > > > 就会出现如下错误 > java.sql.SQLException: No value specified for parameter 1 > at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:965) > at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:898) > at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:887) > at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:861) > at com.mysql.jdbc.PreparedStatement.checkAllParametersSet(PreparedStatement.java:2211) > at com.mysql.jdbc.PreparedStatement.fillSendPacket(PreparedStatement.java:2191) > at com.mysql.jdbc.PreparedStatement.fillSendPacket(PreparedStatement.java:2121) > at com.mysql.jdbc.PreparedStatement.execute(PreparedStatement.java:1162) > at org.apache.flink.api.java.io.jdbc.writer.UpsertWriter.executeBatch(UpsertWriter.java:118) > at org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.flush(JDBCUpsertOutputFormat.java:159) > at org.apache.flink.api.java.io.jdbc.JDBCUpsertSinkFunction.snapshotState(JDBCUpsertSinkFunction.java:56) > at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118) > at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99) > at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90) > at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:402) > at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1420) > at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1354) > > > > > 我的输出数据是(true,2020-04-22 21:34:00,2020-04-22 21:34:15,20200422213541465568468)是这样的 > 我查看源码发现 > 先调用JDBCUpsertOutputFormat类的writeRecord方法给UpsertWriter类的成员变量map中添加元素 > @Override > public synchronized void writeRecord(Tuple2<Boolean, Row> tuple2) throws IOException { > checkFlushException(); > try { > jdbcWriter.addRecord(tuple2); > batchCount++; > if (batchCount >= flushMaxSize) { > flush(); > } > } catch (Exception e) { > throw new RuntimeException("Writing records to JDBC failed.", e); > } > } > 之后调用flush()方法,调用UpsertWriter类执行executeBatch方法 > public synchronized void flush() throws Exception { > checkFlushException(); > for (int i = 1; i <= maxRetryTimes; i++) { > try { > jdbcWriter.executeBatch(); > batchCount = 0; > break; > } catch (SQLException e) { > LOG.error("JDBC executeBatch error, retry times = {}", i, e); > if (i >= maxRetryTimes) { > throw e; > } > Thread.sleep(1000 * i); > } > } > } > > > 然后会调用UpsertWriter类 实现JDBCWriter类在executeBatch方法中先判断map是否为空,然后循环map;之后判断2元组第一个元素的true调用内部类处理元素,否则删除数据;但是我每次的数据只有一条就是map的大小是1,且2元组的第一个元素值是true,循环结束执行 deleteStatement.executeBatch();方法就会出错,因为删除的语句站位符还没有填充; > > > @Override > public void executeBatch() throws SQLException { > if (keyToRows.size() > 0) { > for (Map.Entry<Row, Tuple2<Boolean, Row>> entry : keyToRows.entrySet()) { > Row pk = entry.getKey(); > Tuple2<Boolean, Row> tuple = entry.getValue(); > if (tuple.f0) { > processOneRowInBatch(pk, tuple.f1); > } else { > setRecordToStatement(deleteStatement, pkTypes, pk); > deleteStatement.addBatch(); > } > } > internalExecuteBatch(); > deleteStatement.executeBatch(); > keyToRows.clear(); > } > } |
我给你一些数据和代码吧!和我真实场景错误一样 订单主表:orders 13点两条记录;order_state是状态 0取消 1待支付 {"order_no":"order1","order_state":1,"pay_time":"","create_time":"2020-04-01 13:00:00","update_time":"2020-04-01 13:00:00"} {"order_no":"order2","order_state":1,"pay_time":"","create_time":"2020-04-01 13:00:00","update_time":"2020-04-01 13:00:00"} 13:15 来了一条新的记录 取消订单 {"order_no":"order1","order_state":0,"pay_time":"","create_time":"2020-04-01 13:00:00","update_time":"2020-04-01 13:15:00"} 订单明细表:order_detail 4条记录 {"order_no":"order1","product_code":"product1","quantity":3,"create_time":"2020-04-01 13:00:00","update_time":"2020-04-01 13:00:00"} {"order_no":"order1","product_code":"product2","quantity":5,"create_time":"2020-04-01 13:00:00","update_time":"2020-04-01 13:00:00"} {"order_no":"order2","product_code":"product1","quantity":2,"create_time":"2020-04-01 13:00:00","update_time":"2020-04-01 13:00:00"} {"order_no":"order2","product_code":"product2","quantity":4,"create_time":"2020-04-01 13:00:00","update_time":"2020-04-01 13:00:00"} 需求的要求是当订单创建后我们就要统计该订单对应的商品数量,而当订单状态变为取消时我们要减掉该订单对应的商品数量。 代码 package Learn.kafkasql; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.java.StreamTableEnvironment; import org.apache.flink.types.Row; public class SqlCount { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); StreamTableEnvironment tenv = StreamTableEnvironment.create(env,settings); tenv.sqlUpdate("CREATE TABLE orders " + " (" + " order_no string," + " order_state int," + " pay_time string," + " create_time string," + " update_time string" + " ) " + " WITH (" + " 'connector.type' = 'kafka', " + " 'connector.version' = 'universal', " +//--kafka版本 " 'connector.topic' = 'tp_orders'," +//--kafkatopic " 'connector.properties.zookeeper.connect' = '192.168.179.120:2181', " + " 'connector.properties.bootstrap.servers' = '192.168.179.120:9092'," + " 'connector.properties.group.id' = 'testGroup'," + " 'connector.startup-mode' = 'latest-offset'," + " 'format.type' = 'json' " +//--数据为json格式 " )"); tenv.sqlUpdate("CREATE TABLE order_detail " + " (" + " order_no string," + " product_code string," + " quantity int," + " create_time string," + " update_time string" + " ) " + " WITH (" + " 'connector.type' = 'kafka', " + " 'connector.version' = 'universal', " +//--kafka版本 " 'connector.topic' = 'tp_order_detail'," +//--kafkatopic " 'connector.properties.zookeeper.connect' = '192.168.179.120:2181', " + " 'connector.properties.bootstrap.servers' = '192.168.179.120:9092'," + " 'connector.properties.group.id' = 'testGroup'," + " 'connector.startup-mode' = 'latest-offset'," + " 'format.type' = 'json' " +//--数据为json格式 " )"); tenv.sqlUpdate("CREATE TABLE product_sale" + " (" + " order_date string," + " product_code string," + " cnt int" + " ) " + " WITH (" + " 'connector.type' = 'jdbc', " + " 'connector.url' = 'jdbc:mysql://192.168.179.120:3306/flink?serverTimezone=UTC&useSSL=true', " + " 'connector.table' = 'order_state_cnt', " + " 'connector.driver' = 'com.mysql.jdbc.Driver', " + " 'connector.username' = 'root'," + " 'connector.password' = '123456'," + " 'connector.write.flush.max-rows' = '1'," +//--默认每5000条数据写入一次,测试调小一点 " 'connector.write.flush.interval' = '2s'," +//--写入时间间隔 " 'connector.write.max-retries' = '3'" + " )"); tenv.sqlUpdate("insert into product_sale " + "select create_date,product_code,sum(quantity)" + "from (select t1.order_no," + " t1.create_date," + " t2.product_code," + " t2.quantity" + " from (select order_no," + " order_state," + " substring(create_time,1,10) create_date," + " update_time ," + " row_number() over(partition by order_no order by update_time desc) as rn" + " from orders" + " )t1" + " left join order_detail t2" + " on t1.order_no=t2.order_no" + " where t1.rn=1" +//--取最新的订单状态数据 " and t1.order_state<>0" +//--不包含取消订单 " )t3" + " group by create_date,product_code"); Table table = tenv.sqlQuery("select create_date,product_code,sum(quantity)" + "from (select t1.order_no," + " t1.create_date," + " t2.product_code," + " t2.quantity" + " from (select order_no," + " order_state," + " substring(create_time,1,10) create_date," + " update_time ," + " row_number() over(partition by order_no order by update_time desc) as rn" + " from orders" + " )t1" + " left join order_detail t2" + " on t1.order_no=t2.order_no" + " where t1.rn=1" + " and t1.order_state<>0" + " )t3" + " group by create_date,product_code"); tenv.toRetractStream(table, Row.class).print(); tenv.execute("count"); } } mysql 建表语句 CREATE TABLE `order_state_cnt` ( `order_date` varchar(12) , `product_code` varchar(12) , `cnt` int ) ENGINE=InnoDB DEFAULT CHARSET=utf8 使用的是kafka命令行一条条发送数据的方式 主要是deleteStatement.executeBatch();这个方法报错 @Override public void executeBatch() throws SQLException { if (keyToRows.size() > 0) { for (Map.Entry<Row, Tuple2<Boolean, Row>> entry : keyToRows.entrySet()) { Row pk = entry.getKey(); Tuple2<Boolean, Row> tuple = entry.getValue(); if (tuple.f0) { processOneRowInBatch(pk, tuple.f1); } else { setRecordToStatement(deleteStatement, pkTypes, pk); deleteStatement.addBatch(); } } internalExecuteBatch(); deleteStatement.executeBatch(); keyToRows.clear(); } } 在2020年4月23日 00:21,Leonard Xu<[hidden email]> 写道: 赞详细的分析! 没能复现你说的问题,最后一步的分析应该有点小问题,我看下了jdbc mysql的实现 com/mysql/jdbc/PreparedStatement.java#executeBatchInternal() 1289行 是会判断batchedArgs数组的大小后会直接返回的,应该不会执行,你可以进一步调试确认下 ``` if (this.batchedArgs == null || this.batchedArgs.size() == 0) { return new long[0]; } ``` 祝好, Leonard Xu 在 2020年4月22日,21:58,1101300123 <[hidden email]> 写道: 我在SQL关联后把结果写入mysql出现 No value specified for parameter 1错误? 我的版本是1.10.0,代码如下 JDBCUpsertTableSink build = JDBCUpsertTableSink.builder() .setTableSchema(results.getSchema()) .setOptions(JDBCOptions.builder() .setDBUrl("。。。。MultiQueries=true&useUnicode=true&characterEncoding=UTF-8") .setDriverName("com.mysql.jdbc.Driver") .setUsername("jczx_cjch") .setPassword("jczx_cjch2") .setTableName("xkf_join_result") .build()) .setFlushIntervalMills(1000) .setFlushMaxSize(100) .setMaxRetryTimes(3) .build(); DataStream<Tuple2<Boolean, Row>> retract = bsTableEnv.toRetractStream(results, Row.class); retract.print(); build.emitDataStream(retract); 就会出现如下错误 java.sql.SQLException: No value specified for parameter 1 at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:965) at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:898) at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:887) at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:861) at com.mysql.jdbc.PreparedStatement.checkAllParametersSet(PreparedStatement.java:2211) at com.mysql.jdbc.PreparedStatement.fillSendPacket(PreparedStatement.java:2191) at com.mysql.jdbc.PreparedStatement.fillSendPacket(PreparedStatement.java:2121) at com.mysql.jdbc.PreparedStatement.execute(PreparedStatement.java:1162) at org.apache.flink.api.java.io.jdbc.writer.UpsertWriter.executeBatch(UpsertWriter.java:118) at org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.flush(JDBCUpsertOutputFormat.java:159) at org.apache.flink.api.java.io.jdbc.JDBCUpsertSinkFunction.snapshotState(JDBCUpsertSinkFunction.java:56) at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118) at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:402) at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1420) at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1354) 我的输出数据是(true,2020-04-22 21:34:00,2020-04-22 21:34:15,20200422213541465568468)是这样的 我查看源码发现 先调用JDBCUpsertOutputFormat类的writeRecord方法给UpsertWriter类的成员变量map中添加元素 @Override public synchronized void writeRecord(Tuple2<Boolean, Row> tuple2) throws IOException { checkFlushException(); try { jdbcWriter.addRecord(tuple2); batchCount++; if (batchCount >= flushMaxSize) { flush(); } } catch (Exception e) { throw new RuntimeException("Writing records to JDBC failed.", e); } } 之后调用flush()方法,调用UpsertWriter类执行executeBatch方法 public synchronized void flush() throws Exception { checkFlushException(); for (int i = 1; i <= maxRetryTimes; i++) { try { jdbcWriter.executeBatch(); batchCount = 0; break; } catch (SQLException e) { LOG.error("JDBC executeBatch error, retry times = {}", i, e); if (i >= maxRetryTimes) { throw e; } Thread.sleep(1000 * i); } } } 然后会调用UpsertWriter类 实现JDBCWriter类在executeBatch方法中先判断map是否为空,然后循环map;之后判断2元组第一个元素的true调用内部类处理元素,否则删除数据;但是我每次的数据只有一条就是map的大小是1,且2元组的第一个元素值是true,循环结束执行 deleteStatement.executeBatch();方法就会出错,因为删除的语句站位符还没有填充; @Override public void executeBatch() throws SQLException { if (keyToRows.size() > 0) { for (Map.Entry<Row, Tuple2<Boolean, Row>> entry : keyToRows.entrySet()) { Row pk = entry.getKey(); Tuple2<Boolean, Row> tuple = entry.getValue(); if (tuple.f0) { processOneRowInBatch(pk, tuple.f1); } else { setRecordToStatement(deleteStatement, pkTypes, pk); deleteStatement.addBatch(); } } internalExecuteBatch(); deleteStatement.executeBatch(); keyToRows.clear(); } } |
Hi,
我本地复现了下,用1.10.0发现的你的sql是ok的,结果也符合预期☺️,如下[1]: 看到你建了JIRA,我们在issue里继续跟进吧 祝好, Leonard Xu [1] mysql> select * from order_state_cnt; +------------+--------------+------+ | order_date | product_code | cnt | +------------+--------------+------+ | 2020-04-01 | product1 | 3 | | 2020-04-01 | product2 | 5 | | 2020-04-01 | product1 | 5 | | 2020-04-01 | product2 | 9 | +------------+--------------+------+ 4 rows in set (0.00 sec) mysql> select * from order_state_cnt; +------------+--------------+------+ | order_date | product_code | cnt | +------------+--------------+------+ | 2020-04-01 | product1 | 3 | | 2020-04-01 | product2 | 5 | | 2020-04-01 | product1 | 5 | | 2020-04-01 | product2 | 9 | | 2020-04-01 | product1 | 2 | | 2020-04-01 | product2 | 4 | +------------+--------------+------+ 6 rows in set (0.00 sec) > 在 2020年4月23日,10:48,1101300123 <[hidden email]> 写道: > > > > 我给你一些数据和代码吧!和我真实场景错误一样 > 订单主表:orders > 13点两条记录;order_state是状态 0取消 1待支付 > {"order_no":"order1","order_state":1,"pay_time":"","create_time":"2020-04-01 13:00:00","update_time":"2020-04-01 13:00:00"} > {"order_no":"order2","order_state":1,"pay_time":"","create_time":"2020-04-01 13:00:00","update_time":"2020-04-01 13:00:00"} > > > 13:15 > 来了一条新的记录 取消订单 > {"order_no":"order1","order_state":0,"pay_time":"","create_time":"2020-04-01 13:00:00","update_time":"2020-04-01 13:15:00"} > > > 订单明细表:order_detail > 4条记录 > {"order_no":"order1","product_code":"product1","quantity":3,"create_time":"2020-04-01 13:00:00","update_time":"2020-04-01 13:00:00"} > {"order_no":"order1","product_code":"product2","quantity":5,"create_time":"2020-04-01 13:00:00","update_time":"2020-04-01 13:00:00"} > {"order_no":"order2","product_code":"product1","quantity":2,"create_time":"2020-04-01 13:00:00","update_time":"2020-04-01 13:00:00"} > {"order_no":"order2","product_code":"product2","quantity":4,"create_time":"2020-04-01 13:00:00","update_time":"2020-04-01 13:00:00"} > > > 需求的要求是当订单创建后我们就要统计该订单对应的商品数量,而当订单状态变为取消时我们要减掉该订单对应的商品数量。 > > > 代码 > package Learn.kafkasql; > > import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; > import org.apache.flink.table.api.EnvironmentSettings; > import org.apache.flink.table.api.Table; > import org.apache.flink.table.api.java.StreamTableEnvironment; > import org.apache.flink.types.Row; > > public class SqlCount { > public static void main(String[] args) throws Exception { > StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment(); > env.setParallelism(1); > EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); > StreamTableEnvironment tenv = StreamTableEnvironment.create(env,settings); > > tenv.sqlUpdate("CREATE TABLE orders " + > " (" + > " order_no string," + > " order_state int," + > " pay_time string," + > " create_time string," + > " update_time string" + > " ) " + > " WITH (" + > " 'connector.type' = 'kafka', " + > " 'connector.version' = 'universal', " +//--kafka版本 > " 'connector.topic' = 'tp_orders'," +//--kafkatopic > " 'connector.properties.zookeeper.connect' = '192.168.179.120:2181', " + > " 'connector.properties.bootstrap.servers' = '192.168.179.120:9092'," + > " 'connector.properties.group.id' = 'testGroup'," + > " 'connector.startup-mode' = 'latest-offset'," + > " 'format.type' = 'json' " +//--数据为json格式 > " )"); > tenv.sqlUpdate("CREATE TABLE order_detail " + > " (" + > " order_no string," + > " product_code string," + > " quantity int," + > " create_time string," + > " update_time string" + > " ) " + > " WITH (" + > " 'connector.type' = 'kafka', " + > " 'connector.version' = 'universal', " +//--kafka版本 > " 'connector.topic' = 'tp_order_detail'," +//--kafkatopic > " 'connector.properties.zookeeper.connect' = '192.168.179.120:2181', " + > " 'connector.properties.bootstrap.servers' = '192.168.179.120:9092'," + > " 'connector.properties.group.id' = 'testGroup'," + > " 'connector.startup-mode' = 'latest-offset'," + > " 'format.type' = 'json' " +//--数据为json格式 > " )"); > > tenv.sqlUpdate("CREATE TABLE product_sale" + > " (" + > " order_date string," + > " product_code string," + > " cnt int" + > " ) " + > " WITH (" + > " 'connector.type' = 'jdbc', " + > " 'connector.url' = 'jdbc:mysql://192.168.179.120:3306/flink?serverTimezone=UTC&useSSL=true', " + > " 'connector.table' = 'order_state_cnt', " + > " 'connector.driver' = 'com.mysql.jdbc.Driver', " + > " 'connector.username' = 'root'," + > " 'connector.password' = '123456'," + > " 'connector.write.flush.max-rows' = '1'," +//--默认每5000条数据写入一次,测试调小一点 > " 'connector.write.flush.interval' = '2s'," +//--写入时间间隔 > " 'connector.write.max-retries' = '3'" + > " )"); > tenv.sqlUpdate("insert into product_sale " + > "select create_date,product_code,sum(quantity)" + > "from (select t1.order_no," + > " t1.create_date," + > " t2.product_code," + > " t2.quantity" + > " from (select order_no," + > " order_state," + > " substring(create_time,1,10) create_date," + > " update_time ," + > " row_number() over(partition by order_no order by update_time desc) as rn" + > " from orders" + > " )t1" + > " left join order_detail t2" + > " on t1.order_no=t2.order_no" + > " where t1.rn=1" +//--取最新的订单状态数据 > " and t1.order_state<>0" +//--不包含取消订单 > " )t3" + > " group by create_date,product_code"); > > Table table = tenv.sqlQuery("select create_date,product_code,sum(quantity)" + > "from (select t1.order_no," + > " t1.create_date," + > " t2.product_code," + > " t2.quantity" + > " from (select order_no," + > " order_state," + > " substring(create_time,1,10) create_date," + > " update_time ," + > " row_number() over(partition by order_no order by update_time desc) as rn" + > " from orders" + > " )t1" + > " left join order_detail t2" + > " on t1.order_no=t2.order_no" + > " where t1.rn=1" + > " and t1.order_state<>0" + > " )t3" + > " group by create_date,product_code"); > tenv.toRetractStream(table, Row.class).print(); > tenv.execute("count"); > } > } > mysql 建表语句 > CREATE TABLE `order_state_cnt` ( > `order_date` varchar(12) , > `product_code` varchar(12) , > `cnt` int > ) ENGINE=InnoDB DEFAULT CHARSET=utf8 > > > 使用的是kafka命令行一条条发送数据的方式 > > > 主要是deleteStatement.executeBatch();这个方法报错 > @Override > public void executeBatch() throws SQLException { > if (keyToRows.size() > 0) { > for (Map.Entry<Row, Tuple2<Boolean, Row>> entry : keyToRows.entrySet()) { > Row pk = entry.getKey(); > Tuple2<Boolean, Row> tuple = entry.getValue(); > if (tuple.f0) { > processOneRowInBatch(pk, tuple.f1); > } else { > setRecordToStatement(deleteStatement, pkTypes, pk); > deleteStatement.addBatch(); > } > } > internalExecuteBatch(); > deleteStatement.executeBatch(); > keyToRows.clear(); > } > } > 在2020年4月23日 00:21,Leonard Xu<[hidden email]> 写道: > 赞详细的分析! > > 没能复现你说的问题,最后一步的分析应该有点小问题,我看下了jdbc mysql的实现 > com/mysql/jdbc/PreparedStatement.java#executeBatchInternal() 1289行 > 是会判断batchedArgs数组的大小后会直接返回的,应该不会执行,你可以进一步调试确认下 > ``` > if (this.batchedArgs == null || this.batchedArgs.size() == 0) { > return new long[0]; > } > ``` > > 祝好, > Leonard Xu > > 在 2020年4月22日,21:58,1101300123 <[hidden email]> 写道: > > > > 我在SQL关联后把结果写入mysql出现 No value specified for parameter 1错误? > 我的版本是1.10.0,代码如下 > JDBCUpsertTableSink build = JDBCUpsertTableSink.builder() > .setTableSchema(results.getSchema()) > .setOptions(JDBCOptions.builder() > .setDBUrl("。。。。MultiQueries=true&useUnicode=true&characterEncoding=UTF-8") > .setDriverName("com.mysql.jdbc.Driver") > .setUsername("jczx_cjch") > .setPassword("jczx_cjch2") > .setTableName("xkf_join_result") > .build()) > .setFlushIntervalMills(1000) > .setFlushMaxSize(100) > .setMaxRetryTimes(3) > .build(); > > > DataStream<Tuple2<Boolean, Row>> retract = bsTableEnv.toRetractStream(results, Row.class); > retract.print(); > build.emitDataStream(retract); > > > > > 就会出现如下错误 > java.sql.SQLException: No value specified for parameter 1 > at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:965) > at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:898) > at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:887) > at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:861) > at com.mysql.jdbc.PreparedStatement.checkAllParametersSet(PreparedStatement.java:2211) > at com.mysql.jdbc.PreparedStatement.fillSendPacket(PreparedStatement.java:2191) > at com.mysql.jdbc.PreparedStatement.fillSendPacket(PreparedStatement.java:2121) > at com.mysql.jdbc.PreparedStatement.execute(PreparedStatement.java:1162) > at org.apache.flink.api.java.io.jdbc.writer.UpsertWriter.executeBatch(UpsertWriter.java:118) > at org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.flush(JDBCUpsertOutputFormat.java:159) > at org.apache.flink.api.java.io.jdbc.JDBCUpsertSinkFunction.snapshotState(JDBCUpsertSinkFunction.java:56) > at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118) > at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99) > at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90) > at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:402) > at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1420) > at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1354) > > > > > 我的输出数据是(true,2020-04-22 21:34:00,2020-04-22 21:34:15,20200422213541465568468)是这样的 > 我查看源码发现 > 先调用JDBCUpsertOutputFormat类的writeRecord方法给UpsertWriter类的成员变量map中添加元素 > @Override > public synchronized void writeRecord(Tuple2<Boolean, Row> tuple2) throws IOException { > checkFlushException(); > try { > jdbcWriter.addRecord(tuple2); > batchCount++; > if (batchCount >= flushMaxSize) { > flush(); > } > } catch (Exception e) { > throw new RuntimeException("Writing records to JDBC failed.", e); > } > } > 之后调用flush()方法,调用UpsertWriter类执行executeBatch方法 > public synchronized void flush() throws Exception { > checkFlushException(); > for (int i = 1; i <= maxRetryTimes; i++) { > try { > jdbcWriter.executeBatch(); > batchCount = 0; > break; > } catch (SQLException e) { > LOG.error("JDBC executeBatch error, retry times = {}", i, e); > if (i >= maxRetryTimes) { > throw e; > } > Thread.sleep(1000 * i); > } > } > } > > > 然后会调用UpsertWriter类 实现JDBCWriter类在executeBatch方法中先判断map是否为空,然后循环map;之后判断2元组第一个元素的true调用内部类处理元素,否则删除数据;但是我每次的数据只有一条就是map的大小是1,且2元组的第一个元素值是true,循环结束执行 deleteStatement.executeBatch();方法就会出错,因为删除的语句站位符还没有填充; > > > @Override > public void executeBatch() throws SQLException { > if (keyToRows.size() > 0) { > for (Map.Entry<Row, Tuple2<Boolean, Row>> entry : keyToRows.entrySet()) { > Row pk = entry.getKey(); > Tuple2<Boolean, Row> tuple = entry.getValue(); > if (tuple.f0) { > processOneRowInBatch(pk, tuple.f1); > } else { > setRecordToStatement(deleteStatement, pkTypes, pk); > deleteStatement.addBatch(); > } > } > internalExecuteBatch(); > deleteStatement.executeBatch(); > keyToRows.clear(); > } > } |
我重新在源码里打了一些日志编译后,之前的问题不见了,试了好多次没有复现了,之前因为集成clickhouse 改过源码的delete代码,不知道是不是这个引起的
在2020年4月23日 16:23,Leonard Xu<[hidden email]> 写道: Hi, 我本地复现了下,用1.10.0发现的你的sql是ok的,结果也符合预期☺️,如下[1]: 看到你建了JIRA,我们在issue里继续跟进吧 祝好, Leonard Xu [1] mysql> select * from order_state_cnt; +------------+--------------+------+ | order_date | product_code | cnt | +------------+--------------+------+ | 2020-04-01 | product1 | 3 | | 2020-04-01 | product2 | 5 | | 2020-04-01 | product1 | 5 | | 2020-04-01 | product2 | 9 | +------------+--------------+------+ 4 rows in set (0.00 sec) mysql> select * from order_state_cnt; +------------+--------------+------+ | order_date | product_code | cnt | +------------+--------------+------+ | 2020-04-01 | product1 | 3 | | 2020-04-01 | product2 | 5 | | 2020-04-01 | product1 | 5 | | 2020-04-01 | product2 | 9 | | 2020-04-01 | product1 | 2 | | 2020-04-01 | product2 | 4 | +------------+--------------+------+ 6 rows in set (0.00 sec) 在 2020年4月23日,10:48,1101300123 <[hidden email]> 写道: 我给你一些数据和代码吧!和我真实场景错误一样 订单主表:orders 13点两条记录;order_state是状态 0取消 1待支付 {"order_no":"order1","order_state":1,"pay_time":"","create_time":"2020-04-01 13:00:00","update_time":"2020-04-01 13:00:00"} {"order_no":"order2","order_state":1,"pay_time":"","create_time":"2020-04-01 13:00:00","update_time":"2020-04-01 13:00:00"} 13:15 来了一条新的记录 取消订单 {"order_no":"order1","order_state":0,"pay_time":"","create_time":"2020-04-01 13:00:00","update_time":"2020-04-01 13:15:00"} 订单明细表:order_detail 4条记录 {"order_no":"order1","product_code":"product1","quantity":3,"create_time":"2020-04-01 13:00:00","update_time":"2020-04-01 13:00:00"} {"order_no":"order1","product_code":"product2","quantity":5,"create_time":"2020-04-01 13:00:00","update_time":"2020-04-01 13:00:00"} {"order_no":"order2","product_code":"product1","quantity":2,"create_time":"2020-04-01 13:00:00","update_time":"2020-04-01 13:00:00"} {"order_no":"order2","product_code":"product2","quantity":4,"create_time":"2020-04-01 13:00:00","update_time":"2020-04-01 13:00:00"} 需求的要求是当订单创建后我们就要统计该订单对应的商品数量,而当订单状态变为取消时我们要减掉该订单对应的商品数量。 代码 package Learn.kafkasql; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.java.StreamTableEnvironment; import org.apache.flink.types.Row; public class SqlCount { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); StreamTableEnvironment tenv = StreamTableEnvironment.create(env,settings); tenv.sqlUpdate("CREATE TABLE orders " + " (" + " order_no string," + " order_state int," + " pay_time string," + " create_time string," + " update_time string" + " ) " + " WITH (" + " 'connector.type' = 'kafka', " + " 'connector.version' = 'universal', " +//--kafka版本 " 'connector.topic' = 'tp_orders'," +//--kafkatopic " 'connector.properties.zookeeper.connect' = '192.168.179.120:2181', " + " 'connector.properties.bootstrap.servers' = '192.168.179.120:9092'," + " 'connector.properties.group.id' = 'testGroup'," + " 'connector.startup-mode' = 'latest-offset'," + " 'format.type' = 'json' " +//--数据为json格式 " )"); tenv.sqlUpdate("CREATE TABLE order_detail " + " (" + " order_no string," + " product_code string," + " quantity int," + " create_time string," + " update_time string" + " ) " + " WITH (" + " 'connector.type' = 'kafka', " + " 'connector.version' = 'universal', " +//--kafka版本 " 'connector.topic' = 'tp_order_detail'," +//--kafkatopic " 'connector.properties.zookeeper.connect' = '192.168.179.120:2181', " + " 'connector.properties.bootstrap.servers' = '192.168.179.120:9092'," + " 'connector.properties.group.id' = 'testGroup'," + " 'connector.startup-mode' = 'latest-offset'," + " 'format.type' = 'json' " +//--数据为json格式 " )"); tenv.sqlUpdate("CREATE TABLE product_sale" + " (" + " order_date string," + " product_code string," + " cnt int" + " ) " + " WITH (" + " 'connector.type' = 'jdbc', " + " 'connector.url' = 'jdbc:mysql://192.168.179.120:3306/flink?serverTimezone=UTC&useSSL=true', " + " 'connector.table' = 'order_state_cnt', " + " 'connector.driver' = 'com.mysql.jdbc.Driver', " + " 'connector.username' = 'root'," + " 'connector.password' = '123456'," + " 'connector.write.flush.max-rows' = '1'," +//--默认每5000条数据写入一次,测试调小一点 " 'connector.write.flush.interval' = '2s'," +//--写入时间间隔 " 'connector.write.max-retries' = '3'" + " )"); tenv.sqlUpdate("insert into product_sale " + "select create_date,product_code,sum(quantity)" + "from (select t1.order_no," + " t1.create_date," + " t2.product_code," + " t2.quantity" + " from (select order_no," + " order_state," + " substring(create_time,1,10) create_date," + " update_time ," + " row_number() over(partition by order_no order by update_time desc) as rn" + " from orders" + " )t1" + " left join order_detail t2" + " on t1.order_no=t2.order_no" + " where t1.rn=1" +//--取最新的订单状态数据 " and t1.order_state<>0" +//--不包含取消订单 " )t3" + " group by create_date,product_code"); Table table = tenv.sqlQuery("select create_date,product_code,sum(quantity)" + "from (select t1.order_no," + " t1.create_date," + " t2.product_code," + " t2.quantity" + " from (select order_no," + " order_state," + " substring(create_time,1,10) create_date," + " update_time ," + " row_number() over(partition by order_no order by update_time desc) as rn" + " from orders" + " )t1" + " left join order_detail t2" + " on t1.order_no=t2.order_no" + " where t1.rn=1" + " and t1.order_state<>0" + " )t3" + " group by create_date,product_code"); tenv.toRetractStream(table, Row.class).print(); tenv.execute("count"); } } mysql 建表语句 CREATE TABLE `order_state_cnt` ( `order_date` varchar(12) , `product_code` varchar(12) , `cnt` int ) ENGINE=InnoDB DEFAULT CHARSET=utf8 使用的是kafka命令行一条条发送数据的方式 主要是deleteStatement.executeBatch();这个方法报错 @Override public void executeBatch() throws SQLException { if (keyToRows.size() > 0) { for (Map.Entry<Row, Tuple2<Boolean, Row>> entry : keyToRows.entrySet()) { Row pk = entry.getKey(); Tuple2<Boolean, Row> tuple = entry.getValue(); if (tuple.f0) { processOneRowInBatch(pk, tuple.f1); } else { setRecordToStatement(deleteStatement, pkTypes, pk); deleteStatement.addBatch(); } } internalExecuteBatch(); deleteStatement.executeBatch(); keyToRows.clear(); } } 在2020年4月23日 00:21,Leonard Xu<[hidden email]> 写道: 赞详细的分析! 没能复现你说的问题,最后一步的分析应该有点小问题,我看下了jdbc mysql的实现 com/mysql/jdbc/PreparedStatement.java#executeBatchInternal() 1289行 是会判断batchedArgs数组的大小后会直接返回的,应该不会执行,你可以进一步调试确认下 ``` if (this.batchedArgs == null || this.batchedArgs.size() == 0) { return new long[0]; } ``` 祝好, Leonard Xu 在 2020年4月22日,21:58,1101300123 <[hidden email]> 写道: 我在SQL关联后把结果写入mysql出现 No value specified for parameter 1错误? 我的版本是1.10.0,代码如下 JDBCUpsertTableSink build = JDBCUpsertTableSink.builder() .setTableSchema(results.getSchema()) .setOptions(JDBCOptions.builder() .setDBUrl("。。。。MultiQueries=true&useUnicode=true&characterEncoding=UTF-8") .setDriverName("com.mysql.jdbc.Driver") .setUsername("jczx_cjch") .setPassword("jczx_cjch2") .setTableName("xkf_join_result") .build()) .setFlushIntervalMills(1000) .setFlushMaxSize(100) .setMaxRetryTimes(3) .build(); DataStream<Tuple2<Boolean, Row>> retract = bsTableEnv.toRetractStream(results, Row.class); retract.print(); build.emitDataStream(retract); 就会出现如下错误 java.sql.SQLException: No value specified for parameter 1 at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:965) at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:898) at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:887) at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:861) at com.mysql.jdbc.PreparedStatement.checkAllParametersSet(PreparedStatement.java:2211) at com.mysql.jdbc.PreparedStatement.fillSendPacket(PreparedStatement.java:2191) at com.mysql.jdbc.PreparedStatement.fillSendPacket(PreparedStatement.java:2121) at com.mysql.jdbc.PreparedStatement.execute(PreparedStatement.java:1162) at org.apache.flink.api.java.io.jdbc.writer.UpsertWriter.executeBatch(UpsertWriter.java:118) at org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.flush(JDBCUpsertOutputFormat.java:159) at org.apache.flink.api.java.io.jdbc.JDBCUpsertSinkFunction.snapshotState(JDBCUpsertSinkFunction.java:56) at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118) at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:402) at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1420) at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1354) 我的输出数据是(true,2020-04-22 21:34:00,2020-04-22 21:34:15,20200422213541465568468)是这样的 我查看源码发现 先调用JDBCUpsertOutputFormat类的writeRecord方法给UpsertWriter类的成员变量map中添加元素 @Override public synchronized void writeRecord(Tuple2<Boolean, Row> tuple2) throws IOException { checkFlushException(); try { jdbcWriter.addRecord(tuple2); batchCount++; if (batchCount >= flushMaxSize) { flush(); } } catch (Exception e) { throw new RuntimeException("Writing records to JDBC failed.", e); } } 之后调用flush()方法,调用UpsertWriter类执行executeBatch方法 public synchronized void flush() throws Exception { checkFlushException(); for (int i = 1; i <= maxRetryTimes; i++) { try { jdbcWriter.executeBatch(); batchCount = 0; break; } catch (SQLException e) { LOG.error("JDBC executeBatch error, retry times = {}", i, e); if (i >= maxRetryTimes) { throw e; } Thread.sleep(1000 * i); } } } 然后会调用UpsertWriter类 实现JDBCWriter类在executeBatch方法中先判断map是否为空,然后循环map;之后判断2元组第一个元素的true调用内部类处理元素,否则删除数据;但是我每次的数据只有一条就是map的大小是1,且2元组的第一个元素值是true,循环结束执行 deleteStatement.executeBatch();方法就会出错,因为删除的语句站位符还没有填充; @Override public void executeBatch() throws SQLException { if (keyToRows.size() > 0) { for (Map.Entry<Row, Tuple2<Boolean, Row>> entry : keyToRows.entrySet()) { Row pk = entry.getKey(); Tuple2<Boolean, Row> tuple = entry.getValue(); if (tuple.f0) { processOneRowInBatch(pk, tuple.f1); } else { setRecordToStatement(deleteStatement, pkTypes, pk); deleteStatement.addBatch(); } } internalExecuteBatch(); deleteStatement.executeBatch(); keyToRows.clear(); } } |
Hi,
看起来应该是你之前改代码时引入的小bug,因为从代码路径和测试来看都不能复现这个问题。 另外,如果修改了源代码记得邮件里说明下,不然好南。。。。 祝好, Leonard Xu > 在 2020年4月23日,16:26,1101300123 <[hidden email]> 写道: > > 我重新在源码里打了一些日志编译后,之前的问题不见了,试了好多次没有复现了,之前因为集成clickhouse 改过源码的delete代码,不知道是不是这个引起的 > 在2020年4月23日 16:23,Leonard Xu<[hidden email]> 写道: > Hi, > 我本地复现了下,用1.10.0发现的你的sql是ok的,结果也符合预期☺️,如下[1]: > 看到你建了JIRA,我们在issue里继续跟进吧 > > 祝好, > Leonard Xu > > [1] > mysql> select * from order_state_cnt; > +------------+--------------+------+ > | order_date | product_code | cnt | > +------------+--------------+------+ > | 2020-04-01 | product1 | 3 | > | 2020-04-01 | product2 | 5 | > | 2020-04-01 | product1 | 5 | > | 2020-04-01 | product2 | 9 | > +------------+--------------+------+ > 4 rows in set (0.00 sec) > > mysql> select * from order_state_cnt; > +------------+--------------+------+ > | order_date | product_code | cnt | > +------------+--------------+------+ > | 2020-04-01 | product1 | 3 | > | 2020-04-01 | product2 | 5 | > | 2020-04-01 | product1 | 5 | > | 2020-04-01 | product2 | 9 | > | 2020-04-01 | product1 | 2 | > | 2020-04-01 | product2 | 4 | > +------------+--------------+------+ > 6 rows in set (0.00 sec) > > > > 在 2020年4月23日,10:48,1101300123 <[hidden email]> 写道: > > > > 我给你一些数据和代码吧!和我真实场景错误一样 > 订单主表:orders > 13点两条记录;order_state是状态 0取消 1待支付 > {"order_no":"order1","order_state":1,"pay_time":"","create_time":"2020-04-01 13:00:00","update_time":"2020-04-01 13:00:00"} > {"order_no":"order2","order_state":1,"pay_time":"","create_time":"2020-04-01 13:00:00","update_time":"2020-04-01 13:00:00"} > > > 13:15 > 来了一条新的记录 取消订单 > {"order_no":"order1","order_state":0,"pay_time":"","create_time":"2020-04-01 13:00:00","update_time":"2020-04-01 13:15:00"} > > > 订单明细表:order_detail > 4条记录 > {"order_no":"order1","product_code":"product1","quantity":3,"create_time":"2020-04-01 13:00:00","update_time":"2020-04-01 13:00:00"} > {"order_no":"order1","product_code":"product2","quantity":5,"create_time":"2020-04-01 13:00:00","update_time":"2020-04-01 13:00:00"} > {"order_no":"order2","product_code":"product1","quantity":2,"create_time":"2020-04-01 13:00:00","update_time":"2020-04-01 13:00:00"} > {"order_no":"order2","product_code":"product2","quantity":4,"create_time":"2020-04-01 13:00:00","update_time":"2020-04-01 13:00:00"} > > > 需求的要求是当订单创建后我们就要统计该订单对应的商品数量,而当订单状态变为取消时我们要减掉该订单对应的商品数量。 > > > 代码 > package Learn.kafkasql; > > import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; > import org.apache.flink.table.api.EnvironmentSettings; > import org.apache.flink.table.api.Table; > import org.apache.flink.table.api.java.StreamTableEnvironment; > import org.apache.flink.types.Row; > > public class SqlCount { > public static void main(String[] args) throws Exception { > StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment(); > env.setParallelism(1); > EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); > StreamTableEnvironment tenv = StreamTableEnvironment.create(env,settings); > > tenv.sqlUpdate("CREATE TABLE orders " + > " (" + > " order_no string," + > " order_state int," + > " pay_time string," + > " create_time string," + > " update_time string" + > " ) " + > " WITH (" + > " 'connector.type' = 'kafka', " + > " 'connector.version' = 'universal', " +//--kafka版本 > " 'connector.topic' = 'tp_orders'," +//--kafkatopic > " 'connector.properties.zookeeper.connect' = '192.168.179.120:2181', " + > " 'connector.properties.bootstrap.servers' = '192.168.179.120:9092'," + > " 'connector.properties.group.id' = 'testGroup'," + > " 'connector.startup-mode' = 'latest-offset'," + > " 'format.type' = 'json' " +//--数据为json格式 > " )"); > tenv.sqlUpdate("CREATE TABLE order_detail " + > " (" + > " order_no string," + > " product_code string," + > " quantity int," + > " create_time string," + > " update_time string" + > " ) " + > " WITH (" + > " 'connector.type' = 'kafka', " + > " 'connector.version' = 'universal', " +//--kafka版本 > " 'connector.topic' = 'tp_order_detail'," +//--kafkatopic > " 'connector.properties.zookeeper.connect' = '192.168.179.120:2181', " + > " 'connector.properties.bootstrap.servers' = '192.168.179.120:9092'," + > " 'connector.properties.group.id' = 'testGroup'," + > " 'connector.startup-mode' = 'latest-offset'," + > " 'format.type' = 'json' " +//--数据为json格式 > " )"); > > tenv.sqlUpdate("CREATE TABLE product_sale" + > " (" + > " order_date string," + > " product_code string," + > " cnt int" + > " ) " + > " WITH (" + > " 'connector.type' = 'jdbc', " + > " 'connector.url' = 'jdbc:mysql://192.168.179.120:3306/flink?serverTimezone=UTC&useSSL=true', " + > " 'connector.table' = 'order_state_cnt', " + > " 'connector.driver' = 'com.mysql.jdbc.Driver', " + > " 'connector.username' = 'root'," + > " 'connector.password' = '123456'," + > " 'connector.write.flush.max-rows' = '1'," +//--默认每5000条数据写入一次,测试调小一点 > " 'connector.write.flush.interval' = '2s'," +//--写入时间间隔 > " 'connector.write.max-retries' = '3'" + > " )"); > tenv.sqlUpdate("insert into product_sale " + > "select create_date,product_code,sum(quantity)" + > "from (select t1.order_no," + > " t1.create_date," + > " t2.product_code," + > " t2.quantity" + > " from (select order_no," + > " order_state," + > " substring(create_time,1,10) create_date," + > " update_time ," + > " row_number() over(partition by order_no order by update_time desc) as rn" + > " from orders" + > " )t1" + > " left join order_detail t2" + > " on t1.order_no=t2.order_no" + > " where t1.rn=1" +//--取最新的订单状态数据 > " and t1.order_state<>0" +//--不包含取消订单 > " )t3" + > " group by create_date,product_code"); > > Table table = tenv.sqlQuery("select create_date,product_code,sum(quantity)" + > "from (select t1.order_no," + > " t1.create_date," + > " t2.product_code," + > " t2.quantity" + > " from (select order_no," + > " order_state," + > " substring(create_time,1,10) create_date," + > " update_time ," + > " row_number() over(partition by order_no order by update_time desc) as rn" + > " from orders" + > " )t1" + > " left join order_detail t2" + > " on t1.order_no=t2.order_no" + > " where t1.rn=1" + > " and t1.order_state<>0" + > " )t3" + > " group by create_date,product_code"); > tenv.toRetractStream(table, Row.class).print(); > tenv.execute("count"); > } > } > mysql 建表语句 > CREATE TABLE `order_state_cnt` ( > `order_date` varchar(12) , > `product_code` varchar(12) , > `cnt` int > ) ENGINE=InnoDB DEFAULT CHARSET=utf8 > > > 使用的是kafka命令行一条条发送数据的方式 > > > 主要是deleteStatement.executeBatch();这个方法报错 > @Override > public void executeBatch() throws SQLException { > if (keyToRows.size() > 0) { > for (Map.Entry<Row, Tuple2<Boolean, Row>> entry : keyToRows.entrySet()) { > Row pk = entry.getKey(); > Tuple2<Boolean, Row> tuple = entry.getValue(); > if (tuple.f0) { > processOneRowInBatch(pk, tuple.f1); > } else { > setRecordToStatement(deleteStatement, pkTypes, pk); > deleteStatement.addBatch(); > } > } > internalExecuteBatch(); > deleteStatement.executeBatch(); > keyToRows.clear(); > } > } > 在2020年4月23日 00:21,Leonard Xu<[hidden email]> 写道: > 赞详细的分析! > > 没能复现你说的问题,最后一步的分析应该有点小问题,我看下了jdbc mysql的实现 > com/mysql/jdbc/PreparedStatement.java#executeBatchInternal() 1289行 > 是会判断batchedArgs数组的大小后会直接返回的,应该不会执行,你可以进一步调试确认下 > ``` > if (this.batchedArgs == null || this.batchedArgs.size() == 0) { > return new long[0]; > } > ``` > > 祝好, > Leonard Xu > > 在 2020年4月22日,21:58,1101300123 <[hidden email]> 写道: > > > > 我在SQL关联后把结果写入mysql出现 No value specified for parameter 1错误? > 我的版本是1.10.0,代码如下 > JDBCUpsertTableSink build = JDBCUpsertTableSink.builder() > .setTableSchema(results.getSchema()) > .setOptions(JDBCOptions.builder() > .setDBUrl("。。。。MultiQueries=true&useUnicode=true&characterEncoding=UTF-8") > .setDriverName("com.mysql.jdbc.Driver") > .setUsername("jczx_cjch") > .setPassword("jczx_cjch2") > .setTableName("xkf_join_result") > .build()) > .setFlushIntervalMills(1000) > .setFlushMaxSize(100) > .setMaxRetryTimes(3) > .build(); > > > DataStream<Tuple2<Boolean, Row>> retract = bsTableEnv.toRetractStream(results, Row.class); > retract.print(); > build.emitDataStream(retract); > > > > > 就会出现如下错误 > java.sql.SQLException: No value specified for parameter 1 > at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:965) > at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:898) > at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:887) > at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:861) > at com.mysql.jdbc.PreparedStatement.checkAllParametersSet(PreparedStatement.java:2211) > at com.mysql.jdbc.PreparedStatement.fillSendPacket(PreparedStatement.java:2191) > at com.mysql.jdbc.PreparedStatement.fillSendPacket(PreparedStatement.java:2121) > at com.mysql.jdbc.PreparedStatement.execute(PreparedStatement.java:1162) > at org.apache.flink.api.java.io.jdbc.writer.UpsertWriter.executeBatch(UpsertWriter.java:118) > at org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.flush(JDBCUpsertOutputFormat.java:159) > at org.apache.flink.api.java.io.jdbc.JDBCUpsertSinkFunction.snapshotState(JDBCUpsertSinkFunction.java:56) > at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118) > at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99) > at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90) > at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:402) > at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1420) > at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1354) > > > > > 我的输出数据是(true,2020-04-22 21:34:00,2020-04-22 21:34:15,20200422213541465568468)是这样的 > 我查看源码发现 > 先调用JDBCUpsertOutputFormat类的writeRecord方法给UpsertWriter类的成员变量map中添加元素 > @Override > public synchronized void writeRecord(Tuple2<Boolean, Row> tuple2) throws IOException { > checkFlushException(); > try { > jdbcWriter.addRecord(tuple2); > batchCount++; > if (batchCount >= flushMaxSize) { > flush(); > } > } catch (Exception e) { > throw new RuntimeException("Writing records to JDBC failed.", e); > } > } > 之后调用flush()方法,调用UpsertWriter类执行executeBatch方法 > public synchronized void flush() throws Exception { > checkFlushException(); > for (int i = 1; i <= maxRetryTimes; i++) { > try { > jdbcWriter.executeBatch(); > batchCount = 0; > break; > } catch (SQLException e) { > LOG.error("JDBC executeBatch error, retry times = {}", i, e); > if (i >= maxRetryTimes) { > throw e; > } > Thread.sleep(1000 * i); > } > } > } > > > 然后会调用UpsertWriter类 实现JDBCWriter类在executeBatch方法中先判断map是否为空,然后循环map;之后判断2元组第一个元素的true调用内部类处理元素,否则删除数据;但是我每次的数据只有一条就是map的大小是1,且2元组的第一个元素值是true,循环结束执行 deleteStatement.executeBatch();方法就会出错,因为删除的语句站位符还没有填充; > > > @Override > public void executeBatch() throws SQLException { > if (keyToRows.size() > 0) { > for (Map.Entry<Row, Tuple2<Boolean, Row>> entry : keyToRows.entrySet()) { > Row pk = entry.getKey(); > Tuple2<Boolean, Row> tuple = entry.getValue(); > if (tuple.f0) { > processOneRowInBatch(pk, tuple.f1); > } else { > setRecordToStatement(deleteStatement, pkTypes, pk); > deleteStatement.addBatch(); > } > } > internalExecuteBatch(); > deleteStatement.executeBatch(); > keyToRows.clear(); > } > } |
Free forum by Nabble | Edit this page |