Preface
This article looks at Elasticsearch's NodesFaultDetection.
NodesFaultDetection
elasticsearch-0.90.0/src/main/java/org/elasticsearch/discovery/zen/fd/NodesFaultDetection.java
public class NodesFaultDetection extends AbstractComponent {
public static interface Listener {
void onNodeFailure(DiscoveryNode node, String reason);
}
private final ThreadPool threadPool;
private final TransportService transportService;
private final boolean connectOnNetworkDisconnect;
private final TimeValue pingInterval;
private final TimeValue pingRetryTimeout;
private final int pingRetryCount;
// used mainly for testing, should always be true
private final boolean registerConnectionListener;
private final CopyOnWriteArrayList<Listener> listeners = new CopyOnWriteArrayList<Listener>();
private final ConcurrentMap<DiscoveryNode, NodeFD> nodesFD = newConcurrentMap();
private final FDConnectionListener connectionListener;
private volatile DiscoveryNodes latestNodes = EMPTY_NODES;
private volatile boolean running = false;
public NodesFaultDetection(Settings settings, ThreadPool threadPool, TransportService transportService) {
super(settings);
this.threadPool = threadPool;
this.transportService = transportService;
this.connectOnNetworkDisconnect = componentSettings.getAsBoolean("connect_on_network_disconnect", true);
this.pingInterval = componentSettings.getAsTime("ping_interval", timeValueSeconds(1));
this.pingRetryTimeout = componentSettings.getAsTime("ping_timeout", timeValueSeconds(30));
this.pingRetryCount = componentSettings.getAsInt("ping_retries", 3);
this.registerConnectionListener = componentSettings.getAsBoolean("register_connection_listener", true);
logger.debug("[node ] uses ping_interval [{}], ping_timeout [{}], ping_retries [{}]", pingInterval, pingRetryTimeout, pingRetryCount);
transportService.registerHandler(PingRequestHandler.ACTION, new PingRequestHandler());
this.connectionListener = new FDConnectionListener();
if (registerConnectionListener) {
transportService.addConnectionListener(connectionListener);
}
}
public NodesFaultDetection start() {
if (running) {
return this;
}
running = true;
return this;
}
public NodesFaultDetection stop() {
if (!running) {
return this;
}
running = false;
return this;
}
public void close() {
stop();
transportService.removeHandler(PingRequestHandler.ACTION);
transportService.removeConnectionListener(connectionListener);
}
//......
}
- NodesFaultDetection extends AbstractComponent and defines a CopyOnWriteArrayList of listeners, a ConcurrentMap named nodesFD, a connectionListener, latestNodes, running, and other fields
- Its constructor reads the connect_on_network_disconnect (default true), ping_interval (default 1s), ping_timeout (default 30s), ping_retries (default 3), and register_connection_listener (default true) settings. It then registers a PingRequestHandler with transportService under PingRequestHandler.ACTION and, when register_connection_listener is true, adds an FDConnectionListener
- The start method sets running to true and the stop method sets it to false; the close method first calls stop, then removes the PingRequestHandler.ACTION handler and the connectionListener from transportService
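The five settings and their defaults can be summarized in a small standalone sketch. The FdSettings class, the plain Map, and the simplified time parser are hypothetical and not part of Elasticsearch; the discovery.zen.fd. key prefix is an assumption based on componentSettings deriving its prefix from the component's package.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical holder for the five fault-detection settings; not an Elasticsearch class.
final class FdSettings {
    // Assumed key prefix, based on the package org.elasticsearch.discovery.zen.fd.
    static final String PREFIX = "discovery.zen.fd.";

    final boolean connectOnNetworkDisconnect;
    final long pingIntervalMillis;
    final long pingTimeoutMillis;
    final int pingRetries;
    final boolean registerConnectionListener;

    FdSettings(Map<String, String> raw) {
        // Defaults mirror the values read in the NodesFaultDetection constructor.
        connectOnNetworkDisconnect = Boolean.parseBoolean(get(raw, "connect_on_network_disconnect", "true"));
        pingIntervalMillis = parseMillis(get(raw, "ping_interval", "1s"));
        pingTimeoutMillis = parseMillis(get(raw, "ping_timeout", "30s"));
        pingRetries = Integer.parseInt(get(raw, "ping_retries", "3"));
        registerConnectionListener = Boolean.parseBoolean(get(raw, "register_connection_listener", "true"));
    }

    private static String get(Map<String, String> raw, String key, String defaultValue) {
        String value = raw.get(PREFIX + key);
        return value != null ? value : defaultValue;
    }

    // Simplified time parser for this sketch: accepts only "<n>ms" and "<n>s".
    static long parseMillis(String value) {
        if (value.endsWith("ms")) {
            return Long.parseLong(value.substring(0, value.length() - 2));
        }
        if (value.endsWith("s")) {
            return Long.parseLong(value.substring(0, value.length() - 1)) * 1000L;
        }
        throw new IllegalArgumentException("unsupported time value: " + value);
    }

    public static void main(String[] args) {
        Map<String, String> raw = new HashMap<>();
        raw.put(PREFIX + "ping_timeout", "10s"); // override one setting, keep the other defaults
        FdSettings s = new FdSettings(raw);
        System.out.println(s.pingIntervalMillis + " " + s.pingTimeoutMillis + " " + s.pingRetries);
    }
}
```

Only ping_timeout is overridden in main; every other field falls back to the default taken from the constructor.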
PingRequestHandler
elasticsearch-0.90.0/src/main/java/org/elasticsearch/discovery/zen/fd/NodesFaultDetection.java
class PingRequestHandler extends BaseTransportRequestHandler<PingRequest> {
public static final String ACTION = "discovery/zen/fd/ping";
@Override
public PingRequest newInstance() {
return new PingRequest();
}
@Override
public void messageReceived(PingRequest request, TransportChannel channel) throws Exception {
// if we are not the node we are supposed to be pinged, send an exception
// this can happen when a kill -9 is sent, and another node is started using the same port
if (!latestNodes.localNodeId().equals(request.nodeId)) {
throw new ElasticSearchIllegalStateException("Got pinged as node [" + request.nodeId + "], but I am node [" + latestNodes.localNodeId() + "]");
}
channel.sendResponse(new PingResponse());
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
}
static class PingRequest extends TransportRequest {
// the (assumed) node id we are pinging
private String nodeId;
PingRequest() {
}
PingRequest(String nodeId) {
this.nodeId = nodeId;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
nodeId = in.readString();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(nodeId);
}
}
private static class PingResponse extends TransportResponse {
private PingResponse() {
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
}
}
- PingRequestHandler's newInstance method creates a PingRequest, which carries a nodeId field identifying the node the sender believes it is pinging. The messageReceived method answers a PingRequest: it checks whether the requested nodeId equals the local node's id, returns a PingResponse if so, and throws ElasticSearchIllegalStateException otherwise
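The identity check can be illustrated in isolation. This is a minimal sketch, not Elasticsearch code: PingResponder and the "pong" return value are hypothetical stand-ins for PingRequestHandler and PingResponse, and IllegalStateException stands in for ElasticSearchIllegalStateException.

```java
// Hypothetical stand-in for PingRequestHandler's identity check; not Elasticsearch code.
final class PingResponder {
    private final String localNodeId;

    PingResponder(String localNodeId) {
        this.localNodeId = localNodeId;
    }

    // Returns normally only when the ping was addressed to this node's current id.
    String handlePing(String requestedNodeId) {
        if (!localNodeId.equals(requestedNodeId)) {
            throw new IllegalStateException(
                    "Got pinged as node [" + requestedNodeId + "], but I am node [" + localNodeId + "]");
        }
        return "pong";
    }

    public static void main(String[] args) {
        // A process restarted on the same port gets a new id, so pings for the old id are rejected.
        PingResponder restarted = new PingResponder("new-id");
        System.out.println(restarted.handlePing("new-id"));
        try {
            restarted.handlePing("old-id");
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Rejecting the ping instead of answering it lets the pinging side count the attempt as a failure, so a node that was killed and replaced on the same address is not mistaken for the original one.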
FDConnectionListener
elasticsearch-0.90.0/src/main/java/org/elasticsearch/discovery/zen/fd/NodesFaultDetection.java
private class FDConnectionListener implements TransportConnectionListener {
@Override
public void onNodeConnected(DiscoveryNode node) {
}
@Override
public void onNodeDisconnected(DiscoveryNode node) {
handleTransportDisconnect(node);
}
}
private void handleTransportDisconnect(DiscoveryNode node) {
if (!latestNodes.nodeExists(node.id())) {
return;
}
NodeFD nodeFD = nodesFD.remove(node);
if (nodeFD == null) {
return;
}
if (!running) {
return;
}
nodeFD.running = false;
if (connectOnNetworkDisconnect) {
try {
transportService.connectToNode(node);
nodesFD.put(node, new NodeFD());
threadPool.schedule(pingInterval, ThreadPool.Names.SAME, new SendPingRequest(node));
} catch (Exception e) {
logger.trace("[node ] [{}] transport disconnected (with verified connect)", node);
notifyNodeFailure(node, "transport disconnected (with verified connect)");
}
} else {
logger.trace("[node ] [{}] transport disconnected", node);
notifyNodeFailure(node, "transport disconnected");
}
}
private void notifyNodeFailure(final DiscoveryNode node, final String reason) {
threadPool.generic().execute(new Runnable() {
@Override
public void run() {
for (Listener listener : listeners) {
listener.onNodeFailure(node, reason);
}
}
});
}
- FDConnectionListener calls handleTransportDisconnect from onNodeDisconnected. That method returns early if the node is not in latestNodes; otherwise it removes the node from nodesFD and, if an entry was present and fault detection is running, sets that NodeFD's running flag to false
- If connectOnNetworkDisconnect is true, it tries to reconnect to the node; on success it puts a new NodeFD into nodesFD and schedules a SendPingRequest task for the node to run after pingInterval. If the connect attempt throws, or if connectOnNetworkDisconnect is false, it calls notifyNodeFailure
- notifyNodeFailure invokes the onNodeFailure callback of every registered NodesFaultDetection.Listener on the generic thread pool; here that is ZenDiscovery's NodeFailureListener
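The decision taken on a disconnect can be modeled without the transport layer. The sketch below is an assumption-laden simplification: DisconnectFlow is hypothetical, nodes are plain strings, the reconnect attempt is a Predicate that returns true on success (the real code relies on connectToNode throwing on failure), and the running flag and ping scheduling are left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;

// Hypothetical model of the disconnect decision; names are illustrative only.
final class DisconnectFlow {
    final Set<String> monitored = ConcurrentHashMap.newKeySet();
    final List<String> failures = new ArrayList<>();
    private final boolean connectOnNetworkDisconnect;
    private final Predicate<String> reconnect; // returns true when the reconnect succeeds

    DisconnectFlow(boolean connectOnNetworkDisconnect, Predicate<String> reconnect) {
        this.connectOnNetworkDisconnect = connectOnNetworkDisconnect;
        this.reconnect = reconnect;
    }

    void onDisconnected(String node) {
        // Nodes that are not (or no longer) monitored are ignored.
        if (!monitored.remove(node)) {
            return;
        }
        if (connectOnNetworkDisconnect && reconnect.test(node)) {
            // Reconnect verified: resume monitoring (the real code also schedules a ping here).
            monitored.add(node);
            return;
        }
        // Either reconnecting is disabled or the attempt failed: report the node as failed.
        failures.add(node + ": transport disconnected"
                + (connectOnNetworkDisconnect ? " (with verified connect)" : ""));
    }

    public static void main(String[] args) {
        DisconnectFlow flow = new DisconnectFlow(true, node -> false); // reconnect always fails
        flow.monitored.add("node-1");
        flow.onDisconnected("node-1");
        System.out.println(flow.failures);
    }
}
```

The reason strings mirror the two messages passed to notifyNodeFailure in the code above.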
ZenDiscovery
elasticsearch-0.90.0/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java
public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implements Discovery, DiscoveryNodesProvider {
//......
@Inject
public ZenDiscovery(Settings settings, ClusterName clusterName, ThreadPool threadPool,
TransportService transportService, ClusterService clusterService, NodeSettingsService nodeSettingsService,
DiscoveryNodeService discoveryNodeService, ZenPingService pingService) {
super(settings);
this.clusterName = clusterName;
this.threadPool = threadPool;
this.clusterService = clusterService;
this.transportService = transportService;
this.discoveryNodeService = discoveryNodeService;
this.pingService = pingService;
// also support direct discovery.zen settings, for cases when it gets extended
this.pingTimeout = settings.getAsTime("discovery.zen.ping.timeout", settings.getAsTime("discovery.zen.ping_timeout", componentSettings.getAsTime("ping_timeout", componentSettings.getAsTime("initial_ping_timeout", timeValueSeconds(3)))));
this.sendLeaveRequest = componentSettings.getAsBoolean("send_leave_request", true);
this.masterElectionFilterClientNodes = settings.getAsBoolean("discovery.zen.master_election.filter_client", true);
this.masterElectionFilterDataNodes = settings.getAsBoolean("discovery.zen.master_election.filter_data", false);
logger.debug("using ping.timeout [{}], master_election.filter_client [{}], master_election.filter_data [{}]", pingTimeout, masterElectionFilterClientNodes, masterElectionFilterDataNodes);
this.electMaster = new ElectMasterService(settings);
nodeSettingsService.addListener(new ApplySettings());
this.masterFD = new MasterFaultDetection(settings, threadPool, transportService, this);
this.masterFD.addListener(new MasterNodeFailureListener());
this.nodesFD = new NodesFaultDetection(settings, threadPool, transportService);
this.nodesFD.addListener(new NodeFailureListener());
this.publishClusterState = new PublishClusterStateAction(settings, transportService, this, new NewClusterStateListener());
this.pingService.setNodesProvider(this);
this.membership = new MembershipAction(settings, transportService, this, new MembershipListener());
transportService.registerHandler(RejoinClusterRequestHandler.ACTION, new RejoinClusterRequestHandler());
}
protected void doStart() throws ElasticSearchException {
Map<String, String> nodeAttributes = discoveryNodeService.buildAttributes();
// note, we rely on the fact that its a new id each time we start, see FD and "kill -9" handling
String nodeId = UUID.randomBase64UUID();
localNode = new DiscoveryNode(settings.get("name"), nodeId, transportService.boundAddress().publishAddress(), nodeAttributes);
latestDiscoNodes = new DiscoveryNodes.Builder().put(localNode).localNodeId(localNode.id()).build();
nodesFD.updateNodes(latestDiscoNodes);
pingService.start();
// do the join on a different thread, the DiscoveryService waits for 30s anyhow till it is discovered
asyncJoinCluster();
}
public void publish(ClusterState clusterState) {
if (!master) {
throw new ElasticSearchIllegalStateException("Shouldn't publish state when not master");
}
latestDiscoNodes = clusterState.nodes();
nodesFD.updateNodes(clusterState.nodes());
publishClusterState.publish(clusterState);
}
private class NodeFailureListener implements NodesFaultDetection.Listener {
@Override
public void onNodeFailure(DiscoveryNode node, String reason) {
handleNodeFailure(node, reason);
}
}
private void handleNodeFailure(final DiscoveryNode node, String reason) {
if (lifecycleState() != Lifecycle.State.STARTED) {
// not started, ignore a node failure
return;
}
if (!master) {
// nothing to do here...
return;
}
clusterService.submitStateUpdateTask("zen-disco-node_failed(" + node + "), reason " + reason, new ProcessedClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
DiscoveryNodes.Builder builder = new DiscoveryNodes.Builder()
.putAll(currentState.nodes())
.remove(node.id());
latestDiscoNodes = builder.build();
currentState = newClusterStateBuilder().state(currentState).nodes(latestDiscoNodes).build();
// check if we have enough master nodes, if not, we need to move into joining the cluster again
if (!electMaster.hasEnoughMasterNodes(currentState.nodes())) {
return rejoin(currentState, "not enough master nodes");
}
// eagerly run reroute to remove dead nodes from routing table
RoutingAllocation.Result routingResult = allocationService.reroute(newClusterStateBuilder().state(currentState).build());
return newClusterStateBuilder().state(currentState).routingResult(routingResult).build();
}
@Override
public void clusterStateProcessed(ClusterState clusterState) {
sendInitialStateEventIfNeeded();
}
});
}
//......
}
- The ZenDiscovery constructor creates a NodesFaultDetection and adds a NodeFailureListener to it. The listener implements the NodesFaultDetection.Listener interface, and its onNodeFailure callback calls handleNodeFailure. On the master, handleNodeFailure submits a ProcessedClusterStateUpdateTask that removes the node from currentState.nodes() and then checks whether the remaining master nodes satisfy minimumMasterNodes; if not, it calls rejoin, and if so, it calls allocationService.reroute
- The doStart method builds localNode from the node's settings with a freshly generated node id, puts it into latestDiscoNodes, and then calls nodesFD.updateNodes(latestDiscoNodes), pingService.start(), and asyncJoinCluster()
- The publish method sets latestDiscoNodes to clusterState.nodes(), calls nodesFD.updateNodes(clusterState.nodes()), and finally calls publishClusterState.publish(clusterState)
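The master-side reaction to a node failure reduces to "remove the node, then check the master quorum". The following sketch models only that decision. NodeFailureDecision and its Action enum are hypothetical, nodes are reduced to an id plus a master-eligible flag, and treating a minimumMasterNodes value below 1 as "check disabled" is an assumption about hasEnoughMasterNodes, whose body is not shown in this article.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical model of the master-side reaction to a node failure; not Elasticsearch code.
final class NodeFailureDecision {
    enum Action { REJOIN, REROUTE }

    // nodes maps node id -> whether that node is master-eligible.
    static Action onNodeFailure(Map<String, Boolean> nodes, String failedId, int minimumMasterNodes) {
        // Build the new node set without the failed node; the input map is left untouched.
        Map<String, Boolean> remaining = new LinkedHashMap<>(nodes);
        remaining.remove(failedId);
        long masters = remaining.values().stream().filter(Boolean::booleanValue).count();
        // Assumption: a value below 1 disables the quorum check.
        boolean enough = minimumMasterNodes < 1 || masters >= minimumMasterNodes;
        return enough ? Action.REROUTE : Action.REJOIN;
    }

    public static void main(String[] args) {
        Map<String, Boolean> nodes = new LinkedHashMap<>();
        nodes.put("master-1", true);
        nodes.put("master-2", true);
        nodes.put("data-1", false);
        System.out.println(onNodeFailure(nodes, "data-1", 2));   // quorum intact
        System.out.println(onNodeFailure(nodes, "master-2", 2)); // quorum lost
    }
}
```

Losing a data-only node leaves the quorum intact, so only a reroute is needed; losing a master-eligible node can drop the count below the minimum and force the cluster to rejoin.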
NodesFaultDetection.updateNodes
elasticsearch-0.90.0/src/main/java/org/elasticsearch/discovery/zen/fd/NodesFaultDetection.java
public class NodesFaultDetection extends AbstractComponent {
//......
public void updateNodes(DiscoveryNodes nodes) {
DiscoveryNodes prevNodes = latestNodes;
this.latestNodes = nodes;
if (!running) {
return;
}
DiscoveryNodes.Delta delta = nodes.delta(prevNodes);
for (DiscoveryNode newNode : delta.addedNodes()) {
if (newNode.id().equals(nodes.localNodeId())) {
// no need to monitor the local node
continue;
}
if (!nodesFD.containsKey(newNode)) {
nodesFD.put(newNode, new NodeFD());
threadPool.schedule(pingInterval, ThreadPool.Names.SAME, new SendPingRequest(newNode));
}
}
for (DiscoveryNode removedNode : delta.removedNodes()) {
nodesFD.remove(removedNode);
}
}
//......
}
- NodesFaultDetection provides the updateNodes method to update its latestNodes. It calls nodes.delta(prevNodes) to compute a DiscoveryNodes.Delta, whose addedNodes() method returns the newly added nodes and whose removedNodes() method returns the removed nodes
- For each added node other than the local node, it checks whether the node is already in nodesFD; if not, it adds the node to nodesFD and schedules a SendPingRequest task to run after pingInterval
- Each removed node is removed from nodesFD. Similarly, handleTransportDisconnect removes a disconnected node from nodesFD and puts it back only if the reconnect attempt succeeds
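The delta logic amounts to two set differences followed by an update of the monitored set. A minimal sketch, assuming nodes can be represented by their ids: NodeDelta is a hypothetical stand-in for DiscoveryNodes.Delta, and ping scheduling is omitted.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical stand-in for DiscoveryNodes.Delta, working on plain node ids.
final class NodeDelta {
    final Set<String> added;
    final Set<String> removed;

    NodeDelta(Set<String> previous, Set<String> current) {
        // added = current minus previous
        added = new LinkedHashSet<>(current);
        added.removeAll(previous);
        // removed = previous minus current
        removed = new LinkedHashSet<>(previous);
        removed.removeAll(current);
    }

    // Applies the delta to the monitored set, never monitoring the local node.
    static void apply(Set<String> monitored, NodeDelta delta, String localNodeId) {
        for (String id : delta.added) {
            if (!id.equals(localNodeId)) {
                monitored.add(id); // the real code also schedules the first ping here
            }
        }
        monitored.removeAll(delta.removed);
    }

    public static void main(String[] args) {
        Set<String> previous = new LinkedHashSet<>(Arrays.asList("local", "a", "b"));
        Set<String> current = new LinkedHashSet<>(Arrays.asList("local", "b", "c"));
        Set<String> monitored = new LinkedHashSet<>(Arrays.asList("a", "b"));
        apply(monitored, new NodeDelta(previous, current), "local");
        System.out.println(monitored);
    }
}
```

Working from the delta rather than the full node list means nodes that are already monitored keep their existing retry state across cluster state updates.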
SendPingRequest
elasticsearch-0.90.0/src/main/java/org/elasticsearch/discovery/zen/fd/NodesFaultDetection.java
private class SendPingRequest implements Runnable {
private final DiscoveryNode node;
private SendPingRequest(DiscoveryNode node) {
this.node = node;
}
@Override
public void run() {
if (!running) {
return;
}
transportService.sendRequest(node, PingRequestHandler.ACTION, new PingRequest(node.id()), options().withHighType().withTimeout(pingRetryTimeout),
new BaseTransportResponseHandler<PingResponse>() {
@Override
public PingResponse newInstance() {
return new PingResponse();
}
@Override
public void handleResponse(PingResponse response) {
if (!running) {
return;
}
NodeFD nodeFD = nodesFD.get(node);
if (nodeFD != null) {
if (!nodeFD.running) {
return;
}
nodeFD.retryCount = 0;
threadPool.schedule(pingInterval, ThreadPool.Names.SAME, SendPingRequest.this);
}
}
@Override
public void handleException(TransportException exp) {
// check if the master node did not get switched on us...
if (!running) {
return;
}
if (exp instanceof ConnectTransportException) {
// ignore this one, we already handle it by registering a connection listener
return;
}
NodeFD nodeFD = nodesFD.get(node);
if (nodeFD != null) {
if (!nodeFD.running) {
return;
}
int retryCount = ++nodeFD.retryCount;
logger.trace("[node ] failed to ping [{}], retry [{}] out of [{}]", exp, node, retryCount, pingRetryCount);
if (retryCount >= pingRetryCount) {
logger.debug("[node ] failed to ping [{}], tried [{}] times, each with maximum [{}] timeout", node, pingRetryCount, pingRetryTimeout);
// not good, failure
if (nodesFD.remove(node) != null) {
notifyNodeFailure(node, "failed to ping, tried [" + pingRetryCount + "] times, each with maximum [" + pingRetryTimeout + "] timeout");
}
} else {
// resend the request, not reschedule, rely on send timeout
transportService.sendRequest(node, PingRequestHandler.ACTION, new PingRequest(node.id()),
options().withHighType().withTimeout(pingRetryTimeout), this);
}
}
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
});
}
}
- SendPingRequest's run method sends a PingRequest to the target node with a timeout of pingRetryTimeout. Its handleResponse method looks the node up in nodesFD and does nothing if the node has been removed or if its NodeFD's running flag is false; otherwise it resets retryCount to 0 and reschedules the SendPingRequest task to run after pingInterval
- If the request fails with a TransportException, handleException checks whether it is a ConnectTransportException and, if so, ignores it, because that case is already handled by the onNodeDisconnected method of the FDConnectionListener registered with transportService
- For any other exception it increments nodeFD.retryCount. When retryCount is greater than or equal to pingRetryCount, the node is removed from nodesFD and notifyNodeFailure is called, which calls back into ZenDiscovery's handleNodeFailure method; if pingRetryCount has not been reached, the PingRequest is resent immediately
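The retry counter and its effect on detection time can be sketched as follows. PingRetryModel is a hypothetical model, not Elasticsearch code. The worst-case estimate assumes pingRetryCount is at least 1, that every attempt runs into its full timeout, and that failed pings are resent immediately as in handleException; network and scheduling latency are ignored.

```java
// Hypothetical model of the per-node retry counter; not Elasticsearch code.
final class PingRetryModel {
    private final int pingRetryCount;
    private int retryCount = 0;
    private boolean failed = false;

    PingRetryModel(int pingRetryCount) {
        this.pingRetryCount = pingRetryCount;
    }

    // A successful ping resets the counter.
    void onResponse() {
        if (!failed) {
            retryCount = 0;
        }
    }

    // A failed ping (other than a connect failure) increments the counter.
    // Returns true once the node is declared failed.
    boolean onException() {
        if (failed) {
            return true;
        }
        retryCount++;
        if (retryCount >= pingRetryCount) {
            failed = true;
        }
        return failed;
    }

    // Rough upper bound: up to one interval before the next ping is sent,
    // then pingRetryCount attempts that each run into the full timeout.
    static long worstCaseDetectionMillis(long pingIntervalMillis, long pingTimeoutMillis, int pingRetryCount) {
        return pingIntervalMillis + (long) pingRetryCount * pingTimeoutMillis;
    }

    public static void main(String[] args) {
        PingRetryModel model = new PingRetryModel(3);
        System.out.println(model.onException()); // first failure
        System.out.println(model.onException()); // second failure
        System.out.println(model.onException()); // third failure reaches the limit
        System.out.println(worstCaseDetectionMillis(1000L, 30000L, 3));
    }
}
```

With the default settings (1s interval, 30s timeout, 3 retries) the estimate comes to 91000 ms, so an unresponsive node whose connection stays open can take about a minute and a half to be reported under these assumptions.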
Summary
- NodesFaultDetection registers a PingRequestHandler with transportService under PingRequestHandler.ACTION and adds an FDConnectionListener. PingRequestHandler answers a PingRequest with a PingResponse; FDConnectionListener handles transport disconnects, which is why SendPingRequest can ignore ConnectTransportException
- The onNodeDisconnected method of FDConnectionListener removes the node from nodesFD and sets the NodeFD's running flag to false. If connectOnNetworkDisconnect is true, it tries to reconnect once (on success, the node is put back into nodesFD and a SendPingRequest task is scheduled to run after pingInterval). If the connect attempt throws or connectOnNetworkDisconnect is false, it calls notifyNodeFailure, which invokes NodesFaultDetection.Listener.onNodeFailure, here ZenDiscovery's NodeFailureListener
- ZenDiscovery's doStart and publish methods both call NodesFaultDetection's updateNodes method, which updates latestNodes and schedules a SendPingRequest task for each newly added node
- When a SendPingRequest succeeds, retryCount is reset and the next SendPingRequest task is scheduled. When it fails with an exception other than ConnectTransportException, the ping is retried, and once retryCount reaches pingRetryCount, notifyNodeFailure is triggered; it invokes NodesFaultDetection.Listener.onNodeFailure, which here is ZenDiscovery's NodeFailureListener
- ZenDiscovery's NodeFailureListener implements the NodesFaultDetection.Listener interface, and its onNodeFailure callback calls handleNodeFailure. That method submits a ProcessedClusterStateUpdateTask that removes the node from currentState.nodes() and then checks whether the remaining master nodes satisfy minimumMasterNodes; if not, it calls rejoin, and if so, it calls allocationService.reroute
Doc
- Elasticsearch reference 0.90