关于flink中端对端的精确一次性理解问题

classic Classic list List threaded Threaded
4 messages Options
Reply | Threaded
Open this post in threaded view
|

关于flink中端对端的精确一次性理解问题

1900
根据flink端对端的定义,采用二阶段提交,flink中已经封装好了,放到TwoPhaseCommitSinkFunction中,只要实现对应的方法就行,



– beginTransaction

– preCommit

– commit

– abort



如果sink端的操作是幂等的,就不需要实现了,假设现在我要实现写入MYSQL,实现端对端精确一次性,这种情况下是不是 preCommit不需要写,
然后最终commit再提交事务?这样事务的打开时间就跟checkpoint的时间关联了,假设checkpoint时间很长,是不是意味着事务打开时间很长,有风险;
而且如果checkpoint要持续的进行着(原来假设每分钟一次checkpoint,现在要每秒一次?),不能设置间隔时间?
或者是间隔时间都算到下次checkpoint中,这样事务的打开时间更长了,请问是不是这样理解?
这样端对端精确一次性是不是效果很小?
Reply | Threaded
Open this post in threaded view
|

Re: 关于flink中端对端的精确一次性理解问题

Yun Tang
Hi

可以看一下TwoPhaseCommitSinkFunction的实现,preCommit是在snapshotState时调用,会将当前的currentTransactionHolder存储到pendingCommitTransactions,直到notifyCheckpointComplete时(也就是commit时),将pendingCommitTransactions取出进行事务性操作。所以preCommit时候不需要写是不符合语义的。

如果借助TwoPhaseCommitSinkFunction,确实需要适当减少checkpoint interval,否则可能很久都没有输出,但是不建议在state较大时设置成秒级。

祝好
唐云
________________________________
From: 1900 <[hidden email]>
Sent: Tuesday, August 27, 2019 14:15
To: user-zh <[hidden email]>
Subject: 关于flink中端对端的精确一次性理解问题

根据flink端对端的定义,采用二阶段提交,flink中已经封装好了,放到TwoPhaseCommitSinkFunction中,只要实现对应的方法就行,



�C beginTransaction

�C preCommit

�C commit

�C abort



如果sink端的操作是幂等的,就不需要实现了,假设现在我要实现写入MYSQL,实现端对端精确一次性,这种情况下是不是 preCommit不需要写,
然后最终commit再提交事务?这样事务的打开时间就跟checkpoint的时间关联了,假设checkpoint时间很长,是不是意味着事务打开时间很长,有风险;
而且如果checkpoint要持续的进行着(原来假设每分钟一次checkpoint,现在要每秒一次?),不能设置间隔时间?
或者是间隔时间都算到下次checkpoint中,这样事务的打开时间更长了,请问是不是这样理解?
这样端对端精确一次性是不是效果很小?
Reply | Threaded
Open this post in threaded view
|

回复: 关于flink中端对端的精确一次性理解问题

1900
hi,


看了解释,还是觉得比较抽象,能不能具体点,下面贴下之前一位同学写的实现,总感觉不对,但好像也有点意思,能不能结合代码解释下


public class Sink extends TwoPhaseCommitSinkFunction <ObjectNode, Connection, Void> {


//    private Connection connection;


    public Sink() {
        super(new KryoSerializer <>(Connection.class , new ExecutionConfig()) , VoidSerializer.INSTANCE);
    }


    @Override
    protected void invoke(Connection connection , ObjectNode objectNode , Context context) throws Exception {
        String  stu     = objectNode.get("value").toString();
        Student student = JSON.parseObject(stu , Student.class);


        System.err.println("start invoke......." + "id = " + student.getId() + "  name = " + student.getName() + "   password"
                           + " = " + student.getPassword() + "  age = " + student.getAge());


        String            sql = "insert into Student(id,name,password,age) values (?,?,?,?)";
        PreparedStatement ps  = connection.prepareStatement(sql);
        ps.setInt(1 , student.getId());
        ps.setString(2 , student.getName());
        ps.setString(3 , student.getPassword());
        ps.setInt(4 , student.getAge());
        ps.executeUpdate();
        //手动制造异常
        if (student.getId() == 33) {
            System.out.println(1 / 0);
        }
    }


