根据flink端对端的定义,采用二阶段提交,flink中已经封装好了,放到TwoPhaseCommitSinkFunction中,只要实现对应的方法就行,
– beginTransaction – preCommit – commit – abort 如果sink端的操作是幂等的,就不需要实现了,假设现在我要实现写入MYSQL,实现端对端精确一次性,这种情况下是不是 preCommit不需要写, 然后最终commit再提交事务?这样事务的打开时间就跟checkpoint的时间关联了,假设checkpoint时间很长,是不是意味着事务打开时间很长,有风险; 而且如果checkpoint要持续的进行着(原来假设每分钟一次checkpoint,现在要每秒一次?),不能设置间隔时间? 或者是间隔时间都算到下次checkpoint中,这样事务的打开时间更长了,请问是不是这样理解? 这样端对端精确一次性是不是效果很小? |
Hi
可以看一下TwoPhaseCommitSinkFunction的实现,preCommit是在snapshotState时调用,会将当前的currentTransactionHolder存储到pendingCommitTransactions,直到notifyCheckpointComplete时(也就是commit时),将pendingCommitTransactions取出进行事务性操作。所以preCommit时候不需要写是不符合语义的。 如果借助TwoPhaseCommitSinkFunction,确实需要适当减少checkpoint interval,否则可能很久都没有输出,但是不建议在state较大时设置成秒级。 祝好 唐云 ________________________________ From: 1900 <[hidden email]> Sent: Tuesday, August 27, 2019 14:15 To: user-zh <[hidden email]> Subject: 关于flink中端对端的精确一次性理解问题 根据flink端对端的定义,采用二阶段提交,flink中已经封装好了,放到TwoPhaseCommitSinkFunction中,只要实现对应的方法就行, �C beginTransaction �C preCommit �C commit �C abort 如果sink端的操作是幂等的,就不需要实现了,假设现在我要实现写入MYSQL,实现端对端精确一次性,这种情况下是不是 preCommit不需要写, 然后最终commit再提交事务?这样事务的打开时间就跟checkpoint的时间关联了,假设checkpoint时间很长,是不是意味着事务打开时间很长,有风险; 而且如果checkpoint要持续的进行着(原来假设每分钟一次checkpoint,现在要每秒一次?),不能设置间隔时间? 或者是间隔时间都算到下次checkpoint中,这样事务的打开时间更长了,请问是不是这样理解? 这样端对端精确一次性是不是效果很小? |
hi,
看了解释,还是觉得比较抽象,能不能具体点,下面贴下之前一位同学写的实现,总感觉不对,但好像也有点意思,能不能结合代码解释下 public class Sink extends TwoPhaseCommitSinkFunction <ObjectNode, Connection, Void> { // private Connection connection; public Sink() { super(new KryoSerializer <>(Connection.class , new ExecutionConfig()) , VoidSerializer.INSTANCE); } @Override protected void invoke(Connection connection , ObjectNode objectNode , Context context) throws Exception { String stu = objectNode.get("value").toString(); Student student = JSON.parseObject(stu , Student.class); System.err.println("start invoke......." + "id = " + student.getId() + " name = " + student.getName() + " password" + " = " + student.getPassword() + " age = " + student.getAge()); String sql = "insert into Student(id,name,password,age) values (?,?,?,?)"; PreparedStatement ps = connection.prepareStatement(sql); ps.setInt(1 , student.getId()); ps.setString(2 , student.getName()); ps.setString(3 , student.getPassword()); ps.setInt(4 , student.getAge()); ps.executeUpdate(); //手动制造异常 if (student.getId() == 33) { System.out.println(1 / 0); } } @Override protected Connection beginTransaction() throws Exception { String url = "jdbc:mysql:"; return DBConnectUtil.getConnection(url , "" , ""); } @Override protected void preCommit(Connection connection) throws Exception { } @Override protected void commit(Connection connection) { if (connection != null) { try { connection.commit(); } catch (SQLException e) { System.err.println("commit error ............" + e.getMessage()); } finally { try { connection.close(); } catch (SQLException e) { System.err.println(" finally commit error ............" + e.getMessage()); } } } } @Override protected void abort(Connection connection) { if (connection != null) { try { connection.rollback(); } catch (SQLException e) { System.err.println("abort error ............" + e.getMessage()); } finally { try { connection.close(); } catch (SQLException e) { System.err.println(" finally abort error ............" + e.getMessage()); } } } } } ------------------ 原始邮件 ------------------ 发件人: "Yun Tang"<[hidden email]>; 发送时间: 2019年8月27日(星期二) 下午3:00 收件人: "user-zh"<[hidden email]>; 主题: Re: 关于flink中端对端的精确一次性理解问题 Hi 可以看一下TwoPhaseCommitSinkFunction的实现,preCommit是在snapshotState时调用,会将当前的currentTransactionHolder存储到pendingCommitTransactions,直到notifyCheckpointComplete时(也就是commit时),将pendingCommitTransactions取出进行事务性操作。所以preCommit时候不需要写是不符合语义的。 如果借助TwoPhaseCommitSinkFunction,确实需要适当减少checkpoint interval,否则可能很久都没有输出,但是不建议在state较大时设置成秒级。 祝好 唐云 ________________________________ From: 1900 <[hidden email]> Sent: Tuesday, August 27, 2019 14:15 To: user-zh <[hidden email]> Subject: 关于flink中端对端的精确一次性理解问题 根据flink端对端的定义,采用二阶段提交,flink中已经封装好了,放到TwoPhaseCommitSinkFunction中,只要实现对应的方法就行, – beginTransaction – preCommit – commit – abort 如果sink端的操作是幂等的,就不需要实现了,假设现在我要实现写入MYSQL,实现端对端精确一次性,这种情况下是不是 preCommit不需要写, 然后最终commit再提交事务?这样事务的打开时间就跟checkpoint的时间关联了,假设checkpoint时间很长,是不是意味着事务打开时间很长,有风险; 而且如果checkpoint要持续的进行着(原来假设每分钟一次checkpoint,现在要每秒一次?),不能设置间隔时间? 或者是间隔时间都算到下次checkpoint中,这样事务的打开时间更长了,请问是不是这样理解? 这样端对端精确一次性是不是效果很小? |
对于MySQL sink来说,使用2PC我理解应该是不能用MySQL
transaction的。因为如果你在preCommit中(或之前)开启了transaction,任务失败的话数据会直接丢失,没法实现2PC里preCommit成功后必须保证commit成功的语义。一种办法是preCommit时写入mysql临时表,在commit时将临时表数据移动入正式表。 On Wed, Aug 28, 2019 at 2:47 PM 1900 <[hidden email]> wrote: > hi, > > > 看了解释,还是觉得比较抽象,能不能具体点,下面贴下之前一位同学写的实现,总感觉不对,但好像也有点意思,能不能结合代码解释下 > > > public class Sink extends TwoPhaseCommitSinkFunction <ObjectNode, > Connection, Void> { > > > // private Connection connection; > > > public Sink() { > super(new KryoSerializer <>(Connection.class , new > ExecutionConfig()) , VoidSerializer.INSTANCE); > } > > > @Override > protected void invoke(Connection connection , ObjectNode objectNode , > Context context) throws Exception { > String stu = objectNode.get("value").toString(); > Student student = JSON.parseObject(stu , Student.class); > > > System.err.println("start invoke......." + "id = " + > student.getId() + " name = " + student.getName() + " password" > + " = " + student.getPassword() + " age = " + > student.getAge()); > > > String sql = "insert into Student(id,name,password,age) > values (?,?,?,?)"; > PreparedStatement ps = connection.prepareStatement(sql); > ps.setInt(1 , student.getId()); > ps.setString(2 , student.getName()); > ps.setString(3 , student.getPassword()); > ps.setInt(4 , student.getAge()); > ps.executeUpdate(); > //手动制造异常 > if (student.getId() == 33) { > System.out.println(1 / 0); > } > } > > > @Override > protected Connection beginTransaction() throws Exception { > String url = "jdbc:mysql:"; > return DBConnectUtil.getConnection(url , "" , ""); > } > > > @Override > protected void preCommit(Connection connection) throws Exception { > } > > > @Override > protected void commit(Connection connection) { > if (connection != null) { > try { > connection.commit(); > } catch (SQLException e) { > System.err.println("commit error ............" + > e.getMessage()); > } finally { > try { > connection.close(); > } catch (SQLException e) { > System.err.println(" finally commit error > ............" + e.getMessage()); > } > } > } > } > > > @Override > protected void abort(Connection connection) { > if (connection != null) { > try { > connection.rollback(); > } catch (SQLException e) { > System.err.println("abort error ............" + > e.getMessage()); > } finally { > try { > connection.close(); > } catch (SQLException e) { > System.err.println(" finally abort error > ............" + e.getMessage()); > } > } > } > } > } > > > > > > > > ------------------ 原始邮件 ------------------ > 发件人: "Yun Tang"<[hidden email]>; > 发送时间: 2019年8月27日(星期二) 下午3:00 > 收件人: "user-zh"<[hidden email]>; > > 主题: Re: 关于flink中端对端的精确一次性理解问题 > > > > Hi > > > 可以看一下TwoPhaseCommitSinkFunction的实现,preCommit是在snapshotState时调用,会将当前的currentTransactionHolder存储到pendingCommitTransactions,直到notifyCheckpointComplete时(也就是commit时),将pendingCommitTransactions取出进行事务性操作。所以preCommit时候不需要写是不符合语义的。 > > 如果借助TwoPhaseCommitSinkFunction,确实需要适当减少checkpoint > interval,否则可能很久都没有输出,但是不建议在state较大时设置成秒级。 > > 祝好 > 唐云 > ________________________________ > From: 1900 <[hidden email]> > Sent: Tuesday, August 27, 2019 14:15 > To: user-zh <[hidden email]> > Subject: 关于flink中端对端的精确一次性理解问题 > > > 根据flink端对端的定义,采用二阶段提交,flink中已经封装好了,放到TwoPhaseCommitSinkFunction中,只要实现对应的方法就行, > > > > – beginTransaction > > – preCommit > > – commit > > – abort > > > > 如果sink端的操作是幂等的,就不需要实现了,假设现在我要实现写入MYSQL,实现端对端精确一次性,这种情况下是不是 preCommit不需要写, > > 然后最终commit再提交事务?这样事务的打开时间就跟checkpoint的时间关联了,假设checkpoint时间很长,是不是意味着事务打开时间很长,有风险; > 而且如果checkpoint要持续的进行着(原来假设每分钟一次checkpoint,现在要每秒一次?),不能设置间隔时间? > 或者是间隔时间都算到下次checkpoint中,这样事务的打开时间更长了,请问是不是这样理解? > 这样端对端精确一次性是不是效果很小? |
Free forum by Nabble | Edit this page |