0x00 Abstract
In this series, we introduce HugeCTR, an industry-oriented training framework for recommender systems, optimized for large-scale CTR models with model-parallel embeddings and data-parallel dense networks.
This article mainly covers the first two stages of the pipeline; the last stage will be discussed in a separate article. Special thanks to the excellent article "HugeCTR Source Code Reading".
Other articles in this series are as follows:
- NVIDIA HugeCTR, GPU version parameter server (1)
- NVIDIA HugeCTR, GPU version parameter server (2)
- NVIDIA HugeCTR, GPU version parameter server (3)
0x01 General Process
Thanks to efficient data exchange and a three-stage pipeline, HugeCTR scales well as the number of active GPUs increases. The pipeline consists of three stages:
- Read data from a file.
- Data transfer from host to device (between and within nodes).
- Computation on the GPU.
Data reading overlaps with GPU training. The following figure shows the scalability of HugeCTR with a batch size of 16,384 and seven layers on DGX-1 servers.
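Here is a back-of-the-envelope model of why this overlap matters. It is not a HugeCTR measurement; it simply assumes that each stage takes a fixed, hypothetical time per batch. Without overlap, every batch pays read + copy + compute. In an ideal three-stage pipeline, the first batch pays that full latency, and each later batch costs only the slowest stage.

```cpp
#include <algorithm>
#include <cstdint>

// Hypothetical per-batch stage durations (arbitrary time units).
struct StageTimes {
  std::int64_t read;     // stage 1: read data from file
  std::int64_t copy;     // stage 2: host-to-device transfer
  std::int64_t compute;  // stage 3: GPU computation
};

// Total time when every batch runs the three stages back to back.
std::int64_t sequential_time(const StageTimes& t, std::int64_t num_batches) {
  return num_batches * (t.read + t.copy + t.compute);
}

// Total time for an ideal three-stage pipeline: the first batch pays the
// full latency, and every later batch is limited only by the slowest stage.
std::int64_t pipelined_time(const StageTimes& t, std::int64_t num_batches) {
  if (num_batches <= 0) return 0;
  const std::int64_t latency = t.read + t.copy + t.compute;
  const std::int64_t bottleneck = std::max({t.read, t.copy, t.compute});
  return latency + (num_batches - 1) * bottleneck;
}
```

For example, take read = 3, copy = 2 and compute = 5 time units. Run sequentially, 100 batches take 1000 units. Pipelined, they take 10 + 99 * 5 = 505 units. Once reading and copying are hidden behind GPU work, throughput approaches the compute-bound limit.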
0x02 DataReader
The DataReader copies data from the dataset to the embedding layer. It is the entry point of the pipeline and covers its first two stages: reading files and copying the data to the GPU.
Figure 5. HugeCTR training pipeline with its data reader.
2.1 Definition
To keep the analysis focused, only the member variables are listed here; methods are described in detail where they are used.
From a dynamic perspective, two of the member variables are important:
- worker_group_: a group of worker threads that read data from the dataset files into memory. This can be regarded as the first stage of the pipeline. Earlier versions used a HeapEx data structure as an intermediate cache; it has since been removed.
- data_collector_: owns a thread that copies data to the GPU. This can be regarded as the second stage of the pipeline.
From a static point of view, there are three main buffers:
- std::vector<std::shared_ptr<ThreadBuffer>> thread_buffers_: the buffers used internally by the worker threads.
- std::shared_ptr<BroadcastBuffer> broadcast_buffer_: used for the subsequent interaction with the collector, which uses it as an intermediate buffer.
- std::shared_ptr<DataReaderOutput> output_: the reader's output; the data that training finally reads ends up here.
Data flows through them in this order: ThreadBuffer -> BroadcastBuffer -> DataReaderOutput.
From a resource perspective, the relevant members are:
- std::shared_ptr<ResourceManager> resource_manager_: a Session member variable that is passed into the DataReader constructor.
- const std::vector<DataReaderSparseParam> params_: the sparse-parameter metadata parsed from the configuration file.
/**
 * @brief Data reading controller.
 *
 * Control the data reading from data set to embedding.
 * An instance of DataReader will maintain independent
 * threads for data reading (IDataReaderWorker)
 * from dataset to heap. Meanwhile one independent
 * thread consumes the data (DataCollector),
 * and copy the data to GPU buffer.
 */
template <typename TypeKey>
class DataReader : public IDataReader {
private:
std::vector<std::shared_ptr<ThreadBuffer>> thread_buffers_; // gpu_id -> thread_idx
std::shared_ptr<BroadcastBuffer> broadcast_buffer_;
std::shared_ptr<DataReaderOutput> output_;
std::shared_ptr<DataReaderWorkerGroup> worker_group_;
std::shared_ptr<DataCollector<TypeKey>> data_collector_; /**< pointer of DataCollector */
/* Each gpu will have several csr output for different embedding */
const std::vector<DataReaderSparseParam> params_;
std::shared_ptr<ResourceManager> resource_manager_; /**< gpu resource used in this data reader*/
const size_t batchsize_; /**< batch size */
const size_t label_dim_; /**< dimension of label e.g. 1 for BinaryCrossEntropy */
const size_t dense_dim_; /**< dimension of dense */
long long current_batchsize_;
bool repeat_;
std::string file_name_;
SourceType_t source_type_;
};
2.2 Construction
Building a DataReader happens in two places:
- In the constructor, which:
  - configures the various buffers;
  - builds the DataCollector.
- In create_datareader, where train_data_reader and evaluate_data_reader are handled separately and a worker group is then set up for each of them.
We skip the constructor for now because it involves a series of data structures; we will come back to it after introducing them.
2.3 DataReaderSparseParam
2.3.1 Definition
DataReaderSparseParam holds the configuration-derived metadata of a sparse parameter. Its main member variables are:
- sparse_name (top_name in the C++ struct) is the name of the sparse input tensor, referenced by subsequent layers. It has no default value and must be specified by the user.
- nnz_per_slot is the maximum number of features per slot for the specified sparse input.
  - nnz_per_slot can be an int, i.e. the average nnz per slot, in which case the maximum number of features per sample is nnz_per_slot * slot_num.
  - Alternatively, nnz_per_slot can be a List[int], in which case the maximum number of features per sample is sum(nnz_per_slot), and the list must have the same length as slot_num.
- is_fixed_length indicates whether each slot of the categorical inputs has the same length in all samples. If all samples have the same number of features in each slot, the user can set is_fixed_length=True, and HugeCTR can use this information to reduce data-transfer time.
- slot_num specifies the number of slots used for this sparse input in the dataset.
- **Note:** If you specify more than one DataReaderSparseParam, no two of them may overlap. For example, the [WDL sample](../samples/wdl/wdl.py) has 27 slots in total; the first slot is specified as "wide_data" and the next 26 slots as "deep_data".
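#include <algorithm>  // std::max_element
#include <numeric>    // std::accumulate
#include <string>
#include <vector>
struct DataReaderSparseParam {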
std::string top_name;
std::vector<int> nnz_per_slot;
bool is_fixed_length;
int slot_num;
DataReaderSparse_t type;
int max_feature_num;
int max_nnz;
DataReaderSparseParam() {}
DataReaderSparseParam(const std::string& top_name_, const std::vector<int>& nnz_per_slot_,
bool is_fixed_length_, int slot_num_)
: top_name(top_name_),
nnz_per_slot(nnz_per_slot_),
is_fixed_length(is_fixed_length_),
slot_num(slot_num_),
type(DataReaderSparse_t::Distributed) {
max_feature_num = std::accumulate(nnz_per_slot.begin(), nnz_per_slot.end(), 0);
max_nnz = *std::max_element(nnz_per_slot.begin(), nnz_per_slot.end());
}
DataReaderSparseParam(const std::string& top_name_, const int nnz_per_slot_,
bool is_fixed_length_, int slot_num_)
: top_name(top_name_),
nnz_per_slot(slot_num_, nnz_per_slot_),
is_fixed_length(is_fixed_length_),
slot_num(slot_num_),
type(DataReaderSparse_t::Distributed) {
max_feature_num = std::accumulate(nnz_per_slot.begin(), nnz_per_slot.end(), 0);
max_nnz = *std::max_element(nnz_per_slot.begin(), nnz_per_slot.end());
}
};
2.3.2 Usage
Besides the Parser that reads a configuration file (mentioned earlier), HugeCTR also supports configuring a model in code. For example, the following code sets up two DataReaderSparseParam objects, each with a corresponding DistributedSlotSparseEmbeddingHash.
model = hugectr.Model(solver, reader, optimizer)
model.add(hugectr.Input(label_dim = 1, label_name = "label",
dense_dim = 13, dense_name = "dense",
data_reader_sparse_param_array =
[hugectr.DataReaderSparseParam("wide_data", 30, True, 1),
hugectr.DataReaderSparseParam("deep_data", 2, False, 26)]))
model.add(hugectr.SparseEmbedding(embedding_type = hugectr.Embedding_t.DistributedSlotSparseEmbeddingHash,
workspace_size_per_gpu_in_mb = 23,
embedding_vec_size = 1,
combiner = "sum",
sparse_embedding_name = "sparse_embedding2",
bottom_name = "wide_data",
optimizer = optimizer))
model.add(hugectr.SparseEmbedding(embedding_type = hugectr.Embedding_t.DistributedSlotSparseEmbeddingHash,
workspace_size_per_gpu_in_mb = 358,
embedding_vec_size = 16,
combiner = "sum",
sparse_embedding_name = "sparse_embedding1",
bottom_name = "deep_data",
optimizer = optimizer))
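For the two parameters in this example, the constructors from 2.3.1 expand nnz_per_slot and derive max_feature_num and max_nnz. The standalone sketch below mirrors just that arithmetic so the resulting values can be checked directly. SparseParamSketch is a hypothetical, trimmed-down stand-in, not the HugeCTR struct.

```cpp
#include <algorithm>
#include <numeric>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in mirroring only the size bookkeeping of
// DataReaderSparseParam; it is not the real HugeCTR type.
struct SparseParamSketch {
  std::string top_name;
  std::vector<int> nnz_per_slot;
  int slot_num;
  int max_feature_num = 0;  // max features per sample = sum(nnz_per_slot)
  int max_nnz = 0;          // largest per-slot nnz

  // List form: one nnz value per slot.
  SparseParamSketch(std::string name, std::vector<int> nnz, int slots)
      : top_name(std::move(name)), nnz_per_slot(std::move(nnz)), slot_num(slots) {
    finish();
  }

  // Scalar form: the same nnz is replicated for every slot.
  SparseParamSketch(std::string name, int nnz, int slots)
      : SparseParamSketch(std::move(name), std::vector<int>(slots, nnz), slots) {}

 private:
  void finish() {
    max_feature_num = std::accumulate(nnz_per_slot.begin(), nnz_per_slot.end(), 0);
    max_nnz = nnz_per_slot.empty()
                  ? 0
                  : *std::max_element(nnz_per_slot.begin(), nnz_per_slot.end());
  }
};
```

For wide_data (nnz 30, 1 slot), this gives max_feature_num = 30. For deep_data (nnz 2, 26 slots), it gives at most 26 * 2 = 52 features per sample, with max_nnz = 2.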
0x03 DataReader Buffer mechanism
Let's take a look at the DataReader's buffers. HugeCTR relies on these buffers to implement the first two stages of the pipeline.
3.1 Comparison
Let's start with a historical comparison to see how this part of the code has evolved, beginning with a selection of DataReader member variables from version 3.1. Up to version 3.1, a heap (csr_heap_ below) was used for this job.
class DataReader : public IDataReader {
std::shared_ptr<HeapEx<CSRChunk<TypeKey>>> csr_heap_; /**< heap to cache the data set */
Tensors2<float> label_tensors_; /**< Label tensors for the usage of loss */
std::vector<TensorBag2> dense_tensors_; /**< Dense tensors for the usage of loss */
/* Each gpu will have several csr output for different embedding */
Tensors2<TypeKey> csr_buffers_; /**< csr_buffers contains row_offset_tensor and value_tensors */
Tensors2<TypeKey> row_offsets_tensors_; /**< row offset tensors*/
Tensors2<TypeKey> value_tensors_; /**< value tensors */
std::vector<std::shared_ptr<size_t>> nnz_array_;
const size_t label_dim_; /**< dimension of label e.g. 1 for BinaryCrossEntropy */
const size_t dense_dim_; /**< dimension of dense */
};
Now let's look at the code in version 3.2.1, again with a selection of member variables.
template <typename TypeKey>
class DataReader : public IDataReader {
std::vector<std::shared_ptr<ThreadBuffer>> thread_buffers_; // gpu_id -> thread_idx
std::shared_ptr<BroadcastBuffer> broadcast_buffer_;
std::shared_ptr<DataReaderOutput> output_;
const size_t label_dim_; /**< dimension of label e.g. 1 for BinaryCrossEntropy */
const size_t dense_dim_; /**< dimension of dense */
};
In 3.2.1:
- label_tensors_ and dense_tensors_ have moved to AsyncReader.
- csr_heap_ has been replaced by thread_buffers_, broadcast_buffer_, output_, and so on.
- row_offsets_tensors_, value_tensors_, nnz_array_, etc. have been replaced by the SparseTensorBag objects in DataReaderOutput, which hold and manage the CSR data in a unified way.
3.2 Buffer related classes
Recall the history above:
- In earlier releases (e.g. 3.1), a HeapEx class implemented data caching between the CPU and the GPU.
- In the latest version, it has been replaced by a set of buffer-related classes, such as ThreadBuffer and BroadcastBuffer, whose states are tracked with BufferState:
enum class BufferState : int { FileEOF, Reading, ReadyForRead, Writing, ReadyForWrite };
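This section does not show how these states are driven. The following is therefore only a sketch under one assumption: a producer (a reader worker) and a consumer (the collector) claim a buffer by atomically moving it between states with compare-and-swap. The enumerators are copied from HugeCTR. The helpers transition, try_begin_write, finish_write, try_begin_read and finish_read are hypothetical.

```cpp
#include <atomic>

// Same enumerators as HugeCTR's BufferState.
enum class BufferState : int { FileEOF, Reading, ReadyForRead, Writing, ReadyForWrite };

// Hypothetical helper: atomically move `state` from `from` to `to`.
// Returns false, leaving the state unchanged, if it was not in `from`.
inline bool transition(std::atomic<BufferState>& state, BufferState from, BufferState to) {
  BufferState expected = from;
  return state.compare_exchange_strong(expected, to);
}

// Producer side, e.g. a reader worker filling a ThreadBuffer.
inline bool try_begin_write(std::atomic<BufferState>& s) {
  return transition(s, BufferState::ReadyForWrite, BufferState::Writing);
}
inline bool finish_write(std::atomic<BufferState>& s) {
  return transition(s, BufferState::Writing, BufferState::ReadyForRead);
}

// Consumer side, e.g. the collector draining the buffer.
inline bool try_begin_read(std::atomic<BufferState>& s) {
  return transition(s, BufferState::ReadyForRead, BufferState::Reading);
}
inline bool finish_read(std::atomic<BufferState>& s) {
  return transition(s, BufferState::Reading, BufferState::ReadyForWrite);
}
```

Each transition is a single compare-and-swap, so in this sketch a buffer can be held by only one party at a time. A failed claim leaves the state untouched, and the caller can retry later.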
Here are the definitions of the three buffers.
struct ThreadBuffer {
std::vector<SparseTensorBag> device_sparse_buffers; // same number as embedding number
std::vector<unsigned char> is_fixed_length; // same number as embedding number
TensorBag2 device_dense_buffers;
std::atomic<BufferState> state;
long long current_batch_size;
int batch_size;
size_t param_num;
int label_dim;
int dense_dim;
int batch_size_start_idx; // dense buffer
int batch_size_end_idx;
};
struct BroadcastBuffer {
std::vector<SparseTensorBag>
sparse_buffers; // same number as (embedding number * local device number)
std::vector<unsigned char> is_fixed_length; // same number as embedding number
std::vector<TensorBag2> dense_tensors; // same number as local device number
std::vector<cudaEvent_t> finish_broadcast_events; // same number as local device number
std::atomic<BufferState> state;
long long current_batch_size;
size_t param_num;
};
struct DataReaderOutput {
std::map<std::string, std::vector<SparseTensorBag>> sparse_tensors_map;
std::vector<std::string> sparse_name_vec;
std::vector<TensorBag2> label_tensors;
std::vector<TensorBag2> dense_tensors;
bool use_mixed_precision;
int label_dense_dim;
};
These classes correspond to the following member variables of the DataReader.
class DataReader : public IDataReader {
private:
std::vector<std::shared_ptr<ThreadBuffer>> thread_buffers_; // gpu_id -> thread_idx
std::shared_ptr<BroadcastBuffer> broadcast_buffer_;
std::shared_ptr<DataReaderOutput> output_;
};
Next, we will analyze them one by one.
3.3 DataReader Construction
Now let's return to the DataReader constructor that we skipped earlier. It reserves space and allocates memory for the three types of buffers, and finally builds the DataCollector.
DataReader(int batchsize, size_t label_dim, int dense_dim,
std::vector<DataReaderSparseParam> &params,
const std::shared_ptr<ResourceManager> &resource_manager, bool repeat, int num_threads,
bool use_mixed_precision)
: broadcast_buffer_(new BroadcastBuffer()),
output_(new DataReaderOutput()),
params_(params),
resource_manager_(resource_manager),
batchsize_(batchsize),
label_dim_(label_dim),
dense_dim_(dense_dim),
repeat_(repeat) {
size_t local_gpu_count = resource_manager_->get_local_gpu_count();
size_t total_gpu_count = resource_manager_->get_global_gpu_count();
// batchsize_ is a multiple of total_gpu_count
size_t batch_size_per_gpu = batchsize_ / total_gpu_count;
// 1. Create a temporary vector, buffs, used to allocate memory: one GeneralBuffer2<CudaAllocator> per local GPU
std::vector<std::shared_ptr<GeneralBuffer2<CudaAllocator>>> buffs;
// Reserve some memory space
buffs.reserve(local_gpu_count);
// Initialize a GeneralBuffer2 for each GPU
for (size_t i = 0; i < local_gpu_count; ++i) {
buffs.push_back(GeneralBuffer2<CudaAllocator>::create());
}
// 2. Set up thread_buffers_
thread_buffers_.reserve(num_threads);
for (int i = 0; i < num_threads; ++i) {
// a worker may maintain multiple buffers on device i % local_gpu_count
auto local_gpu = resource_manager_->get_local_gpu(i % local_gpu_count);
CudaCPUDeviceContext context(local_gpu->get_device_id());
auto &buff = buffs[i % local_gpu_count]; // Locate the CudaAllocator corresponding to the GPU and allocate it
std::shared_ptr<ThreadBuffer> current_thread_buffer = std::make_shared<ThreadBuffer>();
thread_buffers_.push_back(current_thread_buffer);
current_thread_buffer->device_sparse_buffers.reserve(params.size());
current_thread_buffer->is_fixed_length.reserve(params.size());  // reserves vector capacity only
for (size_t param_id = 0; param_id < params.size(); ++param_id) {
auto &param = params_[param_id];
SparseTensor<TypeKey> temp_sparse_tensor;
// Reserved memory
buff->reserve({(size_t)batchsize, (size_t)param.max_feature_num}, param.slot_num,
&temp_sparse_tensor);
current_thread_buffer->device_sparse_buffers.push_back(temp_sparse_tensor.shrink());
current_thread_buffer->is_fixed_length.push_back(param.is_fixed_length);
}
Tensor2<float> temp_dense_tensor;
// Reserved memory
buff->reserve({batch_size_per_gpu * local_gpu_count, label_dim + dense_dim},
&temp_dense_tensor);
current_thread_buffer->device_dense_buffers = temp_dense_tensor.shrink();
current_thread_buffer->state.store(BufferState::ReadyForWrite);
current_thread_buffer->current_batch_size = 0;
current_thread_buffer->batch_size = batchsize;
current_thread_buffer->param_num = params.size();
current_thread_buffer->label_dim = label_dim;
current_thread_buffer->dense_dim = dense_dim;
current_thread_buffer->batch_size_start_idx = batch_size_per_gpu * resource_manager_->get_gpu_global_id_from_local_id(0);
current_thread_buffer->batch_size_end_idx =
current_thread_buffer->batch_size_start_idx + batch_size_per_gpu * local_gpu_count;
}
// 3. Handle broadcast_buffer_. Note that reserve() here only reserves std::vector capacity, not device memory
broadcast_buffer_->sparse_buffers.reserve(local_gpu_count * params.size());
broadcast_buffer_->is_fixed_length.reserve(local_gpu_count * params.size());
broadcast_buffer_->dense_tensors.reserve(local_gpu_count);
broadcast_buffer_->finish_broadcast_events.resize(local_gpu_count);
broadcast_buffer_->state.store(BufferState::ReadyForWrite);
broadcast_buffer_->current_batch_size = 0;
broadcast_buffer_->param_num = params.size();
// Handle output_. Again, reserve() only reserves std::vector capacity, not device memory
output_->dense_tensors.reserve(local_gpu_count);
output_->label_tensors.reserve(local_gpu_count);
output_->use_mixed_precision = use_mixed_precision;
output_->label_dense_dim = label_dim + dense_dim;
// Reserve per-GPU slots in sparse_tensors_map (std::vector capacity, not device memory)
for (size_t param_id = 0; param_id < params.size(); ++param_id) {
auto &param = params_[param_id];
output_->sparse_tensors_map[param.top_name].reserve(local_gpu_count);
output_->sparse_name_vec.push_back(param.top_name);
}
// Iterate over the local GPU
for (size_t local_id = 0; local_id < local_gpu_count; ++local_id) {
// We still need to find the corresponding CudaAllocator for allocation for each GPU
auto local_gpu = resource_manager_->get_local_gpu(local_id);
CudaDeviceContext ctx(local_gpu->get_device_id());
auto &buff = buffs[local_id];
for (size_t param_id = 0; param_id < params.size(); ++param_id) {
auto &param = params_[param_id];
SparseTensor<TypeKey> temp_sparse_tensor;
// reserve memory for broadcast_buffer_
buff->reserve({(size_t)batchsize, (size_t)param.max_feature_num}, param.slot_num,
&temp_sparse_tensor);
broadcast_buffer_->sparse_buffers.push_back(temp_sparse_tensor.shrink());
broadcast_buffer_->is_fixed_length.push_back(param.is_fixed_length);
}
Tensor2<float> temp_dense_tensor;
buff->reserve({batch_size_per_gpu, label_dim + dense_dim}, &temp_dense_tensor);
broadcast_buffer_->dense_tensors.push_back(temp_dense_tensor.shrink());
CK_CUDA_THROW_(cudaEventCreateWithFlags(&broadcast_buffer_->finish_broadcast_events[local_id],
cudaEventDisableTiming));
for (size_t param_id = 0; param_id < params.size(); ++param_id) {
auto &param = params_[param_id];
SparseTensor<TypeKey> temp_sparse_tensor;
// Reserved memory
buff->reserve({(size_t)batchsize, (size_t)param.max_feature_num}, param.slot_num,
&temp_sparse_tensor);
output_->sparse_tensors_map[param.top_name].push_back(temp_sparse_tensor.shrink());
}
Tensor2<float> label_tensor;
// Reserved memory
buff->reserve({batch_size_per_gpu, label_dim}, &label_tensor);
output_->label_tensors.push_back(label_tensor.shrink());
if (use_mixed_precision) {
Tensor2<__half> dense_tensor;
// Reserved memory
buff->reserve({(size_t)batch_size_per_gpu, (size_t)dense_dim}, &dense_tensor);
output_->dense_tensors.push_back(dense_tensor.shrink());
} else {
Tensor2<float> dense_tensor;
// Reserved memory
buff->reserve({(size_t)batch_size_per_gpu, (size_t)dense_dim}, &dense_tensor);
output_->dense_tensors.push_back(dense_tensor.shrink());
}
buff->allocate();  // allocate all memory reserved on this GPU
}
// 4. Build DataCollector
data_collector_ = std::make_shared<DataCollector<TypeKey>>(thread_buffers_, broadcast_buffer_,
output_, resource_manager);
return;
}
We’ll take a closer look at the different parts of the construction code.
3.3.1 Helper: GeneralBuffer2
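First, let's examine buffs in the code above. It is used to allocate memory in a unified way. Each local GPU gets one GeneralBuffer2<CudaAllocator>, and tensors are first reserved against it. A single allocate() call per GPU then allocates the memory.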
// 1. A temporary variable buffs is generated
std::vector<std::shared_ptr<GeneralBuffer2<CudaAllocator>>> buffs;
// Reserve some capacity first
buffs.reserve(local_gpu_count);
// Initialize a GeneralBuffer2 for each GPU
for (size_t i = 0; i < local_gpu_count; ++i) {
buffs.push_back(GeneralBuffer2<CudaAllocator>::create());
}
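The idea behind buffs is "reserve first, allocate once". Every tensor is reserved against a per-GPU GeneralBuffer2, and one allocate() call later backs all of them with a single allocation. The sketch below illustrates that pattern on host memory. The class name ToyGeneralBuffer, the 256-byte padding and the use of std::malloc are assumptions for illustration; this is not HugeCTR's implementation, which allocates through a CudaAllocator.

```cpp
#include <cstddef>
#include <cstdlib>
#include <vector>

// Hypothetical host-memory analogue of the reserve-then-allocate pattern.
class ToyGeneralBuffer {
 public:
  static constexpr std::size_t kAlignment = 256;  // assumed padding granularity

  ToyGeneralBuffer() = default;
  ToyGeneralBuffer(const ToyGeneralBuffer&) = delete;
  ToyGeneralBuffer& operator=(const ToyGeneralBuffer&) = delete;
  ~ToyGeneralBuffer() { std::free(base_); }

  // Record a reservation of `bytes` and where its pointer must be written later.
  // No memory is allocated here; only a padded offset is remembered.
  void reserve(std::size_t bytes, void** tensor_ptr) {
    offsets_.push_back(total_);
    targets_.push_back(tensor_ptr);
    total_ += (bytes + kAlignment - 1) / kAlignment * kAlignment;
  }

  // One allocation for everything reserved so far, then patch every pointer.
  // Returns false if already allocated, nothing was reserved, or malloc fails.
  bool allocate() {
    if (base_ != nullptr || total_ == 0) return false;
    base_ = static_cast<char*>(std::malloc(total_));
    if (base_ == nullptr) return false;
    for (std::size_t i = 0; i < targets_.size(); ++i) {
      *targets_[i] = base_ + offsets_[i];
    }
    return true;
  }

  std::size_t total_bytes() const { return total_; }

 private:
  char* base_ = nullptr;
  std::size_t total_ = 0;
  std::vector<std::size_t> offsets_;
  std::vector<void**> targets_;
};
```

Separating reservation from allocation lets a constructor describe every buffer for a GPU first and then make one allocation per GPU, instead of one allocation per tensor.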
3.3.2 ThreadBuffer
Next, let's look at how thread_buffers_, the per-thread buffers, are handled. The ThreadBuffer definition is repeated here for reference during the analysis.
struct ThreadBuffer {
std::vector<SparseTensorBag> device_sparse_buffers; // same number as embedding number
std::vector<unsigned char> is_fixed_length; // same number as embedding number
TensorBag2 device_dense_buffers;
std::atomic<BufferState> state;
long long current_batch_size;
int batch_size;
size_t param_num;
int label_dim;
int dense_dim;
int batch_size_start_idx; // dense buffer
int batch_size_end_idx;
};
The construction logic is as follows:
- Reserve capacity in the thread_buffers_ vector for num_threads entries.
- For each thread, take the GeneralBuffer2 in buffs that belongs to the thread's GPU (i % local_gpu_count) and bind it to buff.
- Create a ThreadBuffer, current_thread_buffer, and append it to thread_buffers_.
- Reserve capacity in its device_sparse_buffers and is_fixed_length vectors.
- Iterate over the sparse parameters: for each one, build a temporary sparse tensor, reserve its memory through buff, and push it into device_sparse_buffers.
- Build a temporary dense tensor, reserve its memory through buff, and store it in device_dense_buffers.
- Set the state of current_thread_buffer to ReadyForWrite.
- Set the remaining fields of current_thread_buffer.
// Set up thread_buffers_: reserve vector capacity for num_threads entries
thread_buffers_.reserve(num_threads);
for (int i = 0; i < num_threads; ++i) { // Iterate over the thread
// a worker may maintain multiple buffers on device i % local_gpu_count
auto local_gpu = resource_manager_->get_local_gpu(i % local_gpu_count);
CudaCPUDeviceContext context(local_gpu->get_device_id());
auto &buff = buffs[i % local_gpu_count]; // Get the buffer corresponding to this thread (or GPU) in buffs
// Generate a ThreadBuffer and store it to thread_buffers_
std::shared_ptr<ThreadBuffer> current_thread_buffer = std::make_shared<ThreadBuffer>();
thread_buffers_.push_back(current_thread_buffer);
// Reserve the capacity of the device_sparse_buffers and is_fixed_length vectors of ThreadBuffer
current_thread_buffer->device_sparse_buffers.reserve(params.size());
current_thread_buffer->is_fixed_length.reserve(params.size());
// Iterate over the parameters
for (size_t param_id = 0; param_id < params.size(); ++param_id) {
auto &param = params_[param_id];
SparseTensor<TypeKey> temp_sparse_tensor;
// Create a temporary tensor and reserve memory (CPU or GPU)
buff->reserve({(size_t)batchsize, (size_t)param.max_feature_num}, param.slot_num,
&temp_sparse_tensor);
// put the tensor into device_sparse_buffers
current_thread_buffer->device_sparse_buffers.push_back(temp_sparse_tensor.shrink());
current_thread_buffer->is_fixed_length.push_back(param.is_fixed_length);
}
// Build a tensor for the dense part (label + dense)
Tensor2<float> temp_dense_tensor;
// Reserve tensor memory
buff->reserve({batch_size_per_gpu * local_gpu_count, label_dim + dense_dim},
&temp_dense_tensor);
// Put the temporary tensor into device_dense_buffers
current_thread_buffer->device_dense_buffers = temp_dense_tensor.shrink();
// Set the state
current_thread_buffer->state.store(BufferState::ReadyForWrite);
// Set other information
current_thread_buffer->current_batch_size = 0;
current_thread_buffer->batch_size = batchsize;
current_thread_buffer->param_num = params.size();
current_thread_buffer->label_dim = label_dim;
current_thread_buffer->dense_dim = dense_dim;
current_thread_buffer->batch_size_start_idx = batch_size_per_gpu * resource_manager_->get_gpu_global_id_from_local_id(0);
current_thread_buffer->batch_size_end_idx =
current_thread_buffer->batch_size_start_idx + batch_size_per_gpu * local_gpu_count;
}
Note that the DataReader holds multiple ThreadBuffers: one per worker thread, with thread i's buffer placed on local GPU i % local_gpu_count.
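The dense-buffer index fields follow directly from the constructor arithmetic:
- batch_size_per_gpu = batchsize / total_gpu_count
- batch_size_start_idx = batch_size_per_gpu * (global id of local GPU 0)
- batch_size_end_idx = start + batch_size_per_gpu * local_gpu_count

The sketch below reproduces that arithmetic. DenseSlice and node_dense_slice are hypothetical names. The example layout, two nodes with four GPUs each, where node 1's first GPU has global id 4, is an assumption for illustration and not something get_gpu_global_id_from_local_id is documented to guarantee.

```cpp
#include <cstddef>

struct DenseSlice {
  std::size_t start_idx;  // first sample of the global batch owned by this node
  std::size_t end_idx;    // one past the last sample owned by this node
};

// Mirrors the ThreadBuffer index computation in the DataReader constructor.
// first_global_gpu_id stands in for get_gpu_global_id_from_local_id(0).
DenseSlice node_dense_slice(std::size_t batchsize, std::size_t total_gpu_count,
                            std::size_t local_gpu_count, std::size_t first_global_gpu_id) {
  const std::size_t batch_size_per_gpu = batchsize / total_gpu_count;  // batchsize is a multiple
  const std::size_t start = batch_size_per_gpu * first_global_gpu_id;
  return {start, start + batch_size_per_gpu * local_gpu_count};
}
```

With batchsize = 16384 on 8 GPUs, batch_size_per_gpu is 2048. Under the assumed layout, node 0 covers samples [0, 8192) and node 1 covers [8192, 16384). This matches the dense buffer being reserved with batch_size_per_gpu * local_gpu_count rows, while the sparse buffers are reserved for the full batchsize.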
3.3.3 BroadcastBuffer
Let’s see how to build a BroadcastBuffer.
BroadcastBuffer is defined as follows:
struct BroadcastBuffer {
std::vector<SparseTensorBag>
sparse_buffers; // same number as (embedding number * local device number)
std::vector<unsigned char> is_fixed_length; // same number as embedding number
std::vector<TensorBag2> dense_tensors; // same number as local device number
std::vector<cudaEvent_t> finish_broadcast_events; // same number as local device number
std::atomic<BufferState> state;
long long current_batch_size;
size_t param_num;
};
As the construction code shows, this step only reserves vector capacity and sets a few fields; no memory is allocated yet. The memory is handled later, in 3.3.5.
// Process broadcast buffer
// Reserve the capacity of vector
broadcast_buffer_->sparse_buffers.reserve(local_gpu_count * params.size());
// Reserve the capacity of vector
broadcast_buffer_->is_fixed_length.reserve(local_gpu_count * params.size());
// Reserve the capacity of vector
broadcast_buffer_->dense_tensors.reserve(local_gpu_count);
broadcast_buffer_->finish_broadcast_events.resize(local_gpu_count);
// Set the state
broadcast_buffer_->state.store(BufferState::ReadyForWrite);
broadcast_buffer_->current_batch_size = 0;
broadcast_buffer_->param_num = params.size();
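The constructor comments stress that reserve() on these members only reserves std::vector capacity. A tiny standard-library example makes the distinction concrete. After reserve(), capacity grows but size stays zero, so no element (and hence no tensor) exists until push_back is called. After resize(), the elements exist immediately. The helper names below are illustrative only.

```cpp
#include <cstddef>
#include <vector>

struct VectorShape {
  std::size_t size;
  std::size_t capacity;
};

// reserve(): capacity only, no elements are created.
VectorShape after_reserve(std::size_t n) {
  std::vector<int> v;
  v.reserve(n);
  return {v.size(), v.capacity()};
}

// resize(): n value-initialized elements are created.
VectorShape after_resize(std::size_t n) {
  std::vector<int> v;
  v.resize(n);
  return {v.size(), v.capacity()};
}
```

This explains the different calls in the constructor. finish_broadcast_events uses resize, because cudaEventCreateWithFlags later writes into finish_broadcast_events[local_id], so each slot must already exist. sparse_buffers and dense_tensors use reserve followed by push_back.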
3.3.4 DataReaderOutput
Next, let's look at how the DataReaderOutput is built.
struct DataReaderOutput {
std::map<std::string, std::vector<SparseTensorBag>> sparse_tensors_map;
std::vector<std::string> sparse_name_vec;
std::vector<TensorBag2> label_tensors;
std::vector<TensorBag2> dense_tensors;
bool use_mixed_precision;
int label_dense_dim;
};
As with BroadcastBuffer, this step only reserves vector capacity and sets a few fields; no memory is allocated yet.
output_->dense_tensors.reserve(local_gpu_count); // Reserve the capacity of vector
output_->label_tensors.reserve(local_gpu_count); // Reserve the capacity of vector
output_->use_mixed_precision = use_mixed_precision;
output_->label_dense_dim = label_dim + dense_dim;
for (size_t param_id = 0; param_id < params.size(); ++param_id) {
auto &param = params_[param_id];
output_->sparse_tensors_map[param.top_name].reserve(local_gpu_count);
output_->sparse_name_vec.push_back(param.top_name);
}
3.3.5 Reservation and Allocation
Finally, memory for the broadcast and output buffers is reserved, and all memory reserved on each GPU is then allocated in one go.
for (size_t local_id = 0; local_id < local_gpu_count; ++local_id) {  // iterate over the local GPUs
auto local_gpu = resource_manager_->get_local_gpu(local_id);
CudaDeviceContext ctx(local_gpu->get_device_id());
auto &buff = buffs[local_id]; // Get the allocator for a local GPU in temporary buffs
for (size_t param_id = 0; param_id < params.size(); ++param_id) {
auto &param = params_[param_id];
SparseTensor<TypeKey> temp_sparse_tensor;
// Reserve memory for the sparse tensor
buff->reserve({(size_t)batchsize, (size_t)param.max_feature_num}, param.slot_num,
&temp_sparse_tensor);
// Assign the value to broadcast
broadcast_buffer_->sparse_buffers.push_back(temp_sparse_tensor.shrink());
broadcast_buffer_->is_fixed_length.push_back(param.is_fixed_length);
}
// Reserve memory for the dense tensor
Tensor2<float> temp_dense_tensor;
buff->reserve({batch_size_per_gpu, label_dim + dense_dim}, &temp_dense_tensor);
// Assign the value to broadcast
broadcast_buffer_->dense_tensors.push_back(temp_dense_tensor.shrink());
CK_CUDA_THROW_(cudaEventCreateWithFlags(&broadcast_buffer_->finish_broadcast_events[local_id],
cudaEventDisableTiming));
for (size_t param_id = 0; param_id < params.size(); ++param_id) {
auto &param = params_[param_id];
// Reserve memory for the sparse tensor
SparseTensor<TypeKey> temp_sparse_tensor;
buff->reserve({(size_t)batchsize, (size_t)param.max_feature_num}, param.slot_num,
&temp_sparse_tensor);
// Assign to output
output_->sparse_tensors_map[param.top_name].push_back(temp_sparse_tensor.shrink());
}
// Reserve memory for the label tensor
Tensor2<float> label_tensor;
buff->reserve({batch_size_per_gpu, label_dim}, &label_tensor);
// Assign to output
output_->label_tensors.push_back(label_tensor.shrink());
if (use_mixed_precision) {
Tensor2<__half> dense_tensor;
// Reserve memory for the dense tensor (__half)
buff->reserve({(size_t)batch_size_per_gpu, (size_t)dense_dim}, &dense_tensor);
// Assign to output
output_->dense_tensors.push_back(dense_tensor.shrink());
} else {
Tensor2<float> dense_tensor;
// Reserve memory for the dense tensor (float)
buff->reserve({(size_t)batch_size_per_gpu, (size_t)dense_dim}, &dense_tensor);
// Assign to output
output_->dense_tensors.push_back(dense_tensor.shrink());
}
buff->allocate();  // allocate all memory reserved on this GPU in one go
}
After allocation, the reserved buffers are laid out as follows. Note that this is a simplified picture that does not show multiple local GPUs; in reality, the following member variables of the three classes are distributed across multiple local GPUs:
// Embedding number refers to the number of DataReaderSparseParam in this model
struct ThreadBuffer {
std::vector<SparseTensorBag> device_sparse_buffers; // same number as embedding number
// Across thread_buffers_, device_sparse_buffers live on different local GPUs (thread i uses GPU i % local_gpu_count)
struct BroadcastBuffer {
std::vector<SparseTensorBag>
sparse_buffers; // same number as (embedding number * local device number)
// sparse_buffers are also allocated across multiple local GPUs
struct DataReaderOutput {
std::map<std::string, std::vector<SparseTensorBag>> sparse_tensors_map;
// Each sparse_tensors_map[param.top_name] is allocated across multiple local GPUs,
// e.g. output_->sparse_tensors_map[param.top_name].reserve(local_gpu_count);
The simplified view shows only one GPU; all of these buffers reside in GPU memory.
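To make the per-GPU distribution concrete, the following sketch counts how many SparseTensorBag objects each structure ends up holding. It uses only the loop bounds from the constructor:
- each of the num_threads ThreadBuffers holds one bag per parameter;
- broadcast_buffer_ holds one bag per (local GPU, parameter) pair;
- output_ holds local_gpu_count bags per top_name.

It also computes the bytes of one output dense tensor, which depend on use_mixed_precision. The struct and function names are hypothetical. The sketch assumes distinct top_names and ignores any alignment padding GeneralBuffer2 may add.

```cpp
#include <cstddef>

struct SparseBagCounts {
  std::size_t per_thread_buffer;   // device_sparse_buffers.size() of one ThreadBuffer
  std::size_t all_thread_buffers;  // summed over thread_buffers_
  std::size_t broadcast;           // broadcast_buffer_->sparse_buffers.size()
  std::size_t output_per_name;     // output_->sparse_tensors_map[name].size()
  std::size_t output_total;        // summed over all top_names
};

SparseBagCounts count_sparse_bags(std::size_t num_threads, std::size_t local_gpu_count,
                                  std::size_t num_params) {
  SparseBagCounts c{};
  c.per_thread_buffer = num_params;
  c.all_thread_buffers = num_threads * num_params;
  c.broadcast = local_gpu_count * num_params;
  c.output_per_name = local_gpu_count;
  c.output_total = local_gpu_count * num_params;
  return c;
}

// Bytes of one output_->dense_tensors entry (one per local GPU), before padding:
// __half (2 bytes) with mixed precision, float otherwise.
std::size_t output_dense_bytes(std::size_t batch_size_per_gpu, std::size_t dense_dim,
                               bool use_mixed_precision) {
  const std::size_t elem_bytes = use_mixed_precision ? 2 : sizeof(float);
  return batch_size_per_gpu * dense_dim * elem_bytes;
}
```

Consider the two WDL parameters with 4 local GPUs and, hypothetically, 8 reader threads. Each ThreadBuffer holds 2 bags (16 in total), broadcast_buffer_ holds 8, and output_ holds 4 per top_name. Enabling mixed precision halves the output dense tensors.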
Now that the DataReader has a set of buffers, let’s look at how to use them.
0x04 DataReaderWorkerGroup
DataReaderWorkerGroup is responsible for reading the data.
4.1 Construction
create_datareader creates the DataReaderWorkerGroup of each reader according to the data format:
switch (format) {
case DataReaderType_t::Norm: {
train_data_reader->create_drwg_norm(source_data, check_type, start_right_now);
evaluate_data_reader->create_drwg_norm(eval_source, check_type, start_right_now);
break;
}
case DataReaderType_t::Raw: {
train_data_reader->create_drwg_raw(source_data, num_samples, float_label_dense, true, false);
evaluate_data_reader->create_drwg_raw(eval_source, eval_num_samples, float_label_dense,
false, false);
break;
}
case DataReaderType_t::Parquet: {
train_data_reader->create_drwg_parquet(source_data, slot_offset, true);
evaluate_data_reader->create_drwg_parquet(eval_source, slot_offset, true);
break;
}
}
Continuing the analysis with create_drwg_norm, we see that it constructs a DataReaderWorkerGroupNorm; that is, it sets the DataReader's worker_group_ member to a DataReaderWorkerGroupNorm.
Note that thread_buffers_ is passed in, indicating that the DataReaderWorkerGroup operates on thread_buffers_ of the DataReader.
void create_drwg_norm(std::string file_name, Check_t check_type,
bool start_reading_from_beginning = true) override {
source_type_ = SourceType_t::FileList;
worker_group_.reset(new DataReaderWorkerGroupNorm<TypeKey>(
thread_buffers_, resource_manager_, file_name, repeat_, check_type, params_,
start_reading_from_beginning));
file_name_ = file_name;
}
4.2 DataReaderWorkerGroup definition
We only look at its member variables. The key one is data_readers_, a vector of IDataReaderWorker pointers: the workers that actually read the data.
class DataReaderWorkerGroup {
std::vector<std::thread> data_reader_threads_; /**< A vector of the pointers of data reader .*/
protected:
int data_reader_loop_flag_{0}; /**< p_loop_flag a flag to control the loop */
DataReaderType_t data_reader_type_;
std::vector<std::shared_ptr<IDataReaderWorker>>
data_readers_; /**< A vector of DataReaderWorker' pointer.*/
std::shared_ptr<ResourceManager> resource_manager_;
};
4.3 DataReaderWorkerGroupNorm
Taking DataReaderWorkerGroupNorm as the example, its most important job is to build the DataReaderWorkers and assign each of them the GPU resource it should use.
template <typename TypeKey>
class DataReaderWorkerGroupNorm : public DataReaderWorkerGroup {
std::string file_list_; /**< file list of data set */
std::shared_ptr<Source> create_source(size_t worker_id, size_t num_worker,
const std::string &file_name, bool repeat) override {
return std::make_shared<FileSource>(worker_id, num_worker, file_name, repeat);
}
public:
// Ctor
DataReaderWorkerGroupNorm(const std::vector<std::shared_ptr<ThreadBuffer>> &output_buffers,
const std::shared_ptr<ResourceManager> &resource_manager_,
std::string file_list, bool repeat, Check_t check_type,
const std::vector<DataReaderSparseParam> &params,
bool start_reading_from_beginning = true)
: DataReaderWorkerGroup(start_reading_from_beginning, DataReaderType_t::Norm) {
int num_threads = output_buffers.size();
size_t local_gpu_count = resource_manager_->get_local_gpu_count();
// create data reader workers
int max_feature_num_per_sample = 0;
for (auto &param : params) {
max_feature_num_per_sample += param.max_feature_num;
}
set_resource_manager(resource_manager_);
for (int i = 0; i < num_threads; i++) {
std::shared_ptr<IDataReaderWorker> data_reader(new DataReaderWorker<TypeKey>(
// Set the GPU resource for each DataReaderWorker
i, num_threads, resource_manager_->get_local_gpu(i % local_gpu_count),
&data_reader_loop_flag_, output_buffers[i], file_list, max_feature_num_per_sample, repeat,
check_type, params));
data_readers_.push_back(data_reader);
}
create_data_reader_threads(); // Create multiple worker threads
}
};
4.4 Creating a Thread
create_data_reader_threads creates one worker thread per DataReaderWorker and sets the GPU resource for each thread.
/** Create threads to run data reader workers */
void create_data_reader_threads() {
size_t local_gpu_count = resource_manager_->get_local_gpu_count();
for (size_t i = 0; i < data_readers_.size(); ++i) {
// Set the GPU resources for each thread
auto local_gpu = resource_manager_->get_local_gpu(i % local_gpu_count);
// Specifies the thread body function
data_reader_threads_.emplace_back(data_reader_thread_func_, data_readers_[i],
&data_reader_loop_flag_, local_gpu->get_device_id());
}
}
4.5 Thread body function
data_reader_thread_func_ is the body of each worker thread. It binds the thread to its GPU device and then repeatedly calls IDataReaderWorker::read_a_batch to read data.
/**
* A helper function to read data from dataset to heap in a new thread.
* @param data_reader a pointer of data_reader.
* @param p_loop_flag a flag to control the loop, and break loop when IDataReaderWorker is destroyed.
*/
static void data_reader_thread_func_(const std::shared_ptr<IDataReaderWorker>& data_reader,
int* p_loop_flag, int device_id) {
try {
CudaCPUDeviceContext context(device_id); // Set the device for this thread
while ((*p_loop_flag) == 0) { // Wait until the reader is started
usleep(2);
}
while (*p_loop_flag) {
data_reader->read_a_batch(); // Start reading file data
}
} catch (const std::runtime_error& rt_err) {
std::cerr << rt_err.what() << std::endl;
}
}
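The two spin loops in data_reader_thread_func_ implement a simple start/stop protocol on the loop flag: 0 before start means wait, non-zero means keep reading, and going back to 0 means exit. The following is a minimal, self-contained sketch of that protocol, not HugeCTR code. std::atomic<int> replaces the original plain int* so that the sketch is free of data races. That substitution, and the names worker_loop and run_worker_demo, are assumptions made for illustration, and the batch counter stands in for read_a_batch().

```cpp
#include <atomic>
#include <chrono>
#include <functional>
#include <thread>
#include <utility>

// Hypothetical worker body: phase 1 waits for the start signal,
// phase 2 "reads" until the flag is cleared again.
void worker_loop(std::atomic<int>& loop_flag, std::atomic<long>& batches) {
  while (loop_flag.load() == 0) {  // phase 1: not started yet
    std::this_thread::sleep_for(std::chrono::microseconds(2));
  }
  while (loop_flag.load() != 0) {  // phase 2: keep reading
    batches.fetch_add(1);          // placeholder for read_a_batch()
    std::this_thread::sleep_for(std::chrono::microseconds(2));
  }
}

// Starts one worker, lets it read at least one batch, then stops it.
// Returns {batches seen before start, batches seen after stop}.
std::pair<long, long> run_worker_demo() {
  std::atomic<int> loop_flag{0};
  std::atomic<long> batches{0};
  std::thread t(worker_loop, std::ref(loop_flag), std::ref(batches));
  std::this_thread::sleep_for(std::chrono::milliseconds(1));
  long before_start = batches.load();  // worker is still parked in phase 1
  loop_flag.store(1);                  // start reading
  while (batches.load() == 0) {        // make sure phase 2 was reached
    std::this_thread::sleep_for(std::chrono::microseconds(50));
  }
  loop_flag.store(0);                  // stop: the worker leaves phase 2
  t.join();
  return {before_start, batches.load()};
}
```

Because 0 means both "not started" and "stop", a worker that never observed the flag being set would stay parked in the first loop. That is why run_worker_demo waits for at least one batch before clearing the flag.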
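Worker i is bound to local GPU i % local_gpu_count, both in the DataReaderWorkerGroupNorm constructor and in create_data_reader_threads. This decides which GPU the samples read by each worker end up on. For example, with 4 threads and 2 local GPUs, threads 0 and 2 use GPU 0, and threads 1 and 3 use GPU 1.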
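The round-robin binding can be sketched in isolation. The function name assign_workers_to_gpus is hypothetical. It returns the local GPU index for each worker, which the original code passes to get_local_gpu() to obtain the actual device.

```cpp
#include <cstddef>
#include <vector>

// Sketch of the rule used when workers and threads are created:
// worker i is bound to local GPU i % local_gpu_count.
// The result holds local GPU indices, not CUDA device ids.
std::vector<std::size_t> assign_workers_to_gpus(std::size_t num_workers,
                                                std::size_t local_gpu_count) {
  std::vector<std::size_t> gpu_of_worker(num_workers);
  for (std::size_t i = 0; i < num_workers; ++i) {
    gpu_of_worker[i] = i % local_gpu_count;
  }
  return gpu_of_worker;
}
```

For the 4-thread, 2-GPU case, assign_workers_to_gpus(4, 2) returns {0, 1, 0, 1}.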
4.6 DataReaderWorker
DataReaderWorker is the module that actually parses the data. IDataReaderWorker is its base class. The key member is buffer_, which points to a ThreadBuffer.
class IDataReaderWorker {
std::shared_ptr<Source> source_; /**< source: can be file or network */
int worker_id_;
int worker_num_;
std::shared_ptr<GPUResource> gpu_resource_; // This is the worker's GPU resource
bool is_eof_;
int *loop_flag_;
std::shared_ptr<ThreadBuffer> buffer_;
IDataReaderWorker(const int worker_id, const int worker_num,
const std::shared_ptr<GPUResource> &gpu_resource, bool is_eof, int *loop_flag,
const std::shared_ptr<ThreadBuffer> &buff)
: worker_id_(worker_id),
worker_num_(worker_num),
gpu_resource_(gpu_resource), // Set GPU resources
is_eof_(is_eof),
loop_flag_(loop_flag),
buffer_(buff) {}
};
DataReaderWorker is defined as follows:
template <class T>
class DataReaderWorker : public IDataReaderWorker {
private:
DataSetHeader
data_set_header_; /**< the header of data set, which has main informations of a data file */
size_t buffer_length_; /**< buffer size for internal use */
Check_t check_type_; /**< check type for data set */
std::vector<DataReaderSparseParam> params_; /**< configuration of data reader sparse input */
std::shared_ptr<Checker> checker_; /**< checker aim to perform error check of the input data */
bool skip_read_{false}; /**< set to true when you want to stop the data reading */
int current_record_index_{0};
size_t total_slot_num_;
std::vector<size_t> last_batch_nnz_;
Tensor2<float> temp_host_dense_buffer_; // read data to make checker move
Tensor2<float> host_dense_buffer_;
std::vector<CSR<T>> host_sparse_buffer_;
};
Note the following about its constructor:
- The member std::shared_ptr<ThreadBuffer> buffer_, inherited from the base class, points to a ThreadBuffer.
- host_sparse_buffer_ is allocated on the host, not on the GPU. Data read from the file is parsed into CSR format and stored in host_sparse_buffer_.
- params_ is an array of DataReaderSparseParam. With the configuration below, params_ contains three elements, corresponding to user, good, and cate respectively.
model.add(hugectr.Input(label_dim = 1, label_name = "label",
dense_dim = 0, dense_name = "dense",
data_reader_sparse_param_array =
[hugectr.DataReaderSparseParam("UserID", 1, True, 1),
hugectr.DataReaderSparseParam("GoodID", 1, True, 11),
hugectr.DataReaderSparseParam("CateID", 1, True, 11)]))
The DataReaderWorker constructor is as follows:
DataReaderWorker(const int worker_id, const int worker_num,
const std::shared_ptr<GPUResource>& gpu_resource, int* loop_flag,
const std::shared_ptr<ThreadBuffer>& buffer, const std::string& file_list,
size_t buffer_length, bool repeat, Check_t check_type,
const std::vector<DataReaderSparseParam>& params)
: IDataReaderWorker(worker_id, worker_num, gpu_resource, !repeat, loop_flag, buffer),
buffer_length_(buffer_length),
check_type_(check_type),
params_(params),
total_slot_num_(0),
last_batch_nnz_(params.size(), 0) {
total_slot_num_ = 0;
for (auto& p : params) {
total_slot_num_ += p.slot_num;
}
source_ = std::make_shared<FileSource>(worker_id, worker_num, file_list, repeat);
create_checker();
int batch_size = buffer->batch_size;
int batch_size_start_idx = buffer->batch_size_start_idx;
int batch_size_end_idx = buffer->batch_size_end_idx;
int label_dim = buffer->label_dim;
int dense_dim = buffer->dense_dim;
CudaCPUDeviceContext ctx(gpu_resource->get_device_id()); // Get which GPU this worker corresponds to
std::shared_ptr<GeneralBuffer2<CudaHostAllocator>> buff =
GeneralBuffer2<CudaHostAllocator>::create();
buff->reserve({static_cast<size_t>(batch_size_end_idx - batch_size_start_idx),
static_cast<size_t>(label_dim + dense_dim)},
&host_dense_buffer_);
buff->reserve({static_cast<size_t>(label_dim + dense_dim)}, &temp_host_dense_buffer_);
for (auto& param : params) {
host_sparse_buffer_.emplace_back(batch_size * param.slot_num,
batch_size * param.max_feature_num);
}
buff->allocate();
}
Expanded, the structure has one worker per thread. If we simplify away some of the memory classes, it reduces to this: each DataReaderWorker operates on one ThreadBuffer of the DataReader.
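The buffer sizes that the constructor reserves follow directly from params_ and the batch range. The sketch below reproduces that arithmetic. SparseParamSketch and compute_buffer_sizes are hypothetical names. The mapping from the Python call above is an assumption: the arguments are taken to be (top_name, nnz_per_slot, is_fixed_length, slot_num), with max_feature_num = nnz_per_slot × slot_num. The code shown does not confirm that mapping.

```cpp
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical stand-in for DataReaderSparseParam (only the two fields
// the constructor uses: slot_num and max_feature_num).
struct SparseParamSketch {
  std::string top_name;
  std::size_t slot_num;
  std::size_t max_feature_num;  // assumed: nnz_per_slot * slot_num
};

struct WorkerBufferSizes {
  std::size_t total_slot_num = 0;
  std::size_t dense_rows = 0, dense_cols = 0;  // host_dense_buffer_ shape
  std::vector<std::size_t> csr_max_rows;       // 1st CSR ctor argument
  std::vector<std::size_t> csr_max_values;     // 2nd CSR ctor argument
};

// Mirrors the size arithmetic in the DataReaderWorker constructor.
WorkerBufferSizes compute_buffer_sizes(const std::vector<SparseParamSketch>& params,
                                       std::size_t batch_size,
                                       std::size_t batch_size_start_idx,
                                       std::size_t batch_size_end_idx,
                                       std::size_t label_dim, std::size_t dense_dim) {
  WorkerBufferSizes s;
  for (const auto& p : params) {
    s.total_slot_num += p.slot_num;
    s.csr_max_rows.push_back(batch_size * p.slot_num);
    s.csr_max_values.push_back(batch_size * p.max_feature_num);
  }
  // Only the samples assigned to this node's GPUs keep label/dense values.
  s.dense_rows = batch_size_end_idx - batch_size_start_idx;
  s.dense_cols = label_dim + dense_dim;
  return s;
}
```

Take a hypothetical batch size of 1024 and the UserID/GoodID/CateID configuration. Under the assumption above, the three CSRs can hold 1024, 11264 and 11264 rows, and total_slot_num_ is 23.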
4.7 Reading Data
When the worker is constructed, it creates checker_, which is the object that actually reads data from the file.
4.7.1 Checker
void create_checker() {
switch (check_type_) {
case Check_t::Sum:
checker_ = std::make_shared<CheckSum>(*source_);
break;
case Check_t::None:
checker_ = std::make_shared<CheckNone>(*source_);
break;
default:
assert(!"Error: no such Check_t && should never get here!!");
}
}
Taking CheckNone as an example, you can see that it simply reads from the underlying source, that is, the file.
class CheckNone : public Checker {
private:
const int MAX_TRY{10};
public:
CheckNone(Source& src) : Checker(src) {}
/**
* Read "bytes_to_read" byte to the memory associated to ptr.
* Users don't need to manually maintain the check bit offset, just specify
* number of bytes you really want to see in ptr.
* @param ptr pointer to user located buffer
* @param bytes_to_read bytes to read
* @return `DataCheckError` `OutOfBound` `Success` `UnspecificError`
*/
Error_t read(char* ptr, size_t bytes_to_read) noexcept {
try {
Checker::src_.read(ptr, bytes_to_read);
return Error_t::Success;
} catch (const std::runtime_error& rt_err) {
std::cerr << rt_err.what() << std::endl;
return Error_t::BrokenFile;
}
}
/**
* Start a new file to read.
* @return `FileCannotOpen` or `UnspecificError`
*/
Error_t next_source() {
for (int i = MAX_TRY; i > 0; i--) {
Error_t flag_eof = Checker::src_.next_source();
if (flag_eof == Error_t::Success || flag_eof == Error_t::EndOfFile) {
return flag_eof;
}
}
CK_THROW_(Error_t::FileCannotOpen, "Checker::src_.next_source() == Error_t::Success failed");
return Error_t::FileCannotOpen; // to eliminate compile error
}
};
4.7.2 CSR sample
Let's look at the format of the data file that gets parsed into CSR, using a snippet from samples/ncf/preprocess-1m.
from ctypes import c_float, c_int, c_uint, c_longlong as ll  # ll assumed to be c_longlong (8-byte header fields)
import tqdm

def write_hugeCTR_data(huge_ctr_data, filename='huge_ctr_data.dat'):
    with open(filename, 'wb') as f:
        # write header
        f.write(ll(0))  # 0: no error check; 1: check_num
        f.write(ll(huge_ctr_data.shape[0]))  # the number of samples in this data file
        f.write(ll(1))  # dimension of label
        f.write(ll(1))  # dimension of dense feature
        f.write(ll(2))  # long long slot_num
        for _ in range(3): f.write(ll(0))  # reserved for future use

        for i in tqdm.tqdm(range(huge_ctr_data.shape[0])):
            f.write(c_float(huge_ctr_data[i,2]))  # float label[label_dim];
            f.write(c_float(0))  # dummy dense feature
            f.write(c_int(1))  # slot 1 nnz: user ID
            f.write(c_uint(huge_ctr_data[i,0]))
            f.write(c_int(1))  # slot 2 nnz: item ID
            f.write(c_uint(huge_ctr_data[i,1]))
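To see the layout the reader expects, here is a sketch that parses the bytes written by the script above from an in-memory buffer. The header struct is reconstructed from the writer: eight 64-bit integers, five fields plus three reserved. read_new_file reads sizeof(DataSetHeader) in one go and uses error_check, number_of_records and slot_num, but the exact struct here (HeaderSketch) is an assumption. Each record is label_dim + dense_dim floats, then for every slot an int nnz followed by nnz keys. This sample writes 32-bit keys (c_uint), and the reader's key type T must match that. A little-endian host is assumed, the same as the writer.

```cpp
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <stdexcept>
#include <vector>

// Header layout assumed from the Python writer: eight 64-bit integers.
struct HeaderSketch {
  long long error_check;  // 0: no check, 1: checksum
  long long number_of_records;
  long long label_dim;
  long long dense_dim;
  long long slot_num;
  long long reserved[3];
};

struct RecordSketch {
  std::vector<float> label_dense;                     // label_dim + dense_dim
  std::vector<std::vector<std::uint32_t>> slot_keys;  // one key list per slot
};

// Sequential reader over an in-memory copy of a data file.
class ByteReader {
 public:
  explicit ByteReader(const std::vector<char>& bytes) : bytes_(bytes) {}
  void read(void* dst, std::size_t n) {
    if (n == 0) return;
    if (pos_ + n > bytes_.size()) throw std::runtime_error("unexpected end of data");
    std::memcpy(dst, bytes_.data() + pos_, n);
    pos_ += n;
  }
 private:
  const std::vector<char>& bytes_;
  std::size_t pos_ = 0;
};

std::vector<RecordSketch> parse_records(const std::vector<char>& bytes, HeaderSketch& header) {
  ByteReader in(bytes);
  in.read(&header, sizeof(HeaderSketch));
  std::vector<RecordSketch> records(static_cast<std::size_t>(header.number_of_records));
  for (RecordSketch& rec : records) {
    rec.label_dense.resize(static_cast<std::size_t>(header.label_dim + header.dense_dim));
    in.read(rec.label_dense.data(), sizeof(float) * rec.label_dense.size());
    rec.slot_keys.resize(static_cast<std::size_t>(header.slot_num));
    for (std::vector<std::uint32_t>& keys : rec.slot_keys) {
      int nnz = 0;
      in.read(&nnz, sizeof(int));  // number of keys in this slot
      if (nnz < 0) throw std::runtime_error("negative nnz");
      keys.resize(static_cast<std::size_t>(nnz));
      in.read(keys.data(), sizeof(std::uint32_t) * keys.size());
    }
  }
  return records;
}
```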
4.7.3 Reading batch data
read_a_batch does the detailed parsing of the data set:
- It first reads data from the file, opening the next file when needed. If no data is left at all, it publishes an empty batch (current_batch_size = 0) to signal EOF.
- It parses each sample: the label and dense values go into host_dense_buffer_, and the sparse keys are parsed into CSR format in host_sparse_buffer_.
- It calls wait_until_h2d_ready to wait until the ThreadBuffer (one element of DataReader's thread_buffers_ member) is ReadyForWrite, and claims it by switching it to Writing.
- It then calls cudaMemcpyAsync to copy the data from the host buffers into the ThreadBuffer. Two points are important here:
  - The data is currently in host_sparse_buffer_ (on the CPU) and must be copied to the GPU. The target is the device_sparse_buffers member of ThreadBuffer.
  - host_sparse_buffer_ is in CSR format, while device_sparse_buffers holds SparseTensors, so the data has to be converted.
  - The conversion happens as part of the copy itself.
- Finally, it sets the ThreadBuffer state to ReadyForRead.
Two points to keep in mind:
- nnz is the number of non-zero features, that is, the number of feature IDs in one slot.
- Each slot of each sample corresponds to one CSR row.
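The row-per-slot layout and the checkpoint/rollback calls in read_a_batch can be illustrated with a tiny CSR. MiniCSR is hypothetical and is not HugeCTR's CSR class. It only mirrors the calls read_a_batch makes: reset, new_row, set_check_point and roll_back. Its append method stands in for the original pattern of reading nnz keys straight into the value tensor and then calling update_value_size(nnz).

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical, simplified CSR: row_offsets_[r] is where row r starts in values_.
class MiniCSR {
 public:
  void reset() { values_.clear(); row_offsets_.clear(); }
  // Opens a new row that starts at the current end of values_.
  void new_row() { row_offsets_.push_back(values_.size()); }
  // Appends keys to the row opened most recently.
  void append(const std::vector<std::uint32_t>& keys) {
    values_.insert(values_.end(), keys.begin(), keys.end());
  }
  // Remember the state before reading a sample...
  void set_check_point() { cp_rows_ = row_offsets_.size(); cp_values_ = values_.size(); }
  // ...and discard that sample if reading it fails half-way.
  void roll_back() { row_offsets_.resize(cp_rows_); values_.resize(cp_values_); }
  const std::vector<std::size_t>& row_offsets() const { return row_offsets_; }
  const std::vector<std::uint32_t>& values() const { return values_; }

 private:
  std::vector<std::uint32_t> values_;
  std::vector<std::size_t> row_offsets_;
  std::size_t cp_rows_ = 0, cp_values_ = 0;
};
```

Consider two samples with two slots each, holding keys {5} | {6, 7} and {} | {8}. Each slot adds one row, giving batch_size × slot_num = 4 rows. The extra new_row() after the batch loop writes the closing offset, so row_offsets ends up as {0, 1, 3, 3, 4}. A sample that fails half-way is undone by roll_back() without touching the rows before it.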
The specific code is as follows:
/** read a batch of data from data set to heap. */
void read_a_batch() {
// Get various configurations
long long current_batch_size = buffer_->batch_size;
int label_dim = buffer_->label_dim;
int dense_dim = buffer_->dense_dim;
int label_dense_dim = label_dim + dense_dim;
int batch_size_start_idx = buffer_->batch_size_start_idx;
int batch_size_end_idx = buffer_->batch_size_end_idx;
try {
if (!checker_->is_open()) {
read_new_file(); // Read a new file
}
} catch (const internal_runtime_error& rt_err) {
Error_t err = rt_err.get_error();
if (err == Error_t::EndOfFile) { // The file is finished
if (!wait_until_h2d_ready()) return; // Wait for buffer_ status to change to ReadyForWrite
buffer_->current_batch_size = 0;
assert(buffer_->state.load() == BufferState::Writing);
is_eof_ = true;
buffer_->state.store(BufferState::ReadyForRead); // Set the state to readable
while (buffer_->state.load() != BufferState::ReadyForWrite) {
usleep(2);
if (*loop_flag_ == 0) return; // in case main thread exit
}
return; // need this return to run from begining
} else {
throw;
}
}
// if the EOF is faced, the current batch size can be changed later
for (auto& each_csr : host_sparse_buffer_) {
each_csr.reset();
}
// batch loop
for (int batch_idx = 0; batch_idx < buffer_->batch_size; ++batch_idx) { // Read batch
if (batch_idx >= current_batch_size) { // Past the end of a batch cut short by EOF: pad it
for (size_t param_id = 0; param_id < params_.size(); ++param_id) { // One CSR per sparse input
// In the case of the previous example, user, good, cate are traversed here
auto& param = params_[param_id];
// host_sparse_buffer_ is of type std::vector<CSR<T>>
auto& current_csr = host_sparse_buffer_[param_id];
for (int k = 0; k < param.slot_num; k++) { // One row per slot
current_csr.new_row(); // Add an empty row
}
}
if (batch_idx >= batch_size_start_idx &&
batch_idx < batch_size_end_idx) { // only read local device dense data
// Zero out the label/dense values
float* ptr =
host_dense_buffer_.get_ptr() + (batch_idx - batch_size_start_idx) * label_dense_dim;
for (int j = 0; j < label_dense_dim; j++) {
ptr[j] = 0.f;
}
}
continue;
}
try {
try {
if (batch_idx >= batch_size_start_idx &&
batch_idx < batch_size_end_idx) { // only read local device dense data
// Read the dense parameter
CK_THROW_(checker_->read(reinterpret_cast<char*>(host_dense_buffer_.get_ptr() +
(batch_idx - batch_size_start_idx) *
label_dense_dim),
sizeof(float) * label_dense_dim),
"failure in reading label_dense");
} else {
// Read the dense parameter
CK_THROW_(checker_->read(reinterpret_cast<char*>(temp_host_dense_buffer_.get_ptr()),
sizeof(float) * label_dense_dim),
"failure in reading label_dense");
}
for (size_t param_id = 0; param_id < params_.size(); ++param_id) {
auto& current_csr = host_sparse_buffer_[param_id];
current_csr.set_check_point(); // Remember the state in case this sample must be rolled back
}
// Read the sparse parameter
for (size_t param_id = 0; param_id < params_.size(); ++param_id) {
auto& param = params_[param_id];
auto& current_csr = host_sparse_buffer_[param_id];
for (int k = 0; k < param.slot_num; k++) {
int nnz; // Read an int to NNZ to get a non-zero feature number
CK_THROW_(checker_->read(reinterpret_cast<char*>(&nnz), sizeof(int)),
"failure in reading nnz");
current_csr.new_row(); // Start a new row for this slot
size_t num_value = current_csr.get_num_values();
// Read the nnz feature IDs of this slot
CK_THROW_(checker_->read(reinterpret_cast<char*>(
current_csr.get_value_tensor().get_ptr() + num_value),
sizeof(T) * nnz),
"failure in reading feature_ids_");
current_csr.update_value_size(nnz);
}
}
} catch (const internal_runtime_error& rt_err) { // Roll back the partially read sample
batch_idx--; // restart i-th sample
for (auto& each_csr : host_sparse_buffer_) {
each_csr.roll_back();
}
Error_t err = rt_err.get_error();
if (err == Error_t::DataCheckError) {
ERROR_MESSAGE_("Error_t::DataCheckError");
} else { // Error_t::BrokenFile, Error_t::UnspecificEror, ...
read_new_file(); // can throw Error_t::EOF
}
}
current_record_index_++;
// start a new file when finish one file read
if (current_record_index_ >= data_set_header_.number_of_records) {
read_new_file(); // can throw Error_t::EOF
}
} catch (const internal_runtime_error& rt_err) {
Error_t err = rt_err.get_error();
if (err == Error_t::EndOfFile) {
current_batch_size = batch_idx + 1;
} else {
throw;
}
}
}
for (auto& each_csr : host_sparse_buffer_) {
each_csr.new_row(); // Write the end offset of the last row
}
// do h2d
// wait buffer and schedule
// The data is currently in host_sparse_buffer_ (CPU) and needs to be copied to the GPU
// (the device_sparse_buffers member of ThreadBuffer) with cudaMemcpyHostToDevice.
// Also, host_sparse_buffer_ is in CSR format, while the device_sparse_buffers
// member of ThreadBuffer holds SparseTensors.
if (!wait_until_h2d_ready()) return;
buffer_->current_batch_size = current_batch_size;
{
CudaCPUDeviceContext context(gpu_resource_->get_device_id());
// The target for the dense data is the device_dense_buffers member of ThreadBuffer
auto dst_dense_tensor = Tensor2<float>::stretch_from(buffer_->device_dense_buffers);
CK_CUDA_THROW_(cudaMemcpyAsync(dst_dense_tensor.get_ptr(), host_dense_buffer_.get_ptr(),
host_dense_buffer_.get_size_in_bytes(), cudaMemcpyHostToDevice,
gpu_resource_->get_memcpy_stream()));
for (size_t param_id = 0; param_id < params_.size(); ++param_id) { // Iterate over the sparse inputs (embeddings)
auto dst_sparse_tensor =
SparseTensor<T>::stretch_from(buffer_->device_sparse_buffers[param_id]);
if (buffer_->is_fixed_length[param_id] &&
last_batch_nnz_[param_id] == host_sparse_buffer_[param_id].get_num_values()) {
// Fixed-length input whose nnz matches the previous batch: the row offsets are
// unchanged, so only the value array is copied into the SparseTensor
CK_CUDA_THROW_(cudaMemcpyAsync(dst_sparse_tensor.get_value_ptr(),
host_sparse_buffer_[param_id].get_value_tensor().get_ptr(),
host_sparse_buffer_[param_id].get_num_values() * sizeof(T),
cudaMemcpyHostToDevice,
gpu_resource_->get_memcpy_stream()));
} else {
// Copy to GPU
sparse_tensor_helper::cuda::copy_async(dst_sparse_tensor, host_sparse_buffer_[param_id],
gpu_resource_->get_memcpy_stream());
last_batch_nnz_[param_id] = host_sparse_buffer_[param_id].get_num_values();
}
}
// Synchronize
CK_CUDA_THROW_(cudaStreamSynchronize(gpu_resource_->get_memcpy_stream()));
}
assert(buffer_->state.load() == BufferState::Writing);
buffer_->state.store(BufferState::ReadyForRead);
}
};
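4.7.3.1 Waiting
wait_until_h2d_ready spins until the ThreadBuffer state is ReadyForWrite and then atomically switches it to Writing. It returns false if the main thread clears loop_flag_ in the meantime.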
bool wait_until_h2d_ready() {
BufferState expected = BufferState::ReadyForWrite;
while (!buffer_->state.compare_exchange_weak(expected, BufferState::Writing)) {
expected = BufferState::ReadyForWrite;
usleep(2);
if (*loop_flag_ == 0) return false; // in case main thread exit
}
return true;
}
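The claim-and-release pattern around ThreadBuffer::state can be reproduced with std::atomic. This sketch uses a hypothetical State enum trimmed to the four states used here. It is not HugeCTR's BufferState, which also has FileEOF, for example. compare_exchange_weak may fail spuriously, and on failure it overwrites expected with the current value. That is why the original loop resets expected before every retry.

```cpp
#include <atomic>
#include <thread>

enum class State { ReadyForWrite, Writing, ReadyForRead, Reading };

// Spin until the buffer is writable and claim it (ReadyForWrite -> Writing).
// Returns false if keep_running is cleared while waiting.
bool wait_until_writable(std::atomic<State>& state, const std::atomic<bool>& keep_running) {
  State expected = State::ReadyForWrite;
  while (!state.compare_exchange_weak(expected, State::Writing)) {
    expected = State::ReadyForWrite;          // a failed CAS overwrote `expected`
    if (!keep_running.load()) return false;   // main thread is shutting down
    std::this_thread::yield();
  }
  return true;
}

// Publish the written data to the consumer (Writing -> ReadyForRead).
void finish_write(std::atomic<State>& state) { state.store(State::ReadyForRead); }
```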
4.7.3.2 Reading files
read_new_file opens the next file, then reads and validates its header, retrying up to MAX_TRY times.
void read_new_file() {
constexpr int MAX_TRY = 10;
for (int i = 0; i < MAX_TRY; i++) {
if (checker_->next_source() == Error_t::EndOfFile) {
throw internal_runtime_error(Error_t::EndOfFile, "EndOfFile");
}
Error_t err =
checker_->read(reinterpret_cast<char*>(&data_set_header_), sizeof(DataSetHeader));
current_record_index_ = 0;
if (!(data_set_header_.error_check == 0 && check_type_ == Check_t::None) &&
!(data_set_header_.error_check == 1 && check_type_ == Check_t::Sum)) {
ERROR_MESSAGE_("DataHeaderError");
continue;
}
if (static_cast<size_t>(data_set_header_.slot_num) != total_slot_num_) {
ERROR_MESSAGE_("DataHeaderError");
continue;
}
if (err == Error_t::Success) {
return;
}
}
CK_THROW_(Error_t::BrokenFile, "failed to read a file");
}
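The two header checks in read_new_file reduce to a small predicate. A header that fails either check does not cause an immediate throw. The loop logs DataHeaderError and moves on to the next file, and it throws BrokenFile only after MAX_TRY attempts. CheckSketch and header_is_valid are hypothetical names used for illustration.

```cpp
#include <cstddef>

enum class CheckSketch { None, Sum };  // stand-in for Check_t

// The file's error_check flag must agree with the configured check type,
// and the file must contain exactly as many slots as all sparse inputs declare.
bool header_is_valid(long long error_check, long long slot_num,
                     CheckSketch check_type, std::size_t total_slot_num) {
  bool check_matches = (error_check == 0 && check_type == CheckSketch::None) ||
                       (error_check == 1 && check_type == CheckSketch::Sum);
  return check_matches && static_cast<std::size_t>(slot_num) == total_slot_num;
}
```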
4.7.4 Summary
To summarize the logic: each worker thread runs data_reader_thread_func_, which calls read_a_batch in a loop.
From a more logical perspective:
- Multiple threads run data_reader_thread_func_, which uses read_a_batch to read data from a data file and parse it into CSR format. Each embedding layer (each element of params_) has its own CSR.
- The CSR is stored in host_sparse_buffer_ of the DataReaderWorker.
- CSR rows are appended as the batch is read, one row per slot per sample, so a full batch has batch_size × slot_num rows.
- cudaMemcpyAsync copies the CSR from host_sparse_buffer_ into the ThreadBuffer on the GPU, whose device_sparse_buffers hold SparseTensors.
- At this point the sparse data is on the GPU.
This view is simplified to a single GPU and a single worker.
0x05 Reading Data for the Embedding
Next is the DataCollector, the second level of the pipeline, that is, the yellow "Copy to GPU" box in the pipeline figure. In fact, labeling that box "Copy to Embedding" would be more accurate.
5.1 DataCollector
Let's first look at the definition of DataCollector, with the member functions omitted. The main member variables are:
- std::shared_ptr<BroadcastBuffer> broadcast_buffer_: the buffer that the data in the ThreadBuffers is copied into, with one slice per local GPU.
- std::shared_ptr<DataReaderOutput> output_buffer_: the final output of the data reader, from which the embedding and dense layers take their input.
- BackgroundDataCollectorThread background_collector_: the thread body. It holds the ThreadBuffers and the BroadcastBuffer and copies data from the former to the latter.
- std::thread background_collector_thread_: the worker thread that runs background_collector_.
/**
* @brief A helper class of data reader.
*
* This class implements asynchronous data collecting from heap
* to output of data reader, thus data collection and training
* can work in a pipeline.
*/
template <typename T>
class DataCollector {
class BackgroundDataCollectorThread {
std::vector<std::shared_ptr<ThreadBuffer>> thread_buffers_;
std::shared_ptr<BroadcastBuffer> broadcast_buffer_;
std::atomic<bool> loop_flag_;
int counter_;
std::vector<size_t> last_batch_nnz_; // local_gpu_count * embedding number
std::vector<char> worker_status_;
int eof_worker_num_;
std::shared_ptr<ResourceManager> resource_manager_;
};
std::shared_ptr<BroadcastBuffer> broadcast_buffer_;
std::shared_ptr<DataReaderOutput> output_buffer_;
BackgroundDataCollectorThread background_collector_;
std::thread background_collector_thread_;
std::atomic<bool> loop_flag_;
std::vector<size_t> last_batch_nnz_;
std::shared_ptr<ResourceManager> resource_manager_;
};
broadcast_buffer_ and output_buffer_ are the two staging points inside the DataCollector. Data is first broadcast from the ThreadBuffers into broadcast_buffer_, and then copied from broadcast_buffer_ into output_buffer_ for training.
5.2 ThreadBuffer to BroadcastBuffer
5.2.1 Worker Threads
BackgroundDataCollectorThread’s role is to copy the data from the DataReader thread_buffers_ to broadcast_buffer_.
class BackgroundDataCollectorThread {
std::vector<std::shared_ptr<ThreadBuffer>> thread_buffers_;
std::shared_ptr<BroadcastBuffer> broadcast_buffer_;
std::atomic<bool> loop_flag_;
int counter_;
std::vector<size_t> last_batch_nnz_; // local_gpu_count * embedding number
std::vector<char> worker_status_;
int eof_worker_num_;
std::shared_ptr<ResourceManager> resource_manager_;
public:
BackgroundDataCollectorThread(const std::vector<std::shared_ptr<ThreadBuffer>> &thread_buffers,
const std::shared_ptr<BroadcastBuffer> &broadcast_buffer,
const std::shared_ptr<ResourceManager> &resource_manager)
: thread_buffers_(thread_buffers),
broadcast_buffer_(broadcast_buffer),
loop_flag_{true},
counter_{0},
last_batch_nnz_(
broadcast_buffer->is_fixed_length.size() * resource_manager->get_local_gpu_count(),
0),
worker_status_(thread_buffers.size(), 0),
eof_worker_num_(0),
resource_manager_(resource_manager) {}
void start() {
while (loop_flag_.load()) {
// ThreadBuffer is the source data, broadcast Buffer is the target data
auto &current_src_buffer = thread_buffers_[counter_];
auto &dst_buffer = broadcast_buffer_;
auto src_expected = BufferState::ReadyForRead; // Expect the source data to be in this state
auto dst_expected = BufferState::ReadyForWrite; // The desired target data is in this state
if (worker_status_[counter_]) { // This worker has already hit EOF: skip it
counter_ = (counter_ + 1) % thread_buffers_.size();
continue;
}
if ((current_src_buffer->state.load() == BufferState::Reading ||
current_src_buffer->state.compare_exchange_weak(src_expected, BufferState::Reading)) &&
(dst_buffer->state.load() == BufferState::Writing ||
dst_buffer->state.compare_exchange_weak(dst_expected, BufferState::Writing))) {
// If the source data is readable or reading, and the target data is writable or writing, the operation can be performed
if (current_src_buffer->current_batch_size == 0) {
worker_status_[counter_] = 1;
eof_worker_num_ += 1;
current_src_buffer->state.store(BufferState::FileEOF);
}
if (static_cast<size_t>(eof_worker_num_) != thread_buffers_.size() &&
current_src_buffer->current_batch_size == 0) {
counter_ = (counter_ + 1) % thread_buffers_.size();
dst_buffer->state.store(BufferState::ReadyForWrite); // Release the target: nothing was written
continue;
}
dst_buffer->current_batch_size = current_src_buffer->current_batch_size;
if (current_src_buffer->current_batch_size != 0) {
// Perform broadcast operations
broadcast<T>(current_src_buffer, dst_buffer, last_batch_nnz_, resource_manager_);
current_src_buffer->state.store(BufferState::ReadyForWrite); // The worker may refill its ThreadBuffer
counter_ = (counter_ + 1) % thread_buffers_.size();
} else {
// Every worker has hit EOF: reset the EOF bookkeeping
memset(worker_status_.data(), 0, sizeof(char) * worker_status_.size());
eof_worker_num_ = 0;
counter_ = 0;
}
dst_buffer->state.store(BufferState::ReadyForRead); // The training side can now read the BroadcastBuffer
} else {
usleep(2); // Otherwise wait a while
}
}
}
void stop() { loop_flag_.store(false); }
};
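The collector visits thread_buffers_ strictly round-robin. If the current buffer is not ready, it waits on that same buffer rather than moving on, so batches come out in a fixed order even though the workers run concurrently. The single-threaded simulation below is a sketch that keeps only that ordering and the EOF bookkeeping. Buffer states and the actual copies are left out, and collect_epoch is a hypothetical name. Each worker queue holds the current_batch_size values it would publish, with 0 meaning "this worker hit EOF".

```cpp
#include <cstddef>
#include <deque>
#include <vector>

// Returns the batch sizes handed to training, in order. A trailing 0
// marks the point where every worker has reported EOF.
std::vector<long long> collect_epoch(std::vector<std::deque<long long>> workers) {
  std::vector<long long> emitted;
  if (workers.empty()) return emitted;
  std::vector<char> worker_status(workers.size(), 0);
  std::size_t eof_worker_num = 0, counter = 0;
  while (true) {
    if (worker_status[counter]) {         // already at EOF: skip this worker
      counter = (counter + 1) % workers.size();
      continue;
    }
    if (workers[counter].empty()) break;  // the simulation ran out of input
    long long batch = workers[counter].front();
    workers[counter].pop_front();
    if (batch == 0) {                     // this worker reports EOF
      worker_status[counter] = 1;
      ++eof_worker_num;
      if (eof_worker_num != workers.size()) {
        counter = (counter + 1) % workers.size();
        continue;                         // nothing is emitted yet
      }
      emitted.push_back(0);               // all workers done: empty batch
      break;  // the real loop resets its bookkeeping here and keeps running
    }
    emitted.push_back(batch);
    counter = (counter + 1) % workers.size();
  }
  return emitted;
}
```

With two workers publishing {4, 4, 0} and {4, 2, 0}, the collector emits 4, 4, 4, 2 and then a single empty batch.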
5.2.2 Copy Operations
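broadcast copies the data from the source (ThreadBuffer) to the target (BroadcastBuffer) one parameter at a time, for every local GPU in parallel. All copies are device-to-device (cudaMemcpyDeviceToDevice) on each GPU's P2P stream. Every GPU receives the complete sparse tensors, while the label/dense data is split so that GPU i gets only its own batch_size_per_gpu rows.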
template <typename T>
void broadcast(const std::shared_ptr<ThreadBuffer>& thread_buffer,
std::shared_ptr<BroadcastBuffer>& broadcast_buffer,
std::vector<size_t>& last_batch_nnz_,
const std::shared_ptr<ResourceManager>& resource_manager) {
int param_num = thread_buffer->param_num;
int dense_dim = thread_buffer->dense_dim;
int label_dim = thread_buffer->label_dim;
int batch_size = thread_buffer->batch_size;
int batch_size_per_gpu = batch_size / resource_manager->get_global_gpu_count();
int local_gpu_count = resource_manager->get_local_gpu_count();
#pragma omp parallel for num_threads(local_gpu_count)
for (int i = 0; i < local_gpu_count; ++i) { // Iterate over the local GPU
auto local_gpu = resource_manager->get_local_gpu(i);
CudaDeviceContext ctx(local_gpu->get_device_id());
for (int param_id = 0; param_id < param_num; ++param_id) { // Iterate over the embedding layer
// Copy broadcast_buffer from thread_buffer
auto src_sparse_tensor =
SparseTensor<T>::stretch_from(thread_buffer->device_sparse_buffers[param_id]);
auto dst_sparse_tensor =
SparseTensor<T>::stretch_from(broadcast_buffer->sparse_buffers[i * param_num + param_id]);
// Copy sparse parameters
if (thread_buffer->is_fixed_length[param_id] &&
last_batch_nnz_[i * param_num + param_id] == src_sparse_tensor.nnz()) {
CK_CUDA_THROW_(cudaMemcpyAsync(dst_sparse_tensor.get_value_ptr(),
src_sparse_tensor.get_value_ptr(),
src_sparse_tensor.nnz() * sizeof(T),
cudaMemcpyDeviceToDevice, local_gpu->get_p2p_stream()));
} else {
sparse_tensor_helper::cuda::copy_async(dst_sparse_tensor, src_sparse_tensor,
cudaMemcpyDeviceToDevice,
local_gpu->get_p2p_stream());
last_batch_nnz_[i * param_num + param_id] = src_sparse_tensor.nnz();
}
}
// Copy the dense parameters
auto dst_dense_tensor = Tensor2<float>::stretch_from(broadcast_buffer->dense_tensors[i]);
auto src_dense_tensor = Tensor2<float>::stretch_from(thread_buffer->device_dense_buffers);
CK_CUDA_THROW_(cudaMemcpyAsync(
dst_dense_tensor.get_ptr(),
src_dense_tensor.get_ptr() + i * batch_size_per_gpu * (label_dim + dense_dim),
batch_size_per_gpu * (label_dim + dense_dim) * sizeof(float), cudaMemcpyDeviceToDevice,
local_gpu->get_p2p_stream()));
// synchronize
CK_CUDA_THROW_(cudaStreamSynchronize(local_gpu->get_p2p_stream()));
}
}
Compared with the first stage, the data flow now has one more step, from ThreadBuffer to BroadcastBuffer.
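The index arithmetic in broadcast() is easy to check on its own. The helper names below are hypothetical and only restate the expressions used in the loop. Sparse buffers in the BroadcastBuffer are laid out GPU-major, at index i * param_num + param_id. Local GPU i takes a contiguous slice of the label/dense rows starting at i * batch_size_per_gpu * (label_dim + dense_dim) floats.

```cpp
#include <cstddef>

// Position of (local GPU, parameter) in broadcast_buffer->sparse_buffers.
std::size_t sparse_buffer_index(std::size_t gpu, std::size_t param_id, std::size_t param_num) {
  return gpu * param_num + param_id;
}

struct DenseSlice {
  std::size_t offset_floats;  // start within ThreadBuffer::device_dense_buffers
  std::size_t count_floats;   // floats copied to this GPU
};

// Slice of label+dense data that local GPU `gpu` receives.
DenseSlice dense_slice_for_gpu(std::size_t gpu, std::size_t batch_size,
                               std::size_t global_gpu_count,
                               std::size_t label_dim, std::size_t dense_dim) {
  std::size_t batch_size_per_gpu = batch_size / global_gpu_count;
  std::size_t row = label_dim + dense_dim;
  return {gpu * batch_size_per_gpu * row, batch_size_per_gpu * row};
}
```

For example, take a hypothetical batch of 1024 over 2 GPUs with label_dim = 1 and dense_dim = 13. Each GPU gets 512 rows of 14 floats, and GPU 1's slice starts at float 7168.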
5.3 Reading output
DataFile -> host buffer -> ThreadBuffer -> BroadcastBuffer.
Now that the data has been copied to the BroadcastBuffer on the GPU, we need to see how to get the data in the final training.
5.3.1 Train
Let’s first go back to the train function, which calls read_a_batch_to_device_delay_release to copy data from the BroadcastBuffer.
bool Session::train() {
try {
// Make sure train_data_reader_ is started
if (train_data_reader_->is_started() == false) {
CK_THROW_(Error_t::IllegalCall,
"Start the data reader first before calling Session::train()");
}
#ifndef DATA_READING_TEST
long long current_batchsize = train_data_reader_->read_a_batch_to_device_delay_release(); // Read data
if (!current_batchsize) {
return false; // No data is available
}
#pragma omp parallel num_threads(networks_.size()) // The block below runs on networks_.size() threads in parallel
{
size_t id = omp_get_thread_num();
CudaCPUDeviceContext ctx(resource_manager_->get_local_gpu(id)->get_device_id());
cudaStreamSynchronize(resource_manager_->get_local_gpu(id)->get_stream());
}
// The reader can start collecting the next batch
train_data_reader_->ready_to_collect();
#ifdef ENABLE_PROFILING
global_profiler.iter_check();
#endif
// If true we're gonna use overlaping, if false we use default
if (solver_config_.use_overlapped_pipeline) {
train_overlapped();
} else {
for (const auto& one_embedding : embeddings_) {
one_embedding->forward(true); // Embedding forward: look up the embeddings from the parameter server
}
// Network forward / backward
if (networks_.size() > 1) {
// Single node with multiple GPUs, or multiple nodes with multiple GPUs
// execute dense forward and backward with multi-cpu threads
#pragma omp parallel num_threads(networks_.size())
{
// Forward and backward pass of the dense network
size_t id = omp_get_thread_num();
long long current_batchsize_per_device =
train_data_reader_->get_current_batchsize_per_device(id);
networks_[id]->train(current_batchsize_per_device); // Forward and backward pass
const auto& local_gpu = resource_manager_->get_local_gpu(id);
local_gpu->set_compute_event_sync(local_gpu->get_stream());
local_gpu->wait_on_compute_event(local_gpu->get_comp_overlap_stream());
}
} else if (resource_manager_->get_global_gpu_count() > 1) {
// Multiple nodes, one GPU each
long long current_batchsize_per_device =
train_data_reader_->get_current_batchsize_per_device(0);
networks_[0]->train(current_batchsize_per_device); // Forward and backward pass
const auto& local_gpu = resource_manager_->get_local_gpu(0);
local_gpu->set_compute_event_sync(local_gpu->get_stream());
local_gpu->wait_on_compute_event(local_gpu->get_comp_overlap_stream());
} else {
// Single GPU
long long current_batchsize_per_device =
train_data_reader_->get_current_batchsize_per_device(0);
networks_[0]->train(current_batchsize_per_device); // Forward and backward pass
const auto& local_gpu = resource_manager_->get_local_gpu(0);
local_gpu->set_compute_event_sync(local_gpu->get_stream());
local_gpu->wait_on_compute_event(local_gpu->get_comp_overlap_stream());
networks_[0]->update_params();
}
// Embedding backward
for (const auto& one_embedding : embeddings_) {
one_embedding->backward(); // Embedding backward pass
}
// Exchange wgrad and update params
if (networks_.size() > 1) {
#pragma omp parallel num_threads(networks_.size())
{
size_t id = omp_get_thread_num();
exchange_wgrad(id); // Exchange dense-parameter gradients between GPUs
networks_[id]->update_params();
}
} else if (resource_manager_->get_global_gpu_count() > 1) {
exchange_wgrad(0);
networks_[0]->update_params();
}
for (const auto& one_embedding : embeddings_) {
one_embedding->update_params(); // Embedding layers update the sparse parameters
}
// Join streams
if (networks_.size() > 1) {
#pragma omp parallel num_threads(networks_.size())
{
size_t id = omp_get_thread_num();
const auto& local_gpu = resource_manager_->get_local_gpu(id);
local_gpu->set_compute2_event_sync(local_gpu->get_comp_overlap_stream());
local_gpu->wait_on_compute2_event(local_gpu->get_stream());
}
} else {
const auto& local_gpu = resource_manager_->get_local_gpu(0);
local_gpu->set_compute2_event_sync(local_gpu->get_comp_overlap_stream());
local_gpu->wait_on_compute2_event(local_gpu->get_stream());
}
return true;
}
#else
data_reader_->read_a_batch_to_device();
#endif
} catch (const internal_runtime_error& err) {
std::cerr << err.what() << std::endl;
throw err;
} catch (const std::exception& err) {
std::cerr << err.what() << std::endl;
throw err;
}
return true;
}
5.3.2 read_a_batch_to_device_delay_release
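read_a_batch_to_device_delay_release is where the embedding input is finally put in place. It simply delegates to data_collector_->read_a_batch_to_device() and records the returned batch size.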
long long read_a_batch_to_device_delay_release() override {
current_batchsize_ = data_collector_->read_a_batch_to_device();
return current_batchsize_;
}
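Let's look at read_a_batch_to_device. Note that the names read_a_batch_to_device_delay_release and read_a_batch_to_device come from an older version and no longer describe what the functions actually do.
The function spins until broadcast_buffer_ is ReadyForRead and then claims it by switching it to Reading. If the batch is not empty, it iterates over the local GPUs and copies each GPU's data from the broadcast buffer to the output buffer (again device to device). Along the way it splits the combined label/dense tensor into separate label and dense tensors. If the batch is empty (EOF), it simply sets broadcast_buffer_ back to ReadyForWrite.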
long long read_a_batch_to_device() {
BufferState expected = BufferState::ReadyForRead;
while (!broadcast_buffer_->state.compare_exchange_weak(expected, BufferState::Reading)) {
expected = BufferState::ReadyForRead;
usleep(2);
}
long long current_batch_size = broadcast_buffer_->current_batch_size;
if (current_batch_size != 0) {
int local_gpu_count = resource_manager_->get_local_gpu_count();
#pragma omp parallel for num_threads(local_gpu_count)
for (int i = 0; i < local_gpu_count; ++i) {
auto local_gpu = resource_manager_->get_local_gpu(i);
CudaDeviceContext ctx(local_gpu->get_device_id());
// wait until last iteration finish
auto label_tensor = Tensor2<float>::stretch_from(output_buffer_->label_tensors[i]);
auto label_dense_tensor = Tensor2<float>::stretch_from(broadcast_buffer_->dense_tensors[i]);
// Iterate through sparse parameters
for (size_t param_id = 0; param_id < output_buffer_->sparse_name_vec.size(); ++param_id) {
const auto &top_name = output_buffer_->sparse_name_vec[param_id];
int idx_broadcast = i * broadcast_buffer_->param_num + param_id;
// Broadcast is the source
auto src_sparse_tensor =
SparseTensor<T>::stretch_from(broadcast_buffer_->sparse_buffers[idx_broadcast]);
if (output_buffer_->sparse_tensors_map.find(top_name) ==
output_buffer_->sparse_tensors_map.end()) {
CK_THROW_(Error_t::IllegalCall, "can not find sparse name");
}
// output is the target
auto dst_sparse_tensor =
SparseTensor<T>::stretch_from(output_buffer_->sparse_tensors_map[top_name][i]);
// Copy broadcast to output
if (broadcast_buffer_->is_fixed_length[idx_broadcast] &&
last_batch_nnz_[idx_broadcast] == src_sparse_tensor.nnz()) {
CK_CUDA_THROW_(cudaMemcpyAsync(dst_sparse_tensor.get_value_ptr(),
src_sparse_tensor.get_value_ptr(),
src_sparse_tensor.nnz() * sizeof(T),
cudaMemcpyDeviceToDevice, local_gpu->get_stream()));
} else {
// Copy broadcast to output
sparse_tensor_helper::cuda::copy_async(dst_sparse_tensor, src_sparse_tensor,
cudaMemcpyDeviceToDevice,
local_gpu->get_stream());
last_batch_nnz_[idx_broadcast] = src_sparse_tensor.nnz();
}
}
const int label_dense_dim = output_buffer_->label_dense_dim;
// Copy label and dense
if (output_buffer_->use_mixed_precision) {
auto dense_tensor = Tensor2<__half>::stretch_from(output_buffer_->dense_tensors[i]);
// Do the partitioning
split(label_tensor, dense_tensor, label_dense_tensor, label_dense_dim,
local_gpu->get_stream());
} else {
auto dense_tensor = Tensor2<float>::stretch_from(output_buffer_->dense_tensors[i]);
split(label_tensor, dense_tensor, label_dense_tensor, label_dense_dim,
local_gpu->get_stream());
}
}
} else {
broadcast_buffer_->state.store(BufferState::ReadyForWrite);
}
return current_batch_size;
}
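The fast path in the sparse copy deserves a closer look. For a fixed-length slot every sample contributes the same number of keys, so if nnz is unchanged from the previous batch, the row offsets already sitting in the output tensor are presumably still valid and only the values need to be copied. The host-side sketch below illustrates that decision. SimpleSparse and copy_sparse are hypothetical stand-ins (std::vector instead of device memory, plain assignment instead of cudaMemcpyAsync and sparse_tensor_helper::cuda::copy_async), not HugeCTR API.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical host-side stand-in for a CSR-style sparse tensor:
// values holds the keys; row_offsets[r]..row_offsets[r + 1] delimits row r.
template <typename T>
struct SimpleSparse {
  std::vector<T> values;
  std::vector<int> row_offsets;
  std::size_t nnz() const { return values.size(); }
};

// Mirrors the branch in read_a_batch_to_device (sketch, not HugeCTR code).
// Returns true if the values-only fast path was taken.
template <typename T>
bool copy_sparse(SimpleSparse<T>& dst, const SimpleSparse<T>& src,
                 bool is_fixed_length, std::size_t& last_batch_nnz) {
  if (is_fixed_length && last_batch_nnz == src.nnz()) {
    // Same layout as last batch: row offsets in dst are reused as-is.
    dst.values.assign(src.values.begin(), src.values.end());
    return true;
  }
  // General case: copy values and row offsets, then remember nnz.
  dst.values = src.values;
  dst.row_offsets = src.row_offsets;
  last_batch_nnz = src.nnz();
  return false;
}
```

In this sketch the first batch always takes the full-copy path because last_batch_nnz starts at 0; later batches of a fixed-length slot with unchanged nnz copy only their values.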
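5.3.3 split
At this point the label and dense data are already on the GPU as one combined tensor. This step separates them with a CUDA kernel: the elements are divided among thread blocks, and each GPU thread moves one element into either the label tensor or the dense tensor.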
template <typename TypeComp>
__global__ void split_kernel__(int batchsize, float* label_ptr, int label_dim, TypeComp* dense_ptr,
int dense_dim, const float* label_dense, int label_dense_dim) {
int idx = blockDim.x * blockIdx.x + threadIdx.x;
if (idx < batchsize * label_dense_dim) {
const int in_col = idx % label_dense_dim;
const int in_row = idx / label_dense_dim;
const int out_row = in_row;
if (in_col < label_dim) {
const int out_col = in_col;
label_ptr[out_row * label_dim + out_col] = label_dense[idx];
} else {
const int out_col = in_col - label_dim;
dense_ptr[out_row * dense_dim + out_col] = label_dense[idx];
}
}
return;
}
template <typename TypeComp>
void split(Tensor2<float>& label_tensor, Tensor2<TypeComp>& dense_tensor,
const Tensor2<float>& label_dense_buffer, const int label_dense_dim,
cudaStream_t stream) {
// check the input size
assert(label_tensor.get_dimensions()[0] == dense_tensor.get_dimensions()[0]);
assert(label_tensor.get_num_elements() + dense_tensor.get_num_elements() ==
label_dense_buffer.get_num_elements());
const int batchsize = label_tensor.get_dimensions()[0];
const int label_dim = label_tensor.get_dimensions()[1];
const int dense_dim = dense_tensor.get_dimensions()[1];
const int BLOCK_DIM = 256;
const int GRID_DIM = (label_dense_buffer.get_num_elements() - 1) / BLOCK_DIM + 1;
if (dense_dim > 0) {
split_kernel__<<<GRID_DIM, BLOCK_DIM, 0, stream>>>(
batchsize, label_tensor.get_ptr(), label_dim, dense_tensor.get_ptr(), dense_dim,
label_dense_buffer.get_ptr(), label_dense_dim);
} else if (dense_dim == 0) {
split_kernel__<<<GRID_DIM, BLOCK_DIM, 0, stream>>>(
batchsize, label_tensor.get_ptr(), label_dim, (TypeComp*)0, 0, label_dense_buffer.get_ptr(),
label_dense_dim);
} else {
CK_THROW_(Error_t::WrongInput, "dense_dim < 0");
}
return;
}
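To make the index arithmetic of split_kernel__ concrete, here is a host-side reference that performs the same per-element mapping in a plain loop, one iteration per GPU thread index idx. split_reference and grid_dim are hypothetical helpers for illustration; the sketch assumes label_dense_dim == label_dim + dense_dim, which is what the size assert in split implies.

```cpp
#include <cassert>
#include <vector>

// Host-side reference for split_kernel__ (illustrative sketch).
// label_dense is row-major [batchsize x (label_dim + dense_dim)];
// columns [0, label_dim) go to label, the remaining columns go to dense.
template <typename TypeComp>
void split_reference(int batchsize, std::vector<float>& label, int label_dim,
                     std::vector<TypeComp>& dense, int dense_dim,
                     const std::vector<float>& label_dense) {
  const int label_dense_dim = label_dim + dense_dim;
  for (int idx = 0; idx < batchsize * label_dense_dim; ++idx) {  // one "thread" per idx
    const int in_col = idx % label_dense_dim;
    const int row = idx / label_dense_dim;
    if (in_col < label_dim) {
      label[row * label_dim + in_col] = label_dense[idx];
    } else {
      dense[row * dense_dim + (in_col - label_dim)] =
          static_cast<TypeComp>(label_dense[idx]);
    }
  }
}

// Ceil division used by the launcher: GRID_DIM = (n - 1) / BLOCK_DIM + 1.
inline int grid_dim(int num_elements, int block_dim) {
  return (num_elements - 1) / block_dim + 1;
}
```

For example, with BLOCK_DIM = 256, a batch of 16,384 samples with a hypothetical label_dense_dim of 14 (1 label plus 13 dense features) has 229,376 elements and needs (229,376 - 1) / 256 + 1 = 896 blocks.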
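The data is now ready for the subsequent training step. The buffer is released afterwards through finalize_batch, which synchronizes the stream of every local GPU and then sets broadcast_buffer_ back to ReadyForWrite. The asynchronous reader (AsyncReader::ready_to_collect) instead records a completion event on the device of the last batch and passes it to its own finalize_batch.
void finalize_batch() {
for (size_t i = 0; i < resource_manager_->get_local_gpu_count(); i++) {
const auto &local_gpu = resource_manager_->get_local_gpu(i);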
CudaDeviceContext context(local_gpu->get_device_id());
CK_CUDA_THROW_(cudaStreamSynchronize(local_gpu->get_stream()));
}
broadcast_buffer_->state.store(BufferState::ReadyForWrite);
}
template <typename SparseType>
void AsyncReader<SparseType>::ready_to_collect() {
auto raw_device_id = reader_impl_->get_last_batch_device();
auto local_gpu = resource_manager_->get_local_gpu(raw_device_id);
CudaDeviceContext ctx(local_gpu->get_device_id());
CK_CUDA_THROW_(cudaEventRecord(completion_events_[raw_device_id], local_gpu->get_stream()));
reader_impl_->finalize_batch(&completion_events_[raw_device_id]);
}
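The handshake between the collector and the reader is a small state machine on broadcast_buffer_->state. The sketch below reproduces it on the host with std::atomic. ToyBroadcastBuffer, acquire, produce and consume are hypothetical names, and the four-value BufferState enum is a simplification of the one in HugeCTR. For brevity, consume releases the buffer immediately; the real code deliberately delays that release to finalize_batch, because the cudaMemcpyAsync and split work enqueued by read_a_batch_to_device may still be reading broadcast_buffer_ until cudaStreamSynchronize returns.

```cpp
#include <atomic>
#include <cassert>
#include <thread>
#include <vector>

// Hypothetical, simplified buffer states (not HugeCTR's exact enum).
enum class BufferState { ReadyForWrite, Writing, ReadyForRead, Reading };

struct ToyBroadcastBuffer {
  std::atomic<BufferState> state{BufferState::ReadyForWrite};
  int payload = 0;  // stands in for the per-GPU tensors
};

// Spin until the state can be switched atomically from `from` to `to`,
// like the compare_exchange_weak loop in read_a_batch_to_device.
inline void acquire(std::atomic<BufferState>& s, BufferState from, BufferState to) {
  BufferState expected = from;
  while (!s.compare_exchange_weak(expected, to)) {
    expected = from;            // a failed CAS overwrites `expected`; reset it
    std::this_thread::yield();  // the article's code sleeps with usleep(2)
  }
}

// Producer (plays the collector thread): fill the buffer, then publish it.
inline void produce(ToyBroadcastBuffer& buf, int value) {
  acquire(buf.state, BufferState::ReadyForWrite, BufferState::Writing);
  buf.payload = value;
  buf.state.store(BufferState::ReadyForRead);
}

// Consumer: read_a_batch_to_device followed immediately by the release
// that finalize_batch performs in the real code.
inline int consume(ToyBroadcastBuffer& buf) {
  acquire(buf.state, BufferState::ReadyForRead, BufferState::Reading);
  int value = buf.payload;  // device copies and split would happen here
  buf.state.store(BufferState::ReadyForWrite);
  return value;
}
```

Because there is a single slot, the producer cannot overwrite a batch until the consumer has released it, so batches arrive strictly in order.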
0x06 Summary
To summarize the logic: in this section, data moves from buffer to buffer according to each buffer's state (ReadyForRead or ReadyForWrite). The sparse parameters for the embedding layer end up in DataReaderOutput (output_buffer_), so all subsequent GPU computation starts from that output.
0xEE Personal information
★★★★ Thoughts on life and technology ★★★★★
WeChat official account: Rosie's Thoughts
0xFF References
Developer.nvidia.com/blog/introd…
Developer.nvidia.com/blog/announ…
Developer.nvidia.com/blog/accele…
Read HugeCTR source code
How does embedding propagate back
Web.eecs.umich.edu/~justincj/t…
Sparse matrix storage format summary + storage efficiency comparison: COO, CSR, DIA, ELL, HYB