0x00 Abstract

In this series, we have been introducing HugeCTR, an industry-oriented recommendation system training framework optimized for large-scale CTR models, which combines model-parallel embeddings with a data-parallel dense network. This article analyzes LocalizedSlotSparseEmbeddingHash: its definition, construction, forward propagation, backward propagation, and storage.

This article draws on the excellent write-up "Read HugeCTR source code" (listed in the references); many thanks to its author.

Other articles in this series are as follows:

NVIDIA HugeCTR, GPU version parameter server — (1)

NVIDIA HugeCTR, GPU version parameter server — (2)

NVIDIA HugeCTR, GPU version parameter server — (3)

NVIDIA HugeCTR, GPU version parameter server — (4)

NVIDIA HugeCTR, GPU version parameter server — (5) Embedding hash table

NVIDIA HugeCTR, GPU version parameter server — (6) Distributed hash table

NVIDIA HugeCTR, GPU version parameter server — (7) Distributed hash forward propagation

NVIDIA HugeCTR, GPU version parameter server — (8) Distributed hash backward propagation

0x01 Previous review

From the previous analysis, we know the overall flow of an embedding table lookup.

0x02 Definition

The LocalizedSlotSparseEmbeddingHash class inherits from IEmbedding, the base class that all embedding layers implement. In LocalizedSlotSparseEmbeddingHash, each slot of the embedding table is assigned to a single GPU, which is why the slots are called localized slots. For example, slot 0 is on GPU 0, slot 1 on GPU 1, slot 2 on GPU 0, and slot 3 on GPU 1. In contrast, in DistributedSlotSparseEmbeddingHash a slot is distributed across more than one GPU.

The embedding table is encapsulated in a hash table. The key of the hash table is called hash_table_key; the value is called hash_table_value_index, which is the row number of the embedding feature inside the embedding table; the embedding feature itself is called hash_table_value.
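To make this three-level structure concrete, here is a minimal host-side sketch (not HugeCTR code; the container choices and the key values 40 and 50 are purely illustrative) that models hash_table_key → hash_table_value_index → hash_table_value with standard C++ containers:

#include <cstddef>
#include <iostream>
#include <unordered_map>
#include <vector>

int main() {
  const size_t embedding_vec_size = 4;

  // hash_table_value: the embedding table, stored row by row
  std::vector<float> hash_table_value = {
      0.1f, 0.2f, 0.3f, 0.4f,   // row 0
      0.5f, 0.6f, 0.7f, 0.8f};  // row 1

  // hash_table_key -> hash_table_value_index (the row number in the table)
  std::unordered_map<long long, size_t> hash_table = {{40, 0}, {50, 1}};

  long long key = 50;
  size_t row = hash_table.at(key);  // hash_table_value_index
  const float* vec = hash_table_value.data() + row * embedding_vec_size;

  std::cout << "embedding for key " << key << ":";
  for (size_t i = 0; i < embedding_vec_size; i++) std::cout << " " << vec[i];
  std::cout << std::endl;
  return 0;
}

The real implementation uses a GPU hash table (NvHashTable) and Tensor2<float> buffers, but the key → row index → row mapping is the same.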

The LocalizedSlotSparseEmbeddingHash layer implements all operations of the embedding training process, including forward and backward propagation. Forward propagation corresponds to the forward API. Backward propagation is split into two phases, backward and update_params. The class also provides load_parameters, which uploads a hash table (hash table keys, hash table value indexes, and hash table values) from a host file to the GPUs, and dump_parameters, which downloads a hash table from the GPUs to a host file.

template <typename TypeHashKey, typename TypeEmbeddingComp>
class LocalizedSlotSparseEmbeddingHash : public IEmbedding {
  using NvHashTable = HashTable<TypeHashKey, size_t>;

 private:
  EmbeddingData<TypeHashKey, TypeEmbeddingComp> embedding_data_;
  std::vector<LocalizedFilterKeyStorage<TypeHashKey>> filter_keys_storages_;

  std::vector<std::shared_ptr<NvHashTable>> hash_tables_; /**< Hash table. */

  // define tensors
  Tensors2<float> hash_table_value_tensors_; /**< Hash table value. */
  std::vector<Tensors2<float>> value_table_tensors_;

  Tensors2<size_t> hash_table_slot_id_tensors_; /**< the tensors for storing slot ids */
  Tensors2<size_t> hash_value_index_tensors_;   /**< Hash value index. The index is corresponding to the line number of the value. */
  Tensors2<TypeEmbeddingComp>
      embedding_feature_tensors_;             /**< the output tensor of the forward(). */
  Tensors2<TypeEmbeddingComp> wgrad_tensors_; /**< the input tensor of the backward(). */

  std::vector<EmbeddingOptimizer<TypeHashKey, TypeEmbeddingComp>> embedding_optimizers_;
  size_t max_vocabulary_size_;
  size_t max_vocabulary_size_per_gpu_;   /**< Max vocabulary size for each GPU. */
  std::vector<size_t> slot_num_per_gpu_; /* slot_num per GPU */
  std::vector<size_t> slot_size_array_;

  SparseEmbeddingFunctors functors_;

  Tensors2<TypeEmbeddingComp> all2all_tensors_; /**< the temp buffer to store all2all results */

  Tensors2<TypeEmbeddingComp> utest_all2all_tensors_;
  Tensors2<TypeEmbeddingComp> utest_reorder_tensors_;
  Tensors2<TypeEmbeddingComp> utest_backward_temp_tensors_;
  Tensors2<TypeEmbeddingComp> utest_forward_temp_tensors_;
};

0x03 Construction

3.1 Call

In HugeCTR/src/parsers/create_embedding.cpp, there is the following call:

case Embedding_t::LocalizedSlotSparseEmbeddingHash: {
  const SparseEmbeddingHashParams embedding_params = {batch_size,
                                                      batch_size_eval,
                                                      max_vocabulary_size_per_gpu,
                                                      slot_size_array,
                                                      embedding_vec_size,
                                                      sparse_input.max_feature_num_per_sample,
                                                      sparse_input.slot_num,
                                                      combiner,  // combiner: 0-sum, 1-mean
                                                      embedding_opt_params};

  embeddings.emplace_back(new LocalizedSlotSparseEmbeddingHash<TypeKey, TypeFP>(
      sparse_input.train_sparse_tensors, sparse_input.evaluate_sparse_tensors, embedding_params,
      resource_manager));

  break;
}

3.2 Constructors

The LocalizedSlotSparseEmbeddingHash constructor is as follows; see the inline comments for the specific logic.

template <typename TypeHashKey, typename TypeEmbeddingComp>
LocalizedSlotSparseEmbeddingHash<TypeHashKey, TypeEmbeddingComp>::LocalizedSlotSparseEmbeddingHash(
    const SparseTensors<TypeHashKey> &train_keys, const SparseTensors<TypeHashKey> &evaluate_keys,
    const SparseEmbeddingHashParams &embedding_params,
    const std::shared_ptr<ResourceManager> &resource_manager)
    : embedding_data_(Embedding_t::LocalizedSlotSparseEmbeddingHash, train_keys, evaluate_keys,
                      embedding_params, resource_manager),
      slot_size_array_(embedding_params.slot_size_array) {
  try {
    // Set the maximum amount of data per GPU
    if (slot_size_array_.empty()) {
      max_vocabulary_size_per_gpu_ = embedding_data_.embedding_params_.max_vocabulary_size_per_gpu;
      max_vocabulary_size_ = embedding_data_.embedding_params_.max_vocabulary_size_per_gpu *
                             embedding_data_.get_resource_manager().get_global_gpu_count();
    } else {
      max_vocabulary_size_per_gpu_ =
          cal_max_voc_size_per_gpu(slot_size_array_, embedding_data_.get_resource_manager());
      max_vocabulary_size_ = 0;
      for (size_t slot_size : slot_size_array_) {
        max_vocabulary_size_ += slot_size;
      }
    }

    CudaDeviceContext context;
    // Iterate over the local GPUs
    for (size_t id = 0; id < embedding_data_.get_resource_manager().get_local_gpu_count(); id++) {
      // Set the current context
      context.set_device(embedding_data_.get_local_gpu(id).get_device_id());

      // Number of slots per GPU
      size_t gid = embedding_data_.get_local_gpu(id).get_global_id();
      size_t slot_num_per_gpu =
          embedding_data_.embedding_params_.slot_num /
              embedding_data_.get_resource_manager().get_global_gpu_count() +
          ((gid < embedding_data_.embedding_params_.slot_num %
                      embedding_data_.get_resource_manager().get_global_gpu_count())
               ? 1
               : 0);
      slot_num_per_gpu_.push_back(slot_num_per_gpu);
      // new GeneralBuffer objects
      const std::shared_ptr<GeneralBuffer2<CudaAllocator>> &buf = embedding_data_.get_buffer(id);
      embedding_optimizers_.emplace_back(max_vocabulary_size_per_gpu_,
                                         embedding_data_.embedding_params_, buf);

      // Allocate memory for various variables
      // new hash table value vectors
      if (slot_size_array_.empty()) {
        Tensor2<float> tensor;
        buf->reserve(
            {max_vocabulary_size_per_gpu_, embedding_data_.embedding_params_.embedding_vec_size},
            &tensor);
        hash_table_value_tensors_.push_back(tensor);
      } else {
        const std::shared_ptr<BufferBlock2<float>> &block = buf->create_block<float>();
        Tensors2<float> tensors;
        size_t vocabulary_size_in_current_gpu = 0;
        for (size_t i = 0; i < slot_size_array_.size(); i++) {
          if ((i % embedding_data_.get_resource_manager().get_global_gpu_count()) == gid) {
            Tensor2<float> tensor;
            block->reserve(
                {slot_size_array_[i], embedding_data_.embedding_params_.embedding_vec_size},
                &tensor);
            tensors.push_back(tensor);
            vocabulary_size_in_current_gpu += slot_size_array_[i];
          }
        }
        value_table_tensors_.push_back(tensors);
        if (max_vocabulary_size_per_gpu_ > vocabulary_size_in_current_gpu) {
          Tensor2<float> padding_tensor_for_optimizer;
          block->reserve({max_vocabulary_size_per_gpu_ - vocabulary_size_in_current_gpu,
                          embedding_data_.embedding_params_.embedding_vec_size},
                         &padding_tensor_for_optimizer);
        }
        hash_table_value_tensors_.push_back(block->as_tensor());
      }
      {
        Tensor2<TypeHashKey> tensor;
        buf->reserve({embedding_data_.embedding_params_.get_batch_size(true),
                      embedding_data_.embedding_params_.max_feature_num},
                     &tensor);
        embedding_data_.train_value_tensors_.push_back(tensor);
      }
      {
        Tensor2<TypeHashKey> tensor;
        buf->reserve({embedding_data_.embedding_params_.get_batch_size(false),
                      embedding_data_.embedding_params_.max_feature_num},
                     &tensor);
        embedding_data_.evaluate_value_tensors_.push_back(tensor);
      }
      {
        Tensor2<TypeHashKey> tensor;
        buf->reserve(
            {embedding_data_.embedding_params_.get_batch_size(true) * slot_num_per_gpu + 1},
            &tensor);
        embedding_data_.train_row_offsets_tensors_.push_back(tensor);
      }
      {
        Tensor2<TypeHashKey> tensor;
        buf->reserve(
            {embedding_data_.embedding_params_.get_batch_size(false) * slot_num_per_gpu + 1},
            &tensor);
        embedding_data_.evaluate_row_offsets_tensors_.push_back(tensor);
      }
      { embedding_data_.train_nnz_array_.push_back(std::make_shared<size_t>(0)); }
      { embedding_data_.evaluate_nnz_array_.push_back(std::make_shared<size_t>(0)); }
      // new hash table value_index that get() from HashTable
      {
        Tensor2<size_t> tensor;
        buf->reserve({1, embedding_data_.embedding_params_.get_universal_batch_size() *
                             embedding_data_.embedding_params_.max_feature_num},
                     &tensor);
        hash_value_index_tensors_.push_back(tensor);
      }

      // new embedding features reduced by hash table values(results of forward)
      {
        Tensor2<TypeEmbeddingComp> tensor;
        buf->reserve(
            {embedding_data_.embedding_params_.get_universal_batch_size() * slot_num_per_gpu,
             embedding_data_.embedding_params_.embedding_vec_size},
            &tensor);
        embedding_feature_tensors_.push_back(tensor);
      }

      // new wgrad used by backward
      {
        Tensor2<TypeEmbeddingComp> tensor;
        buf->reserve({embedding_data_.embedding_params_.get_batch_size(true) * slot_num_per_gpu,
                      embedding_data_.embedding_params_.embedding_vec_size},
                     &tensor);
        wgrad_tensors_.push_back(tensor);
      }

      // the tensors for storing slot ids
      // TODO: init to -1 ?
      {
        Tensor2<size_t> tensor;
        buf->reserve({max_vocabulary_size_per_gpu_, 1}, &tensor);
        hash_table_slot_id_tensors_.push_back(tensor);
      }
      // temp tensors for all2all
      {
        Tensor2<TypeEmbeddingComp> tensor;
        buf->reserve({embedding_data_.get_universal_batch_size_per_gpu() *
                          embedding_data_.embedding_params_.slot_num,
                      embedding_data_.embedding_params_.embedding_vec_size},
                     &tensor);
        all2all_tensors_.push_back(tensor);
      }
      {
        Tensor2<TypeEmbeddingComp> tensor;
        buf->reserve({embedding_data_.embedding_params_.get_universal_batch_size() *
                          embedding_data_.embedding_params_.slot_num,
                      embedding_data_.embedding_params_.embedding_vec_size},
                     &tensor);
        utest_forward_temp_tensors_.push_back(tensor);
      }
      {
        Tensor2<TypeEmbeddingComp> tensor;
        buf->reserve({embedding_data_.get_batch_size_per_gpu(true) *
                          embedding_data_.embedding_params_.slot_num,
                      embedding_data_.embedding_params_.embedding_vec_size},
                     &tensor);
        utest_all2all_tensors_.push_back(tensor);
      }
      {
        Tensor2<TypeEmbeddingComp> tensor;
        buf->reserve({embedding_data_.get_batch_size_per_gpu(true) *
                          embedding_data_.embedding_params_.slot_num,
                      embedding_data_.embedding_params_.embedding_vec_size},
                     &tensor);
        utest_reorder_tensors_.push_back(tensor);
      }
      {
        Tensor2<TypeEmbeddingComp> tensor;
        buf->reserve({embedding_data_.embedding_params_.get_batch_size(true) *
                          embedding_data_.embedding_params_.slot_num,
                      embedding_data_.embedding_params_.embedding_vec_size},
                     &tensor);
        utest_backward_temp_tensors_.push_back(tensor);
      }
      {
        size_t max_nnz = embedding_data_.embedding_params_.get_universal_batch_size() *
                         embedding_data_.embedding_params_.max_feature_num;
        size_t rowoffset_count = embedding_data_.embedding_params_.slot_num *
                                     embedding_data_.embedding_params_.get_universal_batch_size() +
                                 1;

        filter_keys_storages_.emplace_back(buf, max_nnz, rowoffset_count);
      }
    }

    hash_tables_.resize(embedding_data_.get_resource_manager().get_local_gpu_count());
#pragma omp parallel for num_threads(embedding_data_.get_resource_manager().get_local_gpu_count())
    for (size_t id = 0; id < embedding_data_.get_resource_manager().get_local_gpu_count(); id++) {
      // Initialize the internal hash table
      CudaDeviceContext context(embedding_data_.get_local_gpu(id).get_device_id());
      // construct HashTable object: used to store hash table <key, value_index>
      hash_tables_[id].reset(new NvHashTable(max_vocabulary_size_per_gpu_));
      embedding_data_.get_buffer(id)->allocate();
    }

    // Initialize the optimizers
    for (size_t id = 0; id < embedding_data_.get_resource_manager().get_local_gpu_count(); id++) {
      context.set_device(embedding_data_.get_local_gpu(id).get_device_id());
      embedding_optimizers_[id].initialize(embedding_data_.get_local_gpu(id));
    }  // end of for(int id = 0; id < embedding_data_.get_local_gpu_count(); id++)

    if (!embedding_data_.embedding_params_.slot_size_array.empty()) {
      std::vector<TypeHashKey> embedding_offsets;
      TypeHashKey slot_sizes_prefix_sum = 0;
      for (size_t i = 0; i < embedding_data_.embedding_params_.slot_size_array.size(); i++) {
        embedding_offsets.push_back(slot_sizes_prefix_sum);
        slot_sizes_prefix_sum += embedding_data_.embedding_params_.slot_size_array[i];
      }
      for (size_t id = 0; id < embedding_data_.get_resource_manager().get_local_gpu_count(); ++id) {
        CudaDeviceContext context(embedding_data_.get_local_gpu(id).get_device_id());

        CK_CUDA_THROW_(
            cudaMemcpy(embedding_data_.embedding_offsets_[id].get_ptr(), embedding_offsets.data(),
                       embedding_offsets.size() * sizeof(TypeHashKey), cudaMemcpyHostToDevice));
      }
    }

    // sync
    functors_.sync_all_gpus(embedding_data_.get_resource_manager());

  } catch (const std::runtime_error &rt_err) {
    std::cerr << rt_err.what() << std::endl;
    throw;
  }

  return;
}

3.3 How Slots Are Assigned to GPUs

Let's look at how it is determined which slot lands on which GPU. init_params calls init_embedding to complete the construction.

  /** Initialize the embedding table. */
  void init_params() override {
    // do hash table value initialization
    if (slot_size_array_.empty()) {  // if no slot_sizes provided, use the old method to init
      init_embedding(max_vocabulary_size_per_gpu_,
                     embedding_data_.embedding_params_.embedding_vec_size,
                     hash_table_value_tensors_);

    } else {
      if (slot_size_array_.size() == embedding_data_.embedding_params_.slot_num) {
#ifndef DATA_READING_TEST
        init_embedding(slot_size_array_, embedding_data_.embedding_params_.embedding_vec_size,
                       value_table_tensors_, hash_table_slot_id_tensors_);

#endif
      } else {
        throw std::runtime_error(
            std::string("[HCDEBUG][ERROR] Runtime error: the size of slot_sizes ! = slot_num\n")); }}}Copy the code

init_embedding then creates an embedding table on each GPU.

template <typename TypeHashKey, typename TypeEmbeddingComp>
void LocalizedSlotSparseEmbeddingHash<TypeHashKey, TypeEmbeddingComp>::init_embedding(
    const std::vector<size_t> &slot_sizes, size_t embedding_vec_size,
    std::vector<Tensors2<float>> &hash_table_value_tensors,
    Tensors2<size_t> &hash_table_slot_id_tensors) {
  
  // Get the number of GPUs on the local node and the number of GPUs globally
  size_t local_gpu_count = embedding_data_.get_resource_manager().get_local_gpu_count();
  size_t total_gpu_count = embedding_data_.get_resource_manager().get_global_gpu_count();

  for (size_t id = 0; id < local_gpu_count; id++) {  // Iterate over the local GPUs
    // Use the global ID here
    size_t device_id = embedding_data_.get_local_gpu(id).get_device_id();
    size_t global_id = embedding_data_.get_local_gpu(id).get_global_id();
    functors_.init_embedding_per_gpu(global_id, total_gpu_count, slot_sizes, embedding_vec_size,
                                     hash_table_value_tensors[id], hash_table_slot_id_tensors[id],
                                     embedding_data_.get_local_gpu(id));
  }

  for (size_t id = 0; id < local_gpu_count; id++) {
    CK_CUDA_THROW_(cudaStreamSynchronize(embedding_data_.get_local_gpu(id).get_stream()));
  }

  return;
}

Let's look at init_embedding_per_gpu, which assigns slots with simple modulo arithmetic. For example, with 10 slots and 3 GPUs, the slot ids are 0 to 9 and the GPU ids are 0 to 2. Taking slot_id % 3 for slot ids 0 through 9 gives 0, 1, 2, 0, 1, 2, 0, 1, 2, 0, so the slots are assigned as:

  • GPU 0: 0, 3, 6, 9

  • GPU 1: 1, 4, 7

  • GPU 2: 2, 5, 8

Therefore, the number of slots per GPU is not necessarily equal. A small standalone sketch of this assignment (and of the slot_num_per_gpu formula used in the constructor) follows below.
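The following sketch is plain C++ with no HugeCTR dependencies; the sizes (10 slots, 3 GPUs) mirror the example above. It reproduces both the modulo assignment used by init_embedding_per_gpu and the slot_num_per_gpu formula from the constructor:

#include <cstddef>
#include <iostream>

int main() {
  const size_t slot_num = 10;
  const size_t total_gpu_count = 3;

  for (size_t gid = 0; gid < total_gpu_count; gid++) {
    // Same formula as in the constructor: evenly split, the first
    // (slot_num % total_gpu_count) GPUs get one extra slot.
    size_t slot_num_per_gpu =
        slot_num / total_gpu_count + ((gid < slot_num % total_gpu_count) ? 1 : 0);

    std::cout << "GPU " << gid << " (" << slot_num_per_gpu << " slots):";
    for (size_t slot_id = 0; slot_id < slot_num; slot_id++) {
      if (slot_id % total_gpu_count == gid) std::cout << " " << slot_id;
    }
    std::cout << std::endl;
  }
  return 0;
}
// Expected output:
// GPU 0 (4 slots): 0 3 6 9
// GPU 1 (3 slots): 1 4 7
// GPU 2 (3 slots): 2 5 8

The actual device-side initialization, init_embedding_per_gpu, is shown next.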

void SparseEmbeddingFunctors::init_embedding_per_gpu(size_t gid, size_t total_gpu_count,
                                                     const std::vector<size_t> &slot_sizes,
                                                     size_t embedding_vec_size,
                                                     Tensors2<float> &embedding_tables,
                                                     Tensor2<size_t> &slot_ids,
                                                     const GPUResource &gpu_resource) {
  CudaDeviceContext context(gpu_resource.get_device_id());
  size_t *slot_ids_ptr = slot_ids.get_ptr();
  size_t key_offset = 0;
  size_t value_index_offset = 0;
  for (size_t i = 0, j = 0; i < slot_sizes.size(); i++) {  // traverse the slots
    size_t slot_size = slot_sizes[i];
    if ((i % total_gpu_count) == gid) {  // gid is the global GPU id
      // The operation continues only when i % total_gpu_count equals gid
      float up_bound = sqrt(1.f / slot_size);
      HugeCTR::UniformGenerator::fill(
          embedding_tables[j++], -up_bound, up_bound, gpu_resource.get_sm_count(),
          gpu_resource.get_replica_variant_curand_generator(), gpu_resource.get_stream());
      // Configure slot ids
      memset_const(slot_ids_ptr, i, slot_size, gpu_resource.get_stream());
      value_index_offset += slot_size;
      slot_ids_ptr += slot_size;
    }
    key_offset += slot_size;
  }
}

0x04 Forward Propagation

4.1 Overview

Let’s first summarize the steps of forward propagation:

  • First, filter_keys_per_gpu is used to configure EmbeddingData.

  • Second, forward_per_gpu performs the lookup from the embedding, i.e. functors_.forward_per_gpu looks up the local GPU's hash table to obtain the dense vectors.

  • Then all2all_forward makes every GPU hold all the data of its own samples. The goal is similar to the idea of the Distributed version: in the end each GPU holds only a few complete samples, and different GPUs hold different samples. Therefore, the data that the current sample has in slots on other GPUs must be copied to this GPU; in other words, from the all2all result each GPU selects only the other slots of its own samples.

  • Next, forward_reorder rearranges the data on each GPU internally (more on that later).

  • Finally, store_slot_id stores the slot ids. We need to save the slot id of each parameter because previously each GPU held different slots; now all slots of a sample are gathered on the same GPU, so we must record which slot each embedding row belongs to.

The specific code is as follows:

/** The forward propagation of embedding layer. */
  void forward(bool is_train, int eval_batch = -1) override {
#pragma omp parallel num_threads(embedding_data_.get_resource_manager().get_local_gpu_count())
    {
      size_t i = omp_get_thread_num();
      CudaDeviceContext context(embedding_data_.get_local_gpu(i).get_device_id());

      if (embedding_data_.embedding_params_.is_data_parallel) {
        filter_keys_per_gpu(is_train, i, embedding_data_.get_local_gpu(i).get_global_id(),
                            embedding_data_.get_resource_manager().get_global_gpu_count());
      }
      functors_.forward_per_gpu(
          embedding_data_.embedding_params_.get_batch_size(is_train), slot_num_per_gpu_[i],
          embedding_data_.embedding_params_.embedding_vec_size,
          embedding_data_.embedding_params_.combiner, is_train,
          embedding_data_.get_row_offsets_tensors(is_train)[i],
          embedding_data_.get_value_tensors(is_train)[i],
          *embedding_data_.get_nnz_array(is_train)[i], *hash_tables_[i],
          hash_table_value_tensors_[i], hash_value_index_tensors_[i], embedding_feature_tensors_[i],
          embedding_data_.get_local_gpu(i).get_stream());
    }

    // embedding_feature_tensors_ now holds the looked-up embedding vectors
    // do all-to-all
#ifndef ENABLE_MPI
    if (embedding_data_.get_resource_manager().get_global_gpu_count() > 1) {
      functors_.all2all_forward(embedding_data_.get_batch_size_per_gpu(is_train), slot_num_per_gpu_,
                                embedding_data_.embedding_params_.embedding_vec_size,
                                embedding_feature_tensors_, all2all_tensors_,
                                embedding_data_.get_resource_manager());
    } else {
      CK_CUDA_THROW_(cudaMemcpyAsync(
          all2all_tensors_[0].get_ptr(), embedding_feature_tensors_[0].get_ptr(),
          embedding_data_.get_batch_size_per_gpu(is_train) * slot_num_per_gpu_[0] *
              embedding_data_.embedding_params_.embedding_vec_size * sizeof(TypeEmbeddingComp),
          cudaMemcpyDeviceToDevice, embedding_data_.get_local_gpu(0).get_stream()));
    }
#else
    if (embedding_data_.get_resource_manager().get_global_gpu_count() > 1) {
      functors_.all2all_forward(embedding_data_.get_batch_size_per_gpu(is_train),
                                embedding_data_.embedding_params_.slot_num,
                                embedding_data_.embedding_params_.embedding_vec_size,
                                embedding_feature_tensors_, all2all_tensors_,
                                embedding_data_.get_resource_manager());
    } else {
      CK_CUDA_THROW_(cudaMemcpyAsync(
          all2all_tensors_[0].get_ptr(), embedding_feature_tensors_[0].get_ptr(),
          (size_t)embedding_data_.get_batch_size_per_gpu(is_train) * slot_num_per_gpu_[0] *
              embedding_data_.embedding_params_.embedding_vec_size * sizeof(TypeEmbeddingComp),
          cudaMemcpyDeviceToDevice, embedding_data_.get_local_gpu(0).get_stream()));
    }
#endif

    // reorder
    functors_.forward_reorder(embedding_data_.get_batch_size_per_gpu(is_train),
                              embedding_data_.embedding_params_.slot_num,
                              embedding_data_.embedding_params_.embedding_vec_size,
                              all2all_tensors_, embedding_data_.get_output_tensors(is_train),
                              embedding_data_.get_resource_manager());

    // store slot ids
    functors_.store_slot_id(embedding_data_.embedding_params_.get_batch_size(is_train),
                            embedding_data_.embedding_params_.slot_num, slot_num_per_gpu_,
                            embedding_data_.get_row_offsets_tensors(is_train),
                            hash_value_index_tensors_, hash_table_slot_id_tensors_,
                            embedding_data_.get_resource_manager());

    return;
  }

Let's use the following figure as an example. Assume there are two samples and four slots in total, with embedding_vec_size = 8 and batch_size_per_gpu = 2. The important point here is how to determine which slots sit on which GPU.

Taking slot_id % 2 for slot ids 0 through 3 gives 0, 1, 0, 1, so the 4 slots are allocated to the 2 GPUs as follows:

  • GPU 0: slot 0, slot 2;
  • GPU 1: slot 1, slot 3.

Note that the slot order is no longer 0, 1, 2, 3, which is why a reorder is needed later. Because the slots are not simply in ascending order, the numeric values in the figure are not assigned in ascending order either, but as:

  • GPU 0: 1, 3, 5, 7;

  • GPU 1: 2, 4, 6, 8.

Why are they distributed this way? This becomes clear at the end of forward propagation.

4.2 All2all

Since the forward_per_gpu function has already been covered, let's jump straight to the all2all operation.

As described earlier, after the local lookup each GPU holds its dense vectors in embedding_feature_tensors_. This is a one-dimensional array whose total length is sample_num (batch_size) * slot_num_per_gpu[i] * embedding_vec_size; the portion belonging to the local GPU's own samples is batch_size_per_gpu * slot_num_per_gpu[i] * embedding_vec_size.

So the next step is for the GPUs to exchange their embedding_feature_tensors_ with each other, with each GPU keeping only the part it should receive.

template <typename Type>
void SparseEmbeddingFunctors::all2all_forward(size_t batch_size_per_gpu,
                                              const std::vector<size_t> &slot_num_per_gpu,
                                              size_t embedding_vec_size,
                                              const Tensors2<Type> &send_tensors,
                                              Tensors2<Type> &recv_tensors,
                                              const ResourceManager &resource_manager) {
  size_t local_gpu_count = resource_manager.get_local_gpu_count();

  // Fill in partition table, ith Topo GPU to jth Topo GPU
  std::vector<std::vector<size_t>> table(local_gpu_count, std::vector<size_t>(local_gpu_count));
  for (size_t i = 0; i < local_gpu_count; i++) {
    size_t element_per_send = batch_size_per_gpu * slot_num_per_gpu[i] * embedding_vec_size;
    for (size_t j = 0; j < local_gpu_count; j++) {
      table[i][j] = element_per_send;
    }
  }

  std::vector<const Type *> src(local_gpu_count);
  std::vector<Type *> dst(local_gpu_count);
  for (size_t id = 0; id < local_gpu_count; id++) {
    src[id] = send_tensors[id].get_ptr();
    dst[id] = recv_tensors[id].get_ptr();
  }

  std::vector<std::vector<const Type *>> src_pos(local_gpu_count,
                                                 std::vector<const Type *>(local_gpu_count));
  std::vector<std::vector<Type *>> dst_pos(local_gpu_count, std::vector<Type *>(local_gpu_count));
  
  // Set the source data offsets
  // Calculate the src offset pointer from each GPU to each other
  for (size_t i = 0; i < local_gpu_count; i++) {
    size_t src_offset = 0;
    for (size_t j = 0; j < local_gpu_count; j++) {
      src_pos[i][j] = src[i] + src_offset;
      src_offset += table[i][j];
    }
  }

  // Set the target data offsets
  // Calculate the dst offset pointer from each GPU to each other
  for (size_t i = 0; i < local_gpu_count; i++) {
    size_t dst_offset = 0;
    for (size_t j = 0; j < local_gpu_count; j++) {
      dst_pos[i][j] = dst[i] + dst_offset;
      dst_offset += table[j][i];
    }
  }

  // need to know the Type
  ncclDataType_t type;
  switch (sizeof(Type)) {
    case 2:
      type = ncclHalf;
      break;
    case 4:
      type = ncclFloat;
      break;
    default:
      CK_THROW_(Error_t::WrongInput, "Error: Type not support by now");
  }

  // Do the all2all transfer
  CK_NCCL_THROW_(ncclGroupStart());
  for (size_t i = 0; i < local_gpu_count; i++) {
    const auto &local_gpu = resource_manager.get_local_gpu(i);
    for (size_t j = 0; j < local_gpu_count; j++) {
      CK_NCCL_THROW_(ncclSend(src_pos[i][j], table[i][j], type, j, local_gpu->get_nccl(),
                              local_gpu->get_stream()));
      CK_NCCL_THROW_(ncclRecv(dst_pos[i][j], table[j][i], type, j, local_gpu->get_nccl(),
                              local_gpu->get_stream()));
    }
  }
  CK_NCCL_THROW_(ncclGroupEnd());

  return;
}

Let's compare MPI_Alltoall with MPI_AllGather. The difference is:

  • MPI_AllGather: after the call, every process has gathered exactly the same data.
  • MPI_Alltoall: after the call, different processes have gathered different data.

For example, suppose across three ranks the data sent is:

0, 1, 2, 3, 4, 5, 6, 7, 8

Then the data received across the ranks is:

0, 3, 6, 1, 4, 7, 2, 5, 8

For our running example, this gives:

GPU 0 sends: 1, 3, 5, 7    GPU 1 sends: 2, 4, 6, 8
GPU 0 receives: 1, 3, 2, 4    GPU 1 receives: 5, 7, 6, 8

The result is shown in the figure, where "…" indicates that all2all_tensors_ holds more than the 4 items drawn. A host-side sketch that reproduces this exchange is given below.
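The following host-side sketch (plain C++, no NCCL) reproduces the exchange using the same partition-table logic as all2all_forward. The toy sizes are my reading of the running example: 2 GPUs, embedding_vec_size treated as 1, batch_size_per_gpu = 1 and slot_num_per_gpu = 2, so each chunk holds 2 elements:

#include <cstddef>
#include <iostream>
#include <vector>

int main() {
  const size_t gpu_count = 2;
  const size_t batch_size_per_gpu = 1;
  const size_t embedding_vec_size = 1;
  const std::vector<size_t> slot_num_per_gpu = {2, 2};

  // embedding_feature_tensors_ per GPU (sample-major, local slots inside each sample)
  std::vector<std::vector<float>> send = {{1, 3, 5, 7},   // GPU 0: slots 0,2 of samples 0 and 1
                                          {2, 4, 6, 8}};  // GPU 1: slots 1,3 of samples 0 and 1
  std::vector<std::vector<float>> recv(gpu_count);

  for (size_t j = 0; j < gpu_count; j++) {    // receiving GPU
    for (size_t i = 0; i < gpu_count; i++) {  // sending GPU
      // table[i][j]: number of elements GPU i sends to GPU j (same for every j here)
      size_t n = batch_size_per_gpu * slot_num_per_gpu[i] * embedding_vec_size;
      size_t src_offset = j * n;  // chunk j inside GPU i's send buffer
      for (size_t k = 0; k < n; k++) recv[j].push_back(send[i][src_offset + k]);
    }
  }

  for (size_t j = 0; j < gpu_count; j++) {
    std::cout << "GPU " << j << " receives:";
    for (float v : recv[j]) std::cout << " " << v;
    std::cout << std::endl;
  }
  // Prints: GPU 0 receives: 1 3 2 4
  //         GPU 1 receives: 5 7 6 8
  return 0;
}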

4.3 Reorder

We can see that each GPU now holds its own complete samples, but the internal order of each sample's data is wrong: the slots are not in ascending order. The figure below roughly refines the previous one (the legend differs from the actual variables; it is only for illustration).

Reorder then copies from all2all_tensors_ to embedding_data_.get_output_tensors(is_train), converting the order slot 0, slot 2, slot 1, slot 3 into slot 0, slot 1, slot 2, slot 3.

template <typename TypeEmbeddingComp>
void SparseEmbeddingFunctors::forward_reorder(size_t batch_size_per_gpu, size_t slot_num,
                                              size_t embedding_vec_size, size_t total_gpu_count,
                                              const Tensors2<TypeEmbeddingComp> &src_tensors,
                                              Tensors2<TypeEmbeddingComp> &dst_tensors,
                                              const ResourceManager &resource_manager) {
  CudaDeviceContext context;
  size_t local_gpu_count = resource_manager.get_local_gpu_count();
  for (size_t id = 0; id < local_gpu_count; id++) {  // Iterate over the local GPUs
    const auto &local_gpu = resource_manager.get_local_gpu(id); 
    context.set_device(local_gpu->get_device_id());

    // copy
    do_forward_reorder(batch_size_per_gpu, slot_num, embedding_vec_size, total_gpu_count,
                       src_tensors[id].get_ptr(), dst_tensors[id].get_ptr(),
                       local_gpu->get_stream());
  }
}

The do_forward_reorder code is as follows, which relies on the forward_reorder_kernel for specific logic.

template <typename TypeEmbeddingComp>
void do_forward_reorder(size_t batch_size_per_gpu, size_t slot_num, size_t embedding_vec_size,
                        size_t total_gpu_count, const TypeEmbeddingComp *input,
                        TypeEmbeddingComp *output, cudaStream_t stream) {
  const size_t grid_size = batch_size_per_gpu;
  const size_t block_size = embedding_vec_size;
  forward_reorder_kernel<<<grid_size, block_size, 0, stream>>>(
      batch_size_per_gpu, slot_num, embedding_vec_size, total_gpu_count, input, output);
}

4.3.1 Idea

The specific logic is:

  • Each CUDA block handles one sample of the current GPU (gridDim.x = batch_size_per_gpu), and each thread handles one element of the embedding vector (blockDim.x = embedding_vec_size); every thread loops over all slot_num slots.
  • Get the sample id of the current sample on the current GPU (it is simply bid; each bid corresponds to one sample) and work from this sample id, so that only this GPU's samples are processed. For example, for the second sample, sample_id = 1.
  • Get the starting position of the first slot of the current sample in the output, e.g. 1 * 4 * 8 = 32.
  • Get the size of one slot's embedding vector, i.e. the stride between slots in the output, stride = 8.
  • Traverse the sample's slots, from 0 to slot_num, to copy them from the all2all result to embedding_data_.get_output_tensors; for this we need to find where each slot of this sample starts inside the all2all buffer.
  • For each slot, find out which GPU the slot came from.
    • The GPUs in front of that one are traversed because slots are partitioned by GPU; offset_pre ends up as the number of (sample, slot) entries contributed by the GPUs before this slot's GPU.
      • The key code here is gpu_id = slot_id % gpu_num, which determines from which GPU's chunk of the buffer this slot must be fetched.
      • In our example, alltoall sends two slots together per sample, so reorder has to locate the data slot by slot, and gpu_id is the key to that search.
    • Compute how many slots each GPU holds.
    • Compute the offset of the current sample inside that GPU's chunk.
    • Compute the starting position of the current slot's data in the all2all buffer.
    • Compute the target location of the current slot in embedding_data_.get_output_tensors.
    • Copy this slot's data for the current sample.

The code is as follows:

// reorder operation after all2all in forward propagation
template <typename TypeEmbeddingComp>
__global__ void forward_reorder_kernel(int batch_size_per_gpu, int slot_num, int embedding_vec_size,
                                       int gpu_num, const TypeEmbeddingComp *input,
                                       TypeEmbeddingComp *output) {
  // blockDim.x = embedding_vec_size; // each thread corresponding to one element of embedding
  // vector gridDim.x = batch_size / gpu_num = samples_per_gpu; // each block corresponding to one
  // sample on each GPU Each thread needs to process slot_num slots

  int tid = threadIdx.x;
  int bid = blockIdx.x;

  // Sample ID of the current GPU. The following processes are based on this SAMPLE ID, so that only the sample of the current GPU can be retained
  int sample_id = bid;  // sample_id on the current GPU, such as the second sample, sample_id = 1

  if ((bid < batch_size_per_gpu) && (tid < embedding_vec_size)) {
    // The starting position of the first slot in the current sample, e.g. 1 * 4 * 8 = 32
    int dst_offset =
        sample_id * slot_num * embedding_vec_size;  // offset for the first slot of one sample
    // The embedding vector size of one slot, i.e. the stride between slots = 8
    int dst_stride = embedding_vec_size;            // stride from slot to slot

    // Traverse the sample's slots (0 to slot_num) in order to copy them from the all2all
    // result to embedding_data_.get_output_tensors(); so we need to find where each slot
    // of this sample starts inside the all2all buffer
    for (int slot_id = 0; slot_id < slot_num; slot_id++) { 
      int gpu_id = slot_id % gpu_num; // Key code to determine which GPU slot is on
      int offset_pre = 0;  // offset in previous gpus
      
      // The purpose of traversing the GPU is to find the location of the previous GPU, since slots are allocated according to the GPU
      // offset_pre finally yields the number of slots on top of the GPU before this slot
      for (int id = 0; id < gpu_id; id++) { 
        int slot_num_per_gpu = slot_num / gpu_num + ((id < (slot_num % gpu_num)) ? 1 : 0);
        int stride = batch_size_per_gpu * slot_num_per_gpu;
        offset_pre += stride; // Find the front position
      }
      // Each GPU corresponds to several slots
      int slot_num_per_gpu = slot_num / gpu_num + ((gpu_id < (slot_num % gpu_num)) ? 1 : 0);
      // The current sample is offset of the current GPU
      int offset_cur = sample_id * slot_num_per_gpu;  // offset in current gpu
      // Start data position of the current sample in another slot
      // (offset_cur + offset_pre + (int)(slot_id/gpu_num)) specifies the number of slots in front of this slot
      int src_addr = (offset_cur + offset_pre + (int)(slot_id / gpu_num)) * embedding_vec_size;
      
      // Target location of the current slot in embedding_data_.get_output_tensors()
      int dst_addr = dst_offset + dst_stride * slot_id;
      // Copy this slot's embedding vector for the current sample
      output[dst_addr + tid] = input[src_addr + tid];
    }
  }
}
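To make the index arithmetic concrete, here is a host-side re-implementation of the same address computation for GPU 0 of the toy example (2 GPUs, 4 slots, batch_size_per_gpu = 1, embedding_vec_size treated as 1). It is a sketch for illustration, not HugeCTR code:

#include <iostream>
#include <vector>

int main() {
  const int batch_size_per_gpu = 1;
  const int slot_num = 4;
  const int gpu_num = 2;
  const int embedding_vec_size = 1;

  std::vector<float> input = {1, 3, 2, 4};  // all2all_tensors_ on GPU 0
  std::vector<float> output(batch_size_per_gpu * slot_num * embedding_vec_size, 0);

  for (int sample_id = 0; sample_id < batch_size_per_gpu; sample_id++) {
    int dst_offset = sample_id * slot_num * embedding_vec_size;
    int dst_stride = embedding_vec_size;
    for (int slot_id = 0; slot_id < slot_num; slot_id++) {
      int gpu_id = slot_id % gpu_num;  // which GPU's chunk holds this slot
      int offset_pre = 0;              // elements contributed by earlier GPUs
      for (int id = 0; id < gpu_id; id++) {
        int slot_num_per_gpu = slot_num / gpu_num + ((id < (slot_num % gpu_num)) ? 1 : 0);
        offset_pre += batch_size_per_gpu * slot_num_per_gpu;
      }
      int slot_num_per_gpu = slot_num / gpu_num + ((gpu_id < (slot_num % gpu_num)) ? 1 : 0);
      int offset_cur = sample_id * slot_num_per_gpu;  // offset inside that GPU's chunk
      int src_addr = (offset_cur + offset_pre + slot_id / gpu_num) * embedding_vec_size;
      int dst_addr = dst_offset + dst_stride * slot_id;
      output[dst_addr] = input[src_addr];
    }
  }

  std::cout << "reordered:";
  for (float v : output) std::cout << " " << v;
  std::cout << std::endl;  // prints: reordered: 1 2 3 4
  return 0;
}

It turns the all2all result [1, 3, 2, 4] back into slot order [1, 2, 3, 4], which is exactly the output layout the dense network expects.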

4.3.2 Illustration

To illustrate, and to simplify the logic: embedding_feature_tensors_ and all2all_tensors_ are actually one-dimensional arrays, but the figure abstracts them as two-dimensional arrays.

4.4 Slot ID

Finally, the slot ids need to be stored. We save the slot id of each parameter because previously each GPU held different slots; now all slots of a sample sit on the same GPU, so we must record which slot each embedding row belongs to. A small sketch of the local-to-global slot id mapping follows after the code.

// store slot_id by row_offset and value_index
template <typename TypeKey, typename TypeValueIndex>
__global__ void store_slot_id_kernel(size_t batch_size,
                                     int slot_num,  // total slot number in hash table
                                     int slot_num_per_gpu,
                                     int gpu_num,  // total gpu number
                                     int gpu_id,   // global gpu device id
                                     const TypeKey *row_offset, const TypeValueIndex *value_index,
                                     TypeValueIndex *slot_id) {
  size_t gid = blockIdx.x * blockDim.x + threadIdx.x;

  if (gid < (batch_size * slot_num_per_gpu)) {
    int sid = gid % slot_num_per_gpu;
    sid = gpu_id + sid * gpu_num;  // global slot id
    if (sid < slot_num) {
      TypeKey offset = row_offset[gid];
      int value_num = row_offset[gid + 1] - offset;
      for (int i = 0; i < value_num; i++) {
        TypeValueIndex index = value_index[offset + i];  // row number
        slot_id[index] = sid;
      }
    }
  }
}

}  // namespace

template <typename TypeKey>
void SparseEmbeddingFunctors::store_slot_id(size_t batch_size, size_t slot_num,
                                            const std::vector<size_t> &slot_num_per_gpu,
                                            const Tensors2<TypeKey> &row_offset_tensors,
                                            const Tensors2<size_t> &value_index_tensors,
                                            Tensors2<size_t> &slot_id_tensors,
                                            const ResourceManager &resource_manager) {
  CudaDeviceContext context;
  size_t local_gpu_count = resource_manager.get_local_gpu_count();
  size_t total_gpu_count = resource_manager.get_global_gpu_count();
  for (size_t id = 0; id < local_gpu_count; id++) {
    if (slot_num_per_gpu[id] == 0) {
      continue;
    }

    const auto &local_gpu = resource_manager.get_local_gpu(id);
    size_t local_device_id = local_gpu->get_device_id();
    size_t global_id = local_gpu->get_global_id();
    const size_t block_size = 64;
    const size_t grid_size = (batch_size * slot_num_per_gpu[id] + block_size - 1) / block_size;

    context.set_device(local_device_id);
    store_slot_id_kernel<<<grid_size, block_size, 0, local_gpu->get_stream()>>>(
        batch_size, slot_num, slot_num_per_gpu[id], total_gpu_count, global_id,
        row_offset_tensors[id].get_ptr(), value_index_tensors[id].get_ptr(),
        slot_id_tensors[id].get_ptr());
  }
}
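As promised above, here is a tiny host-side check (illustrative only) of the local-to-global slot id mapping used by store_slot_id_kernel, sid = gpu_id + local_sid * gpu_num, for 2 GPUs with 2 local slots each:

#include <iostream>

int main() {
  const int gpu_num = 2;
  const int slot_num_per_gpu = 2;
  for (int gpu_id = 0; gpu_id < gpu_num; gpu_id++) {
    for (int local_sid = 0; local_sid < slot_num_per_gpu; local_sid++) {
      int global_sid = gpu_id + local_sid * gpu_num;  // same formula as in the kernel
      std::cout << "GPU " << gpu_id << " local slot " << local_sid
                << " -> global slot " << global_sid << std::endl;
    }
  }
  // GPU 0: 0 -> 0, 1 -> 2;  GPU 1: 0 -> 1, 1 -> 3
  return 0;
}

This matches the round-robin assignment above, where GPU 0 owns slots 0 and 2 and GPU 1 owns slots 1 and 3.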

4.5 Output Matrix

Finally, let's look at the size of the output dense matrix: batch_size_per_gpu * slot_num * embedding_vec_size.

// only used for results check
/**
 * Get the forward() results from GPUs and copy them to the host pointer
 * embedding_feature. This function is only used for unit test.
 * @param embedding_feature the host pointer for storing the forward() results.
 */
void get_forward_results(bool is_train, Tensor2<TypeEmbeddingComp> &embedding_feature) {
  size_t memcpy_size = embedding_data_.get_batch_size_per_gpu(is_train) *
                       embedding_data_.embedding_params_.slot_num *
                       embedding_data_.embedding_params_.embedding_vec_size;

  functors_.get_forward_results(memcpy_size, embedding_data_.get_output_tensors(is_train),
                                embedding_feature, utest_forward_temp_tensors_,
                                embedding_data_.get_resource_manager());

  return;
}

Get_batch_size_per_gpu is defined as follows:

size_t get_batch_size_per_gpu(bool is_train) const {
  return embedding_params_.get_batch_size(is_train) / resource_manager_->get_global_gpu_count();
}

0x05 Backward propagation

Because forward propagation performed all2all and then reorder, backward propagation must first perform their reverse operations, and only then run the actual backward pass.

Although we know that backward_reorder and all2all_backward are simply the reverse operations of forward propagation, the code is still quite mind-bending; it is easier to follow together with the figures.

  /**
   * The first stage of backward propagation of embedding layer,
   * which computes the wgrad by the dgrad from the top layer.
   */
  void backward() override {
    // Read dgrad from output_tensors -> compute wgrad

    // reorder
    functors_.backward_reorder(embedding_data_.get_batch_size_per_gpu(true),
                               embedding_data_.embedding_params_.slot_num,
                               embedding_data_.embedding_params_.embedding_vec_size,
                               embedding_data_.get_output_tensors(true), all2all_tensors_,
                               embedding_data_.get_resource_manager());

		// do all2all
#ifndef ENABLE_MPI
    if (embedding_data_.get_resource_manager().get_global_gpu_count() > 1) {
      functors_.all2all_backward(embedding_data_.get_batch_size_per_gpu(true), slot_num_per_gpu_,
                                 embedding_data_.embedding_params_.embedding_vec_size,
                                 all2all_tensors_, embedding_feature_tensors_,
                                 embedding_data_.get_resource_manager());

    } else {
      CudaDeviceContext context(embedding_data_.get_local_gpu(0).get_device_id());
      CK_CUDA_THROW_(cudaMemcpyAsync(
          embedding_feature_tensors_[0].get_ptr(), all2all_tensors_[0].get_ptr(),
          embedding_data_.get_batch_size_per_gpu(true) * slot_num_per_gpu_[0] *
              embedding_data_.embedding_params_.embedding_vec_size * sizeof(TypeEmbeddingComp),
          cudaMemcpyDeviceToDevice, embedding_data_.get_local_gpu(0).get_stream()));
    }
#else
    if (embedding_data_.get_resource_manager().get_global_gpu_count() > 1) {
      functors_.all2all_backward(
          embedding_data_.get_batch_size_per_gpu(true), embedding_data_.embedding_params_.slot_num,
          embedding_data_.embedding_params_.embedding_vec_size, all2all_tensors_,
          embedding_feature_tensors_, embedding_data_.get_resource_manager());

    } else {
      CudaDeviceContext context(embedding_data_.get_local_gpu(0).get_device_id());
      CK_CUDA_THROW_(cudaMemcpyAsync(
          embedding_feature_tensors_[0].get_ptr(), all2all_tensors_[0].get_ptr(),
          embedding_data_.get_batch_size_per_gpu(true) * slot_num_per_gpu_[0] *
              embedding_data_.embedding_params_.embedding_vec_size * sizeof(TypeEmbeddingComp),
          cudaMemcpyDeviceToDevice, embedding_data_.get_local_gpu(0).get_stream()));
    }
#endif

    // do backward
    functors_.backward(embedding_data_.embedding_params_.get_batch_size(true), slot_num_per_gpu_,
                       embedding_data_.embedding_params_.embedding_vec_size,
                       embedding_data_.embedding_params_.combiner,
                       embedding_data_.get_row_offsets_tensors(true), embedding_feature_tensors_,
                       wgrad_tensors_, embedding_data_.get_resource_manager());

    return;
  }

5.1 Backward Reorder

The purpose of backward reorder is to copy the gradients held on each GPU to the proper locations in all2all_tensors_. In the figure below, each slot corresponds to one gradient embedding vector; train_output_tensors_ (the dgrad) holds the gradients. At this point each GPU holds the complete gradients of two samples.

Here there are two bids on each GPU, corresponding to sample 1 and sample 2 respectively:

// reorder operation before all2all in backward propagation
template <typename TypeEmbeddingComp>
__global__ void backward_reorder_kernel(int batch_size_per_gpu, int slot_num,
                                        int embedding_vec_size, int gpu_num,
                                        const TypeEmbeddingComp *input, TypeEmbeddingComp *output) {
  // blockDim.x = embedding_vec_size; // each thread corresponding to one element of embedding
  // vector gridDim.x = batch_size / gpu_num = samples_per_gpu; // each block corresponding to one
  // sample on each GPU Each thread needs to process slot_num slots

  int tid = threadIdx.x;
  int bid = blockIdx.x;

  int sample_id = bid;  // sample_id on the current GPU

  if ((bid < batch_size_per_gpu) && (tid < embedding_vec_size)) {
    // Source: the starting position of this sample's gradients; for sample 0 it is 0,
    // for sample 1 it is 1 * 4 * embedding_vec_size
    int src_offset = sample_id * slot_num * embedding_vec_size;
    int src_stride = embedding_vec_size;  // stride between slots; here it is 4

    for (int slot_id = 0; slot_id < slot_num; slot_id++) { // The value ranges from 0 to 3
      int gpu_id = slot_id % gpu_num; // The value is 0 or 1
      int offset_pre = 0;  // offset in previous gpus
      for (int id = 0; id < gpu_id; id++) {
        // The value is 2
        int slot_num_per_gpu = slot_num / gpu_num + ((id < (slot_num % gpu_num)) ? 1 : 0);
        // The value is 2*2
        int stride = batch_size_per_gpu * slot_num_per_gpu;
        // After this loop, offset_pre is 0 for gpu_id 0 and 4 for gpu_id 1
        offset_pre += stride; 
      }
      
      // Target location: find the starting location of this sample in the current GPU
      // slot_num_per_gpu = 2
      int slot_num_per_gpu = slot_num / gpu_num + ((gpu_id < (slot_num % gpu_num)) ? 1 : 0);
      // 2*sample_id
      int offset_cur = sample_id * slot_num_per_gpu;  // offset in current gpu
      // Note: embedding_vec_size is 4, but we collapse each slot to one unit
      // (i.e. treat embedding_vec_size as 1), so it simplifies as follows:
      // GPU0 = sample_id*2 + 0 + slot_id/gpu_num, sample 1 is 0 to 1, sample 2 is 4 to 5
      // GPU1 = sample_id*2 + 4 + slot_id/gpu_num, sample 1 is 2 to 3, sample 2 is 6 to 7
      int dst_addr = (offset_cur + offset_pre + (int)(slot_id / gpu_num)) * embedding_vec_size;

      // Source position: the starting position of this sample in the incoming gradients
      // Again treating embedding_vec_size as 1, this simplifies to:
      // src_offset = sample_id * slot_num
      // src_addr = sample_id * slot_num + slot_id
      // On both GPU0 and GPU1: sample 1 covers 0 ~ 3, sample 2 covers 4 ~ 7
      int src_addr = src_offset + src_stride * slot_id;
      // Copy the gradient of this sample into its proper place in all2all_tensors_
      output[dst_addr + tid] = input[src_addr + tid];
    }
  }
}
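As a sanity check, the following host-side sketch (same toy sizes as the forward reorder sketch: 2 GPUs, 4 slots, batch_size_per_gpu = 1, embedding_vec_size treated as 1; the gradient values 10..40 are made up) applies the backward_reorder index arithmetic and shows that it is the exact inverse of the forward reorder:

#include <iostream>
#include <vector>

int main() {
  const int batch_size_per_gpu = 1, slot_num = 4, gpu_num = 2, embedding_vec_size = 1;
  std::vector<float> input = {10, 20, 30, 40};  // dgrad in global slot order 0..3
  std::vector<float> output(4, 0);              // all2all_tensors_ layout

  for (int sample_id = 0; sample_id < batch_size_per_gpu; sample_id++) {
    int src_offset = sample_id * slot_num * embedding_vec_size;
    int src_stride = embedding_vec_size;
    for (int slot_id = 0; slot_id < slot_num; slot_id++) {
      int gpu_id = slot_id % gpu_num;
      int offset_pre = 0;
      for (int id = 0; id < gpu_id; id++) {
        int slot_num_per_gpu = slot_num / gpu_num + ((id < (slot_num % gpu_num)) ? 1 : 0);
        offset_pre += batch_size_per_gpu * slot_num_per_gpu;
      }
      int slot_num_per_gpu = slot_num / gpu_num + ((gpu_id < (slot_num % gpu_num)) ? 1 : 0);
      int offset_cur = sample_id * slot_num_per_gpu;
      int dst_addr = (offset_cur + offset_pre + slot_id / gpu_num) * embedding_vec_size;
      int src_addr = src_offset + src_stride * slot_id;
      output[dst_addr] = input[src_addr];  // scatter back to the all2all layout
    }
  }

  std::cout << "scattered back:";
  for (float v : output) std::cout << " " << v;
  std::cout << std::endl;  // prints: scattered back: 10 30 20 40
  return 0;
}

The gradients in global slot order [10, 20, 30, 40] are scattered back to the all2all layout [10, 30, 20, 40], i.e. slot 0, slot 2, slot 1, slot 3, ready to be exchanged by all2all_backward.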

5.2 All2all backward

This exchange is essentially the same as the one initiated in forward propagation: each GPU sends its data out but receives only what it should. Each GPU ends up with only the gradients for its originally owned slots across all samples. As you can see, the resulting gradients correspond exactly to the original embedding_feature_tensors_, in terms of samples, slots, and concrete values.

The specific code is as follows:

/**
 * nccl all2all communication for backward
 * @param batch_size_per_gpu batch size per GPU
 * @param slot_num slot number
 * @param embedding_vec_size embedding vector size
 * @param send_tensors the send tensors of multi GPUs.
 * @param recv_tensors the recv tensors of multi GPUs.
 * @param device_resources all gpus device resources.
 */
template <typename Type>
void SparseEmbeddingFunctors::all2all_backward(size_t batch_size_per_gpu, size_t slot_num,
                                               size_t embedding_vec_size,
                                               const Tensors2<Type> &send_tensors,
                                               Tensors2<Type> &recv_tensors,
                                               const ResourceManager &resource_manager) {
  size_t local_gpu_count = resource_manager.get_local_gpu_count();
  size_t total_gpu_count = resource_manager.get_global_gpu_count();
  size_t num_proc = resource_manager.get_num_process();

  std::vector<const Type *> src(local_gpu_count);
  std::vector<Type *> dst(local_gpu_count);
  for (size_t id = 0; id < local_gpu_count; id++) {
    src[id] = send_tensors[id].get_ptr();  // send_tensors is a list over the local GPUs
    dst[id] = recv_tensors[id].get_ptr();  // recv_tensors is a list over the local GPUs
  }

  std::vector<std::vector<size_t>> send_table(local_gpu_count,
                                              std::vector<size_t>(total_gpu_count));
  std::vector<std::vector<size_t>> recv_table(local_gpu_count,
                                              std::vector<size_t>(total_gpu_count));

  // Fill in receiving partition table, ith Topo GPU receive from jth global GPU
  for (size_t i = 0; i < local_gpu_count; i++) {
    size_t global_id = resource_manager.get_local_gpu(i)->get_global_id();
    size_t slot_num_per_gpu =
        slot_num / total_gpu_count + ((global_id < (slot_num % total_gpu_count)) ? 1 : 0);
    size_t element_per_recv = batch_size_per_gpu * slot_num_per_gpu * embedding_vec_size;

    for (size_t j = 0; j < total_gpu_count; j++) {
      recv_table[i][j] = element_per_recv;
    }
  }

  // Fill in sending partition table, ith Topo GPU send to jth global GPU
  for (size_t j = 0; j < total_gpu_count; j++) {
    size_t global_id = j;
    size_t slot_num_per_gpu =
        slot_num / total_gpu_count + ((global_id < (slot_num % total_gpu_count)) ? 1 : 0);
    size_t element_per_send = batch_size_per_gpu * slot_num_per_gpu * embedding_vec_size;

    for (size_t i = 0; i < local_gpu_count; i++) {
      send_table[i][j] = element_per_send;
    }
  }

  std::vector<std::vector<const Type *>> src_pos(local_gpu_count,
                                                 std::vector<const Type *>(total_gpu_count));
  std::vector<std::vector<Type *>> dst_pos(local_gpu_count, std::vector<Type *>(total_gpu_count));
  // Calculate the src offset pointer from each GPU to each other
  for (size_t i = 0; i < local_gpu_count; i++) {
    size_t src_offset = 0;
    for (size_t j = 0; j < total_gpu_count; j++) {
      src_pos[i][j] = src[i] + src_offset;
      src_offset += send_table[i][j];
    }
  }

  // Calculate the dst offset pointer from each GPU to each other
  for (size_t i = 0; i < local_gpu_count; i++) {
    size_t dst_offset = 0;
    for (size_t j = 0; j < total_gpu_count; j++) {
      dst_pos[i][j] = dst[i] + dst_offset;
      dst_offset += recv_table[i][j];
    }
  }

  // need to know the Type
  ncclDataType_t type;
  switch (sizeof(Type)) {
    case 2:
      type = ncclHalf;
      break;
    case 4:
      type = ncclFloat;
      break;
    default:
      CK_THROW_(Error_t::WrongInput, "Error: Type not support by now");
  }

  // Do the all2all transfer
  CK_NCCL_THROW_(ncclGroupStart());
  for (size_t i = 0; i < local_gpu_count; i++) {
    const auto &local_gpu = resource_manager.get_local_gpu(i);
    for (size_t j = 0; j < total_gpu_count; j++) {
      CK_NCCL_THROW_(ncclSend(src_pos[i][j], send_table[i][j], type, j, local_gpu->get_nccl(),
                              local_gpu->get_stream()));
      CK_NCCL_THROW_(ncclRecv(dst_pos[i][j], recv_table[i][j], type, j, local_gpu->get_nccl(),
                              local_gpu->get_stream()));
    }
  }
  CK_NCCL_THROW_(ncclGroupEnd());

  return;
}

5.3 Backward

Now each GPU holds the gradients corresponding to its original data, so the backward pass itself can run. This part was covered in a previous article, so we will not repeat it here.

// do backward
functors_.backward(embedding_data_.embedding_params_.get_batch_size(true), slot_num_per_gpu_,
                   embedding_data_.embedding_params_.embedding_vec_size,
                   embedding_data_.embedding_params_.combiner,
                   embedding_data_.get_row_offsets_tensors(true), embedding_feature_tensors_,
                   wgrad_tensors_, embedding_data_.get_resource_manager());

0x06 Storage

Here’s a quick analysis. When storing, rank 0 writes files.

Error_t Session::download_params_to_files_(std::string weights_file,
                                           std::string dense_opt_states_file,
                                           const std::vector<std::string>& embedding_files,
                                           const std::vector<std::string>& sparse_opt_state_files) {
  try {
    {  // Store the embedding parameters
      int i = 0;
      for (auto& embedding_file : embedding_files) {
        embeddings_[i]->dump_parameters(embedding_file);
        i++;
      }
    }

    {  // Store the optimizer states
      int i = 0;
      for (auto& sparse_opt_state_file : sparse_opt_state_files) {
        std::ofstream out_stream_opt(sparse_opt_state_file, std::ofstream::binary);
        embeddings_[i]->dump_opt_states(out_stream_opt);
        out_stream_opt.close();
        i++;
      }
    }

    // rank 0 is responsible for writing files
    if (resource_manager_->is_master_process()) {
      std::ofstream out_stream_weight(weights_file, std::ofstream::binary);
      networks_[0]->download_params_to_host(out_stream_weight);

      std::ofstream out_dense_opt_state_weight(dense_opt_states_file, std::ofstream::binary);
      networks_[0]->download_opt_states_to_host(out_dense_opt_state_weight);

      std::string no_trained_params = networks_[0]->get_no_trained_params_in_string();
      if (no_trained_params.length() != 0) {
        std::string ntp_file = weights_file + ".ntp.json";
        std::ofstream out_stream_ntp(ntp_file, std::ofstream::out);
        out_stream_ntp.write(no_trained_params.c_str(), no_trained_params.length());
        out_stream_ntp.close();
      }
      out_stream_weight.close();
      out_dense_opt_state_weight.close();
    }

  } catch (const internal_runtime_error& rt_err) {
    std::cerr << rt_err.what() << std::endl;
    return rt_err.get_error();
  } catch (const std::exception& err) {
    std::cerr << err.what() << std::endl;
    return Error_t::UnspecificError;
  }
  return Error_t::Success;
}

In the case of the optimizer states, the other worker nodes send their data to rank 0, which writes it out after receiving it.

template <typename TypeEmbeddingComp>
void SparseEmbeddingFunctors::dump_opt_states(
    std::ofstream& stream, const ResourceManager& resource_manager,
    std::vector<Tensors2<TypeEmbeddingComp>>& opt_states) {
  size_t local_gpu_count = resource_manager.get_local_gpu_count();
  CudaDeviceContext context;

  for (auto& opt_state : opt_states) {
    size_t total_size = 0;
    for (size_t id = 0; id < local_gpu_count; id++) {
      total_size += opt_state[id].get_size_in_bytes();
    }
    size_t max_size = total_size;

#ifdef ENABLE_MPI
    bool is_master_process = resource_manager.is_master_process();
    CK_MPI_THROW_(MPI_Reduce(is_master_process ? MPI_IN_PLACE : &max_size, &max_size,
                             sizeof(size_t), MPI_CHAR, MPI_MAX,
                             resource_manager.get_master_process_id(), MPI_COMM_WORLD));
#endif

    std::unique_ptr<char[]> h_opt_state(new char[max_size]);
    size_t offset = 0;
    for (size_t id = 0; id < local_gpu_count; id++) {
      size_t local_size = opt_state[id].get_size_in_bytes();
      auto& local_gpu = resource_manager.get_local_gpu(id);
      context.set_device(local_gpu->get_device_id());
      CK_CUDA_THROW_(cudaMemcpyAsync(h_opt_state.get() + offset, opt_state[id].get_ptr(),
                                     local_size, cudaMemcpyDeviceToHost, local_gpu->get_stream()));
      offset += local_size;
    }
    sync_all_gpus(resource_manager);

    int pid = resource_manager.get_process_id();
    if (resource_manager.is_master_process()) {
      // rank 0 is responsible for writing
      stream.write(h_opt_state.get(), total_size);
    }
#ifdef ENABLE_MPI
    else {
      // Other worker nodes send data to rank0
      int tag = (pid << 8) | 0xBA;
      CK_MPI_THROW_(MPI_Send(h_opt_state.get(), total_size, MPI_CHAR,
                             resource_manager.get_master_process_id(), tag, MPI_COMM_WORLD));
    }

    if (resource_manager.is_master_process()) {
      for (int r = 1; r < resource_manager.get_num_process(); r++) {
        int tag = (r << 8) | 0xBA;
        int recv_size = 0;
        MPI_Status status;
        CK_MPI_THROW_(MPI_Probe(r, tag, MPI_COMM_WORLD, &status));
        CK_MPI_THROW_(MPI_Get_count(&status, MPI_CHAR, &recv_size));
        // Rank 0 receives data
        CK_MPI_THROW_(MPI_Recv(h_opt_state.get(), recv_size, MPI_CHAR, r, tag, MPI_COMM_WORLD,
                               MPI_STATUS_IGNORE));
        stream.write(h_opt_state.get(), recv_size);
      }
    }
#endif
    MESSAGE_("Done");
  }
}

0xEE Personal information

★★★★ Thoughts on life and technology ★★★★★

Wechat official account: Rosie’s Thoughts

0xFF References

Developer.nvidia.com/blog/introd…

Developer.nvidia.com/blog/announ…

Developer.nvidia.com/blog/accele…

Read HugeCTR source code

How does embedding propagate back

Web.eecs.umich.edu/~justincj/t…

Sparse matrix storage format summary + storage efficiency comparison :COO,CSR,DIA,ELL,HYB

Out of Thin Air: On the Embedding idea in recommendation algorithm

Principle of the tf.nn.embedding_lookup function

TensorFlow's embedding_lookup interface

How does embedding do in the recommended scene of big factory