0x00 Abstract

This is the fourth article in the parameter server series. It introduces KVWorker and KVServer.

KVWorker and KVServer are the abstractions of the Worker and Server nodes, respectively. As part of the engine, they are driven through the chain Van -> Customer -> recv_handle_.

This article first introduces some basic support classes, then the SimpleApp base class shared by Server and Worker, and finally the concrete Server and Worker implementations.

As a preview, the overall flow chart is as follows:

0x01 Base classes

We first need to introduce some base classes.

1.1 Range

The Range class describes a contiguous range of keys. It is used to determine which server a given range of parameters should be pulled from, i.e. the range of keys that each server is responsible for.

The Range class provides the following functions:

  • begin() and end(): the two uint64 positions that bound the range;
  • size(): the size of the range, i.e. end_ - begin_.

class Range {
 public:
  Range() : Range(0, 0) {}
  Range(uint64_t begin, uint64_t end) : begin_(begin), end_(end) { }
​
  uint64_t begin() const { return begin_; }
  uint64_t end() const { return end_; }
  uint64_t size() const { return end_ - begin_; }
 private:
  uint64_t begin_;
  uint64_t end_;
};
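
To make the role of Range concrete, here is a minimal sketch of how a key space could be split into one Range per server and how the owner of a key could be looked up. SplitKeySpace and FindOwner are hypothetical helpers written for this illustration, not ps-lite functions, and the even split is an assumption about how key ranges might be assigned.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Minimal copy of the Range class shown above.
class Range {
 public:
  Range() : Range(0, 0) {}
  Range(uint64_t begin, uint64_t end) : begin_(begin), end_(end) {}
  uint64_t begin() const { return begin_; }
  uint64_t end() const { return end_; }
  uint64_t size() const { return end_ - begin_; }
 private:
  uint64_t begin_;
  uint64_t end_;
};

// Hypothetical helper: split [0, max_key) into num_servers contiguous,
// equally sized ranges (any remainder at the top of the key space is unused).
std::vector<Range> SplitKeySpace(uint64_t max_key, int num_servers) {
  assert(num_servers > 0);
  std::vector<Range> ranges;
  const uint64_t step = max_key / num_servers;
  for (int i = 0; i < num_servers; ++i) {
    ranges.emplace_back(step * i, step * (i + 1));
  }
  return ranges;
}

// Hypothetical helper: index of the range that owns key, or -1 if none does.
int FindOwner(const std::vector<Range>& ranges, uint64_t key) {
  for (std::size_t i = 0; i < ranges.size(); ++i) {
    if (key >= ranges[i].begin() && key < ranges[i].end()) {
      return static_cast<int>(i);
    }
  }
  return -1;
}
```

With SplitKeySpace(100, 4), key 30 falls into the second range [25, 50), so FindOwner returns index 1.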

1.2 ThreadsafePQueue

ThreadsafePQueue is a priority queue that multiple threads can access concurrently. Thread safety is achieved with a mutex and a condition variable working together. It is used as the message queue.

/**
 * \brief thread-safe queue allowing push and waited pop
 */
class ThreadsafePQueue {
 public:
  ThreadsafePQueue() { }
  ~ThreadsafePQueue() { }

  /**
   * \brief push an value into the end. threadsafe.
   * \param new_value the value
   */
  void Push(Message new_value) {
    mu_.lock();
    queue_.push(std::move(new_value));
    mu_.unlock();
    cond_.notify_all();
  }

  /**
   * \brief wait until pop an element from the beginning, threadsafe
   * \param value the poped value
   */
  void WaitAndPop(Message* value) {
    // wait until the queue is not empty, then pop a message
    std::unique_lock<std::mutex> lk(mu_);
    cond_.wait(lk, [this]{return !queue_.empty();});
    *value = std::move(queue_.top());
    queue_.pop();
  }

 private:
  class Compare {
   public:
    bool operator()(const Message &l, const Message &r) {
      return l.meta.priority <= r.meta.priority;
    }
  };
  mutable std::mutex mu_;
  std::priority_queue<Message, std::vector<Message>, Compare> queue_;
  std::condition_variable cond_;  // signals that the queue is not empty
};
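
The same pattern can be tried outside ps-lite. The following is a minimal sketch, assuming a hypothetical Item type in place of Message; BlockingPQueue is an illustrative name, not a ps-lite class. Unlike the listing above, the comparator here uses a strict `<`, which is what std::priority_queue expects from its Compare type.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for Message: only a priority and a payload.
struct Item {
  int priority;
  std::string payload;
};

class BlockingPQueue {
 public:
  // Insert an item and wake up any waiting consumer.
  void Push(Item item) {
    {
      std::lock_guard<std::mutex> lk(mu_);
      queue_.push(std::move(item));
    }
    cond_.notify_all();
  }

  // Block until the queue is non-empty, then return the highest-priority item.
  Item WaitAndPop() {
    std::unique_lock<std::mutex> lk(mu_);
    cond_.wait(lk, [this] { return !queue_.empty(); });
    Item item = queue_.top();
    queue_.pop();
    return item;
  }

 private:
  struct Compare {
    // Strict ordering: larger priority values are popped first.
    bool operator()(const Item& l, const Item& r) const {
      return l.priority < r.priority;
    }
  };
  std::mutex mu_;
  std::condition_variable cond_;
  std::priority_queue<Item, std::vector<Item>, Compare> queue_;
};
```

Pushing items with priorities 1, 5 and 3 and then popping three times yields them in the order 5, 3, 1, regardless of insertion order.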

0x02 SimpleApp

2.1 Overview

SimpleApp is a base class that provides a unified abstraction of the functionality of an application node.

  • It provides basic sending functions and simple message processing functions (Request, Wait, Response).
  • A message consists of an int head and a string body.
  • It has two derived classes: KVServer and KVWorker.

2.2 Definition

2.2.1 Support class

SimpleData defines the basic format for Request and Response.

struct SimpleData {
  /** \brief the int head */
  int head;
  /** \brief the string body */
  std::string body;
  /** \brief sender's node id */
  int sender;
  /** \brief the associated timestamp */
  int timestamp;
  /** \brief sender's customer id */
  int customer_id;
};

2.2.2 Member variables

SimpleApp has the following member variables:

  • Customer* obj_: this app's Customer, which tracks the requests and their responses;
  • Handle request_handle_: the request handler;
  • Handle response_handle_: the response handler;
  • set_request_handle, set_response_handle: set the members request_handle_ and response_handle_. When SimpleApp::Process is called, it checks the request flag in message.meta to decide whether the message is a request or a response, and calls the corresponding handle.

class SimpleApp {
 public:
  /**
   * \brief constructor
   * @param app_id the app id, should match with the remote node app with which this app
   * @param customer_id the customer_id, should be node-locally unique
   * is communicated
   */
  explicit SimpleApp(int app_id, int customer_id);
​
  /** \brief deconstructor */
  virtual ~SimpleApp() { delete obj_; obj_ = nullptr; }
​
  /**
   * \brief send a request to a remote node
   *
   * \param req_head request head
   * \param req_body request body
   * \param recv_id remote node id
   *
   * @return the timestamp of this request
   */
  virtual inline int Request(int req_head, const std::string& req_body, int recv_id);
​
  /**
   * \brief wait until a request is finished
   *
   * \param timestamp
   */
  virtual inline void Wait(int timestamp) { obj_->WaitRequest(timestamp); }
​
​
  /**
   * \brief send back a response for a request
   * \param recv_req the received request
   * \param the response body
   */
  virtual inline void Response(const SimpleData& recv_req, const std::string& res_body = "");
​
  /**
   * \brief the handle to proces a received request/respoonse
   *
   * \param recved the received request or response
   * \param app this pointer
   */
  using Handle = std::function<void(const SimpleData& recved, SimpleApp* app)>;
​
  /**
   * \brief set the request handle
   * \param request_handle the request handle
   */
  virtual inline void set_request_handle(const Handle& request_handle) {
    CHECK(request_handle) << "invalid request handle";
    request_handle_ = request_handle;
  }
​
  /**
   * \brief set the response handle
   * \param response_handle the response handle
   */
  virtual inline void set_response_handle(const Handle& response_handle) {
    CHECK(response_handle) << "invalid response handle";
    response_handle_ = response_handle;
  }
​
  /**
   * \brief returns the customer
   */
  virtual inline Customer* get_customer() { return obj_; }
​
 protected:
  /** \brief empty construct */
  inline SimpleApp() : obj_(nullptr) {
    request_handle_ = [](const SimpleData& recved, SimpleApp* app) {
      app->Response(recved);
    };
    response_handle_ = [](const SimpleData& recved, SimpleApp* app) { };
  }
​
  /** \brief process a received message */
  virtual inline void Process(const Message& msg);
​
  /** \brief ps internal object */
  Customer* obj_;
​
 private:
  /** \brief request handle */
  Handle request_handle_;
  /** \brief response handle */
  Handle response_handle_;
};

2.3 Functions

The three simple functions are as follows:

Request calls Van to send a message.

inline int SimpleApp::Request(int req_head, const std::string& req_body, int recv_id) {
  // setup message
  Message msg;
  msg.meta.head = req_head;
  if (req_body.size()) msg.meta.body = req_body;
  int ts = obj_->NewRequest(recv_id);
  msg.meta.timestamp = ts;
  msg.meta.request = true;
  msg.meta.simple_app = true;
  msg.meta.app_id = obj_->app_id();
  msg.meta.customer_id = obj_->customer_id();
​
  // send
  for (int r : Postoffice::Get()->GetNodeIDs(recv_id)) {
    msg.meta.recver = r;
    Postoffice::Get()->van()->Send(msg);
  }
  return ts;
}

Response calls Van to send the reply to a request.

inline void SimpleApp::Response(const SimpleData& req, const std::string& res_body) {
  // setup message
  Message msg;
  msg.meta.head = req.head;
  if (res_body.size()) msg.meta.body = res_body;
  msg.meta.timestamp = req.timestamp;
  msg.meta.request = false;
  msg.meta.simple_app = true;
  msg.meta.app_id = obj_->app_id();
  msg.meta.customer_id = req.customer_id;
  msg.meta.recver = req.sender;
​
  // send
  Postoffice::Get()->van()->Send(msg);
}

The Process function checks the request flag in message.meta to decide whether the message is a request or a response, and calls the corresponding handle.

inline void SimpleApp::Process(const Message& msg) {
  SimpleData recv;
  recv.sender      = msg.meta.sender;
  recv.head        = msg.meta.head;
  recv.body        = msg.meta.body;
  recv.timestamp   = msg.meta.timestamp;
  recv.customer_id = msg.meta.customer_id;
  if (msg.meta.request) {
    // request or response? call the corresponding handle
    CHECK(request_handle_);
    request_handle_(recv, this);
  } else {
    CHECK(response_handle_);
    response_handle_(recv, this);
  }
}
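
The dispatch idea can be tried in isolation. The sketch below is a simplified model under stated assumptions: MiniMessage and MiniApp are hypothetical names, the message carries only a request flag, a head and a body, and no networking is involved.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <utility>

// Hypothetical, trimmed-down message: just the fields the dispatch needs.
struct MiniMessage {
  bool request;  // true: request, false: response
  int head;
  std::string body;
};

class MiniApp {
 public:
  using Handle = std::function<void(const MiniMessage&)>;

  void set_request_handle(Handle h) {
    assert(h && "invalid request handle");
    request_handle_ = std::move(h);
  }
  void set_response_handle(Handle h) {
    assert(h && "invalid response handle");
    response_handle_ = std::move(h);
  }

  // Route the message to the handle that matches its request flag.
  void Process(const MiniMessage& msg) {
    if (msg.request) {
      request_handle_(msg);
    } else {
      response_handle_(msg);
    }
  }

 private:
  // The defaults do nothing, so Process is safe before any handle is set.
  Handle request_handle_ = [](const MiniMessage&) {};
  Handle response_handle_ = [](const MiniMessage&) {};
};
```

A caller registers one lambda per direction and then feeds messages to Process; each message reaches exactly one of the two handles.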

0x03 KVServer

KVServer is the abstraction of the Server node. Its job is to receive messages, process them, and return results. Its main responsibilities are:

  • Maintain key-value pair data;

  • Handle and respond to push and pull requests from workers:

    • The function request_handle_ handles requests:

      • request_handle_ is called from KVServer::Process;
      • ps-lite provides KVServerDefaultHandle as a default implementation of request_handle_.
    • The Response function is used to return data.

3.1 Definition

request_handle_ is the request handler and must be supplied by the user.

  • In this callback, the user implements the optimizer's update of the model weights from the pushed gradients, and the return of the model weights for pulls.
  • The default version that ships with ps-lite, KVServerDefaultHandle, can be used as a reference.

/**
 * \brief A server node for maintaining key-value pairs
 */
template <typename Val>
class KVServer : public SimpleApp {
 public:
  /**
   * \brief constructor
   * \param app_id the app id, should match with \ref KVWorker's id
   */
  explicit KVServer(int app_id) : SimpleApp() {
    using namespace std::placeholders;
    obj_ = new Customer(app_id, app_id, std::bind(&KVServer<Val>::Process, this, _1));
  }

  /** \brief deconstructor */
  virtual ~KVServer() { delete obj_; obj_ = nullptr; }

  /**
   * \brief the handle to process a push/pull request from a worker
   * \param req_meta meta-info of this request
   * \param req_data kv pairs of this request
   * \param server this pointer
   */
  using ReqHandle = std::function<void(const KVMeta& req_meta,
                                       const KVPairs<Val>& req_data,
                                       KVServer* server)>;
  void set_request_handle(const ReqHandle& request_handle) {
    CHECK(request_handle) << "invalid request handle";
    request_handle_ = request_handle;
  }

  /**
   * \brief response to the push/pull request
   * \param req the meta-info of the request
   * \param res the kv pairs that will send back to the worker
   */
  void Response(const KVMeta& req, const KVPairs<Val>& res = KVPairs<Val>());

 private:
  /** \brief internal receive handle */
  void Process(const Message& msg);
  /** \brief request handle */
  ReqHandle request_handle_;  // must be supplied by the user
};

3.2 Functions

3.2.1 Response

Response() sends the response back to the worker that issued the request. Compared with SimpleApp::Response, KVServer::Response treats head and body differently: head carries the cmd of the request, and the payload is attached as key/value data instead of a string body.

Note that Response is meant to be called from the user-defined request_handle_: request_handle_ processes the received message and then calls Response to reply to the worker.

template <typename Val>
void KVServer<Val>::Response(const KVMeta& req, const KVPairs<Val>& res) {
  Message msg;
  msg.meta.app_id = obj_->app_id();
  msg.meta.customer_id = req.customer_id;
  msg.meta.request     = false;
  msg.meta.push        = req.push;
  msg.meta.pull        = req.pull;
  msg.meta.head        = req.cmd;
  msg.meta.timestamp   = req.timestamp;
  msg.meta.recver      = req.sender;
  if (res.keys.size()) {
    msg.AddData(res.keys);
    msg.AddData(res.vals);
    if (res.lens.size()) {
      msg.AddData(res.lens);
    }
  }
  Postoffice::Get()->van()->Send(msg);
}

3.2.2 Process

Process() is registered with the Customer object. When the Customer's receiving thread gets a message, it calls Process() to handle the data.

The logic inside Process() is:

  • Extract the meta information of the message and build a KVMeta.
  • Note that Process itself does not maintain any KV data.
  • Process calls the user-supplied request_handle_ (a std::function object) to handle the data.
  • In the request_handle_ callback, the user implements the optimizer's weight update from gradients and the return of model weights.

template <typename Val>
void KVServer<Val>::Process(const Message& msg) {
  if (msg.meta.simple_app) {
    SimpleApp::Process(msg); return;
  }
  KVMeta meta;
  meta.cmd       = msg.meta.head;
  meta.push      = msg.meta.push;
  meta.pull      = msg.meta.pull;
  meta.sender    = msg.meta.sender;
  meta.timestamp = msg.meta.timestamp;
  meta.customer_id = msg.meta.customer_id;
  KVPairs<Val> data;
  int n = msg.data.size();
  if (n) {
    CHECK_GE(n, 2);
    data.keys = msg.data[0];
    data.vals = msg.data[1];
    if (n > 2) {
      CHECK_EQ(n, 3);
      data.lens = msg.data[2];
      CHECK_EQ(data.lens.size(), data.keys.size());
    }
  }
  CHECK(request_handle_);
  request_handle_(meta, data, this);
}

3.2.3 Example function

KVServerDefaultHandle is an example provided by ps-lite that demonstrates how to maintain KV data, process messages, and reply to requests.

It keeps the server parameters in a hash table, std::unordered_map store, that maps keys to values. For a push request it updates the stored values by adding the pushed values to them, and for a pull request it returns the stored values.

/**
 * \brief an example handle adding pushed kv into store
 */
template <typename Val>
struct KVServerDefaultHandle {
  void operator()(
      const KVMeta& req_meta, const KVPairs<Val>& req_data, KVServer<Val>* server) {
    size_t n = req_data.keys.size();
    KVPairs<Val> res;
    if (!req_meta.pull) {
      CHECK_EQ(n, req_data.vals.size());
    } else {
      res.keys = req_data.keys;
      res.vals.resize(n);
    }
    for (size_t i = 0; i < n; ++i) {
      Key key = req_data.keys[i];
      if (req_meta.push) {
        store[key] += req_data.vals[i];
      }
      if (req_meta.pull) {
        res.vals[i] = store[key];
      }
    }
    server->Response(req_meta, res);
  }
  std::unordered_map<Key, Val> store;
};
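
The accumulate-on-push, read-on-pull behaviour can be reproduced without the rest of ps-lite. The sketch below is a simplified model: KVRequest and AccumulatingStore are hypothetical names, every key carries exactly one float value, and the result is returned directly instead of being sent through Response.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <unordered_map>
#include <vector>

using Key = uint64_t;

// Hypothetical, simplified request: one value per key.
struct KVRequest {
  bool push;
  bool pull;
  std::vector<Key> keys;
  std::vector<float> vals;  // only used when push is true
};

struct AccumulatingStore {
  // Applies a request and returns the pulled values (empty for a pure push).
  std::vector<float> Handle(const KVRequest& req) {
    const std::size_t n = req.keys.size();
    if (req.push) assert(req.vals.size() == n);
    std::vector<float> out;
    if (req.pull) out.resize(n);
    for (std::size_t i = 0; i < n; ++i) {
      if (req.push) store[req.keys[i]] += req.vals[i];  // accumulate
      if (req.pull) out[i] = store[req.keys[i]];        // read back
    }
    return out;
  }
  std::unordered_map<Key, float> store;
};
```

As in the original handle, pulling a key that was never pushed creates a default (zero) entry, because operator[] on the map inserts missing keys.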

3.2.4 Flow

Let's continue refining the overall flow described above.

  • The worker node or server node executes Postoffice::start() at the beginning of the program.

  • Postoffice::start() initializes the node information and calls Van::start().

  • Each node listens on a local port, and the connections it needs are already established at startup.

  • Van::start() starts a local thread dedicated to receiving messages from the socket; it runs Van::Receiving() to keep listening for incoming messages.

    • receiver_thread_ = std::unique_ptr<std::thread>(new std::thread(&Van::Receiving, this));
  • After Van::Receiving() receives a message, it acts according to the command in the message. For data messages that need further processing, ProcessDataMsg is called:

    • Find the Customer by the app id in the message (each app task is bound to a Customer), i.e. hand the message to the receive thread of the matching Customer according to the customer id.
    • Pass the message to the Customer::Accept function.
  • The Customer::Accept() function adds the message to the queue recv_queue_.

  • The Customer object itself also starts a receiving thread, recv_thread_, which runs Customer::Receiving():

    • It keeps fetching messages from the recv_queue_ queue.
    • If !recv.meta.request holds, the message is a response, so tracker_[req.timestamp].second++ is executed.
    • It calls the registered, user-defined recv_handle_ function to process the message.
  • For a worker, the registered recv_handle_ is the KVWorker::Process() function. Because the messages received by the worker's receive thread are mainly the KV pairs pulled from the servers, this Process() mainly collects the KV pairs in the message.

  • For a server, the registered recv_handle_ is the KVServer::Process() function.

  • KVServer::Process() calls the function object that the user registered through KVServer::set_request_handle().

  • In the user-defined request_handle_ function, KVServer::Response is called if a response needs to be sent to the worker.

The flow is shown in the diagram below. In step 8, recv_handle_ points to KVServer::Process or KVWorker::Process (this section covers the server, so it is KVServer::Process). In step 10, the response is returned to the worker.

            +--------------------------+
            | Van                      |
            |                          |
Request +-----------> Receiving        |
            |  1           +           |             +---------------------------+
            |              |           |             | Postoffice                |
            |              | 2         |             |                           |
            |              v           | GetCustomer |                           |
            |        ProcessDataMsg <------------------> unordered_map customers_|
            |              +           |      3      |                           |
            |              |           |             |                           |
            +--------------------------+             +---------------------------+
                           |
                           |
                           |
 +------------------------------------------+
 | Customer                |                |
 |                         |                |
 |                         v                |
 |                      Accept              |
 |                         +                |
 |                         |                |
 |                         | 5              |
 |                         v                |
 |                    recv_queue_           |                +------------------+
 |                         +                |                |KVWorker          |
 |                         | 6              |     +--------> |                  |
 |                         |                |     |    8     |         Process  |
 |                         v                |     |          +------------------+
 |            recv_thread_ +---> Receiving  |     |
 |                         +                |     |
 |                         | 7              |     |
 |                         |                |     |          +------------------+
 |                         v                |     |          |KVServer          |
 |                   recv_handle_+----------------+--------> |                  |
 |                                          |          8     |         Process  |
 +------------------------------------------+                |            +     |
                                                             +------------------+
                                                                          |
                                                                          |  9
                                                                          v
                                                              +-----------+-------+
                                                              | request_handle_   |
                                        10                    |                   |
Response <--------------------------------------------------------+ Response      |
                                                              |                   |
                                                              +-------------------+

0x04 KVWorker

4.1 Overview

KVWorker pushes key-value pairs to the server nodes and pulls them back. The key-value pairs are the parameters that the algorithm processes in parallel.

  • Both push and pull in the Worker are asynchronous and immediately return an id (the timestamp). The caller can block on that id with Wait, which turns the call into a synchronous operation.
  • Alternatively, the caller can pass in a Callback that performs the follow-up work when the operation completes.

4.2 Definition

The main variables of KVWorker are:

  • std::unordered_map<int, std::vector<KVPairs<Val>>> recv_kvs_: the pull results (kv values) received so far, per timestamp;
  • std::unordered_map<int, Callback> callbacks_: the callback registered for each timestamp;
  • Slicer slicer_: the slice function, which defaults to DefaultSlicer; when Send is called, it slices the KVPairs by the Range of each server.

The main functions are:

  • ZPush: zero-copy push function;
  • ZPull: zero-copy pull function;
  • AddPullCB: registers the callback that reassembles the pulled keys and values;
  • Process: message processing function;
  • DefaultSlicer: the default slice function;
  • set_slicer: sets the slicer_ member, which slices the KVPairs by the Range of each server when Send is called.

/**
 * \brief A worker node that can \ref Push (\ref Pull) key-value pairs to (from) server
 * nodes
 *
 * \tparam Val the type of value, which should be primitive types such as
 * int32_t and float
 */
template<typename Val>
class KVWorker : public SimpleApp {
 public:
  /** avoid too many this-> */
  using SimpleApp::obj_;  // the Customer object
  /**
   * \brief callback function for \ref Push and \ref Pull
   *
   * It is called by the data receiving thread of this instance when the push or
   * pull is actually finished. Namely the kv pairs have already written into
   * servers' data structure or the kv pairs have already pulled back.
   */
  using Callback = std::function<void()>;

  /**
   * \brief constructor
   *
   * \param app_id the app id, should match with \ref KVServer's id
   * \param customer_id the customer id which is unique locally
   */
  explicit KVWorker(int app_id, int customer_id) : SimpleApp() {
    using namespace std::placeholders;
    slicer_ = std::bind(&KVWorker<Val>::DefaultSlicer, this, _1, _2, _3);
    obj_ = new Customer(app_id, customer_id, std::bind(&KVWorker<Val>::Process, this, _1));
  }

  /** \brief deconstructor */
  virtual ~KVWorker() { delete obj_; obj_ = nullptr; }

  using SlicedKVs = std::vector<std::pair<bool, KVPairs<Val>>>;
  /**
   * \brief a slicer partitions a key-value list according to the key ranges
   * \param send the kv list for partitioning
   * \param ranges the key ranges, ranges[i] is the key range of server i
   * \param sliced the sliced lists. slices[i] should only contains keys in
   * ranges[i] and the according values
   */
  using Slicer = std::function<void(
      const KVPairs<Val>& send, const std::vector<Range>& ranges,
      SlicedKVs* sliced)>;

  /**
   * \brief set a user-defined slicer
   */
  void set_slicer(const Slicer& slicer) {
    CHECK(slicer); slicer_ = slicer;
  }

 private:
  /**
   * \brief add a callback for a request. threadsafe.
   * @param cb callback
   * @param timestamp the timestamp of the request
   */
  void AddCallback(int timestamp, const Callback& cb) {
    if (!cb) return;
    std::lock_guard<std::mutex> lk(mu_);
    callbacks_[timestamp] = cb;
  }

  /** \brief data buffer for received kvs for each timestamp */
  std::unordered_map<int, std::vector<KVPairs<Val>>> recv_kvs_;
  /** \brief callbacks for each timestamp */
  std::unordered_map<int, Callback> callbacks_;
  /** \brief lock */
  std::mutex mu_;
  /** \brief kv list slicer */
  Slicer slicer_;  // the default slice function variable
};

4.3 Functions

4.3.1 Push & ZPush

Because Push calls ZPush, we cover them together.

Push does the following:

  • It sends data (a KV list) to the corresponding server nodes;

  • The KV list is partitioned according to the key range maintained by each server;

  • Push is asynchronous and returns immediately. To find out when it has completed, you can:

    • Use Wait. tracker_ records, for each request, the number of requests sent and the number of responses received. When the two are equal, every request has been answered, which gives synchronous behaviour.
    • Pass a callback, which is invoked when the push completes.

ZPush works as follows:

  • It uses the NewRequest method of obj_ (of type Customer) to start tracking the number of requests sent and responses received, and gets back a timestamp;
  • It registers the callback for that timestamp;
  • It builds a KVPairs object from the arguments and calls Send to send it.

  int Push(const std::vector<Key>& keys,
           const std::vector<Val>& vals,
           const std::vector<int>& lens = {},
           int cmd = 0,
           const Callback& cb = nullptr,
           int priority = 0) {
    return ZPush(
        SArray<Key>(keys), SArray<Val>(vals), SArray<int>(lens), cmd, cb,
        priority);
  }
  
  int ZPush(const SArray<Key>& keys,
            const SArray<Val>& vals,
            const SArray<int>& lens = {},
            int cmd = 0,
            const Callback& cb = nullptr,
            int priority = 0) {
    int ts = obj_->NewRequest(kServerGroup);
    AddCallback(ts, cb);
    KVPairs<Val> kvs;
    kvs.keys = keys;
    kvs.vals = vals;
    kvs.lens = lens;
    kvs.priority = priority;
    Send(ts, true, false, cmd, kvs);
    return ts;
  }  
​

Refer to its comment for how to call it:

 * Sample usage: the following codes push two KV pairs `{1, (1.1, 1.2)}` and `{3,
 * (3.1,3.2)}` to server nodes, where the value is a length-2 float vector
 * \code
 *   KVWorker<float> w;
 *   std::vector<Key> keys = {1, 3};
 *   std::vector<float> vals = {1.1, 1.2, 3.1, 3.2};
 *   w.Push(keys, vals);
 * \endcode
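
The sample relies on a flat value layout: vals holds the values of all keys back to back. The sketch below illustrates how the values of one key could be located, either with a fixed number of values per key (lens empty) or with per-key lengths in lens. ValuesOfKey is a hypothetical helper written for this illustration; it mirrors the length computation visible in DefaultSlicer but is not a ps-lite function.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical helper: return the values that belong to the i-th key.
// If lens is empty, every key owns the same number of values.
std::vector<float> ValuesOfKey(std::size_t num_keys,
                               const std::vector<float>& vals,
                               const std::vector<int>& lens,
                               std::size_t i) {
  assert(i < num_keys);
  std::size_t begin = 0, len = 0;
  if (lens.empty()) {
    // Fixed length: vals.size() must be a multiple of the number of keys.
    len = vals.size() / num_keys;
    assert(len * num_keys == vals.size());
    begin = i * len;
  } else {
    // Variable length: skip the values of all preceding keys.
    assert(lens.size() == num_keys);
    for (std::size_t j = 0; j < i; ++j) begin += lens[j];
    len = lens[i];
  }
  return std::vector<float>(vals.begin() + begin, vals.begin() + begin + len);
}
```

For the sample above (two keys, four values, no lens), the second key owns the values 3.1 and 3.2. With lens = {1, 3}, it would own the last three values instead.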

4.3.2 Pull

Pull follows roughly the same logic as Push:

  • Register a callback (via AddPullCB) that copies the returned data, and get a timestamp;
  • Pull val_vector from the servers according to key_vector;
  • Finally, return the timestamp;
  • The function does not block; call worker.Wait(timestamp) to wait for completion.

  int Pull(const std::vector<Key>& keys,
           std::vector<Val>* vals,
           std::vector<int>* lens = nullptr,
           int cmd = 0,
           const Callback& cb = nullptr,
           int priority = 0) {
    SArray<Key> skeys(keys);
    int ts = AddPullCB(skeys, vals, lens, cmd, cb);
    KVPairs<Val> kvs;
    kvs.keys = skeys;
    kvs.priority = priority;
    Send(ts, false, true, cmd, kvs);
    return ts;
  }
​

4.3.3 ZPull

The logic is the same as Pull, except that the copy of the keys into an internal SArray is skipped. The caller must therefore make sure not to modify key_vector until ZPull has completed.

  int ZPull(const SArray<Key>& keys,
            SArray<Val>* vals,
            SArray<int>* lens = nullptr,
            int cmd = 0,
            const Callback& cb = nullptr,
            int priority = 0) {
    int ts = AddPullCB(keys, vals, lens, cmd, cb);
    KVPairs<Val> kvs;
    kvs.keys = keys;
    kvs.priority = priority;
    Send(ts, false, true, cmd, kvs);
    return ts;
  }
​

4.3.4 Send

Push() and Pull() both end up calling Send(). Send() slices the KVPairs: each server holds only part of the parameters, so each slice is sent to the server responsible for it.

If every slice is skipped, i.e. there is nothing to send, the callback is run directly.

Otherwise, Send() iterates over the slices and sends each non-empty one.

template <typename Val>
void KVWorker<Val>::Send(int timestamp, bool push, bool pull, int cmd,
                         const KVPairs<Val>& kvs) {
  // slice the message
  SlicedKVs sliced;
  slicer_(kvs, Postoffice::Get()->GetServerKeyRanges(), &sliced);

  // need to add response first, since it will not always trigger the callback
  int skipped = 0;
  for (size_t i = 0; i < sliced.size(); ++i) {
    if (!sliced[i].first) ++skipped;
  }
  obj_->AddResponse(timestamp, skipped);
  if ((size_t)skipped == sliced.size()) {
    RunCallback(timestamp);
  }

  for (size_t i = 0; i < sliced.size(); ++i) {
    const auto& s = sliced[i];
    if (!s.first) continue;
    Message msg;
    msg.meta.app_id = obj_->app_id();
    msg.meta.customer_id = obj_->customer_id();
    msg.meta.request = true;
    msg.meta.push = push;
    msg.meta.pull = pull;
    msg.meta.head = cmd;
    msg.meta.timestamp = timestamp;
    msg.meta.recver = Postoffice::Get()->ServerRankToID(i);
    msg.meta.priority = kvs.priority;
    const auto& kvs = s.second;
    if (kvs.keys.size()) {
      msg.AddData(kvs.keys);
      msg.AddData(kvs.vals);
      if (kvs.lens.size()) {
        msg.AddData(kvs.lens);
      }
    }
    Postoffice::Get()->van()->Send(msg);
  }
}

4.3.5 DefaultSlicer

The slice function can be overridden by the user; the default is DefaultSlicer. Each slice of KVPairs is wrapped in a Message object and then sent with Van::Send().

The data to be sent is sliced according to the const std::vector<Range>& ranges argument. By default, the ranges come from Postoffice::GetServerKeyRanges.

template <typename Val>
void KVWorker<Val>::DefaultSlicer(
    const KVPairs<Val>& send, const std::vector<Range>& ranges,
    typename KVWorker<Val>::SlicedKVs* sliced) {
  sliced->resize(ranges.size());

  // find the positions in msg.key
  size_t n = ranges.size();
  std::vector<size_t> pos(n+1);
  const Key* begin = send.keys.begin();
  const Key* end = send.keys.end();
  for (size_t i = 0; i < n; ++i) {
    if (i == 0) {
      pos[0] = std::lower_bound(begin, end, ranges[0].begin()) - begin;
      begin += pos[0];
    } else {
      CHECK_EQ(ranges[i-1].end(), ranges[i].begin());
    }
    size_t len = std::lower_bound(begin, end, ranges[i].end()) - begin;
    begin += len;
    pos[i+1] = pos[i] + len;

    // don't send it to servers for empty kv
    sliced->at(i).first = (len != 0);
  }
  CHECK_EQ(pos[n], send.keys.size());
  if (send.keys.empty()) return;

  // the length of value
  size_t k = 0, val_begin = 0, val_end = 0;
  if (send.lens.empty()) {
    k = send.vals.size() / send.keys.size();
    CHECK_EQ(k * send.keys.size(), send.vals.size());
  } else {
    CHECK_EQ(send.keys.size(), send.lens.size());
  }

  // slice
  for (size_t i = 0; i < n; ++i) {
    if (pos[i+1] == pos[i]) {
      sliced->at(i).first = false;
      continue;
    }
    sliced->at(i).first = true;
    auto& kv = sliced->at(i).second;
    kv.keys = send.keys.segment(pos[i], pos[i+1]);
    if (send.lens.size()) {
      kv.lens = send.lens.segment(pos[i], pos[i+1]);
      for (int l : kv.lens) val_end += l;
      kv.vals = send.vals.segment(val_begin, val_end);
      val_begin = val_end;
    } else {
      kv.vals = send.vals.segment(pos[i]*k, pos[i+1]*k);
    }
  }
}
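
The core of the slicing is a binary search over the sorted keys for each range boundary. The sketch below isolates that step under simplifying assumptions: KeyRange is a hypothetical stand-in for Range (a pair holding begin and end), SliceByRange is an illustrative name, and only the key positions are computed, not the value segments.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <utility>
#include <vector>

using Key = uint64_t;
using KeyRange = std::pair<Key, Key>;  // [begin, end), stand-in for Range

// For each range, returns the half-open span [first, last) of positions in
// keys whose keys fall inside that range. keys must be sorted ascending.
std::vector<std::pair<std::size_t, std::size_t>> SliceByRange(
    const std::vector<Key>& keys, const std::vector<KeyRange>& ranges) {
  std::vector<std::pair<std::size_t, std::size_t>> slices;
  for (const auto& r : ranges) {
    auto first = std::lower_bound(keys.begin(), keys.end(), r.first);
    auto last = std::lower_bound(first, keys.end(), r.second);
    slices.emplace_back(static_cast<std::size_t>(first - keys.begin()),
                        static_cast<std::size_t>(last - keys.begin()));
  }
  return slices;
}
```

A span with first == last is empty, which corresponds to the case where DefaultSlicer marks a slice as not to be sent.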

4.3.6 PushPull & ZPushPull

PushPull and ZPushPull combine a push and a pull in a single request.

  int PushPull(const std::vector<Key>& keys,
               const std::vector<Val>& vals,
               std::vector<Val>* outs,
               std::vector<int>* lens = nullptr,
               int cmd = 0,
               const Callback& cb = nullptr,
               int priority = 0) {
    CHECK_NOTNULL(outs);
    if (outs->empty())
      outs->resize(vals.size());
    else
      CHECK_EQ(vals.size(), outs->size());
​
    SArray<Key> skeys(keys);
    SArray<Val> svals(vals);
    auto souts = new SArray<Val>(outs->data(), outs->size());
    SArray<int>* slens = lens ?
        new SArray<int>(lens->data(), lens->size()) : nullptr;
    int ts = ZPushPull(skeys, svals, souts, slens, cmd,
        [this, cb, souts, slens]() {
          delete souts;
          delete slens;
          if (cb) cb();
        }, priority);
    return ts;
  }
​
  int ZPushPull(const SArray<Key>& keys,
                const SArray<Val>& vals,
                SArray<Val>* outs,
                SArray<int>* lens = nullptr,
                int cmd = 0,
                const Callback& cb = nullptr,
                int priority = 0) {
    int ts = AddPullCB(keys, outs, lens, cmd, cb);
    KVPairs<Val> kvs;
    kvs.keys = keys;
    kvs.vals = vals;
    kvs.priority = priority;
    if (lens)
      kvs.lens = *lens;
    Send(ts, true, true, cmd, kvs);
    return ts;
  }

4.3.7 Callback related

We have already seen callbacks being set in several places; let's look at how they are used.

4.3.7.1 Setting

As you can see, a callback is registered per timestamp, which forms a table of callbacks.

For each request that is sent, its callback is registered in this table.

using Callback = std::function<void()>;

/** \brief callbacks for each timestamp */
std::unordered_map<int, Callback> callbacks_;

void AddCallback(int timestamp, const Callback& cb) {
  if (!cb) return;
  std::lock_guard<std::mutex> lk(mu_);
  callbacks_[timestamp] = cb;  // add a callback function
}

4.3.7.2 AddPullCB

AddPullCB registers the callback that runs once the responses to a pull have arrived; the callback copies the returned data.

But what if several servers each return part of the result? For both push and pull, the values pulled from each server are inserted into the local vals only after all responses have been received.

template <typename Val>
template <typename C, typename D>
int KVWorker<Val>::AddPullCB(
    const SArray<Key>& keys, C* vals, D* lens, int cmd, const Callback& cb) {
  int ts = obj_->NewRequest(kServerGroup);
  AddCallback(ts, [this, ts, keys, vals, lens, cb]() mutable {
    mu_.lock();
    auto& kvs = recv_kvs_[ts];
    mu_.unlock();

    // do check
    size_t total_key = 0, total_val = 0;
    for (const auto& s : kvs) {
      Range range = FindRange(keys, s.keys.front(), s.keys.back()+1);
      CHECK_EQ(range.size(), s.keys.size())
          << "unmatched keys size from one server";
      if (lens) CHECK_EQ(s.lens.size(), s.keys.size());
      total_key += s.keys.size();
      total_val += s.vals.size();
    }
    CHECK_EQ(total_key, keys.size()) << "lost some servers?";

    // fill vals and lens
    std::sort(kvs.begin(), kvs.end(), [](
        const KVPairs<Val>& a, const KVPairs<Val>& b) {
      return a.keys.front() < b.keys.front();
    });
    CHECK_NOTNULL(vals);
    if (vals->empty()) {
      vals->resize(total_val);
    } else {
      CHECK_EQ(vals->size(), total_val);
    }
    Val* p_vals = vals->data();
    int *p_lens = nullptr;
    if (lens) {
      if (lens->empty()) {
        lens->resize(keys.size());
      } else {
        CHECK_EQ(lens->size(), keys.size());
      }
      p_lens = lens->data();
    }
    for (const auto& s : kvs) {  // copy the returned data
      memcpy(p_vals, s.vals.data(), s.vals.size() * sizeof(Val));
      p_vals += s.vals.size();
      if (p_lens) {
        memcpy(p_lens, s.lens.data(), s.lens.size() * sizeof(int));
        p_lens += s.lens.size();
      }
    }

    mu_.lock();
    recv_kvs_.erase(ts);
    mu_.unlock();
    if (cb) cb();
  });

  return ts;
}
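
The essential step of this callback is: wait until every server has answered, sort the partial results by their first key, and concatenate the values. The sketch below shows only that step. Part and MergeParts are hypothetical names, each key carries one float value, and the parts are assumed to be non-empty.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

using Key = uint64_t;

// Hypothetical stand-in for the KVPairs returned by one server.
struct Part {
  std::vector<Key> keys;    // sorted, non-empty
  std::vector<float> vals;  // one value per key in this sketch
};

// Concatenates the parts in key order once all of them have arrived.
std::vector<float> MergeParts(std::vector<Part> parts,
                              std::size_t expected_keys) {
  // Check that no server's answer is missing.
  std::size_t total_keys = 0;
  for (const auto& p : parts) total_keys += p.keys.size();
  assert(total_keys == expected_keys && "lost some servers?");

  // Responses may arrive in any order, so sort by the first key of each part.
  std::sort(parts.begin(), parts.end(), [](const Part& a, const Part& b) {
    return a.keys.front() < b.keys.front();
  });

  std::vector<float> merged;
  for (const auto& p : parts) {
    merged.insert(merged.end(), p.vals.begin(), p.vals.end());
  }
  return merged;
}
```

Sorting by the first key is sufficient here because each server owns a disjoint, contiguous key range, so the parts cannot interleave.
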

4.3.7.3 Running

RunCallback finds the callback for the given timestamp, runs it, and then deletes it.

It is called from Process, which is covered next.

template <typename Val>
void KVWorker<Val>::RunCallback(int timestamp) {
  mu_.lock();
  auto it = callbacks_.find(timestamp);
  if (it != callbacks_.end()) {
    mu_.unlock();

    CHECK(it->second);
    it->second();

    mu_.lock();
    callbacks_.erase(it);
  }
  mu_.unlock();
}

4.3.8 Process

For the response to a pull, the values returned by each server are first stored in recv_kvs_: recv_kvs_[ts].push_back(kvs);

For both push and pull, the values pulled from each server are inserted into the local vals only after all responses have been received.

template <typename Val>
void KVWorker<Val>::Process(const Message& msg) {
  if (msg.meta.simple_app) {
    SimpleApp::Process(msg); return;
  }
  // store the data for pulling
  int ts = msg.meta.timestamp;
  if (msg.meta.pull) {
    CHECK_GE(msg.data.size(), (size_t)2);
    KVPairs<Val> kvs;
    kvs.keys = msg.data[0];
    kvs.vals = msg.data[1];
    if (msg.data.size() > (size_t)2) {
      kvs.lens = msg.data[2];
    }
    mu_.lock();
    recv_kvs_[ts].push_back(kvs);
    mu_.unlock();
  }

  // finished, run callbacks
  if (obj_->NumResponse(ts) == Postoffice::Get()->num_servers() - 1) {
    RunCallback(ts);  // this is where RunCallback is called
  }
}
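
The interplay between the response counter and the callback table can be modelled in a few lines. The sketch below is single-threaded and uses hypothetical names (RequestTracker, OnResponse, Done); it is not the Customer class. One deliberate difference from the listing above: the sketch increments the counter before comparing, so it compares against the full expected count rather than num_servers() - 1.

```cpp
#include <cassert>
#include <functional>
#include <unordered_map>
#include <utility>
#include <vector>

class RequestTracker {
 public:
  using Callback = std::function<void()>;

  // Registers a request that expects `expected` responses; returns its timestamp.
  int NewRequest(int expected, Callback cb = nullptr) {
    tracker_.emplace_back(expected, 0);
    int ts = static_cast<int>(tracker_.size()) - 1;
    if (cb) callbacks_[ts] = std::move(cb);
    return ts;
  }

  // Records one response; runs and removes the callback on the last one.
  void OnResponse(int ts) {
    auto& counts = tracker_.at(ts);
    ++counts.second;
    if (counts.second == counts.first) RunCallback(ts);
  }

  // True once all expected responses have arrived (what Wait would block on).
  bool Done(int ts) const {
    const auto& counts = tracker_.at(ts);
    return counts.second >= counts.first;
  }

 private:
  void RunCallback(int ts) {
    auto it = callbacks_.find(ts);
    if (it == callbacks_.end()) return;
    Callback cb = std::move(it->second);
    callbacks_.erase(it);
    cb();
  }

  std::vector<std::pair<int, int>> tracker_;  // (expected, received) per timestamp
  std::unordered_map<int, Callback> callbacks_;
};
```

With three expected responses, the callback fires exactly once, on the third call to OnResponse. A real implementation would additionally need a mutex and a condition variable so that a waiting thread can block until Done becomes true.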

0x05 Summary

Finally, let's walk through one complete message exchange to see how the parts work together. The overall flow is as follows:

  1. The worker node wants to send a message, so it calls its Send method.

  2. The Send method calls the Customer’s NewRequest to create a new request.

  3. Postoffice::start() initializes the node information and calls Van::start().

  4. The Send method calls the Van’s Send method to interact with the network.

  5. After network delivery, the flow reaches the Server. For the Server this is a request, handled by Van::Receiving(). After receiving a message, Van::Receiving() takes different actions depending on the command. For data messages that need further processing, it calls ProcessDataMsg.

  6. The flow proceeds to the Van’s ProcessDataMsg, which calls GetCustomer.

  7. GetCustomer calls Postoffice, which looks up customers_.

  8. Customer uses Accept to process the message.

  9. The Customer::Accept() function adds the message to the queue recv_queue_.

  10. The Customer object itself also starts a receiving thread, recv_thread_, which runs Customer::Receiving():

    1. Continuously fetch messages from the recv_queue_ queue.
    2. If (!recv.meta.request), meaning the message is a response, then tracker_[recv.meta.timestamp].second++.
    3. Call the registered user-defined recv_handle_ function to process the message.
  11. Customer::Receiving() calls the registered user-defined recv_handle_ function to process the message.

  12. For a Server, the registered recv_handle_ is the KVServer::Process() function.

  13. The Process function calls request_handle_ to continue processing and generates a response, which is returned to the Worker.

  14. The response is passed to the Worker through the network.

  15. The flow returns to the Worker, to the Worker’s Van. For the Worker this is a response, handled by Van::Receiving(). (The following sequence of operations is similar to the Server’s.)

  16. After receiving a message, Van::Receiving() takes different actions depending on the command. For data messages that need further processing, it calls ProcessDataMsg.

  17. Customer uses Accept to process the message.

  18. The Customer::Accept() function adds the message to the queue recv_queue_.

  19. This is a decoupling point: further processing is handled by a separate thread, recv_thread_.

  20. The Customer object itself has started a new thread, recv_thread_, which runs Customer::Receiving().

  21. For a Worker, the registered recv_handle_ is the KVWorker::Process() function.

  22. The KVWorker::Process() function is called to process the response message.
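
The decoupling between Accept and the receiving thread in the steps above can be sketched with the standard library alone. This is a hedged miniature, not the ps-lite Customer: Msg, MiniCustomer and the terminate flag are hypothetical names introduced for illustration, and the response tracker is left out.

```cpp
#include <cassert>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>

// Hypothetical, simplified message: only the fields this sketch needs.
struct Msg {
  int timestamp = 0;
  bool terminate = false;  // asks the receiving thread to stop
};

// Hypothetical miniature of Customer: Accept() enqueues, a thread dequeues.
class MiniCustomer {
 public:
  using Handle = std::function<void(const Msg&)>;

  explicit MiniCustomer(Handle handle) : handle_(std::move(handle)) {
    assert(handle_);
    thread_ = std::thread(&MiniCustomer::Receiving, this);
  }

  // Queue a terminate message behind any pending ones, then wait.
  ~MiniCustomer() {
    Msg stop;
    stop.terminate = true;
    Accept(stop);
    thread_.join();
  }

  // Called by the network layer (the Van in ps-lite); returns immediately.
  void Accept(const Msg& msg) {
    {
      std::lock_guard<std::mutex> lk(mu_);
      queue_.push(msg);
    }
    cond_.notify_one();
  }

 private:
  // Body of the receiving thread: wait, pop, hand over to the handler.
  void Receiving() {
    while (true) {
      Msg msg;
      {
        std::unique_lock<std::mutex> lk(mu_);
        cond_.wait(lk, [this] { return !queue_.empty(); });
        msg = queue_.front();
        queue_.pop();
      }
      if (msg.terminate) break;
      handle_(msg);  // KVServer::Process or KVWorker::Process in ps-lite
    }
  }

  Handle handle_;
  std::mutex mu_;
  std::condition_variable cond_;
  std::queue<Msg> queue_;
  std::thread thread_;
};
```

Because Accept only pushes onto the queue, the thread that receives from the network never waits for the user-defined handler; all handler work happens on the separate receiving thread, in arrival order.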

[Flow chart: the Worker side (KVWorker, Van, Customer) and the Server side (Van, Postoffice, Customer, KVServer), connected by a Request arrow and a Response arrow, with the steps numbered 1 to 22.]

The version for mobile phones is as follows:

This concludes the introduction to PS-Lite. The next series will introduce Paracel, Douban's parameter server. Stay tuned.

0xEE Personal information

Thoughts on life and technology

WeChat official account: Rosie’s Thinking

If you would like updates on my articles or want to see my technical recommendations, please follow the account.
