0x00 Introduction

This is the second part of the parameter server series, which introduces the ps-lite communication module Van.

The other articles in this series are:

Machine learning parameter server ps-lite (1): PostOffice

0x01 Function Overview

With the address book kept in the post office (PostOffice), a van is needed to actually pick up and deliver the mail. Van is the communication module of the whole Parameter Server, and its characteristics are as follows.

  • When the PostOffice class is instantiated, an instance of the Van class is created as a member variable. The Van instance has the same lifetime as the owning PostOffice instance (there is only one object per node);
  • The Van is responsible for the specific inter-node communication. Specifically, it’s responsible for establishing connections between nodes (such as Worker and Scheduler) and opening local receiving threads to listen for incoming messages.

Van currently has two implementations:

  • ZMQVan is a Van implementation based on ZeroMQ; the ZMQ library handles the low-level details of the connections (ZeroMQ is an open-source library that provides a clean encapsulation of sockets, making socket programming simpler, more concise, and faster).
  • IBVerbsVan is an RDMA-based implementation contributed by ByteDance, not explored in detail here.
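The choice between the two implementations is made by a simple factory, Van::Create, which dispatches on a type string. A minimal self-contained sketch of that pattern (the class bodies here are illustrative stand-ins, not the real Van hierarchy):

```cpp
#include <memory>
#include <stdexcept>
#include <string>

// Hypothetical stand-ins for the real Van hierarchy, for illustration only.
struct Van {
  virtual ~Van() = default;
  virtual std::string name() const = 0;
};
struct ZMQVan : Van {  // ZeroMQ-based transport
  std::string name() const override { return "zmq"; }
};
struct IBVerbsVan : Van {  // RDMA (ibverbs)-based transport
  std::string name() const override { return "ibverbs"; }
};

// Simple-factory sketch of Van::Create: pick an implementation by type string.
std::unique_ptr<Van> CreateVan(const std::string& type) {
  if (type == "zmq") return std::make_unique<ZMQVan>();
  if (type == "ibverbs") return std::make_unique<IBVerbsVan>();
  throw std::invalid_argument("unsupported van type: " + type);
}
```

The rest of the system only ever talks to the abstract Van interface, so swapping the transport is a one-line configuration change.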

0x02 Definition

2.1 UML diagrams

The UML diagram is presented first.

2.2 Main Remarks

Below we only show the key variables and member functions of the Van object.

The main variables are as follows:

  • Node scheduler_ : information about the scheduler node. Every node keeps a copy of the scheduler node's info;
  • Node my_node_ : information about this node. If this node is the scheduler, my_node_ equals the scheduler_ above;
  • bool is_scheduler_ : indicates whether this node is the scheduler;
  • std::unique_ptr<std::thread> receiver_thread_ : pointer to the thread that receives messages;
  • std::unique_ptr<std::thread> heartbeat_thread_ : pointer to the thread that sends heartbeats;
  • std::vector<int> barrier_count_ : barrier counters that track the number of registered nodes. When all nodes have registered, the system is ready, and the scheduler sends a ready message to all nodes to officially start the system;
  • Resender *resender_ = nullptr : pointer to the message resender;
  • std::atomic<int> timestamp_{0} : auto-incrementing message id, an atomic variable;
  • std::unordered_map<std::string, int> connected_nodes_ : records which nodes this node is currently connected to;

Its main functions are as follows:

  • Start: establishes and initializes communication;

  • Receiving: the processing function run by the receiving thread;

  • Heartbeat: the processing function run by the heartbeat-sending thread;

  • ProcessAddNodeCommandAtScheduler: the ADD_NODE message handler invoked on the scheduler;

    • ProcessHearbeat: heartbeat packet handler;
    • ProcessDataMsg: Data message (push & pull) handler;
    • ProcessAddNodeCommand: The AddNode message handler for workers and servers;
    • ProcessBarrierCommand: Barrier message handler;

2.3 Thread Management

The three roles defined by PS Lite work in a multi-threaded mechanism, with each thread taking on specific responsibilities and being created when the owning Van instance starts.

The detailed description is as follows:

  • The Scheduler, Worker, and Server Van instances all hold a thread that receives data.
  • The Worker and Server Van instances also hold a thread that intermittently sends heartbeats to the Scheduler.
  • Scheduler, Worker, and Server also start a monitoring thread if the environment variable PS_RESEND is defined with a non-zero value.
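The rules above can be condensed into a small decision function. The sketch below (names hypothetical, not ps-lite's API) captures which background threads a node would start, based on its role and the PS_RESEND environment variable:

```cpp
#include <cstdlib>
#include <string>

// Which background threads a node starts, per the rules above (sketch).
struct ThreadPlan {
  bool receiver;   // all roles hold a receiving thread
  bool heartbeat;  // worker & server only
  bool resender;   // any role, but only when PS_RESEND is set non-zero
};

ThreadPlan PlanThreads(const std::string& role, const char* ps_resend_env) {
  ThreadPlan p;
  p.receiver = true;
  p.heartbeat = (role != "scheduler");
  p.resender = ps_resend_env != nullptr && std::atoi(ps_resend_env) != 0;
  return p;
}
```

For example, a scheduler started without PS_RESEND would get only the receiver thread, while a worker started with PS_RESEND=1 would get all three.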

2.4 Class Definition

The detailed code (summary) is as follows:

class Van {
 public:
  static Van *Create(const std::string &type);
  virtual void Start(int customer_id);
  int Send(const Message &msg);
  virtual void Stop();
  inline int GetTimestamp() { return timestamp_++; }
  inline bool IsReady() { return ready_; }

  virtual void Connect(const Node &node) = 0;
  virtual int Bind(const Node &node, int max_retry) = 0;
  virtual int RecvMsg(Message *msg) = 0;
  virtual int SendMsg(const Message &msg) = 0;

  /** \brief pack meta into a string */
  void PackMeta(const Meta &meta, char **meta_buf, int *buf_size);
  /** \brief pack meta into protobuf */
  void PackMetaPB(const Meta &meta, PBMeta *pb);
  /** \brief unpack meta from a string */
  void UnpackMeta(const char *meta_buf, int buf_size, Meta *meta);

  Node scheduler_;
  Node my_node_;
  bool is_scheduler_;
  std::mutex start_mu_;

 private:
  /** thread function for receving */
  void Receiving();
  /** thread function for heartbeat */
  void Heartbeat();

  // node's address string (i.e. ip:port) -> node id
  // this map is updated when ip:port is received for the first time
  std::unordered_map<std::string, int> connected_nodes_;
  // maps the id of node which is added later to the id of node
  // which is with the same ip:port and added first
  std::unordered_map<int, int> shared_node_mapping_;

  /** whether it is ready for sending */
  std::atomic<bool> ready_{false};
  std::atomic<size_t> send_bytes_{0};
  size_t recv_bytes_ = 0;
  int num_servers_ = 0;
  int num_workers_ = 0;
  /** the thread for receiving messages */
  std::unique_ptr<std::thread> receiver_thread_;
  /** the thread for sending heartbeat */
  std::unique_ptr<std::thread> heartbeat_thread_;
  std::vector<int> barrier_count_;
  /** msg resender */
  Resender *resender_ = nullptr;
  int drop_rate_ = 0;
  std::atomic<int> timestamp_{0};
  int init_stage = 0;

  // handlers for the various control messages
  void ProcessAddNodeCommandAtScheduler(Message *msg, Meta *nodes,
                                        Meta *recovery_nodes);
  void ProcessTerminateCommand();
  void ProcessAddNodeCommand(Message *msg, Meta *nodes, Meta *recovery_nodes);
  void ProcessBarrierCommand(Message *msg);
  void ProcessHearbeat(Message *msg);
  void ProcessDataMsg(Message *msg);

  void UpdateLocalID(Message *msg, std::unordered_set<int> *deadnodes_set,
                     Meta *nodes, Meta *recovery_nodes);

  const char *heartbeat_timeout_val =
      Environment::Get()->find("PS_HEARTBEAT_TIMEOUT");
  int heartbeat_timeout_ =
      heartbeat_timeout_val ? atoi(heartbeat_timeout_val) : 0;

  DISALLOW_COPY_AND_ASSIGN(Van);
};

0x03 Initialization

Van::Start applies different settings depending on the role of the local node: it binds a port, establishes a connection to the scheduler, and starts the receiving thread and the heartbeat thread, so that communication can take place. The details are as follows:

  1. The scheduler_ member variable is initialized by getting information from the environment variables, such as the scheduler’s “IP, port” (which are predefined), the role of the node (Worker/Server/ scheduler), and so on.

  2. If this node is a scheduler, the scheduler_ is assigned to my_node_;

  3. If this object is not a scheduler, then:

    1. Obtain the IP address of the node from the system.
    2. Use GetAvailablePort to get a port;
  4. Use Bind to bind a port;

  5. Call Connect to establish a connection to the Scheduler (the Scheduler also connects to its own fixed pre-set port);

  6. Start the receiver_thread_ thread on the local Node and run Van::Receiving;

  7. If the Node is not a scheduler, send an ADD_NODE message to the scheduler. This will inform the scheduler of the information about the local Node, that is, register with the scheduler.

  8. Then it enters the waiting state, waiting for the Scheduler to notify Ready (the Scheduler sends Ready after all nodes have registered). Note that the Scheduler node also waits, but this does not prevent the Receiving thread of the Scheduler node from receiving and processing messages;

  9. After Ready, start the Heartbeat thread and establish a Heartbeat connection to the Scheduler.

A further explanation of 7 and 8 is:

  • After a worker or server node has bound its IP and port, it sends an ADD_NODE message to the Scheduler node.
  • When the scheduler receives ADD_NODE messages from all workers and servers, it replies ADD_NODE messages in turn.
  • Each node waits for the process to complete through the atomic variable ready_.

The specific code is as follows:

void Van::Start(int customer_id) {
  // get scheduler info
  start_mu_.lock();
  if (init_stage == 0) {
    // initialize the scheduler_ member variable
    scheduler_.hostname = std::string(
        CHECK_NOTNULL(Environment::Get()->find("DMLC_PS_ROOT_URI")));
    scheduler_.port =
        atoi(CHECK_NOTNULL(Environment::Get()->find("DMLC_PS_ROOT_PORT")));
    scheduler_.role = Node::SCHEDULER;
    scheduler_.id = kScheduler;
    is_scheduler_ = Postoffice::Get()->is_scheduler();

    // get my node info
    if (is_scheduler_) {
      my_node_ = scheduler_;
    } else {
      auto role = Postoffice::Get()->is_worker() ? Node::WORKER : Node::SERVER;
      const char* nhost = Environment::Get()->find("DMLC_NODE_HOST");
      std::string ip;
      if (nhost) ip = std::string(nhost);
      if (ip.empty()) {
        const char* itf = Environment::Get()->find("DMLC_INTERFACE");
        std::string interface;
        if (itf) interface = std::string(itf);
        if (interface.size()) {
          GetIP(interface, &ip);
        } else {
          GetAvailableInterfaceAndIP(&interface, &ip);
        }
      }
      int port = GetAvailablePort();
      const char* pstr = Environment::Get()->find("PORT");
      if (pstr) port = atoi(pstr);
      my_node_.hostname = ip;
      my_node_.role = role;
      my_node_.port = port;
      // cannot determine my id now, the scheduler will assign it later
      // set it explicitly to make re-register within a same process possible
      my_node_.id = Node::kEmpty;
      my_node_.customer_id = customer_id;
    }

    // bind. The scheduler binds its fixed pre-set port, other nodes retry up to 40 times
    my_node_.port = Bind(my_node_, is_scheduler_ ? 0 : 40);

    // connect to the scheduler. The connection info is kept in a map<int, void*>
    // of senders, e.g. senders_[1] = socket_1
    Connect(scheduler_);

    // for debug use
    if (Environment::Get()->find("PS_DROP_MSG")) {
      drop_rate_ = atoi(Environment::Get()->find("PS_DROP_MSG"));
    }

    // start receiver: a thread that receives messages by running Van::Receiving
    receiver_thread_ =
        std::unique_ptr<std::thread>(new std::thread(&Van::Receiving, this));
    init_stage++;
  }
  start_mu_.unlock();

  if (!is_scheduler_) {
    // let the scheduler know myself, i.e. register with the scheduler
    Message msg;
    Node customer_specific_node = my_node_;
    customer_specific_node.customer_id = customer_id;
    msg.meta.recver = kScheduler;
    msg.meta.control.cmd = Control::ADD_NODE;
    msg.meta.control.node.push_back(customer_specific_node);
    msg.meta.timestamp = timestamp_++;
    Send(msg);
  }

  // wait until ready_ flips from false to true, i.e. until the scheduler
  // has sent the "all ready" message
  while (!ready_.load()) {
    std::this_thread::sleep_for(std::chrono::milliseconds(100));
  }

  start_mu_.lock();
  if (init_stage == 1) {
    // resender
    if (Environment::Get()->find("PS_RESEND") &&
        atoi(Environment::Get()->find("PS_RESEND")) != 0) {
      int timeout = 1000;
      if (Environment::Get()->find("PS_RESEND_TIMEOUT")) {
        timeout = atoi(Environment::Get()->find("PS_RESEND_TIMEOUT"));
      }
      resender_ = new Resender(timeout, 10, this);
    }
    if (!is_scheduler_) {
      // start the heartbeat thread
      heartbeat_thread_ =
          std::unique_ptr<std::thread>(new std::thread(&Van::Heartbeat, this));
    }
    init_stage++;
  }
  start_mu_.unlock();
}

0x04 Message received

We’ll first look at how background threads run, and then look at how various messages are processed.

4.1 Background processing message thread

Ps-lite starts a background thread receiver_thread_ to receive/process messages.

// start receiver
receiver_thread_ =
        std::unique_ptr<std::thread>(new std::thread(&Van::Receiving, this));

4.2 Processing Functions

receiver_thread_ runs the Receiving function to process messages.

4.2.1 Control information

In addition to the data message passing the parameters, the control information between the nodes is:

  • ADD_NODE: workers and servers register their nodes with the Scheduler;
  • BARRIER: Synchronous blocking messages between nodes;
  • HEARTBEAT: HEARTBEAT signals between nodes.
  • TERMINATE: Node exit signal;
  • ACK: A confirmation message. The ACK type appears only if Resender is enabled.
  • EMPTY: not a control message, i.e., a data message (push or pull);

So different handlers are called in Receiving for different types of messages:

  • ProcessTerminateCommand: Process TERMINATE;
  • ProcessAddNodeCommand: processes ADD_NODE;
  • ProcessBarrierCommand: processes the BARRIER (analyzed above);
  • ProcessHearbeat: processes HEARTBEAT.
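The command list and the dispatch above can be sketched together. The enum below mirrors ps-lite's Control::Command (verify the order against the meta header of your version); the dispatch function is an illustrative stand-in for the if/else chain inside Van::Receiving:

```cpp
#include <string>

// Control commands, mirroring ps-lite's Control::Command (check your version).
enum Command { EMPTY, TERMINATE, ADD_NODE, BARRIER, ACK, HEARTBEAT };

// Sketch of the dispatch performed in Van::Receiving.
std::string HandlerFor(Command cmd) {
  switch (cmd) {
    case TERMINATE: return "ProcessTerminateCommand";
    case ADD_NODE:  return "ProcessAddNodeCommand";
    case BARRIER:   return "ProcessBarrierCommand";
    case HEARTBEAT: return "ProcessHearbeat";
    case EMPTY:     return "ProcessDataMsg";  // data (push/pull) message
    default:        return "drop";            // unknown control type is dropped
  }
}
```

Note that ACK never reaches these handlers: it is consumed by the resender before dispatch, which is why the real code has no ProcessAck branch.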

4.2.2 Intra-thread global variables

There are two variables inside the thread function. Because they are declared outside the while (true) loop, they act as thread-wide state that persists across iterations, which is important to note when reading the code.

  • nodes: used only when the scheduler processes ADD_NODE. It stores all nodes currently known to the scheduler.
  • recovery_nodes: used only when the scheduler processes ADD_NODE. It stores all recovery nodes (nodes that died and restarted) known to the scheduler.
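Why loop-external declarations matter here: each iteration of the receive loop sees the state left by previous iterations, which is exactly how the scheduler accumulates registrations one ADD_NODE message at a time. A minimal illustration (names hypothetical):

```cpp
#include <vector>

// 'nodes' below plays the role of the Meta nodes variable in Van::Receiving:
// declared once, outside the loop, so it accumulates across messages.
int CountRegistrations(const std::vector<int>& incoming_ids, int expected) {
  std::vector<int> nodes;        // persists across iterations
  int rounds = 0;
  for (int id : incoming_ids) {  // stands in for while (true) { RecvMsg(...); }
    nodes.push_back(id);         // each ADD_NODE message adds one entry
    ++rounds;
    if (static_cast<int>(nodes.size()) == expected) break;  // all registered
  }
  return rounds;
}
```

Had nodes been declared inside the loop, its contents would be lost every iteration and the "all nodes registered" condition could never be reached.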

4.2.3 Implementation

Receiving logic is as follows:

  • Call RecvMsg (which a derived class implements) to get the latest message;
  • If a drop rate is set (for debugging), messages are randomly dropped;
  • If the retransmission mechanism is enabled, resender_->AddIncomming(msg) detects whether the message is a duplicate and drops duplicates;
  • Process control messages or data messages;

The specific code is as follows

void Van::Receiving() {
  // the following two variables outlive the loop, so they accumulate state
  // across messages
  Meta nodes;
  Meta recovery_nodes;  // store recovery (restarted) nodes
  recovery_nodes.control.cmd = Control::ADD_NODE;

  while (true) {
    Message msg;
    int recv_bytes = RecvMsg(&msg);  // receive via the derived-class transport
    // For debug, drop received message
    if (ready_.load() && drop_rate_ > 0) {
      unsigned seed = time(NULL) + my_node_.id;
      if (rand_r(&seed) % 100 < drop_rate_) {
        LOG(WARNING) << "Drop message " << msg.DebugString();
        continue;
      }
    }
    CHECK_NE(recv_bytes, -1);
    recv_bytes_ += recv_bytes;
    if (Postoffice::Get()->verbose() >= 2) {
      PS_VLOG(2) << msg.DebugString();
    }
    // duplicated message: the resender confirms retransmissions
    if (resender_ && resender_->AddIncomming(msg)) continue;

    if (!msg.meta.control.empty()) {
      // control msg
      auto& ctrl = msg.meta.control;
      if (ctrl.cmd == Control::TERMINATE) {
        ProcessTerminateCommand();
        break;
      } else if (ctrl.cmd == Control::ADD_NODE) {
        ProcessAddNodeCommand(&msg, &nodes, &recovery_nodes);
      } else if (ctrl.cmd == Control::BARRIER) {
        ProcessBarrierCommand(&msg);
      } else if (ctrl.cmd == Control::HEARTBEAT) {
        ProcessHearbeat(&msg);
      } else {
        LOG(WARNING) << "Drop unknown typed message " << msg.DebugString();
      }
    } else {
      // data msg
      ProcessDataMsg(&msg);
    }
  }
}
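The deduplication step resender_->AddIncomming(msg) deserves a closer look. The idea is that every message maps to a key (in ps-lite, derived from fields such as sender, receiver, and timestamp); a key seen before means the message is a retransmitted duplicate and should be skipped. The sketch below illustrates the mechanism with a hypothetical key scheme, not ps-lite's actual hash:

```cpp
#include <cstdint>
#include <unordered_set>

// Sketch of the dedup idea behind resender_->AddIncomming(msg).
class MiniResender {
 public:
  // Returns true if this message was seen before (caller should skip it).
  bool AddIncomming(int sender, int timestamp) {
    // Illustrative key: sender id in the high bits, timestamp in the low bits.
    uint64_t key = (static_cast<uint64_t>(sender) << 32) |
                   static_cast<uint32_t>(timestamp);
    return !seen_.insert(key).second;  // failed insert -> duplicate
  }

 private:
  std::unordered_set<uint64_t> seen_;
};
```

This is why `continue` is correct in the loop above: a duplicate has already been processed once, so handling it again could double-apply a push or re-trigger a control action.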

4.3 Processing ADD_NODE messages

ADD_NODE is the control message that the worker/server uses to register itself with the Scheduler.

4.3.1 Registration logic

Let’s remember the basic idea of registering.

  • After a worker or server node has bound its IP and port, it sends an ADD_NODE message to the Scheduler node.
  • When the scheduler has received the ADD_NODE messages from all workers and servers, it responds to each in turn. Notice that the response is also an ADD_NODE message, i.e., of the same type.
  • Each node (scheduler, worker, server) waits for the process to complete through the atomic variable ready_.

4.3.2 ProcessAddNodeCommand

The ProcessAddNodeCommand logic is as follows.

  • Find out the id of the heartbeat packet timeout and save it to the dead_set.

  • Get the control message from the received message.

  • Call UpdateLocalID, where:

    • If it is a new node, the Scheduler records the new node.
    • If the node was created by a restart, update the information about the old node.
  • If this is the scheduler, then:

    • Call ProcessAddNodeCommandAtScheduler: after receiving the ADD_NODE messages from all workers and servers, allocate node ids and respond, i.e., set the latest rank of every node and send it to all workers and servers.
  • If this is not the scheduler, i.e., a worker or server that received the ADD_NODE reply from the scheduler, then:

    • For an existing node, the new node will not be found in connected_nodes_, so the existing node calls Connect to establish a connection with the new node.
    • If this node itself is the new node, it connects to all existing nodes (not of the same role).
    • Update the global node information in connected_nodes_, including the global rank (the local node's global rank is obtained in receiver_thread_);
    • Finally, set ready_ = true so that this node can run, because the main thread may be blocking on it.

The specific code is as follows:

void Van::ProcessAddNodeCommand(Message* msg, Meta* nodes,
                                Meta* recovery_nodes) {
  // find the ids whose heartbeat has timed out and save them in dead_set
  auto dead_nodes = Postoffice::Get()->GetDeadNodes(heartbeat_timeout_);
  std::unordered_set<int> dead_set(dead_nodes.begin(), dead_nodes.end());
  auto& ctrl = msg->meta.control;  // the control info of the received message

  UpdateLocalID(msg, &dead_set, nodes, recovery_nodes);

  if (is_scheduler_) {
    // scheduler node
    ProcessAddNodeCommandAtScheduler(msg, nodes, recovery_nodes);
  } else {
    // worker & server node
    for (const auto& node : ctrl.node) {
      std::string addr_str = node.hostname + ":" + std::to_string(node.port);
      if (connected_nodes_.find(addr_str) == connected_nodes_.end()) {
        // not connected to this node yet, so connect to it
        Connect(node);
        connected_nodes_[addr_str] = node.id;  // record the connected node
      }
      if (!node.is_recovery && node.role == Node::SERVER) ++num_servers_;
      if (!node.is_recovery && node.role == Node::WORKER) ++num_workers_;
    }
    ready_ = true;
  }
}
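The "global rank" mentioned above is derived from the node id. As a sketch of the mapping used by ps-lite's Postoffice (ids 1 through 7 are reserved for node groups; servers get even ids starting at 8, workers odd ids starting at 9 — verify against the Postoffice header of your version):

```cpp
#include <algorithm>

// Sketch of the node-id / rank mapping in ps-lite's Postoffice.
inline int ServerRankToID(int rank) { return rank * 2 + 8; }  // 8, 10, 12, ...
inline int WorkerRankToID(int rank) { return rank * 2 + 9; }  // 9, 11, 13, ...
inline int IDtoRank(int id) { return std::max((id - 8) / 2, 0); }
```

Interleaving server and worker ids this way lets IDtoRank recover the rank with one formula regardless of role, which is what UpdateLocalID relies on when it exports DMLC_RANK.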

4.3.3 UpdateLocalID

This function is used to update the node ID information inside a node.

  • If msg->meta.sender is Meta::kEmpty, the sender has not been assigned an id yet, so this branch can only run on the scheduler:

    • If the current number of registered nodes in nodes->control.node is smaller than "configured number of servers + configured number of workers", the system is still starting up, so the node info carried by the current message is appended to nodes->control.node.
    • Otherwise the system is already running, and some node must have died and reconnected. Find a dead node id in nodes->control.node whose role matches the current message (same type) and assign that id to the restarted node, then update nodes->control.node and recovery_nodes.
  • Here is the logic for normal nodes:

    • In the full node list returned by the scheduler, find the node whose IP address and port match this node's own.
    • If found, update the local node information (node_id is not set when the node starts, so it needs to be assigned by the scheduler; according to the comment, this is done explicitly to make re-registration possible). This includes the global rank information.

The specific code is as follows:

void Van::UpdateLocalID(Message* msg, std::unordered_set<int>* deadnodes_set,
                        Meta* nodes, Meta* recovery_nodes) {
  auto& ctrl = msg->meta.control;
  size_t num_nodes =
      Postoffice::Get()->num_servers() + Postoffice::Get()->num_workers();

  // assign an id
  if (msg->meta.sender == Meta::kEmpty) {
    // only the scheduler receives messages without a sender id
    CHECK(is_scheduler_);
    // the node carried in the control field is the sender itself
    CHECK_EQ(ctrl.node.size(), 1);
    if (nodes->control.node.size() < num_nodes) {
      // the system is still starting up: record the new node
      nodes->control.node.push_back(ctrl.node[0]);
    } else {
      // some node died and restarted
      CHECK(ready_.load());
      for (size_t i = 0; i < nodes->control.node.size() - 1; ++i) {
        const auto& node = nodes->control.node[i];
        if (deadnodes_set->find(node.id) != deadnodes_set->end() &&
            node.role == ctrl.node[0].role) {
          auto& recovery_node = ctrl.node[0];
          // assign previous node id
          recovery_node.id = node.id;
          recovery_node.is_recovery = true;
          nodes->control.node[i] = recovery_node;
          recovery_nodes->control.node.push_back(recovery_node);
          break;
        }
      }
    }
  }

  // update my id. For a normal node: in the node list the scheduler sent back,
  // find the entry whose ip and port match this node and take over the node id
  // the scheduler assigned (the scheduler itself finds no match, so this is a
  // no-op for it).
  for (size_t i = 0; i < ctrl.node.size(); ++i) {
    const auto& node = ctrl.node[i];
    if (my_node_.hostname == node.hostname && my_node_.port == node.port) {
      if (getenv("DMLC_RANK") == nullptr || my_node_.id == Meta::kEmpty) {
        my_node_ = node;
        std::string rank = std::to_string(Postoffice::IDtoRank(node.id));
#ifdef _MSC_VER
        _putenv_s("DMLC_RANK", rank.c_str());
#else
        setenv("DMLC_RANK", rank.c_str(), true);
#endif
      }
    }
  }
}

4.3.4 ProcessAddNodeCommandAtScheduler

ProcessAddNodeCommandAtScheduler runs on the scheduler and handles this control-type message there.

For the Scheduler node, after receiving the ADD_NODE messages from all workers and servers, the Scheduler allocates node IDS and replies, that is, the latest global rank of all nodes needs to be set and sent to all workers and servers.

  • After receiving the registration messages from all workers and servers (nodes->control.node.size() == num_nodes):

    • Sort nodes by IP + port combination.
    • Scheduler establishes connections with all registered nodes, updates heartbeat timestamps, and assigns global ranks to all connected nodes.
    • Send ADD_NODE messages to all workers and servers (with all node information in the scheduler).
    • Set ready_ = true; that is, the scheduler is in a ready state, regardless of whether the workers and servers have acknowledged receipt of the ADD_NODE message.
    • On the receiving end (worker & Server), the global rank information of each local Node is obtained by receiver_thread_ (other functions), which is the nodes information returned by the scheduler.
  • If !recovery_nodes->control.node.empty(), the scheduler is handling the registration of a restarted node:

    • Find out the id of the heartbeat packet timeout and save it to the dead_set.

    • Connect to the restarted node (since an ADD_NODE was received from it); only the newly restarted node needs connecting (the CHECK_EQ(recovery_nodes->control.node.size(), 1) in the code confirms there is exactly one restarted node).

    • Update the heartbeat of the restarted node.

    • Since the new restart node is added, one send serves two purposes:

      • Sends ADD_NODE messages to all recovery workers and servers (carrying all current node information in the Scheduler).
      • The recovery information is sent to the alive node.
      • In this way, each node that receives the message establishes a connection with the new node.

The specific code is as follows:

void Van::ProcessAddNodeCommandAtScheduler(Message* msg, Meta* nodes,
                                           Meta* recovery_nodes) {
  recovery_nodes->control.cmd = Control::ADD_NODE;
  time_t t = time(NULL);
  size_t num_nodes =
      Postoffice::Get()->num_servers() + Postoffice::Get()->num_workers();
  if (nodes->control.node.size() == num_nodes) {
    // all nodes have registered
    // sort the nodes according to their ip and port
    std::sort(nodes->control.node.begin(), nodes->control.node.end(),
              [](const Node& a, const Node& b) {
                return (a.hostname.compare(b.hostname) | (a.port < b.port)) > 0;
              });
    // assign node rank: give every node connected to the scheduler a global rank
    for (auto& node : nodes->control.node) {
      std::string node_host_ip =
          node.hostname + ":" + std::to_string(node.port);
      if (connected_nodes_.find(node_host_ip) == connected_nodes_.end()) {
        // this node is not connected yet
        CHECK_EQ(node.id, Node::kEmpty);  // its id has not been assigned
        int id = node.role == Node::SERVER
                     ? Postoffice::ServerRankToID(num_servers_)
                     : Postoffice::WorkerRankToID(num_workers_);
        node.id = id;                // assign the new id to the node
        Connect(node);               // connect, i.e. senders_[id] = sender
        Postoffice::Get()->UpdateHeartbeat(node.id, t);  // update the heartbeat
        connected_nodes_[node_host_ip] = id;  // record the connected node
      } else {
        // a node with the same ip:port registered earlier
        int id = node.role == Node::SERVER
                     ? Postoffice::ServerRankToID(num_servers_)
                     : Postoffice::WorkerRankToID(num_workers_);
        shared_node_mapping_[id] = connected_nodes_[node_host_ip];
        node.id = connected_nodes_[node_host_ip];
      }
      if (node.role == Node::SERVER) num_servers_++;  // update the counters
      if (node.role == Node::WORKER) num_workers_++;
    }
    nodes->control.node.push_back(my_node_);
    nodes->control.cmd = Control::ADD_NODE;
    Message back;
    back.meta = *nodes;
    // send the ADD_NODE message (carrying all node info held by the scheduler)
    // to every worker and server
    for (int r : Postoffice::Get()->GetNodeIDs(kWorkerGroup + kServerGroup)) {
      int recver_id = r;
      if (shared_node_mapping_.find(r) == shared_node_mapping_.end()) {
        back.meta.recver = recver_id;
        back.meta.timestamp = timestamp_++;
        Send(back);
      }
    }
    ready_ = true;  // the scheduler is now ready
  } else if (!recovery_nodes->control.node.empty()) {
    // a restarted node is registering
    auto dead_nodes = Postoffice::Get()->GetDeadNodes(heartbeat_timeout_);
    std::unordered_set<int> dead_set(dead_nodes.begin(), dead_nodes.end());
    // there can only be one recovery node at a time
    CHECK_EQ(recovery_nodes->control.node.size(), 1);
    Connect(recovery_nodes->control.node[0]);  // connect to the restarted node
    Postoffice::Get()->UpdateHeartbeat(recovery_nodes->control.node[0].id, t);
    Message back;
    for (int r : Postoffice::Get()->GetNodeIDs(kWorkerGroup + kServerGroup)) {
      if (r != recovery_nodes->control.node[0].id &&
          dead_set.find(r) != dead_set.end()) {
        // do not try to send anything to dead node
        continue;
      }
      // only send recovery_node to nodes already exist
      // but send all nodes to the recovery_node
      back.meta = (r == recovery_nodes->control.node[0].id) ? *nodes
                                                            : *recovery_nodes;
      back.meta.recver = r;
      back.meta.timestamp = timestamp_++;
      Send(back);
    }
  }
}

This part of the process logic is as follows:

                                                              +
    Scheduler                                                 |      Worker
                                                              |
        +                                                     |        +
        |                                                     |        |
        |                                                     |        |
        v                                                     |        |
Postoffice::Start +---->  Van::Start                          |        |
                             +                                |        |
                             |                                |        |
                             |                                |        |
                             v                                |        |
                          Connect--do nothing                 |        |
                             +                                |        v
                             |                                |
                             |                                |  Postoffice::Start +----->  Van::Start
                             |                                |                                +
                             v                                |                                |
                         receiver_thread_ +---+               |                                |
                             +                |               |                                v
                             |                |               |                             Connect--to scheduler
                             |                |               |                                +
                             |                |               |                                |
                             |                |               |                                |
                             |                |               |                                |
                             |                |               |                                v
                             |                |               |                          receiver_thread_  +----->+
                             |                |               |                                +                  |
                             |                |               |                                |                  |
                             |                |               |                                |                  |
                             |                |               |                                v                  |
                             |                |   <---------------------------------------+   Send                |
                             |                |               |   ADD_NODE                     +                  |
                             |                v               |                                |                  |
                             |                                |                                |                  |
                             |       ProcessAddNodeCommand    |                                |                  |
                             |                +               |                                |                  |
                             |                |               |                                |                  |
                             |                | All nodes OK  |                                |                  |
                             |                |               |                                |                  |
                             v                |               |                                |                  |
                                              | set rank      |                                |                  |
                      wait until ready        |               |                                |                  |
                             +                |               |                                |                  |
                             |                +---------------------------------------------------------------->  |
                             |                |               |  ADD_NODE response(nodes info) |                  |
                             |                |               |                                |         ProcessAddNodeCommand
                             |                |               |                                v                  |
                             |                |               |                                                   |
                             | <--------------+               |                         wait until ready          |
                             |    ready_ = true               |                                +                  |
                             |                                |                                |  <---------------+
       +-------------------+ v                                |                                |
       |                                                      |         +--------------------+ v
       |                                                      |         |
       v                                                      |         |
                                                              |         v
  Postoffice::Barrier                                         |
                                                              |   Postoffice::Barrier
                                                              +

The diagram above shows the Scheduler on the left and the worker on the right.

4.3.5 Connection Sequence for a Newly Added Node

The interconnection process can be divided into three steps:

Step 1: when a worker/server node initializes, it sends a registration message to the scheduler node; assume it is node 2:

if (!is_scheduler_) {
  // let the scheduler know myself
  Message msg;
  Node customer_specific_node = my_node_;
  customer_specific_node.customer_id = customer_id;
  msg.meta.recver = kScheduler;
  msg.meta.control.cmd = Control::ADD_NODE;
  msg.meta.control.node.push_back(customer_specific_node);
  msg.meta.timestamp = timestamp_++;
  Send(msg);  // send to the scheduler to set up the connection info
}

Step 2: After receiving the message, the Scheduler node first establishes a connection with node 2 in ProcessAddNodeCommandAtScheduler. It then broadcasts this "node join message" to all worker/server nodes already connected to the scheduler, putting node 2's connection information into the meta.

// assign node rank
for (auto& node : nodes->control.node) {
  std::string node_host_ip = node.hostname + ":" + std::to_string(node.port);
  if (connected_nodes_.find(node_host_ip) == connected_nodes_.end()) {
    // a brand-new node: assign it the next server/worker id
    int id = node.role == Node::SERVER
                 ? Postoffice::ServerRankToID(num_servers_)
                 : Postoffice::WorkerRankToID(num_workers_);
    node.id = id;
    Connect(node);  // also records the sending socket in senders_
    Postoffice::Get()->UpdateHeartbeat(node.id, t);
    connected_nodes_[node_host_ip] = id;
  } else {
    // a node on an already-known host:port (e.g. another customer)
    int id = node.role == Node::SERVER
                 ? Postoffice::ServerRankToID(num_servers_)
                 : Postoffice::WorkerRankToID(num_workers_);
    shared_node_mapping_[id] = connected_nodes_[node_host_ip];
    node.id = connected_nodes_[node_host_ip];
  }
  if (node.role == Node::SERVER) num_servers_++;
  if (node.role == Node::WORKER) num_workers_++;
}
nodes->control.node.push_back(my_node_);
nodes->control.cmd = Control::ADD_NODE;
Message back;
back.meta = *nodes;
// broadcast this "node join message" to all worker/server nodes already
// connected to the scheduler, with node 2's connection info in the meta
for (int r : Postoffice::Get()->GetNodeIDs(kWorkerGroup + kServerGroup)) {
  int recver_id = r;
  if (shared_node_mapping_.find(r) == shared_node_mapping_.end()) {
    back.meta.recver = recver_id;
    back.meta.timestamp = timestamp_++;
    Send(back);
  }
}

Step 3: When an existing worker/server node receives this command, it establishes a connection with node 2 in ProcessAddNodeCommand.

for (const auto& node : ctrl.node) {
  std::string addr_str = node.hostname + ":" + std::to_string(node.port);
  if (connected_nodes_.find(addr_str) == connected_nodes_.end()) {
    Connect(node);
    connected_nodes_[addr_str] = node.id;
  }
  if (!node.is_recovery && node.role == Node::SERVER) ++num_servers_;
  if (!node.is_recovery && node.role == Node::WORKER) ++num_workers_;
}

This completes the process: after each new node joins, the already-joined nodes learn about it through the Scheduler and connect to it.
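As a side note, the IDs handed out in step 2 follow PostOffice's fixed numbering scheme: servers get even IDs starting at 8 and workers get odd IDs starting at 9. Below is a minimal, hypothetical sketch of the scheduler's bookkeeping, not the real Van code; MiniScheduler and AddNode are names invented for illustration:

```cpp
#include <cassert>
#include <string>
#include <unordered_map>

// Mirrors ps-lite's Postoffice helpers: servers get even ids starting at 8,
// workers get odd ids starting at 9 (the scheduler itself is id 1).
int ServerRankToID(int rank) { return rank * 2 + 8; }
int WorkerRankToID(int rank) { return rank * 2 + 9; }

// Hypothetical sketch of the id-assignment loop in
// ProcessAddNodeCommandAtScheduler.
struct MiniScheduler {
  std::unordered_map<std::string, int> connected_nodes_;  // "host:port" -> id
  int num_servers_ = 0, num_workers_ = 0;

  // Returns the id assigned to the joining node.
  int AddNode(const std::string& host_port, bool is_server) {
    int id;
    auto it = connected_nodes_.find(host_port);
    if (it == connected_nodes_.end()) {
      // brand-new endpoint: assign the next rank-based id
      id = is_server ? ServerRankToID(num_servers_)
                     : WorkerRankToID(num_workers_);
      connected_nodes_[host_port] = id;  // Connect(node) would happen here
    } else {
      id = it->second;  // known endpoint: reuse its id
    }
    if (is_server) ++num_servers_; else ++num_workers_;
    return id;
  }
};
```

This also explains why two customers on the same host:port share one node ID: the scheduler keys its table by endpoint, not by customer.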

4.4 Processing HEARTBEAT Messages

Let’s look at the heartbeat mechanism.

4.4.1 Heartbeat Mechanism

To record network reachability, PS Lite has designed a heartbeat mechanism. Specifically:

  • Each node’s PostOffice singleton maintains a map that stores heartbeat liveness information. The key is the node ID and the value is the timestamp at which that node’s last HEARTBEAT message was received.
  • A Worker/Server records only the Scheduler’s heartbeat, while the Scheduler records the heartbeats of all nodes. Dead nodes can be derived from these timestamps and the heartbeat timeout.
  • Each Worker/Server node starts a dedicated heartbeat thread that sends a HEARTBEAT message to the Scheduler every PS_HEARTBEAT_INTERVAL seconds.
  • Upon receipt, the Scheduler responds with a HEARTBEAT message of its own.
  • The Scheduler decides whether a node is alive based on the difference between the current time and the time its last heartbeat was received.
  • When processing ADD_NODE, the Scheduler checks whether the joining node’s ID is already in the dead-node set; if so, the node is treated as a recovering node. New nodes connect to existing nodes via the Scheduler’s relay.
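The bookkeeping described above boils down to a timestamped map. The following is a minimal sketch under simplified assumptions; the HeartbeatTable type is invented for illustration, since ps-lite keeps this state inside Postoffice:

```cpp
#include <cassert>
#include <ctime>
#include <unordered_map>
#include <vector>

// Sketch of the scheduler-side heartbeat table: node id -> timestamp of the
// last received heartbeat. A node is considered dead once its last heartbeat
// is more than `timeout` seconds in the past.
struct HeartbeatTable {
  std::unordered_map<int, time_t> heartbeats_;

  void UpdateHeartbeat(int node_id, time_t t) { heartbeats_[node_id] = t; }

  std::vector<int> GetDeadNodes(time_t now, int timeout) const {
    std::vector<int> dead;
    for (const auto& kv : heartbeats_) {
      if (kv.second + timeout < now) dead.push_back(kv.first);
    }
    return dead;
  }
};
```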

The details are as follows:

4.4.2 Data structure

std::unordered_map<int, time_t> heartbeats_ stores heartbeat liveness information. The key is the node ID and the value is the timestamp at which that node's last HEARTBEAT message was received.

UpdateHeartbeat refreshes a node's heartbeat timestamp.

  void UpdateHeartbeat(int node_id, time_t t) {
    std::lock_guard<std::mutex> lk(heartbeat_mu_);
    heartbeats_[node_id] = t;
  }
  
  std::unordered_map<int, time_t> heartbeats_;  

4.4.3 Worker/Server Sends heartbeats

Each of these two node types starts a thread that sends a HEARTBEAT message to the Scheduler every PS_HEARTBEAT_INTERVAL seconds:

if (!is_scheduler_) {
  // start heartbeat thread
  heartbeat_thread_ =
      std::unique_ptr<std::thread>(new std::thread(&Van::Heartbeat, this));
}

The specific heartbeat functions are:

void Van::Heartbeat() {
  const char* val = Environment::Get()->find("PS_HEARTBEAT_INTERVAL");
  const int interval = val ? atoi(val) : kDefaultHeartbeatInterval;
  while (interval > 0 && ready_.load()) {
    std::this_thread::sleep_for(std::chrono::seconds(interval));
    Message msg;
    msg.meta.recver = kScheduler;
    msg.meta.control.cmd = Control::HEARTBEAT;
    msg.meta.control.node.push_back(my_node_);
    msg.meta.timestamp = timestamp_++;
    Send(msg);
  }
}

4.4.4 Scheduler Node Processes heartbeat

After receiving a HEARTBEAT message, the Scheduler node updates the sender's heartbeat timestamp via UpdateHeartbeat and sends back a HEARTBEAT acknowledgement.

void Van::ProcessHearbeat(Message* msg) {
  auto& ctrl = msg->meta.control;
  time_t t = time(NULL);
  for (auto& node : ctrl.node) {
    Postoffice::Get()->UpdateHeartbeat(node.id, t);
    if (is_scheduler_) {
      Message heartbeat_ack;
      heartbeat_ack.meta.recver = node.id;
      heartbeat_ack.meta.control.cmd = Control::HEARTBEAT;
      heartbeat_ack.meta.control.node.push_back(my_node_);
      heartbeat_ack.meta.timestamp = timestamp_++;
      // send back heartbeat
      Send(heartbeat_ack);
    }
  }
}

4.4.5 A Node dies

When processing ADD_NODE messages, the Scheduler checks for dead nodes. Whether a node is alive is determined by the difference between the current timestamp and the timestamp of its last received heartbeat.

std::vector<int> Postoffice::GetDeadNodes(int t) {
  std::vector<int> dead_nodes;
  if (!van_->IsReady() || t == 0) return dead_nodes;

  time_t curr_time = time(NULL);
  const auto& nodes = is_scheduler_
                          ? GetNodeIDs(kWorkerGroup + kServerGroup)
                          : GetNodeIDs(kScheduler);
  {
    std::lock_guard<std::mutex> lk(heartbeat_mu_);
    for (int r : nodes) {
      auto it = heartbeats_.find(r);
      if ((it == heartbeats_.end() || it->second + t < curr_time) &&
          start_time_ + t < curr_time) {
        dead_nodes.push_back(r);
      }
    }
  }
  return dead_nodes;
}

The logic is as follows:

+--------------------------------------------------------+
|                       Scheduler                        |
|                                                        |
|                      heartbeats_                       |
|                                                        |
|       receiver_thread_ +-------> ProcessHearbeat       |
+--------------------------------------------------------+
             ^      +                   ^      +
             |      |                   |      |
    HEARTBEAT|      |RESPONSE  HEARTBEAT|      |RESPONSE
             |      v                   |      v
+--------------------------+  +--------------------------+
|          Worker          |  |          Server          |
|                          |  |                          |
|       heartbeats_        |  |       heartbeats_        |
|                          |  |                          |
| heartbeat_thread_        |  | heartbeat_thread_        |
|    +---> Heartbeat       |  |    +---> Heartbeat       |
|                          |  |                          |
| receiver_thread_         |  | receiver_thread_         |
|    +---> ProcessHearbeat |  |    +---> ProcessHearbeat |
+--------------------------+  +--------------------------+

4.5 Processing TERMINATE messages

ProcessTerminateCommand handles the termination message by setting ready_ to false.

After that, the Van is no longer in a working state and stops processing messages.

void Van::ProcessTerminateCommand() {
  PS_VLOG(1) << my_node().ShortDebugString() << " is stopped";
  ready_ = false;
}
​
inline bool IsReady() { return ready_; }

4.6 Processing ACK Messages

4.6.1 Ack mechanism

In a distributed system, communication is unreliable: packet loss and delays are scenarios that must be handled. PS-Lite designed the Resender class to improve communication reliability by introducing an ACK mechanism. That is:

  • Each node must respond with an ACK to every non-ACK/TERMINATE message it receives.
  • Each node must cache locally every non-ACK/TERMINATE message it sends. The cache is a map whose unique keys are derived from the message content.
  • When a node receives an ACK message, it removes the corresponding message from its local cache based on the key carried back.
  • Each node runs a monitoring thread that checks the local cache every PS_RESEND_TIMEOUT milliseconds. Using each message's send timestamp and the current time, it finds the timed-out messages, resends them, and accumulates their retry counts.
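As a sketch of the bookkeeping these bullets describe — MiniResender is a simplified stand-in invented here, not the real Resender, which derives 64-bit keys from message content:

```cpp
#include <cassert>
#include <cstdint>
#include <unordered_map>
#include <unordered_set>

// Simplified sketch of Resender's two caches: sent messages wait in
// send_buff_ until an ACK with a matching key removes them; acked_ remembers
// keys of delivered messages so retransmitted duplicates can be detected
// (and re-acknowledged, since the first ACK may have been lost).
struct MiniResender {
  std::unordered_map<uint64_t, int> send_buff_;  // key -> num_retry
  std::unordered_set<uint64_t> acked_;

  // Sender side: cache an outgoing non-ACK message.
  void AddOutgoing(uint64_t key) { send_buff_.emplace(key, 0); }

  // Sender side: an ACK arrived, drop the cached message.
  void OnAck(uint64_t key) { send_buff_.erase(key); }

  // Receiver side: returns true if this message was seen before.
  // Either way, an ACK would be sent back to the sender.
  bool AddIncoming(uint64_t key) {
    bool duplicated = acked_.count(key) > 0;
    if (!duplicated) acked_.insert(key);
    return duplicated;
  }
};
```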

4.6.2 Resender class

The definition is as follows, where send_buff_ is the send cache used to store the list of sent messages, and acked_ is the set of acknowledged messages.

class Resender {
  std::thread* monitor_;
  std::unordered_set<uint64_t> acked_;
  std::atomic<bool> exit_{false};
  std::mutex mu_;
  int timeout_;
  int max_num_retry_;
  Van* van_;
  using Time = std::chrono::milliseconds;
  // the buffer entry
  struct Entry {
    Message msg;
    Time send;
    int num_retry = 0;
  };
  std::unordered_map<uint64_t, Entry> send_buff_;  
};
​

4.6.3 Monitoring threads

The monitoring thread and its function are shown below: whenever it wakes up, it compares each message's send timestamp in send_buff_ (the local cache) against the current time, resends the timed-out messages, and increments their retry counts:

  monitor_ = new std::thread(&Resender::Monitoring, this);
​
  void Monitoring() {
    while (!exit_) {
      std::this_thread::sleep_for(Time(timeout_));
      std::vector<Message> resend;
      Time now = Now();
      mu_.lock();
      for (auto& it : send_buff_) {
        if (it.second.send + Time(timeout_) * (1+it.second.num_retry) < now) {
          resend.push_back(it.second.msg);
          ++it.second.num_retry;
          CHECK_LT(it.second.num_retry, max_num_retry_);
        }
      }
      mu_.unlock();
​
      for (const auto& msg : resend) van_->Send(msg);
    }
  }
​
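The resend condition above, send + timeout * (1 + num_retry) < now, is a linear backoff: each additional retry pushes the next deadline one timeout period further out. A small arithmetic sketch (ShouldResend is a hypothetical helper for illustration, not ps-lite code):

```cpp
#include <cassert>

// Sketch of Monitoring's resend test: with linear backoff, the next resend
// becomes due timeout_ms * (1 + num_retry) milliseconds after the original
// send time, so retry 0 waits one period, retry 1 waits two, and so on.
bool ShouldResend(long send_ms, int num_retry, long timeout_ms, long now_ms) {
  return send_ms + timeout_ms * (1 + num_retry) < now_ms;
}
```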

4.6.4 Cache during sending

When Van sends a message, if retransmission is configured, the AddOutgoing function is called to add the message to the send cache.

int Van::Send(const Message& msg) {
  int send_bytes = SendMsg(msg);
  CHECK_NE(send_bytes, -1);
  send_bytes_ += send_bytes;
  if (resender_) resender_->AddOutgoing(msg);
  if (Postoffice::Get()->verbose() >= 2) {
    PS_VLOG(2) << msg.DebugString();
  }
  return send_bytes;
}
​
Copy the code

The following function adds a message to the send cache.

/**
 * \brief add an outgoing message to the sending cache
 */
void AddOutgoing(const Message& msg) {
  if (msg.meta.control.cmd == Control::ACK) return;
  CHECK_NE(msg.meta.timestamp, Meta::kEmpty) << msg.DebugString();
  auto key = GetKey(msg);
  std::lock_guard<std::mutex> lk(mu_);
  // already buffered, which is often due to a Send by the monitor thread
  if (send_buff_.find(key) != send_buff_.end()) return;

  auto& ent = send_buff_[key];
  ent.msg = msg;
  ent.send = Now();
  ent.num_retry = 0;
}

4.6.5 Clearing the Cache

AddIncomming does two things:

  • It checks whether the incoming message is a duplicate, and sends back an acknowledgement either way.
  • If the incoming message is itself an acknowledgement, the corresponding entry is cleared from the send cache.
/**
 * \brief add an incomming message
 * \return true if msg has been added before, or it is an ACK message
 */
bool AddIncomming(const Message& msg) {
  // a message can be received multiple times
  if (msg.meta.control.cmd == Control::TERMINATE) {
    return false;
  } else if (msg.meta.control.cmd == Control::ACK) {
    mu_.lock();
    auto key = msg.meta.control.msg_sig;
    auto it = send_buff_.find(key);
    if (it != send_buff_.end()) send_buff_.erase(it);
    mu_.unlock();
    return true;
  } else {
    mu_.lock();
    auto key = GetKey(msg);
    auto it = acked_.find(key);
    bool duplicated = it != acked_.end();
    if (!duplicated) acked_.insert(key);
    mu_.unlock();
    // send back an ack message (even if it is duplicated)
    Message ack;
    ack.meta.recver = msg.meta.sender;
    ack.meta.sender = msg.meta.recver;
    ack.meta.control.cmd = Control::ACK;
    ack.meta.control.msg_sig = key;
    van_->Send(ack);
    // warning
    if (duplicated) LOG(WARNING) << "Duplicated message: " << msg.DebugString();
    return duplicated;
  }
}

4.7 Processing data messages

ProcessDataMsg handles the data messages sent by a worker (that is, a worker pushing gradients to a server). Specifically, after obtaining the corresponding Customer, it calls the Customer's method and puts the message directly into its processing queue.

We will introduce it in Customer.

void Van::ProcessDataMsg(Message* msg) {
  // data msg
  int app_id = msg->meta.app_id;
  int customer_id =
      Postoffice::Get()->is_worker() ? msg->meta.customer_id : app_id;
  auto* obj = Postoffice::Get()->GetCustomer(app_id, customer_id, 5);
  obj->Accept(*msg);  // add the message to the Customer
}

0x05 ZMQVan

ZMQVan is the ZeroMQ-based implementation of Van; it uses the ZMQ library to handle the low-level connection details (ZMQ is an open-source library that wraps sockets well, making socket programming simpler, cleaner, and more performant).

5.1 Definition

ZMQVan is defined as follows:

ZMQVan inherits from Van and adds two member variables to the class:

  • std::unordered_map<int, void*> senders_ : the collection of sending sockets, i.e. a mapping from node ID to socket. For example, if node 8 wants to send a message to node 9, it just looks up the entry (9, socket_9) and calls socket_9.send(message);

  • void *receiver_ = nullptr : the receiving socket created by Bind.

The details are as follows:

class ZMQVan : public Van {
  void *context_ = nullptr;
  /**
   * \brief node_id to the socket for sending data to this node
   */
  std::unordered_map<int, void*> senders_;
  std::mutex mu_;
  void *receiver_ = nullptr;
};

5.2 Van function

The Van class has the following functions that call into, or are called by, ZMQVan.

5.2.1 Sending messages

Send calls ZMQVan's SendMsg to send the message. After sending, if the ACK mechanism is enabled, resender_->AddOutgoing is called.

int Van::Send(const Message& msg) {
  int send_bytes = SendMsg(msg);
  CHECK_NE(send_bytes, -1);
  send_bytes_ += send_bytes;
  if (resender_) resender_->AddOutgoing(msg);
  if (Postoffice::Get()->verbose() >= 2) {
    PS_VLOG(2) << msg.DebugString();
  }
  return send_bytes;
}

5.2.2 Meta class

Meta encapsulates the metadata: sender, receiver, timestamp, whether it is a request or a response, etc.

/**
 * \brief meta info of a message
 */
struct Meta {
  /** \brief the empty value */
  static const int kEmpty;
  /** \brief an int head */
  int head;
  /** \brief the unique id of the application this message is for */
  int app_id;
  /** \brief customer id*/
  int customer_id;
  /** \brief the timestamp of this message */
  int timestamp;
  /** \brief the node id of the sender of this message */
  int sender;
  /** \brief the node id of the receiver of this message */
  int recver;
  /** \brief whether or not this is a request message*/
  bool request;
  /** \brief whether or not a push message */
  bool push;
  /** \brief whether or not a pull message */
  bool pull;
  /** \brief whether or not it's for SimpleApp */
  bool simple_app;
  /** \brief an string body */
  std::string body;
  /** \brief data type of message.data[i] */
  std::vector<DataType> data_type;
  /** \brief system control message */
  Control control;
  /** \brief the byte size */
  int data_size = 0;
  /** \brief message priority */
  int priority = 0;
};

To ease communication pressure, PS-Lite uses Protobuf to compress Meta data.

5.2.3 PackMeta: compressing Meta

PackMeta serializes the Meta into the Protobuf-generated PBMeta format.

void Van::PackMeta(const Meta& meta, char** meta_buf, int* buf_size) {
  // convert into protobuf
  PBMeta pb;
  pb.set_head(meta.head);
  if (meta.app_id != Meta::kEmpty) pb.set_app_id(meta.app_id);
  if (meta.timestamp != Meta::kEmpty) pb.set_timestamp(meta.timestamp);
  if (meta.body.size()) pb.set_body(meta.body);
  pb.set_push(meta.push);
  pb.set_pull(meta.pull);
  pb.set_request(meta.request);
  pb.set_simple_app(meta.simple_app);
  pb.set_priority(meta.priority);
  pb.set_customer_id(meta.customer_id);
  for (auto d : meta.data_type) pb.add_data_type(d);
  if (!meta.control.empty()) {
    auto ctrl = pb.mutable_control();
    ctrl->set_cmd(meta.control.cmd);
    if (meta.control.cmd == Control::BARRIER) {
      ctrl->set_barrier_group(meta.control.barrier_group);
    } else if (meta.control.cmd == Control::ACK) {
      ctrl->set_msg_sig(meta.control.msg_sig);
    }
    for (const auto& n : meta.control.node) {
      auto p = ctrl->add_node();
      p->set_id(n.id);
      p->set_role(n.role);
      p->set_port(n.port);
      p->set_hostname(n.hostname);
      p->set_is_recovery(n.is_recovery);
      p->set_customer_id(n.customer_id);
    }
  }

  // to string
  *buf_size = pb.ByteSize();
  *meta_buf = new char[*buf_size + 1];
  CHECK(pb.SerializeToArray(*meta_buf, *buf_size))
      << "failed to serialize protobuf";
}

5.2.4 UnpackMeta: decompressing Meta

UnpackMeta parses the buffer back into a Meta via the Protobuf-generated PBMeta format.

void Van::UnpackMeta(const char* meta_buf, int buf_size, Meta* meta) {
  // to protobuf
  PBMeta pb;
  CHECK(pb.ParseFromArray(meta_buf, buf_size))
      << "failed to parse string into protobuf";

  // to meta
  meta->head = pb.head();
  meta->app_id = pb.has_app_id() ? pb.app_id() : Meta::kEmpty;
  meta->timestamp = pb.has_timestamp() ? pb.timestamp() : Meta::kEmpty;
  meta->request = pb.request();
  meta->push = pb.push();
  meta->pull = pb.pull();
  meta->simple_app = pb.simple_app();
  meta->priority = pb.priority();
  meta->body = pb.body();
  meta->customer_id = pb.customer_id();
  meta->data_type.resize(pb.data_type_size());
  for (int i = 0; i < pb.data_type_size(); ++i) {
    meta->data_type[i] = static_cast<DataType>(pb.data_type(i));
  }
  if (pb.has_control()) {
    const auto& ctrl = pb.control();
    meta->control.cmd = static_cast<Control::Command>(ctrl.cmd());
    meta->control.barrier_group = ctrl.barrier_group();
    meta->control.msg_sig = ctrl.msg_sig();
    for (int i = 0; i < ctrl.node_size(); ++i) {
      const auto& p = ctrl.node(i);
      Node n;
      n.role = static_cast<Node::Role>(p.role());
      n.port = p.port();
      n.hostname = p.hostname();
      n.id = p.has_id() ? p.id() : Node::kEmpty;
      n.is_recovery = p.is_recovery();
      n.customer_id = p.customer_id();
      meta->control.node.push_back(n);
    }
  } else {
    meta->control.cmd = Control::EMPTY;
  }
}

5.2.5 PackMetaPB

PackMetaPB serializes a Meta directly into a PBMeta. It is used by ibverbs_van.h, so we won't go into it here.

void Van::PackMetaPB(const Meta& meta, PBMeta* pb) {
  pb->set_head(meta.head);
  if (meta.app_id != Meta::kEmpty) pb->set_app_id(meta.app_id);
  if (meta.timestamp != Meta::kEmpty) pb->set_timestamp(meta.timestamp);
  if (meta.body.size()) pb->set_body(meta.body);
  pb->set_push(meta.push);
  pb->set_request(meta.request);
  pb->set_simple_app(meta.simple_app);
  pb->set_priority(meta.priority);
  pb->set_customer_id(meta.customer_id);
  for (auto d : meta.data_type) pb->add_data_type(d);
  if (!meta.control.empty()) {
    auto ctrl = pb->mutable_control();
    ctrl->set_cmd(meta.control.cmd);
    if (meta.control.cmd == Control::BARRIER) {
      ctrl->set_barrier_group(meta.control.barrier_group);
    } else if (meta.control.cmd == Control::ACK) {
      ctrl->set_msg_sig(meta.control.msg_sig);
    }
    for (const auto& n : meta.control.node) {
      auto p = ctrl->add_node();
      p->set_id(n.id);
      p->set_role(n.role);
      p->set_port(n.port);
      p->set_hostname(n.hostname);
      p->set_is_recovery(n.is_recovery);
      p->set_customer_id(n.customer_id);
    }
  }
  pb->set_data_size(meta.data_size);
}

5.3 ZMQVan derived functions

ZMQVan has the following important derived functions.

5.3.1 Bind

The Bind logic is as follows:

  • Bind uses zmq_bind() to bind a socket to a local network endpoint and starts receiving messages sent to that endpoint.
  • The endpoint address is a string consisting of a protocol:// followed by an address.
  • Bind uses the "DMLC_LOCAL" environment variable to decide between IPC and TCP mode when building the endpoint address.
  • The scheduler node binds to its configured port, while worker and server nodes need to find a locally available port.
  • A maximum number of retries is applied while searching for a port.
  int Bind(const Node& node, int max_retry) override {
    receiver_ = zmq_socket(context_, ZMQ_ROUTER);
    int local = GetEnv("DMLC_LOCAL", 0);
    std::string hostname = node.hostname.empty() ? "*" : node.hostname;
    int use_kubernetes = GetEnv("DMLC_USE_KUBERNETES", 0);
    if (use_kubernetes > 0 && node.role == Node::SCHEDULER) {
      hostname = "0.0.0.0";
    }
    std::string addr = local ? "ipc:///tmp/" : "tcp://" + hostname + ":";
    int port = node.port;
    unsigned seed = static_cast<unsigned>(time(NULL) + port);
    for (int i = 0; i < max_retry + 1; ++i) {
      auto address = addr + std::to_string(port);
      if (zmq_bind(receiver_, address.c_str()) == 0) break;
      if (i == max_retry) {
        port = -1;
      } else {
        port = 10000 + rand_r(&seed) % 40000;
      }
    }
    return port;
  }

5.3.2 Connect

Connect initializes senders_. The logic is as follows:

  • If a socket for the target node ID already exists, close it first.
  • If the target has the same role as this node (worker to worker, or server to server) and is not this node itself (the Scheduler may connect to itself), return.
  • Create a ZMQ socket and keep the newly created socket in sender as an opaque pointer.
  • If this node already has an ID, set it as the ZMQ identity on the socket so the receiver can recover the sender's node ID.
  • Connect the sender socket to the destination address.
  • Store the socket under the target ID, that is, add it to senders_.

The details are as follows:

void Connect(const Node& node) override {
  int id = node.id;
  auto it = senders_.find(id);
  if (it != senders_.end()) {
    zmq_close(it->second);
  }
  // worker doesn't need to connect to the other workers. same for server
  if ((node.role == my_node_.role) && (node.id != my_node_.id)) {
    return;
  }
  void* sender = zmq_socket(context_, ZMQ_DEALER);
  if (my_node_.id != Node::kEmpty) {
    std::string my_id = "ps" + std::to_string(my_node_.id);
    zmq_setsockopt(sender, ZMQ_IDENTITY, my_id.data(), my_id.size());
    const char* watermark = Environment::Get()->find("DMLC_PS_WATER_MARK");
    if (watermark) {
      const int hwm = atoi(watermark);
      zmq_setsockopt(sender, ZMQ_SNDHWM, &hwm, sizeof(hwm));
    }
  }
  // connect
  std::string addr = "tcp://" + node.hostname + ":" + std::to_string(node.port);
  if (GetEnv("DMLC_LOCAL", 0)) {
    addr = "ipc:///tmp/" + std::to_string(node.port);
  }
  if (zmq_connect(sender, addr.c_str()) != 0) {
    LOG(FATAL) << "connect to " + addr + " failed: " + zmq_strerror(errno);
  }
  senders_[id] = sender;  // store the socket for the target id for later use
}

5.3.3 SendMsg

The logic is as follows:

  • Find the socket reserved earlier in senders_;
  • Pack (compress) the meta with PackMeta;
  • Send the meta;
  • Send the data blocks one by one in a loop;
  int SendMsg(const Message& msg) override {
    std::lock_guard<std::mutex> lk(mu_);
    // find the socket
    int id = msg.meta.recver;
    CHECK_NE(id, Meta::kEmpty);
    auto it = senders_.find(id);
    if (it == senders_.end()) {
      LOG(WARNING) << "there is no socket to node " << id;
      return -1;
    }
    void *socket = it->second;
​
    // send meta
    int meta_size; char* meta_buf;
    PackMeta(msg.meta, &meta_buf, &meta_size);
    int tag = ZMQ_SNDMORE;
    int n = msg.data.size();
    if (n == 0) tag = 0;
    zmq_msg_t meta_msg;
    zmq_msg_init_data(&meta_msg, meta_buf, meta_size, FreeData, NULL);
    while (true) {
      if (zmq_msg_send(&meta_msg, socket, tag) == meta_size) break;
      if (errno == EINTR) continue;
      return -1;
    }
    // zmq_msg_close(&meta_msg);
    int send_bytes = meta_size;
    // send data
    for (int i = 0; i < n; ++i) {
      zmq_msg_t data_msg;
      SArray<char>* data = new SArray<char>(msg.data[i]);
      int data_size = data->size();
      zmq_msg_init_data(&data_msg, data->data(), data->size(), FreeData, data);
      if (i == n - 1) tag = 0;
      while (true) {
        if (zmq_msg_send(&data_msg, socket, tag) == data_size) break;
        if (errno == EINTR) continue;
        return -1;
      }
      // zmq_msg_close(&data_msg);
      send_bytes += data_size;
    }
    return send_bytes;
  }
​

5.3.4 RecvMsg

RecvMsg simply receives messages on the bound port.

Each received ZMQ frame is handled according to its position: the first frame identifies the sender, the second carries the meta, and the remaining frames carry the data.

int RecvMsg(Message* msg) override {
  msg->data.clear();
  size_t recv_bytes = 0;
  for (int i = 0; ; ++i) {
    zmq_msg_t* zmsg = new zmq_msg_t;
    CHECK(zmq_msg_init(zmsg) == 0) << zmq_strerror(errno);
    while (true) {
      if (zmq_msg_recv(zmsg, receiver_, 0) != -1) break;
      if (errno == EINTR) {
        std::cout << "interrupted";
        continue;
      }
      return -1;
    }
    char* buf = CHECK_NOTNULL((char*)zmq_msg_data(zmsg));
    size_t size = zmq_msg_size(zmsg);
    recv_bytes += size;

    if (i == 0) {
      // identify
      msg->meta.sender = GetNodeID(buf, size);
      msg->meta.recver = my_node_.id;
      CHECK(zmq_msg_more(zmsg));
      zmq_msg_close(zmsg);
      delete zmsg;
    } else if (i == 1) {
      // task
      UnpackMeta(buf, size, &(msg->meta));
      zmq_msg_close(zmsg);
      bool more = zmq_msg_more(zmsg);
      delete zmsg;
      if (!more) break;
    } else {
      // zero-copy
      SArray<char> data;
      data.reset(buf, size, [zmsg, size](char* buf) {
        zmq_msg_close(zmsg);
        delete zmsg;
      });
      msg->data.push_back(data);
      if (!zmq_msg_more(zmsg)) { break; }
    }
  }
  return recv_bytes;
}

The GetNodeID function is:

  /**
   * return the node id given the received identity
   * \return -1 if not find
   */
  int GetNodeID(const char* buf, size_t size) {
    if (size > 2 && buf[0] == 'p' && buf[1] == 's') {
      int id = 0;
      size_t i = 2;
      for (; i < size; ++i) {
        if (buf[i] >= '0' && buf[i] <= '9') {
          id = id * 10 + buf[i] - '0';
        } else {
          break;
        }
      }
      if (i == size) return id;
    }
    return Meta::kEmpty;
  }
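GetNodeID is the inverse of the ZMQ_IDENTITY convention set in Connect ("ps" + node ID). A standalone restatement for illustration — ParseNodeID is a hypothetical name, and -1 stands in for Meta::kEmpty:

```cpp
#include <cassert>
#include <cstddef>
#include <string>

// Restatement of GetNodeID: parse a ZMQ identity of the form "ps<digits>"
// back into a node id; return -1 (standing in for Meta::kEmpty) otherwise.
int ParseNodeID(const char* buf, size_t size) {
  if (size > 2 && buf[0] == 'p' && buf[1] == 's') {
    int id = 0;
    size_t i = 2;
    for (; i < size; ++i) {
      if (buf[i] >= '0' && buf[i] <= '9') {
        id = id * 10 + buf[i] - '0';
      } else {
        break;  // a non-digit means this is not a valid "ps<id>" identity
      }
    }
    if (i == size) return id;
  }
  return -1;
}
```

This is why RecvMsg can fill msg->meta.sender from the first frame alone: the identity frame ZMQ prepends on the ROUTER socket already encodes the sender's node ID.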

0x06 Summary

Let’s make a final summary:

With the address book in the post office, a Van is needed to pull and deliver items. Van is the communication module of the whole Parameter Server, and its characteristics are as follows.

  • When the PostOffice class is instantiated, an instance of the Van class is created as a member variable. The Van instance has the same lifetime as the owning PostOffice instance (there is only one object per node);

  • The Van is responsible for the specific inter-node communication. Specifically, it’s responsible for establishing connections between nodes (such as Worker and Scheduler) and opening local receiving threads to listen for incoming messages.

  • The Van initialization function applies different settings depending on the local node type: it starts the listening port, establishes the connection to the scheduler, and starts the receiving thread and heartbeat thread, so that communication can proceed.

  • The Parameter Server receives/processes messages on the background thread receiver_thread_. Besides the data messages that carry parameters, the control messages between nodes are:

    • ADD_NODE: Worker and server register with the Scheduler;
    • BARRIER: Synchronous blocking messages between nodes;
    • HEARTBEAT: HEARTBEAT signals between nodes.
    • TERMINATE: Node exit signal;
    • ACK: A confirmation message. The ACK type appears only if Resender is enabled.
    • EMPTY: a data message (i.e. a push or pull);

0xEE Personal information

Thoughts on life and technology

Wechat public account: Rosie’s Thinking

For timely updates on future articles, or to browse more technical recommendations, please follow the account.

0xFF Reference

Introduction to distributed machine learning — implementation principle of logistic regression based on parameter server

[Distributed] Distributed computing instance resolution based on PS-Lite

Ps-lite source code analysis

Official brief instructions for use

Ps-lite source code analysis -KangRoger

Ps-lite source code analysis – Zybuluo

Ps-lite Code Notes – WillZhang