0x00 Abstract

In the previous article we introduced the connection management and topology awareness parts of the Netflix Dynomite client DynoJedisClient; this article continues with automatic discovery and failover.

0x02 Requirements & Ideas

As before, let's first review the basic requirements and the overall idea.

Because these details must be hidden from the upper layers, DynoJedisClient has to deal with a lot of complexity and needs a deep understanding of the system, for example:

  • How to maintain connections and provide connection pools for persistent connections;
  • How to maintain topology;
  • How to load balance;
  • How to failover;
  • How to retry and discover automatically, for example retrying a failed host and automatically discovering other hosts in the cluster;
  • How to monitor the underlying rack status;

Therefore, the idea behind DynoJedisClient is for the Java driver to provide multiple policy interfaces that can be used to tune driver behavior, including load balancing, request retries, node connection management, and more.
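To make this concrete, a configuration sketch follows. The builder and setter names here (withApplicationName, withDynomiteClusterName, withHostSupplier, withCPConfig, setRetryPolicyFactory, setLoadBalancingStrategy) follow common Dyno usage but are assumptions, not verified against a specific Dyno release; hostSupplier stands for any HostSupplier implementation, such as the Eureka one discussed later.

// Sketch only: the method names follow typical Dyno usage and are assumptions, not verified API.
ConnectionPoolConfigurationImpl cpConfig = new ConnectionPoolConfigurationImpl("myApp");
cpConfig.setRetryPolicyFactory(new RetryNTimes.RetryFactory(2, true));               // retry policy
cpConfig.setLoadBalancingStrategy(
        ConnectionPoolConfiguration.LoadBalancingStrategy.TokenAware);               // selection strategy

DynoJedisClient client = new DynoJedisClient.Builder()
        .withApplicationName("myApp")
        .withDynomiteClusterName("myCluster")
        .withHostSupplier(hostSupplier)   // pluggable host discovery (e.g. Eureka)
        .withCPConfig(cpConfig)
        .build();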

The current legend is as follows:

0x03 Automatic Discovery

Automatic discovery is implemented in the ConnectionPoolImpl start method, which launches a timer thread that periodically refreshes the host state and applies the updates.

3.1 Refresh Thread

The thread refresh logic is as follows:

  • Call hostsUpdater to get the latest status;
  • Call updateHosts to update the ConnectionPoolImpl internal member variable based on these states;
@Override
public Future<Boolean> start() throws DynoException {

    HostSupplier hostSupplier = cpConfiguration.getHostSupplier();

    HostStatusTracker hostStatus = hostsUpdater.refreshHosts();

    Collection<Host> hostsUp = hostStatus.getActiveHosts();

    final ExecutorService threadPool = Executors.newFixedThreadPool(Math.max(10, hostsUp.size()));
    final List<Future<Void>> futures = new ArrayList<Future<Void>>();

    // Initialize to add hosts
    for (final Host host : hostsUp) {

        // Add host connection pool, but don't init the load balancer yet
        futures.add(threadPool.submit(new Callable<Void>() {
            @Override
            public Void call() throws Exception {
                addHost(host, false);
                return null;
            }
        }));
    }

    boolean success = started.compareAndSet(false, true);
    if (success) {
        idling.set(false);
        idleThreadPool.shutdownNow();
        selectionStrategy = initSelectionStrategy();
        cpHealthTracker.start();

        // Start the timer thread
        connPoolThreadPool.scheduleWithFixedDelay(new Runnable() {

            @Override
            public void run() {
                try {
                    // Call hostsUpdater to get the latest status
                    HostStatusTracker hostStatus = hostsUpdater.refreshHosts();

                    cpMonitor.setHostCount(hostStatus.getHostCount());

                    // Call updateHosts to update the ConnectionPoolImpl internal member variables based on these states
                    updateHosts(hostStatus.getActiveHosts(), hostStatus.getInactiveHosts());
                } catch (Throwable t) {
                    // swallow the error so the refresh timer keeps running (logging abbreviated in this excerpt)
                }
            }
        }, 15 * 1000, 30 * 1000, TimeUnit.MILLISECONDS);

    }
    return getEmptyFutureTask(true);
}

3.2 Update Operation

In the code above, updateHosts performs the host update, namely adding and removing hosts.

@Override
public Future<Boolean> updateHosts(Collection<Host> hostsUp, Collection<Host> hostsDown) {
    boolean condition = false;
    if (hostsUp != null && !hostsUp.isEmpty()) {
        for (Host hostUp : hostsUp) {
            condition |= addHost(hostUp);
        }
    }
    if (hostsDown != null && !hostsDown.isEmpty()) {
        for (Host hostDown : hostsDown) {
            condition |= removeHost(hostDown);
        }
    }
    return getEmptyFutureTask(condition);
}

The updateHosts method calls addHost, which, as you can see, touches selectionStrategy, cpMonitor, cpHealthTracker, and cpMap, because all of them must react to host changes.

public boolean addHost(Host host, boolean refreshLoadBalancer) {

    HostConnectionPool<CL> connPool = cpMap.get(host);

    final HostConnectionPool<CL> hostPool = hostConnPoolFactory.createHostConnectionPool(host, this);

    HostConnectionPool<CL> prevPool = cpMap.putIfAbsent(host, hostPool);

    if (prevPool == null) {
        // This is the first time we are adding this pool.
        try {
            int primed = hostPool.primeConnections();

            if (hostPool.isActive()) {
                if (refreshLoadBalancer) {
                    selectionStrategy.addHost(host, hostPool);
                }
                cpHealthTracker.initializePingHealthchecksForPool(hostPool);
                cpMonitor.hostAdded(host, hostPool);
            } else {
                cpMap.remove(host);
            }
            return primed > 0;
        } catch (DynoException e) {
            // priming failed: drop the pool again (exception handling abbreviated in this excerpt)
            cpMap.remove(host);
            return false;
        }
    }
    // (the remainder of the original method is omitted in this excerpt)
    return false;
}

The logic is as follows:

+--------------------------------------------------------------------------+
|                                                                          |
|   ConnectionPoolImpl                                                     |
|                                                                          |
|                                                      Timer               |
|                                           +----------------------+       |
|                           refreshHosts    |                      |       |
|           hostsUpdater +----------------> |  connPoolThreadPool  |       |
|                                           |                      |       |
|                                           +------------+---------+       |
|                                                        |                 |
|                                                        |                 |
|                                         updateHosts    |                 |
|                                                        |                 |
|                      +----------------+-----------------------------+    |
|                      |                |                |            |    |
|                      |                |                |            |    |
|                      v                v                v            v    |
|             cpHealthTracker      selectionStrategy   cpMonitor    cpMap  |
|                                                                          |
|                                                                          |
|                                                                          |
+--------------------------------------------------------------------------+


3.3 Discovery

Discovery is performed using HostsUpdater.

3.3.1 HostsUpdater

The main purpose of this class is to call HostSupplier to refresh:

  • Depending on whether hostFromHostSupplier is up or down, different processing is done; for example, the attributes of hostFromHostSupplier (IP, hostname, status, port, datastore port, rack, datacenter, hashtag, password, ...) are used to populate the host builder;
  • Call HostStatusTracker to record the resulting up/down host sets.
public class HostsUpdater {

    private final HostSupplier hostSupplier;
    private final TokenMapSupplier tokenMapSupplier;
    private final AtomicReference<HostStatusTracker> hostTracker = new AtomicReference<HostStatusTracker>(null);

    public HostStatusTracker refreshHosts() {

        List<Host> allHostsFromHostSupplier = hostSupplier.getHosts();

        /**
         * HostTracker should return the hosts that we get from TokenMapSupplier.
         * Hence get the hosts from HostSupplier and map them to TokenMapSupplier
         * and return them.
         */
        Set<Host> hostSet = new HashSet<>(allHostsFromHostSupplier);
        // Create a list of host/tokens
        List<HostToken> hostTokens = tokenMapSupplier.getTokens(hostSet);

        // The key here really needs to be an object that is overlapping between
        // the host from HostSupplier and TokenMapSupplier. Since that is a
        // subset of the Host object itself, Host is the key as well as value here.
        Map<Host, Host> allHostSetFromTokenMapSupplier = new HashMap<>();
        for (HostToken ht : hostTokens) {
            allHostSetFromTokenMapSupplier.put(ht.getHost(), ht.getHost());
        }

        List<Host> hostsUpFromHostSupplier = new ArrayList<>();
        List<Host> hostsDownFromHostSupplier = new ArrayList<>();

        for (Host hostFromHostSupplier : allHostsFromHostSupplier) {
            if (hostFromHostSupplier.isUp()) {
                // Set the up state
                Host hostFromTokenMapSupplier = allHostSetFromTokenMapSupplier.get(hostFromHostSupplier);

                // Use the hostFromHostSupplier attributes (IP, hostname, status, port, datastore port,
                // rack, datacenter, hashtag, password, ...) to build the "up" host
                HostBuilder upHostBuilder = new HostBuilder().setHostname(......);   // builder chain elided in the original

                hostsUpFromHostSupplier.add(upHostBuilder.createHost());
                allHostSetFromTokenMapSupplier.remove(hostFromTokenMapSupplier);
            } else {
                // Set the state to Down
                Host hostFromTokenMapSupplier = allHostSetFromTokenMapSupplier.get(hostFromHostSupplier);

                // Build the "down" host the same way (IP, hostname, status, port, datastore port,
                // rack, datacenter, hashtag, password, ...)
                HostBuilder downHostBuilder = new HostBuilder().setHostname(......);   // builder chain elided in the original

                hostsDownFromHostSupplier.add(downHostBuilder.createHost());
                allHostSetFromTokenMapSupplier.remove(hostFromTokenMapSupplier);
            }
        }

        HostStatusTracker newTracker =
                hostTracker.get().computeNewHostStatus(hostsUpFromHostSupplier, hostsDownFromHostSupplier);
        hostTracker.set(newTracker);
        return hostTracker.get();
    }
}

3.3.2 HostStatusTracker

This class acts as a record from which other modules extract information.

public class HostStatusTracker {
    // the set of active and inactive hosts
    private final Set<Host> activeHosts = new HashSet<Host>();
    private final Set<Host> inactiveHosts = new HashSet<Host>();
}

3.3.3 HostSupplier

HostSupplier does the actual refreshing and has many implementations; Dynomite delegates this concrete work to other, specialized classes.

Taking EurekaHostsSupplier as an example, it calls the EurekaClient (discoveryClient) to obtain host information.

public class EurekaHostsSupplier implements HostSupplier {

    private final EurekaClient discoveryClient;

    @Override
    public List<Host> getHosts() {
        return getUpdateFromEureka();
    }

    private List<Host> getUpdateFromEureka() {
        Application app = discoveryClient.getApplication(applicationName);
        List<Host> hosts = new ArrayList<Host>();
        List<InstanceInfo> ins = app.getInstances();

        hosts = Lists.newArrayList(Collections2.transform(ins, info -> {
            Host.Status status = info.getStatus() == InstanceStatus.UP ? Host.Status.Up : Host.Status.Down;

            String rack = null;
            try {
                if (info.getDataCenterInfo() instanceof AmazonInfo) {
                    AmazonInfo amazonInfo = (AmazonInfo) info.getDataCenterInfo();
                    rack = amazonInfo.get(MetaDataKey.availabilityZone);
                }
            } catch (RuntimeException e) {
                // rack lookup failed (exception handling abbreviated in this excerpt)
            }

            Host host = new HostBuilder()
                    .setHostname(info.getHostName())
                    .setIpAddress(info.getIPAddr())
                    .setRack(rack)
                    .setStatus(status)
                    .createHost();
            return host;
        }));

        return hosts;
    }
}

Therefore, the logic expands as follows:

+--------------------------------------------------------------------------------------+
|   ConnectionPoolImpl                                                                 |
|                                                                                      |
|                                                                                      |
|  +-----------------------------------+                                               |
|  | hostsUpdater                      |                                               |
|  |                                   |                                               |
|  |                                   |                                               |
|  |                                   |                          Timer                |
|  |                                   |                  +----------------------+     |
|  |                                   |  refreshHosts    |                      |     |
|  |         HostStatusTracker -------------------------> |  connPoolThreadPool  |     |
|  |                       ^           |                  |                      |     |
|  |                       |           |                  +------------+---------+     |
|  |  getUpdateFromEureka  |           |                               |               |
|  |                       |           |                               |               |
|  |                       |           |                updateHosts    |               |
|  |       +----------------------+    |                               |               |
|  |       | HostSupplier  |      |    |              +-----------------------------+  |
|  |       |               |      |    |              |                |            |  |
|  |       |               +      |    |              |                |            |  |
|  |       |      EurekaClient    |    |              v                v            v  |
|  |       |                      |    |         selectionStrategy   cpMonitor   cpMap |
|  |       +----------------------+    |                                               |
|  +-----------------------------------+                                               |
+--------------------------------------------------------------------------------------+


0x04 Error Handling & Load Balancing

Since we are running against a cluster rather than a single instance, some precautions are taken for failover.

In Dynomite, there are three main types of errors:

  • Invalid requests: Errors go straight back to the application because the driver has no way of knowing how to handle such requests;
  • Server error: The driver can try the next node according to the load balancing policy;
  • Network timeout: if the request is marked idempotent, the driver can retry it. By default, requests are not considered idempotent, so it is good practice to mark them whenever possible. For idempotent requests, if the first node does not respond within a certain delay, the driver can send the request to a second node and try again; this is called "speculation" and is configured with SpeculativeExecutionPolicy.

Depending on the severity of the error, there are two handling options: retry and fallback. We introduce them below in that order.

4.1 Retry Policy

When a node fails or becomes inaccessible, the driver automatically and transparently tries other nodes and arranges to reconnect to dead nodes in the background.

But because temporary changes in network conditions can also make nodes appear offline, the driver also provides a retry strategy to retry queries that fail due to network-related errors. This eliminates the need to write retry logic in the client code.

The Retry policy determines the default behavior to adopt when a request times out or a node becomes unavailable.

4.1.1 Policy Classification

The Java driver provides several RetryPolicy implementations:

  • RetryNTimes: guarantees that an operation is retried at most N times; RetryNTimes(2) means at most 2 + 1 = 3 attempts (the original try plus two retries) are made before giving up (see the short example after this list);
  • RunOnce: never retries; the exception is always rethrown.
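As a small illustration grounded in the RetryNTimes code shown in section 4.1.3 below (the DynoException messages are made up for the example):

// With n = 2, allowRetry() permits retries while count <= 2, so at most three attempts run.
RetryPolicy retry = new RetryNTimes(2, true);
retry.begin();

retry.failure(new DynoException("timeout"));  // attempt 1 fails, count = 1
boolean retry1 = retry.allowRetry();          // true  (1 <= 2)

retry.failure(new DynoException("timeout"));  // attempt 2 fails, count = 2
boolean retry2 = retry.allowRetry();          // true  (2 <= 2)

retry.failure(new DynoException("timeout"));  // attempt 3 fails, count = 3
boolean retry3 = retry.allowRetry();          // false (3 > 2), so the driver gives up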

4.1.2 Using Policies

When executing a command, we can see that the driver transparently tries other nodes and schedules reconnection to dead nodes in the background:

  • Get the retry policy;
  • Operate in a loop:
    • Perform the operation;
    • If it succeeds, call the retry policy's success method and break out of the loop;
    • If it fails, call the retry policy's failure method;
    • If retries are allowed, continue the loop;

The abbreviated code is as follows:

@Override
public <R> OperationResult<R> executeWithFailover(Operation<CL, R> op) throws DynoException {
    RetryPolicy retry = cpConfiguration.getRetryPolicyFactory().getRetryPolicy();
    retry.begin();

    DynoException lastException = null;
    do {
        Connection<CL> connection = null;
        try {
            connection = selectionStrategy.getConnectionUsingRetryPolicy(op,
                    cpConfiguration.getMaxTimeoutWhenExhausted(), TimeUnit.MILLISECONDS, retry);
            OperationResult<R> result = connection.execute(op); // Perform the operation
            retry.success();  // If successful, call the retry policy's success method
            return result;
        } catch (DynoException e) {
            retry.failure(e); // If it fails, call the retry policy's failure method
            lastException = e;
            if (connection != null) {
                cpMonitor.incOperationFailure(connection.getHost(), e);
                if (retry.allowRetry()) { // Record the failover when another retry will follow
                    cpMonitor.incFailover(connection.getHost(), e);
                }
            }
        }
    } while (retry.allowRetry()); // If retries are allowed, the loop continues

    // all retries exhausted (connection cleanup etc. abbreviated in this excerpt)
    throw lastException;
}

Let’s take RetryNTimes as an example.

4.1.3 RetryNTimes

The success and failure methods set internal state that determines whether retries are allowed.

public class RetryNTimes implements RetryPolicy {

    private int n;
    private final boolean allowCrossZoneFallback;
    private final AtomicReference<RetryState> state = new AtomicReference<>(new RetryState(0, false));

    public RetryNTimes(int n, boolean allowFallback) {
        this.n = n;
        this.allowCrossZoneFallback = allowFallback;
    }

    @Override
    public void success() {
        boolean success = false;
        RetryState rs;
        while (!success) {
            rs = state.get();
            // Set internal state
            success = state.compareAndSet(rs, new RetryState(rs.count + 1, true));
        }
    }

    @Override
    public void failure(Exception e) {
        boolean success = false;
        RetryState rs;
        while (!success) {
            rs = state.get();
            // Set internal state
            success = state.compareAndSet(rs, new RetryState(rs.count + 1, false));
        }
    }

    @Override
    public boolean allowRetry() {
        final RetryState rs = state.get();
        return !rs.success && rs.count <= n;
    }

    private class RetryState {
        private final int count;
        private final boolean success;

        public RetryState(final int count, final boolean success) {
            this.count = count;
            this.success = success;
        }
    }
}

4.2 Policy Selection

Since retries sometimes don’t solve problems, let’s talk about fallback selection strategies for more serious problems.

The driver can query any node in the cluster, and that node is then called the coordinator node for the query. Depending on the content of the query, the coordinator may communicate with other nodes to satisfy it. If a client routes all of its queries to the same node, it creates an unbalanced load on the cluster, especially if other clients do the same.

To prevent a single node from acting as the coordinator for too many requests, the DynoJedisClient driver provides a pluggable mechanism to balance the query load across multiple nodes. Load balancing is achieved by choosing an implementation of the HostSelectionStrategy interface.

Each HostSelectionStrategy classifies each node in the cluster as local, remote, or ignored. The driver prefers interacting with local nodes and maintains more connections to local nodes than to remote nodes.

The HostSelectionStrategy is set on the cluster at build time. The driver provides two basic load balancing implementations: RoundRobinSelection and TokenAwareSelection.

  • RoundRobinPolicy: Allocates requests across nodes in the cluster in a repeating pattern to spread out the processing load, balancing the load among all nodes.
  • TokenAwareSelection: token aware, which uses the token value to route a request to the node holding (a replica of) the required data, minimizing the number of nodes that must be queried. This is done by wrapping the selected policy with TokenAwarePolicy.

4.2.1 Coordinator

HostSelectionWithFallback is responsible for choosing the coordinator.

  • It coordinates among multiple HostSelectionStrategy instances and maps requests to a specific rack;
  • HostSelectionWithFallback is not responsible for the concrete selection algorithm (e.g. Round Robin or Token Aware); it obtains connections from the concrete strategy implementations;
  • HostSelectionWithFallback relies on two kinds of strategies:
    • The local HostSelectionStrategy is used preferentially;
    • If the local connection pools or local hosts fail, HostSelectionWithFallback falls back to a remote HostSelectionStrategy;
    • When the local rack fails, HostSelectionWithFallback uses plain round robin over the remote racks to pick the remote HostSelectionStrategy, so the load stays evenly distributed;
  • HostSelectionWithFallback always prefers the local strategy over a remote one (a simplified sketch of this flow follows the class definition below);

The concrete member variables of HostSelectionWithFallback are as follows:

  • Local Host information, such as localDataCenter, localRack, and local selection policy localSelector;
  • Remote Host information, such as remoteRackNames of remote zones and remoteRackSelectors.
  • Information related to tokens, such as hostTokens, tokenSupplier, topology information topology;
  • Configuration information cpConfig;
  • Monitoring information;

The concrete class is defined as follows:

public class HostSelectionWithFallback<CL> {
    // Only used in calculating replication factor
    private final String localDataCenter;
    // tracks the local zone
    private final String localRack;
    // The selector for the local zone
    private final HostSelectionStrategy<CL> localSelector;
    // Track selectors for each remote zone
    private final ConcurrentHashMap<String, HostSelectionStrategy<CL>> remoteRackSelectors = new ConcurrentHashMap<>();

    private final ConcurrentHashMap<Host, HostToken> hostTokens = new ConcurrentHashMap<>();

    private final TokenMapSupplier tokenSupplier;
    private final ConnectionPoolConfiguration cpConfig;
    private final ConnectionPoolMonitor cpMonitor;

    private final AtomicInteger replicationFactor = new AtomicInteger(-1);

    // Represents the *initial* topology from the token supplier. This does not affect selection of a host connection
    // pool for traffic. It only affects metrics such as failover/fallback
    private final AtomicReference<TokenPoolTopology> topology = new AtomicReference<>(null);

    // list of names of remote zones. Used for RoundRobin over remote zones when local zone host is down
    private final CircularList<String> remoteRackNames = new CircularList<>(new ArrayList<>());

    private final HostSelectionStrategyFactory<CL> selectorFactory;
}

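To make the fallback flow concrete, here is a simplified sketch (not the actual Dyno source) that uses the member variables above; the method name getHostPoolWithFallback, the CircularList.getNextElement call, and the exception message are illustrative assumptions:

    // Simplified sketch of "local rack first, then round robin over remote racks".
    private HostConnectionPool<CL> getHostPoolWithFallback(BaseOperation<CL, ?> op, String hashtag) {
        try {
            // 1. Prefer the local rack's selector
            HostConnectionPool<CL> localPool = localSelector.getPoolForOperation(op, hashtag);
            if (localPool != null && localPool.isActive() && localPool.getHost().isUp()) {
                return localPool;
            }
        } catch (NoAvailableHostsException e) {
            // local rack exhausted; fall through to the remote racks
        }

        // 2. Fallback: plain round robin over the remote racks
        int remaining = remoteRackNames.getEntireList().size();
        while (remaining-- > 0) {
            String rack = remoteRackNames.getNextElement();                    // assumed CircularList API
            HostSelectionStrategy<CL> remoteSelector = remoteRackSelectors.get(rack);
            if (remoteSelector == null) {
                continue;
            }
            HostConnectionPool<CL> remotePool = remoteSelector.getPoolForOperation(op, hashtag);
            if (remotePool != null && remotePool.isActive() && remotePool.getHost().isUp()) {
                // (the real implementation also updates cpMonitor failover counters here)
                return remotePool;
            }
        }
        throw new NoAvailableHostsException("No hosts available in local or remote racks");
    }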

4.2.2 Selection Strategies

HostSelectionStrategy is a Host selection strategy, which is implemented in RoundRobinSelection and TokenAwareSelection.

Load balancing is responsible for establishing connections to the entire cluster (not just on one node) and maintaining connection pools with each host in the cluster. Load balancing also determines whether the host is local or remote.

It has the logic to send certain requests to certain nodes. The load balancing policy determines which hosts to connect to and which hosts to send requests to.

In effect, a query plan is calculated for each request. The query plan determines which host to send requests to and in which order (depending on the speculative execution policy and retry policy).

4.2.2.1 RoundRobinSelection

As its name suggests, this policy provides round-robin load balancing over a ring data structure in a thread-safe manner.

  • Supports dynamic host addition and removal, that is, when the outside world detects a topology change, these interface methods are called to update the list;
  • Returning a connection pool simply means taking the next one from the list;
  • Provides different methods to return connection pools in various forms, such as:
    • getPoolForToken: returns the pool for a given token;
    • getOrderedHostPools: returns a sorted list of pools;
    • getNextConnectionPool: returns the next pool from the circularList;
    • getPoolsForTokens: returns pools for the given range;
public class RoundRobinSelection<CL> implements HostSelectionStrategy<CL> {

    // The total set of host pools. Once the host is selected, we ask its corresponding pool to vend a connection
    private final ConcurrentHashMap<Long, HostConnectionPool<CL>> tokenPools = new ConcurrentHashMap<Long, HostConnectionPool<CL>>();

    // the circular list of Hosts over which we load balance in a round robin fashion
    private final CircularList<HostToken> circularList = new CircularList<HostToken>(null);

    @Override
    public HostConnectionPool<CL> getPoolForOperation(BaseOperation<CL, ?> op, String hashtag)
            throws NoAvailableHostsException {

        int numTries = circularList.getSize();
        HostConnectionPool<CL> lastPool = null;

        while (numTries > 0) {
            lastPool = getNextConnectionPool();
            numTries--;
            if (lastPool.isActive() && lastPool.getHost().isUp()) {
                return lastPool;
            }
        }

        // If we reach here then we haven't found an active pool. Return the last inactive pool anyways,
        // and HostSelectionWithFallback can choose a fallback pool from another dc
        return lastPool;
    }

    @Override
    public void initWithHosts(Map<HostToken, HostConnectionPool<CL>> hPools) {

        for (HostToken token : hPools.keySet()) {
            tokenPools.put(token.getToken(), hPools.get(token));
        }
        circularList.swapWithList(hPools.keySet());
    }

    @Override
    public boolean addHostPool(HostToken host, HostConnectionPool<CL> hostPool) {

        HostConnectionPool<CL> prevPool = tokenPools.put(host.getToken(), hostPool);
        if (prevPool == null) {
            List<HostToken> newHostList = new ArrayList<HostToken>(circularList.getEntireList());
            newHostList.add(host);
            circularList.swapWithList(newHostList);
        }
        return prevPool == null;
    }
}

The RR policy works roughly as follows; it can be understood as sequentially selecting the next pool from the circularList:

+--------------------------+
|HostSelectionWithFallback |
+-------------+------------+
              |
       +------+---------------------+
       |                            |
       v                            v
+------+---------------+  +---------+----------+
| RoundRobinSelection  |  | TokenAwareSelection|
|                      |  +--------------------+
|     circularList     |
|          +           |
+----------|-----------+
           |
           v
   +--> Pool1, Pool2, Pool3, ... , Pooln +----+
   |                                          |
   +------------------------------------------+
4.2.2.2 TokenAwareSelection

TokenAwareSelection implements the TOKEN AWARE algorithm.

TOKEN_AWARE routes requests by the token of the primary key, i.e. requests for the same record are sent to the same node based on its token.

So this module needs to know the Dynomite ring topology so that it can map an operation's key to the correct token-owner node.

TokenAwareSelection

This strategy uses binary search to get the token for a key and then locates that token on the Dynomite topology ring.

It provides different methods to return connection pools in various forms, such as:

  • getPoolForToken: returns the pool for a given token;
  • getOrderedHostPools: returns a sorted list of pools;
  • getNextConnectionPool: returns the next pool from the circularList;
  • getPoolsForTokens: returns pools for the given range;
public class TokenAwareSelection<CL> implements HostSelectionStrategy<CL> {

    private final BinarySearchTokenMapper tokenMapper;

    private final ConcurrentHashMap<Long, HostConnectionPool<CL>> tokenPools = new ConcurrentHashMap<Long, HostConnectionPool<CL>>();

    public TokenAwareSelection(HashPartitioner hashPartitioner) {
        this.tokenMapper = new BinarySearchTokenMapper(hashPartitioner);
    }

    /**
     * Identifying the proper pool for the operation. A couple of things that may affect the decision:
     * (a) hashtags: in this case we will construct the key by decomposing from the hashtag
     * (b) type of key: string keys vs binary keys.
     * In binary keys hashtags do not really matter.
     */
    @Override
    public HostConnectionPool<CL> getPoolForOperation(BaseOperation<CL, ?> op, String hashtag)
            throws NoAvailableHostsException {

        String key = op.getStringKey();
        HostConnectionPool<CL> hostPool;
        HostToken hToken;

        if (key != null) {
            // If a hashtag is provided by Dynomite then we use that to create the key to hash.
            if (hashtag == null || hashtag.isEmpty()) {
                hToken = this.getTokenForKey(key);
            } else {
                String hashValue = StringUtils.substringBetween(key,
                        Character.toString(hashtag.charAt(0)), Character.toString(hashtag.charAt(1)));
                hToken = this.getTokenForKey(hashValue);
            }
            hostPool = tokenPools.get(hToken.getToken());
        } else {
            // the key is binary
            byte[] binaryKey = op.getBinaryKey();
            hToken = this.getTokenForKey(binaryKey);
            hostPool = tokenPools.get(hToken.getToken());
        }
        return hostPool;
    }

    @Override
    public boolean addHostPool(HostToken hostToken, HostConnectionPool<CL> hostPool) {

        HostConnectionPool<CL> prevPool = tokenPools.put(hostToken.getToken(), hostPool);
        if (prevPool == null) {
            tokenMapper.addHostToken(hostToken);
            return true;
        } else {
            return false;
        }
    }
}
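For example, with a hypothetical "{}" hashtag and made-up keys, the substringBetween logic above hashes only the text inside the braces, so related keys land on the same pool:

// Hypothetical keys; with hashtag "{}" only the text between '{' and '}' is hashed.
String hashtag = "{}";
String key1 = "{user:42}.profile";
String key2 = "{user:42}.sessions";

String hashValue1 = StringUtils.substringBetween(key1,
        Character.toString(hashtag.charAt(0)), Character.toString(hashtag.charAt(1)));   // "user:42"
String hashValue2 = StringUtils.substringBetween(key2,
        Character.toString(hashtag.charAt(0)), Character.toString(hashtag.charAt(1)));   // "user:42"
// Both keys yield the same hash value, so getTokenForKey(...) returns the same HostToken,
// and both operations are routed to the same host connection pool.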
BinarySearchTokenMapper

The code above uses the BinarySearchTokenMapper, so let’s take a look.

This class essentially maintains the mapping between key hashes and tokens, and uses binary search when looking them up.

public class BinarySearchTokenMapper implements HashPartitioner {

    private final HashPartitioner partitioner;

    private final AtomicReference<DynoBinarySearch<Long>> binarySearch = new AtomicReference<DynoBinarySearch<Long>>(null);

    private final ConcurrentHashMap<Long, HostToken> tokenMap = new ConcurrentHashMap<Long, HostToken>();

    @Override
    public HostToken getToken(Long keyHash) {
        Long token = binarySearch.get().getTokenOwner(keyHash);
        return tokenMap.get(token);
    }

    public void initSearchMechanism(Collection<HostToken> hostTokens) {
        for (HostToken hostToken : hostTokens) {
            tokenMap.put(hostToken.getToken(), hostToken);
        }
        initBinarySearch();
    }

    public void addHostToken(HostToken hostToken) {
        HostToken prevToken = tokenMap.putIfAbsent(hostToken.getToken(), hostToken);
        if (prevToken == null) {
            initBinarySearch();
        }
    }

    private void initBinarySearch() {
        List<Long> tokens = new ArrayList<Long>(tokenMap.keySet());
        Collections.sort(tokens);
        binarySearch.set(new DynoBinarySearch<Long>(tokens));
    }
}

The Token Aware strategy works as follows: maintain a token-to-pool map and select the pool based on the key's token:

+--------------------------+
|HostSelectionWithFallback |
+-------------+------------+
              |
       +------+-----------------------------+
       |                                    |
       v                                    v
+------+---------------+      +-------------+----------+
| RoundRobinSelection  |      |  TokenAwareSelection   |
|                      |      |                        |
|     circularList     |      |   ConcurrentHashMap    |
|          +           |      |           +            |
+----------|-----------+      +-----------|------------+
           |                              |
           v                              v
   +--> Pool1, Pool2, ... , Pooln   +-------------------+
   |                           |    | [token1 : Pool 1] |
   +---------------------------+    | [token2 : Pool 2] |
                                    | ......            |
                                    | [token3 : Pool 3] |
                                    +-------------------+

0x05 Compression

Finally, we look at the implementation of compression.

Enabling compression reduces the network bandwidth consumed by the driver at the cost of increased CPU usage on the client and server.

5.1 Compression Mode

The driver supports two compression strategies: NONE (no compression) and THRESHOLD (compress values that exceed a size threshold).

enum CompressionStrategy {
    /** Disables compression */
    NONE,

    /** Compresses values that exceed {@link #getValueCompressionThreshold()} */
    THRESHOLD
}

5.2 Compression Implementation

Looking at the code, compression is implemented with the JDK GZIP classes:

import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

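ZipUtils itself is not shown in this article; the sketch below shows what a compress-to-Base64 helper pair such as compressStringToBase64String / decompressFromBase64String plausibly does (an assumption for illustration, not Dyno's actual ZipUtils implementation):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

final class GzipBase64 {

    // GZIP the UTF-8 bytes of the value, then Base64-encode the compressed bytes.
    static String compressToBase64(String value) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream gzip = new GZIPOutputStream(bos)) {
            gzip.write(value.getBytes(StandardCharsets.UTF_8));
        }
        return Base64.getEncoder().encodeToString(bos.toByteArray());
    }

    // Reverse: Base64-decode, then gunzip back to the original string.
    static String decompressFromBase64(String value) throws IOException {
        byte[] compressed = Base64.getDecoder().decode(value);
        try (GZIPInputStream gzip = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
            return new String(gzip.readAllBytes(), StandardCharsets.UTF_8);
        }
    }
}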

Specific operations are as follows:

private abstract class CompressionValueOperation<T> extends BaseKeyOperation<T>
        implements CompressionOperation<Jedis, T> {

    @Override
    public String compressValue(String value, ConnectionContext ctx) {
        String result = value;
        int thresholdBytes = connPool.getConfiguration().getValueCompressionThreshold();

        try {
            // prefer speed over accuracy here so rather than using
            // getBytes() to get the actual size
            // just estimate using 2 bytes per character
            if ((2 * value.length()) > thresholdBytes) {
                result = ZipUtils.compressStringToBase64String(value);
                ctx.setMetadata("compression", true);
            }
        } catch (IOException e) {
            // compression failed: fall back to the uncompressed value (logging abbreviated in this excerpt)
        }
        return result;
    }

    @Override
    public String decompressValue(String value, ConnectionContext ctx) {
        try {
            if (ZipUtils.isCompressed(value)) {
                ctx.setMetadata("decompression", true);
                return ZipUtils.decompressFromBase64String(value);
            }
        } catch (IOException e) {
            // decompression failed: return the raw value (logging abbreviated in this excerpt)
        }
        return value;
    }
}

5.3 Usage

In an operation, when compression is needed, a CompressionValueOperation is created, for example:

public OperationResult<Map<String, String>> d_hgetAll(final String key) {
    if (CompressionStrategy.NONE == connPool.getConfiguration().getCompressionStrategy()) {
        return connPool.executeWithFailover(new BaseKeyOperation<Map<String, String>>(key, OpName.HGETALL) {
            @Override
            public Map<String, String> execute(Jedis client, ConnectionContext state) throws DynoException {
                return client.hgetAll(key);
            }
        });
    } else {
        return connPool
                .executeWithFailover(new CompressionValueOperation<Map<String, String>>(key, OpName.HGETALL) {
                    @Override
                    public Map<String, String> execute(final Jedis client, final ConnectionContext state) {
                        return CollectionUtils.transform(client.hgetAll(key),
                                new CollectionUtils.MapEntryTransform<String, String, String>() {
                                    @Override
                                    public String get(String key, String val) {
                                        return decompressValue(val, state);
                                    }
                                });
                    }
                });
    }
}

0x06 Summary

At this point, the preliminary analysis of DynoJedisClient is complete, and we have seen how DynoJedisClient deals with various kinds of complexity, such as:

  • How to maintain connections and provide connection pools for persistent connections;
  • How to maintain topology;
  • How to load balance;
  • How to failover;
  • How to retry and discover automatically, for example retrying a failed host and automatically discovering other hosts in the cluster;
  • How to monitor the underlying rack status;

We will next introduce dyno-queues, a distributed delay queue based on DynoJedisClient, to see how it is implemented.

0xEE Personal information

★★★★ Thoughts on life and technology ★★★★★

Wechat official account: Rosie’s Thoughts

If you want timely notifications of new articles, or would like to see my recommended technical reading, please follow the account.

0xFF References

Cassandra Series (2) : System flow

How does Cassandra JAVA client achieve high performance and high concurrency

The Token of Cassandra

www.ningoo.net/html/2010/c…

Cassandra Definitive Guide to Reading Notes – Client

Data consistency issues with Cassandra cluster