TwoPhaseCommitSinkFunction 中 initializeState 代码的困惑

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

TwoPhaseCommitSinkFunction 中 initializeState 代码的困惑

Zeahoo
大家好,
在阅读源码中看到以下代码片段,initializeState 方法内有如下代码:

                if (context.isRestored()) {
                        LOG.info("{} - restoring state", name());
                        for (State<TXN, CONTEXT> operatorState : state.get()) {
                                userContext = operatorState.getContext();
                                List<TransactionHolder&lt;TXN>> recoveredTransactions =
operatorState.getPendingCommitTransactions();
                                List<TXN> handledTransactions = new
ArrayList<>(recoveredTransactions.size() + 1);
                                for (TransactionHolder<TXN> recoveredTransaction :
recoveredTransactions) {
                                        // If this fails to succeed eventually, there is actually data loss
                                        recoverAndCommitInternal(recoveredTransaction);
                                        handledTransactions.add(recoveredTransaction.handle);
                                        LOG.info("{} committed recovered transaction {}", name(),
recoveredTransaction);
                                }

                                {
                                        TXN transaction = operatorState.getPendingTransaction().handle;
                                        recoverAndAbort(transaction);
                                        handledTransactions.add(transaction);
                                        LOG.info("{} aborted recovered transaction {}", name(),
operatorState.getPendingTransaction());
                                }

                                if (userContext.isPresent()) {
                                        finishRecoveringContext(handledTransactions);
                                        recoveredUserContext = true;
                                }
                        }
                }

如上,请教一下为什么

                                        TXN transaction = operatorState.getPendingTransaction().handle;
                                        recoverAndAbort(transaction);
                                        handledTransactions.add(transaction);
                                        LOG.info("{} aborted recovered transaction {}", name(),
operatorState.getPendingTransaction());
需要被花括号包裹起来,在阅读代码过程中并没有感受到这个代码需要被 {} 包裹的必要。
希望得到各位的解答!




--
Sent from: http://apache-flink.147419.n8.nabble.com/