One, foreword

  1. According to Wikipedia, two-phase commit (2PC for short) is an algorithm designed to keep all nodes of a distributed system consistent when they commit a transaction. It is also commonly called a protocol.
  2. In Flink 1.4, the community extracted the common logic of the two-phase commit protocol and encapsulated it in TwoPhaseCommitSinkFunction. Users subclass it and implement a few specific methods to obtain Flink's end-to-end exactly-once semantics (EOS). This article is based on Flink 1.12.4 and reads through the source code of Flink's two-phase commit.

Two, Introduction to 2PC

1. Definition

According to Wikipedia's definition, two-phase commit can be summarized as one goal, two roles, and three conditions (that is, two roles achieve one goal provided that three conditions hold), specifically as follows:

One goal

All nodes in a distributed system must stay consistent when committing a transaction (that is, either all of them succeed or all of them fail).

Two important roles

  1. The coordinator is responsible for coordinating the transaction and issuing instructions.
  2. The participants are responsible for doing the actual work and responding to the coordinator's instructions.

Three conditions

  1. The distributed system has one coordinator node and multiple participant nodes, and all nodes can communicate with each other normally.
  2. All nodes use write-ahead logging, and the logs are stored reliably.
  3. No node is permanently damaged; temporary, recoverable failures are allowed.

2. Principle

Two-phase commit, as the name suggests, commits in two phases: pre-commit and commit.

Take one coordinator and three participants as an example. The detailed principle is as follows:

Pre-commit phase

  1. The coordinator sends a request to all participants asking whether the commit can be performed, then waits for every participant to respond.
  2. Each participant node executes all transaction operations up to the point of the coordinator's request and writes undo and redo information to its log for persistence.
  3. Each participant responds to the coordinator's request: it returns an “agree” message if its transaction operations succeeded, and a “terminate” message otherwise.
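
To make the participant's side of the pre-commit phase more concrete, here is a minimal in-memory sketch. It is only an illustration under stated assumptions: the class `LoggingParticipant`, its method names, and the rule that negative values count as a failed operation are all made up for this example, and the undo log lives in memory rather than on reliable storage.

```java
import java.util.AbstractMap;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical participant that keeps an in-memory undo log (illustration only). */
class LoggingParticipant {
    private final Map<String, Integer> store = new HashMap<>();
    // Undo log: for each write, the key and its previous value (null = key was absent).
    private final Deque<Map.Entry<String, Integer>> undoLog = new ArrayDeque<>();
    private boolean failed = false;

    /** Tentatively applies a write; negative values are rejected (an assumed rule). */
    void write(String key, int value) {
        if (value < 0) {
            failed = true;
            return;
        }
        undoLog.push(new AbstractMap.SimpleEntry<>(key, store.get(key)));
        store.put(key, value);
    }

    /** Phase 1 response to the coordinator's query. */
    String prepare() {
        return failed ? "terminate" : "agree";
    }

    /** Phase 2, success case: the undo information is no longer needed. */
    void commit() {
        undoLog.clear();
        failed = false;
    }

    /** Phase 2, failure case: replay the undo log, newest entry first. */
    void rollback() {
        while (!undoLog.isEmpty()) {
            Map.Entry<String, Integer> entry = undoLog.pop();
            if (entry.getValue() == null) {
                store.remove(entry.getKey());
            } else {
                store.put(entry.getKey(), entry.getValue());
            }
        }
        failed = false;
    }

    Integer get(String key) {
        return store.get(key);
    }
}
```

A participant that executed `write("a", 2)` on top of a committed value of 1 and then receives a rollback request ends up with `a` equal to 1 again, because the undo entry recorded the previous value before the write was applied.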

Commit phase

  1. If every participant node returned “agree”, the coordinator sends a “formal commit” request to all participant nodes (success case). If any participant node returned “terminate” in the pre-commit phase, or the coordinator's query timed out before it heard from every participant node, the coordinator sends a “rollback” request to all participant nodes (failure case).
  2. In the success case, every participant node completes the operation and releases the resources held during the transaction. In the failure case, every participant node rolls the transaction back using the previously persisted write-ahead log and releases the resources held during the transaction.
  3. In the success case, every participant node sends a “transaction completed” message to the coordinator. In the failure case, every participant node sends a “rollback completed” message to the coordinator.
  4. In the success case, the coordinator receives the “transaction completed” messages from all participant nodes and completes the transaction. In the failure case, the coordinator receives the “rollback completed” messages from all participant nodes and cancels the transaction.
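
The two phases can be sketched from the coordinator's point of view as follows. This is a simplified, single-process illustration, not a production protocol: the `Participant` interface, `InMemoryParticipant`, and `TwoPhaseCoordinator` are hypothetical names, an exception during the query stands in for a timeout, and the completion acknowledgements of steps 3 and 4 are left out.

```java
import java.util.List;

/** Hypothetical participant contract; the names are illustrative only. */
interface Participant {
    boolean prepare();   // phase 1: do the work, persist undo/redo, vote
    void commit();       // phase 2, success case
    void rollback();     // phase 2, failure case
}

/** In-memory participant that only records which state it ended up in. */
class InMemoryParticipant implements Participant {
    private final boolean canCommit;
    String state = "INIT";

    InMemoryParticipant(boolean canCommit) {
        this.canCommit = canCommit;
    }

    @Override
    public boolean prepare() {
        state = canCommit ? "PREPARED" : "FAILED";
        return canCommit;
    }

    @Override
    public void commit() {
        state = "COMMITTED";
    }

    @Override
    public void rollback() {
        state = "ROLLED_BACK";
    }
}

class TwoPhaseCoordinator {
    /** Returns true if the transaction was committed on every participant. */
    static boolean run(List<? extends Participant> participants) {
        // Phase 1: query every participant and collect the votes.
        boolean allAgreed = true;
        for (Participant p : participants) {
            boolean vote;
            try {
                vote = p.prepare();
            } catch (RuntimeException e) {
                vote = false; // no usable answer is treated like "terminate"
            }
            allAgreed = allAgreed && vote;
        }
        // Phase 2: commit everywhere, or roll back everywhere.
        for (Participant p : participants) {
            if (allAgreed) {
                p.commit();
            } else {
                p.rollback();
            }
        }
        return allAgreed;
    }
}
```

With three participants that all agree, every one of them ends in `COMMITTED`. As soon as one of them votes “terminate”, all three end in `ROLLED_BACK`, including the two that had agreed.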

Three, Flink 2PC source code

In Flink 1.12.4, the official comments in the TwoPhaseCommitSinkFunction class indicate that, to implement the two-phase commit protocol, subclasses should implement the following methods as their situation requires:

    /** Called to write a value within a transaction. */
    protected abstract void invoke(TXN transaction, IN value, Context context) throws Exception;

    /** Called to start a new transaction. */
    protected abstract TXN beginTransaction() throws Exception;

    /**
     * Called to pre-commit the transaction created in the previous step.
     * Note: pre-commit must perform all the steps needed to prepare the transaction for a
     * commit that may happen later. The transaction may still be aborted after this point,
     * but the underlying implementation must ensure that committing a pre-committed
     * transaction always succeeds.
     *
     * Usually the implementation involves flushing the data.
     */
    protected abstract void preCommit(TXN transaction) throws Exception;

    /**
     * Called to formally commit a pre-committed transaction.
     * Note: if this method fails (that is, the transaction fails to commit), the Flink
     * application is restarted and recoverAndCommit is called to commit the transaction again.
     */
    protected abstract void commit(TXN transaction);

    /**
     * Called to resume committing a transaction after a failure.
     * Note: a custom implementation must ensure that this call eventually succeeds. If it
     * fails, the Flink application is restarted and the call is made again. If it never
     * succeeds (for example, the restarts are exhausted), data will be lost. Transactions are
     * recovered in the same order in which they were created.
     */
    protected void recoverAndCommit(TXN transaction) {
        commit(transaction);
    }

    /** Called to abort the transaction. */
    protected abstract void abort(TXN transaction);

    /** Called after a failure to abort a transaction that was rejected by the coordinator. */
    protected void recoverAndAbort(TXN transaction) {
        abort(transaction);
    }

    /**
     * Callback that subclasses may override. It is called after the user context has been
     * restored and receives the transactions that were already committed or aborted during
     * recovery and need no further processing.
     */
    protected void finishRecoveringContext(Collection<TXN> handledTransactions) {}
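
To get a feel for the contract behind these hooks, the following standalone sketch mirrors their names with a file-based transaction: records go to a temporary file, and committing moves the file into a target directory. It does not depend on Flink and is not a subclass of TwoPhaseCommitSinkFunction. The class `FileTwoPhaseSketch`, the use of a file name as the transaction handle, and the helpers `create`, `committedLines`, and `isStaged` are assumptions made for this illustration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;
import java.util.List;
import java.util.UUID;

/** Standalone illustration; this class is hypothetical and is not part of Flink. */
class FileTwoPhaseSketch {
    private final Path tmpDir;
    private final Path targetDir;

    private FileTwoPhaseSketch(Path tmpDir, Path targetDir) {
        this.tmpDir = tmpDir;
        this.targetDir = targetDir;
    }

    /** Creates both directories under one temp root so they share a file system. */
    static FileTwoPhaseSketch create() {
        try {
            Path root = Files.createTempDirectory("two-phase-sketch");
            return new FileTwoPhaseSketch(
                    Files.createDirectory(root.resolve("tmp")),
                    Files.createDirectory(root.resolve("target")));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** beginTransaction: the transaction handle is simply a unique file name. */
    String beginTransaction() {
        String txn = UUID.randomUUID().toString();
        try {
            Files.createFile(tmpDir.resolve(txn));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return txn;
    }

    /** invoke: append one record to the transaction's temporary file. */
    void invoke(String txn, String value) {
        try {
            Files.write(tmpDir.resolve(txn),
                    (value + "\n").getBytes(StandardCharsets.UTF_8),
                    StandardOpenOption.APPEND);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** preCommit: every write above already closes the file, so nothing is left to flush. */
    void preCommit(String txn) {}

    /** commit: move the file to the target directory; a repeated call is a no-op. */
    void commit(String txn) {
        Path source = tmpDir.resolve(txn);
        if (!Files.exists(source)) {
            return; // already committed earlier
        }
        try {
            Files.move(source, targetDir.resolve(txn), StandardCopyOption.ATOMIC_MOVE);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** abort: discard the temporary file. */
    void abort(String txn) {
        try {
            Files.deleteIfExists(tmpDir.resolve(txn));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    List<String> committedLines(String txn) {
        try {
            return Files.readAllLines(targetDir.resolve(txn));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    boolean isStaged(String txn) {
        return Files.exists(tmpDir.resolve(txn));
    }
}
```

The point of the design is that `commit` can safely be called twice. Since recoverAndCommit delegates to commit by default, a commit that may be replayed after a restart has to tolerate a transaction that was already committed.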

TwoPhaseCommitSinkFunction is an abstract class that implements the CheckpointedFunction and CheckpointListener interfaces.

The methods that implement the CheckpointedFunction interface are as follows:

    /**
     * Declared in the CheckpointedFunction interface and called whenever a checkpoint
     * snapshot is requested. The implementation here couples the transaction to the checkpoint:
     * 1. Verify the transaction state. The current transaction must not be null.
     * 2. Pre-commit the current transaction and add it to the transactions pending commit,
     *    in preparation for the later formal commit.
     * 3. Start a new transaction.
     * 4. Clear the stored state, then record the new current transaction together with the
     *    transactions pending commit.
     */
    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // this is like the pre-commit of a 2-phase-commit transaction
        // we are ready to commit and remember the transaction

        // 1. Check the transaction state; fail if there is no transaction object
        checkState(
                currentTransactionHolder != null,
                "bug: no transaction object when performing state snapshot");

        long checkpointId = context.getCheckpointId();
        LOG.debug(
                "{} - checkpoint {} triggered, flushing transaction '{}'",
                name(),
                context.getCheckpointId(),
                currentTransactionHolder);
        // 2.1 Pre-commit the current transaction; this ends up calling preCommit in the subclass
        preCommit(currentTransactionHolder.handle);
        // 2.2 Record the pre-committed transaction among the transactions pending commit
        pendingCommitTransactions.put(checkpointId, currentTransactionHolder);
        LOG.debug("{} - stored pending transactions {}", name(), pendingCommitTransactions);
        // 3. Start a new transaction
        currentTransactionHolder = beginTransactionInternal();
        LOG.debug("{} - started new transaction '{}'", name(), currentTransactionHolder);
        // 4. Clear the stored state, then record the new transaction and the pending ones
        state.clear();
        state.add(
                new State<>(
                        this.currentTransactionHolder,
                        new ArrayList<>(pendingCommitTransactions.values()),
                        userContext));
    }
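
The bookkeeping that snapshotState performs can be replayed in isolation with a small sketch. It assumes nothing about Flink: `SnapshotFlowSketch` is a hypothetical class, transactions are plain strings, and the returned list merely stands in for the `State` object written to the checkpoint.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for the snapshot bookkeeping (illustration only). */
class SnapshotFlowSketch {
    // Pre-committed transactions, keyed by the checkpoint that pre-committed them.
    final Map<Long, String> pendingCommitTransactions = new LinkedHashMap<>();
    final List<String> preCommitted = new ArrayList<>();
    String currentTransaction;
    private int counter = 0;

    SnapshotFlowSketch() {
        currentTransaction = beginTransaction();
    }

    String beginTransaction() {
        return "txn-" + (counter++);
    }

    void preCommit(String txn) {
        preCommitted.add(txn);
    }

    /** Returns what would be stored in state: pending transactions, then the new current one. */
    List<String> snapshotState(long checkpointId) {
        if (currentTransaction == null) {
            throw new IllegalStateException("no transaction object when performing state snapshot");
        }
        preCommit(currentTransaction);                                    // step 2.1
        pendingCommitTransactions.put(checkpointId, currentTransaction);  // step 2.2
        currentTransaction = beginTransaction();                          // step 3
        List<String> state = new ArrayList<>(pendingCommitTransactions.values());
        state.add(currentTransaction);                                    // step 4
        return state;
    }
}
```

Calling `snapshotState(1)` and then `snapshotState(2)` without any commit in between leaves two entries in `pendingCommitTransactions`. The map is needed because a checkpoint's completion notification can arrive after the next snapshot has already been taken.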


    /**
     * Called to initialize the state when the function starts or is restored from a checkpoint.
     * The implementation here ties the recovered state to transactions:
     * 1. Get the list state stored by checkpoints.
     * 2. Loop over the state, recover and commit the transactions that were pending commit,
     *    and record them as handled.
     * 3. Abort the transaction that had not been pre-committed, and record it as handled.
     * 4. Start a new transaction.
     */
    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        // 1. Get the list state stored by checkpoints
        state = context.getOperatorStateStore().getListState(stateDescriptor);

        boolean recoveredUserContext = false;
        if (context.isRestored()) {
            LOG.info("{} - restoring state", name());
            // 2. Loop over the state and commit the transactions that were pending commit
            for (State<TXN, CONTEXT> operatorState : state.get()) {
                // Get the user context
                userContext = operatorState.getContext();
                // Get the transactions that were pending commit
                List<TransactionHolder<TXN>> recoveredTransactions =
                        operatorState.getPendingCommitTransactions();
                List<TXN> handledTransactions = new ArrayList<>(recoveredTransactions.size() + 1);
                for (TransactionHolder<TXN> recoveredTransaction : recoveredTransactions) {
                    // If this fails to succeed eventually, there is actually data loss
                    // Recover and commit the transaction
                    recoverAndCommitInternal(recoveredTransaction);
                    // Record the transaction as handled
                    handledTransactions.add(recoveredTransaction.handle);
                    LOG.info("{} committed recovered transaction {}", name(), recoveredTransaction);
                }
                // 3. Abort the transaction that was not pre-committed and record it as handled
                {
                    TXN transaction = operatorState.getPendingTransaction().handle;
                    recoverAndAbort(transaction);
                    handledTransactions.add(transaction);
                    LOG.info(
                            "{} aborted recovered transaction {}",
                            name(),
                            operatorState.getPendingTransaction());
                }

                // Let the subclass finish recovering the user context
                if (userContext.isPresent()) {
                    finishRecoveringContext(handledTransactions);
                    recoveredUserContext = true;
                }
            }
        }

        // if in restore we didn't get any userContext or we are initializing from scratch
        // (no user context was recovered, so initialize a fresh one)
        if (!recoveredUserContext) {
            LOG.info("{} - no state to restore", name());

            userContext = initializeUserContext();
        }
        // Clear the transactions pending commit
        this.pendingCommitTransactions.clear();

        // 4. Start a new transaction
        currentTransactionHolder = beginTransactionInternal();
        LOG.debug("{} - started new transaction '{}'", name(), currentTransactionHolder);
    }
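
The recovery rule in initializeState is: whatever was pre-committed before the checkpoint gets committed, and whatever was still open gets aborted. The sketch below plays that rule against a fake external system. `RecoverySketch` and its two sets are hypothetical, and the assumption that the external system can tell open from committed transactions is made only for the sake of the example.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical stand-in for a sink plus its external system (illustration only). */
class RecoverySketch {
    // Transactions the external system has already made visible.
    final Set<String> committed = new LinkedHashSet<>();
    // Transactions the external system still holds open.
    final Set<String> open = new LinkedHashSet<>();

    /** Idempotent: committing an already committed transaction changes nothing. */
    void recoverAndCommit(String txn) {
        open.remove(txn);
        committed.add(txn);
    }

    void recoverAndAbort(String txn) {
        open.remove(txn);
    }

    /**
     * Replays one restored state entry: commit every transaction that was pending commit,
     * abort the one that was still being written, and return everything that was handled.
     */
    List<String> restore(List<String> pendingCommit, String pendingTransaction) {
        List<String> handled = new ArrayList<>(pendingCommit.size() + 1);
        for (String txn : pendingCommit) {
            recoverAndCommit(txn);
            handled.add(txn);
        }
        recoverAndAbort(pendingTransaction);
        handled.add(pendingTransaction);
        return handled;
    }
}
```

In this sketch a transaction that was already committed before the crash is simply committed again. That is harmless only because `recoverAndCommit` is idempotent here, which is why the commit path needs to tolerate being replayed.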


The methods that implement the CheckpointListener interface are as follows:

    /** Called when a checkpoint completes, to commit the pre-committed transactions. */
    @Override
    public final void notifyCheckpointComplete(long checkpointId) throws Exception {

        // 1. Get an iterator over all transactions pending commit
        Iterator<Map.Entry<Long, TransactionHolder<TXN>>> pendingTransactionIterator =
                pendingCommitTransactions.entrySet().iterator();
        Throwable firstError = null;
        // 2. Loop over the pending transactions and commit them
        while (pendingTransactionIterator.hasNext()) {
            Map.Entry<Long, TransactionHolder<TXN>> entry = pendingTransactionIterator.next();
            Long pendingTransactionCheckpointId = entry.getKey();
            TransactionHolder<TXN> pendingTransaction = entry.getValue();
            // Skip transactions from later checkpoints; only those whose checkpoint id
            // is less than or equal to checkpointId are committed
            if (pendingTransactionCheckpointId > checkpointId) {
                continue;
            }

            LOG.info(
                    "{} - checkpoint {} complete, committing transaction {} from checkpoint {}",
                    name(),
                    checkpointId,
                    pendingTransaction,
                    pendingTransactionCheckpointId);
            // Warn if the transaction timeout is almost reached
            logWarningIfTimeoutAlmostReached(pendingTransaction);
            try {
                // Phase 2: commit the transaction
                commit(pendingTransaction.handle);
            } catch (Throwable t) {
                if (firstError == null) {
                    firstError = t;
                }
            }

            LOG.debug("{} - committed checkpoint transaction {}", name(), pendingTransaction);
            // Remove the processed transaction from the transactions pending commit
            pendingTransactionIterator.remove();
        }

        if (firstError != null) {
            throw new FlinkRuntimeException(
                    "Committing one of transactions failed, logging first encountered failure",
                    firstError);
        }
    }

    @Override
    public void notifyCheckpointAborted(long checkpointId) {}
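
The commit loop above has two details that are easy to miss: it commits every pending transaction whose checkpoint id is less than or equal to the notified one, and it keeps going after a failed commit, rethrowing only the first error at the end. The following sketch isolates that loop. `CheckpointCommitSketch` is a hypothetical class, and a transaction name starting with `bad` is an assumed way to simulate a failing commit.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for the commit loop (illustration only). */
class CheckpointCommitSketch {
    final Map<Long, String> pending = new LinkedHashMap<>();
    final List<String> committed = new ArrayList<>();

    /** Simulated commit: names starting with "bad" fail (an assumption of this sketch). */
    void commit(String txn) {
        if (txn.startsWith("bad")) {
            throw new IllegalStateException("commit failed: " + txn);
        }
        committed.add(txn);
    }

    void notifyCheckpointComplete(long checkpointId) {
        Iterator<Map.Entry<Long, String>> it = pending.entrySet().iterator();
        RuntimeException firstError = null;
        while (it.hasNext()) {
            Map.Entry<Long, String> entry = it.next();
            if (entry.getKey() > checkpointId) {
                continue; // belongs to a later checkpoint, leave it pending
            }
            try {
                commit(entry.getValue());
            } catch (RuntimeException e) {
                if (firstError == null) {
                    firstError = e;
                }
            }
            it.remove(); // removed whether or not the commit succeeded
        }
        if (firstError != null) {
            throw new RuntimeException("Committing one of transactions failed", firstError);
        }
    }
}
```

With pending transactions for checkpoints 1, 2 and 3, a notification for checkpoint 2 commits the first two and leaves the third pending. A failed commit is still removed from the map, so getting it committed relies on the exception causing a restart and on the recovery path described earlier.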

Four, Summary

By combining Flink's checkpoint mechanism with the 2PC protocol, users on the sink side only need to provide a custom implementation of TwoPhaseCommitSinkFunction to keep the external system from breaking Flink's end-to-end exactly-once semantics (EOS).