    @Override
    protected Connection beginTransaction() throws Exception {
        String url = "jdbc:mysql:";
        return DBConnectUtil.getConnection(url , "" , "");
    }


    @Override
    protected void preCommit(Connection connection) throws Exception {
    }


    @Override
    protected void commit(Connection connection) {
        if (connection != null) {
            try {
                connection.commit();
            } catch (SQLException e) {
                System.err.println("commit  error ............" + e.getMessage());
            } finally {
                try {
                    connection.close();
                } catch (SQLException e) {
                    System.err.println(" finally  commit error ............" + e.getMessage());
                }
            }
        }
    }


    @Override
    protected void abort(Connection connection) {
        if (connection != null) {
            try {
                connection.rollback();
            } catch (SQLException e) {
                System.err.println("abort error ............" + e.getMessage());
            } finally {
                try {
                    connection.close();
                } catch (SQLException e) {
                    System.err.println(" finally  abort error ............" + e.getMessage());
                }
            }
        }
    }
}







------------------ 原始邮件 ------------------
发件人: "Yun Tang"<[hidden email]>;
发送时间: 2019年8月27日(星期二) 下午3:00
收件人: "user-zh"<[hidden email]>;

主题: Re: 关于flink中端对端的精确一次性理解问题



Hi

可以看一下TwoPhaseCommitSinkFunction的实现,preCommit是在snapshotState时调用,会将当前的currentTransactionHolder存储到pendingCommitTransactions,直到notifyCheckpointComplete时(也就是commit时),将pendingCommitTransactions取出进行事务性操作。所以preCommit时候不需要写是不符合语义的。

如果借助TwoPhaseCommitSinkFunction,确实需要适当减少checkpoint interval,否则可能很久都没有输出,但是不建议在state较大时设置成秒级。

祝好
唐云
________________________________
From: 1900 <[hidden email]>
Sent: Tuesday, August 27, 2019 14:15
To: user-zh <[hidden email]>
Subject: 关于flink中端对端的精确一次性理解问题

根据flink端对端的定义,采用二阶段提交,flink中已经封装好了,放到TwoPhaseCommitSinkFunction中,只要实现对应的方法就行,



– beginTransaction

– preCommit

– commit

– abort



如果sink端的操作是幂等的,就不需要实现了,假设现在我要实现写入MYSQL,实现端对端精确一次性,这种情况下是不是 preCommit不需要写,
然后最终commit再提交事务?这样事务的打开时间就跟checkpoint的时间关联了,假设checkpoint时间很长,是不是意味着事务打开时间很长,有风险;
而且如果checkpoint要持续的进行着(原来假设每分钟一次checkpoint,现在要每秒一次?),不能设置间隔时间?
或者是间隔时间都算到下次checkpoint中,这样事务的打开时间更长了,请问是不是这样理解?
这样端对端精确一次性是不是效果很小?
Reply | Threaded
Open this post in threaded view
|

Re: 关于flink中端对端的精确一次性理解问题

qi luo
对于MySQL sink来说,使用2PC我理解应该是不能用MySQL
transaction的。因为如果你在preCommit中(或之前)开启了transaction,任务失败的话数据会直接丢失,没法实现2PC里preCommit成功后必须保证commit成功的语义。一种办法是preCommit时写入mysql临时表,在commit时将临时表数据移动入正式表。

On Wed, Aug 28, 2019 at 2:47 PM 1900 <[hidden email]> wrote:

