0x00 Introduction

In this article, we introduce HugeCTR, an industry-oriented recommendation system training framework optimized for large-scale CTR models with model parallel embedding and data-parallel dense networks.

This article is based on a translation of the GitHub source documentation at github.com/NVIDIA-Merl… , combined with an analysis of the source code. Thanks to the author of "Read HugeCTR source code" for that excellent piece.

For clarity, the class definitions below keep only the member variables; member functions are shown only when they are analyzed.

Other articles in this series are:

NVIDIA HugeCTR, GPU version parameter server –(1)

0x01 General Process

1.1 Overview

The process of HugeCTR training can be seen as data parallelism + model parallelism.

  • Data parallelism: each GPU card reads different data at the same time for training.
  • Model parallelism: sparse parameters are distributed across GPUs on different nodes, and each GPU is assigned a portion of the sparse parameters (a small sketch of this split follows below).
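
To make the split concrete, here is a minimal, self-contained sketch (illustration only, not HugeCTR code) of the two kinds of parallelism: dense parameters are replicated on every GPU, while sparse embedding rows are partitioned across GPUs, here with a simple key % num_gpus rule that mirrors the distributed-slot scheme described later.

#include <cstdio>
#include <vector>

// Toy illustration: dense parameters are replicated (data parallelism),
// sparse embedding rows are sharded across GPUs by key (model parallelism).
int main() {
  const int num_gpus = 4;
  const int embedding_vec_size = 8;

  // Data parallelism: every "GPU" holds a full copy of the dense weights.
  std::vector<std::vector<float>> dense_replicas(num_gpus, std::vector<float>(128, 0.f));

  // Model parallelism: each "GPU" holds only the embedding rows routed to it.
  std::vector<std::vector<long long>> shard_keys(num_gpus);
  long long batch_keys[] = {3, 17, 42, 256, 1024, 9999};
  for (long long key : batch_keys) {
    int gpu_id = static_cast<int>(key % num_gpus);  // illustrative routing rule
    shard_keys[gpu_id].push_back(key);
  }

  for (int g = 0; g < num_gpus; ++g) {
    std::printf("GPU %d owns %zu keys, dense copy size %zu, vec size %d\n",
                g, shard_keys[g].size(), dense_replicas[g].size(), embedding_vec_size);
  }
  return 0;
}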

The training process is as follows:

  • First, a three-level pipeline is constructed, the model network is initialized, and the parameters and optimizer states are initialized.

  • The Reader loads a batch of data from the dataset into Host memory.

  • The data is parsed to obtain the sparse parameters, dense parameters, labels, and so on.

  • The embedding layer propagates forward, that is, it reads the embedding from the parameter server for processing.

  • Forward propagation and backward propagation are carried out for the network layer, which are specifically distinguished as multi-card, single-card, multi-machine, single-machine, etc.

  • The embedding layer propagates backward.

  • Gradients of the dense parameters are exchanged between the cards.

  • The embedding layer updates the sparse parameters: the gradients obtained in the backward pass are pushed to the parameter server, which updates the parameters according to the gradients.

1.2 How to Call

As we can see from the following example, the overall logic is similar to that of a single machine: parse the configuration, use a Session to read data, train, and so on. Here vvgpu is the device map.

# train.py
import sys
import hugectr
from mpi4py import MPI

def train(json_config_file):
  solver_config = hugectr.solver_parser_helper(batchsize = 16384,
                                               batchsize_eval = 16384,
                                               vvgpu = [[0, 1, 2, 3, 4, 5, 6, 7]],
                                               repeat_dataset = True)
  sess = hugectr.Session(solver_config, json_config_file)
  sess.start_data_reading()
  for i in range(10000):
    sess.train()
    if (i % 100 == 0):
      loss = sess.get_current_loss()

if __name__ == "__main__":
  json_config_file = sys.argv[1]
  train(json_config_file)

0x02 Session

Now that we know that sessions are the core, let’s use sessions to see how to build HugeCTR.

2.1 Session definition

Let’s first look at the definition of Session. Only its member variables are kept.

  • networks_: the model network information (dense part).
  • embeddings_: the model embedding layer information.
  • exchange_wgrad_: the class for exchanging gradients.
  • evaluate_data_reader_: reads the evaluation data.
  • train_data_reader_: reads training data into the embedding layer.
  • resource_manager_: GPU resources, such as handles and streams.
class Session {
 public:
  Session(const SolverParser& solver_config, const std::string& config_file);
  Session(const Session&) = delete;
  Session& operator= (const Session&) = delete;

 private:
  std::vector<std::shared_ptr<Network>> networks_;      /**< networks (dense) used in training. */
  std::vector<std::shared_ptr<IEmbedding>> embeddings_; /**< embedding */
  std::shared_ptr<IDataReader> init_data_reader_;
  std::shared_ptr<IDataReader>
      train_data_reader_; /**< data reader to reading data from data set to embedding. */
  std::shared_ptr<IDataReader> evaluate_data_reader_; /**< data reader for evaluation. */
  std::shared_ptr<ResourceManager>
      resource_manager_; /**< GPU resources include handles and streams etc.*/
  std::shared_ptr<Parser> parser_;
  std::shared_ptr<ExchangeWgrad> exchange_wgrad_;

  metrics::Metrics metrics_;
  SolverParser solver_config_;

  struct HolisticCudaGraph {
    std::vector<bool> initialized;
    std::vector<cudaGraphExec_t> instance;
    std::vector<cudaEvent_t> fork_event;
  } train_graph_;

  // TODO: these two variables for export_predictions.
  // There may be a better place for them.
  bool use_mixed_precision_;
  size_t batchsize_eval_;
};

2.2 Constructors

Constructors can be roughly divided into the following steps:

  • Create the pipeline using create_pipeline.
  • Initialize the model network.
  • Initialize parameters and optimizer state.
Session::Session(const SolverParser& solver_config, const std::string& config_file)
    : resource_manager_(ResourceManagerExt::create(solver_config.vvgpu, solver_config.seed,
                                                   solver_config.device_layout)),
      solver_config_(solver_config) {

  // Check the devices
  for (auto dev : resource_manager_->get_local_gpu_device_id_list()) {
    if (solver_config.use_mixed_precision) {
      check_device(dev, 7, 0);  // to support mixed precision training earliest supported device is CC=70
    } else {
      check_device(dev, 6, 0);  // earliest supported device is CC=60
    }
  }

  // Generate the Parser to parse the configuration file
  parser_.reset(new Parser(config_file, solver_config.batchsize, solver_config.batchsize_eval,
                           solver_config.num_epochs < 1, solver_config.i64_input_key,
                           solver_config.use_mixed_precision, solver_config.enable_tf32_compute,
                           solver_config.scaler, solver_config.use_algorithm_search,
                           solver_config.use_cuda_graph));

  // Build the pipeline
  parser_->create_pipeline(init_data_reader_, train_data_reader_, evaluate_data_reader_,
                           embeddings_, networks_, resource_manager_, exchange_wgrad_);

#ifndef DATA_READING_TEST
#pragma omp parallel num_threads(networks_.size())
  {
    // Initialize the model networks in parallel, one thread per network
    size_t id = omp_get_thread_num();
    networks_[id]->initialize();
    if (solver_config.use_algorithm_search) {
      networks_[id]->search_algorithm();
    }
    CK_CUDA_THROW_(cudaStreamSynchronize(resource_manager_->get_local_gpu(id)->get_stream()));
  }
#endif

  // Load the parameters required by the dense features
  init_or_load_params_for_dense_(solver_config.model_file);
  // Load the parameters required by the sparse features
  init_or_load_params_for_sparse_(solver_config.embedding_files);

  // Load the optimizer states
  load_opt_states_for_sparse_(solver_config.sparse_opt_states_files);
  load_opt_states_for_dense_(solver_config.dense_opt_states_file);

  int num_total_gpus = resource_manager_->get_global_gpu_count();
  for (const auto& metric : solver_config.metrics_spec) {
    metrics_.emplace_back(
        std::move(metrics::Metric::Create(metric.first, solver_config.use_mixed_precision,
                                          solver_config.batchsize_eval / num_total_gpus,
                                          solver_config.max_eval_batches, resource_manager_)));
  }

  if (solver_config_.use_holistic_cuda_graph) {
    train_graph_.initialized.resize(networks_.size(), false);
    train_graph_.instance.resize(networks_.size());
    for (size_t i = 0; i < resource_manager_->get_local_gpu_count(); i++) {
      auto& gpu_resource = resource_manager_->get_local_gpu(i);
      CudaCPUDeviceContext context(gpu_resource->get_device_id());
      cudaEvent_t event;
      CK_CUDA_THROW_(cudaEventCreateWithFlags(&event, cudaEventDisableTiming));
      train_graph_.fork_event.push_back(event);
    }
  }

  if (embeddings_.size() == 1) {
    auto lr_scheds = embeddings_[0]->get_learning_rate_schedulers();
    for (size_t i = 0; i < lr_scheds.size(); i++) {
      networks_[i]->set_learning_rate_scheduler(lr_scheds[i]);
    }
  }
}
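
The #pragma omp parallel num_threads(networks_.size()) block above initializes one network per thread, with the thread id doubling as the local network/GPU index. Below is a minimal standalone sketch of that pattern (compile with OpenMP enabled, e.g. -fopenmp; FakeNetwork is a stand-in, not the real Network class).

#include <omp.h>
#include <cstdio>
#include <vector>

struct FakeNetwork {              // stand-in for HugeCTR's Network class
  void initialize() { /* allocate buffers, pick algorithms, ... */ }
};

int main() {
  std::vector<FakeNetwork> networks(4);           // one network per local GPU
  const int num_threads = static_cast<int>(networks.size());

  // One thread per network: the thread id doubles as the network / local GPU index.
  #pragma omp parallel num_threads(num_threads)
  {
    int id = omp_get_thread_num();
    networks[id].initialize();
    std::printf("network %d initialized by thread %d\n", id, id);
  }
  return 0;
}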

There are a few related classes to note here.

2.2.1 ResourceManager

Let’s start with ResourceManager.

2.2.1.1 Interface

ResourceManagerBase is the top-level interface; ResourceManager is the second-level interface that extends it.

/**
 * @brief Top-level ResourceManager interface
 *
 * The top level resource manager interface shared by various components
 */
class ResourceManagerBase {
 public:
  virtual void set_local_gpu(std::shared_ptr<GPUResource> gpu_resource, size_t local_gpu_id) = 0;
  virtual const std::shared_ptr<GPUResource>& get_local_gpu(size_t local_gpu_id) const = 0;
  virtual size_t get_local_gpu_count() const = 0;
  virtual size_t get_global_gpu_count() const = 0;
};

/**
 * @brief Second-level ResourceManager interface
 *
 * The second level resource manager interface shared by training and inference
 */
class ResourceManager : public ResourceManagerBase {
  // The function definitions are omitted
};
2.2.1.2 Core

Then there is the core implementation, ResourceManagerCore, which holds the various resources.

/**
 * @brief GPU resources manager which holds the minimal, essential set of resources
 *
 * A core GPU Resource manager
 */
class ResourceManagerCore : public ResourceManager {
 private:
  int num_process_;
  int process_id_;
  DeviceMap device_map_;
  std::shared_ptr<CPUResource> cpu_resource_;
  std::vector<std::shared_ptr<GPUResource>> gpu_resources_; /**< GPU resource vector */
  std::vector<std::vector<bool>> p2p_matrix_;

  std::vector<std::shared_ptr<rmm::mr::device_memory_resource>> base_cuda_mr_;
  std::vector<std::shared_ptr<rmm::mr::device_memory_resource>> memory_resource_;
};
2.2.1.3 Extension

ResourceManagerExt is a re-encapsulation on top of ResourceManagerCore: its member core_ holds a ResourceManagerCore instance, to which the actual work is delegated. We will use ResourceManagerExt for the following analysis.

/**
 * @brief GPU resources manager which holds all the resources required by training
 *
 * An extended GPU Resource manager
 */
class ResourceManagerExt : public ResourceManager {
  std::shared_ptr<ResourceManager> core_;

#ifdef ENABLE_MPI
  std::unique_ptr<IbComm> ib_comm_ = NULL;
#endif
  std::shared_ptr<AllReduceInPlaceComm> ar_comm_ = NULL;
};

The creation code is as follows; you can see that it uses MPI to do some of the communication configuration:

std::shared_ptr<ResourceManager> ResourceManagerExt::create(
    const std::vector<std::vector<int>>& visible_devices, unsigned long long seed,
    DeviceMap::Layout layout) {

  int size = 1, rank = 0;

#ifdef ENABLE_MPI
  HCTR_MPI_THROW(MPI_Comm_size(MPI_COMM_WORLD, &size));
  HCTR_MPI_THROW(MPI_Comm_rank(MPI_COMM_WORLD, &rank));
#endif

  DeviceMap device_map(visible_devices, rank, layout);

  std::random_device rd;
  if (seed == 0) {
    seed = rd();
  }

#ifdef ENABLE_MPI
  HCTR_MPI_THROW(MPI_Bcast(&seed, 1, MPI_UNSIGNED_LONG_LONG, 0, MPI_COMM_WORLD));
#endif

  std::shared_ptr<ResourceManager> core(
      new ResourceManagerCore(size, rank, std::move(device_map), seed));

  return std::shared_ptr<ResourceManager>(new ResourceManagerExt(core));
}

ResourceManagerExt::ResourceManagerExt(std::shared_ptr<ResourceManager> core) : core_(core) {
#ifdef ENABLE_MPI
  int num_process = get_num_process();
  if (num_process > 1) {
    int process_id = get_process_id();
    ib_comm_ = std::make_unique<IbComm>();
    ib_comm_->init(num_process, get_local_gpu_count(), process_id, get_local_gpu_device_id_list());
  }
#endif
}

void ResourceManagerExt::set_ar_comm(AllReduceAlgo algo, bool use_mixed_precision) {
  int num_process = get_num_process();
#ifdef ENABLE_MPI
  ar_comm_ = AllReduceInPlaceComm::create(num_process, algo, use_mixed_precision, get_local_gpus(),
                                          ib_comm_.get());
#else
  ar_comm_ = AllReduceInPlaceComm::create(num_process, algo, use_mixed_precision, get_local_gpus());
#endif
}

The specific resource operations are all delegated to core_:

// from ResourceManagerBase
void set_local_gpu(std::shared_ptr<GPUResource> gpu_resource, size_t local_gpu_id) override {
  core_->set_local_gpu(gpu_resource, local_gpu_id);
}
const std::shared_ptr<GPUResource>& get_local_gpu(size_t local_gpu_id) const override {
  return core_->get_local_gpu(local_gpu_id);
}
size_t get_local_gpu_count() const override { return core_->get_local_gpu_count(); }
size_t get_global_gpu_count() const override { return core_->get_global_gpu_count(); }

// from ResourceManager
int get_num_process() const override { return core_->get_num_process(); }
int get_process_id() const override { return core_->get_process_id(); }

0x03 Parser

Parser was mentioned earlier, so let’s look at it now. Parser parses the configuration file and builds the pipeline. Related supporting classes are SolverParser, Solver, InferenceParser, etc. Parser is the key to automating the whole run and can be regarded as the soul of the system.

3.1 Definition

/**
 * @brief The parser of the configure file (in json format).
 *
 * The builder of each layer / optimizer in HugeCTR.
 * Please see the User Guide to learn how to write a configure file.
 * @verbatim
 * Some Restrictions:
 *  1. Embedding should be the first element of layers.
 *  2. layers should be listed from bottom to top.
 * @endverbatim
 */
class Parser {
 private:
  nlohmann::json config_;  /**< configure file. */
  size_t batch_size_;      /**< batch size. */
  size_t batch_size_eval_; /**< batch size. */
  const bool repeat_dataset_;
  const bool i64_input_key_{false};
  const bool use_mixed_precision_{false};
  const bool enable_tf32_compute_{false};

  const float scaler_{1.f};
  const bool use_algorithm_search_;
  const bool use_cuda_graph_;
  bool grouped_all_reduce_ = false;
};

All of our subsequent analyses call Parser or its related classes.

3.2 How to Organize a Network

Let’s first look at the configuration file to see how it organizes a model network, using test/scripts/deepfm_8gpu.json as an example.

Here’s what the JSON fields do:

  • bottom_names: Input tensor name for this layer.
  • top_names: The output tensor name of this layer.

So, the model is organized from the bottom up by bottom and top.

3.2.1 Input

The input layers are as follows: Dense is the input to the slice layer, Sparse is the input to Sparse_embedding1, which contains 26 slots.

{
  "name": "data",
  "type": "Data",
  "source": "./file_list.txt",
  "eval_source": "./file_list_test.txt",
  "check": "Sum",
  "label": {
    "top": "label",
    "label_dim": 1
  },
  "dense": {
    "top": "dense",
    "dense_dim": 13
  },
  "sparse": [
    {
      "top": "data1",
      "slot_num": 26,
      "is_fixed_length": false,
      "nnz_per_slot": 2
    }
  ]
},

The model diagram is as follows:

3.2.2 Embedding layer

Let’s look at the definition:

  • embedding_vec_size: the dimension of the embedding vector.

  • combiner: how the vectors are pooled after lookup, sum or avg (a small sketch follows the configuration below).

  • workspace_size_per_gpu_in_mb: the memory size reserved on each GPU.

{
  "name": "sparse_embedding1",
  "type": "DistributedSlotSparseEmbeddingHash",
  "bottom": "data1",
  "top": "sparse_embedding1",
  "sparse_embedding_hparam": {
    "embedding_vec_size": 11,
    "combiner": "sum",
    "workspace_size_per_gpu_in_mb": 10
  }
},

The model is as follows:

3.2.3 Other layers

Here we cover the other layers, the ones above the input and embedding layers; many layers are omitted, the goal is just to convey the logic.

3.2.3.1 Reshape layer

The Reshape layer converts a 3D input into a 2D shape. This layer is the consumer of the embedding layer.

{
  "name": "reshape1",
  "type": "Reshape",
  "bottom": "sparse_embedding1",
  "top": "reshape1",
  "leading_dim": 11
},
3.2.3.2 Slice layer

The Slice layer splits one bottom tensor into multiple top tensors.

{
  "name": "slice2",
  "type": "Slice",
  "bottom": "dense",
  "ranges": [
    [0, 13],
    [0, 13]
  ],
  "top": [
    "slice21",
    "slice22"
  ]
},
3.2.3.3 Loss

This is the final loss layer; the label is fed directly into it.

{
  "name": "loss",
  "type": "BinaryCrossEntropyLoss",
  "bottom": [
    "add",
    "label"
  ],
  "top": "loss"
}
3.2.3.4 Schematic model diagram

The current logic is as follows; the model is organized from the bottom up, and the rest is omitted:

3.3 Overall structure

Let’s simplify each layer, omit the internal tags, and sort out all the layers in the configuration file to see the overall architecture of a DeepFM in HugeCTR.

0x04 Establish pipeline

Let’s move on to how the pipeline is built. The create_pipeline function builds the pipeline and forwards to the create_pipeline_internal method.

void Parser::create_pipeline(std::shared_ptr<IDataReader>& init_data_reader,
                             std::shared_ptr<IDataReader>& train_data_reader,
                             std::shared_ptr<IDataReader>& evaluate_data_reader,
                             std::vector<std::shared_ptr<IEmbedding>>& embeddings,
                             std::vector<std::shared_ptr<Network>>& networks,
                             const std::shared_ptr<ResourceManager>& resource_manager,
                             std::shared_ptr<ExchangeWgrad>& exchange_wgrad) {
  if (i64_input_key_) {
    create_pipeline_internal<long long>(init_data_reader, train_data_reader, evaluate_data_reader,
                                        embeddings, networks, resource_manager, exchange_wgrad);
  } else {
    create_pipeline_internal<unsigned int>(init_data_reader, train_data_reader,
                                           evaluate_data_reader, embeddings, networks,
                                           resource_manager, exchange_wgrad);
  }
}

4.3.1 create_pipeline_internal

create_pipeline_internal consists of the following steps:

  • create_allreduce_comm: set up the all-reduce communication mechanism.
  • Create the Data Reader.
  • Set up the embedding layers and their related mechanisms.
  • Set up the network-related mechanisms and build a copy of the network on each GPU card.
  • Allocate the gradient exchange class.
template <typename TypeKey>
void Parser::create_pipeline_internal(std::shared_ptr<IDataReader>& init_data_reader,
                                      std::shared_ptr<IDataReader>& train_data_reader,
                                      std::shared_ptr<IDataReader>& evaluate_data_reader,
                                      std::vector<std::shared_ptr<IEmbedding>>& embeddings,
                                      std::vector<std::shared_ptr<Network>>& networks,
                                      const std::shared_ptr<ResourceManager>& resource_manager,
                                      std::shared_ptr<ExchangeWgrad>& exchange_wgrad) {
  try {
    // Set up the all-reduce communication
    create_allreduce_comm(resource_manager, exchange_wgrad);

    std::map<std::string, SparseInput<TypeKey>> sparse_input_map;
    std::vector<TensorEntry> train_tensor_entries_list[resource_manager->get_local_gpu_count()];
    std::vector<TensorEntry> evaluate_tensor_entries_list[resource_manager->get_local_gpu_count()];
    {
      if (!networks.empty()) {
        CK_THROW_(Error_t::WrongInput, "vector network is not empty");
      }

      // Verify the network graph
      auto j_layers_array = get_json(config_, "layers");
      auto j_optimizer = get_json(config_, "optimizer");
      check_graph(tensor_active_, j_layers_array);

      // Create Data Reader
      {
        // TODO: In using AsyncReader, if the overlap is disabled,
        // scheduling the data reader should be off.
        // The scheduling needs to be generalized.
        auto j_solver = get_json(config_, "solver");
        auto enable_overlap = get_value_from_json_soft<bool>(j_solver, "enable_overlap", false);

        const nlohmann::json& j = j_layers_array[0];
        create_datareader<TypeKey>()(j, sparse_input_map, train_tensor_entries_list,
                                     evaluate_tensor_entries_list, init_data_reader,
                                     train_data_reader, evaluate_data_reader, batch_size_,
                                     batch_size_eval_, use_mixed_precision_, repeat_dataset_,
                                     enable_overlap, resource_manager);
      }  // Create Data Reader

      // Create Embedding
      {
        for (unsigned int i = 1; i < j_layers_array.size(); i++) {
          // The layers in the configuration run from bottom to top, so once a
          // non-embedding layer is encountered, the layers after it are not checked
          // if not embedding then break
          const nlohmann::json& j = j_layers_array[i];
          auto embedding_name = get_value_from_json<std::string>(j, "type");
          Embedding_t embedding_type;
          if (!find_item_in_map(embedding_type, embedding_name, EMBEDDING_TYPE_MAP)) {
            Layer_t layer_type;
            if (!find_item_in_map(layer_type, embedding_name, LAYER_TYPE_MAP) &&
                !find_item_in_map(layer_type, embedding_name, LAYER_TYPE_MAP_MP)) {
              CK_THROW_(Error_t::WrongInput, "No such layer: " + embedding_name);
            }
            break;
          }

          // Create the embedding layer
          if (use_mixed_precision_) {
            create_embedding<TypeKey, __half>()(
                sparse_input_map, train_tensor_entries_list, evaluate_tensor_entries_list,
                embeddings, embedding_type, config_, resource_manager, batch_size_,
                batch_size_eval_, exchange_wgrad, use_mixed_precision_, scaler_, j, use_cuda_graph_,
                grouped_all_reduce_);
          } else {
            create_embedding<TypeKey, float>()(
                sparse_input_map, train_tensor_entries_list, evaluate_tensor_entries_list,
                embeddings, embedding_type, config_, resource_manager, batch_size_,
                batch_size_eval_, exchange_wgrad, use_mixed_precision_, scaler_, j, use_cuda_graph_,
                grouped_all_reduce_);
          }
        }  // for ()
      }    // Create Embedding

      // Create the network layers
      // create network
      int total_gpu_count = resource_manager->get_global_gpu_count();
      if (0 != batch_size_ % total_gpu_count) {
        CK_THROW_(Error_t::WrongInput, "0 != batch_size\%total_gpu_count");
      }

      // create network, build a copy of the network on each GPU card
      for (size_t i = 0; i < resource_manager->get_local_gpu_count(); i++) {
        networks.emplace_back(Network::create_network(
            j_layers_array, j_optimizer, train_tensor_entries_list[i],
            evaluate_tensor_entries_list[i], total_gpu_count, exchange_wgrad,
            resource_manager->get_local_cpu(), resource_manager->get_local_gpu(i),
            use_mixed_precision_, enable_tf32_compute_, scaler_, use_algorithm_search_,
            use_cuda_graph_, false, grouped_all_reduce_));
      }
    }
    exchange_wgrad->allocate();  // Allocate the buffers of the gradient exchange class

  } catch (const std::runtime_error& rt_err) {
    std::cerr << rt_err.what() << std::endl;
    throw;
  }
}

4.3.2 create_allreduce_comm

create_allreduce_comm sets up the communication algorithm, such as AllReduceInPlaceComm, and the gradient exchange class, such as GroupedExchangeWgrad.

void Parser::create_allreduce_comm(const std::shared_ptr<ResourceManager>& resource_manager,
                                   std::shared_ptr<ExchangeWgrad>& exchange_wgrad) {
  auto ar_algo = AllReduceAlgo::NCCL;
  bool grouped_all_reduce = false;

  // Read the communication algorithm from the configuration
  if (has_key_(config_, "all_reduce")) {
    auto j_all_reduce = get_json(config_, "all_reduce");
    std::string ar_algo_name = "Oneshot";
    if (has_key_(j_all_reduce, "algo")) {
      ar_algo_name = get_value_from_json<std::string>(j_all_reduce, "algo");
    }
    if (has_key_(j_all_reduce, "grouped")) {
      grouped_all_reduce = get_value_from_json<bool>(j_all_reduce, "grouped");
    }
    if (!find_item_in_map(ar_algo, ar_algo_name, ALLREDUCE_ALGO_MAP)) {
      CK_THROW_(Error_t::WrongInput, "All reduce algo unknown: " + ar_algo_name);
    }
  }

  // Set up the communication algorithm, such as AllReduceInPlaceComm
  resource_manager->set_ar_comm(ar_algo, use_mixed_precision_);

  // Build the gradient exchange class, e.g. GroupedExchangeWgrad
  grouped_all_reduce_ = grouped_all_reduce;
  if (grouped_all_reduce_) {
    if (use_mixed_precision_) {
      exchange_wgrad = std::make_shared<GroupedExchangeWgrad<__half>>(resource_manager);
    } else {
      exchange_wgrad = std::make_shared<GroupedExchangeWgrad<float>>(resource_manager);
    }
  } else {
    if (use_mixed_precision_) {
      exchange_wgrad = std::make_shared<NetworkExchangeWgrad<__half>>(resource_manager);
    } else {
      exchange_wgrad = std::make_shared<NetworkExchangeWgrad<float>>(resource_manager);
    }
  }
}

The GroupedExchangeWgrad is used to exchange gradients.

template <typename TypeFP>
class GroupedExchangeWgrad : public ExchangeWgrad {
 public:
  const BuffPtrs<TypeFP>& get_network_wgrad_buffs() const { return network_wgrad_buffs_; }
  const BuffPtrs<TypeFP>& get_embed_wgrad_buffs() const { return embed_wgrad_buffs_; }
  void allocate() final;
  void update_embed_wgrad_size(size_t size) final;
  void allreduce(size_t device_id, cudaStream_t stream);
  GroupedExchangeWgrad(const std::shared_ptr<ResourceManager>& resource_manager);
  ~GroupedExchangeWgrad() = default;

 private:
  BuffPtrs<TypeFP> network_wgrad_buffs_;
  BuffPtrs<TypeFP> embed_wgrad_buffs_;
  std::vector<std::shared_ptr<GeneralBuffer2<CudaAllocator>>> bufs_;
  std::shared_ptr<ResourceManager> resource_manager_;

  AllReduceInPlaceComm::Handle ar_handle_;

  size_t network_wgrad_size_ = 0;
  size_t embed_wgrad_size_ = 0;
  size_t num_gpus_ = 0;
};

For example, exchange through AllReduce:

template <typename T>
void GroupedExchangeWgrad<T>::allreduce(size_t device_id, cudaStream_t stream) {
  auto ar_comm = resource_manager_->get_ar_comm();
  ar_comm->all_reduce(ar_handle_, stream, device_id);
}
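
Semantically, an in-place all-reduce leaves every GPU with the element-wise sum of all GPUs' gradient buffers; NCCL or IbComm performs this over NVLink/InfiniBand in practice. Below is a tiny CPU-only sketch of that semantics (illustration only, not HugeCTR code).

#include <cstdio>
#include <vector>

// Element-wise sum of per-device gradient buffers, written back to every buffer,
// i.e. the semantics of an in-place all-reduce (done on the CPU here for clarity).
void allreduce_inplace(std::vector<std::vector<float>>& wgrad_per_device) {
  const size_t len = wgrad_per_device.front().size();
  std::vector<float> sum(len, 0.f);
  for (const auto& buf : wgrad_per_device)
    for (size_t i = 0; i < len; ++i) sum[i] += buf[i];
  for (auto& buf : wgrad_per_device) buf = sum;   // every device sees the same result
}

int main() {
  std::vector<std::vector<float>> wgrad = {{1.f, 2.f}, {3.f, 4.f}};  // two "GPUs"
  allreduce_inplace(wgrad);
  std::printf("device0: %.1f %.1f, device1: %.1f %.1f\n",
              wgrad[0][0], wgrad[0][1], wgrad[1][0], wgrad[1][1]);   // 4 6 / 4 6
  return 0;
}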

4.3.3 create_datareader

The DataReader is the body of the pipeline, which actually contains the first two levels of the pipeline: the data reader worker and the data Collector.

4.3.3.1 What is created

Call create_datareader as follows,

create_datareader<TypeKey>()(j, sparse_input_map, train_tensor_entries_list,
                                 evaluate_tensor_entries_list, init_data_reader,
                                 train_data_reader, evaluate_data_reader, batch_size_,
                                 batch_size_eval_, use_mixed_precision_, repeat_dataset_,
                                 enable_overlap, resource_manager);

Recall that the following call from the Session constructor eventually invokes create_datareader to create several readers.

parser_->create_pipeline(init_data_reader_, train_data_reader_, evaluate_data_reader_,
                         embeddings_, networks_, resource_manager_, exchange_wgrad_);

These readers are stored in Session member variables:

std::shared_ptr<IDataReader> init_data_reader_;
std::shared_ptr<IDataReader> train_data_reader_; /**< data reader to reading data from data set to embedding. */
std::shared_ptr<IDataReader> evaluate_data_reader_; /**< data reader for evaluation. */

train_data_reader_ reads data for training and evaluate_data_reader_ for evaluation.

4.3.3.2 Creating the readers

Because the code is long, only the key parts are kept. Two readers are created here: train_data_reader for training and evaluate_data_reader for evaluation. Worker groups are then set up for them.

For Reader, HugeCTR provides three implementations:

  • Norm: Normal file reading.
  • Parquet: File in Parquet format.
  • Raw: The Raw dataset format differs from the Norm dataset format in that the training data appears in one binary file.
template <typename TypeKey>
void create_datareader<TypeKey>::operator()(/* arguments omitted */) {
  switch (format) {
    case DataReaderType_t::Norm: {
      bool start_right_now = repeat_dataset;
      train_data_reader->create_drwg_norm(source_data, check_type, start_right_now);
      evaluate_data_reader->create_drwg_norm(eval_source, check_type, start_right_now);
      break;
    }
    case DataReaderType_t::Raw: {
      const auto num_samples = get_value_from_json<long long>(j, "num_samples");
      const auto eval_num_samples = get_value_from_json<long long>(j, "eval_num_samples");
      std::vector<long long> slot_offset = f();
      bool float_label_dense = get_value_from_json_soft<bool>(j, "float_label_dense", false);
      train_data_reader->create_drwg_raw(source_data, num_samples, float_label_dense, true, false);
      evaluate_data_reader->create_drwg_raw(eval_source, eval_num_samples, float_label_dense,
                                            false, false);
      break;
    }
    case DataReaderType_t::Parquet: {
      // @Future: Should be slot_offset here and data_reader ctor should
      // be TypeKey not long long
      std::vector<long long> slot_offset = f();
      train_data_reader->create_drwg_parquet(source_data, slot_offset, true);
      evaluate_data_reader->create_drwg_parquet(eval_source, slot_offset, true);
      break;
    }
  }
}

Let’s take Norm as an example; note first that it builds a worker group internally.

void create_drwg_norm(std::string file_name, Check_t check_type,
                      bool start_reading_from_beginning = true) override {
  source_type_ = SourceType_t::FileList;
  worker_group_.reset(new DataReaderWorkerGroupNorm<TypeKey>(
      thread_buffers_, resource_manager_, file_name, repeat_, check_type, params_,
      start_reading_from_beginning));
  file_name_ = file_name;
}
4.3.3.3 DataReaderWorkerGroupNorm

Inside DataReaderWorkerGroupNorm, DataReaderWorker instances are created; file_list_ holds the list of data files that need to be read.

template <typename TypeKey>
class DataReaderWorkerGroupNorm : public DataReaderWorkerGroup {
  
  std::string file_list_; /**< file list of data set */

  std::shared_ptr<Source> create_source(size_t worker_id, size_t num_worker,
                                        const std::string &file_name, bool repeat) override {
    return std::make_shared<FileSource>(worker_id, num_worker, file_name, repeat);
  }

 public:
  // Ctor
  DataReaderWorkerGroupNorm(const std::vector<std::shared_ptr<ThreadBuffer>> &output_buffers,
                            const std::shared_ptr<ResourceManager> &resource_manager_,
                            std::string file_list, bool repeat, Check_t check_type,
                            const std::vector<DataReaderSparseParam> &params,
                            bool start_reading_from_beginning = true)
      : DataReaderWorkerGroup(start_reading_from_beginning, DataReaderType_t::Norm) {

    int num_threads = output_buffers.size();
    size_t local_gpu_count = resource_manager_->get_local_gpu_count();

    // create data reader workers
    int max_feature_num_per_sample = 0;
    for (auto &param : params) {
      max_feature_num_per_sample += param.max_feature_num;
    }

    set_resource_manager(resource_manager_);
    for (int i = 0; i < num_threads; i++) {
      std::shared_ptr<IDataReaderWorker> data_reader(new DataReaderWorker<TypeKey>(
          i, num_threads, resource_manager_->get_local_gpu(i % local_gpu_count),
          &data_reader_loop_flag_, output_buffers[i], file_list, max_feature_num_per_sample, repeat,
          check_type, params));
      data_readers_.push_back(data_reader);
    }
    create_data_reader_threads();
  }
};

Multiple threads (data_reader_threads_) are then created to run these workers, one thread per worker.

  /**
   * Create threads to run data reader workers
   */
  void create_data_reader_threads() {
    size_t local_gpu_count = resource_manager_->get_local_gpu_count();

    for (size_t i = 0; i < data_readers_.size(); ++i) {
      auto local_gpu = resource_manager_->get_local_gpu(i % local_gpu_count);
      data_reader_threads_.emplace_back(data_reader_thread_func_, data_readers_[i],
                                        &data_reader_loop_flag_, local_gpu->get_device_id());
    }
  }
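
The pattern is one thread per worker, each looping over its read routine while a shared flag stays set; the real code additionally binds each thread to a GPU. Below is a simplified standalone sketch of that producer-thread pattern (illustration only; names like worker_body are made up here).

#include <atomic>
#include <chrono>
#include <cstdio>
#include <thread>
#include <vector>

// One thread per worker; each thread repeatedly calls its read routine
// until the shared loop flag is cleared.
int main() {
  std::atomic<int> loop_flag{1};
  std::atomic<int> batches_read{0};

  auto worker_body = [&](int worker_id) {
    while (loop_flag.load()) {
      ++batches_read;                       // stands in for "read a batch into a buffer"
      std::this_thread::sleep_for(std::chrono::milliseconds(1));
    }
    std::printf("worker %d stopped\n", worker_id);
  };

  std::vector<std::thread> threads;
  for (int i = 0; i < 4; ++i) threads.emplace_back(worker_body, i);

  std::this_thread::sleep_for(std::chrono::milliseconds(20));
  loop_flag.store(0);                        // signal all workers to stop
  for (auto& t : threads) t.join();
  std::printf("total batches read: %d\n", batches_read.load());
  return 0;
}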
4.3.4 Summary

Let’s summarize. DataReader contains the first two levels of the pipeline, and the current analysis covers the first level. Inside the reader there is a worker group, which holds several workers and the corresponding threads that run them. The data reader worker is the first level of the pipeline. The second level, the collector, is skipped for now and will be covered in the next chapter.

4.4 Embedding

Let’s go straight to the third level of the pipeline, where the following code creates the embeddings.

create_embedding<TypeKey, float>()(
            sparse_input_map, train_tensor_entries_list, evaluate_tensor_entries_list,
            embeddings, embedding_type, config_, resource_manager, batch_size_,
            batch_size_eval_, exchange_wgrad, use_mixed_precision_, scaler_, j, use_cuda_graph_,
            grouped_all_reduce_);

Some embeddings are created here, such as DistributedSlotSparseEmbeddingHash.

As mentioned earlier, HugeCTR contains several hashes, such as:

  • LocalizedSlotEmbeddingHash: features in the same slot (field) are stored on one GPU, which is why it is called a "localized" slot. Depending on the slot index, different slots may be stored on different GPUs.

  • DistributedSlotEmbeddingHash: all features, no matter which slot (field) they belong to, are distributed to different GPUs according to the feature index. This means that features in the same slot may be stored on different GPUs, which is why it is called a "distributed" slot (see the sketch after this list).
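
Following the descriptions above, here is a small sketch of the two placement rules: the distributed scheme routes by feature key and the localized scheme routes by slot id. The modulo rule used here is only an illustration of "distribute by index", not the exact HugeCTR formula.

#include <cstdio>

// Illustrative placement rules: distributed-slot hashing places a feature by its key,
// localized-slot hashing places all features of a slot on that slot's GPU.
int gpu_for_distributed(long long key, int num_gpus) { return static_cast<int>(key % num_gpus); }
int gpu_for_localized(int slot_id, int num_gpus)     { return slot_id % num_gpus; }

int main() {
  const int num_gpus = 4;
  long long keys[]  = {7, 8, 9};
  int       slots[] = {2, 2, 2};   // all three features belong to slot 2

  for (int i = 0; i < 3; ++i) {
    std::printf("key %lld (slot %d): distributed -> GPU %d, localized -> GPU %d\n",
                keys[i], slots[i],
                gpu_for_distributed(keys[i], num_gpus),
                gpu_for_localized(slots[i], num_gpus));
  }
  return 0;
}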

Much of the following code is omitted, so interested readers can delve into the source code.

template <typename TypeKey, typename TypeFP>
void create_embedding<TypeKey, TypeFP>::operator()(
    std::map<std::string, SparseInput<TypeKey>>& sparse_input_map,
    std::vector<TensorEntry>* train_tensor_entries_list,
    std::vector<TensorEntry>* evaluate_tensor_entries_list,
    std::vector<std::shared_ptr<IEmbedding>>& embeddings, Embedding_t embedding_type,
    const nlohmann::json& config, const std::shared_ptr<ResourceManager>& resource_manager,
    size_t batch_size, size_t batch_size_eval, std::shared_ptr<ExchangeWgrad>& exchange_wgrad,
    bool use_mixed_precision, float scaler, const nlohmann::json& j_layers, bool use_cuda_graph,
    bool grouped_all_reduce) {

#ifdef ENABLE_MPI
  int num_procs = 1, pid = 0;  // Set up MPI-related information
  MPI_Comm_rank(MPI_COMM_WORLD, &pid);
  MPI_Comm_size(MPI_COMM_WORLD, &num_procs);
#endif

  // Read from the configuration file
  auto j_optimizer = get_json(config, "optimizer");
  auto embedding_name = get_value_from_json<std::string>(j_layers, "type");
  auto bottom_name = get_value_from_json<std::string>(j_layers, "bottom");
  auto top_name = get_value_from_json<std::string>(j_layers, "top");
  auto j_hparam = get_json(j_layers, "sparse_embedding_hparam");
  size_t workspace_size_per_gpu_in_mb =
      get_value_from_json_soft<size_t>(j_hparam, "workspace_size_per_gpu_in_mb", 0);
  auto embedding_vec_size = get_value_from_json<size_t>(j_hparam, "embedding_vec_size");
  size_t max_vocabulary_size_per_gpu =
      (workspace_size_per_gpu_in_mb * 1024 * 1024) / (sizeof(float) * embedding_vec_size);
  auto combiner_str = get_value_from_json<std::string>(j_hparam, "combiner");

  int combiner;  // Set the combiner method
  if (combiner_str == "sum") {
    combiner = 0;
  } else if (combiner_str == "mean") {
    combiner = 1;
  } else {
    CK_THROW_(Error_t::WrongInput, "No such combiner type: " + combiner_str);
  }

  // Set the slot configuration
  std::vector<size_t> slot_size_array;
  if (has_key_(j_hparam, "slot_size_array")) {
    auto slots = get_json(j_hparam, "slot_size_array");
    assert(slots.is_array());
    for (auto slot : slots) {
      slot_size_array.emplace_back(slot.get<size_t>());
    }
  }

  SparseInput<TypeKey> sparse_input;

  // Set the optimizer configuration
  OptParams embedding_opt_params;
  if (has_key_(j_layers, "optimizer")) {
    embedding_opt_params = get_optimizer_param(get_json(j_layers, "optimizer"));
  } else {
    embedding_opt_params = get_optimizer_param(j_optimizer);
  }
  embedding_opt_params.scaler = scaler;

  // Create the different hash embeddings
  switch (embedding_type) {
    case Embedding_t::DistributedSlotSparseEmbeddingHash: {
      const SparseEmbeddingHashParams embedding_params = {batch_size,
                                                          batch_size_eval,
                                                          max_vocabulary_size_per_gpu,
                                                          {},
                                                          embedding_vec_size,
                                                          sparse_input.max_feature_num_per_sample,
                                                          sparse_input.slot_num,
                                                          combiner,  // combiner: 0-sum, 1-mean
                                                          embedding_opt_params};

      embeddings.emplace_back(new DistributedSlotSparseEmbeddingHash<TypeKey, TypeFP>(
          sparse_input.train_sparse_tensors, sparse_input.evaluate_sparse_tensors, embedding_params,
          resource_manager));
      break;
    }
    case Embedding_t::LocalizedSlotSparseEmbeddingHash: {
      const SparseEmbeddingHashParams embedding_params = {batch_size,
                                                          batch_size_eval,
                                                          max_vocabulary_size_per_gpu,
                                                          slot_size_array,
                                                          embedding_vec_size,
                                                          sparse_input.max_feature_num_per_sample,
                                                          sparse_input.slot_num,
                                                          combiner,  // combiner: 0-sum, 1-mean
                                                          embedding_opt_params};

      embeddings.emplace_back(new LocalizedSlotSparseEmbeddingHash<TypeKey, TypeFP>(
          sparse_input.train_sparse_tensors, sparse_input.evaluate_sparse_tensors, embedding_params,
          resource_manager));
      break;
    }
    case Embedding_t::LocalizedSlotSparseEmbeddingOneHot: {
      const SparseEmbeddingHashParams embedding_params = {...};
      embeddings.emplace_back(new LocalizedSlotSparseEmbeddingOneHot<TypeKey, TypeFP>(
          sparse_input.train_sparse_tensors, sparse_input.evaluate_sparse_tensors, embedding_params,
          resource_manager));
      break;
    }
    case Embedding_t::HybridSparseEmbedding: {
      const HybridSparseEmbeddingParams<TypeFP> embedding_params = {...};
      embeddings.emplace_back(new HybridSparseEmbedding<TypeKey, TypeFP>(
          sparse_input.train_sparse_tensors, sparse_input.evaluate_sparse_tensors, embedding_params,
          embed_wgrad_buff, get_gpu_learning_rate_schedulers(config, resource_manager), graph_mode,
          resource_manager));
      break;
    }
  }  // switch
}
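
One detail worth noting in the code above: max_vocabulary_size_per_gpu is derived from workspace_size_per_gpu_in_mb. With the sample configuration from section 3.2.2 (10 MB workspace, embedding_vec_size = 11, 4-byte float storage), that works out to

max_vocabulary_size_per_gpu = (10 × 1024 × 1024) / (4 × 11) = 10485760 / 44 ≈ 238,312 embedding rows per GPU.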

4.5 Establishing a Network

After this part, the HugeCTR system is set up and ready for training. The general logic is as follows:

  • create_block (backed by BufferBlockImpl) is used heavily to allocate GPU memory.
  • Establish the training network layer.
  • Establish an evaluation network layer.
  • Build the optimizer.
  • Initialize other network information.
Network* Network::create_network(const nlohmann::json& j_array, const nlohmann::json& j_optimizer,
                                 std::vector<TensorEntry>& train_tensor_entries,
                                 std::vector<TensorEntry>& evaluate_tensor_entries,
                                 int num_networks_in_global,
                                 std::shared_ptr<ExchangeWgrad>& exchange_wgrad,
                                 const std::shared_ptr<CPUResource>& cpu_resource,
                                 const std::shared_ptr<GPUResource>& gpu_resource,
                                 bool use_mixed_precision, bool enable_tf32_compute, float scaler,
                                 bool use_algorithm_search, bool use_cuda_graph,
                                 bool inference_flag, bool grouped_all_reduce) {
  Network* network = new Network(cpu_resource, gpu_resource, use_mixed_precision, use_cuda_graph);

  auto& train_layers = network->train_layers_;
  auto* bottom_layers = &network->bottom_layers_;
  auto* top_layers = &network->top_layers_;
  auto& evaluate_layers = network->evaluate_layers_;
  auto& train_loss_tensor = network->train_loss_tensor_;
  auto& evaluate_loss_tensor = network->evaluate_loss_tensor_;
  auto& train_loss = network->train_loss_;
  auto& evaluate_loss = network->evaluate_loss_;
  auto& enable_cuda_graph = network->enable_cuda_graph_;
  auto& raw_metrics = network->raw_metrics_;

  // Allocate GPU memory; create_block is used heavily here, backed by BufferBlockImpl
  std::shared_ptr<GeneralBuffer2<CudaAllocator>> blobs_buff =
      GeneralBuffer2<CudaAllocator>::create();

  std::shared_ptr<BufferBlock2<float>> train_weight_buff = blobs_buff->create_block<float>();
  std::shared_ptr<BufferBlock2<__half>> train_weight_buff_half =
      blobs_buff->create_block<__half>();
  std::shared_ptr<BufferBlock2<float>> wgrad_buff = nullptr;
  std::shared_ptr<BufferBlock2<__half>> wgrad_buff_half = nullptr;

  if (!inference_flag) {
    if (use_mixed_precision) {
      auto id = gpu_resource->get_local_id();
      wgrad_buff_half =
          (grouped_all_reduce)
              ? std::dynamic_pointer_cast<GroupedExchangeWgrad<__half>>(exchange_wgrad)
                    ->get_network_wgrad_buffs()[id]
              : std::dynamic_pointer_cast<NetworkExchangeWgrad<__half>>(exchange_wgrad)
                    ->get_network_wgrad_buffs()[id];
      wgrad_buff = blobs_buff->create_block<float>();  // placeholder
    } else {
      auto id = gpu_resource->get_local_id();
      wgrad_buff = (grouped_all_reduce)
                       ? std::dynamic_pointer_cast<GroupedExchangeWgrad<float>>(exchange_wgrad)
                             ->get_network_wgrad_buffs()[id]
                       : std::dynamic_pointer_cast<NetworkExchangeWgrad<float>>(exchange_wgrad)
                             ->get_network_wgrad_buffs()[id];
      wgrad_buff_half = blobs_buff->create_block<__half>();  // placeholder
    }
  } else {
    wgrad_buff = blobs_buff->create_block<float>();
    wgrad_buff_half = blobs_buff->create_block<__half>();
  }

  std::shared_ptr<BufferBlock2<float>> evaluate_weight_buff = blobs_buff->create_block<float>();
  std::shared_ptr<BufferBlock2<__half>> evaluate_weight_buff_half =
      blobs_buff->create_block<__half>();
  std::shared_ptr<BufferBlock2<float>> wgrad_buff_placeholder = blobs_buff->create_block<float>();
  std::shared_ptr<BufferBlock2<__half>> wgrad_buff_half_placeholder =
      blobs_buff->create_block<__half>();
  std::shared_ptr<BufferBlock2<float>> opt_buff = blobs_buff->create_block<float>();
  std::shared_ptr<BufferBlock2<__half>> opt_buff_half = blobs_buff->create_block<__half>();

  // Create the training layers
  if (!inference_flag) {
    // create train layers
    create_layers(j_array, train_tensor_entries, blobs_buff, train_weight_buff,
                  train_weight_buff_half, wgrad_buff, wgrad_buff_half, train_loss_tensor,
                  gpu_resource, use_mixed_precision, enable_tf32_compute, num_networks_in_global,
                  scaler, enable_cuda_graph, inference_flag, train_layers, train_loss, nullptr,
                  top_layers, bottom_layers);
  }

  // Create the evaluation layers
  // create evaluate layers
  create_layers(j_array, evaluate_tensor_entries, blobs_buff, evaluate_weight_buff,
                evaluate_weight_buff_half, wgrad_buff_placeholder, wgrad_buff_half_placeholder,
                evaluate_loss_tensor, gpu_resource, use_mixed_precision, enable_tf32_compute,
                num_networks_in_global, scaler, enable_cuda_graph, inference_flag, evaluate_layers,
                evaluate_loss, &raw_metrics);

  // Create the optimizer
  // create optimizer
  if (!inference_flag) {
    if (use_mixed_precision) {
      auto opt_param = get_optimizer_param(j_optimizer);

      network->optimizer_ = std::move(Optimizer::Create(opt_param, train_weight_buff->as_tensor(),
                                                        wgrad_buff_half->as_tensor(), scaler,
                                                        opt_buff_half, gpu_resource));
    } else {
      auto opt_param = get_optimizer_param(j_optimizer);

      network->optimizer_ =
          std::move(Optimizer::Create(opt_param, train_weight_buff->as_tensor(),
                                      wgrad_buff->as_tensor(), scaler, opt_buff, gpu_resource));
    }
  } else {
    try {
      TensorEntry pred_tensor_entry = evaluate_tensor_entries.back();
      if (use_mixed_precision) {
        network->pred_tensor_half_ = Tensor2<__half>::stretch_from(pred_tensor_entry.bag);
      } else {
        network->pred_tensor_ = Tensor2<float>::stretch_from(pred_tensor_entry.bag);
      }
    } catch (const std::runtime_error& rt_err) {
      std::cerr << rt_err.what() << std::endl;
      throw;
    }
  }

  // Initialize other network information
  network->train_weight_tensor_ = train_weight_buff->as_tensor();
  network->train_weight_tensor_half_ = train_weight_buff_half->as_tensor();
  network->wgrad_tensor_ = wgrad_buff->as_tensor();
  network->wgrad_tensor_half_ = wgrad_buff_half->as_tensor();
  network->evaluate_weight_tensor_ = evaluate_weight_buff->as_tensor();
  network->evaluate_weight_tensor_half_ = evaluate_weight_buff_half->as_tensor();
  network->opt_tensor_ = opt_buff->as_tensor();
  network->opt_tensor_half_ = opt_buff_half->as_tensor();

  CudaDeviceContext context(gpu_resource->get_device_id());
  blobs_buff->allocate();

  return network;
}

4.5.1 create_layers

create_layers has two versions: HugeCTR/src/parsers/create_network.cpp and HugeCTR/src/cpu/create_network_cpu.cpp. Let’s look at it using the create_network.cpp code.

It essentially iterates over the JSON array read from the configuration and builds each layer in turn. Because there are many layer types, only two examples are given here.


void create_layers(const nlohmann::json& j_array, std::vector<TensorEntry>& tensor_entries,
                   const std::shared_ptr<GeneralBuffer2<CudaAllocator>>& blobs_buff,
                   const std::shared_ptr<BufferBlock2<float>>& weight_buff,
                   const std::shared_ptr<BufferBlock2<__half>>& weight_buff_half,
                   const std::shared_ptr<BufferBlock2<float>>& wgrad_buff,
                   const std::shared_ptr<BufferBlock2<__half>>& wgrad_buff_half,
                   Tensor2<float>& loss_tensor, const std::shared_ptr<GPUResource>& gpu_resource,
                   bool use_mixed_precision, bool enable_tf32_compute, int num_networks_in_global,
                   float scaler, bool& enable_cuda_graph, bool inference_flag,
                   std::vector<std::unique_ptr<Layer>>& layers, std::unique_ptr<ILoss>& loss,
                   metrics::RawMetricMap* raw_metrics, std::vector<Layer*>* top_layers = nullptr,
                   std::vector<Layer*>* bottom_layers = nullptr) {

  for (unsigned int i = 1; i < j_array.size(); i++) {  // Iterate over the JSON array
    const nlohmann::json& j = j_array[i];
    const auto layer_type_name = get_value_from_json<std::string>(j, "type");
    Layer_t layer_type;

    std::vector<TensorEntry> output_tensor_entries;
    // Get the input and output of this layer
    auto input_output_info = get_input_tensor_and_output_name(j, tensor_entries);

    switch (layer_type) {
      // Create the corresponding layer
      case Layer_t::ReduceMean: {
        int axis = get_json(j, "axis").get<int>();
        // The input of this layer
        Tensor2<float> in_tensor = Tensor2<float>::stretch_from(input_output_info.inputs[0]);
        Tensor2<float> out_tensor;
        emplaceback_layer(
            new ReduceMeanLayer<float>(in_tensor, out_tensor, blobs_buff, axis, gpu_resource));
        // The output of this layer
        output_tensor_entries.push_back({input_output_info.output_names[0], out_tensor.shrink()});
        break;
      }

      case Layer_t::Softmax: {
        // The input of this layer
        Tensor2<float> in_tensor = Tensor2<float>::stretch_from(input_output_info.inputs[0]);
        Tensor2<float> out_tensor;
        blobs_buff->reserve(in_tensor.get_dimensions(), &out_tensor);
        // The output of this layer
        output_tensor_entries.push_back({input_output_info.output_names[0], out_tensor.shrink()});
        emplaceback_layer(new SoftmaxLayer<float>(in_tensor, out_tensor, blobs_buff, gpu_resource));
        break;
      }
    }  // end of switch

  }  // for layers
}

4.5.2 Layers

HugeCTR is a full-fledged deep learning system in its own right; it implements the following layer types:

enum class Layer_t {
  BatchNorm,
  BinaryCrossEntropyLoss,
  Reshape,
  Concat,
  CrossEntropyLoss,
  Dropout,
  ELU,
  InnerProduct,
  FusedInnerProduct,
  Interaction,
  MultiCrossEntropyLoss,
  ReLU,
  ReLUHalf,
  GRU,
  MatrixMultiply,
  Scale,
  FusedReshapeConcat,
  FusedReshapeConcatGeneral,
  Softmax,
  PReLU_Dice,
  ReduceMean,
  Sub,
  Gather,
  Sigmoid,
  Slice,
  WeightMultiply,
  FmOrder2,
  Add,
  ReduceSum,
  MultiCross,
  Cast,
  DotProduct,
  ElementwiseMultiply
};

Let’s use SigmoidLayer as an example.

/**
 * Sigmoid activation function as a derived class of Layer
 */
template <typename T>
class SigmoidLayer : public Layer {
  /*
   * stores the references to the input tensors of this layer.
   */
  Tensors2<T> in_tensors_;
  /*
   * stores the references to the output tensors of this layer.
   */
  Tensors2<T> out_tensors_;

 public:
  /**
   * Ctor of SigmoidLayer.
   * @param in_tensor the input tensor
   * @param out_tensor the output tensor which has the same dim with in_tensor
   * @param device_id the id of GPU where this layer belongs
   */
  SigmoidLayer(const Tensor2<T>& in_tensor, const Tensor2<T>& out_tensor,
               const std::shared_ptr<GPUResource>& gpu_resource);

  /**
   * A method of implementing the forward pass of Sigmoid
   * @param stream CUDA stream where the forward propagation is executed
   */
  void fprop(bool is_train) override;
  /**
   * A method of implementing the backward pass of Sigmoid
   * @param stream CUDA stream where the backward propagation is executed
   */
  void bprop() override;
};

Its forward propagation is as follows:

template <typename T>
void SigmoidLayer<T>::fprop(bool is_train) {
  CudaDeviceContext context(get_device_id());

  int len = in_tensors_[0].get_num_elements();

  auto fop = [] __device__(T in) { return T(1) / (T(1) + exponential(-in)); };

  MLCommon::LinAlg::unaryOp(out_tensors_[0].get_ptr(), in_tensors_[0].get_ptr(), len, fop,
                            get_gpu().get_stream());

#ifndef NDEBUG
  cudaDeviceSynchronize();
  CK_CUDA_THROW_(cudaGetLastError());
#endif
}

Its backward propagation is as follows:

template <typename T>
void SigmoidLayer<T>::bprop() {
  CudaDeviceContext context(get_device_id());

  int len = in_tensors_[0].get_num_elements();

  auto bop = [] __device__(T d_out, T d_in) {
    T y = T(1) / (T(1) + exponential(-d_in));
    return d_out * y * (T(1) - y);
  };

  MLCommon::LinAlg::binaryOp(in_tensors_[0].get_ptr(), out_tensors_[0].get_ptr(),
                             in_tensors_[0].get_ptr(), len, bop, get_gpu().get_stream());

#ifndef NDEBUG
  cudaDeviceSynchronize();
  CK_CUDA_THROW_(cudaGetLastError());
#endif
}
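
The bop lambda is just the sigmoid derivative expressed through the forward value: with y = σ(x) = 1 / (1 + e^(-x)), the chain rule gives dL/dx = dL/dy · dy/dx = d_out · y · (1 - y), which is exactly d_out * y * (T(1) - y), with y recomputed from the saved input d_in.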

At this point HugeCTR initialization is complete and training can begin. The following snapshot comes from the official HugeCTR_Webinar; CSR is the data format the embedding layer depends on, which we will analyze later.

4.5.3 How to connect layers in series

One question remains: how are the layers connected to one another?

In create_layers there is the following code:

// Get the input and output of this layer
auto input_output_info = get_input_tensor_and_output_name(j, tensor_entries);

The code for get_input_tensor_and_output_name is shown below. As you can see, each layer records its own input and output, and with the internal parsing module, these layers establish their logical relationship.

static InputOutputInfo get_input_tensor_and_output_name(
    const nlohmann::json& json, const std::vector<TensorEntry>& tensor_entries) {
  auto bottom = get_json(json, "bottom");
  auto top = get_json(json, "top");

  // Get the input and output tensor names from the json
  std::vector<std::string> bottom_names = get_layer_names(bottom);
  std::vector<std::string> top_names = get_layer_names(top);

  std::vector<TensorBag2> bottom_bags;
  // Collect the input (bottom) tensors into a vector
  for (auto& bottom_name : bottom_names) {
    for (auto& top_name : top_names) {
      if (bottom_name == top_name) {
        CK_THROW_(Error_t::WrongInput, "bottom and top include a same layer name");
      }
    }
    TensorBag2 bag;
    if (!get_tensor_from_entries(tensor_entries, bottom_name, &bag)) {
      CK_THROW_(Error_t::WrongInput, "No such bottom: " + bottom_name);
    }
    bottom_bags.push_back(bag);
  }
  return {bottom_bags, top_names};  // return
}
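
To see how the bottom/top names wire layers together, here is a simplified sketch (not HugeCTR code) of the same idea: each layer publishes its output tensors under its top names in a registry, and later layers look up their inputs by bottom name in that registry.

#include <cstdio>
#include <map>
#include <string>
#include <vector>

// Toy registry of "tensor name -> tensor id", mimicking tensor_entries:
// a layer consumes the tensors named by its "bottom" list and publishes
// new entries under its "top" names for downstream layers to find.
int main() {
  std::map<std::string, int> tensor_entries = {{"dense", 0}, {"data1", 1}};
  int next_id = 2;

  struct LayerCfg { std::string name; std::vector<std::string> bottom, top; };
  std::vector<LayerCfg> layers = {
      {"sparse_embedding1", {"data1"}, {"sparse_embedding1"}},
      {"reshape1", {"sparse_embedding1"}, {"reshape1"}},
  };

  for (const auto& l : layers) {
    for (const auto& b : l.bottom) {
      if (!tensor_entries.count(b)) { std::printf("No such bottom: %s\n", b.c_str()); return 1; }
    }
    for (const auto& t : l.top) tensor_entries[t] = next_id++;   // publish outputs
    std::printf("built layer %s\n", l.name.c_str());
  }
  return 0;
}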

Finally, the logical relationships of the pipeline are established as follows:

0x05 Training

The specific training code logic is as follows:

  • The reader reads a batch of data.
  • The data is parsed.
  • The embedding layer propagates forward, that is, it reads the embedding vectors from the parameter server for processing.
  • The network layers run forward and backward propagation, with dedicated paths for multi-card, single-card, multi-machine, and single-machine setups.
  • The embedding layer propagates backward.
  • Gradients of the dense parameters are exchanged between the cards.
  • The embedding layer updates the sparse parameters.
  • The streams are synchronized.
bool Session::train() {
  try {
    // Make sure train_data_reader_ has been started
    if (train_data_reader_->is_started() == false) {
      CK_THROW_(Error_t::IllegalCall,
                "Start the data reader first before calling Session::train()");
    }

#ifndef DATA_READING_TEST
    // Read a batch of data
    long long current_batchsize = train_data_reader_->read_a_batch_to_device_delay_release();
    if (!current_batchsize) {
      return false;  // No data is available
    }

    #pragma omp parallel num_threads(networks_.size())  // executed in parallel by networks_.size() threads
    {
      size_t id = omp_get_thread_num();
      CudaCPUDeviceContext ctx(resource_manager_->get_local_gpu(id)->get_device_id());
      cudaStreamSynchronize(resource_manager_->get_local_gpu(id)->get_stream());
    }

    // The reader can start parsing data
    train_data_reader_->ready_to_collect();

#ifdef ENABLE_PROFILING
    global_profiler.iter_check();
#endif

    // If true we're gonna use overlapping, if false we use the default path
    if (solver_config_.use_overlapped_pipeline) {
      train_overlapped();
    } else {
      for (const auto& one_embedding : embeddings_) {
        one_embedding->forward(true);  // Embedding forward: read the embedding vectors from the parameter server
      }

      // Network forward / backward
      if (networks_.size() > 1) {  // More than one network means multiple cards
        // Single machine with multiple cards, or multiple machines with multiple cards
        // execute dense forward and backward with multi-cpu threads
        #pragma omp parallel num_threads(networks_.size())
        {
          // Forward and backward of the dense network
          size_t id = omp_get_thread_num();
          long long current_batchsize_per_device =
              train_data_reader_->get_current_batchsize_per_device(id);
          networks_[id]->train(current_batchsize_per_device);  // Forward and backward
          const auto& local_gpu = resource_manager_->get_local_gpu(id);
          local_gpu->set_compute_event_sync(local_gpu->get_stream());
          local_gpu->wait_on_compute_event(local_gpu->get_comp_overlap_stream());
        }
      } else if (resource_manager_->get_global_gpu_count() > 1) {
        // Multiple machines, single card each
        long long current_batchsize_per_device =
            train_data_reader_->get_current_batchsize_per_device(0);
        networks_[0]->train(current_batchsize_per_device);  // Forward and backward
        const auto& local_gpu = resource_manager_->get_local_gpu(0);
        local_gpu->set_compute_event_sync(local_gpu->get_stream());
        local_gpu->wait_on_compute_event(local_gpu->get_comp_overlap_stream());
      } else {
        // Single card
        long long current_batchsize_per_device =
            train_data_reader_->get_current_batchsize_per_device(0);
        networks_[0]->train(current_batchsize_per_device);  // Forward and backward
        const auto& local_gpu = resource_manager_->get_local_gpu(0);
        local_gpu->set_compute_event_sync(local_gpu->get_stream());
        local_gpu->wait_on_compute_event(local_gpu->get_comp_overlap_stream());
        networks_[0]->update_params();
      }

      // Embedding backward
      for (const auto& one_embedding : embeddings_) {
        one_embedding->backward();  // Embedding backward
      }

      // Exchange wgrad and update params
      if (networks_.size() > 1) {
        #pragma omp parallel num_threads(networks_.size())
        {
          size_t id = omp_get_thread_num();
          exchange_wgrad(id);  // Exchange the dense-parameter gradients between cards
          networks_[id]->update_params();
        }
      } else if (resource_manager_->get_global_gpu_count() > 1) {
        exchange_wgrad(0);
        networks_[0]->update_params();
      }

      for (const auto& one_embedding : embeddings_) {
        one_embedding->update_params();  // The embedding layer updates the sparse parameters
      }

      // Join streams: synchronize the streams
      if (networks_.size() > 1) {
        #pragma omp parallel num_threads(networks_.size())
        {
          size_t id = omp_get_thread_num();
          const auto& local_gpu = resource_manager_->get_local_gpu(id);
          local_gpu->set_compute2_event_sync(local_gpu->get_comp_overlap_stream());
          local_gpu->wait_on_compute2_event(local_gpu->get_stream());
        }
      } else {
        const auto& local_gpu = resource_manager_->get_local_gpu(0);
        local_gpu->set_compute2_event_sync(local_gpu->get_comp_overlap_stream());
        local_gpu->wait_on_compute2_event(local_gpu->get_stream());
      }
      return true;
    }
#else
    data_reader_->read_a_batch_to_device();
#endif

  } catch (const internal_runtime_error& err) {
    std::cerr << err.what() << std::endl;
    throw err;
  } catch (const std::exception& err) {
    std::cerr << err.what() << std::endl;
    throw err;
  }
  return true;
}
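
Stripping away OpenMP, CUDA events, and the multi-card branches, one iteration boils down to the call order below. This is a sketch with stand-in types (FakeReader and friends are made up), not the real classes, but it follows the same member calls as the code above for the single-GPU path.

#include <cstdio>

// Stand-ins for the real classes, used only to show the per-iteration call order
// of Session::train() on the single-GPU path (no streams, events, or OpenMP).
struct FakeReader  { long long read_a_batch() { return 16384; } void ready_to_collect() {} };
struct FakeEmbed   { void forward() {} void backward() {} void update_params() {} };
struct FakeNetwork { void train(long long) {} void update_params() {} };

int main() {
  FakeReader reader; FakeEmbed embedding; FakeNetwork network;

  long long batch = reader.read_a_batch();   // 1. read a batch to the device
  if (!batch) return 0;
  reader.ready_to_collect();                 // 2. let the reader start parsing the next batch
  embedding.forward();                       // 3. embedding forward (pull vectors)
  network.train(batch);                      // 4. dense forward + backward
  embedding.backward();                      // 5. embedding backward
  // 6. (multi-GPU) the dense gradients would be exchanged via all-reduce here
  network.update_params();                   // 7. dense parameter update
  embedding.update_params();                 // 8. sparse parameter update
  std::printf("one iteration done, batch size %lld\n", batch);
  return 0;
}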

The training process is as follows:

Now that we have a general idea of how HugeCTR is initialized and trained, the next article will show you how to read data.

0xEE Personal information

★★★★ Thoughts on life and technology ★★★★★

Wechat official account: Rosie’s Thoughts

0xFF Reference

Developer.nvidia.com/blog/introd…

Developer.nvidia.com/blog/announ…

Developer.nvidia.com/blog/accele…

Read HugeCTR source code

How does embedding propagate back

Web.eecs.umich.edu/~justincj/t…

HugeCTR_Webinar