Abstract: Any discussion of distributed algorithms has to mention Paxos. Over the past decades it has become practically synonymous with distributed consensus, because most widely used consensus algorithms are based on it. Examples include Fast Paxos, Cheap Paxos, Raft, ZAB, etc.

This article is shared by Leo Xiao, the author of “Practical Distributed System – Python Implementation of Paxos”.

Consistency algorithm background: Paxos

  1. The problem consensus algorithms solve: in a distributed system, data cannot live on a single node (host), or that node becomes a single point of failure; we therefore need to ensure that multiple nodes (hosts) hold the same data.

  2. What is consistency? In a distributed system it means that the value of a piece of data is the same across multiple nodes.

  3. Consistency models are generally divided into strong consistency and weak consistency. Strong consistency guarantees that the cluster's state reflects a change as soon as it is committed; common examples include Paxos, Raft (Multi-Paxos), and ZAB (Multi-Paxos). Weak consistency, also called eventual consistency, does not guarantee that the cluster's state changes immediately after a commit, only that the state converges over time; common examples include the DNS system and the Gossip protocol.

  4. Where consensus algorithms are used: Google's Chubby distributed lock service uses Paxos; etcd, a distributed key-value store, uses Raft; ZooKeeper, a distributed coordination service and the open-source counterpart of Chubby, uses ZAB.

  • Simple Paxos only reaches consensus on a single static value, which is not practical on its own: the clustered system we need to implement (a bank account service) must agree on a state (the account balances) that changes over time. So we use Paxos to agree on each operation, treating each change as a state machine transition.

  • Multi-Paxos is in effect a sequence of Simple Paxos instances (slots), each numbered sequentially. Each state transition is assigned a “slot number,” and every member of the cluster performs the transitions in strict numerical order. To change the state of the cluster (for example, to process a transfer operation), we try to reach agreement on that operation in the next slot. Concretely, this means adding a slot number to each message and tracking all protocol state on a per-slot basis (a minimal sketch follows the diagram below).

  • Running full Paxos for every slot, with its minimum of two round trips, would be too slow. Multi-Paxos optimizes this by using the same ballot number for all slots and performing the Prepare/Promise phase for all slots at once.

    Client   Proposer      Acceptor     Learner
       |         |          |  |  |       |  |  --- First Request ---
       X-------->|          |  |  |       |  |  Request
       |         X--------->|->|->|       |  |  Prepare(N)
       |         |<---------X--X--X       |  |  Promise(N,I,{Va,Vb,Vc})
       |         X--------->|->|->|       |  |  Accept!(N,I,V)
       |         |<---------X--X--X------>|->|  Accepted(N,I,V)
       |<---------------------------------X--X  Response
       |         |          |  |  |       |  |
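
Concretely, “per-slot” bookkeeping just means that a slot number travels with each message and that protocol state is kept in dictionaries keyed by slot. A minimal sketch of the idea, borrowing the message shapes the library defines later:

    from collections import namedtuple

    # Slot-tagged messages (these match the definitions given later in this article).
    Accept = namedtuple('Accept', ['slot', 'ballot_num', 'proposal'])
    Accepted = namedtuple('Accepted', ['slot', 'ballot_num'])

    # Per-slot protocol state, keyed by slot number.
    accepted_proposals = {}   # {slot: (ballot_num, proposal)} held by each acceptor
    decisions = {}            # {slot: proposal} agreed transitions, applied in order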

Paxos implementation

Implementing Multi-Paxos in practical software is notoriously difficult, and has spawned a number of papers such as “Paxos Made Simple” and “Paxos Made Practical”.

  • First, having multiple proposers can be a problem in a busy environment, as each cluster member tries to get its own state machine operation decided in every slot. The solution is to elect a “leader” responsible for submitting ballots. All other cluster nodes send new operations to the leader for execution. Thus, in normal operation with only one leader, ballot conflicts do not occur.

The Prepare/Promise phase can double as a leader election: whichever cluster member holds the most recently promised ballot number is considered the leader. The leader is then free to execute the Accept/Accepted phase directly without repeating the first phase. As we’ll see below, leader election is actually quite complex in practice.

While Simple Paxos guarantees that the cluster will never reach conflicting decisions, it does not guarantee that any decision will be made at all. For example, if the initial Prepare message is lost and never reaches the acceptors, the proposer will wait for a Promise that never arrives. Fixing this requires carefully designed retransmissions: aggressive enough to make progress eventually, but not so aggressive that the cluster buries itself in a packet storm.
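
The Scout role shown later uses exactly this pattern: send, then arm a timer that re-sends until the role is stopped. Excerpted as a sketch (PREPARE_RETRANSMIT is one of the timeout constants defined below):

    def send_prepare(self):
        # (re)send Prepare to all peers, then schedule another send; the timer
        # callback stops firing once the role has called stop()
        self.node.send(self.peers, Prepare(ballot_num=self.ballot_num))
        self.retransmit_timer = self.set_timer(PREPARE_RETRANSMIT, self.send_prepare)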

  • Another problem is disseminating decisions. Under normal circumstances, simply broadcasting a Decision message solves this. However, if the message is lost, a node may never learn of the decision and will be unable to apply state machine transitions for later slots. So the implementation needs some mechanism for sharing information about decided proposals.

Using distributed state machines presents another challenge: when a new node is started, it needs to capture the existing state of the cluster.

While this could be done by replaying the decisions for every slot since the first, in a large cluster that could mean millions of slots. In addition, we need some way to initialize a new cluster.

Cluster Library Introduction

With the theory behind us, let’s use Python to implement a simplified Multi-Paxos.

Business scenarios and pain points

Let’s take a simple bank account management service as an example. In this service, each account has an account number and a current balance. Users can perform operations such as “deposit”, “transfer”, and “query current balance” on an account. A “transfer” involves two accounts at once: the source account and the destination account, and it must be rejected if the source account’s balance is insufficient.

  • If the service is deployed on a single server, implementation is easy: use a lock to ensure that “transfer” operations are not performed concurrently, and check the source account’s balance before each transfer. However, a bank cannot rely on one server to store critical information such as account balances. These services are instead distributed across multiple servers, each running an instance of the same code, and users can access their accounts from any of them.

  • In a naive distributed implementation, each server keeps a local copy of the account balances, handles whatever operations arrive, and sends balance updates to the other servers. But this approach has a serious problem: if two servers operate on the same account at the same time, which new balance is correct? Even if the servers share operations rather than balances, two simultaneous transfers out of an account can result in an overdraft.

  • Fundamentally, these errors occur when a server responds to an operation using its local state, without first ensuring that its local state matches the other servers. For example, imagine server A receives an instruction to transfer money from account 101 to account 202, while server B has already processed another transfer out of account 101 without notifying server A. Server A’s local state now differs from server B’s, so server A allows the transfer from account 101 even though completing it overdraws the account. A toy sketch of this race follows the list.
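
Here is that race as a toy sketch (not library code): two servers each apply a withdrawal against their own local copy of account 101’s balance, so neither notices the overdraft:

    # Account 101 starts with a balance of 100 on both servers.
    balance_a = balance_b = 100

    # Server A processes "transfer 80 out of 101" against its local state...
    balance_a -= 80          # A now believes the balance is 20
    # ...while server B concurrently processes another "transfer 80 out of 101".
    balance_b -= 80          # B also believes the balance is 20

    # Each transfer looked valid locally, yet together they withdrew 160 from a
    # balance of 100 -- an overdraft neither server detected.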

Distributed state machine

To prevent this we use a tool called a distributed state machine. The idea is that every server runs the same deterministic state machine on the same inputs, so every server produces the same output for the same input. Operations such as “transfer” and “query current balance”, together with their parameters (account numbers and amounts), are the inputs to the state machine, and the account balances are its state.

The application’s state machine is relatively simple:

def execute_operation(state, operation):
    if operation.name == 'deposit':
        if not verify_signature(operation.deposit_signature):
            return state, False
        state.accounts[operation.destination_account] += operation.amount
        return state, True
    elif operation.name == 'transfer':
        if state.accounts[operation.source_account] < operation.amount:
            return state, False
        state.accounts[operation.source_account] -= operation.amount
        state.accounts[operation.destination_account] += operation.amount
        return state, True
    elif operation.name == 'get-balance':
        return state, state.accounts[operation.account]

It is important to note that running the “query current balance” operation does not change the state, yet it is still implemented as a state machine transition. This guarantees that the returned balance is the latest value agreed on by the distributed system, rather than whatever one server happens to hold locally.
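
For illustration, here is how calling the state machine for a get-balance operation might look; the State and Op containers below are hypothetical stand-ins, since the article does not pin down the exact shapes of the state and operation objects:

    from collections import namedtuple, defaultdict

    # Hypothetical containers, for illustration only.
    State = namedtuple('State', ['accounts'])
    Op = namedtuple('Op', ['name', 'account', 'source_account',
                           'destination_account', 'amount', 'deposit_signature'])

    state = State(accounts=defaultdict(int, {101: 250}))
    op = Op(name='get-balance', account=101, source_account=None,
            destination_account=None, amount=None, deposit_signature=None)

    # get-balance flows through the state machine like any other operation,
    # but leaves the state untouched.
    new_state, balance = execute_operation(state, op)
    assert new_state is state and balance == 250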

This might not look like the state machines you learned about in a computer science class. Whereas a traditional state machine has a finite set of states, each with labeled transitions, here the state is the set of account balances, so there are infinitely many possible states. Still, the basic rule of state machines applies: given the same initial state, the same input always produces the same output.

Thus the distributed state machine ensures that every host produces the same response to the same operation. But the problem described above remains: making sure every server agrees on the inputs to the state machine and on their order. This is a consensus problem, and to solve it we use a derivative of the Paxos algorithm.

Core requirements

We implement simplified Multi-Paxos as a _Cluster_ library that larger applications can use as a consensus service.

Correctness is the library’s most important capability, so it is important to structure the code so that we can see and test its correspondence to the specification.

Complex protocols can have complex failures, so we’ll build support to reproduce and debug uncommon failures.

We’ll implement proof-of-concept code: enough to demonstrate that the core concepts are practical, and structured so that additional functionality can be added later with minimal changes to the core implementation.

Let’s start coding.

Types and constants

The protocol in Cluster uses 15 different message types, each defined as a namedtuple from the collections module:

    from collections import namedtuple

    Accepted = namedtuple('Accepted', ['slot', 'ballot_num'])
    Accept = namedtuple('Accept', ['slot', 'ballot_num', 'proposal'])
    Decision = namedtuple('Decision', ['slot', 'proposal'])
    Invoked = namedtuple('Invoked', ['client_id', 'output'])
    Invoke = namedtuple('Invoke', ['caller', 'client_id', 'input_value'])
    Join = namedtuple('Join', [])
    Active = namedtuple('Active', [])
    Prepare = namedtuple('Prepare', ['ballot_num'])
    Promise = namedtuple('Promise', ['ballot_num', 'accepted_proposals'])
    Propose = namedtuple('Propose', ['slot', 'proposal'])
    Welcome = namedtuple('Welcome', ['state', 'slot', 'decisions'])
    Decided = namedtuple('Decided', ['slot'])
    Preempted = namedtuple('Preempted', ['slot', 'preempted_by'])
    Adopted = namedtuple('Adopted', ['ballot_num', 'accepted_proposals'])
    Accepting = namedtuple('Accepting', ['leader'])

Using named tuples to describe each message type keeps the code clean and helps avoid simple errors. If a named tuple constructor is not given the correct attributes, it raises an exception, making the mistake obvious. Named tuples also format nicely in log messages and use less memory than dictionaries.

Create a message:

    msg = Accepted(slot=10, ballot_num=30)

Access message:

    got_ballot_num = msg.ballot_num
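
Two properties we rely on: constructing a message with a missing field fails immediately, and _asdict() (which the Node class below uses to dispatch messages) turns a message back into a dict of its fields:

    # Constructing a message with a missing field raises a TypeError right away:
    try:
        Accepted(slot=10)          # ballot_num omitted
    except TypeError as exc:
        print exc

    # _asdict() is what Node.receive (below) uses to pass the fields as
    # keyword arguments to a do_ handler:
    print msg._asdict()            # OrderedDict([('slot', 10), ('ballot_num', 30)])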

We’ll learn what these messages mean later.

The code also introduces some constants, most of which define timeouts for various messages:

JOIN_RETRANSMIT = 0.7
CATCHUP_INTERVAL = 0.6
ACCEPT_RETRANSMIT = 1.0
PREPARE_RETRANSMIT = 1.0
INVOKE_RETRANSMIT = 0.5
LEADER_TIMEOUT = 1.0
NULL_BALLOT = Ballot(-1, -1)   # sorts before all real ballots
NOOP_PROPOSAL = Proposal(None, None, None)  # no-op to fill otherwise empty slots

Finally, we need to define the Proposal and Ballot namedtuples used by the protocol:

    Proposal = namedtuple('Proposal', ['caller', 'client_id', 'input'])
    Ballot = namedtuple('Ballot', ['n', 'leader'])

A component model

The core components of Multi-Paxos are Roles and Nodes.

  • To ensure testability and keep the code readable, we split the Cluster into classes that correspond to the roles described in the protocol. Each is a subclass of Role.

    class Role(object):

        def __init__(self, node):
            self.node = node
            self.node.register(self)
            self.running = True
            self.logger = node.logger.getChild(type(self).__name__)

        def set_timer(self, seconds, callback):
            return self.node.network.set_timer(self.node.address, seconds,
                                               lambda: self.running and callback())

        def stop(self):
            self.running = False
            self.node.unregister(self)

The roles on a cluster node are glued together by the Node class, which represents a single node on the network. Roles are added to and removed from the node as the program runs.

Messages arriving at the node are relayed to all active roles by calling a method named after the message type, prefixed with do_. These do_ methods receive the message’s attributes as keyword arguments for easy access. The Node class also provides a send method as a convenience, using functools.partial to supply some arguments to the Network class’s method of the same name.

class Node(object):
    unique_ids = itertools.count()

    def __init__(self, network, address):
        self.network = network
        self.address = address or 'N%d' % self.unique_ids.next()
        self.logger = SimTimeLogger(
            logging.getLogger(self.address), {'network': self.network})
        self.logger.info('starting')
        self.roles = []
        self.send = functools.partial(self.network.send, self)

    def register(self, roles):
        self.roles.append(roles)

    def unregister(self, roles):
        self.roles.remove(roles)

    def receive(self, sender, message):
        handler_name = 'do_%s' % type(message).__name__
        for comp in self.roles[:]:
            if not hasattr(comp, handler_name):
                continue
            comp.logger.debug("received %s from %s", message, sender)
            fn = getattr(comp, handler_name)
            fn(sender=sender, **message._asdict())
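
As a sketch of this dispatch convention, here is a toy role of our own (not part of the library) that reacts to Accepted messages simply by defining a matching do_ method whose parameters are the message’s fields:

    class AcceptedCounter(Role):
        """Toy role, for illustration only: counts Accepted messages per slot."""

        def __init__(self, node):
            super(AcceptedCounter, self).__init__(node)
            self.counts = {}

        def do_Accepted(self, sender, slot, ballot_num):
            # Node.receive derives the handler name 'do_Accepted' from the message
            # type and passes the namedtuple's fields as keyword arguments.
            self.counts[slot] = self.counts.get(slot, 0) + 1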

Application interface

A Member object is created and started on each cluster member, supplying an application-specific state machine and a list of peers. The Member object adds a Bootstrap role to the node if it is joining an existing cluster, or a Seed role if a new cluster is being created. It then runs the protocol (via network.run) in a separate thread.

The application interacts with the cluster through the invoke method, which kicks off a proposed state transition. Once the proposal is decided and the state machine has run, invoke returns the state machine’s output. The method uses a simple synchronized Queue to wait for the result from the protocol thread.

class Member(object):

    def __init__(self, state_machine, network, peers, seed=None,
                 seed_cls=Seed, bootstrap_cls=Bootstrap):
        self.network = network
        self.node = network.new_node()
        if seed is not None:
            self.startup_role = seed_cls(self.node, initial_state=seed,
                                         peers=peers, execute_fn=state_machine)
        else:
            self.startup_role = bootstrap_cls(self.node,
                                              execute_fn=state_machine, peers=peers)
        self.requester = None

    def start(self):
        self.startup_role.start()
        self.thread = threading.Thread(target=self.network.run)
        self.thread.start()

    def invoke(self, input_value, request_cls=Requester):
        assert self.requester is None
        q = Queue.Queue()
        self.requester = request_cls(self.node, input_value, q.put)
        self.requester.start()
        output = q.get()
        self.requester = None
        return output
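
As a rough usage sketch (not taken from the library): the Network class belongs to the simulator, which this article does not show, so its constructor, the node addresses, and the toy append-only-log state machine below are all assumptions made purely for illustration:

    def append_log(state, input_value):
        # toy deterministic state machine: the state is a tuple of all past inputs
        new_state = state + (input_value,)
        return new_state, new_state

    network = Network()                 # assumed constructor, provided by the simulator
    peers = ['N0', 'N1', 'N2']          # assumed addresses handed out by network.new_node()

    members = []
    for i in range(len(peers)):
        # the first member seeds a brand-new cluster; the others bootstrap into it
        members.append(Member(append_log, network, peers,
                              seed=() if i == 0 else None))
    for member in members:
        member.start()

    # invoke() blocks until the cluster decides the proposal, then returns the
    # state machine's output
    print members[0].invoke('deposit-100')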

Role class

The roles in Paxos include client, acceptor, proposer, learner, and leader. In a typical implementation, a single process can play one or more of these roles simultaneously. This does not affect the correctness of the protocol; roles are usually combined to reduce latency and/or the number of messages in the protocol.

The design of Role includes the following subclasses:

Acceptor – makes promises and accepts proposals

Replica – manages the distributed state machine: submits proposals, commits decisions, and responds to requesters

Leader – leads rounds of the Multi-Paxos algorithm

Scout – performs the Prepare/Promise portion of Multi-Paxos on behalf of the leader

Commander – performs the Accept/Accepted portion of Multi-Paxos on behalf of the leader

Bootstrap – Adds a new node to the cluster

Seed – Creates a new cluster

Requester – Requests distributed state machine operations

Let’s implement each role class one by one

Acceptor

The Acceptor class implements the acceptor role in Paxos, so it must store the ballot number of its most recent promise, along with the set of accepted proposals for each slot, and respond to Prepare and Accept messages accordingly. The POC implementation is a short class that maps directly onto the protocol. To acceptors, Multi-Paxos looks like Simple Paxos with slot numbers added to the messages.

class Acceptor(Role):

    def __init__(self, node):
        super(Acceptor, self).__init__(node)
        self.ballot_num = NULL_BALLOT
        self.accepted_proposals = {}  # {slot: (ballot_num, proposal)}

    def do_Prepare(self, sender, ballot_num):
        if ballot_num > self.ballot_num:
            self.ballot_num = ballot_num
            # we've heard from a scout, so it might be the next leader
            self.node.send([self.node.address], Accepting(leader=sender))

        self.node.send([sender], Promise(
            ballot_num=self.ballot_num,
            accepted_proposals=self.accepted_proposals
        ))

    def do_Accept(self, sender, ballot_num, slot, proposal):
        if ballot_num >= self.ballot_num:
            self.ballot_num = ballot_num
            acc = self.accepted_proposals
            if slot not in acc or acc[slot][0] < ballot_num:
                acc[slot] = (ballot_num, proposal)

        self.node.send([sender], Accepted(
            slot=slot, ballot_num=self.ballot_num))

Replica

Replica is the most complex Role subclass, corresponding to the Learner and Proposer roles in the protocol. Its main responsibilities are: proposing new proposals; invoking the local state machine when a proposal is decided; tracking the current leader; and adding newly started nodes to the cluster.

The Replica creates a new proposal in response to an Invoke message from a client, selecting what it believes to be an unused slot and sending a Propose message to the current leader. If consensus for that slot lands on a different proposal, the replica must re-propose in a new slot.

The figure below shows the Replica role control process:

Requester   Local Replica   Current Leader
    X------------>|               |      Invoke
    |             X-------------->|      Propose
    |             |<--------------X      Decision
    |<------------X               |      Invoked
    |             |               |

A Decision message represents a slot on which the cluster has reached consensus. The Replica stores the new decision and then runs the state machine forward until it reaches a slot that has not yet been decided. Replicas distinguish decided slots, on which the cluster has agreed, from committed slots, which the local state machine has already processed. When slots are decided out of order, committed proposals may lag behind, waiting for the next slot to be decided. Once a slot is committed, each replica sends the operation’s result back to the requester in an Invoked message.

In some cases a slot may end up with no active proposal and no decision. The state machine must execute slots one by one, so the cluster has to agree on something to fill the gap. To guard against this, the replica proposes a “no-op” when it catches up on such a slot. If this proposal is ultimately decided, the state machine does nothing for that slot.

Similarly, the same proposal may be decided twice. The replica skips the state machine call for any such duplicate, performing no transition for that slot.

A replica needs to know which node is the active leader so that it can send Propose messages to it. To do this, each replica tracks the active leader using three sources of information, described below.

When the leader role becomes active, it sends an Adopted message to the replica on the same node (figure below):

Leader      Local Replica
   X----------->|              Adopted

When an acceptor sends a Promise to a new leader, it also sends an Accepting message to its local replica (figure below):

Acceptor    Local Replica
   X----------->|              Accepting

The active leader sends Active messages as a heartbeat. If no such message arrives before LEADER_TIMEOUT expires, the replica assumes the leader is dead and moves on to the next leader. In this case it is important that all replicas choose the same new leader, which we achieve by sorting the members and selecting the next one in the list.

When the node joins the network, Bootstrap sends a Join message (figure below). Replica responds with a Welcome message containing its latest status so that new nodes can be quickly started.

Bootstrap    Replica    Replica    Replica
    X---------->|           |          |      Join
    |<----------X           |          |      Welcome
    X---------------------->|          |      Join
    |<----------------------X          |      Welcome
    X--------------------------------->|      Join
    |<---------------------------------X      Welcome

class Replica(Role):

    def __init__(self, node, execute_fn, state, slot, decisions, peers):
        super(Replica, self).__init__(node)
        self.execute_fn = execute_fn
        self.state = state
        self.slot = slot
        self.decisions = decisions
        self.peers = peers
        self.proposals = {}
        # next slot num for a proposal (may lead slot)
        self.next_slot = slot
        self.latest_leader = None
        self.latest_leader_timeout = None

    # making proposals

    def do_Invoke(self, sender, caller, client_id, input_value):
        proposal = Proposal(caller, client_id, input_value)
        slot = next((s for s, p in self.proposals.iteritems() if p == proposal), None)
        # propose, or re-propose if this proposal already has a slot
        self.propose(proposal, slot)

    def propose(self, proposal, slot=None):
        """Send (or resend, if slot is specified) a proposal to the leader"""
        if not slot:
            slot, self.next_slot = self.next_slot, self.next_slot + 1
        self.proposals[slot] = proposal
        # find a leader we think is working - either the latest we know of, or
        # ourselves (which may trigger a scout to make us the leader)
        leader = self.latest_leader or self.node.address
        self.logger.info(
            "proposing %s at slot %d to leader %s" % (proposal, slot, leader))
        self.node.send([leader], Propose(slot=slot, proposal=proposal))

    # handling decided proposals

    def do_Decision(self, sender, slot, proposal):
        assert not self.decisions.get(self.slot, None), \
            "next slot to commit is already decided"
        if slot in self.decisions:
            assert self.decisions[slot] == proposal, \
                "slot %d already decided with %r!" % (slot, self.decisions[slot])
            return
        self.decisions[slot] = proposal
        self.next_slot = max(self.next_slot, slot + 1)

        # re-propose our proposal in a new slot if it lost its slot and wasn't a no-op
        our_proposal = self.proposals.get(slot)
        if (our_proposal is not None and
                our_proposal != proposal and our_proposal.caller):
            self.propose(our_proposal)

        # execute any pending, decided proposals
        while True:
            commit_proposal = self.decisions.get(self.slot)
            if not commit_proposal:
                break  # not decided yet
            commit_slot, self.slot = self.slot, self.slot + 1
            self.commit(commit_slot, commit_proposal)

    def commit(self, slot, proposal):
        """Actually commit a proposal that is decided and in sequence"""
        decided_proposals = [p for s, p in self.decisions.iteritems() if s < slot]
        if proposal in decided_proposals:
            self.logger.info(
                "not committing duplicate proposal %r, slot %d", proposal, slot)
            return  # duplicate

        self.logger.info("committing %r at slot %d" % (proposal, slot))
        if proposal.caller is not None:
            # perform a client operation
            self.state, output = self.execute_fn(self.state, proposal.input)
            self.node.send([proposal.caller],
                           Invoked(client_id=proposal.client_id, output=output))

    # tracking the leader

    def do_Adopted(self, sender, ballot_num, accepted_proposals):
        self.latest_leader = self.node.address
        self.leader_alive()

    def do_Accepting(self, sender, leader):
        self.latest_leader = leader
        self.leader_alive()

    def do_Active(self, sender):
        if sender != self.latest_leader:
            return
        self.leader_alive()

    def leader_alive(self):
        if self.latest_leader_timeout:
            self.latest_leader_timeout.cancel()

        def reset_leader():
            idx = self.peers.index(self.latest_leader)
            self.latest_leader = self.peers[(idx + 1) % len(self.peers)]
            self.logger.debug("leader timed out; trying the next one, %s",
                              self.latest_leader)
        self.latest_leader_timeout = self.set_timer(LEADER_TIMEOUT, reset_leader)

    # adding new cluster members

    def do_Join(self, sender):
        if sender in self.peers:
            self.node.send([sender], Welcome(
                state=self.state, slot=self.slot, decisions=self.decisions))

Leader/Scout/Commander

The leader’s main task is to take Propose messages requesting new ballots and get decisions made for them. The leader is “active” after successfully completing the Prepare/Promise part of the protocol; an active leader can immediately send an Accept message in response to a Propose.

In keeping with the one-class-per-role model, the leader delegates to the Scout and Commander roles to carry out each portion of the protocol.

class Leader(Role):

    def __init__(self, node, peers, commander_cls=Commander, scout_cls=Scout):
        super(Leader, self).__init__(node)
        self.ballot_num = Ballot(0, node.address)
        self.active = False
        self.proposals = {}
        self.commander_cls = commander_cls
        self.scout_cls = scout_cls
        self.scouting = False
        self.peers = peers

    def start(self):
        # remind others we're active before LEADER_TIMEOUT expires
        def active():
            if self.active:
                self.node.send(self.peers, Active())
            self.set_timer(LEADER_TIMEOUT / 2.0, active)
        active()

    def spawn_scout(self):
        assert not self.scouting
        self.scouting = True
        self.scout_cls(self.node, self.ballot_num, self.peers).start()

    def do_Adopted(self, sender, ballot_num, accepted_proposals):
        self.scouting = False
        self.proposals.update(accepted_proposals)
        # note that we don't re-spawn commanders here; if there are undecided
        # proposals, the replicas will re-propose
        self.logger.info("leader becoming active")
        self.active = True

    def spawn_commander(self, ballot_num, slot):
        proposal = self.proposals[slot]
        self.commander_cls(self.node, ballot_num, slot, proposal, self.peers).start()

    def do_Preempted(self, sender, slot, preempted_by):
        if not slot:  # from the scout
            self.scouting = False
        self.logger.info("leader preempted by %s", preempted_by.leader)
        self.active = False
        self.ballot_num = Ballot((preempted_by or self.ballot_num).n + 1,
                                 self.ballot_num.leader)

    def do_Propose(self, sender, slot, proposal):
        if slot not in self.proposals:
            if self.active:
                self.proposals[slot] = proposal
                self.logger.info("spawning commander for slot %d" % (slot,))
                self.spawn_commander(self.ballot_num, slot)
            else:
                if not self.scouting:
                    self.logger.info("got PROPOSE when not active - scouting")
                    self.spawn_scout()
                else:
                    self.logger.info("got PROPOSE while scouting; ignored")
        else:
            self.logger.info("got PROPOSE for a slot already being proposed")

The leader creates a Scout role when it wants to become active, in response to receiving a Propose while it is inactive (figure below). The scout sends (and re-sends, if necessary) a Prepare message and collects Promise responses until it has heard from a majority of its peers or until it has been preempted. It then reports back to the leader with Adopted or Preempted, respectively.

Leader   Scout   Acceptor  Acceptor  Acceptor
  |        |        |         |         |
  X------->|        |         |         |
  |        X------->|         |         |   Prepare
  |        |<-------X         |         |   Promise
  |        X----------------->|         |   Prepare
  |        |<-----------------X         |   Promise
  |        X--------------------------->|   Prepare
  |        |<---------------------------X   Promise
  |<-------X        |         |         |   Adopted

class Scout(Role):

    def __init__(self, node, ballot_num, peers):
        super(Scout, self).__init__(node)
        self.ballot_num = ballot_num
        self.accepted_proposals = {}
        self.acceptors = set([])
        self.peers = peers
        self.quorum = len(peers) / 2 + 1
        self.retransmit_timer = None

    def start(self):
        self.logger.info("scout starting")
        self.send_prepare()

    def send_prepare(self):
        self.node.send(self.peers, Prepare(ballot_num=self.ballot_num))
        self.retransmit_timer = self.set_timer(PREPARE_RETRANSMIT, self.send_prepare)

    def update_accepted(self, accepted_proposals):
        acc = self.accepted_proposals
        for slot, (ballot_num, proposal) in accepted_proposals.iteritems():
            if slot not in acc or acc[slot][0] < ballot_num:
                acc[slot] = (ballot_num, proposal)

    def do_Promise(self, sender, ballot_num, accepted_proposals):
        if ballot_num == self.ballot_num:
            self.logger.info("got matching promise; need %d" % self.quorum)
            self.update_accepted(accepted_proposals)
            self.acceptors.add(sender)
            if len(self.acceptors) >= self.quorum:
                # strip the ballot numbers from self.accepted_proposals, now that it
                # represents a majority
                accepted_proposals = \
                    dict((s, p) for s, (b, p) in self.accepted_proposals.iteritems())
                # We're adopted; note that this does *not* mean that no other
                # leader is active.  Any such conflicts will be handled by the
                # commanders.
                self.node.send([self.node.address],
                               Adopted(ballot_num=ballot_num,
                                       accepted_proposals=accepted_proposals))
                self.stop()
        else:
            # this acceptor has promised another leader a higher ballot number,
            # so we've lost
            self.node.send([self.node.address],
                           Preempted(slot=None, preempted_by=ballot_num))
            self.stop()

The leader creates a Commander role for each slot where it has an active proposal (figure below). Like a scout, a commander sends and re-sends Accept messages and waits for a majority of acceptors to reply with Accepted, or for news that it has been preempted. Once the proposal is accepted, the commander broadcasts a Decision message to all nodes and responds to the leader with Decided or Preempted.

Leader   Commander   Acceptor  Acceptor  Acceptor
  |          |           |         |         |
  X--------->|           |         |         |
  |          X---------->|         |         |   Accept
  |          |<----------X         |         |   Accepted
  |          X-------------------->|         |   Accept
  |          |<--------------------X         |   Accepted
  |          X------------------------------>|   Accept
  |          |<------------------------------X   Accepted
  |<---------X           |         |         |   Decided

class Commander(Role):

    def __init__(self, node, ballot_num, slot, proposal, peers):
        super(Commander, self).__init__(node)
        self.ballot_num = ballot_num
        self.slot = slot
        self.proposal = proposal
        self.acceptors = set([])
        self.peers = peers
        self.quorum = len(peers) / 2 + 1

    def start(self):
        self.node.send(set(self.peers) - self.acceptors, Accept(
            slot=self.slot, ballot_num=self.ballot_num, proposal=self.proposal))
        self.set_timer(ACCEPT_RETRANSMIT, self.start)

    def finished(self, ballot_num, preempted):
        if preempted:
            self.node.send([self.node.address],
                           Preempted(slot=self.slot, preempted_by=ballot_num))
        else:
            self.node.send([self.node.address],
                           Decided(slot=self.slot))
        self.stop()

    def do_Accepted(self, sender, slot, ballot_num):
        if slot != self.slot:
            return
        if ballot_num == self.ballot_num:
            self.acceptors.add(sender)
            if len(self.acceptors) < self.quorum:
                return
            self.node.send(self.peers, Decision(
                slot=self.slot, proposal=self.proposal))
            self.finished(ballot_num, False)
        else:
            self.finished(ballot_num, True)

One wrinkle: the network simulator, introduced later, applies packet loss even to messages a node sends to itself. If all Decision messages are lost, the protocol cannot proceed. The replica keeps retransmitting Propose messages, but the leader ignores them because it has already made a proposal for that slot, and since no replica has received the Decision, the catch-up process cannot find the result either. The solution is to behave like a real network stack and ensure that local messages are always delivered.

Bootstrap

When a node joins the cluster, it must obtain the current cluster state. The Bootstrap role cycles through the nodes, sending a Join message to each in turn until it receives a Welcome (the Join/Welcome exchange is shown in the diagram in the Replica section above).

Implementing the startup process inside each role (replica, leader, acceptor), with each waiting for the Welcome message, would scatter the initialization logic across every role and make it tedious to test. Instead, we add a Bootstrap role: once startup completes, it adds each of the other roles to the node, passing the initial state to their constructors.

class Bootstrap(Role):

    def __init__(self, node, peers, execute_fn,
                 replica_cls=Replica, acceptor_cls=Acceptor, leader_cls=Leader,
                 commander_cls=Commander, scout_cls=Scout):
        super(Bootstrap, self).__init__(node)
        self.execute_fn = execute_fn
        self.peers = peers
        self.peers_cycle = itertools.cycle(peers)
        self.replica_cls = replica_cls
        self.acceptor_cls = acceptor_cls
        self.leader_cls = leader_cls
        self.commander_cls = commander_cls
        self.scout_cls = scout_cls

    def start(self):
        self.join()

    def join(self):
        self.node.send([next(self.peers_cycle)], Join())
        self.set_timer(JOIN_RETRANSMIT, self.join)

    def do_Welcome(self, sender, state, slot, decisions):
        self.acceptor_cls(self.node)
        self.replica_cls(self.node, execute_fn=self.execute_fn, peers=self.peers,
                         state=state, slot=slot, decisions=decisions)
        self.leader_cls(self.node, peers=self.peers,
                        commander_cls=self.commander_cls,
                        scout_cls=self.scout_cls).start()
        self.stop()

Reference:

  • aosabook.org/en/500L/clu…

  • lamport.azurewebsites.net/pubs/lampor…

  • lamport.azurewebsites.net/pubs/paxos-…

  • www.scs.stanford.edu/~dm/home/pa…

  • www.researchgate.net/publication…

  • www.paxos.com/

  • www.cs.cornell.edu/courses/cs6…

  • en.wikipedia.org/wiki/Paxos_…

  • ongardie.net/static/raft…

  • zhuanlan.zhihu.com/p/130332285
