Preface
This article takes a look at CRDTs
CRDT
CRDT is short for Conflict-free Replicated Data Type, also described as a form of passive synchronisation: a data type that can be replicated across a network and that resolves conflicts automatically so that the replicas end up consistent. It is suited to replicating data between partitions in systems that follow an AP architecture. Concrete implementations fall into state-based (CvRDT), operation-based (CmRDT), delta-based, pure operation-based, and so on
The idea is consistency without consensus: replicas are guaranteed to converge to the same value in spite of network delays, partitions, and message reordering
State-based (CvRDT)
- CvRDT, short for Convergent Replicated Data Type, is also known as active synchronisation. It is commonly used in file systems such as NFS, AFS, and Coda, and in key-value stores such as Riak and Dynamo
- Replication works by shipping the state of the entire object, so a merge function must be defined to combine incoming object states
- The merge function must be commutative, associative, and idempotent, so that it can be retried and is independent of delivery order
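The state-based approach described above can be sketched in plain Java. This is a minimal illustration, not taken from any library: the class StateReplica and its methods add, snapshot, and merge are hypothetical names, and set union is assumed as the merge function because it is commutative, associative, and idempotent.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical state-based replica holding a grow-only set of strings.
final class StateReplica {
    private final Set<String> state = new HashSet<>();

    void add(String element) {
        state.add(element);
    }

    // The full state is what travels over the network.
    Set<String> snapshot() {
        return new HashSet<>(state);
    }

    // merge = set union, which can be retried and applied in any order.
    void merge(Set<String> remoteState) {
        state.addAll(remoteState);
    }

    public static void main(String[] args) {
        StateReplica a = new StateReplica();
        StateReplica b = new StateReplica();
        a.add("x");
        b.add("y");
        Set<String> fromA = a.snapshot();
        Set<String> fromB = b.snapshot();
        a.merge(fromB);
        b.merge(fromA);
        b.merge(fromA); // duplicate delivery is harmless
        System.out.println(a.snapshot().equals(b.snapshot())); // prints "true"
    }
}
```

Both replicas end up with the same two elements even though one state was delivered twice.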
Operation-based (CmRDT)
- CmRDT, short for Commutative Replicated Data Type, is commonly used in cooperative systems such as Bayou, Rover, IceCube, and Telex
- Replication works by shipping operations: a prepare method generates the operation, and an effect method applies an incoming operation to the local state
- The transport protocol must be reliable. If an operation may be delivered more than once, effect must be idempotent. Ordering matters as well: if delivery order cannot be guaranteed, effects must commute, so that applying them in any order gives the same result
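The prepare/effect split above can be sketched as follows. This is an illustrative sketch under stated assumptions: OpCounterSketch and AddOp are hypothetical names, and giving each operation a random UUID so that effect can ignore duplicates is one possible way to obtain idempotence, not the only one.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;

// Hypothetical operation-based counter; all names are illustrative.
final class OpCounterSketch {
    // An operation carries a unique id so that duplicates can be detected.
    static final class AddOp {
        final UUID id;
        final long amount;

        AddOp(UUID id, long amount) {
            this.id = id;
            this.amount = amount;
        }
    }

    private final Set<UUID> applied = new HashSet<>();
    private long value;

    // prepare: build the operation at the source replica, without side effects.
    AddOp prepare(long amount) {
        return new AddOp(UUID.randomUUID(), amount);
    }

    // effect: apply an operation; a second delivery of the same id is ignored.
    void effect(AddOp op) {
        if (applied.add(op.id)) {
            value += op.amount;
        }
    }

    long value() {
        return value;
    }

    public static void main(String[] args) {
        OpCounterSketch a = new OpCounterSketch();
        OpCounterSketch b = new OpCounterSketch();
        AddOp op1 = a.prepare(2);
        AddOp op2 = b.prepare(3);
        // Replica a sees op1 then op2; replica b sees op2, op1, and op1 again.
        a.effect(op1);
        a.effect(op2);
        b.effect(op2);
        b.effect(op1);
        b.effect(op1);
        System.out.println(a.value() + " " + b.value()); // prints "5 5"
    }
}
```

Additions commute, so the different delivery orders do not matter, and the id check absorbs the repeated delivery.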
Delta-based
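Delta-based CRDTs can be understood as an improvement that combines the state-based and operation-based approaches. Replication is implemented through delta-mutators, which return only the part of the state that changed (the delta) instead of the whole state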
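As a rough illustration of a delta-mutator, the sketch below applies the idea to a grow-only counter. DeltaGCounter and its methods are hypothetical names chosen for this example; the assumption is that a delta is just a small state fragment that is joined with the same per-node max merge as a full state.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical delta-based grow-only counter.
final class DeltaGCounter {
    private final String nodeId;
    private final Map<String, Long> entries = new HashMap<>();

    DeltaGCounter(String nodeId) {
        this.nodeId = nodeId;
    }

    // Delta-mutator: updates the local state and returns only the changed entry.
    Map<String, Long> increment() {
        long next = entries.merge(nodeId, 1L, Long::sum);
        Map<String, Long> delta = new HashMap<>();
        delta.put(nodeId, next);
        return delta;
    }

    // A delta is joined with the same merge that a full state would use.
    void merge(Map<String, Long> other) {
        other.forEach((node, count) -> entries.merge(node, count, Math::max));
    }

    long value() {
        return entries.values().stream().mapToLong(Long::longValue).sum();
    }

    public static void main(String[] args) {
        DeltaGCounter a = new DeltaGCounter("a");
        DeltaGCounter b = new DeltaGCounter("b");
        Map<String, Long> d1 = a.increment();
        Map<String, Long> d2 = a.increment();
        b.increment();
        b.merge(d2); // the newer delta arrives first
        b.merge(d1); // the older delta arrives late and changes nothing
        System.out.println(b.value()); // prints "3"
    }
}
```

Only one map entry crosses the network per increment, yet late or reordered deltas are still absorbed by the max merge.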
Pure operation-based
In the operation-based mode, the prepare method has to generate operations, which may introduce a delay. In the pure operation-based mode, prepare does not generate operations by inspecting or comparing state. It simply returns the operation as issued, which means the operations applied to the object state have to be recorded at each step
Convergent Operations
For a CRDT to replicate a data structure without conflicts, the operations on it must satisfy the following conditions:
- Associative
a + (b + c) = (a + b) + c, grouping has no effect
- Commutative
a + b = b + a, order has no effect
- Idempotent
a + a = a, duplication has no effect
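The three conditions can be spot-checked for a candidate merge function. The helper below is a hypothetical sketch (MergeLaws and holds are names invented here), and it only tests the laws for the sample values it is given, so it is a sanity check rather than a proof.

```java
import java.util.function.BinaryOperator;

// Hypothetical helper that checks the three laws for given sample values.
final class MergeLaws {
    static <T> boolean holds(BinaryOperator<T> merge, T a, T b, T c) {
        boolean associative =
                merge.apply(a, merge.apply(b, c)).equals(merge.apply(merge.apply(a, b), c));
        boolean commutative = merge.apply(a, b).equals(merge.apply(b, a));
        boolean idempotent = merge.apply(a, a).equals(a);
        return associative && commutative && idempotent;
    }

    public static void main(String[] args) {
        // max satisfies all three laws for these samples.
        System.out.println(MergeLaws.<Long>holds(Math::max, 1L, 5L, 3L)); // prints "true"
        // Addition is associative and commutative but not idempotent: 1 + 1 != 1.
        System.out.println(MergeLaws.<Long>holds(Long::sum, 1L, 5L, 3L)); // prints "false"
    }
}
```

The second call hints at why a replicated counter cannot simply add incoming states together: a duplicated message would be counted twice.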
Basic data types
The basic data types of CRDT include Counters, Registers, and Sets
Counters
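- Grow-only counter (G-Counter)
Each node keeps its own count, and replicas merge with the max function per node
- Positive-negative counter (PN-Counter)
Uses two G-Counters, one for increments and one for decrements; the value is the sum of the increments minus the sum of the decrements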
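A PN-Counter along these lines might look like the following plain-Java sketch. PnCounterSketch is a hypothetical class written for this example using java.util.HashMap; it is not the library implementation shown later in the article.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical PN-Counter built from two per-node grow-only maps.
final class PnCounterSketch {
    private final String nodeId;
    private final Map<String, Long> increments = new HashMap<>();
    private final Map<String, Long> decrements = new HashMap<>();

    PnCounterSketch(String nodeId) {
        this.nodeId = nodeId;
    }

    void increment() {
        increments.merge(nodeId, 1L, Long::sum);
    }

    void decrement() {
        decrements.merge(nodeId, 1L, Long::sum);
    }

    // Merge each map entry-wise, keeping the larger count per node.
    void merge(PnCounterSketch other) {
        other.increments.forEach((node, count) -> increments.merge(node, count, Math::max));
        other.decrements.forEach((node, count) -> decrements.merge(node, count, Math::max));
    }

    long value() {
        return sum(increments) - sum(decrements);
    }

    private static long sum(Map<String, Long> counts) {
        return counts.values().stream().mapToLong(Long::longValue).sum();
    }

    public static void main(String[] args) {
        PnCounterSketch a = new PnCounterSketch("a");
        PnCounterSketch b = new PnCounterSketch("b");
        a.increment();
        a.increment();
        b.decrement();
        a.merge(b);
        b.merge(a);
        b.merge(a); // merging the same state again changes nothing
        System.out.println(a.value() + " " + b.value()); // prints "1 1"
    }
}
```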
Registers
A register offers assign() and value() operations
- Last-write-wins register (LWW-Register)
Attach a unique, ordered id to each assign operation, such as a timestamp or vector clock, and merge using the max function
- Multi-value register (MV-Register)
Similar to the G-Counter: each assign adds a new version, versions are merged using the max function, and values that were assigned concurrently are all retained
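A last-write-wins register can be sketched as below. LwwRegisterSketch is a hypothetical class; it assumes the caller supplies a timestamp with every assign, and it breaks timestamp ties by comparing node ids, which is one common convention rather than something the source prescribes.

```java
// Hypothetical LWW register ordered by (timestamp, nodeId).
final class LwwRegisterSketch<T> {
    private T value;
    private long timestamp = Long.MIN_VALUE;
    private String writer = "";

    // A local assign is handled like an incoming write.
    void assign(T newValue, long ts, String nodeId) {
        merge(newValue, ts, nodeId);
    }

    // Keep whichever write has the larger (timestamp, nodeId) pair.
    void merge(T otherValue, long otherTs, String otherWriter) {
        boolean newer = otherTs > timestamp
                || (otherTs == timestamp && otherWriter.compareTo(writer) > 0);
        if (newer) {
            value = otherValue;
            timestamp = otherTs;
            writer = otherWriter;
        }
    }

    T value() {
        return value;
    }

    public static void main(String[] args) {
        LwwRegisterSketch<String> r1 = new LwwRegisterSketch<>();
        LwwRegisterSketch<String> r2 = new LwwRegisterSketch<>();
        // The same two writes arrive in opposite orders.
        r1.assign("old", 1L, "n1");
        r1.merge("new", 2L, "n2");
        r2.assign("new", 2L, "n2");
        r2.merge("old", 1L, "n1");
        System.out.println(r1.value() + " " + r2.value()); // prints "new new"
    }
}
```

Because the comparison is a max over a total order, the register converges regardless of arrival order. The cost is that one of two concurrent writes is silently dropped.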
Sets
- Grow-only set (G-Set)
Merge using set union
- Two-phase set (2P-Set)
Uses two G-Sets, an addSet for additions and a removeSet for removals
- Last-write-wins element set (LWW-Element-Set)
Similar to the 2P-Set, with an addSet and a removeSet, but each element carries a timestamp, and whichever of add and remove has the higher timestamp takes precedence
- Observed-remove set (OR-Set)
Similar to the 2P-Set, with an addSet and a removeSet, but each added element carries a unique tag. A remove only affects the tags it has observed, so an add takes precedence over a concurrent remove
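The observed-remove behaviour is the least obvious of the four, so here is a small operation-style sketch. OrSetSketch and its methods are hypothetical names; the sketch assumes every add is tagged with a random UUID and that a remove ships the set of tags it observed locally.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

// Hypothetical OR-Set: an element is present while it has a live tag.
final class OrSetSketch<E> {
    private final Map<E, Set<UUID>> live = new HashMap<>();
    private final Set<UUID> removedTags = new HashSet<>();

    // Local add: create a fresh tag and return it for broadcasting.
    UUID add(E element) {
        UUID tag = UUID.randomUUID();
        applyAdd(element, tag);
        return tag;
    }

    void applyAdd(E element, UUID tag) {
        if (!removedTags.contains(tag)) {
            live.computeIfAbsent(element, key -> new HashSet<>()).add(tag);
        }
    }

    // Local remove: collect the tags observed here and return them for broadcasting.
    Set<UUID> remove(E element) {
        Set<UUID> observed = new HashSet<>();
        Set<UUID> current = live.get(element);
        if (current != null) {
            observed.addAll(current);
        }
        applyRemove(element, observed);
        return observed;
    }

    void applyRemove(E element, Set<UUID> tags) {
        removedTags.addAll(tags);
        Set<UUID> current = live.get(element);
        if (current != null) {
            current.removeAll(tags);
            if (current.isEmpty()) {
                live.remove(element);
            }
        }
    }

    boolean contains(E element) {
        return live.containsKey(element);
    }

    public static void main(String[] args) {
        OrSetSketch<String> a = new OrSetSketch<>();
        OrSetSketch<String> b = new OrSetSketch<>();
        UUID t1 = a.add("x");
        b.applyAdd("x", t1);
        Set<UUID> removed = b.remove("x"); // b has only observed t1
        UUID t2 = a.add("x");              // concurrent re-add on a
        a.applyRemove("x", removed);
        b.applyAdd("x", t2);
        System.out.println(a.contains("x") + " " + b.contains("x")); // prints "true true"
    }
}
```

The remove on b cannot touch tag t2 because b never observed it, which is why the concurrent add survives on both replicas.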
Other data types
Array
For arrays there is the Replicated Growable Array (RGA), which supports the addRight(v, a) operation
Graph
A graph can be built on top of sets, but it has to handle concurrent addEdge(u, v) and removeVertex(u) operations
Map
A map has to handle concurrent put and rmv operations
Examples
The examples below use the wurmloch-crdt implementation
GCounter
wurmloch-crdt/src/main/java/com/netopyr/wurmloch/crdt/GCounter.java
public class GCounter extends AbstractCrdt<GCounter, GCounter.UpdateCommand> {
// fields
private Map<String, Long> entries = HashMap.empty();
// constructor
public GCounter(String nodeId, String crdtId) {
super(nodeId, crdtId, BehaviorProcessor.create());
}
// crdt
@Override
protected Option<UpdateCommand> processCommand(UpdateCommand command) {
final Map<String, Long> oldEntries = entries;
entries = entries.merge(command.entries, Math::max);
return entries.equals(oldEntries)? Option.none() : Option.of(new UpdateCommand(crdtId, entries));
}
// core functionality
public long get() {
return entries.values().sum().longValue();
}
public void increment() {
increment(1L);
}
public void increment(long value) {
if (value < 1L) {
throw new IllegalArgumentException("Value needs to be a positive number.");
}
entries = entries.put(nodeId, entries.get(nodeId).getOrElse(0L) + value);
commands.onNext(new UpdateCommand(
crdtId,
entries
));
}
//...
}
- processCommand receives an UpdateCommand and merges its entries into the local map with the merge method of HashMap, passing Math::max as the BiFunction; get() sums entries.values() to obtain the result
PNCounter
wurmloch-crdt/src/main/java/com/netopyr/wurmloch/crdt/PNCounter.java
public class PNCounter extends AbstractCrdt<PNCounter, PNCounter.UpdateCommand> {
// fields
private Map<String, Long> pEntries = HashMap.empty();
private Map<String, Long> nEntries = HashMap.empty();
// constructor
public PNCounter(String nodeId, String crtdId) {
super(nodeId, crtdId, BehaviorProcessor.create());
}
// crdt
protected Option<UpdateCommand> processCommand(PNCounter.UpdateCommand command) {
final Map<String, Long> oldPEntries = pEntries;
final Map<String, Long> oldNEntries = nEntries;
pEntries = pEntries.merge(command.pEntries, Math::max);
nEntries = nEntries.merge(command.nEntries, Math::max);
return pEntries.equals(oldPEntries) && nEntries.equals(oldNEntries)? Option.none()
: Option.of(new UpdateCommand(crdtId, pEntries, nEntries));
}
// core functionality
public long get() {
return pEntries.values().sum().longValue() - nEntries.values().sum().longValue();
}
public void increment() {
increment(1L);
}
public void increment(long value) {
if (value < 1L) {
throw new IllegalArgumentException("Value needs to be a positive number.");
}
pEntries = pEntries.put(nodeId, pEntries.get(nodeId).getOrElse(0L) + value);
commands.onNext(new UpdateCommand(
crdtId,
pEntries,
nEntries
));
}
public void decrement() {
decrement(1L);
}
public void decrement(long value) {
if (value < 1L) {
throw new IllegalArgumentException("Value needs to be a positive number.");
}
nEntries = nEntries.put(nodeId, nEntries.get(nodeId).getOrElse(0L) + value);
commands.onNext(new UpdateCommand(
crdtId,
pEntries,
nEntries
));
}
//...
}
- PNCounter uses two HashMaps, pEntries for increments and nEntries for decrements. processCommand merges pEntries and nEntries separately with the merge method of HashMap, passing Math::max as the BiFunction; get() returns the sum of pEntries.values() minus the sum of nEntries.values()
LWWRegister
wurmloch-crdt/src/main/java/com/netopyr/wurmloch/crdt/LWWRegister.java
public class LWWRegister<T> extends AbstractCrdt<LWWRegister<T>, LWWRegister.SetCommand<T>> {
// fields
private T value;
private StrictVectorClock clock;
// constructor
public LWWRegister(String nodeId, String crdtId) {
super(nodeId, crdtId, BehaviorProcessor.create());
this.clock = new StrictVectorClock(nodeId);
}
// crdt
protected Option<SetCommand<T>> processCommand(SetCommand<T> command) {
if (clock.compareTo(command.getClock()) < 0) {
clock = clock.merge(command.getClock());
doSet(command.getValue());
return Option.of(command);
}
return Option.none();
}
// core functionality
public T get() {
return value;
}
public void set(T newValue) {
if (!Objects.equals(value, newValue)) {
doSet(newValue);
commands.onNext(new SetCommand<>(
crdtId,
value,
clock
));
}
}
// implementation
private void doSet(T value) {
this.value = value;
clock = clock.increment();
}
//...
}
- LWWRegister uses a StrictVectorClock. Its processCommand receives a SetCommand and, when the local clock is less than command.getClock(), merges the two clocks and then calls doSet, which updates value and advances the local clock with clock.increment()
MVRegister
wurmloch-crdt/src/main/java/com/netopyr/wurmloch/crdt/MVRegister.java
public class MVRegister<T> extends AbstractCrdt<MVRegister<T>, MVRegister.SetCommand<T>> {
// fields
private Array<Entry<T>> entries = Array.empty();
// constructor
public MVRegister(String nodeId, String crdtId) {
super(nodeId, crdtId, ReplayProcessor.create());
}
// crdt
protected Option<SetCommand<T>> processCommand(SetCommand<T> command) {
final Entry<T> newEntry = command.getEntry();
if (!entries.exists(entry -> entry.getClock().compareTo(newEntry.getClock()) > 0 || entry.getClock().equals(newEntry.getClock()))) {
final Array<Entry<T>> newEntries = entries
.filter(entry -> entry.getClock().compareTo(newEntry.getClock()) == 0)
.append(newEntry);
doSet(newEntries);
return Option.of(command);
}
return Option.none();
}
// core functionality
public Array<T> get() {
return entries.map(Entry::getValue);
}
public void set(T newValue) {
if (entries.size() != 1 || !Objects.equals(entries.head().getValue(), newValue)) {
final Entry<T> newEntry = new Entry<>(newValue, incVV());
doSet(Array.of(newEntry));
commands.onNext(new SetCommand<>(
crdtId,
newEntry
));
}
}
// implementation
private void doSet(Array<Entry<T>> newEntries) {
entries = newEntries;
}
private VectorClock incVV() {
final Array<VectorClock> clocks = entries.map(Entry::getClock);
final VectorClock mergedClock = clocks.reduceOption(VectorClock::merge).getOrElse(new VectorClock());
return mergedClock.increment(nodeId);
}
//...
}
- MVRegister uses an Array of entries, each carrying a VectorClock. Its processCommand receives a SetCommand and accepts the new entry only if no existing entry has a clock greater than or equal to newEntry.getClock(). It then builds newEntries from the existing entries whose clock compares as 0 against newEntry.getClock() (neither older nor newer), appends newEntry, and finally calls doSet to assign the result to the local entries
GSet
wurmloch-crdt/src/main/java/com/netopyr/wurmloch/crdt/GSet.java
public class GSet<E> extends AbstractSet<E> implements Crdt<GSet<E>, GSet.AddCommand<E>> {
// fields
private final String crdtId;
private final Set<E> elements = new HashSet<>();
private final Processor<AddCommand<E>, AddCommand<E>> commands = ReplayProcessor.create();
// constructor
public GSet(String crdtId) {
this.crdtId = Objects.requireNonNull(crdtId, "Id must not be null");
}
// crdt
@Override
public String getCrdtId() {
return crdtId;
}
@Override
public void subscribe(Subscriber<? super AddCommand<E>> subscriber) {
commands.subscribe(subscriber);
}
@Override
public void subscribeTo(Publisher<? extends AddCommand<E>> publisher) {
Flowable.fromPublisher(publisher).onTerminateDetach().subscribe(command -> {
final Option<AddCommand<E>> newCommand = processCommand(command);
newCommand.peek(commands::onNext);
});
}
private Option<AddCommand<E>> processCommand(AddCommand<E> command) {
return doAdd(command.getElement())? Option.of(command) : Option.none();
}
// core functionality
@Override
public int size() {
return elements.size();
}
@Override
public Iterator<E> iterator() {
return new GSetIterator();
}
@Override
public boolean add(E element) {
commands.onNext(new AddCommand<>(crdtId, element));
return doAdd(element);
}
// implementation
private synchronized boolean doAdd(E element) {
return elements.add(element);
}
//...
}
- GSet is backed by a Set. Its processCommand receives an AddCommand, and doAdd merges the element in with the add method of Set
TwoPhaseSet
wurmloch-crdt/src/main/java/com/netopyr/wurmloch/crdt/TwoPSet.java
public class TwoPSet<E> extends AbstractSet<E> implements Crdt<TwoPSet<E>, TwoPSet.TwoPSetCommand<E>> {
// fields
private final String crdtId;
private final Set<E> elements = new HashSet<>();
private final Set<E> tombstone = new HashSet<>();
private final Processor<TwoPSetCommand<E>, TwoPSetCommand<E>> commands = ReplayProcessor.create();
// constructor
public TwoPSet(String crdtId) {
this.crdtId = Objects.requireNonNull(crdtId, "CrdtId must not be null");
}
// crdt
@Override
public String getCrdtId() {
return crdtId;
}
@Override
public void subscribe(Subscriber<? super TwoPSetCommand<E>> subscriber) {
commands.subscribe(subscriber);
}
@Override
public void subscribeTo(Publisher<? extends TwoPSetCommand<E>> publisher) {
Flowable.fromPublisher(publisher).onTerminateDetach().subscribe(command -> {
final Option<TwoPSetCommand<E>> newCommand = processCommand(command);
newCommand.peek(commands::onNext);
});
}
private Option<TwoPSetCommand<E>> processCommand(TwoPSetCommand<E> command) {
if (command instanceof TwoPSet.AddCommand) {
return doAdd(((TwoPSet.AddCommand<E>) command).getElement())? Option.of(command) : Option.none();
} else if (command instanceof TwoPSet.RemoveCommand) {
return doRemove(((TwoPSet.RemoveCommand<E>) command).getElement())? Option.of(command) : Option.none();
}
return Option.none();
}
// core functionality
@Override
public int size() {
return elements.size();
}
@Override
public Iterator<E> iterator() {
return new TwoPSetIterator();
}
@Override
public boolean add(E value) {
final boolean changed = doAdd(value);
if (changed) {
commands.onNext(new TwoPSet.AddCommand<>(crdtId, value));
}
return changed;
}
// implementation
private boolean doAdd(E value) {
return !tombstone.contains(value) && elements.add(value);
}
private boolean doRemove(E value) {
return tombstone.add(value) | elements.remove(value);
}
//...
}
- TwoPSet is backed by two Sets, elements for additions and tombstone for removals. processCommand receives a TwoPSetCommand, which has two subclasses, TwoPSet.AddCommand and TwoPSet.RemoveCommand. doAdd adds the value to elements only if tombstone does not already contain it; doRemove adds the value to tombstone and removes it from elements
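One consequence of the tombstone check is that a removed value can never be added again. The stripped-down sketch below mirrors the doAdd and doRemove logic using java.util sets only; TwoPhaseSetSketch is a hypothetical class for illustration and leaves out the command publishing of the library class.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical two-phase set mirroring the add/remove rules described above.
final class TwoPhaseSetSketch<E> {
    private final Set<E> elements = new HashSet<>();
    private final Set<E> tombstone = new HashSet<>();

    // An add is rejected once the value has been tombstoned.
    boolean add(E value) {
        return !tombstone.contains(value) && elements.add(value);
    }

    // Non-short-circuit | so that both sets are always updated.
    boolean remove(E value) {
        return tombstone.add(value) | elements.remove(value);
    }

    boolean contains(E value) {
        return elements.contains(value);
    }

    public static void main(String[] args) {
        TwoPhaseSetSketch<String> set = new TwoPhaseSetSketch<>();
        System.out.println(set.add("x"));      // prints "true"
        System.out.println(set.remove("x"));   // prints "true"
        System.out.println(set.add("x"));      // prints "false": x is tombstoned
        System.out.println(set.contains("x")); // prints "false"
    }
}
```

If re-adding removed values is a requirement, the OR-Set is the more suitable choice.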
ORSet
wurmloch-crdt/src/main/java/com/netopyr/wurmloch/crdt/ORSet.java
public class ORSet<E> extends AbstractSet<E> implements Crdt<ORSet<E>, ORSet.ORSetCommand<E>> /*, ObservableSet<E> */ {
// fields
private final String crdtId;
private final Set<Element<E>> elements = new HashSet<>();
private final Set<Element<E>> tombstone = new HashSet<>();
private final Processor<ORSetCommand<E>, ORSetCommand<E>> commands = ReplayProcessor.create();
// constructor
public ORSet(String crdtId) {
this.crdtId = Objects.requireNonNull(crdtId, "Id must not be null");
}
// crdt
@Override
public String getCrdtId() {
return crdtId;
}
@Override
public void subscribe(Subscriber<? super ORSetCommand<E>> subscriber) {
commands.subscribe(subscriber);
}
@Override
public void subscribeTo(Publisher<? extends ORSetCommand<E>> publisher) {
Flowable.fromPublisher(publisher).onTerminateDetach().subscribe(command -> {
final Option<ORSetCommand<E>> newCommand = processCommand(command);
newCommand.peek(commands::onNext);
});
}
private Option<ORSetCommand<E>> processCommand(ORSetCommand<E> command) {
if (command instanceof AddCommand) {
return doAdd(((AddCommand<E>) command).getElement())? Option.of(command) : Option.none();
} else if (command instanceof RemoveCommand) {
return doRemove(((RemoveCommand<E>) command).getElements())? Option.of(command) : Option.none();
}
return Option.none();
}
// core functionality
@Override
public int size() {
return doElements().size();
}
@Override
public Iterator<E> iterator() {
return new ORSetIterator();
}
@Override
public boolean add(E value) {
final boolean contained = doContains(value);
prepareAdd(value);
return !contained;
}
// implementation
private static <U> Predicate<Element<U>> matches(U value) {
return element -> Objects.equals(value, element.getValue());
}
private synchronized boolean doContains(E value) {
return elements.parallelStream().anyMatch(matches(value));
}
private synchronized Set<E> doElements() {
return elements.parallelStream().map(Element::getValue).collect(Collectors.toSet());
}
private synchronized void prepareAdd(E value) {
final Element<E> element = new Element<>(value, UUID.randomUUID());
commands.onNext(new AddCommand<>(getCrdtId(), element));
doAdd(element);
}
private synchronized boolean doAdd(Element<E> element) {
return (elements.add(element) | elements.removeAll(tombstone)) && (!tombstone.contains(element));
}
private synchronized void prepareRemove(E value) {
final Set<Element<E>> removes = elements.parallelStream().filter(matches(value)).collect(Collectors.toSet());
commands.onNext(new RemoveCommand<>(getCrdtId(), removes));
doRemove(removes);
}
private synchronized boolean doRemove(Collection<Element<E>> removes) {
return elements.removeAll(removes) | tombstone.addAll(removes);
}
//...
}
- ORSet is backed by two Sets, elements for additions and tombstone for removals. processCommand receives an ORSetCommand, which has two subclasses, ORSet.AddCommand and ORSet.RemoveCommand, handled by doAdd and doRemove respectively. prepareAdd creates an Element tagged with a UUID before calling doAdd, which adds it to elements and then removes everything in tombstone from elements. prepareRemove collects the set of matching elements (removes) and passes it to doRemove, which removes them from elements and adds them to tombstone
Summary
- CRDT is short for Conflict-free Replicated Data Type, also described as a form of passive synchronisation. Concrete implementations fall into state-based (CvRDT), operation-based (CmRDT), delta-based, pure operation-based, and so on
- CvRDT, short for Convergent Replicated Data Type, also known as active synchronisation, is commonly used in file systems such as NFS, AFS, and Coda and in key-value stores such as Riak and Dynamo; CmRDT, short for Commutative Replicated Data Type, is commonly used in cooperative systems such as Bayou, Rover, IceCube, and Telex
- For a CRDT to replicate without conflicts, the operations on the data structure must be convergent, that is, associative, commutative, and idempotent. The basic data types of CRDT are Counters (G-Counter, PN-Counter), Registers (LWW-Register, MV-Register), and Sets (G-Set, 2P-Set, LWW-Element-Set, OR-Set)
doc
- Talk about CRDT
- CRDT: a powerful tool for solving the eventual consistency problem
- Akka Distributed Data Deep Dive
- Conflict-free replicated data types
- CRDT: Conflict-free Replicated Data Types
- Introduction to Conflict-Free Replicated Data Types
- A Look at Conflict-Free Replicated Data Types (CRDT)
- Conflict-free Replicated Data Types
- A comprehensive study of Convergent and Commutative Replicated Data Types
- wurmloch-crdt