1. Foreword
- According to Wikipedia, two-phase commit (2PC for short) is an algorithm, also called a protocol, designed to keep all nodes of a distributed system consistent when they commit a transaction.
- In Flink 1.4, the community extracted the common logic of the two-phase commit protocol and encapsulated it in TwoPhaseCommitSinkFunction, so that users only have to implement a few specific methods to obtain Flink's end-to-end exactly-once semantics (EOS). This article is based on Flink 1.12.4 and reads through the source code of Flink's two-phase commit.
2. Introduction to 2PC
1. Definition
According to Wikipedia's definition, two-phase commit can be summarized as one goal, two roles, and three conditions (that is, two roles achieve one goal provided three conditions hold), specifically as follows:
One end goal
All nodes in a distributed system must be consistent when committing a transaction (that is, either all of them succeed or all of them fail).
Two important roles
- The coordinator is responsible for coordinating the transaction and issuing instructions.
- The participants do the actual work and respond to the coordinator's instructions.
Three conditions
- The distributed system has one coordinator node and multiple participant nodes, and all nodes can communicate with each other normally.
- All nodes use write-ahead logging, and the logs are stored reliably.
- No node is damaged permanently; temporary damage that the node can recover from is allowed.
2. Principle
As the name suggests, two-phase commit splits a commit into two phases: pre-commit and commit.
Take one Coordinator and three Participants as an example. The detailed principle is as follows:
Pre-commit phase
- The coordinator sends a request to all participants asking whether the commit can be performed, and starts waiting for their responses.
- Each participant node executes all transaction operations up to the point of the coordinator's request and writes undo and redo information to its log for persistence.
- Each participant responds to the coordinator's request: it returns an “agree” message if its transaction operations succeeded; otherwise it returns a “terminate” message.
Commit phase
- If the coordinator receives “agree” from every participant node, it sends a “formal commit” request to all participant nodes (success case). If any participant returned “terminate” in the pre-commit phase, or the coordinator timed out before receiving responses from all participants, it sends a “rollback” request to all participant nodes (failure case).
- In the success case, every participant node formally completes the operation and releases the resources held during the transaction. In the failure case, every participant node uses the previously persisted write-ahead log to roll back the transaction and releases the resources held during the transaction.
- In the success case, every participant node sends a “transaction completed” message to the coordinator. In the failure case, every participant node sends a “rollback completed” message to the coordinator.
- In the success case, the coordinator completes the transaction once it has received “transaction completed” from all participant nodes. In the failure case, the coordinator cancels the transaction once it has received “rollback completed” from all participant nodes.
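The two phases described above can be sketched in a few lines of Java. This is only an illustrative in-memory model: the names `Participant`, `InMemoryParticipant`, and `TwoPhaseCoordinator` are hypothetical, there is no network, no timeout handling, and no write-ahead log, and an exception during the vote is simply treated as a “terminate” answer.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical participant: votes in phase 1, then commits or rolls back in phase 2. */
interface Participant {
    boolean prepare();  // phase 1: do the work and vote, true = "agree", false = "terminate"
    void commit();      // phase 2, success case
    void rollback();    // phase 2, failure case
}

/** In-memory participant used only for illustration. */
class InMemoryParticipant implements Participant {
    private final boolean willAgree;
    String state = "INIT";

    InMemoryParticipant(boolean willAgree) {
        this.willAgree = willAgree;
    }

    @Override
    public boolean prepare() {
        state = willAgree ? "PREPARED" : "FAILED";
        return willAgree;
    }

    @Override
    public void commit() {
        state = "COMMITTED";
    }

    @Override
    public void rollback() {
        state = "ROLLED_BACK";
    }
}

class TwoPhaseCoordinator {
    /** Returns true if the transaction was committed on every participant. */
    static boolean run(List<? extends Participant> participants) {
        // Phase 1: ask every participant whether it can commit.
        boolean allAgreed = true;
        for (Participant p : participants) {
            boolean vote;
            try {
                vote = p.prepare();
            } catch (RuntimeException e) {
                vote = false; // an error counts as a "terminate" vote in this sketch
            }
            if (!vote) {
                allAgreed = false;
            }
        }
        // Phase 2: commit everywhere only if every vote was "agree", otherwise roll back everywhere.
        for (Participant p : participants) {
            if (allAgreed) {
                p.commit();
            } else {
                p.rollback();
            }
        }
        return allAgreed;
    }

    public static void main(String[] args) {
        List<InMemoryParticipant> nodes = Arrays.asList(
                new InMemoryParticipant(true),
                new InMemoryParticipant(true),
                new InMemoryParticipant(false));
        System.out.println("committed = " + run(nodes)); // one "terminate" vote forces a rollback
    }
}
```

With one coordinator and three participants, a single “terminate” vote is enough to roll back all three nodes, which is the all-or-nothing goal stated earlier.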
3. Flink 2PC source code
In Flink 1.12.4, the documentation of the TwoPhaseCommitSinkFunction class states that, to realize the two-phase commit protocol, a subclass should implement the following methods as its situation requires:
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
// this is like the pre-commit of a 2-phase-commit transaction
// we are ready to commit and remember the transaction
checkState(
currentTransactionHolder != null,
"bug: no transaction object when performing state snapshot");
long checkpointId = context.getCheckpointId();
LOG.debug(
"{} - checkpoint {} triggered, flushing transaction '{}'",
name(),
context.getCheckpointId(),
currentTransactionHolder);
preCommit(currentTransactionHolder.handle);
pendingCommitTransactions.put(checkpointId, currentTransactionHolder);
LOG.debug("{} - stored pending transactions {}", name(), pendingCommitTransactions);
currentTransactionHolder = beginTransactionInternal();
LOG.debug("{} - started new transaction '{}'", name(), currentTransactionHolder);
state.clear();
state.add(
new State<>(
this.currentTransactionHolder,
new ArrayList<>(pendingCommitTransactions.values()),
userContext));
}
/** This method is called to write a value within a transaction. */
protected abstract void invoke(TXN transaction, IN value, Context context) throws Exception;
/** Call this method to start a new transaction. */
protected abstract TXN beginTransaction() throws Exception;
/**
 * Call this method to pre-commit the transaction created in the previous step.
 * Note: pre-commit must perform all steps needed to prepare the transaction for a commit that may happen later.
 * The transaction may still be aborted after this point, but the underlying implementation must ensure that
 * a commit call on a pre-committed transaction always succeeds.
 * The implementation usually involves flushing the data.
 */
protected abstract void preCommit(TXN transaction) throws Exception;
/** * Call this method to formally commit the pre-committed transaction. * Note: If the method fails (i.e. the transaction fails to commit), the Flink application is restarted and recoverAndCommit is called to recommit the transaction */
protected abstract void commit(TXN transaction);
/**
 * This method is called on recovered transactions after a failure, to resume the commit.
 * Note: a custom implementation must ensure that a call to this method eventually succeeds.
 * If the call fails, the Flink application is restarted and the call is made again;
 * if it never succeeds (for example, once the restart attempts are exhausted), data is lost.
 * Transactions are recovered in the order in which they were created.
 */
protected void recoverAndCommit(TXN transaction) {
commit(transaction);
}
/** Call this method to abort the transaction. */
protected abstract void abort(TXN transaction);
/** Abort a transaction that was rejected by the coordinator after a failure. */
protected void recoverAndAbort(TXN transaction) {
abort(transaction);
}
/**
 * A callback that subclasses can override; it is called after the user context has been restored
 * and receives the transactions that have already been committed or aborted and need no further handling.
 */
protected void finishRecoveringContext(Collection<TXN> handledTransactions) {}
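To make the contract of these hook methods concrete, here is a stand-alone sketch that mirrors them without depending on Flink. `BufferTxn` and `BufferingSinkSketch` are hypothetical names, the "external system" is just an in-memory list, and the idempotent `commit` is one possible way (an assumption of this sketch) to satisfy the requirement that a commit can safely be repeated after recovery.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical transaction handle: an id plus the rows buffered for it. */
class BufferTxn {
    final int id;
    final List<String> rows = new ArrayList<>();
    boolean flushed = false;

    BufferTxn(int id) {
        this.id = id;
    }
}

/** Stand-alone sketch that mirrors the hook methods; it is NOT a Flink class. */
class BufferingSinkSketch {
    final List<String> externalStore = new ArrayList<>(); // stands in for the external system
    private final Set<Integer> committedIds = new HashSet<>();
    private int nextId = 0;

    /** Start a new transaction. */
    BufferTxn beginTransaction() {
        return new BufferTxn(nextId++);
    }

    /** Write one value inside the given transaction; nothing is visible externally yet. */
    void invoke(BufferTxn txn, String value) {
        txn.rows.add(value);
    }

    /** Prepare the transaction so that a later commit cannot fail (here: mark it flushed). */
    void preCommit(BufferTxn txn) {
        txn.flushed = true;
    }

    /** Make the rows visible. Safe to call twice, because a commit may be retried after recovery. */
    void commit(BufferTxn txn) {
        if (!txn.flushed) {
            throw new IllegalStateException("commit called before preCommit");
        }
        if (committedIds.add(txn.id)) {
            externalStore.addAll(txn.rows);
        }
    }

    /** Discard everything buffered for the transaction. */
    void abort(BufferTxn txn) {
        txn.rows.clear();
    }

    public static void main(String[] args) {
        BufferingSinkSketch sink = new BufferingSinkSketch();
        BufferTxn txn = sink.beginTransaction();
        sink.invoke(txn, "a");
        sink.invoke(txn, "b");
        sink.preCommit(txn);
        sink.commit(txn);
        sink.commit(txn); // a retried commit does not duplicate rows
        System.out.println(sink.externalStore);
    }
}
```

The point of the sketch is the ordering: data written by `invoke` stays invisible until `commit`, and `preCommit` is the last moment at which the transaction is allowed to fail.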
TwoPhaseCommitSinkFunction is an abstract class that implements the CheckpointedFunction and CheckpointListener interfaces.
The methods that implement the CheckpointedFunction interface are as follows:
/**
 * Defined in the parent class and called when a snapshot for a checkpoint is requested.
 * It couples the transaction to the checkpoint operation:
 * 1. Verify the transaction state: the current transaction must not be null.
 * 2. Pre-commit the current transaction and add it to the map of transactions awaiting commit, ready for the later formal commit.
 * 3. Start a new transaction.
 * 4. Clear the state list, then record the new current transaction together with the transactions awaiting commit.
 */
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
// this is like the pre-commit of a 2-phase-commit transaction
// we are ready to commit and remember the transaction
/** 1. Check the transaction state; checkState throws if the transaction object is null */
checkState(
currentTransactionHolder != null,
"bug: no transaction object when performing state snapshot");
long checkpointId = context.getCheckpointId();
LOG.debug(
"{} - checkpoint {} triggered, flushing transaction '{}'",
name(),
context.getCheckpointId(),
currentTransactionHolder);
/** 2.1 Pre-commit the current transaction; this ends up calling the preCommit logic of the implementing subclass */
preCommit(currentTransactionHolder.handle);
/** 2.2 Record the current pre-committed transaction to the list of transactions to commit */
pendingCommitTransactions.put(checkpointId, currentTransactionHolder);
LOG.debug("{} - stored pending transactions {}", name(), pendingCommitTransactions);
/** 3. Start a new transaction */
currentTransactionHolder = beginTransactionInternal();
LOG.debug("{} - started new transaction '{}'", name(), currentTransactionHolder);
/** 4. Clear the state list, then record the new current transaction and the transactions awaiting commit */
state.clear();
state.add(
new State<>(
this.currentTransactionHolder,
new ArrayList<>(pendingCommitTransactions.values()),
userContext));
}
/**
 * Called to initialize the state from a checkpoint.
 * It couples the transactions to the initialization:
 * 1. Get the state list stored in the checkpoint.
 * 2. Loop over the state list, commit the transactions that were pre-committed but not yet committed, and record them as handled.
 * 3. Abort the transaction that was still open and record it as handled.
 * 4. Start a new transaction.
 */
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
/** 1. Obtain the status list of checkpoints */
state = context.getOperatorStateStore().getListState(stateDescriptor);
boolean recoveredUserContext = false;
if (context.isRestored()) {
LOG.info("{} - restoring state", name());
/** 2. Loop over the state list, commit the pre-committed transactions, and record them as handled */
for (State<TXN, CONTEXT> operatorState : state.get()) {
// Get the user context
userContext = operatorState.getContext();
// Get the transaction to commit
List<TransactionHolder<TXN>> recoveredTransactions =
operatorState.getPendingCommitTransactions();
List<TXN> handledTransactions = new ArrayList<>(recoveredTransactions.size() + 1);
for (TransactionHolder<TXN> recoveredTransaction : recoveredTransactions) {
// If this fails to succeed eventually, there is actually data loss
// Restore and commit the transaction
recoverAndCommitInternal(recoveredTransaction);
// Record the transaction to be committed
handledTransactions.add(recoveredTransaction.handle);
LOG.info("{} committed recovered transaction {}", name(), recoveredTransaction);
}
/** 3. Abort the transaction that was still open (not pre-committed) and record it as handled */
{
TXN transaction = operatorState.getPendingTransaction().handle;
recoverAndAbort(transaction);
handledTransactions.add(transaction);
LOG.info(
"{} aborted recovered transaction {}",
name(),
operatorState.getPendingTransaction());
}
/** Finish recovering the user context, if one was restored */
if (userContext.isPresent()) {
finishRecoveringContext(handledTransactions);
recoveredUserContext = true;
}
}
}
// if in restore we didn't get any userContext or we are initializing from scratch
// If no user context was obtained during recovery, initialize a new one
if (!recoveredUserContext) {
LOG.info("{} - no state to restore", name());
userContext = initializeUserContext();
}
// Clear the map of transactions awaiting commit
this.pendingCommitTransactions.clear();
/** 4. Start a new transaction */
currentTransactionHolder = beginTransactionInternal();
LOG.debug("{} - started new transaction '{}'", name(), currentTransactionHolder);
}
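The bookkeeping done by snapshotState and initializeState can be modeled without Flink. In the sketch below, transactions are plain strings, `CheckpointedTxnSketch` and its `Snapshot` type are hypothetical, and the `log` list only records which action would be taken; it is an assumption-laden illustration of the flow above, not the actual implementation.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Stand-alone sketch of the snapshot/restore bookkeeping; it is NOT the Flink class. */
class CheckpointedTxnSketch {
    /** What the sketch stores in a checkpoint: the open transaction plus the pre-committed ones. */
    static final class Snapshot {
        final String openTxn;
        final List<String> preCommitted;

        Snapshot(String openTxn, List<String> preCommitted) {
            this.openTxn = openTxn;
            this.preCommitted = preCommitted;
        }
    }

    final List<String> log = new ArrayList<>(); // records every action, for inspection
    private final Map<Long, String> pendingCommit = new LinkedHashMap<>();
    private final String prefix;
    private int counter = 0;
    private String current;

    /** Pass null for a fresh start, or a snapshot to simulate recovery after a failure. */
    CheckpointedTxnSketch(String prefix, Snapshot restored) {
        this.prefix = prefix;
        if (restored != null) {
            // Pre-committed transactions belong to a completed snapshot: finish committing them.
            for (String txn : restored.preCommitted) {
                log.add("commit " + txn);
            }
            // The transaction that was still open at snapshot time is aborted.
            log.add("abort " + restored.openTxn);
        }
        current = begin();
    }

    private String begin() {
        String txn = prefix + "-" + counter++;
        log.add("begin " + txn);
        return txn;
    }

    /** Pre-commit the current transaction, remember it under the checkpoint id, open a new one. */
    Snapshot snapshot(long checkpointId) {
        if (current == null) {
            throw new IllegalStateException("no transaction object when performing state snapshot");
        }
        log.add("preCommit " + current);
        pendingCommit.put(checkpointId, current);
        current = begin();
        return new Snapshot(current, new ArrayList<>(pendingCommit.values()));
    }

    public static void main(String[] args) {
        CheckpointedTxnSketch running = new CheckpointedTxnSketch("a", null);
        Snapshot snap = running.snapshot(1L);
        CheckpointedTxnSketch recovered = new CheckpointedTxnSketch("b", snap);
        System.out.println(recovered.log);
    }
}
```

Restoring from the snapshot taken at checkpoint 1 commits the pre-committed transaction, aborts the one that was open when the snapshot was taken, and only then begins a fresh transaction.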
The methods that implement the CheckpointListener interface are as follows:
/** Called once a checkpoint has completed, to formally commit the pre-committed transactions. */
@Override
public final void notifyCheckpointComplete(long checkpointId) throws Exception {
/** 1. Obtain the list of all transactions to be committed */
Iterator<Map.Entry<Long, TransactionHolder<TXN>>> pendingTransactionIterator =
pendingCommitTransactions.entrySet().iterator();
Throwable firstError = null;
/** 2. Loop to commit transactions in the list of transactions to commit */
while (pendingTransactionIterator.hasNext()) {
Map.Entry<Long, TransactionHolder<TXN>> entry = pendingTransactionIterator.next();
Long pendingTransactionCheckpointId = entry.getKey();
TransactionHolder<TXN> pendingTransaction = entry.getValue();
// Only commit transactions whose checkpoint ID is not greater than checkpointId
if (pendingTransactionCheckpointId > checkpointId) {
continue;
}
LOG.info(
"{} - checkpoint {} complete, committing transaction {} from checkpoint {}",
name(),
checkpointId,
pendingTransaction,
pendingTransactionCheckpointId);
// Timeout warning
logWarningIfTimeoutAlmostReached(pendingTransaction);
try {
// Phase 2 commits the transaction
commit(pendingTransaction.handle);
} catch (Throwable t) {
if (firstError == null) {
firstError = t;
}
}
LOG.debug("{} - committed checkpoint transaction {}", name(), pendingTransaction);
// Remove committed transactions from the list of pending transactions
pendingTransactionIterator.remove();
}
if (firstError != null) {
throw new FlinkRuntimeException(
"Committing one of transactions failed, logging first encountered failure",
firstError);
}
}

@Override
public void notifyCheckpointAborted(long checkpointId) {}
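The commit loop in notifyCheckpointComplete can be reproduced in isolation. In this sketch, `PendingCommitsSketch` is a hypothetical class, transactions are strings, a `TreeMap` keeps the pending transactions ordered by checkpoint ID (an assumption made for the sketch), and a transaction whose name starts with "bad" simulates a failing commit.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Stand-alone sketch of the second-phase commit loop; it is NOT the Flink class. */
class PendingCommitsSketch {
    final Map<Long, String> pending = new TreeMap<>(); // checkpoint id -> pre-committed transaction
    final List<String> committed = new ArrayList<>();

    void notifyCheckpointComplete(long checkpointId) {
        Iterator<Map.Entry<Long, String>> it = pending.entrySet().iterator();
        RuntimeException firstError = null;
        while (it.hasNext()) {
            Map.Entry<Long, String> entry = it.next();
            // Transactions pre-committed for a later checkpoint are not confirmed yet.
            if (entry.getKey() > checkpointId) {
                continue;
            }
            try {
                commit(entry.getValue());
            } catch (RuntimeException e) {
                if (firstError == null) {
                    firstError = e; // keep going, but remember the first failure
                }
            }
            // As in the loop above, the entry is removed whether or not the commit succeeded.
            it.remove();
        }
        if (firstError != null) {
            throw new IllegalStateException("Committing one of the transactions failed", firstError);
        }
    }

    /** Hypothetical commit: names starting with "bad" simulate a commit failure. */
    void commit(String txn) {
        if (txn.startsWith("bad")) {
            throw new RuntimeException("cannot commit " + txn);
        }
        committed.add(txn);
    }

    public static void main(String[] args) {
        PendingCommitsSketch sink = new PendingCommitsSketch();
        sink.pending.put(1L, "t1");
        sink.pending.put(2L, "t2");
        sink.pending.put(3L, "t3");
        sink.notifyCheckpointComplete(2L);
        System.out.println(sink.committed + " still pending: " + sink.pending.keySet());
    }
}
```

Completing checkpoint 2 commits the transactions of checkpoints 1 and 2 in one pass, which covers the case where the notification for checkpoint 1 never arrived, while the transaction of checkpoint 3 stays pending.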
4. Summary
By combining Flink's checkpoint mechanism with the 2PC protocol, users on the sink side only need to provide a custom implementation of TwoPhaseCommitSinkFunction to keep an external system from breaking Flink's end-to-end exactly-once semantics (EOS).