0x00 Abstract
In the previous article, we introduced the connection management and topology awareness parts of the Netflix Dynomite client DynoJedisClient. This article continues the analysis with automatic discovery and failover.
0x02 Requirements & Ideas
Let us first review the basic ideas and the overall picture.
Because DynoJedisClient needs to shield the upper layers from this complexity, it has to handle a variety of low-level concerns and have a deep understanding of the system, such as:
- How to maintain connections and provide connection pools for persistent connections;
- How to maintain topology;
- How to load balance;
- How to failover;
- How to retry and discover automatically, for example, automatically retrying failed hosts and automatically discovering other hosts in the cluster;
- How to monitor the underlying rack status;
Therefore, the idea behind DynoJedisClient is for the Java driver to expose multiple policy interfaces that can be used to tune driver behavior, including load balancing, request retries, node connection management, and more.
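As a rough illustration of how these policy pieces come together, a client is typically assembled through Dyno's builder. The following sketch is based on Dyno's public examples; the application and cluster names are placeholders, and the exact builder and configuration methods may vary between versions.

// Hypothetical names; the builder methods follow Dyno's documented usage.
HostSupplier hostSupplier = new HostSupplier() {
    @Override
    public List<Host> getHosts() {
        // Normally backed by Eureka or a static list; see section 3.3.3
        return Collections.singletonList(
                new HostBuilder().setHostname("dynomite-host-1").setRack("us-east-1a")
                        .setStatus(Host.Status.Up).createHost());
    }
};

DynoJedisClient dynoClient = new DynoJedisClient.Builder()
        .withApplicationName("myApp")              // hypothetical application name
        .withDynomiteClusterName("dynomite_demo")  // hypothetical cluster name
        .withHostSupplier(hostSupplier)
        .build();

Policies such as the retry policy factory or the compression strategy discussed below are carried by the ConnectionPoolConfiguration that the builder assembles internally.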
0x03 Automatic Discovery
Automatic discovery is done in the ConnectionPoolImpl start method, which starts a background thread that periodically refreshes the host status and applies the updates.
3.1 The refresh thread
The thread refresh logic is as follows:
- Call hostsUpdater to get the latest status;
- Call updateHosts to update the ConnectionPoolImpl internal member variable based on these states;
@Override
public Future<Boolean> start() throws DynoException {

    HostSupplier hostSupplier = cpConfiguration.getHostSupplier();
    HostStatusTracker hostStatus = hostsUpdater.refreshHosts();
    Collection<Host> hostsUp = hostStatus.getActiveHosts();

    final ExecutorService threadPool = Executors.newFixedThreadPool(Math.max(10, hostsUp.size()));
    final List<Future<Void>> futures = new ArrayList<Future<Void>>();

    // Initialize by adding the hosts that are currently up
    for (final Host host : hostsUp) {
        // Add the host connection pool, but don't init the load balancer yet
        futures.add(threadPool.submit(new Callable<Void>() {
            @Override
            public Void call() throws Exception {
                addHost(host, false);
                return null;
            }
        }));
    }

    boolean success = started.compareAndSet(false, true);
    if (success) {
        idling.set(false);
        idleThreadPool.shutdownNow();
        selectionStrategy = initSelectionStrategy();
        cpHealthTracker.start();

        // Start the timer thread
        connPoolThreadPool.scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                try {
                    // Call hostsUpdater to get the latest status
                    HostStatusTracker hostStatus = hostsUpdater.refreshHosts();
                    cpMonitor.setHostCount(hostStatus.getHostCount());
                    // Call updateHosts to update ConnectionPoolImpl's internal members based on these statuses
                    updateHosts(hostStatus.getActiveHosts(), hostStatus.getInactiveHosts());
                } catch (Throwable t) {
                    // ... (exception handling abbreviated)
                }
            }
        }, 15 * 1000, 30 * 1000, TimeUnit.MILLISECONDS);
    }

    return getEmptyFutureTask(true);
}
3.2 The update operation
In the code above, updateHosts performs the host update, that is, adding and removing hosts.
@Override
public Future<Boolean> updateHosts(Collection<Host> hostsUp, Collection<Host> hostsDown) {
    boolean condition = false;
    if (hostsUp != null && !hostsUp.isEmpty()) {
        for (Host hostUp : hostsUp) {
            condition |= addHost(hostUp);
        }
    }
    if (hostsDown != null && !hostsDown.isEmpty()) {
        for (Host hostDown : hostsDown) {
            condition |= removeHost(hostDown);
        }
    }
    return getEmptyFutureTask(condition);
}
The updateHosts method calls addHost, which, as you can see, touches selectionStrategy, cpMonitor, cpHealthTracker, and cpMap, because all of them need to react to host changes.
public boolean addHost(Host host, boolean refreshLoadBalancer) {

    HostConnectionPool<CL> connPool = cpMap.get(host);

    final HostConnectionPool<CL> hostPool = hostConnPoolFactory.createHostConnectionPool(host, this);
    HostConnectionPool<CL> prevPool = cpMap.putIfAbsent(host, hostPool);

    if (prevPool == null) {
        // This is the first time we are adding this pool.
        try {
            int primed = hostPool.primeConnections();
            if (hostPool.isActive()) {
                if (refreshLoadBalancer) {
                    selectionStrategy.addHost(host, hostPool);
                }
                cpHealthTracker.initializePingHealthchecksForPool(hostPool);
                cpMonitor.hostAdded(host, hostPool);
            } else {
                cpMap.remove(host);
            }
            return primed > 0;
        }
        // ... (exception handling abbreviated)
    }
}
The logic is as follows:
+--------------------------------------------------------------------------+
| |
| ConnectionPoolImpl |
| |
| Timer |
| +----------------------+ |
| refreshHosts | | |
| hostsUpdater +----------------> | connPoolThreadPool | |
| | | |
| +------------+---------+ |
| | |
| | |
| updateHosts | |
| | |
| +----------------+-----------------------------+ |
| | | | | |
| | | | | |
| v v v v |
| cpHealthTracker selectionStrategy cpMonitor cpMap |
| |
| |
| |
+--------------------------------------------------------------------------+
3.3 Discovery
Discovery is performed using HostsUpdater.
3.3.1 HostsUpdater
The main job of this class is to call the HostSupplier to refresh the host list:
- Depending on whether hostFromHostSupplier is up or down, different processing is applied; for example, for an up host, the attributes of hostFromHostSupplier (IP, hostname, status, port, datastore port, rack, datacenter, hashtag, password, ...) are used to populate upHostBuilder;
- Call HostStatusTracker to record the resulting up/down host sets.
public class HostsUpdater {

    private final HostSupplier hostSupplier;
    private final TokenMapSupplier tokenMapSupplier;
    private final AtomicReference<HostStatusTracker> hostTracker = new AtomicReference<HostStatusTracker>(null);

    public HostStatusTracker refreshHosts() {

        List<Host> allHostsFromHostSupplier = hostSupplier.getHosts();

        /**
         * HostTracker should return the hosts that we get from TokenMapSupplier.
         * Hence get the hosts from HostSupplier and map them to TokenMapSupplier
         * and return them.
         */
        Set<Host> hostSet = new HashSet<>(allHostsFromHostSupplier);

        // Create a list of host/tokens
        List<HostToken> hostTokens = tokenMapSupplier.getTokens(hostSet);

        // The key here really needs to be an object that is overlapping between
        // the host from HostSupplier and TokenMapSupplier. Since that is a
        // subset of the Host object itself, Host is the key as well as value here.
        Map<Host, Host> allHostSetFromTokenMapSupplier = new HashMap<>();
        for (HostToken ht : hostTokens) {
            allHostSetFromTokenMapSupplier.put(ht.getHost(), ht.getHost());
        }

        for (Host hostFromHostSupplier : allHostsFromHostSupplier) {
            if (hostFromHostSupplier.isUp()) {
                // Build the host in the Up state
                Host hostFromTokenMapSupplier = allHostSetFromTokenMapSupplier.get(hostFromHostSupplier);

                // Use the attributes of hostFromHostSupplier to populate upHostBuilder:
                // IP, hostname, status, port, datastore port, rack, datacenter, hashtag, password...
                HostBuilder upHostBuilder = new HostBuilder().setHostname(...) ...;

                hostsUpFromHostSupplier.add(upHostBuilder.createHost());
                allHostSetFromTokenMapSupplier.remove(hostFromTokenMapSupplier);
            } else {
                // Build the host in the Down state
                Host hostFromTokenMapSupplier = allHostSetFromTokenMapSupplier.get(hostFromHostSupplier);

                // Same attributes as above: IP, hostname, status, port, datastore port, rack, datacenter, hashtag, password...
                HostBuilder downHostBuilder = new HostBuilder().setHostname(...) ...;

                hostsDownFromHostSupplier.add(downHostBuilder.createHost());
                allHostSetFromTokenMapSupplier.remove(hostFromTokenMapSupplier);
            }
        }

        HostStatusTracker newTracker =
                hostTracker.get().computeNewHostStatus(hostsUpFromHostSupplier, hostsDownFromHostSupplier);
        hostTracker.set(newTracker);
        return hostTracker.get();
    }
}
3.3.2 HostStatusTracker
This class simply records the sets of active and inactive hosts; other modules read this information from it.
public class HostStatusTracker {
// the set of active and inactive hosts
private final Set<Host> activeHosts = new HashSet<Host>();
private final Set<Host> inactiveHosts = new HashSet<Host>();
}
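The refreshHosts code above calls computeNewHostStatus on the previous tracker. That method is not shown in this article; the following is only a simplified sketch of the idea (the real Dyno implementation also reconciles the new report against the tracker's previous state):

// Simplified sketch, not the actual Dyno code.
public HostStatusTracker computeNewHostStatus(Collection<Host> hostsUpNow, Collection<Host> hostsDownNow) {
    // Build a fresh snapshot from the latest HostSupplier report;
    // consumers read it via getActiveHosts() / getInactiveHosts() / getHostCount().
    HostStatusTracker next = new HostStatusTracker();
    next.activeHosts.addAll(hostsUpNow);
    next.inactiveHosts.addAll(hostsDownNow);
    return next;
}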
3.3.3 HostSupplier
HostSupplier performs the actual host lookup and has many implementations; Dynomite delegates this concrete work to these specialized classes.
Taking EurekaHostsSupplier as an example, it calls the EurekaClient (discoveryClient) to obtain host information.
public class EurekaHostsSupplier implements HostSupplier {

    private final EurekaClient discoveryClient;

    @Override
    public List<Host> getHosts() {
        return getUpdateFromEureka();
    }

    private List<Host> getUpdateFromEureka() {
        Application app = discoveryClient.getApplication(applicationName);
        List<Host> hosts = new ArrayList<Host>();
        List<InstanceInfo> ins = app.getInstances();

        hosts = Lists.newArrayList(Collections2.transform(ins, info -> {
            Host.Status status = info.getStatus() == InstanceStatus.UP ? Host.Status.Up : Host.Status.Down;

            String rack = null;
            try {
                if (info.getDataCenterInfo() instanceof AmazonInfo) {
                    AmazonInfo amazonInfo = (AmazonInfo) info.getDataCenterInfo();
                    rack = amazonInfo.get(MetaDataKey.availabilityZone);
                }
            } catch (Throwable t) {
                // ... (exception handling abbreviated)
            }

            Host host = new HostBuilder().setHostname(info.getHostName()).setIpAddress(info.getIPAddr())
                    .setRack(rack).setStatus(status).createHost();
            return host;
        }));

        return hosts;
    }
}
Therefore, the logic expands as follows:
+--------------------------------------------------------------------------------------+
| ConnectionPoolImpl |
| |
| |
| +-----------------------------------+ |
| | hostsUpdater | |
| | | |
| | | |
| | | Timer |
| | | +----------------------+ |
| | | refreshHosts | | |
| | HostStatusTracker -------------------------> | connPoolThreadPool | |
| | ^ | | | |
| | | | +------------+---------+ |
| | getUpdateFromEureka | | | |
| | | | | |
| | | | updateHosts | |
| | +----------------------+ | | |
| | | HostSupplier | | | +-----------------------------+ |
| | | | | | | | | |
| | | + | | | | | |
| | | EurekaClient | | v v v |
| | | | | selectionStrategy cpMonitor cpMap |
| | +----------------------+ | |
| +-----------------------------------+ |
+--------------------------------------------------------------------------------------+
0x04 Error Handling & Load Balancing
Since we are running a cluster rather than a single instance, the driver takes some precautions for failover.
In Dynomite, there are three main types of errors:
- Invalid requests: Errors go straight back to the application because the driver has no way of knowing how to handle such requests;
- Server error: The driver can try the next node according to the load balancing policy;
- Network timeout: if the request is marked idempotent, the driver can retry it. By default requests are not considered idempotent, so it is good practice to mark them whenever possible. For idempotent requests, if the first node does not respond within a certain delay, the driver can send the request to a second node; this is called "speculative execution" and is configured through a SpeculativeExecutionPolicy.
Depending on the severity of the error, there are two handling options: retry and fallback. We introduce them below in order of severity.
4.1 Retry Policy
When a node fails or becomes inaccessible, the driver automatically and transparently tries other nodes and arranges to reconnect to dead nodes in the background.
But because temporary changes in network conditions can also make nodes appear offline, the driver also provides a retry strategy to retry queries that fail due to network-related errors. This eliminates the need to write retry logic in the client code.
The Retry policy determines the default behavior to adopt when a request times out or a node becomes unavailable.
4.1.1 Policy Classification
The Java driver provides several RetryPolicy implementations:
- RetryNTimes: guarantees that an operation is retried at most N times; RetryNTimes(2) means at most 1 + 2 = 3 attempts are made before giving up;
- RunOnce: never retries; the exception is simply rethrown (a minimal sketch of this style of policy follows below).
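For illustration, here is a minimal sketch of what a RunOnce-style policy could look like. It only implements the RetryPolicy methods that appear in the executeWithFailover code below (begin, success, failure, allowRetry); the actual Dyno class may differ.

// Minimal sketch of a "never retry" policy; not the actual Dyno implementation.
public class RunOnce implements RetryPolicy {

    @Override
    public void begin() {
        // nothing to initialize
    }

    @Override
    public void success() {
        // the single attempt succeeded; nothing to record
    }

    @Override
    public void failure(Exception e) {
        // the single attempt failed; the exception is rethrown by the caller
    }

    @Override
    public boolean allowRetry() {
        // never allow a second attempt
        return false;
    }
}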
4.1.2 Using Policies
When a command is executed, we can see that the driver transparently tries other nodes and schedules reconnection of dead nodes in the background:
- Get the retry policy;
- Operate in a loop:
- Perform the operation;
- If it succeeds, call the retry policy's success method and break out of the loop;
- If it fails, call the retry policy's failure method;
- If retries are allowed, continue the loop;
The abbreviated code is as follows:
@Override
public <R> OperationResult<R> executeWithFailover(Operation<CL, R> op) throws DynoException {

    RetryPolicy retry = cpConfiguration.getRetryPolicyFactory().getRetryPolicy();
    retry.begin();

    DynoException lastException = null;

    do {
        Connection<CL> connection = null;
        try {
            connection = selectionStrategy.getConnectionUsingRetryPolicy(op,
                    cpConfiguration.getMaxTimeoutWhenExhausted(), TimeUnit.MILLISECONDS, retry);

            OperationResult<R> result = connection.execute(op); // Perform the operation

            retry.success(); // On success, notify the retry policy and return
            return result;
        } catch (DynoException e) {
            retry.failure(e); // On failure, notify the retry policy
            lastException = e;

            if (connection != null) {
                cpMonitor.incOperationFailure(connection.getHost(), e);

                if (retry.allowRetry()) { // The retry policy decides whether this counts as a failover
                    cpMonitor.incFailover(connection.getHost(), e);
                }
            }
            // ... (releasing the connection in a finally block is abbreviated here)
        }
    } while (retry.allowRetry()); // If retries are allowed, the loop continues

    throw lastException; // All retries exhausted
}
Let’s take RetryNTimes as an example.
4.1.3 RetryNTimes
Its success and failure methods set internal state that determines whether a retry is allowed.
public class RetryNTimes implements RetryPolicy {

    private int n;
    private final boolean allowCrossZoneFallback;
    private final AtomicReference<RetryState> state = new AtomicReference<>(new RetryState(0, false));

    public RetryNTimes(int n, boolean allowFallback) {
        this.n = n;
        this.allowCrossZoneFallback = allowFallback;
    }

    @Override
    public void success() {
        boolean success = false;
        RetryState rs;
        while (!success) {
            rs = state.get();
            // Record one more attempt, marked as successful
            success = state.compareAndSet(rs, new RetryState(rs.count + 1, true));
        }
    }

    @Override
    public void failure(Exception e) {
        boolean success = false;
        RetryState rs;
        while (!success) {
            rs = state.get();
            // Record one more attempt, marked as failed
            success = state.compareAndSet(rs, new RetryState(rs.count + 1, false));
        }
    }

    @Override
    public boolean allowRetry() {
        final RetryState rs = state.get();
        return !rs.success && rs.count <= n;
    }

    private class RetryState {
        private final int count;
        private final boolean success;

        public RetryState(final int count, final boolean success) {
            this.count = count;
            this.success = success;
        }
    }
}
4.2 Policy Selection
Since retries sometimes don’t solve problems, let’s talk about fallback selection strategies for more serious problems.
The driver can query any node in the cluster; that node is then called the coordinator node for the query. Depending on the query, the coordinator may communicate with other nodes to satisfy it. If a client routes all of its queries to the same node, this creates an unbalanced load on the cluster, especially if other clients do the same.
To prevent a single node from acting as coordinator for too many requests, the DynoJedisClient driver provides a pluggable mechanism for balancing the query load across multiple nodes: load balancing is achieved by choosing an implementation of the HostSelectionStrategy interface.
Each HostSelectionStrategy classifies each node in the cluster as local, remote, or ignored. The driver prefers interacting with local nodes and maintains more connections to local nodes than to remote nodes.
The HostSelectionStrategy is set on the cluster at build time. The driver provides two basic load balancing implementations: RoundRobinSelection and TokenAwareSelection.
- RoundRobinSelection: distributes requests across the nodes of the cluster in a repeating pattern, spreading the processing load evenly over all nodes.
- TokenAwareSelection: token-aware; it uses the token value to pick the node that holds the replica of the requested data, minimizing the number of nodes that must be queried.
4.2.1 The coordinator
HostSelectionWithFallback is the class that chooses the coordinator.
- It coordinates among multiple HostSelectionStrategy instances and maps requests to a specific rack;
- It is not responsible for the concrete selection algorithm (Round Robin or Token Aware); it obtains connections from the concrete strategy implementations;
- It relies on two kinds of strategies:
  - the local HostSelectionStrategy, which is used preferentially;
  - the remote HostSelectionStrategy, to which HostSelectionWithFallback falls back when the local connection pool or the local hosts fail;
- When the local rack fails, HostSelectionWithFallback uses pure round robin over the remote HostSelectionStrategy instances in order to distribute the load evenly;
- HostSelectionWithFallback does not prefer any particular remote strategy;
The main member variables of HostSelectionWithFallback are as follows:
- Local Host information, such as localDataCenter, localRack, and local selection policy localSelector;
- Remote Host information, such as remoteRackNames of remote zones and remoteRackSelectors.
- Information related to tokens, such as hostTokens, tokenSupplier, topology information topology;
- Configuration information cpConfig;
- Monitoring information;
The concrete class is defined as follows:
public class HostSelectionWithFallback<CL> {
// Only used in calculating replication factor
private final String localDataCenter;
// tracks the local zone
private final String localRack;
// The selector for the local zone
private final HostSelectionStrategy<CL> localSelector;
// Track selectors for each remote zone
private final ConcurrentHashMap<String, HostSelectionStrategy<CL>> remoteRackSelectors = new ConcurrentHashMap<>();
private final ConcurrentHashMap<Host, HostToken> hostTokens = new ConcurrentHashMap<>();
private final TokenMapSupplier tokenSupplier;
private final ConnectionPoolConfiguration cpConfig;
private final ConnectionPoolMonitor cpMonitor;
private final AtomicInteger replicationFactor = new AtomicInteger(-1);
// Represents the *initial* topology from the token supplier. This does not affect selection of a host connection
// pool for traffic. It only affects metrics such as failover/fallback
private final AtomicReference<TokenPoolTopology> topology = new AtomicReference<>(null);
// list of names of remote zones. Used for RoundRobin over remote zones when local zone host is down
private final CircularList<String> remoteRackNames = new CircularList<>(new ArrayList<>());
private final HostSelectionStrategyFactory<CL> selectorFactory;
}
4.2.2 Strategies
HostSelectionStrategy is the host selection strategy; its implementations are RoundRobinSelection and TokenAwareSelection.
Load balancing is responsible for establishing connections to the entire cluster (not just on one node) and maintaining connection pools with each host in the cluster. Load balancing also determines whether the host is local or remote.
It has the logic to send certain requests to certain nodes. The load balancing policy determines which hosts to connect to and which hosts to send requests to.
In effect, a query plan is calculated for each request. The query plan determines which host to send requests to and in which order (depending on the speculative execution policy and retry policy).
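To make the two implementations below easier to follow, here is a rough sketch of the HostSelectionStrategy interface as it can be inferred from the methods used in this article; the real interface in Dyno contains more methods than shown here.

// Sketch inferred from the methods used in this article; not the complete Dyno interface.
public interface HostSelectionStrategy<CL> {

    // Pick the connection pool (and therefore the host) for a given operation
    HostConnectionPool<CL> getPoolForOperation(BaseOperation<?, ?> op, String hashtag) throws NoAvailableHostsException;

    // Seed the strategy with the initial token -> connection pool mapping
    void initWithHosts(Map<HostToken, HostConnectionPool<CL>> hPools);

    // React to a newly discovered host
    boolean addHostPool(HostToken hostToken, HostConnectionPool<CL> hostPool);
}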
4.2.2.1 RoundRobinSelection
This policy provides round-robin (RR) load balancing over the ring data structure in a thread-safe manner.
- Supports dynamic host addition and removal, that is, when the outside world detects a topology change, this interface is called to update the strategy.
- Returning a connection simply means taking the next element from the list.
- Provides different methods that return connections in various ways, such as:
- getPoolForToken: returns the pool for a given token;
- getOrderedHostPools: returns a sorted list of pools;
- getNextConnectionPool: returns the next pool from the circularList;
- getPoolsForTokens: returns pools for a given token range;
public class RoundRobinSelection<CL> implements HostSelectionStrategy<CL> {

    // The total set of host pools. Once the host is selected, we ask its corresponding pool to vend a connection
    private final ConcurrentHashMap<Long, HostConnectionPool<CL>> tokenPools = new ConcurrentHashMap<Long, HostConnectionPool<CL>>();

    // the circular list of Hosts over which we load balance in a round robin fashion
    private final CircularList<HostToken> circularList = new CircularList<HostToken>(null);

    @Override
    public HostConnectionPool<CL> getPoolForOperation(BaseOperation<?, ?> op, String hashtag)
            throws NoAvailableHostsException {

        int numTries = circularList.getSize();
        HostConnectionPool<CL> lastPool = null;

        while (numTries > 0) {
            lastPool = getNextConnectionPool();
            numTries--;
            if (lastPool.isActive() && lastPool.getHost().isUp()) {
                return lastPool;
            }
        }

        // If we reach here then we haven't found an active pool. Return the last inactive pool anyways,
        // and HostSelectionWithFallback can choose a fallback pool from another dc
        return lastPool;
    }

    @Override
    public void initWithHosts(Map<HostToken, HostConnectionPool<CL>> hPools) {
        for (HostToken token : hPools.keySet()) {
            tokenPools.put(token.getToken(), hPools.get(token));
        }
        circularList.swapWithList(hPools.keySet());
    }

    @Override
    public boolean addHostPool(HostToken host, HostConnectionPool<CL> hostPool) {
        HostConnectionPool<CL> prevPool = tokenPools.put(host.getToken(), hostPool);
        if (prevPool == null) {
            List<HostToken> newHostList = new ArrayList<HostToken>(circularList.getEntireList());
            newHostList.add(host);
            circularList.swapWithList(newHostList);
        }
        return prevPool == null;
    }
}
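getNextConnectionPool is not shown in the excerpt above. Presumably it just advances the circular list and looks up the corresponding pool, roughly like this (a sketch, not the exact Dyno code):

// Sketch: advance the round-robin pointer and resolve the pool for that host's token.
private HostConnectionPool<CL> getNextConnectionPool() throws NoAvailableHostsException {
    HostToken hostToken = circularList.getNextElement();
    return tokenPools.get(hostToken.getToken());
}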
The RR policy works roughly as follows; it can be understood as sequentially picking the next pool from the circularList:
+---------------------------+
| HostSelectionWithFallback |
+-------------+-------------+
              |
      +-------+---------------------+
      |                             |
      v                             v
+-----+----------------+  +---------+-----------+
| RoundRobinSelection  |  | TokenAwareSelection |
|                      |  +---------------------+
|     circularList     |
|          +           |
+----------|-----------+
           |
           v
  +--> Pool1, Pool2, Pool3, ... , Pooln +---+
  |                                         |
  +-----------------------------------------+
4.2.2.2 TokenAwareSelection
TokenAwareSelection implements the TOKEN_AWARE algorithm.
TOKEN_AWARE routes requests according to the token of the primary key, that is, requests for the same record are sent to the same node based on that token. For example, every request for the key "user42" goes to the node that owns the token computed from "user42".
The module therefore needs to know the Dynomite ring topology, so that it can map the key of an operation to the node that owns the corresponding token.
TokenAwareSelection
This strategy computes the token from the key and then uses binary search to locate the token's owner on the Dynomite topology ring.
It provides different methods that return connections in various ways, such as:
- getPoolForToken: returns the pool for a given token;
- getOrderedHostPools: returns a sorted list of pools;
- getNextConnectionPool: returns the next pool from a circularList;
- getPoolsForTokens: returns pools for a given token range;
public class TokenAwareSelection<CL> implements HostSelectionStrategy<CL> {

    private final BinarySearchTokenMapper tokenMapper;

    private final ConcurrentHashMap<Long, HostConnectionPool<CL>> tokenPools = new ConcurrentHashMap<Long, HostConnectionPool<CL>>();

    public TokenAwareSelection(HashPartitioner hashPartitioner) {
        this.tokenMapper = new BinarySearchTokenMapper(hashPartitioner);
    }

    /**
     * Identifying the proper pool for the operation. A couple of things that may affect the decision
     * (a) hashtags: in this case we will construct the key by decomposing from the hashtag
     * (b) type of key: string keys vs binary keys.
     * In binary keys hashtags do not really matter.
     */
    @Override
    public HostConnectionPool<CL> getPoolForOperation(BaseOperation<?, ?> op, String hashtag)
            throws NoAvailableHostsException {

        String key = op.getStringKey();
        HostConnectionPool<CL> hostPool;
        HostToken hToken;

        if (key != null) {
            // If a hashtag is provided by Dynomite then we use that to create the key to hash.
            if (hashtag == null || hashtag.isEmpty()) {
                hToken = this.getTokenForKey(key);
            } else {
                String hashValue = StringUtils.substringBetween(key,
                        Character.toString(hashtag.charAt(0)), Character.toString(hashtag.charAt(1)));
                hToken = this.getTokenForKey(hashValue);
            }
            hostPool = tokenPools.get(hToken.getToken());
        } else {
            // the key is binary
            byte[] binaryKey = op.getBinaryKey();
            hToken = this.getTokenForKey(binaryKey);
            hostPool = tokenPools.get(hToken.getToken());
        }

        return hostPool;
    }

    @Override
    public boolean addHostPool(HostToken hostToken, HostConnectionPool<CL> hostPool) {
        HostConnectionPool<CL> prevPool = tokenPools.put(hostToken.getToken(), hostPool);
        if (prevPool == null) {
            tokenMapper.addHostToken(hostToken);
            return true;
        } else {
            return false;
        }
    }
}
BinarySearchTokenMapper
The code above uses the BinarySearchTokenMapper, so let’s take a look.
In fact, this class maintains the mapping between key hashes and tokens, and uses binary search for lookups.
public class BinarySearchTokenMapper implements HashPartitioner {

    private final HashPartitioner partitioner;

    private final AtomicReference<DynoBinarySearch<Long>> binarySearch = new AtomicReference<DynoBinarySearch<Long>>(null);

    private final ConcurrentHashMap<Long, HostToken> tokenMap = new ConcurrentHashMap<Long, HostToken>();

    @Override
    public HostToken getToken(Long keyHash) {
        Long token = binarySearch.get().getTokenOwner(keyHash);
        return tokenMap.get(token);
    }

    public void initSearchMechanism(Collection<HostToken> hostTokens) {
        for (HostToken hostToken : hostTokens) {
            tokenMap.put(hostToken.getToken(), hostToken);
        }
        initBinarySearch();
    }

    public void addHostToken(HostToken hostToken) {
        HostToken prevToken = tokenMap.putIfAbsent(hostToken.getToken(), hostToken);
        if (prevToken == null) {
            initBinarySearch();
        }
    }

    private void initBinarySearch() {
        List<Long> tokens = new ArrayList<Long>(tokenMap.keySet());
        Collections.sort(tokens);
        binarySearch.set(new DynoBinarySearch<Long>(tokens));
    }
}
The Token Aware strategy works as follows: build a token-to-pool map and select the pool according to the key's token:
+---------------------------+
| HostSelectionWithFallback |
+-------------+-------------+
              |
      +-------+----------------------------+
      |                                    |
      v                                    v
+-----+----------------+  +----------------+-------+
| RoundRobinSelection  |  | TokenAwareSelection    |
|                      |  |                        |
|     circularList     |  |   ConcurrentHashMap    |
|          +           |  |          +             |
+----------|-----------+  +----------|-------------+
           |                         |
           v                         v
  +--> Pool1, Pool2, ..., Pooln +--+   +--------------------+
  |                                |   | [token1 : Pool 1]  |
  +--------------------------------+   | [token2 : Pool 2]  |
                                       | ......             |
                                       | [token3 : Pool 3]  |
                                       +--------------------+
0x05 Compression
Finally, we look at the implementation of compression.
Enabling compression reduces the network bandwidth consumed by the driver at the cost of increased CPU usage on the client and server.
5.1 Compression Mode
The driver supports two compression modes: no compression, and compression only of values that exceed a threshold.
enum CompressionStrategy {
/** * Disables compression */
NONE,
/**
* Compresses values that exceed {@link #getValueCompressionThreshold()}
*/
THRESHOLD
}
5.2 Compression Implementation
From the code, the implementation uses Java's built-in GZIP support:
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
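The CompressionValueOperation below delegates to a ZipUtils helper whose implementation is not shown in this article. A plausible sketch based on the GZIP classes above plus java.util.Base64 (an assumption, not the actual Dyno code) might look like this:

// Hypothetical helper sketch; Dyno's real ZipUtils may differ.
public class ZipUtils {

    public static String compressStringToBase64String(String value) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (GZIPOutputStream gzip = new GZIPOutputStream(baos)) {
            gzip.write(value.getBytes(StandardCharsets.UTF_8));
        }
        // Base64 so the compressed bytes can be stored as a Redis string value
        return Base64.getEncoder().encodeToString(baos.toByteArray());
    }

    public static String decompressFromBase64String(String value) throws IOException {
        byte[] compressed = Base64.getDecoder().decode(value);
        try (GZIPInputStream gzip = new GZIPInputStream(new ByteArrayInputStream(compressed));
             ByteArrayOutputStream out = new ByteArrayOutputStream()) {
            byte[] buffer = new byte[4096];
            int len;
            while ((len = gzip.read(buffer)) > 0) {
                out.write(buffer, 0, len);
            }
            return new String(out.toByteArray(), StandardCharsets.UTF_8);
        }
    }

    public static boolean isCompressed(String value) {
        try {
            byte[] bytes = Base64.getDecoder().decode(value);
            // GZIP streams start with the magic bytes 0x1f 0x8b
            return bytes.length > 2 && bytes[0] == (byte) 0x1f && bytes[1] == (byte) 0x8b;
        } catch (IllegalArgumentException notBase64) {
            return false;
        }
    }
}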
Specific operations are as follows:
private abstract class CompressionValueOperation<T> extends BaseKeyOperation<T>
        implements CompressionOperation<Jedis, T> {

    @Override
    public String compressValue(String value, ConnectionContext ctx) {
        String result = value;
        int thresholdBytes = connPool.getConfiguration().getValueCompressionThreshold();

        try {
            // prefer speed over accuracy here so rather than using
            // getBytes() to get the actual size
            // just estimate using 2 bytes per character
            if ((2 * value.length()) > thresholdBytes) {
                result = ZipUtils.compressStringToBase64String(value);
                ctx.setMetadata("compression", true);
            }
        } catch (IOException e) {
            // ... (exception handling abbreviated)
        }
        return result;
    }

    @Override
    public String decompressValue(String value, ConnectionContext ctx) {
        try {
            if (ZipUtils.isCompressed(value)) {
                ctx.setMetadata("decompression", true);
                return ZipUtils.decompressFromBase64String(value);
            }
        } catch (IOException e) {
            // ... (exception handling abbreviated)
        }
        return value;
    }
}
5.3 Usage
In an operation, when compression is needed, a CompressionValueOperation is generated, for example:
public OperationResult<Map<String, String>> d_hgetAll(final String key) {
    if (CompressionStrategy.NONE == connPool.getConfiguration().getCompressionStrategy()) {
        return connPool.executeWithFailover(new BaseKeyOperation<Map<String, String>>(key, OpName.HGETALL) {
            @Override
            public Map<String, String> execute(Jedis client, ConnectionContext state) throws DynoException {
                return client.hgetAll(key);
            }
        });
    } else {
        return connPool
                .executeWithFailover(new CompressionValueOperation<Map<String, String>>(key, OpName.HGETALL) {
                    @Override
                    public Map<String, String> execute(final Jedis client, final ConnectionContext state) {
                        return CollectionUtils.transform(client.hgetAll(key),
                                new CollectionUtils.MapEntryTransform<String, String, String>() {
                                    @Override
                                    public String get(String key, String val) {
                                        return decompressValue(val, state);
                                    }
                                });
                    }
                });
    }
}
0x06 Summary
At this point, the preliminary analysis of DynoJedisClient is completed, and we have seen how DynoJedisClient deals with various complex information, such as:
- How to maintain connections and provide connection pools for persistent connections;
- How to maintain topology;
- How to load balance;
- How to failover;
- How to retry and discover automatically, for example, automatically retrying failed hosts and automatically discovering other hosts in the cluster;
- How to monitor the underlying rack status;
We will next introduce dyno-queues, a distributed delay queue based on DynoJedisClient, to see how it is implemented.
0xEE Personal information
★★★★ Thoughts on life and technology ★★★★★
Wechat official account: Rosie’s Thoughts
If you want timely updates when new articles are published, or want to see the technical material I recommend, please follow the account.