项目简述:从kafka取数据,每10秒一批,sink到mysql中的ETL 环境相关信息 flink运行模式:local mysql的global variables中wait_timeout=28800 mysql客户端mysql-connector-java版本5.1.42 报错 org.apache.flink.streaming.runtime.tasks.TimerException: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:284) ~[flink-streaming-java_2.11-1.9.0.jar:1.9.0] at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[na:1.8.0_191] at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[na:1.8.0_191] at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) ~[na:1.8.0_191] at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) ~[na:1.8.0_191] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[na:1.8.0_191] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[na:1.8.0_191] at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_191] Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:654) ~[flink-streaming-java_2.11-1.9.0.jar:1.9.0] at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612) ~[flink-streaming-java_2.11-1.9.0.jar:1.9.0] at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592) ~[flink-streaming-java_2.11-1.9.0.jar:1.9.0] at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727) ~[flink-streaming-java_2.11-1.9.0.jar:1.9.0] at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705) ~[flink-streaming-java_2.11-1.9.0.jar:1.9.0] at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) ~[flink-streaming-java_2.11-1.9.0.jar:1.9.0] at com.feiyu.help.AWF.apply(AWF.java:23) ~[classes/:na] at com.feiyu.help.AWF.apply(AWF.java:14) ~[classes/:na] at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableAllWindowFunction.process(InternalIterableAllWindowFunction.java:44) ~[flink-streaming-java_2.11-1.9.0.jar:1.9.0] at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableAllWindowFunction.process(InternalIterableAllWindowFunction.java:32) ~[flink-streaming-java_2.11-1.9.0.jar:1.9.0] at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:549) ~[flink-streaming-java_2.11-1.9.0.jar:1.9.0] at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:503) ~[flink-streaming-java_2.11-1.9.0.jar:1.9.0] at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:260) ~[flink-streaming-java_2.11-1.9.0.jar:1.9.0] at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:281) ~[flink-streaming-java_2.11-1.9.0.jar:1.9.0] ... 7 common frames omitted Caused by: java.sql.SQLException: Could not retrieve transaction read-only status from server at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:964) ~[mysql-connector-java-5.1.42.jar:5.1.42] at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:897) ~[mysql-connector-java-5.1.42.jar:5.1.42] at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:886) ~[mysql-connector-java-5.1.42.jar:5.1.42] at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:860) ~[mysql-connector-java-5.1.42.jar:5.1.42] at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:877) ~[mysql-connector-java-5.1.42.jar:5.1.42] at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:873) ~[mysql-connector-java-5.1.42.jar:5.1.42] at com.mysql.jdbc.ConnectionImpl.isReadOnly(ConnectionImpl.java:3536) ~[mysql-connector-java-5.1.42.jar:5.1.42] at com.mysql.jdbc.ConnectionImpl.isReadOnly(ConnectionImpl.java:3505) ~[mysql-connector-java-5.1.42.jar:5.1.42] at com.mysql.jdbc.PreparedStatement.executeBatchInternal(PreparedStatement.java:1230) ~[mysql-connector-java-5.1.42.jar:5.1.42] at com.mysql.jdbc.StatementImpl.executeBatch(StatementImpl.java:970) ~[mysql-connector-java-5.1.42.jar:5.1.42] at com.feiyu.help.MySQLTwoPhaseCommitSink.invoke(MySQLTwoPhaseCommitSink.java:78) ~[classes/:na] at com.feiyu.help.MySQLTwoPhaseCommitSink.invoke(MySQLTwoPhaseCommitSink.java:23) ~[classes/:na] at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:228) ~[flink-streaming-java_2.11-1.9.0.jar:1.9.0] at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56) ~[flink-streaming-java_2.11-1.9.0.jar:1.9.0] at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637) ~[flink-streaming-java_2.11-1.9.0.jar:1.9.0] ... 20 common frames omitted Caused by: com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure The last packet successfully received from the server was 3,747 milliseconds ago. The last packet sent successfully to the server was 3,748 milliseconds ago. at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[na:1.8.0_191] at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) ~[na:1.8.0_191] at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) ~[na:1.8.0_191] at java.lang.reflect.Constructor.newInstance(Constructor.java:423) ~[na:1.8.0_191] at com.mysql.jdbc.Util.handleNewInstance(Util.java:425) ~[mysql-connector-java-5.1.42.jar:5.1.42] at com.mysql.jdbc.SQLError.createCommunicationsException(SQLError.java:989) ~[mysql-connector-java-5.1.42.jar:5.1.42] at com.mysql.jdbc.MysqlIO.clearInputStream(MysqlIO.java:905) ~[mysql-connector-java-5.1.42.jar:5.1.42] at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2474) ~[mysql-connector-java-5.1.42.jar:5.1.42] at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2680) ~[mysql-connector-java-5.1.42.jar:5.1.42] at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2486) ~[mysql-connector-java-5.1.42.jar:5.1.42] at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2444) ~[mysql-connector-java-5.1.42.jar:5.1.42] at com.mysql.jdbc.StatementImpl.executeQuery(StatementImpl.java:1381) ~[mysql-connector-java-5.1.42.jar:5.1.42] at com.mysql.jdbc.ConnectionImpl.isReadOnly(ConnectionImpl.java:3530) ~[mysql-connector-java-5.1.42.jar:5.1.42] ... 28 common frames omitted Caused by: java.net.SocketException: Bad file descriptor (ioctl FIONREAD failed) at java.net.PlainSocketImpl.socketAvailable(Native Method) ~[na:1.8.0_191] at java.net.AbstractPlainSocketImpl.available(AbstractPlainSocketImpl.java:490) ~[na:1.8.0_191] at java.net.SocketInputStream.available(SocketInputStream.java:259) ~[na:1.8.0_191] at com.mysql.jdbc.util.ReadAheadInputStream.available(ReadAheadInputStream.java:219) ~[mysql-connector-java-5.1.42.jar:5.1.42] at com.mysql.jdbc.MysqlIO.clearInputStream(MysqlIO.java:901) ~[mysql-connector-java-5.1.42.jar:5.1.42] ... 34 common frames omitted 11:59:30.051 [flink-akka.actor.default-dispatcher-8] INFO o.a.f.r.e.ExecutionGraph - Job flink kafka to Mysql (822054e1e70a7ac7616ffef1e667a20e) switched from state RUNNING to FAILING. org.apache.flink.streaming.runtime.tasks.TimerException: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:284) ~[flink-streaming-java_2.11-1.9.0.jar:1.9.0] at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[na:1.8.0_191] at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[na:1.8.0_191] at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) ~[na:1.8.0_191] at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) ~[na:1.8.0_191] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[na:1.8.0_191] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[na:1.8.0_191] at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_191] Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:654) ~[flink-streaming-java_2.11-1.9.0.jar:1.9.0] at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612) ~[flink-streaming-java_2.11-1.9.0.jar:1.9.0] at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592) ~[flink-streaming-java_2.11-1.9.0.jar:1.9.0] at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727) ~[flink-streaming-java_2.11-1.9.0.jar:1.9.0] at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705) ~[flink-streaming-java_2.11-1.9.0.jar:1.9.0] at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) ~[flink-streaming-java_2.11-1.9.0.jar:1.9.0] at com.feiyu.help.AWF.apply(AWF.java:23) ~[classes/:na] at com.feiyu.help.AWF.apply(AWF.java:14) ~[classes/:na] at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableAllWindowFunction.process(InternalIterableAllWindowFunction.java:44) ~[flink-streaming-java_2.11-1.9.0.jar:1.9.0] at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableAllWindowFunction.process(InternalIterableAllWindowFunction.java:32) ~[flink-streaming-java_2.11-1.9.0.jar:1.9.0] at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:549) ~[flink-streaming-java_2.11-1.9.0.jar:1.9.0] at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:503) ~[flink-streaming-java_2.11-1.9.0.jar:1.9.0] at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:260) ~[flink-streaming-java_2.11-1.9.0.jar:1.9.0] at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:281) ~[flink-streaming-java_2.11-1.9.0.jar:1.9.0] ... 7 common frames omitted Caused by: java.sql.SQLException: Could not retrieve transaction read-only status from server at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:964) ~[mysql-connector-java-5.1.42.jar:5.1.42] at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:897) ~[mysql-connector-java-5.1.42.jar:5.1.42] at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:886) ~[mysql-connector-java-5.1.42.jar:5.1.42] at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:860) ~[mysql-connector-java-5.1.42.jar:5.1.42] at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:877) ~[mysql-connector-java-5.1.42.jar:5.1.42] at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:873) ~[mysql-connector-java-5.1.42.jar:5.1.42] at com.mysql.jdbc.ConnectionImpl.isReadOnly(ConnectionImpl.java:3536) ~[mysql-connector-java-5.1.42.jar:5.1.42] at com.mysql.jdbc.ConnectionImpl.isReadOnly(ConnectionImpl.java:3505) ~[mysql-connector-java-5.1.42.jar:5.1.42] at com.mysql.jdbc.PreparedStatement.executeBatchInternal(PreparedStatement.java:1230) ~[mysql-connector-java-5.1.42.jar:5.1.42] at com.mysql.jdbc.StatementImpl.executeBatch(StatementImpl.java:970) ~[mysql-connector-java-5.1.42.jar:5.1.42] at com.feiyu.help.MySQLTwoPhaseCommitSink.invoke(MySQLTwoPhaseCommitSink.java:78) ~[classes/:na] at com.feiyu.help.MySQLTwoPhaseCommitSink.invoke(MySQLTwoPhaseCommitSink.java:23) ~[classes/:na] at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:228) ~[flink-streaming-java_2.11-1.9.0.jar:1.9.0] at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56) ~[flink-streaming-java_2.11-1.9.0.jar:1.9.0] at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637) ~[flink-streaming-java_2.11-1.9.0.jar:1.9.0] ... 20 common frames omitted Caused by: com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure The last packet successfully received from the server was 3,747 milliseconds ago. The last packet sent successfully to the server was 3,748 milliseconds ago. at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[na:1.8.0_191] at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) ~[na:1.8.0_191] at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) ~[na:1.8.0_191] at java.lang.reflect.Constructor.newInstance(Constructor.java:423) ~[na:1.8.0_191] at com.mysql.jdbc.Util.handleNewInstance(Util.java:425) ~[mysql-connector-java-5.1.42.jar:5.1.42] at com.mysql.jdbc.SQLError.createCommunicationsException(SQLError.java:989) ~[mysql-connector-java-5.1.42.jar:5.1.42] at com.mysql.jdbc.MysqlIO.clearInputStream(MysqlIO.java:905) ~[mysql-connector-java-5.1.42.jar:5.1.42] at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2474) ~[mysql-connector-java-5.1.42.jar:5.1.42] at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2680) ~[mysql-connector-java-5.1.42.jar:5.1.42] at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2486) ~[mysql-connector-java-5.1.42.jar:5.1.42] at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2444) ~[mysql-connector-java-5.1.42.jar:5.1.42] at com.mysql.jdbc.StatementImpl.executeQuery(StatementImpl.java:1381) ~[mysql-connector-java-5.1.42.jar:5.1.42] at com.mysql.jdbc.ConnectionImpl.isReadOnly(ConnectionImpl.java:3530) ~[mysql-connector-java-5.1.42.jar:5.1.42] ... 28 common frames omitted Caused by: java.net.SocketException: Bad file descriptor (ioctl FIONREAD failed) at java.net.PlainSocketImpl.socketAvailable(Native Method) ~[na:1.8.0_191] at java.net.AbstractPlainSocketImpl.available(AbstractPlainSocketImpl.java:490) ~[na:1.8.0_191] at java.net.SocketInputStream.available(SocketInputStream.java:259) ~[na:1.8.0_191] at com.mysql.jdbc.util.ReadAheadInputStream.available(ReadAheadInputStream.java:219) ~[mysql-connector-java-5.1.42.jar:5.1.42] at com.mysql.jdbc.MysqlIO.clearInputStream(MysqlIO.java:901) ~[mysql-connector-java-5.1.42.jar:5.1.42] ... 34 common frames omitted 代码 主程序: package com.feiyu.help; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011; import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema; import java.sql.Types; import java.util.Properties; public class KafkaSinkMysql { public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //检查点配置 env.enableCheckpointing(10000); env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); env.getCheckpointConfig().setCheckpointTimeout(100000); env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); //kafka配置 Properties props = new Properties(); props.put("bootstrap.servers", "10.250.0.101:9092,10.250.0.102:9092,10.250.0.103:9092"); props.put("group.id", "test"); props.put("auto.offset.reset", "latest");//earliest env.addSource(new FlinkKafkaConsumer011<>( "zzz", //这个 kafka topic 需和生产消息的 topic 一致 new JSONKeyValueDeserializationSchema(true), props)).setParallelism(1) .timeWindowAll(Time.seconds(5)) .apply(new AWF()).setParallelism(1) .addSink(new MySQLTwoPhaseCommitSink( "jdbc:mysql://10.250.0.38:3306/xy_data_20027?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&useSSL=false&autoReconnect=true&rewriteBatchedStatements=true" ,//failOverReadOnly=false "com.mysql.jdbc.Driver", "public", "public", "insert into employee_lwn (id, name, password, age, salary, department) values (?, ?, ?, ?, ?, ?)", (new int[]{Types.INTEGER,Types.VARCHAR,Types.VARCHAR,Types.INTEGER,Types.INTEGER,Types.VARCHAR}))) .setParallelism(1); env.execute("flink kafka to Mysql"); } } 窗口处理: package com.feiyu.help; import org.apache.flink.shaded.curator.org.apache.curator.shaded.com.google.common.collect.Lists; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode; import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.List; public class AWF implements AllWindowFunction<ObjectNode, List<ObjectNode>, TimeWindow> { private static final Logger log = LoggerFactory.getLogger(AWF.class); @Override public void apply(TimeWindow window, Iterable<ObjectNode> values, Collector<List<ObjectNode>> out) throws Exception { ArrayList<ObjectNode> model = Lists.newArrayList(values); if (model.size() > 0) { log.info("10 秒内收集到 employee 的数据条数是:" + model.size()); out.collect(model); log.info("collect 执行完毕"); } } } sink: package com.feiyu.help; import com.alibaba.fastjson.JSONObject; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeutils.base.VoidSerializer; import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer; import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode; import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.Serializable; import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.SQLException; import java.util.List; /** * 实现两阶段提交MySQL */ public class MySQLTwoPhaseCommitSink extends TwoPhaseCommitSinkFunction<List<ObjectNode>, Connection, Void> implements Serializable { private static final long serialVersionUID = 1L; private static final Logger log = LoggerFactory.getLogger(MySQLTwoPhaseCommitSink.class); private String drivername; private String url; private String username; private String password; private String sql; private int[] types; public MySQLTwoPhaseCommitSink(String url, String drivername, String username, String password, String sql, int[] types) { super(new KryoSerializer<>(Connection.class,new ExecutionConfig()), VoidSerializer.INSTANCE); this.drivername = drivername; this.url = url; this.username = username; this.password = password; this.sql = sql; this.types = types; } /** * 执行数据库入库操作 task初始化的时候调用 * * @param connection * @param data * @param context * @throws Exception */ @Override protected void invoke(Connection connection, List<ObjectNode> data, Context context) throws Exception { log.info("start invoke...{},线程id: {}",connection, Thread.currentThread().getId()); log.info("使用连接:{} 创建游标...,线程id: {}",connection, Thread.currentThread().getId()); PreparedStatement prepareStatement = connection.prepareStatement(this.sql); log.info("创建 ps:{} 成功...,线程id: {}",prepareStatement.toString(), Thread.currentThread().getId()); data.forEach(objectNode -> { try { String value = objectNode.get("value").toString(); JSONObject valueJson = JSONObject.parseObject(value); prepareStatement.setObject(1, valueJson.get("id")); prepareStatement.setObject(2, valueJson.get("name")); prepareStatement.setObject(3, valueJson.get("password")); prepareStatement.setObject(4, valueJson.get("age")); prepareStatement.setObject(5, valueJson.get("salary")); prepareStatement.setObject(6, valueJson.get("department")); prepareStatement.addBatch(); } catch (SQLException e) { e.printStackTrace(); } }); log.info("start executeBatch, 使用ps:{}...,线程id: {}",prepareStatement.toString(), Thread.currentThread().getId()); prepareStatement.executeBatch(); log.info("准备关闭ps:{} ...,线程id: {}",prepareStatement.toString(), Thread.currentThread().getId()); prepareStatement.close(); } @Override public void snapshotState(FunctionSnapshotContext context) throws Exception { log.info("start snapshotState...,线程id: {}", Thread.currentThread().getId()); super.snapshotState(context); } /** * 获取连接,开启手动提交事物(getConnection方法中) * * @return * @throws Exception */ @Override protected Connection beginTransaction() throws Exception { log.info("start beginTransaction.......,线程id: {}", Thread.currentThread().getId()); Connection connection = null; try { log.info("create connection......."); connection = this.establishConnection(); log.info("建立连接:{} 成功...,线程id: {}",connection, Thread.currentThread().getId()); } catch (SQLException var4) { throw new IllegalArgumentException("open() failed.", var4); } catch (ClassNotFoundException var5) { throw new IllegalArgumentException("JDBC driver class not found.", var5); } // 设置手动提交 connection.setAutoCommit(false); return connection; } private Connection establishConnection() throws SQLException, ClassNotFoundException { Class.forName(this.drivername); if (this.username == null) { return DriverManager.getConnection(this.url); } else { return DriverManager.getConnection(this.url, this.username, this.password); } } /** * 预提交,这里预提交的逻辑在invoke方法中 * * @param connection * @throws Exception */ @Override protected void preCommit(Connection connection) throws Exception { log.info("start preCommit...{} ...,线程id: {}",connection, Thread.currentThread().getId()); } /** * 如果invoke方法执行正常,则提交事务 * * @param connection */ @Override protected void commit(Connection connection) { log.info("start commit..{},线程id: {}",connection, Thread.currentThread().getId()); if (connection != null) { try { log.info("准备提交事务,使用连接:{} ...,线程id: {}",connection, Thread.currentThread().getId()); connection.commit(); close(connection); } catch (SQLException e) { log.error("提交事务失败,Connection: {},线程id: {}",connection, Thread.currentThread().getId()); e.printStackTrace(); } finally { } } } @Override protected void recoverAndCommit(Connection connection) { log.info("start recoverAndCommit {}.......,线程id: {}",connection, Thread.currentThread().getId()); } @Override protected void recoverAndAbort(Connection connection) { log.info("start abort recoverAndAbort {}.......,线程id: {}",connection, Thread.currentThread().getId()); } /** * 如果invoke执行异常则回滚事物,下一次的checkpoint操作也不会执行 * * @param connection */ @Override protected void abort(Connection connection) { log.info("start abort rollback... {} ,线程id: {}",connection, Thread.currentThread().getId()); if (connection != null) { try { log.error("事务发生回滚,Connection: {} ,线程id: {}",connection, Thread.currentThread().getId()); connection.rollback(); // close(connection); } catch (SQLException e) { log.error("事物回滚失败,Connection: {} ,线程id: {}",connection, Thread.currentThread().getId()); } finally { // close(connection); } } } /** * 关闭连接 * * @param connection */ public void close(Connection connection) { if (connection != null) { try { log.info("准备关闭连接:{} ...,线程id: {}",connection, Thread.currentThread().getId()); connection.close(); } catch (SQLException var12) { log.info("JDBC connection could not be closed: " + var12.getMessage()); } finally { // connection = null; } } } } POJO: package com.feiyu.help; import java.io.Serializable; public class Employee implements Serializable {// implements Model private static final long serialVersionUID = 2L; public int id; public String name; public String password; public int age; public Integer salary; public String department; public Employee(int id, String name, String password, int age, Integer salary, String department) { this.id = id; this.name = name; this.password = password; this.age = age; this.salary = salary; this.department = department; } public Employee() { } public Integer getSalary() { return salary; } public void setSalary(Integer salary) { this.salary = salary; } public String getDepartment() { return department; } public void setDepartment(String department) { this.department = department; } public int getId() { return id; } public void setId(int id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public String getPassword() { return password; } public void setPassword(String password) { this.password = password; } public int getAge() { return age; } public void setAge(int age) { this.age = age; } } kafka生产者:(用于生产测试数据) package com.feiyu.help; import com.alibaba.fastjson.JSON; import com.feiyu.gflow.test2.test.Employee; import com.google.gson.Gson; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties; import java.util.Random; public class KafkaProducerTestCoutinue { public static void main(String[] args) { Producer(); } public static void Producer() { String broker = "10.250.0.101:9092,10.250.0.102:9092,10.250.0.103:9092"; String topic = "zzz"; Properties props = new Properties(); props.put("bootstrap.servers", broker); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); KafkaProducer producer = new KafkaProducer<String, String>(props); String[] depLists = new String[5]; depLists[0] = "行政部"; depLists[1] = "账务部"; depLists[2] = "市场部"; depLists[3] = "技术部"; depLists[4] = "销售部"; Random rand = new Random(300); Gson gson = new Gson(); for (int i = 1; i <= 10000; i++) { String temp = JSON.toJSONString( new Employee(i, "user" + i, "password" + i, rand.nextInt(40) + 20, (rand.nextInt(300) + 1) * 100, depLists[rand.nextInt(5)]) ); ProducerRecord record = new ProducerRecord<String, String>(topic, null, null, temp); producer.send(record); System.out.println("发送数据: " + temp); try { Thread.sleep(10); //发送一条数据 sleep } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("发送数据完成"); producer.flush(); } } mysql数据库表信息: CREATE TABLE `employee_lwn` ( `id` bigint(20) DEFAULT NULL, `name` varchar(50) DEFAULT NULL, `password` varchar(50) DEFAULT NULL, `age` int(11) DEFAULT NULL, `salary` int(11) DEFAULT NULL, `department` varchar(20) DEFAULT NULL ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; |
Free forum by Nabble | Edit this page |