Before detailing the various distribution strategies in TensorFlow, we first need to look at the foundation of distribution: the distributed runtime environment. Only by laying a solid foundation can we clear away obstacles in the later analysis and get twice the result with half the effort. This article examines the static logic of the Master.
The other articles in this series are:
Heterogeneous distributed learning based on the TensorFlow distributed paper (translation)
Implementation of Control Flow in TensorFlow
TensorFlow Distributed environment (1) — overall architecture
1. Overview
Two RPC services run on a Server: MasterService and WorkerService. The Server that a Client connects to takes the Master role, and the Client accesses that Server's MasterService (the MasterService coordinates and controls the execution of multiple WorkerServices).
The concrete implementation of the Master role is the Master Service. The Master Service is a gRPC service that interacts with a set of remote distributed devices to coordinate multiple worker services.
- The Master Service is defined in tensorflow/core/protobuf/master_service.proto, which declares interfaces such as CreateSession and RunStep. All TensorFlow servers implement the Master Service.
- Clients can interact with the Master Service to perform distributed TensorFlow computation.
- A Master Service tracks multiple "master sessions". Each master session encapsulates a computation graph and its associated state.
- A master session runs on the Master. After the session is established, the Master returns a handle to the client; the handle is used to associate the client with the master session.
- Each master session usually corresponds to one client session. The client can send an initial graph to the master by calling CreateSession, and add nodes to the graph by calling ExtendSession.
- A Master is both a conceptual role (for example, the Master node) and a concrete Master class.
2. Interfaces
2.1 Interface Specifications
The Client invokes the MasterService through GrpcSession. Since it is an RPC Service, there needs to be an interface specification between the Client and MasterService. This specification is defined in the master_service.proto file, which defines the message body of each interface.
service MasterService {
// Creates a session.
rpc CreateSession(CreateSessionRequest) returns (CreateSessionResponse);
// Extends a session.
rpc ExtendSession(ExtendSessionRequest) returns (ExtendSessionResponse);
// Prepares future partial run calls.
rpc PartialRunSetup(PartialRunSetupRequest) returns (PartialRunSetupResponse);
// Drives the graph computation.
rpc RunStep(RunStepRequest) returns (RunStepResponse);
// Closes a session.
rpc CloseSession(CloseSessionRequest) returns (CloseSessionResponse);
// List the devices usable by the master.
rpc ListDevices(ListDevicesRequest) returns (ListDevicesResponse);
// Close and abandon all existing sessions. Ongoing computations
// will no longer affect fresh ones via the resources in containers listed in
// the ResetRequest. See ResetRequest for more details.
rpc Reset(ResetRequest) returns (ResetResponse);
// Registers a callable for execution with RunCallable.
rpc MakeCallable(MakeCallableRequest) returns (MakeCallableResponse);
// Executes a callable registered with MakeCallable.
rpc RunCallable(RunCallableRequest) returns (RunCallableResponse);
// Frees resources associated with a callable registered with MakeCallable.
rpc ReleaseCallable(ReleaseCallableRequest) returns (ReleaseCallableResponse);
}
2.2 MasterInterface
The Client uses the MasterInterface to access the remote MasterService. MasterInterface is the abstract interface that clients use to communicate with the TensorFlow Master Service. This interface supports both RPC-based master implementations and in-process master implementations that do not require an RPC round-trip. All MasterInterface methods are synchronous, so the Client can call the services provided by the remote MasterService as if calling local functions.
There are two implementations of MasterInterface, both used to communicate with the Master service.
- LocalMaster is used for direct communication between processes. In this case, the Client and Master are in the same process.
- GrpcRemoteMaster uses Grpc to communicate with the Master service. The Client and Master are deployed in two different processes.
- You can call the factory method NewGrpcMaster to generate an instance of GrpcRemoteMaster.
- GrpcRemoteMaster implements the gRPC client. It accesses the MasterService on the remote Master through the Stub. The specific service is GrpcMasterService.
- Because MasterInterface is a synchronous interface, the Client accesses MasterService as if it were a local function.
class MasterInterface {
 public:
  virtual ~MasterInterface() {}
  virtual Status CreateSession(CallOptions* call_options,
                               const CreateSessionRequest* request,
                               CreateSessionResponse* response) = 0;
  virtual Status ExtendSession(CallOptions* call_options,
                               const ExtendSessionRequest* request,
                               ExtendSessionResponse* response) = 0;
  virtual Status PartialRunSetup(CallOptions* call_options,
                                 const PartialRunSetupRequest* request,
                                 PartialRunSetupResponse* response) {
    return errors::Unimplemented("Partial run not implemented for this master");
  }
  virtual Status RunStep(CallOptions* call_options,
                         RunStepRequestWrapper* request,
                         MutableRunStepResponseWrapper* response) = 0;
  virtual Status RunStep(CallOptions* call_options,
                         const RunStepRequest* request,
                         RunStepResponse* response) {
    std::unique_ptr<RunStepRequestWrapper> wrapped_request(
        new ProtoRunStepRequest(request));
    std::unique_ptr<MutableRunStepResponseWrapper> wrapped_response(
        new NonOwnedProtoRunStepResponse(response));
    return RunStep(call_options, wrapped_request.get(), wrapped_response.get());
  }
  virtual MutableRunStepRequestWrapper* CreateRunStepRequest() {
    MutableProtoRunStepRequest* ret = new MutableProtoRunStepRequest;
    ret->request_.set_request_id(GetUniqueRequestId());
    return ret;
  }
  virtual MutableRunStepResponseWrapper* CreateRunStepResponse() {
    return new OwnedProtoRunStepResponse;
  }
  virtual Status CloseSession(CallOptions* call_options,
                              const CloseSessionRequest* request,
                              CloseSessionResponse* response) = 0;
  virtual Status ListDevices(CallOptions* call_options,
                             const ListDevicesRequest* request,
                             ListDevicesResponse* response) = 0;
  virtual Status Reset(CallOptions* call_options, const ResetRequest* request,
                       ResetResponse* response) = 0;
  virtual Status MakeCallable(CallOptions* call_options,
                              const MakeCallableRequest* request,
                              MakeCallableResponse* response) = 0;
  virtual Status RunCallable(CallOptions* call_options,
                             const RunCallableRequest* request,
                             RunCallableResponse* response) = 0;
  virtual Status ReleaseCallable(CallOptions* call_options,
                                 const ReleaseCallableRequest* request,
                                 ReleaseCallableResponse* response) = 0;

 protected:
  // NOTE: This should only be called by implementations of this
  // interface whose CreateRunStepResponse() method returns a
  // proto-based wrapper for the RunStepResponse message.
  RunStepResponse* get_proto_from_wrapper(
      MutableRunStepResponseWrapper* wrapper) {
    return wrapper->get_proto();
  }
};
If the Client and Master are in the same process, use LocalMaster directly. Otherwise, use GrpcRemoteMaster to access the remote GrpcMasterService using gRPC. The two rectangles encapsulating the Master represent the actual Master class, which implements specific Master functions.
Figure 1. Master logical structure
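The selection logic can be summarized with the following minimal sketch. This is not the real code (the actual decision lives in GrpcSession::Create, shown in section 4.3), and the helper name ChooseMaster and its parameters are purely illustrative.
// Sketch only: prefer the in-process LocalMaster when the target is
// registered in this process; otherwise fall back to a gRPC stub that
// talks to the remote master.
std::unique_ptr<MasterInterface> ChooseMaster(const string& target,
                                              SharedGrpcChannelPtr channel) {
  std::unique_ptr<MasterInterface> master = LocalMaster::Lookup(target);
  if (!master) {
    master.reset(NewGrpcMaster(channel));  // cross-process path
  }
  return master;  // same-process path when Lookup succeeded
}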
2.3 Invocation
The following pseudocode shows how the client interacts with the master. This is actually the process of using GrpcRemoteMaster to interact with the remote Master server via gRPC in distributed mode.
stub = NewStub("/job:mnist/replica:0/task:0")
{handle} = stub->CreateSession({graph_def})
do {
stub->RunStep({handle, {feeds}, {fetches}})
// The client can evaluate a predicate locally, based on the
// result of fetches, to determine whether to terminate. For
// example, it might fetch the loss and evaluate whether it is less
// than some threshold.
} while (!should_stop({fetches}));
stub->CloseSession({handle})
3. LocalMaster
When the Client calls GrpcSession::Create, GrpcSession uses LocalMaster::Lookup to try to obtain a LocalMaster; if none is found, GrpcRemoteMaster is used instead. A LocalMaster is only found when the Client and Master are in the same process, in which case LocalMaster lets the Client and Master communicate directly within the process, providing a more efficient master service to clients in the same process.
3.1 Definition
LocalMaster is defined as follows; its main member variable is master_impl_. LocalMaster is just a shell that forwards calls directly to master_impl_, the Master instance that is invoked locally when the Client and Master do not cross node boundaries.
class LocalMaster : public MasterInterface {
private:
Master* master_impl_; // Not owned.
const int64 default_timeout_in_ms_;
// See LocalMaster::Lookup for the factory function that creates
// objects of this type.
LocalMaster(Master* master_impl, const int64 default_timeout_in_ms);
TF_DISALLOW_COPY_AND_ASSIGN(LocalMaster);
};
3.2 Registration
LocalMaster has a static variable local_master_registry_ for registration.
typedef std::unordered_map<string, MasterInfo> LocalMasterRegistry;
LocalMasterRegistry* local_master_registry() {
  static LocalMasterRegistry* local_master_registry_ = new LocalMasterRegistry;
  return local_master_registry_;
}
At GrpcServer initialization, the Master created there is registered with LocalMaster under the server's target (of the form grpc://...).
LocalMaster::Register(target(), master_impl_.get(), config.operation_timeout_in_ms());
Register() inserts the Master into the static variable local_master_registry_.
/* static */
void LocalMaster::Register(const string& target, Master* master,
                           int64 default_timeout_in_ms) {
  mutex_lock l(*get_local_master_registry_lock());
  local_master_registry()->insert(
      {target, MasterInfo(master, default_timeout_in_ms)});
}
3.3 Lookup
When the GrpcSession::Create method is called, if the Client and Master are in the same process, Lookup finds the registered Master locally, and a LocalMaster is generated and returned with its master_impl_ set to the Master that was found. If nothing is found, Lookup returns null, and the GrpcSession::Create method instead creates a GrpcRemoteMaster that interacts with the remote Master.
/* static */
std::unique_ptr<LocalMaster> LocalMaster::Lookup(const string& target) {
  std::unique_ptr<LocalMaster> ret;
  mutex_lock l(*get_local_master_registry_lock());
  auto iter = local_master_registry()->find(target);
  if (iter != local_master_registry()->end()) {
    ret.reset(new LocalMaster(iter->second.master,
                              iter->second.default_timeout_in_ms));
  }
  return ret;
}
The figure below shows the same-process case: Lookup finds the registered Master, so a LocalMaster is generated for local operation.
Figure 2. Master operation in the same process
Now let's look at the cross-process case. In process 1, LocalMaster::Lookup finds nothing and returns null, so the GrpcSession::Create method performs the second step and creates a GrpcRemoteMaster for remote interaction. In process 2, LocalMaster does not point to any Master, because no client in that process called GrpcSession::Create.
Figure 3. Cross-process master operation
3.4 Functionality
LocalMaster calls its internal member variable master_impl_ to carry out the business functions.
Status LocalMaster::CreateSession(CallOptions* call_options,
                                  const CreateSessionRequest* request,
                                  CreateSessionResponse* response) {
  Notification n;
  Status ret;
  master_impl_->CreateSession(request, response, [&n, &ret](const Status& s) {
    ret.Update(s);
    n.Notify();
  });
  TF_RETURN_IF_ERROR(
      WaitForNotification(call_options, default_timeout_in_ms_, &n));
  return ret;
}

Status LocalMaster::ExtendSession(CallOptions* call_options,
                                  const ExtendSessionRequest* request,
                                  ExtendSessionResponse* response) {
  Notification n;
  Status ret;
  master_impl_->ExtendSession(request, response, [&n, &ret](const Status& s) {
    ret.Update(s);
    n.Notify();
  });
  TF_RETURN_IF_ERROR(
      WaitForNotification(call_options, default_timeout_in_ms_, &n));
  return ret;
}

Status LocalMaster::RunStep(CallOptions* call_options,
                            RunStepRequestWrapper* request,
                            MutableRunStepResponseWrapper* response) {
  Notification n;
  Status ret;
  master_impl_->RunStep(call_options, request, response,
                        [&n, &ret](const Status& s) {
                          ret.Update(s);
                          n.Notify();
                        });
  TF_RETURN_IF_ERROR(
      WaitForNotification(call_options, default_timeout_in_ms_, &n));
  return ret;
}
4. GrpcRemoteMaster
GrpcRemoteMaster is a gRPC client implementation that ultimately invokes, through its Stub, the GrpcMasterService running on the remote Master, as if making a local function call. The remote GrpcMasterService implements all interfaces defined by the MasterService and is the real entity of the MasterService. When a GrpcRemoteMaster instance is created, the target specifies the address and port of the Master service, and the corresponding RPC channel is created. Both GrpcSession and GrpcRemoteMaster strictly belong to the Client implementation.
4.1 Definition
GrpcRemoteMaster is defined as follows, mainly using MasterServiceStub.
// GrpcRemoteMaster is an implementation of the MasterInterface
// that uses gRPC to talk to the Master service.
class GrpcRemoteMaster : public MasterInterface {
  using MasterServiceStub = grpc::MasterService::Stub;

 public:
  explicit GrpcRemoteMaster(const SharedGrpcChannelPtr& client_channel)
      : stub_(grpc::MasterService::NewStub(client_channel)) {}
  ~GrpcRemoteMaster() override {}

 private:
  std::unique_ptr<MasterServiceStub> stub_;
};
4.2 Functionality
GrpcRemoteMaster simply calls the corresponding interface of the remote Master service through a stub of the gRPC.
4.2.1 CreateSession
We take CreateSession as an example; it completes its work through CallWithRetry.
Status CreateSession(CallOptions* call_options,
const CreateSessionRequest* request,
CreateSessionResponse* response) override {
return CallWithRetry(call_options, request, response,
&MasterServiceStub::CreateSession);
}
The CallWithRetry code is as follows. It in turn calls s = FromGrpcStatus((stub_.get()->*pfunc)(&ctx, *request, response)) to have the Stub do the actual work.
template <typename Request, typename Response>
Status CallWithRetry(CallOptions* call_options, const Request* request,
                     Response* response,
                     ::grpc::Status (MasterServiceStub::*pfunc)(
                         ::grpc::ClientContext*, const Request&, Response*),
                     string trace_string = {}) {
  absl::Duration timeout = absl::Milliseconds(call_options->GetTimeout());
  absl::Time expired_time = absl::FromUnixMicros(Env::Default()->NowMicros());
  if (timeout > absl::ZeroDuration()) {
    expired_time += timeout;
  }
  Status s;
  for (int num_retries = 0;; ++num_retries) {
    ::grpc::ClientContext ctx;
    std::unique_ptr<profiler::TraceMe> trace;
    if (!trace_string.empty()) {
      trace.reset(NewTraceRpc(trace_string, &ctx));
    }
    ctx.set_fail_fast(false);
    if (timeout > absl::ZeroDuration()) {
      // We do not modify the timeout here to match legacy behavior. However,
      // this could violate the contract of tensorflow::Session. If we retry
      // an RPC just before the deadline is exceeded, we will still set the
      // timeout to the original value. This leads to the overall timeout
      // being double what was expected.
      ctx.set_deadline(absl::ToChronoTime(absl::Now() + timeout));
    }
    s = FromGrpcStatus((stub_.get()->*pfunc)(&ctx, *request, response));
    if (!errors::IsUnavailable(s)) {
      return s;
    }
    // TODO(b/117162170): we may want to make this configurable.
    constexpr int kMaxRetries = 10;
    if (num_retries >= kMaxRetries) {
      return s;
    }
    absl::Time now = absl::FromUnixMicros(Env::Default()->NowMicros());
    const absl::Time deadline_with_backoff =
        now + absl::Microseconds(ComputeBackoffMicroseconds(num_retries));
    // Wait for a short period of time before retrying the RPC. If our
    // backoff would put us past the RPC deadline, we truncate it to ensure
    // our RPC starts before the deadline.
    const auto backoff_until = (timeout <= absl::ZeroDuration() ||
                                expired_time > deadline_with_backoff)
                                   ? deadline_with_backoff
                                   : expired_time;
    Env::Default()->SleepForMicroseconds(
        absl::ToInt64Microseconds(backoff_until - now));
    now = absl::FromUnixMicros(Env::Default()->NowMicros());
    if (now > expired_time && timeout > absl::ZeroDuration()) {
      // If timeout_in_ms is set, exit the retry loop on timeout.
      return errors::DeadlineExceeded(ctx.debug_error_string());
    }
  }
}
4.2.2 Master Service Stub
Next we look at the Stub, which is the gRPC client code generated from tensorflow/core/protobuf/master_service.proto.
class Stub final : public StubInterface {
 public:
  Stub(const std::shared_ptr< ::grpc::ChannelInterface>& channel);
  ::grpc::Status CreateSession(::grpc::ClientContext* context,
                               const CreateSessionRequest& request,
                               CreateSessionResponse* response) override;
  ::grpc::Status ExtendSession(::grpc::ClientContext* context,
                               const ExtendSessionRequest& request,
                               ExtendSessionResponse* response) override;
  ::grpc::Status PartialRunSetup(::grpc::ClientContext* context,
                                 const PartialRunSetupRequest& request,
                                 PartialRunSetupResponse* response) override;
  ::grpc::Status RunStep(::grpc::ClientContext* context,
                         const RunStepRequest& request,
                         RunStepResponse* response) override;
  ::grpc::Status CloseSession(::grpc::ClientContext* context,
                              const CloseSessionRequest& request,
                              CloseSessionResponse* response) override;
  ::grpc::Status ListDevices(::grpc::ClientContext* context,
                             const ListDevicesRequest& request,
                             ListDevicesResponse* response) override;
  ::grpc::Status Reset(::grpc::ClientContext* context,
                       const ResetRequest& request,
                       ResetResponse* response) override;
  ::grpc::Status MakeCallable(::grpc::ClientContext* context,
                              const MakeCallableRequest& request,
                              MakeCallableResponse* response) override;
  ::grpc::Status RunCallable(::grpc::ClientContext* context,
                             const RunCallableRequest& request,
                             RunCallableResponse* response) override;
  ::grpc::Status ReleaseCallable(::grpc::ClientContext* context,
                                 const ReleaseCallableRequest& request,
                                 ReleaseCallableResponse* response) override;

 private:
  std::shared_ptr< ::grpc::ChannelInterface> channel_;
  const ::grpc::internal::RpcMethod rpcmethod_CreateSession_;
  const ::grpc::internal::RpcMethod rpcmethod_ExtendSession_;
  const ::grpc::internal::RpcMethod rpcmethod_PartialRunSetup_;
  const ::grpc::internal::RpcMethod rpcmethod_RunStep_;
  const ::grpc::internal::RpcMethod rpcmethod_CloseSession_;
  const ::grpc::internal::RpcMethod rpcmethod_ListDevices_;
  const ::grpc::internal::RpcMethod rpcmethod_Reset_;
  const ::grpc::internal::RpcMethod rpcmethod_MakeCallable_;
  const ::grpc::internal::RpcMethod rpcmethod_RunCallable_;
  const ::grpc::internal::RpcMethod rpcmethod_ReleaseCallable_;
};
The corresponding remote method names are as follows:
static const char* grpcMasterService_method_names[] = {
    "/tensorflow.MasterService/CreateSession",
    "/tensorflow.MasterService/ExtendSession",
    "/tensorflow.MasterService/PartialRunSetup",
    "/tensorflow.MasterService/RunStep",
    "/tensorflow.MasterService/CloseSession",
    "/tensorflow.MasterService/ListDevices",
    "/tensorflow.MasterService/Reset",
    "/tensorflow.MasterService/MakeCallable",
    "/tensorflow.MasterService/RunCallable",
    "/tensorflow.MasterService/ReleaseCallable",
};

std::unique_ptr<MasterService::Stub> MasterService::NewStub(
    const std::shared_ptr< ::grpc::ChannelInterface>& channel,
    const ::grpc::StubOptions& options) {
  std::unique_ptr<MasterService::Stub> stub(new MasterService::Stub(channel));
  return stub;
}
Internally, the Stub invokes gRPC to send the request.
::grpc::Status MasterService::Stub::CreateSession(
::grpc::ClientContext* context, const CreateSessionRequest& request,
CreateSessionResponse* response) {
return ::grpc::internal::BlockingUnaryCall(
channel_.get(), rpcmethod_CreateSession_, context, request, response);
}
So, in the GrpcRemoteMaster case, the call flow is: GrpcRemoteMaster receives the request from GrpcSession and forwards it to the remote gRPC Master Service. Overall, a request passes through GrpcSession -> GrpcRemoteMaster -> GrpcMasterService -> Master -> MasterSession.
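To see this flow end to end from the Client's point of view, here is a minimal usage sketch. The address localhost:2222 and the fetched tensor name loss:0 are assumptions for illustration, and the graph is assumed to be built elsewhere; each call is annotated with the MasterService method it ultimately lands on.
#include "tensorflow/core/framework/graph.pb.h"
#include "tensorflow/core/public/session.h"

int main() {
  tensorflow::SessionOptions options;
  // A grpc:// target routes session creation through GrpcSession.
  options.target = "grpc://localhost:2222";
  std::unique_ptr<tensorflow::Session> session(tensorflow::NewSession(options));

  tensorflow::GraphDef graph_def;  // normally built by the client front end
  TF_CHECK_OK(session->Create(graph_def));                  // -> CreateSession

  std::vector<tensorflow::Tensor> outputs;
  TF_CHECK_OK(session->Run({}, {"loss:0"}, {}, &outputs));  // -> RunStep

  TF_CHECK_OK(session->Close());                            // -> CloseSession
  return 0;
}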
4.3 Creation
When a GrpcSession is set up, its Create method looks up the Master. If one is found, it simply returns the LocalMaster discussed earlier; if Lookup finds nothing, NewGrpcMaster is called to generate a GrpcRemoteMaster.
/* static */
Status GrpcSession::Create(const SessionOptions& options,
                           std::unique_ptr<GrpcSession>* out_session) {
  std::unique_ptr<GrpcSession> session(new GrpcSession(options));
  std::unique_ptr<MasterInterface> master;
  // For testing, we enable the client to disable the use of the local
  // master registry, so that the RPC stack is exercised.
  if (!options.config.rpc_options().use_rpc_for_inprocess_master()) {
    master = LocalMaster::Lookup(options.target);
  }
  if (!master) {
    SharedGrpcChannelPtr master_channel;
    TF_RETURN_IF_ERROR(
        NewHostPortGrpcChannel(options.target.substr(kSchemePrefixLength),
                               &options.config.rpc_options(), &master_channel));
    // Create GrpcRemoteMaster to interact with the remote Master.
    master.reset(NewGrpcMaster(master_channel));
  } else {
    session->is_local_ = true;
  }
  session->SetRemoteMaster(std::move(master));
  *out_session = std::move(session);
  return Status::OK();
}
The NewGrpcMaster method is as follows:
MasterInterface* NewGrpcMaster(const SharedGrpcChannelPtr& channel) {
return new GrpcRemoteMaster(channel);
}
5. GrpcMasterService
GrpcMasterService is the RPC implementation of the MasterService. GrpcMasterService will:
- Know in advance which local devices are available to the client, and also discover remote devices and track their statistics.
- Maintain and manage MasterSessions, which invoke local or remote devices to perform computation on the received graphs.
- Through its sessions, analyze each received graph, prune it, place nodes on the available devices, and drive graph computation on the workers by calling RunGraph.
5.1 Creation
In GrpcServer, master_service_ is a variable of type GrpcMasterService.
// Create Master and GrpcMasterService
master_impl_ = CreateMaster(&master_env_);
master_service_ = NewGrpcMasterService(master_impl_.get(), config, &builder);
GrpcServer uses the master_thread_ thread to execute the HandleRPCsLoop method of GrpcMasterService.
master_thread_.reset(
    env_->StartThread(ThreadOptions(), "TF_master_service",
                      [this] { master_service_->HandleRPCsLoop(); }));
5.2 Definition
GrpcMasterService is defined as follows. master_impl_ is the Master pointer passed in by the server, an instance of the Master class:
class GrpcMasterService : public AsyncServiceInterface {
Master* master_impl_ = nullptr; // Not owned.
std::unique_ptr<::grpc::ServerCompletionQueue> cq_;
grpc::MasterService::AsyncService master_service_;
mutex mu_;
bool is_shutdown_ TF_GUARDED_BY(mu_);
const ConfigProto default_session_config_;
::grpc::Alarm* shutdown_alarm_ = nullptr;
  template <class RequestMessage, class ResponseMessage>
  using MasterCall = Call<GrpcMasterService, grpc::MasterService::AsyncService,
                          RequestMessage, ResponseMessage>;
};
When GrpcMasterService is initialized, it obtains the gRPC completion queue cq_.
GrpcMasterService(Master* master, const ConfigProto& default_session_config,
                  ::grpc::ServerBuilder* builder)
    : master_impl_(master),
      is_shutdown_(false),
      default_session_config_(default_session_config) {
  builder->RegisterService(&master_service_);
  cq_ = builder->AddCompletionQueue();
}
5.3 Main loop
As mentioned earlier, the master_thread_ thread executes the HandleRPCsLoop method of GrpcMasterService, which calls GrpcMasterService's internal handlers to process RPC messages. The main loop code is as follows:
void HandleRPCsLoop() override {
  ENQUEUE_REQUEST(CreateSession, true);
  ENQUEUE_REQUEST(ExtendSession, false);
  for (int i = 0; i < 100; ++i) {
    ENQUEUE_REQUEST(PartialRunSetup, false);
    ENQUEUE_REQUEST(RunStep, true);
  }
  ENQUEUE_REQUEST(CloseSession, false);
  ENQUEUE_REQUEST(ListDevices, false);
  ENQUEUE_REQUEST(Reset, false);
  ENQUEUE_REQUEST(MakeCallable, false);
  for (int i = 0; i < 100; ++i) {
    ENQUEUE_REQUEST(RunCallable, true);
  }
  ENQUEUE_REQUEST(ReleaseCallable, false);

  void* tag;
  bool ok;
  while (cq_->Next(&tag, &ok)) {
    UntypedCall<GrpcMasterService>::Tag* callback_tag =
        static_cast<UntypedCall<GrpcMasterService>::Tag*>(tag);
    if (callback_tag) {
      callback_tag->OnCompleted(this, ok);
    } else {
      // NOTE(mrry): A null callback_tag indicates that this is
      // the shutdown alarm.
      cq_->Shutdown();
    }
  }
}
There are some best practices in the code above, specifically around ENQUEUE_REQUEST:
- this->cq_ is the gRPC completion queue.
- The ENQUEUE_REQUEST macro creates a new pending request for the given RPC method name (for example, ENQUEUE_REQUEST(GetStatus, false) generates a GetStatus request); these requests are queued on this->cq_.
- When the handler for a request is invoked, it calls ENQUEUE_REQUEST() to enqueue another call of the same type. This ensures that the completion queue cq_ always has enough pending requests to absorb incoming traffic, so handling does not block and overall throughput increases.
- The while loop at the end dequeues completed events from the gRPC completion queue; this is the final piece of work for each gRPC call.
#define ENQUEUE_REQUEST(method, supports_cancel)                              \
  do {                                                                        \
    mutex_lock l(mu_);                                                        \
    if (!is_shutdown_) {                                                      \
      Call<GrpcMasterService, grpc::MasterService::AsyncService,              \
           method##Request, method##Response>::                               \
          EnqueueRequest(&master_service_, cq_.get(),                         \
                         &grpc::MasterService::AsyncService::Request##method, \
                         &GrpcMasterService::method##Handler,                 \
                         (supports_cancel));                                  \
    }                                                                         \
  } while (0)
5.4 Message Processing
In each concrete message handler, master_impl_ is called to do the processing, and when the Master finishes, the handler's lambda callback returns the response message to the Client, such as the CreateSessionResponse below. As you can see, the code inserts a request of the same type at the end using ENQUEUE_REQUEST.
// RPC handler for creating a session.
void CreateSessionHandler(
MasterCall<CreateSessionRequest, CreateSessionResponse>* call) {
CreateSessionRequest* rewritten_req = new CreateSessionRequest;
rewritten_req->mutable_config()->MergeFrom(default_session_config_);
rewritten_req->MergeFrom(call->request);
master_impl_->CreateSession(rewritten_req, &call->response,
[call, rewritten_req](const Status& status) {
call->SendResponse(ToGrpcStatus(status));
delete rewritten_req;
});
ENQUEUE_REQUEST(CreateSession, true);
}
5.5 Functionality
GrpcMasterService provides the following APIs:
static const char* grpcMasterService_method_names[] = {
    "/tensorflow.MasterService/CreateSession",
    "/tensorflow.MasterService/ExtendSession",
    "/tensorflow.MasterService/PartialRunSetup",
    "/tensorflow.MasterService/RunStep",
    "/tensorflow.MasterService/CloseSession",
    "/tensorflow.MasterService/ListDevices",
    "/tensorflow.MasterService/Reset",
    "/tensorflow.MasterService/MakeCallable",
    "/tensorflow.MasterService/RunCallable",
    "/tensorflow.MasterService/ReleaseCallable",
};
Let's analyze three specific functions:
5.5.1 CreateSession
A CreateSessionRequest message carries the computation graph and configuration set by the Client. When the Master receives the request, it creates a MasterSession instance for the Client, together with a session_handle that uniquely identifies that MasterSession instance. These are stored in the Master member variable std::unordered_map<string, MasterSession*> sessions_, keyed by the session_handle string.
The Master returns a CreateSessionResponse message to the Client, which carries:
- session_handle. This associates the Client's GrpcSession with the Master's MasterSession. In all subsequent interactions, the Client carries session_handle in its request messages, and the Master uses it to find the corresponding MasterSession instance in sessions_.
- The initial graph_version, which is used by subsequent ExtendSession operations to append new nodes to the original graph.
Figure 4 CreateSession
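To make the handle bookkeeping concrete, here is a simplified sketch of just the registration step. It is not the actual Master::CreateSession, which also assembles the device set and constructs the MasterSession itself; the MakeHandle() helper and the initial version value are assumptions.
// Sketch: store the new session under a fresh handle, then return the
// handle plus the initial graph version to the client.
Status RegisterSessionSketch(MasterSession* session,
                             CreateSessionResponse* response) {
  const string handle = MakeHandle();  // hypothetical unique-id generator
  {
    mutex_lock l(mu_);
    CHECK(sessions_.insert({handle, session}).second);  // handle must be new
  }
  response->set_session_handle(handle);
  response->set_graph_version(0);  // assumed initial version
  return Status::OK();
}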
The specific handler code is as follows:
// RPC handler for creating a session.
void CreateSessionHandler(
    MasterCall<CreateSessionRequest, CreateSessionResponse>* call) {
  CreateSessionRequest* rewritten_req = new CreateSessionRequest;
  rewritten_req->mutable_config()->MergeFrom(default_session_config_);
  rewritten_req->MergeFrom(call->request);
  master_impl_->CreateSession(rewritten_req, &call->response,
                              [call, rewritten_req](const Status& status) {
                                call->SendResponse(ToGrpcStatus(status));
                                delete rewritten_req;
                              });
  ENQUEUE_REQUEST(CreateSession, true);
}
5.5.2 ExtendSession
After a session is established, the Client can use ExtendSession to tell the Master that it needs to extend the original computation graph (nodes can only be added; existing ones cannot be modified or deleted).
The request message ExtendSessionRequest carries:
- session_handle: identifies the MasterSession instance to operate on;
- graph_def: the nodes to be added to the graph;
- current_graph_version: the version of the computation graph that the client wants to extend.
The response message ExtendSessionResponse returns new_graph_version, which is used for the next ExtendSession operation.
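On the client side this maps onto Session::Extend. A minimal sketch, assuming the session object from the usage example in section 4.2 and with graph construction elided:
// Extend() sends an ExtendSessionRequest carrying only the new nodes; the
// master appends them to the graph registered by Create().
tensorflow::GraphDef more_nodes;           // contains just the added nodes
TF_CHECK_OK(session->Extend(more_nodes));  // -> MasterService/ExtendSession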
Figure 5 ExtendSession
The specific code is as follows:
// RPC handler for extending a session.
void ExtendSessionHandler(
    MasterCall<ExtendSessionRequest, ExtendSessionResponse>* call) {
  master_impl_->ExtendSession(&call->request, &call->response,
                              [call](const Status& status) {
                                call->SendResponse(ToGrpcStatus(status));
                              });
  ENQUEUE_REQUEST(ExtendSession, false);
}
5.5.3 RunStep
The Client calls RunStep iteratively. Its request message RunStepRequest carries more fields, for example:
- session_handle: identifies the MasterSession instance to operate on;
- feed: a list of NamedTensors to feed as input;
- fetch: a list of names of the tensors to be fetched as output;
- target: a list of nodes to execute.
The response message RunStepResponse mainly carries:
- tensor: the list of output tensors.
Figure 6 RunStep
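On the client side, each Session::Run call maps onto one RunStepRequest/RunStepResponse pair. A hypothetical single step, assuming the client's graph contains a placeholder x:0, a fetchable tensor loss:0, and a node train_op:
tensorflow::Tensor x(tensorflow::DT_FLOAT, tensorflow::TensorShape({2, 2}));
std::vector<tensorflow::Tensor> outputs;
// feed = {x:0}, fetch = {loss:0}, target = {train_op}
TF_CHECK_OK(session->Run({{"x:0", x}}, {"loss:0"}, {"train_op"}, &outputs));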
The message definition is as follows:
message RunStepRequest {
// REQUIRED: session_handle must be returned by a CreateSession call
// to the same master service.
string session_handle = 1;
// Tensors to be fed in the step. Each feed is a named tensor.
repeated NamedTensorProto feed = 2;
// Fetches. A list of tensor names. The caller expects a tensor to
// be returned for each fetch[i] (see RunStepResponse.tensor). The
// order of specified fetches does not change the execution order.
repeated string fetch = 3;
// Target Nodes. A list of node names. The named nodes will be run
// to but their outputs will not be fetched.
repeated string target = 4;
// Options for the run call.
RunOptions options = 5;
// Partial run handle (optional). If specified, this will be a partial run
// execution, run up to the specified fetches.
string partial_run_handle = 6;
// If true then some errors, e.g., execution errors that have long
// error messages, may return an OK RunStepResponse with the actual
// error saved in the status_code/status_error_message fields of the
// response body. This is a workaround since the RPC subsystem may
// truncate long metadata messages.
bool store_errors_in_response_body = 7;
// Unique identifier for this request. Every RunStepRequest must
// have a unique request_id, and retried RunStepRequest must have
// the same request_id. If request_id is zero, retry detection is disabled.
int64 request_id = 8;
}
message RunStepResponse {
// NOTE: The order of the returned tensors may or may not match
// the fetch order specified in RunStepRequest.
repeated NamedTensorProto tensor = 1;
// Returned metadata if requested in the options.
RunMetadata metadata = 2;
// If store_errors_in_response_body is true in the request, then
// optionally the server may return an OK status for the RPC and
// fill the true status into the fields below, to allow for messages
// that are too long to fit in metadata.
error.Code status_code = 3;
string status_error_message = 4;
}
The specific code is as follows:
// RPC handler for running one step in a session.
void RunStepHandler(MasterCall<RunStepRequest, RunStepResponse>* call) {
  auto* trace = TraceRpc("RunStep/Server", call->client_metadata());
  CallOptions* call_opts = new CallOptions;
  if (call->request.options().timeout_in_ms() > 0) {
    call_opts->SetTimeout(call->request.options().timeout_in_ms());
  } else {
    call_opts->SetTimeout(default_session_config_.operation_timeout_in_ms());
  }
  RunStepRequestWrapper* wrapped_request =
      new ProtoRunStepRequest(&call->request);
  MutableRunStepResponseWrapper* wrapped_response =
      new NonOwnedProtoRunStepResponse(&call->response);
  call->SetCancelCallback([call_opts]() { call_opts->StartCancel(); });
  master_impl_->RunStep(
      call_opts, wrapped_request, wrapped_response,
      [call, call_opts, wrapped_request, trace](const Status& status) {
        call->ClearCancelCallback();
        delete call_opts;
        delete wrapped_request;
        delete trace;
        if (call->request.store_errors_in_response_body() && !status.ok()) {
          call->response.set_status_code(status.code());
          call->response.set_status_error_message(status.error_message());
          call->SendResponse(ToGrpcStatus(Status::OK()));
        } else {
          call->SendResponse(ToGrpcStatus(status));
        }
      });
  ENQUEUE_REQUEST(RunStep, true);
}
6. The business implementation: the Master class
6.1 Creation
As mentioned earlier, GrpcServer creates instances of the Master class.
std::unique_ptr<Master> GrpcServer::CreateMaster(MasterEnv* master_env) {
return std::unique_ptr<Master>(new Master(master_env, 0.0));
}
In this way, after receiving a message from the Client, the GrpcMasterService thread calls master_impl_ to process the specific message, that is, it delegates the business logic to the Master class. So let's see what the Master does.
// RPC handler for creating a session.
void CreateSessionHandler(
    MasterCall<CreateSessionRequest, CreateSessionResponse>* call) {
  CreateSessionRequest* rewritten_req = new CreateSessionRequest;
  rewritten_req->mutable_config()->MergeFrom(default_session_config_);
  rewritten_req->MergeFrom(call->request);
  master_impl_->CreateSession(rewritten_req, &call->response,
                              [call, rewritten_req](const Status& status) {
                                call->SendResponse(ToGrpcStatus(status));
                                delete rewritten_req;
                              });
  ENQUEUE_REQUEST(CreateSession, true);
}
6.2 Definition
Master is not a derived class of MasterInterface. It is defined in tensorflow/core/distributed_runtime/master.cc. As the member variable sessions_ shows, it manages the MasterSessions.
class Master {
private:
typedef Master ME;
// Not owned.
MasterEnv* env_ = nullptr;
// Owned.
mutex mu_;
// shutdown_ is set to true by the dtor.
condition_variable shutdown_cv_;
bool shutdown_ TF_GUARDED_BY(mu_) = false;
Thread* gc_thread_;
// Maps session handles to sessions.
std::unordered_map<string, MasterSession*> sessions_ TF_GUARDED_BY(mu_);
// Moving average of step times.
MovingAverage last_1000_steps_ TF_GUARDED_BY(mu_);
// Cumulative number of steps executed.
int64 step_count_ TF_GUARDED_BY(mu_);
// If a session is not active for this many seconds, it will be
// closed automatically.
const double session_gc_seconds_;
// Used to track ids for incoming requests so we can detect duplicates.
RecentRequestIds recent_request_ids_;
};
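As a plausible sketch of how the handle carried in each request is resolved back to a MasterSession (the real Master has a lookup helper along these lines, but the name and exact signature below are illustrative):
MasterSession* Master::FindSessionSketch(const string& handle) {
  mutex_lock l(mu_);
  auto iter = sessions_.find(handle);
  return iter == sessions_.end() ? nullptr : iter->second;
}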
6.3 Functionality
Let’s recall what we mentioned earlier.
The core of distributed execution is how to operate on the computation graph, with the computation split across three roles: Client, Master, and Worker.
The Client constructs the graph and the Worker performs the computation, but how does the Worker know what to compute? TensorFlow inserts a Master role between the two for coordination and scheduling.
Although Master is not a derived class of MasterInterface, it implements the actual business of the MasterService. The Master is responsible for the following:
- The Master knows in advance which devices are available locally for the client, and it also discovers remote devices and tracks their statistics.
- A Master contains multiple "master sessions". Each master session encapsulates a computation graph and its associated state.
- Each master session will:
- Simplify and optimize the computation graph, for example by pruning and partitioning it and inserting Send/Recv operators.
- Coordinate and schedule resources, for example deciding which computation should run on which device; concretely, it splits the graph onto hardware devices following the graph -> partition -> device strategy.
- Send each partitioned subgraph to its worker (each subgraph corresponds to a MasterSession), and ultimately drive graph computation by launching RunGraph on the workers.
- The Master maintains the live state of the graph-computation session.
The static structure of the Master is now covered. We will discuss the specific Master functions later in the Session section.
Recommended
Finally, two excellent resources are highly recommended:
- TensorFlow Internals (github.com/horance-liu…): anyone interested in the internal implementation mechanism of TF should read it; you will definitely profit from it.
- Home.cnblogs.com/u/deep-lear… : it covers not just TensorFlow but also the frontiers of many other fields.