> hi,
>
>
> 看了解释,还是觉得比较抽象,能不能具体点,下面贴下之前一位同学写的实现,总感觉不对,但好像也有点意思,能不能结合代码解释下
>
>
> public class Sink extends TwoPhaseCommitSinkFunction <ObjectNode,
> Connection, Void> {
>
>
> //    private Connection connection;
>
>
>     public Sink() {
>         super(new KryoSerializer <>(Connection.class , new
> ExecutionConfig()) , VoidSerializer.INSTANCE);
>     }
>
>
>     @Override
>     protected void invoke(Connection connection , ObjectNode objectNode ,
> Context context) throws Exception {
>         String  stu     = objectNode.get("value").toString();
>         Student student = JSON.parseObject(stu , Student.class);
>
>
>         System.err.println("start invoke......." + "id = " +
> student.getId() + "  name = " + student.getName() + "   password"
>                            + " = " + student.getPassword() + "  age = " +
> student.getAge());
>
>
>         String            sql = "insert into Student(id,name,password,age)
> values (?,?,?,?)";
>         PreparedStatement ps  = connection.prepareStatement(sql);
>         ps.setInt(1 , student.getId());
>         ps.setString(2 , student.getName());
>         ps.setString(3 , student.getPassword());
>         ps.setInt(4 , student.getAge());
>         ps.executeUpdate();
>         //手动制造异常
>         if (student.getId() == 33) {
>             System.out.println(1 / 0);
>         }
>     }
>
>
>     @Override
>     protected Connection beginTransaction() throws Exception {
>         String url = "jdbc:mysql:";
>         return DBConnectUtil.getConnection(url , "" , "");
>     }
>
>
>     @Override
>     protected void preCommit(Connection connection) throws Exception {
>     }
>
>
>     @Override
>     protected void commit(Connection connection) {
>         if (connection != null) {
>             try {
>                 connection.commit();
>             } catch (SQLException e) {
>                 System.err.println("commit  error ............" +
> e.getMessage());
>             } finally {
>                 try {
>                     connection.close();
>                 } catch (SQLException e) {
>                     System.err.println(" finally  commit error
> ............" + e.getMessage());
>                 }
>             }
>         }
>     }
>
>
>     @Override
>     protected void abort(Connection connection) {
>         if (connection != null) {
>             try {
>                 connection.rollback();
>             } catch (SQLException e) {
>                 System.err.println("abort error ............" +
> e.getMessage());
>             } finally {
>                 try {
>                     connection.close();
>                 } catch (SQLException e) {
>                     System.err.println(" finally  abort error
> ............" + e.getMessage());
>                 }
>             }
>         }
>     }
> }
>
>
>
>
>
>
>
> ------------------ 原始邮件 ------------------
> 发件人: "Yun Tang"<[hidden email]>;
> 发送时间: 2019年8月27日(星期二) 下午3:00
> 收件人: "user-zh"<[hidden email]>;
>
> 主题: Re: 关于flink中端对端的精确一次性理解问题
>
>
>
> Hi
>
>
> 可以看一下TwoPhaseCommitSinkFunction的实现,preCommit是在snapshotState时调用,会将当前的currentTransactionHolder存储到pendingCommitTransactions,直到notifyCheckpointComplete时(也就是commit时),将pendingCommitTransactions取出进行事务性操作。所以preCommit时候不需要写是不符合语义的。
>
> 如果借助TwoPhaseCommitSinkFunction,确实需要适当减少checkpoint
> interval,否则可能很久都没有输出,但是不建议在state较大时设置成秒级。
>
> 祝好
> 唐云
> ________________________________
> From: 1900 <[hidden email]>
> Sent: Tuesday, August 27, 2019 14:15
> To: user-zh <[hidden email]>
> Subject: 关于flink中端对端的精确一次性理解问题
>
>
> 根据flink端对端的定义,采用二阶段提交,flink中已经封装好了,放到TwoPhaseCommitSinkFunction中,只要实现对应的方法就行,
>
>
>
> – beginTransaction
>
> – preCommit
>
> – commit
>
> – abort
>
>
>
> 如果sink端的操作是幂等的,就不需要实现了,假设现在我要实现写入MYSQL,实现端对端精确一次性,这种情况下是不是 preCommit不需要写,
>
> 然后最终commit再提交事务?这样事务的打开时间就跟checkpoint的时间关联了,假设checkpoint时间很长,是不是意味着事务打开时间很长,有风险;
> 而且如果checkpoint要持续的进行着(原来假设每分钟一次checkpoint,现在要每秒一次?),不能设置间隔时间?
> 或者是间隔时间都算到下次checkpoint中,这样事务的打开时间更长了,请问是不是这样理解?
> 这样端对端精确一次性是不是效果很小?