0x00 Introduction
In this series we introduce HugeCTR, an industry-oriented recommendation system training framework optimized for large-scale CTR models, which uses model-parallel embeddings and a data-parallel dense network. This article looks at the LocalizedSlotSparseEmbeddingHash class and its forward and backward operations.
This article draws on the excellent write-up "Read HugeCTR source code" (see the references); thanks to its author.
Other articles in this series are as follows:
NVIDIA HugeCTR, GPU version parameter server (1)
NVIDIA HugeCTR, GPU version parameter server (2)
NVIDIA HugeCTR, GPU version parameter server (3)
NVIDIA HugeCTR, GPU version parameter server (4)
NVIDIA HugeCTR, GPU version parameter server (5) Embedding hash table
NVIDIA HugeCTR, GPU version parameter server (6) Distributed hash table
NVIDIA HugeCTR, GPU version parameter server (7) Distributed hash table forward propagation
NVIDIA HugeCTR, GPU version parameter server (8) Distributed hash table backward propagation
0x01 Previous review
From the previous analysis, we know that the overall flow of an embedding table lookup is as follows.
0x02 Definition
The LocalizedSlotSparseEmbeddingHash class inherits from Embedding, the base class of all embedding layers. In LocalizedSlotSparseEmbeddingHash, each slot of the embedding table is assigned to a single GPU, hence the name "localized slot". For example, slot 0 sits on GPU 0, slot 1 on GPU 1, slot 2 on GPU 0, and slot 3 on GPU 1. In contrast, in DistributedSlotSparseEmbeddingHash each slot is spread across multiple GPUs.
The embedding table is encapsulated in a hash table. The key of the hash table is called hash_table_key; the value is called hash_table_value_index, which is the row number of the embedding feature inside the embedding table; the embedding feature itself is called hash_table_value.
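As a minimal mental model of this three-level relationship (key → value_index → value), consider the host-side sketch below. The ToyEmbeddingTable type is hypothetical and only illustrates the naming; the real implementation uses NvHashTable and GPU tensors.
#include <cstddef>
#include <unordered_map>
#include <vector>

// Hypothetical illustration of hash_table_key -> hash_table_value_index -> hash_table_value.
struct ToyEmbeddingTable {
  std::unordered_map<long long, size_t> key_to_row;  // hash_table_key -> hash_table_value_index
  std::vector<std::vector<float>> rows;              // hash_table_value: one embedding row per index

  const std::vector<float>& lookup(long long hash_table_key) const {
    size_t hash_table_value_index = key_to_row.at(hash_table_key);  // row number in the table
    return rows[hash_table_value_index];                            // the embedding feature itself
  }
};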
The LocalizedSlotSparseEmbeddingHash layer implements all operations needed to train the embedding, including forward and backward propagation. Forward propagation corresponds to the forward API. Backward propagation is split into two phases, backward and update_params. The class also provides load_parameters, which uploads a hash table (hash table keys, hash table value indexes, and hash table values) from a host file to the GPU, and dump_parameters, which downloads the hash table from the GPU to a host file.
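To see how these APIs fit together, here is a hedged sketch of one training iteration. The driver function and the template parameter are hypothetical (in HugeCTR the session/solver orchestrates this), and only the call order is taken from the description above.
// Hypothetical driver sketch; EmbeddingT stands for any embedding layer exposing the
// forward / backward / update_params API described above (e.g. LocalizedSlotSparseEmbeddingHash).
template <typename EmbeddingT>
void train_one_iteration(EmbeddingT& embedding) {
  embedding.forward(true);    // lookup + all2all + reorder, fills the embedding output tensors
  // ... dense network forward and backward produce the gradient of the embedding output ...
  embedding.backward();       // backward reorder + backward all2all + compute wgrad
  embedding.update_params();  // apply the optimizer to update the hash table values
}
The member variables of the class are shown below.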
template <typename TypeHashKey, typename TypeEmbeddingComp>
class LocalizedSlotSparseEmbeddingHash : public IEmbedding {
using NvHashTable = HashTable<TypeHashKey, size_t>;
private:
EmbeddingData<TypeHashKey, TypeEmbeddingComp> embedding_data_;
std::vector<LocalizedFilterKeyStorage<TypeHashKey>> filter_keys_storages_;
std::vector<std::shared_ptr<NvHashTable>> hash_tables_; /**< Hash table. */
// define tensors
Tensors2<float> hash_table_value_tensors_; /**< Hash table value. */
std::vector<Tensors2<float>> value_table_tensors_;
Tensors2<size_t> hash_table_slot_id_tensors_; /**< the tensors for storing slot ids */
Tensors2<size_t> hash_value_index_tensors_; /**< Hash value index. The index is corresponding to the line number of the value. */
Tensors2<TypeEmbeddingComp>
embedding_feature_tensors_; /**< the output tensor of the forward(). */
Tensors2<TypeEmbeddingComp> wgrad_tensors_; /**< the input tensor of the backward(). */
std::vector<EmbeddingOptimizer<TypeHashKey, TypeEmbeddingComp>> embedding_optimizers_;
size_t max_vocabulary_size_;
size_t max_vocabulary_size_per_gpu_; /**< Max vocabulary size for each GPU. */
std::vector<size_t> slot_num_per_gpu_; /* slot_num per GPU */
std::vector<size_t> slot_size_array_;
SparseEmbeddingFunctors functors_;
Tensors2<TypeEmbeddingComp> all2all_tensors_; /**< the temple buffer to store all2all results */
Tensors2<TypeEmbeddingComp> utest_all2all_tensors_;
Tensors2<TypeEmbeddingComp> utest_reorder_tensors_;
Tensors2<TypeEmbeddingComp> utest_backward_temp_tensors_;
Tensors2<TypeEmbeddingComp> utest_forward_temp_tensors_;
};
0x03 Construction
3.1 Call
In HugeCTR/src/parsers/create_embedding.cpp, there is the following call:
case Embedding_t::LocalizedSlotSparseEmbeddingHash: {
const SparseEmbeddingHashParams embedding_params = {batch_size,
batch_size_eval,
max_vocabulary_size_per_gpu,
slot_size_array,
embedding_vec_size,
sparse_input.max_feature_num_per_sample,
sparse_input.slot_num,
combiner, // combiner: 0-sum, 1-mean
embedding_opt_params};
embeddings.emplace_back(new LocalizedSlotSparseEmbeddingHash<TypeKey, TypeFP>(
sparse_input.train_sparse_tensors, sparse_input.evaluate_sparse_tensors, embedding_params,
resource_manager));
break;
}
3.2 Constructor
The LocalizedSlotSparseEmbeddingHash constructor is shown below; see the inline comments for the specific logic.
template <typename TypeHashKey, typename TypeEmbeddingComp>
LocalizedSlotSparseEmbeddingHash<TypeHashKey, TypeEmbeddingComp>::LocalizedSlotSparseEmbeddingHash(
const SparseTensors<TypeHashKey> &train_keys, const SparseTensors<TypeHashKey> &evaluate_keys,
const SparseEmbeddingHashParams &embedding_params,
const std::shared_ptr<ResourceManager> &resource_manager)
: embedding_data_(Embedding_t::LocalizedSlotSparseEmbeddingHash, train_keys, evaluate_keys,
embedding_params, resource_manager),
slot_size_array_(embedding_params.slot_size_array) {
try {
// Set the maximum amount of data per GPU
if (slot_size_array_.empty()) {
max_vocabulary_size_per_gpu_ = embedding_data_.embedding_params_.max_vocabulary_size_per_gpu;
max_vocabulary_size_ = embedding_data_.embedding_params_.max_vocabulary_size_per_gpu *
                             embedding_data_.get_resource_manager().get_global_gpu_count();
    } else {
max_vocabulary_size_per_gpu_ =
cal_max_voc_size_per_gpu(slot_size_array_, embedding_data_.get_resource_manager());
max_vocabulary_size_ = 0;
for (size_t slot_size : slot_size_array_) {
max_vocabulary_size_ += slot_size;
}
}
CudaDeviceContext context;
// Iterate over the local GPU
    for (size_t id = 0; id < embedding_data_.get_resource_manager().get_local_gpu_count();
         id++) {
      // Set the current context
context.set_device(embedding_data_.get_local_gpu(id).get_device_id());
// Number of slots per GPU
      size_t gid = embedding_data_.get_local_gpu(id).get_global_id();
      size_t slot_num_per_gpu =
          embedding_data_.embedding_params_.slot_num /
              embedding_data_.get_resource_manager().get_global_gpu_count() +
          ((gid < embedding_data_.embedding_params_.slot_num %
                      embedding_data_.get_resource_manager().get_global_gpu_count())
               ? 1
: 0);
slot_num_per_gpu_.push_back(slot_num_per_gpu);
// new GeneralBuffer objects
const std::shared_ptr<GeneralBuffer2<CudaAllocator>> &buf = embedding_data_.get_buffer(id);
embedding_optimizers_.emplace_back(max_vocabulary_size_per_gpu_,
embedding_data_.embedding_params_, buf);
// Allocate memory for various variables
// new hash table value vectors
if (slot_size_array_.empty()) {
Tensor2<float> tensor;
buf->reserve(
{max_vocabulary_size_per_gpu_, embedding_data_.embedding_params_.embedding_vec_size},
&tensor);
hash_table_value_tensors_.push_back(tensor);
} else {
        const std::shared_ptr<BufferBlock2<float>> &block = buf->create_block<float>();
        Tensors2<float> tensors;
size_t vocabulary_size_in_current_gpu = 0;
        for (size_t i = 0; i < slot_size_array_.size(); i++) {
          if ((i % embedding_data_.get_resource_manager().get_global_gpu_count()) == gid) {
Tensor2<float> tensor;
block->reserve(
{slot_size_array_[i], embedding_data_.embedding_params_.embedding_vec_size},
&tensor);
tensors.push_back(tensor);
vocabulary_size_in_current_gpu += slot_size_array_[i];
}
}
value_table_tensors_.push_back(tensors);
if (max_vocabulary_size_per_gpu_ > vocabulary_size_in_current_gpu) {
Tensor2<float> padding_tensor_for_optimizer;
block->reserve({max_vocabulary_size_per_gpu_ - vocabulary_size_in_current_gpu,
embedding_data_.embedding_params_.embedding_vec_size},
&padding_tensor_for_optimizer);
}
hash_table_value_tensors_.push_back(block->as_tensor());
}
{
Tensor2<TypeHashKey> tensor;
buf->reserve({embedding_data_.embedding_params_.get_batch_size(true),
embedding_data_.embedding_params_.max_feature_num},
&tensor);
embedding_data_.train_value_tensors_.push_back(tensor);
}
{
Tensor2<TypeHashKey> tensor;
buf->reserve({embedding_data_.embedding_params_.get_batch_size(false),
embedding_data_.embedding_params_.max_feature_num},
&tensor);
embedding_data_.evaluate_value_tensors_.push_back(tensor);
}
{
Tensor2<TypeHashKey> tensor;
buf->reserve(
{embedding_data_.embedding_params_.get_batch_size(true) * slot_num_per_gpu + 1},
&tensor);
embedding_data_.train_row_offsets_tensors_.push_back(tensor);
}
{
Tensor2<TypeHashKey> tensor;
buf->reserve(
{embedding_data_.embedding_params_.get_batch_size(false) * slot_num_per_gpu + 1},
&tensor);
embedding_data_.evaluate_row_offsets_tensors_.push_back(tensor);
}
      { embedding_data_.train_nnz_array_.push_back(std::make_shared<size_t>(0)); }
      { embedding_data_.evaluate_nnz_array_.push_back(std::make_shared<size_t>(0)); }
// new hash table value_index that get() from HashTable
{
Tensor2<size_t> tensor;
buf->reserve({1, embedding_data_.embedding_params_.get_universal_batch_size() *
embedding_data_.embedding_params_.max_feature_num},
&tensor);
hash_value_index_tensors_.push_back(tensor);
}
// new embedding features reduced by hash table values(results of forward)
{
Tensor2<TypeEmbeddingComp> tensor;
buf->reserve(
{embedding_data_.embedding_params_.get_universal_batch_size() * slot_num_per_gpu,
embedding_data_.embedding_params_.embedding_vec_size},
&tensor);
embedding_feature_tensors_.push_back(tensor);
}
// new wgrad used by backward
{
Tensor2<TypeEmbeddingComp> tensor;
buf->reserve({embedding_data_.embedding_params_.get_batch_size(true) * slot_num_per_gpu,
embedding_data_.embedding_params_.embedding_vec_size},
&tensor);
wgrad_tensors_.push_back(tensor);
}
// the tenosrs for storing slot ids
// TODO: init to -1 ?
{
Tensor2<size_t> tensor;
buf->reserve({max_vocabulary_size_per_gpu_, 1}, &tensor);
hash_table_slot_id_tensors_.push_back(tensor);
}
// temp tensors for all2all
{
Tensor2<TypeEmbeddingComp> tensor;
buf->reserve({embedding_data_.get_universal_batch_size_per_gpu() *
embedding_data_.embedding_params_.slot_num,
embedding_data_.embedding_params_.embedding_vec_size},
&tensor);
all2all_tensors_.push_back(tensor);
}
{
Tensor2<TypeEmbeddingComp> tensor;
buf->reserve({embedding_data_.embedding_params_.get_universal_batch_size() *
embedding_data_.embedding_params_.slot_num,
embedding_data_.embedding_params_.embedding_vec_size},
&tensor);
utest_forward_temp_tensors_.push_back(tensor);
}
{
Tensor2<TypeEmbeddingComp> tensor;
buf->reserve({embedding_data_.get_batch_size_per_gpu(true) *
embedding_data_.embedding_params_.slot_num,
embedding_data_.embedding_params_.embedding_vec_size},
&tensor);
utest_all2all_tensors_.push_back(tensor);
}
{
Tensor2<TypeEmbeddingComp> tensor;
buf->reserve({embedding_data_.get_batch_size_per_gpu(true) *
embedding_data_.embedding_params_.slot_num,
embedding_data_.embedding_params_.embedding_vec_size},
&tensor);
utest_reorder_tensors_.push_back(tensor);
}
{
Tensor2<TypeEmbeddingComp> tensor;
buf->reserve({embedding_data_.embedding_params_.get_batch_size(true) *
embedding_data_.embedding_params_.slot_num,
embedding_data_.embedding_params_.embedding_vec_size},
&tensor);
utest_backward_temp_tensors_.push_back(tensor);
}
{
size_t max_nnz = embedding_data_.embedding_params_.get_universal_batch_size() *
embedding_data_.embedding_params_.max_feature_num;
size_t rowoffset_count = embedding_data_.embedding_params_.slot_num *
embedding_data_.embedding_params_.get_universal_batch_size() +
1;
filter_keys_storages_.emplace_back(buf, max_nnz, rowoffset_count);
}
}
hash_tables_.resize(embedding_data_.get_resource_manager().get_local_gpu_count());
#pragma omp parallel for num_threads(embedding_data_.get_resource_manager().get_local_gpu_count())
    for (size_t id = 0; id < embedding_data_.get_resource_manager().get_local_gpu_count();
         id++) {
      // Initialize the internal hash table for this GPU
CudaDeviceContext context(embedding_data_.get_local_gpu(id).get_device_id());
// construct HashTable object: used to store hash table <key, value_index>
hash_tables_[id].reset(new NvHashTable(max_vocabulary_size_per_gpu_));
      embedding_data_.get_buffer(id)->allocate();
    }

    // Initialize the optimizer
    for (size_t id = 0; id < embedding_data_.get_resource_manager().get_local_gpu_count();
         id++) {
      context.set_device(embedding_data_.get_local_gpu(id).get_device_id());
embedding_optimizers_[id].initialize(embedding_data_.get_local_gpu(id));
} // end of for(int id = 0; id < embedding_data_.get_local_gpu_count(); id++)
    if (!embedding_data_.embedding_params_.slot_size_array.empty()) {
std::vector<TypeHashKey> embedding_offsets;
TypeHashKey slot_sizes_prefix_sum = 0;
      for (size_t i = 0; i < embedding_data_.embedding_params_.slot_size_array.size(); i++) {
        embedding_offsets.push_back(slot_sizes_prefix_sum);
slot_sizes_prefix_sum += embedding_data_.embedding_params_.slot_size_array[i];
}
      for (size_t id = 0; id < embedding_data_.get_resource_manager().get_local_gpu_count();
           ++id) {
        CudaDeviceContext context(embedding_data_.get_local_gpu(id).get_device_id());
CK_CUDA_THROW_(
cudaMemcpy(embedding_data_.embedding_offsets_[id].get_ptr(), embedding_offsets.data(),
                       embedding_offsets.size() * sizeof(TypeHashKey), cudaMemcpyHostToDevice));
      }
    }

    // sync
functors_.sync_all_gpus(embedding_data_.get_resource_manager());
} catch (const std::runtime_error &rt_err) {
std::cerr << rt_err.what() << std::endl;
throw;
}
return;
}
3.3 How slots are assigned to GPUs
Let's look at how to determine which GPU holds which slot. init_params calls init_embedding to complete the build.
/**
 * Initialize the embedding table
 */
void init_params() override {
// do hash table value initialization
if (slot_size_array_.empty()) { // if no slot_sizes provided, use the old method to init
init_embedding(max_vocabulary_size_per_gpu_,
embedding_data_.embedding_params_.embedding_vec_size,
hash_table_value_tensors_);
} else {
if (slot_size_array_.size() == embedding_data_.embedding_params_.slot_num) {
#ifndef DATA_READING_TEST
init_embedding(slot_size_array_, embedding_data_.embedding_params_.embedding_vec_size,
value_table_tensors_, hash_table_slot_id_tensors_);
#endif
} else {
throw std::runtime_error(
          std::string("[HCDEBUG][ERROR] Runtime error: the size of slot_sizes != slot_num\n"));
    }
  }
}
init_embedding creates an embedding table on each GPU.
template <typename TypeHashKey, typename TypeEmbeddingComp>
void LocalizedSlotSparseEmbeddingHash<TypeHashKey, TypeEmbeddingComp>::init_embedding(
const std::vector<size_t> &slot_sizes, size_t embedding_vec_size,
std::vector<Tensors2<float>> &hash_table_value_tensors,
Tensors2<size_t> &hash_table_slot_id_tensors) {
// Get the number of Gpus on the local node and the number of gpus globally
  size_t local_gpu_count = embedding_data_.get_resource_manager().get_local_gpu_count();
  size_t total_gpu_count = embedding_data_.get_resource_manager().get_global_gpu_count();

  for (size_t id = 0; id < local_gpu_count; id++) {  // Iterate over the local GPUs
// Use the global ID here
    size_t device_id = embedding_data_.get_local_gpu(id).get_device_id();
    size_t global_id = embedding_data_.get_local_gpu(id).get_global_id();
    functors_.init_embedding_per_gpu(global_id, total_gpu_count, slot_sizes, embedding_vec_size,
hash_table_value_tensors[id], hash_table_slot_id_tensors[id],
embedding_data_.get_local_gpu(id));
}
for (size_t id = 0; id < local_gpu_count; id++) {
CK_CUDA_THROW_(cudaStreamSynchronize(embedding_data_.get_local_gpu(id).get_stream()));
}
return;
}
Let's look at init_embedding_per_gpu, which assigns slots with simple modulo arithmetic. For example, with 10 slots and 3 GPUs, the slot ids are 0~9 and the GPU ids are 0~2; slot_id % 3 yields 0,1,2,0,1,2,0,1,2,0, so the slots are distributed as:
- GPU 0: 0, 3, 6, 9
- GPU 1: 1, 4, 7
- GPU 2: 2, 5, 8
Therefore, the number of slots per GPU is not necessarily equal, as the short sketch after this list also shows.
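A minimal host-side sketch of this assignment rule, reusing the same modulo logic and the slot_num_per_gpu formula from the constructor (the printing program itself is hypothetical):
#include <cstddef>
#include <cstdio>

int main() {
  const size_t slot_num = 10, gpu_num = 3;
  for (size_t gid = 0; gid < gpu_num; gid++) {
    // same formula as in the constructor: each GPU gets slot_num / gpu_num slots,
    // and the first (slot_num % gpu_num) GPUs get one extra
    size_t slot_num_per_gpu = slot_num / gpu_num + ((gid < slot_num % gpu_num) ? 1 : 0);
    printf("GPU %zu (%zu slots):", gid, slot_num_per_gpu);
    for (size_t slot_id = 0; slot_id < slot_num; slot_id++) {
      if (slot_id % gpu_num == gid) printf(" %zu", slot_id);  // slot_id % gpu_num picks the owner
    }
    printf("\n");
  }
  // Prints: GPU 0 (4 slots): 0 3 6 9 / GPU 1 (3 slots): 1 4 7 / GPU 2 (3 slots): 2 5 8
  return 0;
}
The actual init_embedding_per_gpu code is as follows.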
void SparseEmbeddingFunctors::init_embedding_per_gpu(size_t gid, size_t total_gpu_count,
const std::vector<size_t> &slot_sizes,
size_t embedding_vec_size,
Tensors2<float> &embedding_tables,
Tensor2<size_t> &slot_ids,
const GPUResource &gpu_resource) {
CudaDeviceContext context(gpu_resource.get_device_id());
  size_t *slot_ids_ptr = slot_ids.get_ptr();
  size_t key_offset = 0;
size_t value_index_offset = 0;
  for (size_t i = 0, j = 0; i < slot_sizes.size(); i++) {  // traverse the slots
size_t slot_size = slot_sizes[i];
    if ((i % total_gpu_count) == gid) {  // gid is this GPU's global id
      // only initialize slot i if it is assigned to this GPU
float up_bound = sqrt(1.f / slot_size);
HugeCTR::UniformGenerator::fill(
embedding_tables[j++], -up_bound, up_bound, gpu_resource.get_sm_count(),
gpu_resource.get_replica_variant_curand_generator(), gpu_resource.get_stream());
// Configure slot ids
      memset_const(slot_ids_ptr, i, slot_size, gpu_resource.get_stream());

      value_index_offset += slot_size;
      slot_ids_ptr += slot_size;
    }
    key_offset += slot_size;
  }
}
0x04 Forward Propagation
4.1 Overview
Let’s first summarize the steps of forward propagation:
- First, filter_keys_per_gpu configures EmbeddingData.
- Second, forward_per_gpu performs the lookup: functors_.forward_per_gpu looks up the hash table on the local GPU to get dense vectors.
- Then all2all_forward makes each GPU hold all the data of its own samples. The ultimate goal, similar to the distributed version, is that each GPU ends up with a few complete samples, and different GPUs hold different samples. The data of a sample's slots that live on other GPUs therefore has to be copied over; in other words, out of the all2all result each GPU keeps only the remaining slots of its own samples.
- Next, forward_reorder rearranges the data inside each GPU (more on that later).
- Finally, store_slot_id saves the slot ids. We need to record the slot id of each parameter row because different slots originally live on different GPUs; now that all slots of a sample are gathered on one GPU, we have to know which slot each row belongs to.
The specific code is as follows:
/**
 * The forward propagation of embedding layer.
 */
void forward(bool is_train, int eval_batch = -1) override {
#pragma omp parallel num_threads(embedding_data_.get_resource_manager().get_local_gpu_count())
{
    size_t i = omp_get_thread_num();
    CudaDeviceContext context(embedding_data_.get_local_gpu(i).get_device_id());
if (embedding_data_.embedding_params_.is_data_parallel) {
filter_keys_per_gpu(is_train, i, embedding_data_.get_local_gpu(i).get_global_id(),
embedding_data_.get_resource_manager().get_global_gpu_count());
}
functors_.forward_per_gpu(
embedding_data_.embedding_params_.get_batch_size(is_train), slot_num_per_gpu_[i],
embedding_data_.embedding_params_.embedding_vec_size,
embedding_data_.embedding_params_.combiner, is_train,
embedding_data_.get_row_offsets_tensors(is_train)[i],
embedding_data_.get_value_tensors(is_train)[i],
*embedding_data_.get_nnz_array(is_train)[i], *hash_tables_[i],
hash_table_value_tensors_[i], hash_value_index_tensors_[i], embedding_feature_tensors_[i],
embedding_data_.get_local_gpu(i).get_stream());
}
  // embedding_feature_tensors_ now holds the looked-up embedding vectors
// do all-to-all
#ifndef ENABLE_MPI
  if (embedding_data_.get_resource_manager().get_global_gpu_count() > 1) {
functors_.all2all_forward(embedding_data_.get_batch_size_per_gpu(is_train), slot_num_per_gpu_,
embedding_data_.embedding_params_.embedding_vec_size,
embedding_feature_tensors_, all2all_tensors_,
embedding_data_.get_resource_manager());
} else {
CK_CUDA_THROW_(cudaMemcpyAsync(
all2all_tensors_[0].get_ptr(), embedding_feature_tensors_[0].get_ptr(),
embedding_data_.get_batch_size_per_gpu(is_train) * slot_num_per_gpu_[0] *
embedding_data_.embedding_params_.embedding_vec_size * sizeof(TypeEmbeddingComp),
cudaMemcpyDeviceToDevice, embedding_data_.get_local_gpu(0).get_stream()));
}
#else
  if (embedding_data_.get_resource_manager().get_global_gpu_count() > 1) {
functors_.all2all_forward(embedding_data_.get_batch_size_per_gpu(is_train),
embedding_data_.embedding_params_.slot_num,
embedding_data_.embedding_params_.embedding_vec_size,
embedding_feature_tensors_, all2all_tensors_,
embedding_data_.get_resource_manager());
} else {
CK_CUDA_THROW_(cudaMemcpyAsync(
all2all_tensors_[0].get_ptr(), embedding_feature_tensors_[0].get_ptr(),
(size_t)embedding_data_.get_batch_size_per_gpu(is_train) * slot_num_per_gpu_[0] *
embedding_data_.embedding_params_.embedding_vec_size * sizeof(TypeEmbeddingComp),
cudaMemcpyDeviceToDevice, embedding_data_.get_local_gpu(0).get_stream()));
}
#endif
// reorder
functors_.forward_reorder(embedding_data_.get_batch_size_per_gpu(is_train),
embedding_data_.embedding_params_.slot_num,
embedding_data_.embedding_params_.embedding_vec_size,
all2all_tensors_, embedding_data_.get_output_tensors(is_train),
embedding_data_.get_resource_manager());
// store slot ids
functors_.store_slot_id(embedding_data_.embedding_params_.get_batch_size(is_train),
embedding_data_.embedding_params_.slot_num, slot_num_per_gpu_,
embedding_data_.get_row_offsets_tensors(is_train),
hash_value_index_tensors_, hash_table_slot_id_tensors_,
embedding_data_.get_resource_manager());
return;
}
Let's use the following figure as an example. Assume there are two samples and four slots in total, with embedding_vec_size = 8 and batch_size_per_gpu = 2. The important point here is how to determine which GPU each slot lives on.
For slot ids 0~3, slot_id % 2 gives 0, 1, 0, 1, so the 4 slots are assigned to the 2 GPUs as follows:
- GPU 0: slot 0, slot 2;
- GPU 1: slot 1, slot 3.
Note that the slots on each GPU are not contiguous, which is why a reorder is needed later. Because the slot assignment is not simply ascending, the numbering of the dense vectors below is not simply ascending either:
- GPU 0: 1, 3, 5, 7;
- GPU 1: 2, 4, 6, 8.
Why are the vectors distributed this way? That becomes clear at the end of forward propagation.
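To see why the labels come out this way, number each dense vector as sample_id * slot_num + slot_id + 1, so that the final per-sample output reads 1,2,3,4 for sample 0 and 5,6,7,8 for sample 1. Before all2all, each GPU only holds the slots it owns, for every sample. The sketch below (hypothetical code, labels only) reproduces the figure:
#include <cstdio>

int main() {
  const int sample_num = 2, slot_num = 4, gpu_num = 2;
  for (int gpu_id = 0; gpu_id < gpu_num; gpu_id++) {
    printf("GPU %d before all2all:", gpu_id);
    for (int sample_id = 0; sample_id < sample_num; sample_id++) {
      for (int slot_id = 0; slot_id < slot_num; slot_id++) {
        if (slot_id % gpu_num == gpu_id) {                    // this GPU owns the slot
          printf(" %d", sample_id * slot_num + slot_id + 1);  // label of the dense vector
        }
      }
    }
    printf("\n");
  }
  // Prints: GPU 0 before all2all: 1 3 5 7
  //         GPU 1 before all2all: 2 4 6 8
  return 0;
}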
4.2 All2all
Since the forward_per_gpu function has already been covered, let's jump straight to the all2all operation.
As described earlier, after the local lookup each GPU holds its dense vectors in embedding_feature_tensors_, a one-dimensional array; as in the distributed version, its length is batch_size * slot_num_per_gpu[i] * embedding_vec_size, and the portion destined for each peer GPU is batch_size_per_gpu * slot_num_per_gpu[i] * embedding_vec_size.
So the next step is for the GPUs to exchange their embedding_feature_tensors_ with each other, with each GPU keeping only the part it is supposed to receive.
template <typename Type>
void SparseEmbeddingFunctors::all2all_forward(size_t batch_size_per_gpu,
const std::vector<size_t> &slot_num_per_gpu,
size_t embedding_vec_size,
const Tensors2<Type> &send_tensors,
Tensors2<Type> &recv_tensors,
const ResourceManager &resource_manager) {
  size_t local_gpu_count = resource_manager.get_local_gpu_count();

  // Fill in partition table, ith Topo GPU to jth Topo GPU
std::vector<std::vector<size_t>> table(local_gpu_count, std::vector<size_t>(local_gpu_count));
for (size_t i = 0; i < local_gpu_count; i++) {
size_t element_per_send = batch_size_per_gpu * slot_num_per_gpu[i] * embedding_vec_size;
    for (size_t j = 0; j < local_gpu_count; j++) {
      table[i][j] = element_per_send;
    }
  }

  std::vector<const Type *> src(local_gpu_count);
std::vector<Type *> dst(local_gpu_count);
for (size_t id = 0; id < local_gpu_count; id++) {
    src[id] = send_tensors[id].get_ptr();
    dst[id] = recv_tensors[id].get_ptr();
  }

  std::vector<std::vector<const Type *>> src_pos(local_gpu_count,
std::vector<const Type *>(local_gpu_count));
std::vector<std::vector<Type *>> dst_pos(local_gpu_count, std::vector<Type *>(local_gpu_count));
// Set the source data offset
// Calculate the src offset pointer from each GPU to each other
for (size_t i = 0; i < local_gpu_count; i++) {
size_t src_offset = 0;
    for (size_t j = 0; j < local_gpu_count; j++) {
      src_pos[i][j] = src[i] + src_offset;
      src_offset += table[i][j];
    }
  }

  // Set the target data offset
// Calculate the dst offset pointer from each GPU to each other
for (size_t i = 0; i < local_gpu_count; i++) {
size_t dst_offset = 0;
    for (size_t j = 0; j < local_gpu_count; j++) {
      dst_pos[i][j] = dst[i] + dst_offset;
      dst_offset += table[j][i];
    }
  }

  // need to know the Type
ncclDataType_t type;
switch (sizeof(Type)) {
case 2:
type = ncclHalf;
break;
case 4:
type = ncclFloat;
break;
default:
CK_THROW_(Error_t::WrongInput, "Error: Type not support by now");
}
// Do the all2all transfer
CK_NCCL_THROW_(ncclGroupStart());
for (size_t i = 0; i < local_gpu_count; i++) {
const auto &local_gpu = resource_manager.get_local_gpu(i);
for (size_t j = 0; j < local_gpu_count; j++) {
CK_NCCL_THROW_(ncclSend(src_pos[i][j], table[i][j], type, j, local_gpu->get_nccl(),
local_gpu->get_stream()));
CK_NCCL_THROW_(ncclRecv(dst_pos[i][j], table[j][i], type, j, local_gpu->get_nccl(),
                              local_gpu->get_stream()));
    }
  }
  CK_NCCL_THROW_(ncclGroupEnd());
return;
}
It helps to compare MPI_Alltoall with MPI_Allgather. The difference is:
- MPI_Allgather: every process gathers exactly the same data, so after the call all processes hold the same result.
- MPI_Alltoall: every process receives a different piece of data from every other process.
For example:
rank 0 sends: 0, 1, 2, 3, 4, 5, 6, 7, 8
and it receives:
rank 0 receives: 0, 3, 6, 1, 4, 7, 2, 5, 8
For our example, the exchange currently looks like this:
GPU0 sends: 1,3,5,7; GPU1 sends: 2,4,6,8; GPU0 receives: 1,3,2,4; GPU1 receives: 5,7,6,8
This gives the layout below, where "…" indicates that all2all_tensors_ is longer than the 4 items shown.
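The same exchange can be reproduced on the host with the partition-table logic of all2all_forward. The sketch below only illustrates the offset arithmetic (it assumes one sample per GPU so that the buffers match the labels in the figure); the real transfer goes through ncclSend/ncclRecv.
#include <cstddef>
#include <cstdio>
#include <vector>

int main() {
  const size_t gpu_num = 2, batch_size_per_gpu = 1;
  const std::vector<size_t> slot_num_per_gpu = {2, 2};
  const std::vector<std::vector<int>> send = {{1, 3, 5, 7}, {2, 4, 6, 8}};  // per-GPU send buffers

  for (size_t i = 0; i < gpu_num; i++) {    // receiver i
    printf("GPU %zu receives:", i);
    for (size_t j = 0; j < gpu_num; j++) {  // sender j
      // sender j sends the same count to every peer: batch_size_per_gpu * slot_num_per_gpu[j]
      size_t count = batch_size_per_gpu * slot_num_per_gpu[j];
      size_t offset = i * count;            // block of sender j destined for receiver i
      for (size_t k = 0; k < count; k++) printf(" %d", send[j][offset + k]);
    }
    printf("\n");
  }
  // Prints: GPU 0 receives: 1 3 2 4
  //         GPU 1 receives: 5 7 6 8
  return 0;
}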
4.3 Reorder
Each GPU now holds complete samples of its own, but the data inside each sample is out of order: the slots are not in ascending order. Let's roughly adjust and refine the figure above (the legend differs from the actual variable names and is only for illustration).
Reorder then copies from all2all_tensors_ into embedding_data_.get_output_tensors(is_train), converting the order slot 0, slot 2, slot 1, slot 3 into slot 0, slot 1, slot 2, slot 3.
template <typename TypeEmbeddingComp>
void SparseEmbeddingFunctors::forward_reorder(size_t batch_size_per_gpu, size_t slot_num,
size_t embedding_vec_size, size_t total_gpu_count,
const Tensors2<TypeEmbeddingComp> &src_tensors,
Tensors2<TypeEmbeddingComp> &dst_tensors,
const ResourceManager &resource_manager) {
CudaDeviceContext context;
  size_t local_gpu_count = resource_manager.get_local_gpu_count();

  for (size_t id = 0; id < local_gpu_count; id++) {  // Iterate over the local GPUs
const auto &local_gpu = resource_manager.get_local_gpu(id);
context.set_device(local_gpu->get_device_id());
    // copy
do_forward_reorder(batch_size_per_gpu, slot_num, embedding_vec_size, total_gpu_count,
src_tensors[id].get_ptr(), dst_tensors[id].get_ptr(),
                       local_gpu->get_stream());
  }
}
The do_forward_reorder code is as follows, which relies on the forward_reorder_kernel for specific logic.
template <typename TypeEmbeddingComp>
void do_forward_reorder(size_t batch_size_per_gpu, size_t slot_num, size_t embedding_vec_size,
size_t total_gpu_count, const TypeEmbeddingComp *input,
TypeEmbeddingComp *output, cudaStream_t stream) {
const size_t grid_size = batch_size_per_gpu;
const size_t block_size = embedding_vec_size;
forward_reorder_kernel<<<grid_size, block_size, 0, stream>>>(
batch_size_per_gpu, slot_num, embedding_vec_size, total_gpu_count, input, output);
}
4.3.1 The idea
The specific logic is as follows:
- Each block handles one sample on the current GPU (gridDim.x = batch_size / gpu_num = samples_per_gpu), and each thread handles one element of the embedding vector (blockDim.x = embedding_vec_size).
- Get the sample id of the current sample on the current GPU (this is simply bid; each bid corresponds to one sample). Everything below works on this sample id, so only the samples of this GPU are processed. For example, for the second sample, sample_id = 1.
- Compute the starting position of the first slot of this sample in the output, e.g. 1 * 4 * 8 = 32.
- The stride from one slot to the next in the output is the size of an embedding vector, i.e. stride = 8.
- Traverse the slots of this sample, slot_id from 0 to slot_num, copying each one from the all2all result into embedding_data_.get_output_tensors. For that we need the starting position of each slot inside the all2all buffer:
- For each slot, determine which GPU it came from. The key line is gpu_id = slot_id % gpu_num, which tells us from which GPU's block of the buffer this slot must be fetched. When all2all sends, each GPU's slots travel as a group, so reorder has to pick them out slot by slot, and gpu_id is the key to locating them.
- Traverse the GPUs before gpu_id: since slots are grouped by GPU, offset_pre accumulates how many vectors precede this GPU's block.
- Compute how many slots this GPU owns (slot_num_per_gpu).
- Compute the offset of the current sample inside this GPU's block (offset_cur).
- From these, obtain the starting position of the slot's data in the all2all buffer (src_addr).
- Compute the target position inside embedding_data_.get_output_tensors (dst_addr).
- Copy the embedding vector element of this slot for this sample.
The code is as follows:
// reorder operation after all2all in forward propagation
template <typename TypeEmbeddingComp>
__global__ void forward_reorder_kernel(int batch_size_per_gpu, int slot_num, int embedding_vec_size,
int gpu_num, const TypeEmbeddingComp *input,
TypeEmbeddingComp *output) {
  // blockDim.x = embedding_vec_size; each thread handles one element of an embedding vector
  // gridDim.x = batch_size / gpu_num = samples_per_gpu; each block handles one sample on this GPU
  // Each thread needs to process slot_num slots
int tid = threadIdx.x;
int bid = blockIdx.x;
  // sample id on the current GPU; everything below works on this sample id,
  // so only the samples of this GPU are processed
  int sample_id = bid;  // e.g. for the second sample, sample_id = 1
if ((bid < batch_size_per_gpu) && (tid < embedding_vec_size)) {
// The starting position of the first slot in the current sample, e.g. 1 * 4 * 8 = 32
int dst_offset =
sample_id * slot_num * embedding_vec_size; // offset for the first slot of one sample
    // the stride from one slot to the next is the embedding vector size, e.g. 8
int dst_stride = embedding_vec_size; // stride from slot to slot
// Traverses the sample slots with the range from 0 to slot num. The purpose is to copy these slots from all2all to embedding_data_. Get_output_tensors
// So we need to find the slot of this sample at the start of all2all
for (int slot_id = 0; slot_id < slot_num; slot_id++) {
int gpu_id = slot_id % gpu_num; // Key code to determine which GPU slot is on
int offset_pre = 0; // offset in previous gpus
// The purpose of traversing the GPU is to find the location of the previous GPU, since slots are allocated according to the GPU
// offset_pre finally yields the number of slots on top of the GPU before this slot
for (int id = 0; id < gpu_id; id++) {
int slot_num_per_gpu = slot_num / gpu_num + ((id < (slot_num % gpu_num)) ? 1 : 0);
int stride = batch_size_per_gpu * slot_num_per_gpu;
offset_pre += stride; // Find the front position
}
// Each GPU corresponds to several slots
int slot_num_per_gpu = slot_num / gpu_num + ((gpu_id < (slot_num % gpu_num)) ? 1 : 0);
// The current sample is offset of the current GPU
int offset_cur = sample_id * slot_num_per_gpu; // offset in current gpu
// Start data position of the current sample in another slot
// (offset_cur + offset_pre + (int)(slot_id/gpu_num)) specifies the number of slots in front of this slot
int src_addr = (offset_cur + offset_pre + (int)(slot_id / gpu_num)) * embedding_vec_size;
// The current slot is the target location in the embedding_data_. Get_output_tensors directory
int dst_addr = dst_offset + dst_stride * slot_id;
      // Copy this slot's embedding vector element for this sample
      output[dst_addr + tid] = input[src_addr + tid];
    }
  }
}
4.3.2 Illustration
To simplify the illustration, embedding_feature_tensors_ and all2all_tensors_ are actually one-dimensional arrays, but they are drawn here as two-dimensional arrays.
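As a CPU-side check of the index arithmetic in forward_reorder_kernel, the sketch below mirrors the kernel's src_addr computation in units of whole vectors (the kernel additionally multiplies by embedding_vec_size). The helper name is hypothetical, and the sizes are those of the figure, assuming one sample per GPU.
#include <cstdio>

// Position (in vectors) that forward_reorder_kernel reads for (sample_id, slot_id).
int src_index(int sample_id, int slot_id, int batch_size_per_gpu, int slot_num, int gpu_num) {
  int gpu_id = slot_id % gpu_num;  // which GPU produced this slot
  int offset_pre = 0;              // vectors coming from lower-numbered GPUs
  for (int id = 0; id < gpu_id; id++) {
    int slots = slot_num / gpu_num + ((id < slot_num % gpu_num) ? 1 : 0);
    offset_pre += batch_size_per_gpu * slots;
  }
  int slot_num_per_gpu = slot_num / gpu_num + ((gpu_id < slot_num % gpu_num) ? 1 : 0);
  int offset_cur = sample_id * slot_num_per_gpu;  // earlier samples within that GPU's block
  return offset_cur + offset_pre + slot_id / gpu_num;
}

int main() {
  const int batch_size_per_gpu = 1, slot_num = 4, gpu_num = 2;
  for (int slot_id = 0; slot_id < slot_num; slot_id++) {
    printf("output slot %d <- all2all position %d\n", slot_id,
           src_index(0, slot_id, batch_size_per_gpu, slot_num, gpu_num));
  }
  // Prints positions 0, 2, 1, 3: the buffer [1,3,2,4] is read out as [1,2,3,4].
  return 0;
}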
4.4 Storing slot ids
Finally, the slot ids need to be stored. We save the slot id of each parameter row because different slots originally live on different GPUs; now that every slot of a sample has been gathered onto one GPU, we have to know which slot each row of the table belongs to.
// store slot_id by row_offset and value_index
template <typename TypeKey, typename TypeValueIndex>
__global__ void store_slot_id_kernel(size_t batch_size,
int slot_num, // total slot number in hash table
int slot_num_per_gpu,
int gpu_num, // total gpu number
int gpu_id, // global gpu device id
const TypeKey *row_offset, const TypeValueIndex *value_index,
TypeValueIndex *slot_id) {
size_t gid = blockIdx.x * blockDim.x + threadIdx.x;
if (gid < (batch_size * slot_num_per_gpu)) {
int sid = gid % slot_num_per_gpu;
sid = gpu_id + sid * gpu_num; // global slot id
if (sid < slot_num) {
TypeKey offset = row_offset[gid];
int value_num = row_offset[gid + 1] - offset;
for (int i = 0; i < value_num; i++) {
TypeValueIndex index = value_index[offset + i]; // row number
slot_id[index] = sid;
}
}
}
}
} // namespace
template <typename TypeKey>
void SparseEmbeddingFunctors::store_slot_id(size_t batch_size, size_t slot_num,
const std::vector<size_t> &slot_num_per_gpu,
const Tensors2<TypeKey> &row_offset_tensors,
const Tensors2<size_t> &value_index_tensors,
Tensors2<size_t> &slot_id_tensors,
const ResourceManager &resource_manager) {
CudaDeviceContext context;
  size_t local_gpu_count = resource_manager.get_local_gpu_count();
  size_t total_gpu_count = resource_manager.get_global_gpu_count();

  for (size_t id = 0; id < local_gpu_count; id++) {
if (slot_num_per_gpu[id] == 0) {
continue;
}
const auto &local_gpu = resource_manager.get_local_gpu(id);
    size_t local_device_id = local_gpu->get_device_id();
    size_t global_id = local_gpu->get_global_id();

    const size_t block_size = 64;
const size_t grid_size = (batch_size * slot_num_per_gpu[id] + block_size - 1) / block_size;
context.set_device(local_device_id);
store_slot_id_kernel<<<grid_size, block_size, 0, local_gpu->get_stream()>>>(
batch_size, slot_num, slot_num_per_gpu[id], total_gpu_count, global_id,
row_offset_tensors[id].get_ptr(), value_index_tensors[id].get_ptr(),
        slot_id_tensors[id].get_ptr());
  }
}
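Note the line sid = gpu_id + sid * gpu_num above: it is exactly the inverse of the assignment rule slot_id % gpu_num == gpu_id, recovering the global slot id from the local slot index. A hypothetical check for the 2-GPU, 4-slot example:
#include <cstdio>

int main() {
  const int slot_num = 4, gpu_num = 2;
  for (int gpu_id = 0; gpu_id < gpu_num; gpu_id++) {
    for (int local_sid = 0; local_sid < slot_num / gpu_num; local_sid++) {
      // inverse of "slot_id % gpu_num == gpu_id": recover the global slot id
      printf("GPU %d local slot %d -> global slot %d\n",
             gpu_id, local_sid, gpu_id + local_sid * gpu_num);
    }
  }
  // Prints: GPU 0 -> global slots 0, 2;  GPU 1 -> global slots 1, 3
  return 0;
}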
4.5 Output Matrix
Let's check the size of the output dense matrix: it is batch_size_per_gpu * slot_num * embedding_vec_size, as can be seen from get_forward_results.
// only used for results check
/**
 * Get the forward() results from GPUs and copy them to the host pointer
 * embedding_feature. This function is only used for unit test.
 * @param embedding_feature the host pointer for storing the forward() results.
 */
void get_forward_results(bool is_train, Tensor2<TypeEmbeddingComp> &embedding_feature) {
size_t memcpy_size = embedding_data_.get_batch_size_per_gpu(is_train) *
embedding_data_.embedding_params_.slot_num *
embedding_data_.embedding_params_.embedding_vec_size;
functors_.get_forward_results(memcpy_size, embedding_data_.get_output_tensors(is_train),
embedding_feature, utest_forward_temp_tensors_,
embedding_data_.get_resource_manager());
return;
}
get_batch_size_per_gpu is defined as follows:
size_t get_batch_size_per_gpu(bool is_train) const {
  return embedding_params_.get_batch_size(is_train) / resource_manager_->get_global_gpu_count();
}
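As a quick sanity check of these sizes with the figure's numbers (a hypothetical calculation, assuming a total batch size of 2 on two GPUs):
#include <cstddef>
#include <cstdio>

int main() {
  const size_t batch_size = 2, global_gpu_count = 2, slot_num = 4, embedding_vec_size = 8;
  const size_t batch_size_per_gpu = batch_size / global_gpu_count;  // what get_batch_size_per_gpu returns
  const size_t memcpy_size = batch_size_per_gpu * slot_num * embedding_vec_size;
  printf("each GPU outputs %zu floats (%zu sample(s) x %zu slots x %zu elements)\n",
         memcpy_size, batch_size_per_gpu, slot_num, embedding_vec_size);
  return 0;
}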
0x05 Backward propagation
Because forward propagation ran all2all and then reorder, backward propagation must first undo those two steps (backward reorder followed by backward all2all) before computing the gradients.
Although all2all_backward and backward_reorder are simply the reverse operations of their forward counterparts, the code is still quite mind-bending; it is easier to follow together with the diagrams.
/**
 * The first stage of backward propagation of embedding layer,
 * which computes the wgrad by the dgrad from the top layer.
 */
void backward() override {
// Read dgrad from output_tensors -> compute wgrad
// reorder
functors_.backward_reorder(embedding_data_.get_batch_size_per_gpu(true),
embedding_data_.embedding_params_.slot_num,
embedding_data_.embedding_params_.embedding_vec_size,
embedding_data_.get_output_tensors(true), all2all_tensors_,
embedding_data_.get_resource_manager());
// do all2all
#ifndef ENABLE_MPI
  if (embedding_data_.get_resource_manager().get_global_gpu_count() > 1) {
functors_.all2all_backward(embedding_data_.get_batch_size_per_gpu(true), slot_num_per_gpu_,
embedding_data_.embedding_params_.embedding_vec_size,
all2all_tensors_, embedding_feature_tensors_,
embedding_data_.get_resource_manager());
} else {
CudaDeviceContext context(embedding_data_.get_local_gpu(0).get_device_id());
CK_CUDA_THROW_(cudaMemcpyAsync(
embedding_feature_tensors_[0].get_ptr(), all2all_tensors_[0].get_ptr(),
embedding_data_.get_batch_size_per_gpu(true) * slot_num_per_gpu_[0] *
embedding_data_.embedding_params_.embedding_vec_size * sizeof(TypeEmbeddingComp),
cudaMemcpyDeviceToDevice, embedding_data_.get_local_gpu(0).get_stream()));
}
#else
  if (embedding_data_.get_resource_manager().get_global_gpu_count() > 1) {
functors_.all2all_backward(
embedding_data_.get_batch_size_per_gpu(true), embedding_data_.embedding_params_.slot_num,
embedding_data_.embedding_params_.embedding_vec_size, all2all_tensors_,
embedding_feature_tensors_, embedding_data_.get_resource_manager());
} else {
CudaDeviceContext context(embedding_data_.get_local_gpu(0).get_device_id());
CK_CUDA_THROW_(cudaMemcpyAsync(
embedding_feature_tensors_[0].get_ptr(), all2all_tensors_[0].get_ptr(),
embedding_data_.get_batch_size_per_gpu(true) * slot_num_per_gpu_[0] *
embedding_data_.embedding_params_.embedding_vec_size * sizeof(TypeEmbeddingComp),
cudaMemcpyDeviceToDevice, embedding_data_.get_local_gpu(0).get_stream()));
}
#endif
// do backward
functors_.backward(embedding_data_.embedding_params_.get_batch_size(true), slot_num_per_gpu_,
embedding_data_.embedding_params_.embedding_vec_size,
embedding_data_.embedding_params_.combiner,
embedding_data_.get_row_offsets_tensors(true), embedding_feature_tensors_,
wgrad_tensors_, embedding_data_.get_resource_manager());
return;
}
5.1 Backward reorder
The purpose of backward reorder is to copy the gradients on each GPU to the right locations in all2all_tensors_. In the figure below, each slot corresponds to one gradient embedding vector, and train_output_tensors_ holds the gradients; at this point each GPU holds the complete gradients of two samples.
There are two bids on each GPU, corresponding to sample 1 and sample 2 respectively:
// reorder operation before all2all in backward propagation
template <typename TypeEmbeddingComp>
__global__ void backward_reorder_kernel(int batch_size_per_gpu, int slot_num,
int embedding_vec_size, int gpu_num,
const TypeEmbeddingComp *input, TypeEmbeddingComp *output) {
  // blockDim.x = embedding_vec_size; each thread handles one element of an embedding vector
  // gridDim.x = batch_size / gpu_num = samples_per_gpu; each block handles one sample on this GPU
  // Each thread needs to process slot_num slots
int tid = threadIdx.x;
int bid = blockIdx.x;
int sample_id = bid; // sample_id on the current GPU
if ((bid < batch_size_per_gpu) && (tid < embedding_vec_size)) {
    // Source: the starting position of this sample's gradients,
    // e.g. 0 for sample 0 and 1 * 4 * embedding_vec_size for sample 1
int src_offset = sample_id * slot_num * embedding_vec_size;
    int src_stride = embedding_vec_size;  // stride from slot to slot; 4 in this example
for (int slot_id = 0; slot_id < slot_num; slot_id++) { // The value ranges from 0 to 3
int gpu_id = slot_id % gpu_num; // The value is 0 or 1
int offset_pre = 0; // offset in previous gpus
for (int id = 0; id < gpu_id; id++) {
// The value is 2
int slot_num_per_gpu = slot_num / gpu_num + ((id < (slot_num % gpu_num)) ? 1 : 0);
// The value is 2*2
int stride = batch_size_per_gpu * slot_num_per_gpu;
        // offset_pre ends up as 0 for GPU0 and 4 for GPU1
offset_pre += stride;
}
// Target location: find the starting location of this sample in the current GPU
// slot_num_per_gpu = 2
int slot_num_per_gpu = slot_num / gpu_num + ((gpu_id < (slot_num % gpu_num)) ? 1 : 0);
// 2*sample_id
int offset_cur = sample_id * slot_num_per_gpu; // offset in current gpu
      // Note: although embedding_vec_size is 4, for this walkthrough we treat each
      // embedding vector as a single element (i.e. embedding_vec_size = 1), which gives:
// GPU0=sample_id*2+0+slot_id/gpu_num, sample1 is 0 to 1, sample2 is 4 to 5
// GPU1=sample_id*2+4+slot_id/gpu_num, sample1 is 2 to 3, sample2 is 6 to 7
int dst_addr = (offset_cur + offset_pre + (int)(slot_id / gpu_num)) * embedding_vec_size;
// Source position: find the starting position of the sample in the current gradient
// Note that embedding_vec_size is 4, but we all boil it down to one slot
// Embedding_vec_size is set to 1, so it is simplified as follows:
// src_offset=sample_id * slot_num
// src_addr = sample_id * slot_num + slot_id
// Src_addr should be sample_id * slot_num + slot_id
// The values of GPU0 and GPU1 are sample1=0 ~ 3 and sample2=4 ~ 7
int src_addr = src_offset + src_stride * slot_id;
      // Copy this sample's gradient into the right place in all2all_tensors_
      output[dst_addr + tid] = input[src_addr + tid];
    }
  }
}
5.2 All2all backward
This is an exchange that is essentially the same as the one in forward propagation: each GPU sends its buffer out but keeps only the part it should receive. Each GPU ends up with only the gradients of its original samples. The resulting gradients line up exactly with the original embedding_feature_tensors_, whether by sample, by slot, or by specific value.
The specific code is as follows:
/**
 * nccl all2all communication for backward
 * @param batch_size_per_gpu batch size per GPU
 * @param slot_num slot number
 * @param embedding_vec_size embedding vector size
 * @param send_tensors the send tensors of multi GPUs.
 * @param recv_tensors the recv tensors of multi GPUs.
 * @param device_resources all gpus device resources.
 */
template <typename Type>
void SparseEmbeddingFunctors::all2all_backward(size_t batch_size_per_gpu, size_t slot_num,
size_t embedding_vec_size,
const Tensors2<Type> &send_tensors,
Tensors2<Type> &recv_tensors,
const ResourceManager &resource_manager) {
  size_t local_gpu_count = resource_manager.get_local_gpu_count();
  size_t total_gpu_count = resource_manager.get_global_gpu_count();
  size_t num_proc = resource_manager.get_num_process();

  std::vector<const Type *> src(local_gpu_count);
std::vector<Type *> dst(local_gpu_count);
for (size_t id = 0; id < local_gpu_count; id++) {
    src[id] = send_tensors[id].get_ptr();  // send_tensors is a per-GPU list
    dst[id] = recv_tensors[id].get_ptr();  // recv_tensors is a per-GPU list
}
std::vector<std::vector<size_t>> send_table(local_gpu_count,
std::vector<size_t>(total_gpu_count));
std::vector<std::vector<size_t>> recv_table(local_gpu_count,
std::vector<size_t>(total_gpu_count));
// Fill in receiving partition table, ith Topo GPU receive from jth global GPU
for (size_t i = 0; i < local_gpu_count; i++) {
    size_t global_id = resource_manager.get_local_gpu(i)->get_global_id();
    size_t slot_num_per_gpu =
slot_num / total_gpu_count + ((global_id < (slot_num % total_gpu_count)) ? 1 : 0);
size_t element_per_recv = batch_size_per_gpu * slot_num_per_gpu * embedding_vec_size;
    for (size_t j = 0; j < total_gpu_count; j++) {
      recv_table[i][j] = element_per_recv;
    }
  }

  // Fill in sending partition table, ith Topo GPU send to jth global GPU
for (size_t j = 0; j < total_gpu_count; j++) {
size_t global_id = j;
size_t slot_num_per_gpu =
slot_num / total_gpu_count + ((global_id < (slot_num % total_gpu_count)) ? 1 : 0);
size_t element_per_send = batch_size_per_gpu * slot_num_per_gpu * embedding_vec_size;
for (size_t i = 0; i < local_gpu_count; i++) {
send_table[i][j] = element_per_send;
}
}
std::vector<std::vector<const Type *>> src_pos(local_gpu_count,
std::vector<const Type *>(total_gpu_count));
std::vector<std::vector<Type *>> dst_pos(local_gpu_count, std::vector<Type *>(total_gpu_count));
// Calculate the src offset pointer from each GPU to each other
for (size_t i = 0; i < local_gpu_count; i++) {
size_t src_offset = 0;
    for (size_t j = 0; j < total_gpu_count; j++) {
      src_pos[i][j] = src[i] + src_offset;
      src_offset += send_table[i][j];
    }
  }

  // Calculate the dst offset pointer from each GPU to each other
for (size_t i = 0; i < local_gpu_count; i++) {
size_t dst_offset = 0;
    for (size_t j = 0; j < total_gpu_count; j++) {
      dst_pos[i][j] = dst[i] + dst_offset;
      dst_offset += recv_table[i][j];
    }
  }

  // need to know the Type
ncclDataType_t type;
switch (sizeof(Type)) {
case 2:
type = ncclHalf;
break;
case 4:
type = ncclFloat;
break;
default:
CK_THROW_(Error_t::WrongInput, "Error: Type not support by now");
}
// Do the all2all transfer
CK_NCCL_THROW_(ncclGroupStart());
for (size_t i = 0; i < local_gpu_count; i++) {
const auto &local_gpu = resource_manager.get_local_gpu(i);
for (size_t j = 0; j < total_gpu_count; j++) {
CK_NCCL_THROW_(ncclSend(src_pos[i][j], send_table[i][j], type, j, local_gpu->get_nccl(),
local_gpu->get_stream()));
CK_NCCL_THROW_(ncclRecv(dst_pos[i][j], recv_table[i][j], type, j, local_gpu->get_nccl(),
                              local_gpu->get_stream()));
    }
  }
  CK_NCCL_THROW_(ncclGroupEnd());
return;
}
5.3 Backward
Each GPU now holds the gradients that correspond to its original samples, so the backward pass proper can run. This part was introduced earlier, so we will not repeat it.
// do backward
functors_.backward(embedding_data_.embedding_params_.get_batch_size(true), slot_num_per_gpu_,
embedding_data_.embedding_params_.embedding_vec_size,
embedding_data_.embedding_params_.combiner,
embedding_data_.get_row_offsets_tensors(true), embedding_feature_tensors_,
wgrad_tensors_, embedding_data_.get_resource_manager());
0x06 Storage
Here is a quick analysis of storage. When saving, rank 0 is responsible for writing the files.
Error_t Session::download_params_to_files_(std::string weights_file,
std::string dense_opt_states_file,
const std::vector<std::string>& embedding_files,
const std::vector<std::string>& sparse_opt_state_files) {
  try {
    {  // Store the embedding parameters
int i = 0;
for (auto& embedding_file : embedding_files) {
        embeddings_[i]->dump_parameters(embedding_file);
        i++;
      }
    }

    {  // Store the optimizer states
int i = 0;
for (auto& sparse_opt_state_file : sparse_opt_state_files) {
std::ofstream out_stream_opt(sparse_opt_state_file, std::ofstream::binary);
embeddings_[i]->dump_opt_states(out_stream_opt);
out_stream_opt.close();
i++;
}
}
// rank 0 is responsible for writing files
if (resource_manager_->is_master_process()) {
std::ofstream out_stream_weight(weights_file, std::ofstream::binary);
      networks_[0]->download_params_to_host(out_stream_weight);
std::ofstream out_dense_opt_state_weight(dense_opt_states_file, std::ofstream::binary);
      networks_[0]->download_opt_states_to_host(out_dense_opt_state_weight);
      std::string no_trained_params = networks_[0]->get_no_trained_params_in_string();
      if (no_trained_params.length() != 0) {
std::string ntp_file = weights_file + ".ntp.json";
std::ofstream out_stream_ntp(ntp_file, std::ofstream::out);
out_stream_ntp.write(no_trained_params.c_str(), no_trained_params.length());
        out_stream_ntp.close();
      }
      out_stream_weight.close();
      out_dense_opt_state_weight.close();
}
} catch (const internal_runtime_error& rt_err) {
std::cerr << rt_err.what() << std::endl;
    return rt_err.get_error();
  } catch (const std::exception& err) {
std::cerr << err.what() << std::endl;
return Error_t::UnspecificError;
}
return Error_t::Success;
}
For the optimizer states, the other worker nodes send their data to rank 0, which writes it out after receiving it.
template <typename TypeEmbeddingComp>
void SparseEmbeddingFunctors::dump_opt_states(
std::ofstream& stream, const ResourceManager& resource_manager,
std::vector<Tensors2<TypeEmbeddingComp>>& opt_states) {
  size_t local_gpu_count = resource_manager.get_local_gpu_count();

  CudaDeviceContext context;
  for (auto& opt_state : opt_states) {
size_t total_size = 0;
for (size_t id = 0; id < local_gpu_count; id++) {
      total_size += opt_state[id].get_size_in_bytes();
    }

    size_t max_size = total_size;
#ifdef ENABLE_MPI
    bool is_master_process = resource_manager.is_master_process();
    CK_MPI_THROW_(MPI_Reduce(is_master_process ? MPI_IN_PLACE : &max_size, &max_size,
sizeof(size_t), MPI_CHAR, MPI_MAX,
resource_manager.get_master_process_id(), MPI_COMM_WORLD));
#endif
    std::unique_ptr<char[]> h_opt_state(new char[max_size]);
size_t offset = 0;
for (size_t id = 0; id < local_gpu_count; id++) {
      size_t local_size = opt_state[id].get_size_in_bytes();
      auto& local_gpu = resource_manager.get_local_gpu(id);
context.set_device(local_gpu->get_device_id());
CK_CUDA_THROW_(cudaMemcpyAsync(h_opt_state.get() + offset, opt_state[id].get_ptr(),
local_size, cudaMemcpyDeviceToHost, local_gpu->get_stream()));
offset += local_size;
}
sync_all_gpus(resource_manager);
    int pid = resource_manager.get_process_id();
    if (resource_manager.is_master_process()) {
// rank 0 is responsible for writing
stream.write(h_opt_state.get(), total_size);
}
#ifdef ENABLE_MPI
else {
// Other worker nodes send data to rank0
int tag = (pid << 8) | 0xBA;
CK_MPI_THROW_(MPI_Send(h_opt_state.get(), total_size, MPI_CHAR,
resource_manager.get_master_process_id(), tag, MPI_COMM_WORLD));
}
if (resource_manager.is_master_process()) {
      for (int r = 1; r < resource_manager.get_num_process(); r++) {
        int tag = (r << 8) | 0xBA;
int recv_size = 0;
MPI_Status status;
CK_MPI_THROW_(MPI_Probe(r, tag, MPI_COMM_WORLD, &status));
CK_MPI_THROW_(MPI_Get_count(&status, MPI_CHAR, &recv_size));
// Rank 0 receives data
CK_MPI_THROW_(MPI_Recv(h_opt_state.get(), recv_size, MPI_CHAR, r, tag, MPI_COMM_WORLD,
MPI_STATUS_IGNORE));
        stream.write(h_opt_state.get(), recv_size);
      }
    }
#endif
    MESSAGE_("Done");
  }
}
0xEE Personal information
★★★★ Thoughts on life and technology ★★★★★
Wechat official account: Rosie’s Thoughts
0xFF Reference
Developer.nvidia.com/blog/introd…
Developer.nvidia.com/blog/announ…
Developer.nvidia.com/blog/accele…
Read HugeCTR source code
How does embedding propagate back
Web.eecs.umich.edu/~justincj/t…
Sparse matrix storage format summary and storage efficiency comparison: COO, CSR, DIA, ELL, HYB
Out of Thin Air: On the Embedding idea in recommendation algorithm
Principle of the tf.nn.embedding_lookup function
Analysis of TensorFlow's embedding_lookup interface
How embedding is used in large-scale industrial recommendation scenarios