The introduction
The Gossip protocol goes by several names, the best known being the epidemic protocol. Many well-known P2P networks and blockchain projects, such as IPFS and Ethereum, use Kademlia, while Bitcoin is known to use the Gossip protocol to spread transactions and block information. A closer look at the scenario suggests that Ethereum's use of a DHT does not make much sense: every node stores the entire chain rather than fragments of the data, as IPFS nodes do, so the protocol that really suits Ethereum is, as with Bitcoin, the Gossip protocol.
Contents
- Gossip preliminaries
- The framework design
- Source code analysis
- UDP manager
- Message codec
- Message handler
- Node status flusher
- Data synchronizer
- The core components
- Gossip manager
- Conclusion
Gossip preliminaries
We will not go into much detail about the Gossip protocol itself, because many articles on the Internet already cover it; please refer to the literature in the appendix. Assuming a basic understanding of gossip, we will explore a framework that implements the protocol. The framework studied in this article is incubator-retired-gossip.
The incubator-retired-gossip framework ships standalone examples of the Gossip protocol, covering cluster membership management, CRDT-based shared data, PN counters, data centers, and distributed locks.
For details, see the usage instructions in gossip-examples.
Let’s take a look at gossip’s framework design.
The framework design
Gossip communicates over UDP: at the bottom layer, the UDP manager sends and receives messages, and the message codec serializes them as JSON. When a node receives a message, the codec decodes it and hands it to a message handler; there are handlers for node data, shared data, shutdown messages, and so on. The node status flusher keeps the status of the nodes in the cluster up to date. The data synchronizer synchronizes active nodes, down nodes, node data, and shared data. The core component manages node data and shared data and sends and receives messages. The Gossip manager ties these parts together and persists node state, node data, and shared data.
Source code analysis
UDP manager
/**
* This class is constructed by reflection in GossipManager.
* It manages transport (byte read/write) operations over UDP.
*/
public class UdpTransportManager extends AbstractTransportManager implements Runnable {
public static final Logger LOGGER = Logger.getLogger(UdpTransportManager.class);
/**
* The socket used for the passive thread of the gossip service.
*/
private final DatagramSocket server;
/** Socket timeout */
private final int soTimeout;
private final Thread me;
/** Running status */
private final AtomicBoolean keepRunning = new AtomicBoolean(true);
/**
* required for reflection to work!
* @param gossipManager
* @param gossipCore*/
public UdpTransportManager(GossipManager gossipManager, GossipCore gossipCore) {
super(gossipManager, gossipCore);
soTimeout = gossipManager.getSettings().getGossipInterval() * 2;
try {
SocketAddress socketAddress = new InetSocketAddress(gossipManager.getMyself().getUri().getHost(),
gossipManager.getMyself().getUri().getPort());
server = new DatagramSocket(socketAddress);
} catch (SocketException ex) {
LOGGER.warn(ex);
throw new RuntimeException(ex);
}
me = new Thread(this);
}
@Override
public void run() {
while (keepRunning.get()) {
try {
byte[] buf = read();
try {
// Read data
Base message = gossipManager.getProtocolManager().read(buf);
// Process the message
gossipCore.receive(message);
//TODO this is suspect GossipMemberStateRefresher
gossipManager.getMemberStateRefresher().run();
} catch (RuntimeException ex) {//TODO trap json exception
LOGGER.error("Unable to process message", ex);
}
} catch (IOException e) {
// InterruptedException are completely normal here because of the blocking lifecycle.
if (!(e.getCause() instanceof InterruptedException)) {
LOGGER.error(e);
}
keepRunning.set(false);
}
}
}
@Override
public void shutdown() {
keepRunning.set(false);
server.close();
super.shutdown();
me.interrupt();
}
/**
* Blocking read of a message.
* @return buffer of message contents.
* @throws IOException
*/
public byte[] read() throws IOException {
byte[] buf = new byte[server.getReceiveBufferSize()];
DatagramPacket p = new DatagramPacket(buf, buf.length);
server.receive(p);
debug(p.getData());
return p.getData();
}
/**
* Send bytes of data
* @param endpoint
* @param buf
* @throws IOException
*/
@Override
public void send(URI endpoint, byte[] buf) throws IOException {
// todo: investigate UDP socket reuse. It would save a little setup/teardown time wrt to the local socket.
try (DatagramSocket socket = new DatagramSocket()){
socket.setSoTimeout(soTimeout);
InetAddress dest = InetAddress.getByName(endpoint.getHost());
DatagramPacket payload = new DatagramPacket(buf, buf.length, dest, endpoint.getPort());
socket.send(payload);
}
}
/** Start the endpoint */
@Override
public void startEndpoint() {
me.start();
}
}
As the code shows, the UDP transport manager, UdpTransportManager, mainly sends and receives raw bytes: each datagram it reads is decoded by the protocol manager and handed to the core component. After each message is handled, the receive loop also triggers one run of the node status flusher.
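One detail in read() is worth a closer look: DatagramPacket.getData() returns the whole backing buffer, so the returned array is as long as the receive buffer, not as long as the datagram. The following standalone sketch is not part of the framework; the class name UdpReadSketch and the helper payloadOf are made up for illustration. It shows one way to copy out only the bytes that were actually received, using a loopback round trip.

```java
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical illustration, not code from the framework.
class UdpReadSketch {

    /** Returns only the bytes that were received, not the whole backing buffer. */
    static byte[] payloadOf(DatagramPacket packet) {
        int from = packet.getOffset();
        return Arrays.copyOfRange(packet.getData(), from, from + packet.getLength());
    }

    public static void main(String[] args) throws Exception {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        // Port 0 lets the operating system pick a free port for the receiver.
        try (DatagramSocket server = new DatagramSocket(0, loopback);
             DatagramSocket client = new DatagramSocket()) {
            server.setSoTimeout(2000);

            byte[] out = "hello".getBytes(StandardCharsets.UTF_8);
            client.send(new DatagramPacket(out, out.length, loopback, server.getLocalPort()));

            // Same buffer sizing as the read() method quoted above.
            byte[] buf = new byte[server.getReceiveBufferSize()];
            DatagramPacket in = new DatagramPacket(buf, buf.length);
            server.receive(in);

            byte[] payload = payloadOf(in);
            System.out.println("buffer=" + buf.length + " bytes, payload=" + payload.length + " bytes");
            System.out.println(new String(payload, StandardCharsets.UTF_8));
        }
    }
}
```

Whether trimming matters in practice depends on the decoder: a JSON parser that stops at the end of the first value may tolerate trailing zero bytes, but that is a property of the decoder rather than of the transport.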
Next, let's look at the message codec.
Message codec
// this class is constructed by reflection in GossipManager.
public class JacksonProtocolManager implements ProtocolManager {
private final ObjectMapper objectMapper;
private final PrivateKey privKey;
private final Meter signed;
private final Meter unsigned;
/** required for reflection to work!
* @param settings
* @param id
* @param registry*/
public JacksonProtocolManager(GossipSettings settings, String id, MetricRegistry registry) {
// set up object mapper.
objectMapper = buildObjectMapper(settings);
// Set up message signing
if (settings.isSignMessages()){
File privateKey = new File(settings.getPathToKeyStore(), id);
File publicKey = new File(settings.getPathToKeyStore(), id + ".pub");
if (!privateKey.exists()) {
throw new IllegalArgumentException("private key not found " + privateKey);
}
if (!publicKey.exists()) {
throw new IllegalArgumentException("public key not found " + publicKey);
}
try (FileInputStream keyfis = new FileInputStream(privateKey)) {
byte[] encKey = new byte[keyfis.available()];
keyfis.read(encKey);
keyfis.close();
PKCS8EncodedKeySpec privKeySpec = new PKCS8EncodedKeySpec(encKey);
KeyFactory keyFactory = KeyFactory.getInstance("DSA");
privKey = keyFactory.generatePrivate(privKeySpec);
} catch (NoSuchAlgorithmException | InvalidKeySpecException | IOException e) {
throw new RuntimeException("failed hard", e);
}
} else {
privKey = null;
}
signed = registry.meter(PassiveGossipConstants.SIGNED_MESSAGE);
unsigned = registry.meter(PassiveGossipConstants.UNSIGNED_MESSAGE);
}
/**
* Convert a message to a byte array
* @param message
* @return
* @throws IOException
*/
@Override
public byte[] write(Base message) throws IOException {
byte[] json_bytes;
if (privKey == null){
json_bytes = objectMapper.writeValueAsBytes(message);
} else {
SignedPayload p = new SignedPayload();
p.setData(objectMapper.writeValueAsString(message).getBytes());
p.setSignature(sign(p.getData(), privKey));
json_bytes = objectMapper.writeValueAsBytes(p);
}
return json_bytes;
}
/**
* Convert a byte array to a message object
* @param buf
* @return
* @throws IOException
*/
@Override
public Base read(byte[] buf) throws IOException {
Base activeGossipMessage = objectMapper.readValue(buf, Base.class);
if (activeGossipMessage instanceof SignedPayload){
SignedPayload s = (SignedPayload) activeGossipMessage;
signed.mark();
return objectMapper.readValue(s.getData(), Base.class);
} else {
unsigned.mark();
return activeGossipMessage;
}
}
public static ObjectMapper buildObjectMapper(GossipSettings settings) {
ObjectMapper om = new ObjectMapper();
om.enableDefaultTyping();
// todo: should be specified in the configuration.
om.registerModule(new CrdtModule());
om.configure(JsonGenerator.Feature.WRITE_NUMBERS_AS_STRINGS, false);
return om;
}
private static byte[] sign(byte [] bytes, PrivateKey pk){
Signature dsa;
try {
dsa = Signature.getInstance("SHA1withDSA", "SUN");
dsa.initSign(pk);
dsa.update(bytes);
return dsa.sign();
} catch (NoSuchAlgorithmException | NoSuchProviderException | InvalidKeyException | SignatureException e) {
throw new RuntimeException(e);
}
}
}
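The message codec, JacksonProtocolManager, is a JSON-based codec built on Jackson. If message signing is enabled, each outgoing message is wrapped in a SignedPayload that carries a DSA signature; on the receiving side, a SignedPayload is unwrapped and its inner message is decoded.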
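Note that the read() method quoted above unwraps a SignedPayload without checking its signature. The sketch below is a standalone illustration of the sign-then-verify round trip with the JDK's java.security API; it is not the framework's code. The class name SignedPayloadSketch, its helper methods, the in-memory key pair, and the choice of SHA256withDSA with a 2048-bit key (instead of the SHA1withDSA used in the quoted code) are all assumptions made for the example.

```java
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.KeyPair;
import java.security.KeyPairGenerator;
import java.security.PrivateKey;
import java.security.PublicKey;
import java.security.Signature;

// Hypothetical illustration, not code from the framework.
class SignedPayloadSketch {

    // Assumption: a stronger digest than the SHA1withDSA seen in the quoted code.
    static final String ALGORITHM = "SHA256withDSA";

    /** Generates a throwaway key pair; the framework loads its keys from files instead. */
    static KeyPair newKeyPair() {
        try {
            KeyPairGenerator generator = KeyPairGenerator.getInstance("DSA");
            generator.initialize(2048);
            return generator.generateKeyPair();
        } catch (GeneralSecurityException e) {
            throw new RuntimeException(e);
        }
    }

    /** Signs the serialized message bytes with the sender's private key. */
    static byte[] sign(byte[] data, PrivateKey key) {
        try {
            Signature signature = Signature.getInstance(ALGORITHM);
            signature.initSign(key);
            signature.update(data);
            return signature.sign();
        } catch (GeneralSecurityException e) {
            throw new RuntimeException(e);
        }
    }

    /** Checks the signature against the data with the sender's public key. */
    static boolean verify(byte[] data, byte[] signatureBytes, PublicKey key) {
        try {
            Signature signature = Signature.getInstance(ALGORITHM);
            signature.initVerify(key);
            signature.update(data);
            return signature.verify(signatureBytes);
        } catch (GeneralSecurityException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        KeyPair keys = newKeyPair();
        byte[] data = "{\"type\":\"example\"}".getBytes(StandardCharsets.UTF_8);
        byte[] sig = sign(data, keys.getPrivate());
        System.out.println("signature valid: " + verify(data, sig, keys.getPublic()));
    }
}
```

A receiver that wanted to reject forged messages would call something like verify before decoding the inner payload, which requires knowing the sender's public key.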
Message handler
In the message processing section, let’s take a look at the processing of node data, shared data and node downtime messages.
- Shared data message handlers
public class SharedDataMessageHandler implements MessageHandler{
/**
* @param gossipCore context.
* @param gossipManager context.
* @param base message reference.
* @return boolean indicating success.
*/
@Override
public boolean invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) {
UdpSharedDataMessage message = (UdpSharedDataMessage) base;
// Add shared data
gossipCore.addSharedData(message);
return true;
}
}
- Node data message handler
public class PerNodeDataMessageHandler implements MessageHandler {
/**
* @param gossipCore context.
* @param gossipManager context.
* @param base message reference.
* @return boolean indicating success.
*/
@Override
public boolean invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) {
UdpPerNodeDataMessage message = (UdpPerNodeDataMessage) base;
// Add node data message
gossipCore.addPerNodeData(message);
return true;
}
}
- Node down message processor
public class ShutdownMessageHandler implements MessageHandler {
/**
* @param gossipCore context.
* @param gossipManager context.
* @param base message reference.
* @return boolean indicating success.
*/
@Override
public boolean invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) {
ShutdownMessage s = (ShutdownMessage) base;
// Convert downtime messages to node data messages
PerNodeDataMessage m = new PerNodeDataMessage();
m.setKey(ShutdownMessage.PER_NODE_KEY);
m.setNodeId(s.getNodeId());
m.setPayload(base);
m.setTimestamp(System.currentTimeMillis());
m.setExpireAt(System.currentTimeMillis() + 30L * 1000L);
// Add node data message
gossipCore.addPerNodeData(m);
return true;
}
}
As shown above, the handlers for shared data and node data simply delegate to the core component, which we will look at later; a shutdown message is first converted into a node data message and then processed the same way.
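Each handler above casts the incoming message to one concrete type, so something has to route a message to the handler that understands it. How the framework wires this up is not shown in this article; the following is only a minimal sketch of one way to do it, assuming a first-match chain of type-checked handlers. All names here (TypedHandlerSketch, Message, SharedData, Shutdown, typed, firstMatch) are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical illustration, not code from the framework.
class TypedHandlerSketch {

    /** Stand-in for the framework's base message type. */
    interface Message {}

    static final class SharedData implements Message {
        final String key;
        SharedData(String key) { this.key = key; }
    }

    static final class Shutdown implements Message {
        final String nodeId;
        Shutdown(String nodeId) { this.nodeId = nodeId; }
    }

    /** Returns true if the message was handled, mirroring the boolean result of invoke(). */
    interface Handler {
        boolean invoke(Message message);
    }

    /** Wraps an action so that it only runs for messages of the given type. */
    static <T extends Message> Handler typed(Class<T> type, Consumer<T> action) {
        return message -> {
            if (!type.isInstance(message)) {
                return false;
            }
            action.accept(type.cast(message));
            return true;
        };
    }

    /** Tries the handlers in order and stops at the first one that accepts the message. */
    static Handler firstMatch(List<Handler> handlers) {
        return message -> {
            for (Handler handler : handlers) {
                if (handler.invoke(message)) {
                    return true;
                }
            }
            return false;
        };
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        Handler root = firstMatch(Arrays.asList(
                typed(SharedData.class, m -> log.add("shared data for key " + m.key)),
                typed(Shutdown.class, m -> log.add("shutdown of node " + m.nodeId))));
        root.invoke(new SharedData("config"));
        root.invoke(new Shutdown("node-1"));
        System.out.println(log);
    }
}
```

A false result from the root handler corresponds to the "received message can not be handled" case that the core component logs.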
Node status flusher
/** Member state refresher */
public class GossipMemberStateRefresher {
public static final Logger LOGGER = Logger.getLogger(GossipMemberStateRefresher.class);
/** Gossip members and their states */
private final Map<LocalMember, GossipState> members;
/** Gossip configuration */
private final GossipSettings settings;
/** Gossip listeners */
private final List<GossipListener> listeners = new CopyOnWriteArrayList<>();
/** System clock */
private final Clock clock;
private final BiFunction<String, String, PerNodeDataMessage> findPerNodeGossipData;
/** Executor that runs the listener callbacks */
private final ExecutorService listenerExecutor;
/** Scheduler for the periodic refresh */
private final ScheduledExecutorService scheduledExecutor;
/** Task queue */
private final BlockingQueue<Runnable> workQueue;
public GossipMemberStateRefresher(Map<LocalMember, GossipState> members, GossipSettings settings,
GossipListener listener,
BiFunction<String, String, PerNodeDataMessage> findPerNodeGossipData) {
this.members = members;
this.settings = settings;
listeners.add(listener);
this.findPerNodeGossipData = findPerNodeGossipData;
clock = new SystemClock();
workQueue = new ArrayBlockingQueue<>(1024);
listenerExecutor = new ThreadPoolExecutor(1, 20, 1, TimeUnit.SECONDS, workQueue,
new ThreadPoolExecutor.DiscardOldestPolicy());
scheduledExecutor = Executors.newScheduledThreadPool(1);
}
/** Schedule the member state flusher */
public void init() {
scheduledExecutor.scheduleAtFixedRate(() -> run(), 0, 100, TimeUnit.MILLISECONDS);
}
public void run() {
try {
runOnce();
} catch (RuntimeException ex) {
LOGGER.warn("scheduled state had exception", ex);
}
}
/** Probe the status of each gossip member */
public void runOnce() {
for (Entry<LocalMember, GossipState> entry : members.entrySet()) {
boolean userDown = processOptimisticShutdown(entry);
if (userDown)
continue;
Double phiMeasure = entry.getKey().detect(clock.nanoTime());
GossipState requiredState;
// Determine node status based on probe results
if (phiMeasure != null) {
requiredState = calcRequiredState(phiMeasure);
} else {
requiredState = calcRequiredStateCleanupInterval(entry.getKey(), entry.getValue());
}
if (entry.getValue() != requiredState) {
members.put(entry.getKey(), requiredState);
/* Call listeners asynchronously */
for (GossipListener listener : listeners)
listenerExecutor.execute(() -> listener.gossipEvent(entry.getKey(), requiredState));
}
}
}
/**
* @param phiMeasure
* @return
*/
public GossipState calcRequiredState(Double phiMeasure) {
// If the phi value exceeds the conviction threshold, the node is considered DOWN
if (phiMeasure > settings.getConvictThreshold())
return GossipState.DOWN;
else
return GossipState.UP;
}
/**
* Used when no phi value is available yet: fall back to the cleanup interval
* @param member
* @param state
* @return
*/
public GossipState calcRequiredStateCleanupInterval(LocalMember member, GossipState state) {
long now = clock.nanoTime();
long nowInMillis = TimeUnit.MILLISECONDS.convert(now, TimeUnit.NANOSECONDS);
// If the current time minus the cleaning interval is greater than the heartbeat time of the member, the state is DOWN
if (nowInMillis - settings.getCleanupInterval() > member.getHeartbeat()) {
return GossipState.DOWN;
} else {
return state;
}
}
/**
* If we have a special key in the per-node data, that means that the node has sent us
* a pre-emptive shutdown message. We process this so the node is seen as down sooner.
* @param l member to consider
* @return true if node forced down
*/
public boolean processOptimisticShutdown(Entry<LocalMember, GossipState> l) {
// Get the node down message
PerNodeDataMessage m = findPerNodeGossipData.apply(l.getKey().getId(), ShutdownMessage.PER_NODE_KEY);
if (m == null) {
return false;
}
ShutdownMessage s = (ShutdownMessage) m.getPayload();
// If the shutdown time is later than the member's last heartbeat
if (s.getShutdownAtNanos() > l.getKey().getHeartbeat()) {
members.put(l.getKey(), GossipState.DOWN);
if (l.getValue() == GossipState.UP) {
// If the status changes, the listener is notified
for (GossipListener listener: listeners)
listenerExecutor.execute(() -> listener.gossipEvent(l.getKey(), GossipState.DOWN));
}
return true;
}
return false;
}
/**
* Register a listener
* @param listener
*/
public void register(GossipListener listener) {
listeners.add(listener);
}
/** Shut down the gossip member flusher */
public void shutdown() {
scheduledExecutor.shutdown();
try {
scheduledExecutor.awaitTermination(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
LOGGER.debug("Issue during shutdown", e);
}
listenerExecutor.shutdown();
try {
listenerExecutor.awaitTermination(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
LOGGER.debug("Issue during shutdown", e);
}
listenerExecutor.shutdownNow();
}
}
The node status flusher, GossipMemberStateRefresher, uses the FailureDetector to probe whether each node is alive and updates the node's status accordingly; whenever a status changes, the registered listeners are notified.
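The phi value compared against the conviction threshold comes from an accrual-style failure detector. The detector's source is not quoted in this article, so the following is only a simplified sketch of the idea, assuming heartbeat intervals follow an exponential distribution; the class PhiSketch and its methods are hypothetical. Under that assumption, the probability that a heartbeat is still on its way after a silence of $t$ is $e^{-t/\mu}$, where $\mu$ is the mean observed interval, so $\varphi = -\log_{10} e^{-t/\mu} = t / (\mu \ln 10)$.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical illustration, not code from the framework.
class PhiSketch {
    private final Deque<Long> intervals = new ArrayDeque<>();
    private final int windowSize;
    private long lastHeartbeat = -1;

    PhiSketch(int windowSize) {
        this.windowSize = windowSize;
    }

    /** Records a heartbeat arrival and keeps only the most recent intervals. */
    void heartbeat(long now) {
        if (lastHeartbeat >= 0) {
            intervals.addLast(now - lastHeartbeat);
            if (intervals.size() > windowSize) {
                intervals.removeFirst();
            }
        }
        lastHeartbeat = now;
    }

    /**
     * Returns the suspicion level at time now, or null when there are no samples yet
     * (the caller then needs a fallback, such as a cleanup interval).
     */
    Double phi(long now) {
        if (intervals.isEmpty()) {
            return null;
        }
        double mean = intervals.stream().mapToLong(Long::longValue).average().orElse(0);
        double elapsed = now - lastHeartbeat;
        // phi = -log10(exp(-elapsed / mean)) = elapsed / (mean * ln 10)
        return elapsed / (mean * Math.log(10));
    }

    public static void main(String[] args) {
        PhiSketch detector = new PhiSketch(100);
        for (long t = 0; t <= 1000; t += 100) {
            detector.heartbeat(t); // regular heartbeats every 100 time units
        }
        double threshold = 2.0; // assumed threshold, for illustration only
        for (long t = 1100; t <= 1700; t += 200) {
            double phi = detector.phi(t);
            System.out.println("t=" + t + " phi=" + phi + " down=" + (phi > threshold));
        }
    }
}
```

The longer a node stays silent relative to its usual heartbeat interval, the larger phi grows, which is why a single threshold can adapt to nodes with different heartbeat rhythms.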
Data synchronizer
/**
* Base implementation: gossips randomly to live nodes and periodically gossips to dead ones.
*/
public class SimpleActiveGossiper extends AbstractActiveGossiper {
/** Scheduler */
private ScheduledExecutorService scheduledExecutorService;
/** Task queue */
private final BlockingQueue<Runnable> workQueue;
/** Thread pool executor */
private ThreadPoolExecutor threadService;
/**
* @param gossipManager
* @param gossipCore
* @param registry
*/
public SimpleActiveGossiper(GossipManager gossipManager, GossipCore gossipCore, MetricRegistry registry) {
super(gossipManager, gossipCore, registry);
scheduledExecutorService = Executors.newScheduledThreadPool(2);
workQueue = new ArrayBlockingQueue<Runnable>(1024);
threadService = new ThreadPoolExecutor(1, 30, 1, TimeUnit.SECONDS, workQueue,
new ThreadPoolExecutor.DiscardOldestPolicy());
}
@Override
public void init() {
super.init();
scheduledExecutorService.scheduleAtFixedRate(() -> {
threadService.execute(() -> {
// Send Live members
sendToALiveMember();
});
}, 0, gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS);
scheduledExecutorService.scheduleAtFixedRate(() -> {
// Send a Dead member
sendToDeadMember();
}, 0, gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS);
scheduledExecutorService.scheduleAtFixedRate(
// Send node data
() -> sendPerNodeData(gossipManager.getMyself(),
selectPartner(gossipManager.getLiveMembers())),
0, gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS);
scheduledExecutorService.scheduleAtFixedRate(
// Send shared data
() -> sendSharedData(gossipManager.getMyself(),
selectPartner(gossipManager.getLiveMembers())),
0, gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS);
}
@Override
public void shutdown() {
super.shutdown();
scheduledExecutorService.shutdown();
try {
scheduledExecutorService.awaitTermination(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
LOGGER.debug("Issue during shutdown", e);
}
// Send a close message
sendShutdownMessage();
threadService.shutdown();
try {
threadService.awaitTermination(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
LOGGER.debug("Issue during shutdown", e);
}
}
/** Select a random member from the live member list and send it the membership list */
protected void sendToALiveMember() {
LocalMember member = selectPartner(gossipManager.getLiveMembers());
sendMembershipList(gossipManager.getMyself(), member);
}
/** Select a random member from the dead member list and send it the membership list */
protected void sendToDeadMember() {
LocalMember member = selectPartner(gossipManager.getDeadMembers());
sendMembershipList(gossipManager.getMyself(), member);
}
/**
* Send an optimistic shutdown message to several cluster nodes,
* notifying them that the current node is shutting down.
*/
protected void sendShutdownMessage() {
List<LocalMember> l = gossipManager.getLiveMembers();
int sendTo = l.size() < 3 ? 1 : l.size() / 2;
for (int i = 0; i < sendTo; i++) {
threadService.execute(() -> sendShutdownMessage(gossipManager.getMyself(), selectPartner(l)));
}
}
}
The data synchronizer synchronizes live and dead members, node data, and shared data.
Let's look at how live nodes are synchronized.
//SimpleActiveGossiper
/** Select a random member from the live member list and send it the membership list */
protected void sendToALiveMember() {
LocalMember member = selectPartner(gossipManager.getLiveMembers());
sendMembershipList(gossipManager.getMyself(), member);
}
//AbstractActiveGossiper
/**
* Returns a random gossip member from the given list
* @param memberList
* An immutable list
* @return The chosen LocalGossipMember to gossip with.
*/
protected LocalMember selectPartner(List<LocalMember> memberList) {
LocalMember member = null;
if (memberList.size() > 0) {
int randomNeighborIndex = random.nextInt(memberList.size());
member = memberList.get(randomNeighborIndex);
}
return member;
}
/**
* Sends the membership list, after we have incremented our own heartbeat.
*/
protected void sendMembershipList(LocalMember me, LocalMember member) {
if (member == null) {
return;
}
long startTime = System.currentTimeMillis();
me.setHeartbeat(System.nanoTime());
// Build the active gossip message
UdpActiveGossipMessage message = new UdpActiveGossipMessage();
message.setUriFrom(gossipManager.getMyself().getUri().toASCIIString());
message.setUuid(UUID.randomUUID().toString());
message.getMembers().add(convert(me));
for (LocalMember other : gossipManager.getMembers().keySet()) {
message.getMembers().add(convert(other));
}
Response r = gossipCore.send(message, member.getUri());
if (r instanceof ActiveGossipOk){
//maybe count metrics here
} else {
LOGGER.debug("Message " + message + " generated response " + r);
}
sendMembershipHistogram.update(System.currentTimeMillis() - startTime);
}
To send the active-node message, the gossiper picks a random node from the live member list, packs the current node's membership list into an active gossip message, and sends it to the selected node. Dead nodes are handled the same way, except that the partner is picked from the dead member list. In both cases the actual sending is delegated to the core component.
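Picking one random partner per interval is what lets information spread through the cluster quickly. The toy simulation below illustrates this under simplifying assumptions that do not come from the framework: every node knows every other node, each informed node pushes to exactly one uniformly random partner per round, and no message is lost. The class PushGossipSketch and its method are hypothetical.

```java
import java.util.Random;

// Hypothetical illustration, not code from the framework.
class PushGossipSketch {

    /**
     * Simulates push gossip starting from a single informed node and returns the number
     * of rounds until every node is informed, or maxRounds if that limit is hit first.
     */
    static int roundsUntilAllInformed(int nodes, long seed, int maxRounds) {
        Random random = new Random(seed);
        boolean[] informed = new boolean[nodes];
        informed[0] = true;
        int count = 1;
        int rounds = 0;
        while (count < nodes && rounds < maxRounds) {
            // Only nodes that were informed before this round get to send in it.
            boolean[] senders = informed.clone();
            for (int i = 0; i < nodes; i++) {
                if (!senders[i]) {
                    continue;
                }
                int partner = random.nextInt(nodes); // may pick itself or an informed node
                if (!informed[partner]) {
                    informed[partner] = true;
                    count++;
                }
            }
            rounds++;
        }
        return rounds;
    }

    public static void main(String[] args) {
        for (int nodes : new int[] {16, 256, 4096}) {
            System.out.println(nodes + " nodes: " + roundsUntilAllInformed(nodes, 42L, 10_000) + " rounds");
        }
    }
}
```

Because each informed node contacts one partner per round, the informed set can at most double each round, so at least log2(N) rounds are needed; in practice the round count stays within a small multiple of that, which is the usual argument for gossip's scalability.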
Now let's look at sending shared data.
//SimpleActiveGossiper
@Override
public void init() {
super.init();
scheduledExecutorService.scheduleAtFixedRate(() -> {
threadService.execute(() -> {
// Send Live members
sendToALiveMember();
});
}, 0, gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS);
scheduledExecutorService.scheduleAtFixedRate(() -> {
// Send a Dead member
sendToDeadMember();
}, 0, gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS);
scheduledExecutorService.scheduleAtFixedRate(
// Send node data
() -> sendPerNodeData(gossipManager.getMyself(),
selectPartner(gossipManager.getLiveMembers())),
0, gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS);
scheduledExecutorService.scheduleAtFixedRate(
// Send shared data
() -> sendSharedData(gossipManager.getMyself(),
selectPartner(gossipManager.getLiveMembers())),
0, gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS);
}
//AbstractActiveGossiper
/**
* Send the shared data
* @param me
* @param member
*/
public final void sendSharedData(LocalMember me, LocalMember member) {
if (member == null) {
return;
}
long startTime = System.currentTimeMillis();
if (gossipSettings.isBulkTransfer()) {
sendSharedDataInBulkInternal(me, member);
} else {
sendSharedDataInternal(me, member);
}
sharedDataHistogram.update(System.currentTimeMillis() - startTime);
}
/**
* Send shared data one entry at a time
* @param me
* @param member
*/
private void sendSharedDataInternal(LocalMember me, LocalMember member) {
for (Entry<String, SharedDataMessage> innerEntry : gossipCore.getSharedData().entrySet()){
if (innerEntry.getValue().getReplicable() != null && !innerEntry.getValue().getReplicable()
.shouldReplicate(me, member, innerEntry.getValue())) {
// do nothing
continue;
}
// Copy the shared data
UdpSharedDataMessage message = new UdpSharedDataMessage();
message.setUuid(UUID.randomUUID().toString());
message.setUriFrom(me.getId());
copySharedDataMessage(innerEntry.getValue(), message);
gossipCore.sendOneWay(message, member.getUri());
}
}
To send shared data, the gossiper wraps each shared data entry in a UdpSharedDataMessage and sends it to the peer node through the core component. Node data works the same way, except that per-node data is sent.
The core components
public class GossipCore implements GossipCoreConstants {
class LatchAndBase {
/** Latch that the sender waits on */
private final CountDownLatch latch;
/** Response message */
private volatile Base base;
LatchAndBase(){
latch = new CountDownLatch(1);
}
}
public static final Logger LOGGER = Logger.getLogger(GossipCore.class);
private final GossipManager gossipManager;
/** Pending trackable requests */
private ConcurrentHashMap<String, LatchAndBase> requests;
/** Data for each node */
private final ConcurrentHashMap<String, ConcurrentHashMap<String, PerNodeDataMessage>> perNodeData;
/** Shared data set */
private final ConcurrentHashMap<String, SharedDataMessage> sharedData;
private final Meter messageSerdeException;
/** Transmission failure meter */
private final Meter transmissionException;
/** Transmission success meter */
private final Meter transmissionSuccess;
private final DataEventManager eventManager;
public GossipCore(GossipManager manager, MetricRegistry metrics){
this.gossipManager = manager;
requests = new ConcurrentHashMap<>();
perNodeData = new ConcurrentHashMap<>();
sharedData = new ConcurrentHashMap<>();
eventManager = new DataEventManager(metrics);
metrics.register(PER_NODE_DATA_SIZE, (Gauge<Integer>)() -> perNodeData.size());
metrics.register(SHARED_DATA_SIZE, (Gauge<Integer>)() -> sharedData.size());
metrics.register(REQUEST_SIZE, (Gauge<Integer>)() -> requests.size());
messageSerdeException = metrics.meter(MESSAGE_SERDE_EXCEPTION);
transmissionException = metrics.meter(MESSAGE_TRANSMISSION_EXCEPTION);
transmissionSuccess = metrics.meter(MESSAGE_TRANSMISSION_SUCCESS);
}
...
}
//GossipCore
/**
* Receive a message
* @param base
*/
public void receive(Base base) {
if (!gossipManager.getMessageHandler().invoke(this, gossipManager, base)) {
LOGGER.warn("received message can not be handled");
}
}
//SharedDataMessageHandler
public class SharedDataMessageHandler implements MessageHandler{
/**
* @param gossipCore context.
* @param gossipManager context.
* @param base message reference.
* @return boolean indicating success.
*/
@Override
public boolean invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) {
UdpSharedDataMessage message = (UdpSharedDataMessage) base;
// Add shared data
gossipCore.addSharedData(message);
return true;
}
}
//GossipCore
/**
* Add shared data
* @param message
*/
@SuppressWarnings({ "unchecked", "rawtypes" })
public void addSharedData(SharedDataMessage message) {
while (true){
SharedDataMessage previous = sharedData.putIfAbsent(message.getKey(), message);
if (previous == null){
eventManager.notifySharedData(message.getKey(), message.getPayload(), null);
return;
}
if (message.getPayload() instanceof Crdt){
// Merge shared data
SharedDataMessage merged = new SharedDataMessage();
merged.setExpireAt(message.getExpireAt());
merged.setKey(message.getKey());
merged.setNodeId(message.getNodeId());
merged.setTimestamp(message.getTimestamp());
Crdt mergedCrdt = ((Crdt) previous.getPayload()).merge((Crdt) message.getPayload());
merged.setPayload(mergedCrdt);
boolean replaced = sharedData.replace(message.getKey(), previous, merged);
if (replaced){
if (!merged.getPayload().equals(previous.getPayload())) {
eventManager.notifySharedData(message.getKey(), merged.getPayload(), previous.getPayload());
}
return;
}
} else {
// Non-CRDT data, the latest data shall prevail
if (previous.getTimestamp() < message.getTimestamp()){
boolean result = sharedData.replace(message.getKey(), previous, message);
if (result){
eventManager.notifySharedData(message.getKey(), message.getPayload(), previous.getPayload());
return;
}
} else {
return;
}
}
}
}
After receiving a message, the peer node invokes the message handler to process it. For shared data, a CRDT payload is merged with the existing value; otherwise the value with the newer timestamp wins. In both cases the event manager is notified of the change. Node data is handled in essentially the same way.
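The two update rules, merge for CRDTs and newest-timestamp-wins for everything else, can be reduced to a small standalone sketch built on the same putIfAbsent/replace retry loop. This is an illustration under assumptions, not the framework's code: the class SharedDataMergeSketch and its Entry type are hypothetical, and a plain grow-only set (merged by union) stands in for the framework's Crdt types.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical illustration, not code from the framework.
class SharedDataMergeSketch {

    /** Immutable stand-in for a shared data message. */
    static final class Entry {
        final Object payload;
        final long timestamp;

        Entry(Object payload, long timestamp) {
            this.payload = payload;
            this.timestamp = timestamp;
        }
    }

    private final ConcurrentHashMap<String, Entry> store = new ConcurrentHashMap<>();

    /** Adds an entry: sets are merged by union, other payloads use newest-timestamp-wins. */
    void add(String key, Entry incoming) {
        while (true) {
            Entry previous = store.putIfAbsent(key, incoming);
            if (previous == null) {
                return; // first value for this key
            }
            Entry next;
            if (previous.payload instanceof Set && incoming.payload instanceof Set) {
                // Grow-only set: the merge is the union, so the order of arrival does not matter.
                Set<Object> union = new HashSet<>((Set<?>) previous.payload);
                union.addAll((Set<?>) incoming.payload);
                next = new Entry(Collections.unmodifiableSet(union),
                        Math.max(previous.timestamp, incoming.timestamp));
            } else if (previous.timestamp < incoming.timestamp) {
                next = incoming; // newer plain value replaces the older one
            } else {
                return; // stale update, ignore it
            }
            // Compare-and-set: retry the whole step if another thread changed the entry meanwhile.
            if (store.replace(key, previous, next)) {
                return;
            }
        }
    }

    Object get(String key) {
        Entry entry = store.get(key);
        return entry == null ? null : entry.payload;
    }

    public static void main(String[] args) {
        SharedDataMergeSketch data = new SharedDataMergeSketch();
        data.add("tags", new Entry(new HashSet<>(Arrays.asList("a", "b")), 1L));
        data.add("tags", new Entry(new HashSet<>(Arrays.asList("b", "c")), 2L));
        data.add("owner", new Entry("node-1", 5L));
        data.add("owner", new Entry("node-2", 3L)); // older timestamp, ignored
        System.out.println(data.get("tags") + " " + data.get("owner"));
    }
}
```

The retry loop is what makes concurrent updates safe without a lock: replace succeeds only if the stored entry is still the one the merge was computed from.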
Let’s look at sending data
//GossipCore
/**
* @param message
* @param uri
* @return
*/
public Response send(Base message, URI uri){
if (LOGGER.isDebugEnabled()){
LOGGER.debug("Sending " + message);
LOGGER.debug("Current request queue " + requests);
}
final Trackable t;
LatchAndBase latchAndBase = null;
if (message instanceof Trackable){
t = (Trackable) message;
latchAndBase = new LatchAndBase();
// Put into the request set
requests.put(t.getUuid() + "/" + t.getUriFrom(), latchAndBase);
} else {
t = null;
}
sendInternal(message, uri);
if (latchAndBase == null) {
return null;
}
try {
boolean complete = latchAndBase.latch.await(1, TimeUnit.SECONDS);
if (complete){
// The response has arrived
return (Response) latchAndBase.base;
} else{
return null;
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
if (latchAndBase != null) {
// Remove the tracked request
requests.remove(t.getUuid() + "/" + t.getUriFrom());
}
}
}
/**
* Sends a message across the network while blocking. Catches and ignores IOException in transmission.
* Used when the protocol for the message is not to wait for a response.
* @param message the message to send
* @param u the uri to send it to
*/
public void sendOneWay(Base message, URI u) {
try {
sendInternal(message, u);
} catch (RuntimeException ex) {
LOGGER.debug("Send one way failed", ex);
}
}
/**
* Sends a blocking message.
* todo: move functionality to TransportManager layer.
* @param message
* @param uri
* @throws RuntimeException if data can not be serialized or in transmission error
*/
private void sendInternal(Base message, URI uri) {
byte[] json_bytes;
try {
json_bytes = gossipManager.getProtocolManager().write(message);
} catch (IOException e) {
messageSerdeException.mark();
throw new RuntimeException(e);
}
try {
gossipManager.getTransportManager().send(uri, json_bytes);
transmissionSuccess.mark();
} catch (IOException e) {
transmissionException.mark();
throw new RuntimeException(e);
}
}
In the end, the core component delegates the actual sending to the underlying UDP transport manager.
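Since UDP has no notion of a reply, send() builds request/response matching on top of it: the request is registered under a key, the sender waits on a latch, and the entry is removed again whether or not an answer arrives. The code that completes the latch when a response comes in is not quoted in this article, so the sketch below fills that part in as an assumption. The class PendingRequestsSketch, its methods, and the use of plain strings for responses are hypothetical.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration, not code from the framework.
class PendingRequestsSketch {

    /** Pairs a latch with the slot that will hold the response. */
    private static final class Pending {
        final CountDownLatch latch = new CountDownLatch(1);
        volatile String response;
    }

    private final ConcurrentHashMap<String, Pending> pending = new ConcurrentHashMap<>();

    /**
     * Registers the request, runs the transmit action, and waits for a response.
     * Returns null if no response arrives within the timeout.
     */
    String sendAndAwait(String requestId, Runnable transmit, long timeoutMillis) {
        Pending p = new Pending();
        pending.put(requestId, p); // register before sending so a fast reply is not missed
        try {
            transmit.run();
            return p.latch.await(timeoutMillis, TimeUnit.MILLISECONDS) ? p.response : null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } finally {
            pending.remove(requestId); // always clean up, even on timeout
        }
    }

    /** Assumed receive-side hook: called when a response for requestId arrives. */
    void complete(String requestId, String response) {
        Pending p = pending.get(requestId);
        if (p != null) {
            p.response = response; // publish the response before releasing the waiter
            p.latch.countDown();
        }
    }

    int inFlight() {
        return pending.size();
    }

    public static void main(String[] args) {
        PendingRequestsSketch requests = new PendingRequestsSketch();
        // Simulate a peer that answers from another thread.
        String reply = requests.sendAndAwait("uuid-1",
                () -> new Thread(() -> requests.complete("uuid-1", "ok")).start(), 1000);
        System.out.println("reply: " + reply);
    }
}
```

Removing the entry in a finally block keeps the pending map from growing when peers never answer, which matters for a protocol that expects some datagrams to be lost.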
Gossip manager
public abstract class GossipManager {
public static final Logger LOGGER = Logger.getLogger(GossipManager.class);
// this mapper is used for ring and user-data persistence only. NOT messages.
public static final ObjectMapper metdataObjectMapper = new ObjectMapper() {
private static final long serialVersionUID = 1L;
{
enableDefaultTyping();
configure(JsonGenerator.Feature.WRITE_NUMBERS_AS_STRINGS, false);
}};
/** Gossip members and their states */
private final ConcurrentSkipListMap<LocalMember, GossipState> members;
/** Current node information */
private final LocalMember me;
/** Gossip protocol configuration */
private final GossipSettings settings;
/** The running status of the gossip service */
private final AtomicBoolean gossipServiceRunning;
/** Transport manager */
private TransportManager transportManager;
/** Protocol manager */
private ProtocolManager protocolManager;
private final GossipCore gossipCore;
private final DataReaper dataReaper;
/** Clock */
private final Clock clock;
/** Scheduler */
private final ScheduledExecutorService scheduledServiced;
private final MetricRegistry registry;
/** Gossip member persistence */
private final RingStatePersister ringState;
/** User data persistence */
private final UserDataPersister userDataState;
/** Member state flusher */
private final GossipMemberStateRefresher memberStateRefresher;
/** Message handler */
private final MessageHandler messageHandler;
private final LockManager lockManager;
/**
* @param cluster
* @param uri
* @param id
* @param properties member properties
* @param settings
* @param gossipMembers
* @param listener
* @param registry
* @param messageHandler
*/
public GossipManager(String cluster, URI uri, String id, Map<String, String> properties,
GossipSettings settings, List<Member> gossipMembers, GossipListener listener,
MetricRegistry registry, MessageHandler messageHandler) {
this.settings = settings;
this.messageHandler = messageHandler;
// Create a system clock
clock = new SystemClock();
// Local gossip members
me = new LocalMember(cluster, uri, id, clock.nanoTime(), properties,
settings.getWindowSize(), settings.getMinimumSamples(), settings.getDistribution());
gossipCore = new GossipCore(this, registry);
this.lockManager = new LockManager(this, settings.getLockManagerSettings(), registry);
dataReaper = new DataReaper(gossipCore, clock);
members = new ConcurrentSkipListMap<>();
for (Member startupMember : gossipMembers) {
if (!startupMember.equals(me)) {
LocalMember member = new LocalMember(startupMember.getClusterName(),
startupMember.getUri(), startupMember.getId(),
clock.nanoTime(), startupMember.getProperties(), settings.getWindowSize(),
settings.getMinimumSamples(), settings.getDistribution());
//TODO should members start in down state?
members.put(member, GossipState.DOWN);
}
}
gossipServiceRunning = new AtomicBoolean(true);
this.scheduledServiced = Executors.newScheduledThreadPool(1);
this.registry = registry;
// Gossip member persistence
this.ringState = new RingStatePersister(GossipManager.buildRingStatePath(this), this);
// User data persistence
this.userDataState = new UserDataPersister(
gossipCore,
GossipManager.buildPerNodeDataPath(this),
GossipManager.buildSharedDataPath(this));
// Gossip member state flusher
this.memberStateRefresher = new GossipMemberStateRefresher(members, settings, listener, this::findPerNodeGossipData);
// Load the members of the node gossip
readSavedRingState();
// Load node data and shared data
readSavedDataState();
}
...
}
The Gossip manager ties everything together: cluster membership, message handling, global configuration, node state persistence, user data persistence, the protocol manager, and the UDP transport manager. Let’s take a quick look at user data persistence, which covers both node data and shared data:
public class UserDataPersister implements Runnable {
private static final Logger LOGGER = Logger.getLogger(UserDataPersister.class);
private final GossipCore gossipCore;
/** Node data file */
private final File perNodePath;
/** Shared data file */
private final File sharedPath;
private final ObjectMapper objectMapper;
UserDataPersister(GossipCore gossipCore, File perNodePath, File sharedPath) {
this.gossipCore = gossipCore;
this.objectMapper = GossipManager.metdataObjectMapper;
this.perNodePath = perNodePath;
this.sharedPath = sharedPath;
}
/**
 * Load node data from disk
 * @return the node data, or an empty map if nothing could be read
 */
@SuppressWarnings("unchecked")
ConcurrentHashMap<String, ConcurrentHashMap<String, PerNodeDataMessage>> readPerNodeFromDisk(){
if (!perNodePath.exists()) {
return new ConcurrentHashMap<String, ConcurrentHashMap<String, PerNodeDataMessage>>();
}
try (FileInputStream fos = new FileInputStream(perNodePath)){
return objectMapper.readValue(fos, ConcurrentHashMap.class);
} catch (IOException e) {
LOGGER.debug(e);
}
return new ConcurrentHashMap<String, ConcurrentHashMap<String, PerNodeDataMessage>>();
}
/** Write node data to disk */
void writePerNodeToDisk() {
try (FileOutputStream fos = new FileOutputStream(perNodePath)) {
objectMapper.writeValue(fos, gossipCore.getPerNodeData());
} catch (IOException e) {
LOGGER.warn(e);
}
}
/** Write shared data to disk */
void writeSharedToDisk() {
try (FileOutputStream fos = new FileOutputStream(sharedPath)) {
objectMapper.writeValue(fos, gossipCore.getSharedData());
} catch (IOException e) {
LOGGER.warn(e);
}
}
/**
 * Load shared data from disk
 * @return the shared data, or an empty map if nothing could be read
 */
@SuppressWarnings("unchecked")
ConcurrentHashMap<String, SharedDataMessage> readSharedDataFromDisk() {
if (!sharedPath.exists()) {
return new ConcurrentHashMap<>();
}
try (FileInputStream fos = new FileInputStream(sharedPath)){
return objectMapper.readValue(fos, ConcurrentHashMap.class);
} catch (IOException e) {
LOGGER.debug(e);
}
return new ConcurrentHashMap<String, SharedDataMessage>();
}
/** Writes all per-node and shared data to disk */
@Override
public void run() {
writePerNodeToDisk();
writeSharedToDisk();
}
}
UserDataPersister handles user data persistence: it writes node data and shared data to disk so that they survive a node failure, and the saved data is loaded from disk again at startup. Node state persistence works similarly.
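The save-and-reload cycle can be sketched in a few lines. This is only an illustration under stated assumptions, not the framework's code: the class name `MapPersisterSketch` is hypothetical, the data is reduced to a string map, and `java.util.Properties` stands in for Jackson so that the example needs nothing beyond the JDK.

```java
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical sketch: persist a string map to disk and reload it at startup. */
final class MapPersisterSketch {
  private final File path;

  MapPersisterSketch(File path) {
    this.path = path;
  }

  /** Writes the whole map, replacing any previous file content. */
  void write(ConcurrentHashMap<String, String> data) {
    Properties props = new Properties();
    props.putAll(data);
    try (FileOutputStream out = new FileOutputStream(path)) {
      props.store(out, "user data (sketch)");
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  /** Returns an empty map when no file exists yet, as on a first start. */
  ConcurrentHashMap<String, String> read() {
    ConcurrentHashMap<String, String> result = new ConcurrentHashMap<>();
    if (!path.exists()) {
      return result;
    }
    Properties props = new Properties();
    try (FileInputStream in = new FileInputStream(path)) {
      props.load(in);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    for (String key : props.stringPropertyNames()) {
      result.put(key, props.getProperty(key));
    }
    return result;
  }
}
```

Unlike the listing above, which logs I/O errors and carries on, this sketch rethrows them so that a failed write is visible to the caller; which behavior is preferable depends on how much stale data a node can tolerate after a restart.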
conclusion
Gossip communicates over UDP: at the bottom layer, the UDP manager sends and receives messages, and the message codec uses JSON serialization. When a node receives a message, the decoder decodes it and hands it to a message handler; there are handlers for node data, shared data, downtime messages, and so on. The node status flusher keeps the status of the nodes in the cluster up to date. The data synchronizer synchronizes active nodes, down nodes, node data, and shared data. The core component manages nodes and shared data, and sends and receives messages. The Gossip manager persists node state, node data, and shared data.
The UDP transport manager, UdpTransportManager, sends and receives raw bytes and delegates received data to the core component. Starting the UDP transport manager also starts the node status flusher.
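To make the byte-level transport concrete, here is a minimal sketch of sending one datagram and receiving it again over the loopback interface. The class and method names (`UdpLoopbackSketch`, `roundTrip`) are hypothetical, and the 1024-byte receive buffer and 2-second timeout are arbitrary choices for the example, not values taken from the framework.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.util.Arrays;

/** Hypothetical sketch of UDP send and receive on the loopback interface. */
final class UdpLoopbackSketch {
  /** Sends the payload to a local receiver socket and returns the bytes that arrived. */
  static byte[] roundTrip(byte[] payload) {
    InetAddress loopback = InetAddress.getLoopbackAddress();
    // Port 0 asks the operating system for any free port.
    try (DatagramSocket receiver = new DatagramSocket(0, loopback);
         DatagramSocket sender = new DatagramSocket(0, loopback)) {
      receiver.setSoTimeout(2000); // fail instead of blocking forever
      sender.send(new DatagramPacket(payload, payload.length, loopback, receiver.getLocalPort()));

      byte[] buffer = new byte[1024]; // longer datagrams would be truncated
      DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
      receiver.receive(packet);
      return Arrays.copyOf(packet.getData(), packet.getLength());
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

UDP gives no delivery guarantee, which is acceptable here because gossip repeats its exchanges periodically: a lost datagram is compensated by a later round.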
The message codec, JacksonProtocolManager, is a JSON-based codec. If RSA is enabled, it also encrypts messages on send and decrypts them on receive.
Handling of shared data and node data is delegated to the core component, and downtime messages are converted into node data messages before being processed.
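Dispatching a decoded message to the handler registered for its type can be sketched as below. This is a generic illustration, not the framework's handler API: `HandlerRegistrySketch`, `register`, and `dispatch` are hypothetical names, and messages are plain Java objects.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical sketch: route each decoded message to the handler for its type. */
final class HandlerRegistrySketch {
  private final Map<Class<?>, Consumer<Object>> handlers = new LinkedHashMap<>();

  /** Registers a handler for messages of the given type (and its subtypes). */
  <M> void register(Class<M> type, Consumer<M> handler) {
    handlers.put(type, message -> handler.accept(type.cast(message)));
  }

  /** Invokes the first matching handler; returns false if no handler accepts the message. */
  boolean dispatch(Object message) {
    for (Map.Entry<Class<?>, Consumer<Object>> entry : handlers.entrySet()) {
      if (entry.getKey().isInstance(message)) {
        entry.getValue().accept(message);
        return true;
      }
    }
    return false;
  }
}
```

Returning a boolean lets the caller log or count messages that nothing handled, which is useful when nodes running different versions share a cluster.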
The node status flusher, GossipMemberStateRefresher, uses a FailureDetector to check whether each node is still alive and updates the node status accordingly; when a status changes, the registered listener is notified.
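One way such a detector can work is to track recent heartbeat intervals and grow more suspicious the longer a node stays silent. The sketch below assumes an exponential model of heartbeat arrivals; the class `FailureDetectorSketch` and its methods are hypothetical, and only the parameter names `windowSize` and `minimumSamples` echo the settings passed to `LocalMember` in the constructor shown earlier.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical accrual-style detector based on recent heartbeat intervals. */
final class FailureDetectorSketch {
  private final int windowSize;
  private final int minimumSamples;
  private final Deque<Long> intervals = new ArrayDeque<>();
  private long lastHeartbeat = -1;

  FailureDetectorSketch(int windowSize, int minimumSamples) {
    this.windowSize = windowSize;
    this.minimumSamples = minimumSamples;
  }

  /** Records a heartbeat and keeps only the most recent windowSize intervals. */
  void recordHeartbeat(long now) {
    if (lastHeartbeat >= 0) {
      intervals.addLast(now - lastHeartbeat);
      if (intervals.size() > windowSize) {
        intervals.removeFirst();
      }
    }
    lastHeartbeat = now;
  }

  /**
   * Probability of a silence at least this long if the node were alive,
   * assuming exponentially distributed intervals. Null until enough samples exist.
   */
  Double aliveProbability(long now) {
    if (intervals.size() < minimumSamples) {
      return null;
    }
    double mean = intervals.stream().mapToLong(Long::longValue).average().orElse(0.0);
    if (mean <= 0) {
      return null;
    }
    return Math.exp(-(now - lastHeartbeat) / mean);
  }

  /** A node is considered down once the probability drops below the threshold. */
  boolean isDown(long now, double threshold) {
    Double p = aliveProbability(now);
    return p != null && p < threshold;
  }
}
```

Requiring a minimum number of samples avoids declaring a freshly joined node down before its normal heartbeat rhythm is known.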
The data synchronizer synchronizes active and down members, node data, and shared data. To send the active-member list, it picks a node at random from the active nodes, packs the current node's view of the active members into a keep-alive message, and sends it to the selected node. Down members are sent the same way. The sending itself is delegated to the core component. To send shared data, the core component wraps it in a UdpSharedDataMessage and sends it to the peer node. Node data works the same way, except that the payload is node data.
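A single round of the active-member exchange can be sketched as follows. The names `GossipRoundSketch`, `selectPartner`, and `sendLiveMembers` are hypothetical, members are reduced to strings, and the actual network send is abstracted behind a callback, so this shows only the random peer choice and the packing of the local view.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.function.BiConsumer;

/** Hypothetical sketch of one gossip round: pick a random live peer and send it our view. */
final class GossipRoundSketch {
  private final Random random;

  GossipRoundSketch(Random random) {
    this.random = random;
  }

  /** Picks one live peer uniformly at random, or returns null when there is none. */
  <T> T selectPartner(List<T> livePeers) {
    if (livePeers.isEmpty()) {
      return null;
    }
    return livePeers.get(random.nextInt(livePeers.size()));
  }

  /**
   * Sends the local view (all live peers plus this node) to one random peer.
   * Returns the chosen peer, or null if nothing was sent.
   */
  String sendLiveMembers(String self, List<String> livePeers,
                         BiConsumer<String, List<String>> sender) {
    String partner = selectPartner(livePeers);
    if (partner == null) {
      return null;
    }
    List<String> view = new ArrayList<>(livePeers);
    view.add(self);
    sender.accept(partner, view);
    return partner;
  }
}
```

Because every node repeats this round on a timer and chooses its partner independently, information spreads through the cluster without any node needing to contact all the others.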
After receiving a message, the peer node invokes the message handler to process it. For shared data, CRDT-based values are merged with the local copy; other values are simply overwritten, and the event handler is notified. Node data is handled in much the same way. For sending, the core component in turn delegates to the underlying UDP transport manager.
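Why CRDT values can be merged rather than overwritten is easiest to see with a grow-only counter, one of the simplest CRDTs. The sketch below is a textbook G-Counter, not the framework's own class; `GCounterSketch` and its methods are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical G-Counter: each node only increments its own slot; merge takes per-node maxima. */
final class GCounterSketch {
  private final Map<String, Long> counts = new HashMap<>();

  /** Adds a non-negative amount to the slot owned by nodeId. */
  void increment(String nodeId, long delta) {
    if (delta < 0) {
      throw new IllegalArgumentException("a grow-only counter cannot decrease");
    }
    counts.merge(nodeId, delta, Long::sum);
  }

  /** Returns a new counter holding, for every node, the larger of the two known counts. */
  GCounterSketch merge(GCounterSketch other) {
    GCounterSketch merged = new GCounterSketch();
    merged.counts.putAll(counts);
    other.counts.forEach((node, n) -> merged.counts.merge(node, n, Long::max));
    return merged;
  }

  /** The counter value is the sum over all nodes. */
  long value() {
    return counts.values().stream().mapToLong(Long::longValue).sum();
  }
}
```

The merge is commutative, associative, and idempotent, so replicas converge to the same value no matter in which order, or how many times, gossip delivers the updates.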
The Gossip manager ties everything together: cluster membership, message handling, global configuration, node state persistence, node data persistence, shared data persistence, the protocol manager, and the UDP transport manager.
Appendix
Gossip
A brief introduction to Redis and The Gossip protocol
incubator-retired-gossip
apache gossip
Distributed Consistency algorithm-CRDT Distributed Consistency algorithm-CRDT Two Distributed CRDT model
How to choose a geographically distributed active-active strategy: Comparison between merge replication and CRDT
When to use a CRDT-based database
A TLA+ based CRDT protocol validation framework
A single instance is 12 times larger than Redis. What kind of hack technology does Anna use?
Kademlia
P2P network core technologies: Kademlia
Kademlia
OpenKAD Github
OpenKAD Google