0x00 Abstract

In the previous article, we introduced the overall architecture and the Profile phase of PipeDream. In this article, we continue with the compute-partition phase. Its job is to take the per-layer running times determined from the profile results, use dynamic programming to partition the model into stages, and obtain the number of replicas for each stage.

Other articles in this pipeline-parallelism series are linked below:

Deep learning pipeline parallelism GPipe (1): basic pipeline implementation

Deep learning pipeline parallelism GPipe (2): gradient accumulation

Deep learning pipeline parallelism PipeDream (1): Profile phase

0x01 Preface

1.1 Profile file

We first look at the content of the profile file profiler/translation/profiles/gnmt/graph.txt; here we only show an excerpt.

node1 -- Input0 -- forward_compute_time=0.000, backward_compute_time=0.000, activation_size=0.0, parameter_size=0.000
node4 -- Embedding(32320, 1024, padding_idx=0) -- forward_compute_time=0.073, backward_compute_time=6.949, activation_size=6291456.0, parameter_size=132382720.000
node5 -- EmuBidirLSTM( (bidir): LSTM(1024, 1024, bidirectional=True) (layer1): LSTM(1024, 1024) (layer2): LSTM(1024, 1024)) -- forward_compute_time=5.247, backward_compute_time=0.016, activation_size=12582912.0, parameter_size=67174400.000
node2 -- Input1 -- forward_compute_time=0.000, backward_compute_time=0.000, activation_size=0.0, parameter_size=0.000
node6 -- Dropout(p=0.2) -- forward_compute_time=0.077, backward_compute_time=0.196, activation_size=12582912.0, parameter_size=0.000
node7 -- LSTM(2048, 1024) -- forward_compute_time=3.190, backward_compute_time=5.348, activation_size=[6291456.0; 131072.0; 131072.0], parameter_size=50364416.000
node8 -- __getitem__(0) -- forward_compute_time=0.000, backward_compute_time=0.000, activation_size=6291456.0, parameter_size=0.000
node9 -- __getitem__(1) -- forward_compute_time=0.000, backward_compute_time=0.000, activation_size=131072.0, parameter_size=0.000
node10 -- Dropout(p=0.2) -- forward_compute_time=0.064, backward_compute_time=0.128, activation_size=6291456.0, parameter_size=0.000
node11 -- LSTM(1024, 1024) -- forward_compute_time=2.491, backward_compute_time=4.203, activation_size=[6291456.0; 131072.0; 131072.0], parameter_size=33587200.000
node12 -- __getitem__(0) -- forward_compute_time=0.000, backward_compute_time=0.000, activation_size=6291456.0, parameter_size=0.000
node13 -- __getitem__(1) -- forward_compute_time=0.000, backward_compute_time=0.000, activation_size=131072.0, parameter_size=0.000
node14 -- Add -- forward_compute_time=0.000, backward_compute_time=0.000, activation_size=6291456.0, parameter_size=0.000
node15 -- Dropout(p=0.2) -- forward_compute_time=0.059, backward_compute_time=0.121, activation_size=6291456.0, parameter_size=0.000
node16 -- LSTM(1024, 1024) -- forward_compute_time=2.492, backward_compute_time=4.201, activation_size=[6291456.0; 131072.0; 131072.0], parameter_size=33587200.000
node17 -- __getitem__(0) -- forward_compute_time=0.000, backward_compute_time=0.000, activation_size=6291456.0, parameter_size=0.000
...
	node1 -- node4
	node4 -- node5
	node2 -- node5
	node5 -- node6
	node6 -- node7
	node7 -- node8
	node7 -- node9
	node8 -- node10
	node10 -- node11
	node11 -- node12
	node11 -- node13
	node12 -- node14
	node8 -- node14
	node14 -- node15
	node15 -- node16
	node16 -- node17
	node16 -- node18
	node17 -- node19
	node14 -- node19
...
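Each node line above follows a fixed `node_id -- description -- metadata` shape, so it can be parsed with a few string splits. Below is a minimal, hedged sketch of such a parser; the helper name parse_profile_node is ours (the real parsing lives in PipeDream's Node.from_str, shown later).

```python
# A hedged sketch of parsing one node line of graph.txt; the helper name
# parse_profile_node is ours (the real logic is in Node.from_str).
def parse_profile_node(line):
    node_id, node_desc, metadata = line.strip().split(" -- ")[:3]
    fields = {}
    for item in metadata.split(", "):
        key, value = item.split("=")
        if value.startswith("["):
            # Multi-output layers record "[a; b; c]"; sum the sizes.
            fields[key] = sum(float(x) for x in value.strip("[]").split("; "))
        else:
            fields[key] = float(value)
    return node_id, node_desc, fields

line = ("node4 -- Embedding(32320, 1024, padding_idx=0) -- "
        "forward_compute_time=0.073, backward_compute_time=6.949, "
        "activation_size=6291456.0, parameter_size=132382720.000")
node_id, desc, fields = parse_profile_node(line)
print(node_id, fields["backward_compute_time"])  # node4 6.949
```

Note that the commas inside a layer description such as `LSTM(2048, 1024)` are safe, because the description and the metadata are separated first by splitting on `" -- "`.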

1.2 General Ideas

We mentioned a few challenges earlier, including:

  • How to efficiently divide the pipeline.

    • The allocation algorithm must take model characteristics and hardware topology into account; a partition that ignores them reduces efficiency.
    • Excessive communication between machines reduces hardware efficiency.
  • How to prevent pipeline bottlenecks.

    • According to the barrel principle, the throughput of a pipeline is determined by the throughput of the slowest link on the pipeline. Therefore, it is necessary to ensure that all stages in the pipeline take roughly the same computation time, otherwise the slowest stage will become the bottleneck of the entire pipeline.

So when dividing layers into stages across machines, PipeDream's automatic partitioning algorithm must ensure that each stage performs roughly the same total amount of work. At the same time, the amount of data communicated between stages should be kept as small as possible, so that communication does not become a bottleneck.

The overall goal of PipeDream’s automatic partition algorithm is to output a balanced pipeline. The algorithm is as follows:

  • The DNN layer is divided into stages so that each stage is completed at roughly the same rate, that is, it takes roughly the same amount of computing time.
  • Try to minimize communication between workers in a topology-aware manner (e.g., send larger outputs to higher-bandwidth links if possible).
  • Since the DNN layers cannot always be evenly distributed among the available workers, PipeDream allows a stage to be replicated, i.e., multiple workers run that stage in data parallelism, in order to further improve load balancing.

This partition problem is equivalent to minimizing the time taken by the slowest stage of the pipeline, and it has the optimal-substructure property: a pipeline that maximizes throughput for a given number of workers is composed of sub-pipelines, each of which maximizes throughput for a smaller number of workers. So PipeDream uses dynamic programming to find the optimal solution.
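As a toy illustration of this optimal-substructure property, the sketch below partitions a chain of per-layer times into k contiguous stages so that the slowest stage is as fast as possible. This is a hedged simplification, not PipeDream's actual algorithm, which additionally models communication, replication, and memory.

```python
from functools import lru_cache

def min_bottleneck(times, k):
    """Split times into k contiguous stages, minimizing the maximum
    per-stage sum (the pipeline bottleneck)."""
    n = len(times)
    prefix = [0]
    for t in times:
        prefix.append(prefix[-1] + t)

    @lru_cache(maxsize=None)
    def dp(i, stages):
        # Best bottleneck for layers i..n-1 split into `stages` stages.
        if stages == 1:
            return prefix[n] - prefix[i]
        best = float("inf")
        # First stage covers layers i..j-1; recurse on the rest.
        for j in range(i + 1, n - stages + 2):
            first = prefix[j] - prefix[i]
            best = min(best, max(first, dp(j, stages - 1)))
        return best

    return dp(0, k)

print(min_bottleneck([2, 3, 7, 1, 4], 3))  # 7: e.g. [2,3] | [7] | [1,4]
```

The recursion mirrors the property stated above: the best k-stage pipeline over layers i..n-1 is a first stage plus the best (k-1)-stage sub-pipeline over the remaining layers.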

The corresponding architecture diagram is shown as follows:

Let’s take a look at the preparation before computing partitions: graph-related work and building antichains.

0x02 Graph

The definition of a graph is in the graph/graph.py file. There are two main data structures: Graph and Node.

2.1 Graph

Graph is the data structure of a Graph. Its main members are:

  • nodes: the nodes in the graph;
  • edges: the outgoing edges of each node;
  • in_edges: the incoming edges of each node;
  • _predecessors: the predecessor nodes of each node;
  • _successors: the successor nodes of each node;
  • _antichain_dag: the antichain DAG.
class Graph(object):
    def __init__(self, node=None):
        self.nodes = {}   # nodes of the graph
        if node is not None:
            self.nodes[node.node_id] = node
        self.edges = {}      # outgoing edges of each node
        self.in_edges = {}   # incoming edges of each node

        self._predecessors = {}   # predecessors of each node
        self._successors = {}     # successors of each node
        self._augmented_antichains = {}
        self._deaugmented_augmented_antichains = {}
        self._next_antichains = {}
        self._antichain_dag = None  # antichain DAG

        if node is not None:
            self.in_edges[node.node_id] = list()

The Node class is defined as follows; it contains the fields obtained from the profile, for example:

  • forward_compute_time: forward-propagation time;
  • backward_compute_time: backward-propagation time;
  • activation_size: size of the activation values;
  • parameter_size: size of the parameters.
class Node(object):
    def __init__(self, node_id, node_desc="", forward_compute_time=0.0,
                 backward_compute_time=0.0, activation_size=0.0,
                 parameter_size=0.0, stage_id=None):
        self.node_id = node_id
        self.node_desc = node_desc
        self.forward_compute_time = forward_compute_time
        self.backward_compute_time = backward_compute_time
        self.activation_size = activation_size
        self.parameter_size = parameter_size
        self.stage_id = stage_id
        self.depth = None
        self.height = None

Printing a Graph instance shows what it looks like:

gr = {Graph}
 # out-edges of each node
 edges = {dict: 39}
  'node1' = {list: 1}
   0 = {Node} node4 -- Embedding(32320, 1024, padding_idx=0) -- forward_compute_time=0.073, backward_compute_time=6.949, activation_size=6291456.0, parameter_size=132382720.000
  'node4' = {list: 1}
   0 = {Node} node5 -- EmuBidirLSTM( (bidir): LSTM(1024, 1024, bidirectional=True) (layer1): LSTM(1024, 1024) (layer2): LSTM(1024, 1024)) -- forward_compute_time=5.247, backward_compute_time=0.016, activation_size=12582912.0, parameter_size=67174400.000
  ...
 # in-edges of each node
 in_edges = {dict: 44}
  'node4' = {list: 1}
   0 = {Node} node1 -- Input0 -- forward_compute_time=0.000, backward_compute_time=0.000, activation_size=0.0, parameter_size=0.000
  'node5' = {list: 4}
   0 = {Node} node4 -- Embedding(32320, 1024, padding_idx=0) -- forward_compute_time=0.073, backward_compute_time=6.949, activation_size=6291456.0, parameter_size=132382720.000
   1 = {Node} node2 -- Input1 -- forward_compute_time=0.000, backward_compute_time=0.000, activation_size=0.0, parameter_size=0.000
  ...
 # nodes of the graph
 nodes = {dict: 48}
  'node1' = {Node} node1 -- Input0 -- forward_compute_time=0.000, backward_compute_time=0.000, activation_size=0.0, parameter_size=0.000
  'node4' = {Node} node4 -- Embedding(32320, 1024, padding_idx=0) -- forward_compute_time=0.073, backward_compute_time=6.949, activation_size=6291456.0, parameter_size=132382720.000
  'node5' = {Node} node5 -- EmuBidirLSTM( (bidir): LSTM(1024, 1024, bidirectional=True) (layer1): LSTM(1024, 1024) (layer2): LSTM(1024, 1024)) -- forward_compute_time=5.247, backward_compute_time=0.016, activation_size=12582912.0, parameter_size=67174400.000
  ...
 # predecessors of each node
 _predecessors = {dict: 36}
  'node4' = {set: 0} set()
  'node5' = {set: 1} {node4}
  'node6' = {set: 2} {node4, node5}
  'node7' = {set: 3} {node4, node5, node6}
  ...
 # other members
 _antichain_dag = {NoneType} None
 _augmented_antichains = {dict: 0} {}
 _deaugmented_augmented_antichains = {dict: 0} {}
 _next_antichains = {dict: 0} {}
 _successors = {dict: 0} {}

2.2 Building the graph

The graph is constructed from the strings of the profile file. Looking at the profile contents again, note that there are two kinds of lines: node definitions and (tab-indented) edges.

node1 -- Input0 -- forward_compute_time=0.000, backward_compute_time=0.000, activation_size=0.0, parameter_size=0.000
node4 -- Embedding(32320, 1024, padding_idx=0) -- forward_compute_time=0.073, backward_compute_time=6.949, activation_size=6291456.0, parameter_size=132382720.000
node5 -- EmuBidirLSTM( (bidir): LSTM(1024, 1024, bidirectional=True) (layer1): LSTM(1024, 1024) (layer2): LSTM(1024, 1024)) -- forward_compute_time=5.247, backward_compute_time=0.016, activation_size=12582912.0, parameter_size=67174400.000
	node1 -- node4
	node4 -- node5
	node2 -- node5

The specific code for constructing the graph is as follows:

    @staticmethod
    def from_str(graph_str):
        gr = Graph()
        graph_str_lines = graph_str.strip().split('\n')
        for graph_str_line in graph_str_lines:
            if not graph_str_line.startswith('\t'):
                node = Node.from_str(graph_str_line.strip())  # build a node
                gr.nodes[node.node_id] = node
            else:
                # build an edge
                [in_node_id, node_id] = graph_str_line.strip().split(" -- ")
                if node_id not in gr.in_edges:   # incoming edges
                    gr.in_edges[node_id] = [gr.nodes[in_node_id]]
                else:
                    gr.in_edges[node_id].append(gr.nodes[in_node_id])
                if in_node_id not in gr.edges:   # outgoing edges
                    gr.edges[in_node_id] = [gr.nodes[node_id]]
                else:
                    gr.edges[in_node_id].append(gr.nodes[node_id])
        return gr

The specific code of node construction is as follows:

    @staticmethod
    def from_str(node_str):
        node_str_tokens = node_str.strip().split(" -- ")
        node_id = node_str_tokens[0]        # node name
        node_desc = node_str_tokens[1]      # node description
        node_metadata = node_str_tokens[2]  # metadata (times / sizes)
        stage_id = None
        if len(node_str_tokens) > 3:
            stage_id = int(node_str_tokens[3].split("=")[1])  # stage id

        [forward_compute_time, backward_compute_time, activation_size,
         parameter_size] = node_metadata.split(", ")
        forward_compute_time = float(forward_compute_time.split("=")[1])    # forward-propagation time
        backward_compute_time = float(backward_compute_time.split("=")[1])  # backward-propagation time
        if "[" in activation_size:  # multiple outputs: sum their sizes
            activation_size = activation_size.split("=")[1]
            activation_size = sum([float(x) for x in
                                   activation_size.lstrip("[").rstrip("]").split("; ")])
        else:
            activation_size = float(activation_size.split("=")[1])  # activation size
        parameter_size = float(parameter_size.split("=")[1])        # parameter size

        return Node(node_id, node_desc,
                    forward_compute_time=forward_compute_time,
                    backward_compute_time=backward_compute_time,
                    activation_size=activation_size,
                    parameter_size=parameter_size,
                    stage_id=stage_id)

2.3 Antichains

In directed acyclic graphs, there are some concepts as follows:

  • Chain: a chain is a set of nodes in which any two nodes x and y satisfy the following condition: either x can reach y, or y can reach x. It can also be viewed as a totally ordered subset of a poset S (totally ordered means any two elements are comparable).
  • Antichain: an antichain is likewise a set of nodes, in which any two nodes x and y satisfy the following condition: x cannot reach y, and y cannot reach x. It can also be viewed as a subset of a poset S in which no two elements are comparable.
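These definitions can be checked mechanically with reachability. Below is a small self-contained sketch on a toy DAG; the adjacency list and helper names are ours, not PipeDream code.

```python
def reachable(edges, src, dst):
    """Iterative DFS reachability in a DAG given as {node: [successors]}."""
    stack, seen = [src], set()
    while stack:
        node = stack.pop()
        if node == dst:
            return True
        if node in seen:
            continue
        seen.add(node)
        stack.extend(edges.get(node, []))
    return False

def is_antichain(edges, nodes):
    # An antichain: no two distinct members may reach each other.
    return all(not reachable(edges, a, b)
               for a in nodes for b in nodes if a != b)

edges = {"a": ["b", "c"], "b": ["d"], "c": ["d"]}
print(is_antichain(edges, ["b", "c"]))  # True: b and c are incomparable
print(is_antichain(edges, ["a", "d"]))  # False: a reaches d
```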

In PipeDream’s graph data structure, there is also the concept of antichaining. Antichain nodes are defined as follows:

class AntichainNode(Node):
    def __init__(self, node_id, antichain, node_desc=""):
        self.antichain = antichain
        self.output_activation_size = 0.0
        super(AntichainNode, self).__init__(node_id, node_desc)

Because this is too complicated, we will devote a section below to the analysis.

0x03 Build an antichain

Because the concepts in this section are quite convoluted, we give away the conclusion in advance.

The purpose of searching for a node's next antichains is to find the next graph partition point A (which may be a combination of several nodes). In order to determine A's running time (and other information), we then need to find A's augmented antichain.

The code here is in the optimizer/optimizer_graph_hierarchical.py file.

We use the following example graph to demonstrate:

+-------+       +-------+
| node1 |       | node2 |
+---+---+       +---+---+
    |               |
    |               |
    |               |
    v               v
+---+---+       +---+---+        +-------+        +-------+
| node4 +-----> | node5 +------> | node6 +------->+ node7 |
+-------+       +-------+        +-------+        +-+-+---+
                                                    | |
                                                    | |
                                      +-------------+ |
                                      |               |
                                      v               v
                                 +----+--+        +---+---+
                                 | node9 |        | node8 +-----+
                                 +-------+        +---+---+     |
                                                      |         |
                    +---------------------------------+         |
                    |                                           |
                    v                                           |
               +----+---+       +--------+        +--------+    |
               | node10 +-----> | node11 +------> | node12 |    |
               +--------+       +---+----+        +----+---+    |
                                    |                  |        |
                                    |                  |        |
                                    v                  v        |
                                +---+----+        +----+---+    |
                                | node13 |        | node14 +<---+
                                +--------+        +-+----+-+
                                                    |    |
                                             +------+    +---+
                                             |               |
                                             v               v
                                        +----+---+        +--+-----+
                                        | node15 |        | node19 |
                                        +--------+        +--------+

3.1 Main function entry

Let's start with the main function. Its first part constructs the antichain DAG and topologically sorts it, as follows:

  • Remove the source nodes from the graph. The goal is to eliminate interference: since the input must be in the first stage, the optimizer should not get to choose where to place the input, so it is removed first and added back later when the model is transformed.
  • Process the outputs of the graph to remove unused outputs.
  • Obtain the antichain DAG.
  • Topologically sort the antichain DAG to obtain a sorted list of nodes.

The specific code is as follows:

def main(all_num_machines, profile_filename, network_bandwidths, memory_size,
         straight_pipeline, use_memory_constraint, use_fewer_machines,
         activation_compression_ratio, output_directory,
         print_configuration=True, verbose=False):
    gr = graph.Graph.from_str(open(profile_filename, 'r').read())

    # Zero out all metadata associated with inputs in graph, since the optimizer
    # shouldn't really get a choice with where to place the input (should always
    # be in the first stage).
    # Since the input must be in the first stage, the optimizer gets no choice:
    # remove the inputs first, then add them back when transforming the model.
    sources = gr.sources()
    nodes_to_remove = OrderedDict()
    for source in sources:
        if source.node_desc.startswith("Input"):
            source.forward_compute_time = 0.0
            source.backward_compute_time = 0.0
            source.activation_size = 0.0
            source.parameter_size = 0.0
            nodes_to_remove[source] = []
            for out_node in gr.edges[source.node_id]:
                nodes_to_remove[source].append(out_node)  # needed later for re-adding
            gr.remove_node(source)  # remove the input sources

    # Remove all unneeded sinks that are not used; makes code generation and
    # optimization easier.
    sinks = gr.sinks()  # process the outputs of the graph, removing unused ones
    for sink in sinks:
        if sink.node_desc.startswith("__getitem__"):
            gr.remove_node(sink)

    antichain_gr = gr.antichain_dag()         # get the antichain DAG
    states = antichain_gr.topological_sort()  # get a sorted list of antichain nodes

For reference, here again is the antichain node definition; you can see how it corresponds to the code above.

class AntichainNode(Node):
    def __init__(self, node_id, antichain, node_desc=""):
        self.antichain = antichain
        self.output_activation_size = 0.0
        super(AntichainNode, self).__init__(node_id, node_desc)

3.2 Augmented antichains

First, we introduce the concept of the augmented antichain. The augmented antichain of each node consists of the node itself plus some of its predecessor nodes.

The selection algorithm of the pre-order node is as follows:

  1. Obtain the list of all the preceding nodes of this node;
  2. If the “outbound destination node” of a pre-ordered node is not in the list of all pre-ordered nodes, and the “outbound destination node” is not itself, the pre-ordered node is selected as part of the enhanced inverse chain.

As the following figure shows, if a node A has among its predecessors a fork node Z, and one branch of the fork bypasses node A, then the augmented antichain of node A is [Z, A].

The concept of the augmented antichain can be understood this way: for node A, its running time can only be determined by considering node Z together with it. If we think about node A's running time, the general reasoning is:

  • Since the stages run in parallel as a pipeline, the running time of A should be the maximum of three times: A's computation time, A's input time, and A's output time.
  • A's input time is the maximum of the X -> A transfer time and the Z -> A transfer time.
  • However, since Z's internal mechanism is unknown, it is uncertain whether there is a dependency between Z's two outputs, for example whether "Z -> D must complete before Z -> A can be emitted". Therefore the transfer time of Z -> D also needs to be considered.

Therefore [A, Z] needs to be considered together as a single state. This is in fact how PipeDream handles it: [A, Z] is treated as one unified computation state.

Since it is treated as one state, the output activation size of node A is computed by traversing its antichain (the augmented antichain), i.e., summing the outputs of the augmented antichain's member nodes.

+-----+       +-----+
|  X  |       |  Z  |
+--+--+       +--+--+
   |             |
   |        +----+----+
   |        |         |
   +-----+  |         |
         |  |         |
         v  v         |
        ++--++        |
        |  A |        |
        ++--++        |
         |  |         |
   +-----+  +-----+   |
   |              |   |
   v              v   v
+--+--+       +---+-+ +-+---+
|  B  |       |  C  | |  D  |
+-----+       +-----+ +-----+
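The selection rule above can be condensed into a sketch on this X/Z/A example. The toy graph and helper names are ours; unlike PipeDream's version, it sorts the extra nodes for determinism.

```python
# Toy graph for the figure above: X -> A, Z -> A, Z -> D, A -> B, A -> C.
edges = {"X": ["A"], "Z": ["A", "D"], "A": ["B", "C"]}

def predecessors(node):
    # All transitive predecessors of `node`.
    direct = {src for src, dsts in edges.items() if node in dsts}
    return direct | {p for d in direct for p in predecessors(d)}

def augment(antichain):
    extra = set()
    all_preds = set().union(*[predecessors(n) for n in antichain])
    for node in antichain:
        for pred in predecessors(node):
            for out in edges.get(pred, []):
                # An out-edge of a predecessor that bypasses the antichain
                # pulls that predecessor into the augmented antichain.
                if out not in all_preds and out != node:
                    extra.add(pred)
    return sorted(extra) + antichain

print(augment(["A"]))  # ['Z', 'A']: Z -> D bypasses A
```

X does not enter the augmented antichain because its only out-edge goes to A itself, whereas Z's edge to D escapes past A.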

In the code, _augmented_antichains stores the augmented antichains. It is a dictionary whose key is (the sorted tuple of) an antichain and whose value is the augmented antichain of that antichain.

The augment_antichain function finds the augmented antichain for each node.

    def augment_antichain(self, antichain):
        antichain_key = tuple(sorted(antichain))
        if antichain_key in self._augmented_antichains:
            return self._augmented_antichains[antichain_key]
        extra_nodes = set()
        all_predecessors = set()
        # Collect the predecessors of every node in the antichain.
        for antichain_node in antichain:
            predecessors = self.predecessors(antichain_node)
            all_predecessors = all_predecessors.union(predecessors)
        # Traverse every node in the antichain.
        for antichain_node in antichain:
            # Traverse every predecessor of that node.
            predecessors = self.predecessors(antichain_node)
            for predecessor in predecessors:
                # Look at each out-edge of the predecessor; if the target is not
                # among the predecessors and is not the antichain node itself,
                # the predecessor belongs to the augmented antichain.
                for out_node in self.edges[predecessor.node_id]:
                    if out_node not in predecessors and out_node.node_id != antichain_node:
                        extra_nodes.add(predecessor.node_id)
        # Finally, prepend the extra nodes to the antichain.
        self._augmented_antichains[antichain_key] = list(extra_nodes) + antichain
        return self._augmented_antichains[antichain_key]

For the example graph, after initialization _augmented_antichains is:

_augmented_antichains = {dict: 1} 
 ('node4',) = {list: 1} ['node4']

After the next iteration (node5), _augmented_antichains is:

_augmented_antichains = {dict: 2} 
 ('node4',) = {list: 1} ['node4']
 ('node5',) = {list: 1} ['node5']
 __len__ = {int} 2

Continuing the iteration, the augmented antichains are as follows:

_augmented_antichains = {dict: 7}
 ('node4',) = {list: 1} ['node4']   # node4's augmented antichain is only itself
 ('node5',) = {list: 1} ['node5']   # node5's augmented antichain is only itself
 ('node6',) = {list: 1} ['node6']
 ('node7',) = {list: 1} ['node7']
 ('node8',) = {list: 1} ['node8']
 ('node10',) = {list: 2} ['node8', 'node10']  # node10's augmented antichain is ['node8', 'node10']
 ('node14',) = {list: 1} ['node14']
 ('node11',) = {list: 2} ['node8', 'node11']  # node11's augmented antichain is ['node8', 'node11']
 ('node15',) = {list: 2} ['node14', 'node15']
 ('node19',) = {list: 1} ['node19']
 ('node12',) = {list: 2} ['node8', 'node12']
 ('node16',) = {list: 2} ['node14', 'node16']
 ('node23',) = {list: 2} ['node20', 'node23']
 ('node17',) = {list: 2} ['node14', 'node17']

As the figure shows, since node8's out-edges go to [node10, node14], the edge node8 -> node14 bypasses node10, node11, and node12, so each of them must add node8 to its augmented antichain.

For node10, we can think of it this way: node8 must be combined with node10 to determine node10's running time. The figure below shows the augmented antichain of node10 (the node itself plus some predecessors).

+-------+       +-------+
| node1 |       | node2 |
+---+---+       +---+---+
    |               |
    |               |
    |               |
    v               v
+---+---+       +---+---+        +-------+        +-------+
| node4 +-----> | node5 +------> | node6 +------->+ node7 |
+-------+       +-------+        +-------+        +-+-+---+
                                                    | |
                                                    | |
                                      +-------------+ |
                                      |               |
                                      v               v  augmented
                                 +----+--+        +---+---+
                                 | node9 |        | node8 +-----+
                                 +-------+        +---+---+     |
                                                      |         |
                    +---------------------------------+         |
                    |                                           |
                    v                                           |
               +----+---+       +--------+        +--------+    |
     antichain | node10 +-----> | node11 +------> | node12 |    |
               +--------+       +---+----+        +----+---+    |
             augmented              |                  |        |
                                    |                  |        |
                                    v                  v        |
                                +---+----+        +----+---+    |
                                | node13 |        | node14 +<---+
                                +--------+        +-+----+-+
                                                    |    |
                                             +------+    +---+
                                             |               |
                                             v               v
                                        +----+---+        +--+-----+
                                        | node15 |        | node19 |
                                        +--------+        +--------+

3.3 Next antichains

In the code, _next_antichains is also a dictionary: the key is (the sorted tuple of) an antichain, and the value is the list of that antichain's next antichains.

For example, for node A, the next antichain is [node B, node C], where node B and node C are incomparable with each other. The purpose of searching for next antichains is to find the next graph partition points.

+-----+       +-----+
|  X  |       |  Z  |
+--+--+       +--+--+
   |             |
   |        +----+----+
   |        |         |
   +-----+  |         |
         |  |         |
         v  v         |
        ++--++        |
        |  A |        |
        ++--++        |
         |  |         |
   +-----+  +-----+   |
   |              |   |
   v              v   v
+--+--+       +---+-+ +-+---+
|  B  |       |  C  | |  D  |
+-----+       +-----+ +-----+

For each antichain, the next_antichains function finds its next antichains.

    def next_antichains(self, antichain):
        antichain_key = tuple(sorted(antichain))
        if antichain_key in self._next_antichains:
            return self._next_antichains[antichain_key]

        next_antichains = []
        antichain_set = set(antichain)
        # Get the augmented antichain of this antichain.
        augmented_antichain = self.augment_antichain(antichain)
        # Traverse the nodes of the augmented antichain.
        for augmented_antichain_node in augmented_antichain:
            # Look at the out-edges of each node.
            next_nodes = self.edges[augmented_antichain_node] \
                if augmented_antichain_node in self.edges else []
            # Traverse the out-edge target nodes.
            for next_node in next_nodes:
                # Skip targets that are already in the antichain.
                if next_node.node_id in antichain_set:
                    continue
                # If the target yields a next antichain, construct and record it.
                if self.is_next_antichain(augmented_antichain, next_node.node_id):
                    next_antichain = self.construct_antichain(augmented_antichain,
                                                              augmented_antichain_node,
                                                              next_node.node_id)
                    next_antichains.append(next_antichain)
        # Cache the result.
        self._next_antichains[antichain_key] = next_antichains
        return self._next_antichains[antichain_key]

The is_next_antichain method determines whether a new node yields a next antichain.

    def is_next_antichain(self, augmented_antichain, new_node):
        successors = self.successors(new_node)
        augmented_antichain_set = set(augmented_antichain)
        # Traverse the successors of the new node.
        for successor in successors:
            # If any successor is inside the augmented antichain, the new node
            # does not yield a next antichain: return False.
            if successor.node_id in augmented_antichain_set:
                return False
        # Otherwise it yields a next antichain.
        return True

An example of _next_antichains follows; compare it with the augmented antichains shown earlier.

  • Take node10 as an example; its augmented antichain is ['node8', 'node10'].

  • Walk through these augmented nodes and look at each one's out-edges: node8's out-edges go to [node10, node14], and node10's out-edge goes to [node11].

  • So there are three candidate target nodes: node10, node11, and node14. node10 is skipped because it is already in ['node8', 'node10'].

  • Call is_next_antichain with node14.

    • In is_next_antichain, augmented_antichain is [node8, node10] and new_node is node14.
    • The successors of node14 are [node31, node16, node23, node44, node48, ...]; none of them are in [node8, node10], so is_next_antichain is True, and node14 yields one of the next antichains.
  • Call is_next_antichain with node11.

    • In is_next_antichain, augmented_antichain is [node8, node10] and new_node is node11.
    • The successors of node11 are [node16, node40, node23, ...]; none of these nodes are in [node8, node10], so is_next_antichain is True, and node11 yields one of the next antichains.

So the next antichains of node10 are [['node14'], ['node11']].

For comparison, node10's augmented antichain is ['node8', 'node10'].

_next_antichains = {dict: 99}
 ('node4',) = {list: 1} [['node5']]
 ('node5',) = {list: 1} [['node6']]
 ('node6',) = {list: 1} [['node7']]
 ('node7',) = {list: 1} [['node8']]
 ('node8',) = {list: 2} [['node10'], ['node14']]
 ('node10',) = {list: 2} [['node14'], ['node11']]  # here
 ('node14',) = {list: 2} [['node15'], ['node19']]
 ('node11',) = {list: 2} [['node14'], ['node12']]
 ('node15',) = {list: 2} [['node19'], ['node16']]
 ('node19',) = {list: 1} [['node23']]
 ('node12',) = {list: 2} [['node14'], ['node14']]
 ('node16',) = {list: 2} [['node19'], ['node17']]
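The is_next_antichain test from the walk-through can be reproduced on the example graph's edge list. This sketch uses plain dicts and our own helper names instead of PipeDream's Graph class.

```python
# Edge list of the example graph (plain dicts instead of PipeDream's Graph).
edges = {
    "node1": ["node4"], "node2": ["node5"], "node4": ["node5"],
    "node5": ["node6"], "node6": ["node7"], "node7": ["node8", "node9"],
    "node8": ["node10", "node14"], "node10": ["node11"],
    "node11": ["node12", "node13"], "node12": ["node14"],
    "node14": ["node15", "node19"], "node15": ["node16"],
    "node16": ["node17", "node18"], "node17": ["node19"],
}

def successors(node):
    # All transitive successors of `node`.
    result, stack = set(), list(edges.get(node, []))
    while stack:
        n = stack.pop()
        if n not in result:
            result.add(n)
            stack.extend(edges.get(n, []))
    return result

def is_next_antichain(augmented_antichain, new_node):
    # new_node qualifies iff none of its successors sit in the augmented antichain.
    return not (successors(new_node) & set(augmented_antichain))

print(is_next_antichain(["node8", "node10"], "node14"))  # True
print(is_next_antichain(["node8", "node10"], "node11"))  # True
print(is_next_antichain(["node8", "node10"], "node7"))   # False: node7 reaches node8
```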

As the figure below shows, node11 and node14 are indeed next antichains of node10, i.e., the graph can be cut at either of these nodes.

For node10 the next antichains are ['node14'] and ['node11']; the purpose of searching for next antichains is to find the next graph partition points.

+-------+       +-------+
| node1 |       | node2 |
+---+---+       +---+---+
    |               |
    |               |
    |               |
    v               v
+---+---+       +---+---+        +-------+        +-------+
| node4 +-----> | node5 +------> | node6 +------->+ node7 |
+-------+       +-------+        +-------+        +-+-+---+
                                                    | |
                                                    | |
                                      +-------------+ |
                                      |               |
                                      v               v  augmented
                                 +----+--+        +---+---+
                                 | node9 |        | node8 +-----+
                                 +-------+        +---+---+     |
                                                      |         |
                    +---------------------------------+         |
                    |                                           |
                    v                  next                     |
               +----+---+       +--------+        +--------+    |
     antichain | node10 +-----> | node11 +------> | node12 |    |
               +--------+       +---+----+        +----+---+    |
             augmented              |                  |        |
                                    |                  |        |
                                    v          next    v        |
                                +---+----+        +----+---+    |
                                | node13 |        | node14 +<---+
                                +--------+        +-+----+-+
                                                    |    |
                                             +------+    +---+
                                             |               |
                                             v               v
                                        +----+---+        +--+-----+
                                        | node15 |        | node19 |
                                        +--------+        +--------+

3.4 Overall Construction

The purpose of antichain_dag is to build an antichain DAG based on the augmented antichain list and the next antichain list.

Let's walk through the code, taking node8 as an example.

def antichain_dag(self):
    if self._antichain_dag is not None:
        return self._antichain_dag

    antichain_dag = Graph()
    antichain_id = 0
    antichain = [self.sources()[0].node_id] # get the first source node
    # The source AntichainNode carries the augmented antichain of the source.
    source_node = AntichainNode("antichain_%d" % antichain_id,
                                self.augment_antichain(antichain))
    antichain_dag.source = source_node
    antichain_queue = [antichain] # insert the first antichain into the queue
    antichain_mapping = {tuple(sorted(antichain)): source_node}

    while len(antichain_queue) > 0:
        antichain = antichain_queue.pop(0) # pop an antichain, e.g. [node8]
        antichain_key = tuple(sorted(antichain))
        # If this key's next antichains were already computed, skip it.
        if antichain_key in self._next_antichains:
            continue
        # Retrieve the next antichains; for node8 this is [['node10'], ['node14']].
        next_antichains = self.next_antichains(antichain)
        for next_antichain in next_antichains:
            next_antichain_key = tuple(sorted(next_antichain))
            if next_antichain_key not in antichain_mapping: # skip if it already exists
                antichain_id += 1
                # The new AntichainNode for node10 is set to its augmented
                # antichain ['node8', 'node10'].
                next_antichain_node = AntichainNode("antichain_%d" % antichain_id,
                                                    self.augment_antichain(next_antichain))
                # Set antichain_mapping.
                antichain_mapping[next_antichain_key] = next_antichain_node
            # Insert an edge into the antichain DAG.
            antichain_dag.add_edge(antichain_mapping[antichain_key],
                                   antichain_mapping[next_antichain_key])
            antichain_queue.append(next_antichain)

    self._antichain_dag = antichain_dag
    return antichain_dag

The purpose here is to set antichain_mapping.

The process is:

  • Pop the first antichain from antichain_queue; suppose it is node 8.
  • Get its next antichains; for node 8 this is [[10], [14]].
  • Iterate over those next antichains [10, 14].
  • For each one, build its key; for example, for node 10 the key is ('node10',).
  • The new AntichainNode for node 10 is set to its augmented antichain [8, 10], i.e. ('node10',) = {AntichainNode} antichain_5 -- ['node8', 'node10'].

As we can see, the purpose of finding a node A's next antichain is to find the next graph split point. To determine A's running time (and other information), we then need A's augmented antichain (these augmented antichains are exactly the states used later), and antichain_mapping maps A to its augmented antichain node.

The following is an example of antichain_mapping:

antichain_mapping = {dict: 99}
 ('node4',) = {AntichainNode} antichain_0 -- ['node4']
 ('node5',) = {AntichainNode} antichain_1 -- ['node5']
 ('node6',) = {AntichainNode} antichain_2 -- ['node6']
 ('node7',) = {AntichainNode} antichain_3 -- ['node7']
 ('node8',) = {AntichainNode} antichain_4 -- ['node8']
 ('node10',) = {AntichainNode} antichain_5 -- ['node8', 'node10']
 ('node14',) = {AntichainNode} antichain_6 -- ['node14']
 ('node11',) = {AntichainNode} antichain_7 -- ['node8', 'node11']
 ('node15',) = {AntichainNode} antichain_8 -- ['node14', 'node15']
 ('node19',) = {AntichainNode} antichain_9 -- ['node19']
 ('node12',) = {AntichainNode} antichain_10 -- ['node8', 'node12']
 ('node16',) = {AntichainNode} antichain_11 -- ['node14', 'node16']
 ('node23',) = {AntichainNode} antichain_12 -- ['node20', 'node23']
 ('node17',) = {AntichainNode} antichain_13 -- ['node14', 'node17']

And here is an example of the resulting antichain_dag:

antichain_dag = {Graph}
    nodes = {dict: 99} 
   'antichain_0' = {AntichainNode} antichain_0 -- ['node4']
   'antichain_1' = {AntichainNode} antichain_1 -- ['node5']
   'antichain_2' = {AntichainNode} antichain_2 -- ['node6']
   'antichain_3' = {AntichainNode} antichain_3 -- ['node7']
   'antichain_4' = {AntichainNode} antichain_4 -- ['node8']
   'antichain_5' = {AntichainNode} antichain_5 -- ['node8', 'node10']
   'antichain_6' = {AntichainNode} antichain_6 -- ['node14']
   'antichain_7' = {AntichainNode} antichain_7 -- ['node8', 'node11']
   'antichain_8' = {AntichainNode} antichain_8 -- ['node14', 'node15']
   'antichain_9' = {AntichainNode} antichain_9 -- ['node19']
   'antichain_10' = {AntichainNode} antichain_10 -- ['node8', 'node12']
   'antichain_11' = {AntichainNode} antichain_11 -- ['node14', 'node16']
   'antichain_12' = {AntichainNode} antichain_12 -- ['node20', 'node23']
   'antichain_13' = {AntichainNode} antichain_13 -- ['node14', 'node17']
   'antichain_14' = {AntichainNode} antichain_14 -- ['node20', 'node30', 'node23']
   'antichain_15' = {AntichainNode} antichain_15 -- ['node20', 'node36', 'node23']
   'antichain_16' = {AntichainNode} antichain_16 -- ['node20', 'node43', 'node23']
   'antichain_17' = {AntichainNode} antichain_17 -- ['node20', 'node23', 'node24']

3.5 Topological Sorting

After the antichain DAG is obtained, it must be topologically sorted before it can be used.

antichain_gr = gr.antichain_dag()
states = antichain_gr.topological_sort()

The purpose of topological ordering is to guarantee that, when nodes are processed in the order of the topological sequence, all of a node's predecessors have been handled before the node itself, so that the whole pipeline can execute in order without conflicts.

In graph theory, a topological sort is a linear ordering of all vertices of a DAG (Directed Acyclic Graph). The ordering must meet the following two conditions:

  1. Each vertex occurs only once.
  2. If there is a path from vertex A to vertex B, then vertex A precedes vertex B in the sequence.

Only directed acyclic graphs (DAGs) have topological orderings; graphs with cycles do not. A DAG can have one or more topological orderings.

For example, the following diagram:

+---+              +---+
| 1 +------------> | 4 +-------+
+-+-++             +---+       |
  |  |                         v
  |  |                       +-+-+
  |  +--------------+        | 5 |
  |                 |        +-+-+
  v                 v          ^
+-+-+              +-+-+       |
| 2 +------------> | 3 +-------+
+---+              +---+

The result of topological sorting is {1, 2, 4, 3, 5}.

The topological sort here is implemented with depth-first search.

    def topological_sort(self):
        # Algorithm from https://en.wikipedia.org/wiki/Topological_sorting
        self.sorted_nodes = []
        self.marked_nodes = set()
        self.temporarily_marked_nodes = set()
        nodes = list(self.nodes.values())
        nodes.sort(key=lambda x: x.node_desc)
        for node in nodes:
            if node.node_id in self.marked_nodes:
                continue
            self.topological_sort_helper(node.node_id)
        return [self.nodes[node_id] for node_id in self.sorted_nodes]
​
    def topological_sort_helper(self, node_id):
        if node_id in self.marked_nodes:
            return
        if node_id in self.temporarily_marked_nodes:
            raise Exception("Graph has a cycle")
        self.temporarily_marked_nodes.add(node_id)
        if node_id in self.edges:
            out_nodes = list(self.edges[node_id])
            out_nodes.sort(key=lambda x: (x.node_desc, x.height))
            for out_node in out_nodes:
                self.topological_sort_helper(out_node.node_id)
        self.marked_nodes.add(node_id)
        self.temporarily_marked_nodes.remove(node_id)
        self.sorted_nodes.insert(0, node_id)
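To see the scheme in action, here is a self-contained sketch of the same DFS approach (out-edges visited first, then the node is prepended to the result), run on a small graph whose edges are assumed to be 1→2, 1→3, 1→4, 2→3, 3→5, 4→5:

```python
# Standalone DFS topological sort, mirroring topological_sort_helper above:
# recurse into out-edges first, then prepend the node to the sorted list.
def topological_sort(edges, nodes):
    sorted_nodes, marked, temp = [], set(), set()

    def helper(node):
        if node in marked:
            return
        if node in temp:
            raise Exception("Graph has a cycle")
        temp.add(node)
        for out_node in sorted(edges.get(node, [])):
            helper(out_node)
        marked.add(node)
        temp.remove(node)
        sorted_nodes.insert(0, node)  # prepend, like sorted_nodes.insert(0, node_id)

    for node in sorted(nodes):
        helper(node)
    return sorted_nodes

# Assumed edge set of the small example graph.
edges = {1: [2, 3, 4], 2: [3], 3: [5], 4: [5]}
order = topological_sort(edges, [1, 2, 3, 4, 5])
print(order)  # one valid order; every edge u -> v has u before v
```

A DAG can have several valid orderings; which one this prints depends on the tie-breaking sort key, just as the real code sorts by node_desc.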

The final result is as follows; it can be compared with the antichain_dag above to see the similarities and differences:

states = {list: 99} 
 00 = {AntichainNode} antichain_0 -- ['node4']
 01 = {AntichainNode} antichain_1 -- ['node5']
 02 = {AntichainNode} antichain_2 -- ['node6']
 03 = {AntichainNode} antichain_3 -- ['node7']
 04 = {AntichainNode} antichain_4 -- ['node8']
 05 = {AntichainNode} antichain_5 -- ['node8', 'node10']
 06 = {AntichainNode} antichain_7 -- ['node8', 'node11']
 07 = {AntichainNode} antichain_10 -- ['node8', 'node12']
 08 = {AntichainNode} antichain_6 -- ['node14']
 09 = {AntichainNode} antichain_8 -- ['node14', 'node15']
 10 = {AntichainNode} antichain_11 -- ['node14', 'node16']
 11 = {AntichainNode} antichain_13 -- ['node14', 'node17']
 12 = {AntichainNode} antichain_9 -- ['node19']
 13 = {AntichainNode} antichain_12 -- ['node20', 'node23']
 14 = {AntichainNode} antichain_18 -- ['node23', 'node20', 'node26']
 15 = {AntichainNode} antichain_17 -- ['node23', 'node20', 'node24']
 16 = {AntichainNode} antichain_32 -- ['node23', 'node20', 'node28']
 17 = {AntichainNode} antichain_31 -- ['node23', 'node20', 'node26', 'node24']
 18 = {AntichainNode} antichain_63 -- ['node23', 'node20', 'node26', 'node28']
 19 = {AntichainNode} antichain_33 -- ['node20', 'node26', 'node29']
 20 = {AntichainNode} antichain_16 -- ['node20', 'node43', 'node23']
 21 = {AntichainNode} antichain_30 -- ['node23', 'node20', 'node43', 'node26']
 22 = {AntichainNode} antichain_29 -- ['node23', 'node20', 'node43', 'node24']
 23 = {AntichainNode} antichain_59 -- ['node23', 'node20', 'node43', 'node28']

We can also compare this with the augmented antichains below: states is the result of topologically sorting the antichain DAG, and training in this order is logical.

_augmented_antichains = {dict: 99} 
 ('node4',) = {list: 1} ['node4']
 ('node5',) = {list: 1} ['node5']
 ('node6',) = {list: 1} ['node6']
 ('node7',) = {list: 1} ['node7']
 ('node8',) = {list: 1} ['node8']
 ('node10',) = {list: 2} ['node8', 'node10']
 ('node14',) = {list: 1} ['node14']
 ('node11',) = {list: 2} ['node8', 'node11']
 ('node15',) = {list: 2} ['node14', 'node15']
 ('node19',) = {list: 1} ['node19']
 ('node12',) = {list: 2} ['node8', 'node12']
 ('node16',) = {list: 2} ['node14', 'node16']
 ('node23',) = {list: 2} ['node20', 'node23']
 ('node17',) = {list: 2} ['node14', 'node17']
 ('node23', 'node30') = {list: 3} ['node20', 'node30', 'node23']
 ('node23', 'node36') = {list: 3} ['node20', 'node36', 'node23']
 ('node23', 'node43') = {list: 3} ['node20', 'node43', 'node23']
 ('node24',) = {list: 3} ['node23', 'node20', 'node24']
 ('node26',) = {list: 3} ['node23', 'node20', 'node26']
 ('node23', 'node30', 'node36') = {list: 4} ['node20', 'node36', 'node30', 'node23']
 ('node23', 'node30', 'node43') = {list: 4} ['node20', 'node43', 'node30', 'node23']
 ('node31',) = {list: 3} ['node20', 'node26', 'node31']
 ('node24', 'node30') = {list: 4} ['node23', 'node20', 'node30', 'node24']
 ('node26', 'node30') = {list: 4} ['node23', 'node20', 'node30', 'node26']
 ('node23', 'node36', 'node43') = {list: 4} ['node20', 'node43', 'node36', 'node23']
 ('node37',) = {list: 4} ['node32', 'node20', 'node26', 'node37']
 ('node24', 'node36') = {list: 4} ['node23', 'node20', 'node36', 'node24']
 ('node26', 'node36') = {list: 4} ['node23', 'node20', 'node36', 'node26']
 ('node44',) = {list: 2} ['node40', 'node44']
 ('node24', 'node43') = {list: 4} ['node23', 'node20', 'node43', 'node24']
 ('node26', 'node43') = {list: 4} ['node23', 'node20', 'node43', 'node26']
 ('node24', 'node26') = {list: 4} ['node23', 'node20', 'node26', 'node24']

3.6 Summary

Since the algorithm is complex, let's briefly summarize the work so far:

  • The augmented antichain of each node is computed, giving the combined result _augmented_antichains.
  • The next antichain of each node is computed. The purpose of finding a node A's next antichain is to find the next graph split point; to determine A's running time (and other information), we then need A's augmented antichain (these augmented antichains are the states). _next_antichains is the combined result.
  • The antichain_dag function uses _next_antichains and _augmented_antichains to construct the antichain DAG, stored in the variable antichain_dag.
  • Once the antichain DAG is obtained, it must be topologically sorted before use. Topological ordering guarantees that all of a node's predecessors come before the node itself, so the whole pipeline can execute in order without conflicts.
  • states is the result of topologically sorting the antichain DAG; training in this order is logical. All subsequent work therefore operates on states.

0x04 Compute Partition

So far, the graph has been converted into states based on the next antichains, and the most important property of each state is its augmented antichain. states is the result of topologically sorting the antichain DAG; training in this order is logical.

The automatic partitioning algorithm has two parts:

  • compute_partitioning uses dynamic programming to obtain an optimal result over these states, but does not produce a concrete partition.
  • analyze_partitioning uses the optimization result to perform the concrete partitioning and returns a sorted partial-order result.

Let’s take a look at each one.

4.1 Logic of the main function

The next logical step in the main function related to computing partitions is as follows:

  • Set an index for each state.
  • Compute each state's output activation size by traversing its (augmented) antichain; this can be thought of as the output its required predecessor nodes hand to it.
  • Compute each state's information, such as compute time, activation size, and parameter size, all accumulated from its predecessor nodes.
  • Collect the overall output sizes output_activation_sizes & all predecessor node ids, which are needed later when computing the partitioning.
  • Build the triangular time matrix: compute_times_row holds the compute time from node i to each later node (i+1, i+2, …), and similarly below.
  • The activation sizes within the system are estimated from the profile.
  • The parameter sizes within the system are estimated from the profile.
  • Iterate over the (machine count, network bandwidth) combinations. The pipeline can be straight (count 1) or replicated (count num_machines). Based on the current information plus the machine count, network bandwidth, and so on, the dynamic programming algorithm computes a partitioning. If there are two (machine count, bandwidth) combinations, dynamic programming runs once per combination; finally, all_As.append(A) accumulates the results of both runs, giving the optimal result after all necessary factors are considered.
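The triangular-matrix step in the list above is just a pairwise difference of cumulative per-state values. A minimal sketch of the same loop, with hypothetical numbers:

```python
# Build the triangular span matrix used by the optimizer: entry [i][j] is
# the cost of span (i..j), obtained by subtracting cumulative values.
def build_span_matrix(cumulative):
    n = len(cumulative)
    matrix = []
    for i in range(n + 1):
        row = []
        for j in range(n):
            if i == 0:
                row.append(cumulative[j])                      # span (0..j)
            elif j > i - 1:
                row.append(cumulative[j] - cumulative[i - 1])  # span (i..j)
            else:
                row.append(None)                               # invalid span
        matrix.append(row)
    return matrix

# Hypothetical cumulative compute times for three states.
for row in build_span_matrix([1.0, 3.0, 6.0]):
    print(row)
# [1.0, 3.0, 6.0]
# [None, 2.0, 5.0]
# [None, None, 3.0]
# [None, None, None]
```

The same loop runs three times in main(), once each for compute_times, activation_sizes, and parameter_sizes.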

The specific code is as follows:

def main(all_num_machines, profile_filename, network_bandwidths, memory_size,
         straight_pipeline, use_memory_constraint, use_fewer_machines,
         activation_compression_ratio, output_directory,
         print_configuration=True, verbose=False):
    gr = graph.Graph.from_str(open(profile_filename, 'r').read())

    # Zero out all metadata associated with inputs in graph, since the optimizer
    # shouldn't really get a choice with where to place the input (should always
    # be in the first stage).
    # Inputs always belong to the first stage, so there is no point letting the
    # optimizer choose where to place them; remove them here, add them back later.
    sources = gr.sources() # process the graph's inputs
    nodes_to_remove = OrderedDict()
    for source in sources:
        if source.node_desc.startswith("Input"): # only handle Input nodes
            source.forward_compute_time = 0.0
            source.backward_compute_time = 0.0
            source.activation_size = 0.0
            source.parameter_size = 0.0
            nodes_to_remove[source] = []
            for out_node in gr.edges[source.node_id]:
                nodes_to_remove[source].append(out_node) # record which out-nodes each removed source fed, since they are processed later
            gr.remove_node(source) # remove these input sources from the graph

    # Remove all unneeded sinks that are not used, makes code generation and
    # optimization easier.
    sinks = gr.sinks() # process the graph's outputs: remove unused sinks
    for sink in sinks:
        if sink.node_desc.startswith("__getitem__"):
            gr.remove_node(sink)

    antichain_gr = gr.antichain_dag() # build the antichain DAG
    states = antichain_gr.topological_sort() # topological sort, giving an ordered list of states

    ###########################################################################
    # The code above was analyzed in the previous section; we continue from here.
    ###########################################################################

    states_indices = {} # assign an index to each state
    for i in range(len(states)):
        states_indices[states[i]] = i

##################################### at runtime:
#states_indices = {dict: 99} 
# antichain_0 -- ['node4'] = {int} 0
# antichain_1 -- ['node5'] = {int} 1
# antichain_2 -- ['node6'] = {int} 2
# antichain_3 -- ['node7'] = {int} 3
# antichain_4 -- ['node8'] = {int} 4
# ......

    # Compute each state's output activation size by traversing its (augmented)
    # antichain: this is the output its required predecessors hand to it.
    for i in range(len(states)):
        for antichain_node in states[i].antichain:
            states[i].output_activation_size += gr.nodes[antichain_node].activation_size

    # Compute each state's info (compute time, activation size, parameter size,
    # ...), all accumulated from its predecessor nodes.
    for i in range(len(states)):
        antichain = states[i].antichain
        all_predecessors = gr.all_predecessors(antichain)
        states[i].compute_time = 0.0
        states[i].activation_size = 0.0
        states[i].parameter_size = 0.0
        for predecessor in all_predecessors: # accumulate the info of all predecessors
            states[i].compute_time += ((predecessor.forward_compute_time +
                                        predecessor.backward_compute_time) / 1000.0)
            states[i].activation_size += predecessor.activation_size
            states[i].parameter_size += predecessor.parameter_size
    gr.reset()

    # Collect the overall output sizes & all predecessor ids, needed later when
    # computing the partitioning.
    output_activation_sizes = [state.output_activation_size for state in states]
    all_predecessor_ids = [[states_indices[predecessor] for predecessor in
                            antichain_gr.predecessors(states[i].node_id)]
                           for i in range(len(states))]

##################################### at runtime:
# output_activation_sizes = {list: 99} 
# 00 = {float} 6291456.0
# 01 = {float} 12582912.0
# 02 = {float} 12582912.0
# 03 = {float} 6553600.0    
# .....
# all_predecessor_ids = {list: 99} 
#  00 = {list: 0} []
#  01 = {list: 1} [0]
#  02 = {list: 2} [0, 1]
#  03 = {list: 3} [0, 1, 2]
#  04 = {list: 4} [0, 1, 2, 3]
#  05 = {list: 5} [2, 3, 4, 0, 1]
#  06 = {list: 6} [2, 3, 4, 0, 1, 5]
#  07 = {list: 7} [6, 2, 3, 4, 0, 1, 5]
# ......

    compute_times = [] # initialize compute times
    activation_sizes = [] # initialize activation sizes
    parameter_sizes = [] # initialize parameter sizes
    for i in range(len(states)+1): # compute each span's info, removing the influence of the nodes before i
        compute_times_row = []
        activation_sizes_row = []
        parameter_sizes_row = []
        for j in range(len(states)): # subtract the earlier nodes
            if i == 0: # first row of the list
                compute_times_row.append(states[j].compute_time) # compute time from i to j
                activation_sizes_row.append(states[j].activation_size)
                parameter_sizes_row.append(states[j].parameter_size)
            else: # later rows
                if j > (i-1):
                    compute_times_row.append(states[j].compute_time -
                        states[i-1].compute_time) # compute time from i to j
                    activation_sizes_row.append(states[j].activation_size -
                        states[i-1].activation_size)
                    parameter_sizes_row.append(states[j].parameter_size -
                        states[i-1].parameter_size)
                else:
                    compute_times_row.append(None)
                    activation_sizes_row.append(None)
                    parameter_sizes_row.append(None)
        compute_times.append(compute_times_row) # compute times estimated from the profile; compute_times_row holds the times from node i to each later node (i+1, i+2, ...), similarly below
        activation_sizes.append(activation_sizes_row) # activation sizes estimated from the profile
        parameter_sizes.append(parameter_sizes_row) # parameter sizes estimated from the profile

##################################### at runtime:
# compute_times = {list: 100} 
# 000 = {list: 99} [0.0070220000000000005, 0.012285, 0.012558, 0.021096000000,...
# 001 = {list: 99} [None, 0.005263, 0.005535999999999999, 0.014074000000000003, ...
# 002 = {list: 99} [None, None, 0.00027299999999999894, 0.008811000000000003, ...
# 003 = {list: 99} [None, None, None, 0.008538000000000004, 0.008538, ...
# 004 = {list: 99} [None, None, None, None, -3.469446951953614e-18, 0.000191999999...

    counter = 1
    all_As = []
    num_machines_in_machine = 1 # the innermost level starts with 1 machine
    # all_num_machines and network_bandwidths are specified by the user.
    # Iterate over the (machine count, network bandwidth) combinations; the
    # pipeline can be straight (count 1) or replicated (count num_machines).
    for num_machines, network_bandwidth in zip(all_num_machines, network_bandwidths):
        print("Solving optimization problem with %d machines with inter-machine bandwidth of %.2f GB/s" % (num_machines, network_bandwidth / 10**9))
        import numpy as np
        print(np.array(compute_times))
        # Compute the partitioning from the current info plus the machine count,
        # network bandwidth, etc.
        A = compute_partitioning(compute_times, activation_sizes, parameter_sizes,
                                 output_activation_sizes, all_predecessor_ids,
                                 num_machines, num_machines_in_machine,
                                 network_bandwidth,
                                 final_level=(counter==len(network_bandwidths)))
        num_machines_in_machine = num_machines # this level is done, so record its machine count
        for i in range(len(compute_times)): # iterate over machines
            for j in range(len(compute_times[0])): # later machines
                compute_times[i][j] = A[i][j][-1][0] # record the compute time (of this level's last machine)
        counter += 1
        all_As.append(A) # keep each level's DP table; together they hold the optimization logic of the different levels
    print(np.array(compute_times))

    # subsequent code omitted

compute_times is a two-dimensional array (a matrix) of compute times, for example:

[w12,  w13,  w14,  w15 ],   // compute times from node 1 to each later node
[None, w23,  w24,  w25 ],   // compute times from node 2 to each later node
[None, None, w34,  w35 ],   // compute times from node 3 to each later node
[None, None, None, w45 ],   // compute times from node 4 to each later node

activation_sizes and parameter_sizes are built the same way.

4.2 Dynamic Programming

4.2.1 General idea

Let's now analyze the dynamic programming algorithm itself.

The partitioning algorithm tries to minimize the overall training time of the model. For a pipelined system, this is equivalent to minimizing the time taken by the slowest stage of the pipeline. The problem has the optimal-substructure property: given a machine count, the pipeline that maximizes throughput is composed of sub-pipelines that each maximize throughput over their own subsets of machines. Therefore, we can use dynamic programming to find the optimal solution.
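To make the optimal-substructure idea concrete, here is a deliberately simplified sketch (no replication, no communication, hypothetical numbers): partition a chain of per-layer compute times into contiguous stages so that the slowest stage is as fast as possible.

```python
import functools

def min_max_stage_time(layer_times, num_stages):
    """Simplified stage partitioning: A(j, s) = min over split i of
    max(A(i, s-1), sum(layer_times[i:j])). Returns the slowest-stage
    time of the best partition of layer_times into num_stages stages."""
    prefix = [0.0]                       # prefix sums for O(1) span sums
    for t in layer_times:
        prefix.append(prefix[-1] + t)

    @functools.lru_cache(maxsize=None)
    def A(j, s):
        if s == 1:
            return prefix[j] - prefix[0]  # one stage: everything together
        # try every split i: first s-1 stages cover layers [0, i),
        # the last stage covers layers [i, j)
        return min(max(A(i, s - 1), prefix[j] - prefix[i])
                   for i in range(s - 1, j))

    return A(len(layer_times), num_stages)

times = [2.0, 3.0, 4.0, 3.0]  # hypothetical per-layer compute times
print(min_max_stage_time(times, 2))  # -> 7.0, best split [2,3] | [4,3]
```

PipeDream's real DP is the same recurrence enriched with replication factors, communication terms, and the machine hierarchy, as derived below.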

The partitioning algorithm takes the output of the profiling step and computes:

1) how to divide the layers into stages,

2) the replication factor (number of workers) for each stage,

3) the optimal number of in-flight minibatches that keeps the training pipeline busy.

PipeDream's optimizer assumes that the machine topology is hierarchical and can be organized into levels, as shown in the figure below. Bandwidth is the same within a level but differs across levels. We assume that level k consists of m_k components of level k-1, connected by links of bandwidth B_k. In the figure below, m2 = 2 and m1 = 4. In addition, we define m0 = 1; that is, four level-0 devices form one level-1 component, and two level-1 components form the level-2 component.

Level 0, the green rectangles, represents the lowest-level compute devices, such as GPUs. Four GPUs form one level-1 component (the dashed rectangle, a server), and two servers form the level-2 component (everything in the figure).

PipeDream's optimizer solves the dynamic programming problem level by level, from the lowest level to the highest. Intuitively, this process first finds the best partitioning inside a server and then uses those partitions to split the model optimally across servers.

4.2.2 Specific analysis

Let A(j, m) denote the time taken by the slowest stage of the optimal pipeline between layers 1 and j using m machines.

The goal of the algorithm is to find A(N, M) and the corresponding partitioning. Let T(i → j, m) denote the time taken by a single stage spanning layers i through j, replicated over m machines.

In the PipeDream paper this is defined as

T(i → j, m) = (1/m) · max( Σ_{l=i..j} T_l , Σ_{l=i..j} W_l )

where T_l is the compute time of layer l and W_l its weight-synchronization communication time. Inside the max:

  • the left term is the total compute time of all layers in the stage, and the right term is the total communication time of all layers in the stage;
  • because computation and communication can overlap, we take their maximum rather than their sum;
  • an optimal pipeline over layers 1 through j with m machines may replicate a single stage m times, or may consist of multiple stages.

When the optimal pipeline contains multiple stages, it can be decomposed into an optimal sub-pipeline (layers 1 through i on m − m′ machines) followed by a single stage (layers i+1 through j on m′ machines). Using the optimal-substructure property, we get

A(j, m) = min_{1≤i<j} min_{1≤m′<m} max( A(i, m − m′), 2·a_i / B, T(i+1 → j, m′) )

where a_i is the size of the activations output by layer i and B is the bandwidth.

Inside the max:

  • the first term is the time taken by the slowest stage of the optimal sub-pipeline (m − m′ machines) between layers 1 and i;
  • the second term is the time to transfer the activations and gradients between layers i and i+1;
  • the third term is the time of the final single stage (m′ data-parallel machines).
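A small numeric sketch (all numbers hypothetical) of how one candidate in the recurrence is evaluated: the three terms proceed concurrently, so the candidate pipeline time is simply their maximum. The last-stage formula mirrors the one in compute_partitioning: compute plus weight synchronization, divided by the replication factor m′.

```python
# Evaluate one candidate of the DP recurrence:
#   max( A(i, m - m'), activation transfer time, last-stage time )
def candidate_pipeline_time(sub_pipeline_time, activation_size, bandwidth,
                            last_stage_compute, parameter_size, m_prime):
    # activations forward + gradients backward -> factor 2
    transfer_time = (2.0 * activation_size) / (bandwidth * m_prime)
    # last stage replicated m_prime ways: compute + weight sync, then / m_prime
    last_stage_time = (last_stage_compute +
                       (4 * (m_prime - 1) * parameter_size) /
                       (bandwidth * m_prime)) / m_prime
    return max(sub_pipeline_time, transfer_time, last_stage_time)

# Hypothetical: 50 ms sub-pipeline, 100 MB activations, 10 GB/s links,
# 80 ms last-stage compute, 200 MB of parameters, 2 replicas.
print(round(candidate_pipeline_time(0.050, 100e6, 10e9, 0.080, 200e6, 2), 6))
# -> 0.06 : the replicated last stage dominates this candidate
```

The DP then takes the minimum of such candidates over all split points i and machine allocations m′.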

Let’s see how it works. Suppose a graph has the following logic:

                       +----------------+
+-----+                |                +--------+
|     +------------->  |  k[m_prime]    |        |          +-----+
|  i  |                |                |        +--------->+     |
|     +----+           +----------------+                   |  j  |
+-----+    |                                      +-------->+     |
           |           +----------------+         |         +-----+
           |           |                |         |
           +-------->  |  k[m-m_prime]  +---------+
                       |                |
                       +----------------+

Take the maximum of (A[i][k][m-m_prime][0], last_stage_time, output_transfer_time, input_transfer_time):

  • A[i][k][m-m_prime][0]: the compute time between i and k; this is an already-solved subproblem.

  • last_stage_time: (compute time from k to j) + transfer time.

    • compute_times[k+1][j] is the compute time from k to j; row k+1 corresponds to the output of k.
    • The transfer time is derived from parameter_sizes[k+1][j], the parameter size of the stage from k to j.
    • last_stage_time = compute_times[k+1][j] + (transfer time derived from parameter_sizes[k+1][j])
  • input_transfer_time: transfer time computed from k's output activation size (that is, the input of j's stage).

  • output_transfer_time: transfer time computed from j's output activation size.

Because communication and computation can overlap, taking the maximum of these values is valid.

Finally, A holds the result of the dynamic programming optimization. Each element A[i][j][m] is a triple (min_pipeline_time, optimal_split, optimal_num_machines): the result of spanning nodes i through j with m machines, i.e. (minimum pipeline time, optimal split point between i and j, optimal number of machines).

The general stage is shown in the figure below:

                                                       +----------------+
                                                       | i              |
                                                       |                |
                                                       |                |
                                                       +--+------+------+
                                                          |      |
                                                          |      +----------+
                                   A[i][k][m-m_prime][0]   |                 |
                                                          |                 |
                                                          v                 v
                                        +-----------------+-------+    +----+--------+
                                        | k[m-m_prime]            |    | k[m_prime]  |
                                        |                         |    |             |
last_stage_time = compute_times[k+1][j] |                         |    |             |
            + (parameter_sizes[k+1][j]) | output_activation_sizes |    |             |
                                        |                         |    |             |
                                        |                         |    |             |
                                        +-----------------+-------+    +-----+-------+
                                     input_transfer_time  |                  |
                                                          |      +-----------+
                                                          |      |
                                                          |      |
                                                          v      v
                                             +------------+------+------+
                                             | j                        |
                                             |                          |
                                             |                          |
                                             |                          |
                                             |  output_activation_sizes |
                                             |                          |
                                             +------------------+-------+
                                          output_transfer_time  |
                                                                |
                                                                |
                                                                v

The specific code is as follows:

def compute_partitioning(compute_times, activation_sizes, parameter_sizes,
                         output_activation_sizes, all_predecessor_ids,
                         num_machines, num_machines_within_machine,
                         bandwidth, final_level=True):
    # straight_pipeline, use_memory_constraint, memory_size, use_fewer_machines
    # and activation_compression_ratio are module-level settings.
    A = []
    for i in range(len(compute_times)): # initialize the DP table A[i][j][m]
        row_A = []
        for j in range(len(compute_times[0])):
            row_row_A = []
            for m in range(num_machines):
                row_row_A.append((None, None, None))
            row_A.append(row_row_A)
        A.append(row_A)

    for i in range(len(compute_times)): # base case: one single (possibly replicated) stage
        for j in range(i, len(compute_times[0])):
            cum_compute_time = compute_times[i][j] # compute time from i to j
            cum_activation_size = activation_sizes[i][j] # activation size from i to j
            cum_parameter_size = parameter_sizes[i][j] # parameter size from i to j
            max_m = 1 if straight_pipeline else num_machines
            for m in range(max_m): # iterate over replica counts (m+1)
                # the stashed data each replica must keep grows with pipeline depth
                stashed_data_size = math.ceil((num_machines - (m+1)) / (m+1)) * \
                                              (cum_activation_size + cum_parameter_size)
                # enforce the memory constraint if configured
                if use_memory_constraint and stashed_data_size > memory_size:
                    continue
                # data-parallel communication time, based on parameter size,
                # bandwidth and the number of machines in this stage
                data_parallel_communication_time = (4 * m * cum_parameter_size) / (bandwidth * (m+1))
                data_parallel_communication_time /= num_machines_within_machine

                if cum_compute_time is None:
                    A[i][j][m] = (None, None, None)
                else:
                    A[i][j][m] = (sum([cum_compute_time,
                                       data_parallel_communication_time]) / (m+1), None, (m+1))

    min_machines = 1
    max_i = len(compute_times) if not final_level else 1
    for i in range(max_i): # try splitting the sub-model i..j at every predecessor k
        for m in range(min_machines, num_machines):
            for j in range(i+1, len(compute_times[0])):
                (min_pipeline_time, optimal_split, optimal_num_machines) = A[i][j][m]
                if use_fewer_machines and m > 0 and (
                    min_pipeline_time is None or A[i][j][m-1][0] < min_pipeline_time):
                    (min_pipeline_time, optimal_split, optimal_num_machines) = A[i][j][m-1]
                # A[i][k][m-m_prime] is already an optimal subproblem
                for k in all_predecessor_ids[j]:
                    if i > 0 and k in all_predecessor_ids[i-1]:
                        continue
                    # number of machines given to the last stage
                    max_m_prime = 2 if straight_pipeline else (m+1)
                    for m_prime in range(1, max_m_prime):
                        # input transfer time, computed from the output activation size of k
                        input_transfer_time = (2.0 * output_activation_sizes[k]) / \
                            (bandwidth * m_prime)
                        # output transfer time
                        output_transfer_time = None
                        if j < len(output_activation_sizes) - 1:
                            output_transfer_time = (2.0 *
                                output_activation_sizes[j]) / (bandwidth * m_prime)
                        # the last stage spans k+1 to j
                        last_stage_time = compute_times[k+1][j]
                        if last_stage_time is None:
                            continue
                        last_stage_parameter_size = parameter_sizes[k+1][j]
                        # stashed data size from k+1 to j
                        stashed_data_size = (activation_sizes[k+1][j]) + last_stage_parameter_size
                        stashed_data_size *= math.ceil((num_machines - (m+1)) / m_prime)
                        if use_memory_constraint and stashed_data_size > memory_size:
                            continue
                        # add the gradient-synchronization time of the last stage
                        last_stage_time = sum([last_stage_time,
                                               ((4 * (m_prime - 1) *
                                                 last_stage_parameter_size) / (bandwidth * m_prime))])
                        last_stage_time /= m_prime

                        # skip if the subproblem i..k has no feasible time yet
                        if A[i][k][m-m_prime][0] is None:
                            continue
                        # pipeline time is the slower of the two parts
                        pipeline_time = max(A[i][k][m-m_prime][0], last_stage_time)
                        if activation_compression_ratio is not None:
                            input_transfer_time /= activation_compression_ratio
                            # output_transfer_time is compressed as well
                            if output_transfer_time is not None:
                                output_transfer_time /= activation_compression_ratio
                            # take the larger of compute and transfer times
                            pipeline_time = max(pipeline_time, input_transfer_time)
                            if output_transfer_time is not None:
                                pipeline_time = max(pipeline_time, output_transfer_time)
                        # keep this split if it beats the current minimum
                        if min_pipeline_time is None or min_pipeline_time > pipeline_time:
                            optimal_split = (k, m-m_prime)
                            optimal_num_machines = m_prime
                            min_pipeline_time = pipeline_time
                A[i][j][m] = (min_pipeline_time, optimal_split, optimal_num_machines)

    return A

all_As holds the dynamic programming results, as in the following example:

all_As = {list: 2} 
 0 = {list: 100} 
  000 = {list: 99} 
   00 = {list: 5} [(0.0070220000000000005, None, 1), (0.1689894, None, 2), (0.14943257777777777, None, 3), (0.1258643, None, 4), (0.107310576, None, 5)]
   01 = {list: 5} [(0.012285, None, 1), (0.0070220000000000005, (0, 0), 1), (0.0865995, (0, 0), 2), (0.07639255555555556, (0, 0), 3), (0.06429175000000001, (0, 0), 4)]
   02 = {list: 5} [(0.012558, None, 1), (0.0070220000000000005, (0, 0), 1), (0.0070220000000000005, (1, 1), 1), (0.0070220000000000005, (1, 1), 2), (0.0070220000000000005, (1, 1), 3)]
   03 = {list: 5} [(0.021096, None, 1), (0.012285, (1, 0), 1), (0.008538, (2, 1), 1), (0.008538, (2, 2), 1), (0.008538, (2, 3), 1)]
   ...
  __len__ = {int} 100
 1 = {list: 100} 
  000 = {list: 99} 
   00 = {list: 5} [(0.107310576, None, 1), (0.080131832, None, 2), (0.05930489777777778, None, 3), (0.046685052000000005, None, 4), (0.03840710336000001, None, 5)]
   01 = {list: 5} [(0.06429175000000001, None, 1), (0.072057299, None, 2), (0.05690740466666667, None, 3), (0.0460065055, None, 4), (0.03840166136, None, 5)]
   02 = {list: 5} [(0.0070220000000000005, None, 1), (0.043422424, None, 2), (0.037817488, None, 3), (0.031689068, None, 4), (0.026947711359999998, None, 5)]
   03 = {list: 5} [(0.008538, None, 1), (0.0419991328, (2, 0), 1), (0.043422424, (2, 1), ...), (0.0396227304, None, 4), (0.033697556608, None, 5)]
   ...
  __len__ = {int} 100
 __len__ = {int} 2
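Each entry of this table is a (min_pipeline_time, optimal_split, optimal_num_machines) tuple. As a quick illustration of how to read one row (using the first row shown in the dump above), the best replica count for a sub-model is simply the m that minimizes the first field:

```python
# The five entries of the first row in the dump above, one per machine count m+1.
# Each tuple is (min_pipeline_time, optimal_split, optimal_num_machines).
row = [(0.0070220000000000005, None, 1), (0.1689894, None, 2),
       (0.14943257777777777, None, 3), (0.1258643, None, 4),
       (0.107310576, None, 5)]

# Pick the m whose pipeline time is smallest.
best_m = min(range(len(row)), key=lambda m: row[m][0])
print(best_m + 1, row[best_m][0])  # machine count is m+1; here a single machine wins
```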

0x04 Differences between two variables

Next we examine the difference between two similarly named variables used by the code's author.

activation_sizes: for a node, the sum of the activation_size values of all its predecessor nodes.

for predecessor in all_predecessors:
    states[i].compute_time += ((predecessor.forward_compute_time +
                                predecessor.backward_compute_time) / 1000.0)
    states[i].activation_size += predecessor.activation_size
    states[i].parameter_size += predecessor.parameter_size

It is used to calculate the stashed data size, which is checked against the configured memory limit.

stashed_data_size = (activation_sizes[k+1][j]) + last_stage_parameter_size
stashed_data_size *= math.ceil((num_machines - (m+1)) / m_prime)
if use_memory_constraint and stashed_data_size > memory_size:
        continue
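Plugging sample numbers into this formula shows how the stash grows with the pipeline depth downstream of a stage (the numbers below are illustrative; only the formula comes from the code above):

```python
import math

# A sketch with made-up sizing: 10 machines total, the pipeline up to this
# stage has used (m+1) = 2 machines, and the candidate last stage is
# replicated over m_prime = 2 machines.
num_machines, m, m_prime = 10, 1, 2
activation_size = 6291456.0    # bytes, taken from the profile excerpt above
parameter_size = 33587200.0    # bytes

# Each replica must stash one version of activations + weights per
# in-flight minibatch, which grows with the number of downstream machines.
versions = math.ceil((num_machines - (m + 1)) / m_prime)
stashed_data_size = (activation_size + parameter_size) * versions
print(versions, stashed_data_size)  # 4 159514624.0
```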

output_activation_sizes: for a node, the sum of the activation_size values over its augmented antichain.

for i in range(len(states)):
    for antichain_node in states[i].antichain:
        states[i].output_activation_size += gr.nodes[antichain_node].activation_size

It is used to calculate the input transfer time and the output transfer time.

input_transfer_time = (2.0 * output_activation_sizes[k]) / \
    (bandwidth * m_prime)   # computed from the output activation size of k
output_transfer_time = None
if j < len(output_activation_sizes) - 1:
    output_transfer_time = (2.0 * output_activation_sizes[j]) / (bandwidth * m_prime)
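A quick numeric illustration of the input-transfer formula (the bandwidth value is assumed; the factor 2.0 accounts for the activation on the forward pass plus its gradient on the backward pass):

```python
# Sending a ~6 MB activation into a stage replicated over m_prime machines.
output_activation_size = 6291456.0   # bytes, from the profile excerpt above
bandwidth = 1e9                      # bytes/s, an assumed link speed
m_prime = 2                          # replicas of the receiving stage

input_transfer_time = (2.0 * output_activation_size) / (bandwidth * m_prime)
print(input_transfer_time)  # seconds; replication splits the transfer m_prime ways
```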

0x05 Analyze the partition

5.1 Main Function Logic

compute_partitioning only produces a dynamic programming optimization result; analyze_partitioning must still analyze that result and carve it into concrete stages.

The next logical step in the main function related to computing partitions is as follows:

  • states is the antichain DAG result, and all_As holds the dynamic programming results (possibly one per level).
  • splits is initialized with a single tuple: (0, len(states)).
  • The results in all_As are traversed from the last level to the first; for each one, analyze_partitioning is called on every current split, and splits is refined step by step. analyze_partitioning returns a list of partial splits.
  • For each split point in the partial splits, all predecessor nodes of the corresponding state are found and given a stage_id. The traversal runs front to back, so stage_id increases monotonically.
  • The graph is written to a file; convert_graph_to_model.py later converts this file into a model.
  • Finally, the result is analyzed and compared against pure data parallelism.

The specific code is as follows:

def main(all_num_machines, profile_filename, network_bandwidths, memory_size,
         straight_pipeline, use_memory_constraint, use_fewer_machines,
         activation_compression_ratio, output_directory,
         print_configuration=True, verbose=False):
    gr = graph.Graph.from_str(open(profile_filename, 'r').read())

    # Zero out all metadata associated with inputs in graph, since the optimizer
    # shouldn't really get a choice with where to place the input (should always
    # be in the first stage).
    # Exclude noise: the input must live in the first stage anyway, so there is no need
    # to let the optimizer choose where to place it; remove it here and add it back later.
    sources = gr.sources() # process the graph's inputs
    nodes_to_remove = OrderedDict()
    for source in sources:
        if source.node_desc.startswith("Input"): # only handle inputs
            source.forward_compute_time = 0.0
            source.backward_compute_time = 0.0
            source.activation_size = 0.0
            source.parameter_size = 0.0
            nodes_to_remove[source] = []
            for out_node in gr.edges[source.node_id]:
                nodes_to_remove[source].append(out_node) # record which out-nodes each removed source fed, for later restoration
            gr.remove_node(source) # remove these input sources from the graph

    # Remove all unneeded sinks that are not used, makes code generation and
    # optimization easier.
    sinks = gr.sinks() # process the graph's outputs: remove unused sinks
    for sink in sinks:
        if sink.node_desc.startswith("__getitem__"):
            gr.remove_node(sink)

    antichain_gr = gr.antichain_dag() # build the antichain DAG
    states = antichain_gr.topological_sort() # topological sort, yielding an ordered list of nodes

    ###########################################################################
    # Computation phase
    ###########################################################################
    states_indices = {} # assign an index to each state
    for i in range(len(states)):
        states_indices[states[i]] = i
        
##################################### at runtime:
#states_indices = {dict: 99} 
# antichain_0 -- ['node4'] = {int} 0
# antichain_1 -- ['node5'] = {int} 1
# antichain_2 -- ['node6'] = {int} 2
# antichain_3 -- ['node7'] = {int} 3
# antichain_4 -- ['node8'] = {int} 4
# ......
         
    # Compute each state's output activation size by walking its augmented antichain:
    # effectively the output its required predecessors hand to it
    for i in range(len(states)):
        for antichain_node in states[i].antichain:
            states[i].output_activation_size += gr.nodes[antichain_node].activation_size
       
    # Compute each state's metadata (compute time, activation size, parameter size, ...), all derived from its predecessors
    for i in range(len(states)):
        antichain = states[i].antichain
        all_predecessors = gr.all_predecessors(antichain)
        states[i].compute_time = 0.0
        states[i].activation_size = 0.0
        states[i].parameter_size = 0.0
        for predecessor in all_predecessors: # accumulate the info of all predecessors
            states[i].compute_time += ((predecessor.forward_compute_time +
                                        predecessor.backward_compute_time) / 1000.0)
            states[i].activation_size += predecessor.activation_size
            states[i].parameter_size += predecessor.parameter_size
    gr.reset()

    # Total output sizes & all predecessor ids, needed later when computing the partitioning
    output_activation_sizes = [state.output_activation_size for state in states]
    all_predecessor_ids = [[states_indices[predecessor] for predecessor in
                            antichain_gr.predecessors(states[i].node_id)]
                           for i in range(len(states))]
                           

##################################### at runtime:
# output_activation_sizes = {list: 99} 
# 00 = {float} 6291456.0
# 01 = {float} 12582912.0
# 02 = {float} 12582912.0
# 03 = {float} 6553600.0    
# .....
# all_predecessor_ids = {list: 99} 
#  00 = {list: 0} []
#  01 = {list: 1} [0]
#  02 = {list: 2} [0, 1]
#  03 = {list: 3} [0, 1, 2]
#  04 = {list: 4} [0, 1, 2, 3]
#  05 = {list: 5} [2, 3, 4, 0, 1]
#  06 = {list: 6} [2, 3, 4, 0, 1, 5]
#  07 = {list: 7} [6, 2, 3, 4, 0, 1, 5]
# ......
    
    compute_times = [] # initialize compute times
    activation_sizes = [] # initialize activation sizes
    parameter_sizes = [] # initialize parameter sizes
    for i in range(len(states)+1): # compute per-node info by removing the influence of preceding nodes
        compute_times_row = []
        activation_sizes_row = []
        parameter_sizes_row = []
        for j in range(len(states)): # subtract the contribution of earlier nodes
            if i == 0: # first row of the table
                compute_times_row.append(states[j].compute_time) # compute time from i to j
                activation_sizes_row.append(states[j].activation_size)
                parameter_sizes_row.append(states[j].parameter_size)
            else: # subsequent rows
                if j > (i-1):
                    compute_times_row.append(states[j].compute_time -
                        states[i-1].compute_time) # compute time from i to j
                    activation_sizes_row.append(states[j].activation_size -
                        states[i-1].activation_size)
                    parameter_sizes_row.append(states[j].parameter_size -
                        states[i-1].parameter_size)
                else:
                    compute_times_row.append(None)
                    activation_sizes_row.append(None)
                    parameter_sizes_row.append(None)
        compute_times.append(compute_times_row) # compute times estimated from the profile; compute_times_row holds the times from node i to later nodes (i+1, i+2, ...); similarly below
        activation_sizes.append(activation_sizes_row) # activation sizes estimated from the profile
        parameter_sizes.append(parameter_sizes_row) # parameter sizes estimated from the profile

##################################### at runtime:
# compute_times = {list: 100} 
# 000 = {list: 99} [0.0070220000000000005, 0.012285, 0.012558, 0.021096000000,...
# 001 = {list: 99} [None, 0.005263, 0.005535999999999999, 0.014074000000000003, ...
# 002 = {list: 99} [None, None, 0.00027299999999999894, 0.008811000000000003, ...
# 003 = {list: 99} [None, None, None, 0.008538000000000004, 0.008538, ...
# 004 = {list: 99} [None, None, None, None, -3.469446951953614e-18, 0.000191999999...

    counter = 1
    all_As = []
    num_machines_in_machine = 1 # the first level is 1
    # all_num_machines and network_bandwidths are specified by the user
    # iterate over (machine count, network bandwidth) pairs; a pipeline can be straight (count 1) or replicated (count num_machines)
    for num_machines, network_bandwidth in zip(all_num_machines, network_bandwidths):
        print("Solving optimization problem with %d machines with inter-machine bandwidth of %.2f GB/s" % (num_machines, network_bandwidth / 10**9))
        import numpy as np
        print(np.array(compute_times))
        # compute the partitioning from the current info, machine count, network bandwidth, etc.
        A = compute_partitioning(compute_times, activation_sizes, parameter_sizes,
                                 output_activation_sizes, all_predecessor_ids,
                                 num_machines, num_machines_in_machine,
                                 network_bandwidth,
                                 final_level=(counter==len(network_bandwidths)))
        num_machines_in_machine = num_machines # this level is done, so record its machine count
        for i in range(len(compute_times)):
            for j in range(len(compute_times[0])):
                compute_times[i][j] = A[i][j][-1][0] # record the compute time (using all machines of this level)
        counter += 1
        all_As.append(A) # collect the DP result of each level
    print(np.array(compute_times))
    
    ###########################################################################
    # We continue the analysis from here
    ###########################################################################
    
    # Analysis phase
    # analyze_partitioning does the concrete analysis
    # The crucial part is setting the stage_id of every node in gr, which re-partitions the initial pipeline via stage_id
    splits = [(0, len(states))] # states is the antichain-DAG result, so splits starts with a single tuple: the initial partition (0, len(states))
    i = len(all_As) - 1 # all_As holds the DP optimization results
    while i >= 0: # iterate over the optimized levels
        print("======================================")
        print("Level %d" % (i+1))
        print("======================================")
        new_splits = []
        stage_id = 0 # used later by convert_graph_to_model.py
        for (start, end) in splits: # iterate over the splits; splits is refined step by step
            # recompute based on each tuple in the current splits
            partial_splits = \
                analyze_partitioning(all_As[i], states, start, end,
                                     network_bandwidths[i], all_num_machines[i],
                                     activation_compression_ratio,
                                     print_configuration, verbose)
            start_point = start # starting point
            for split in partial_splits: # iterate over the split points found by the analysis
                new_splits.append((start_point, split)) # add a new tuple
                if i == 0:
                    predecessors = gr.all_predecessors(states[split-1].antichain)
                    for predecessor in predecessors:
                        if predecessor.stage_id is None:
                            predecessor.set_stage_id(stage_id) # assign the stage
                start_point = split # next stage
                stage_id += 1 # advance the stage
            new_splits.append((start_point, end)) # add a new tuple
            if i == 0:                
                predecessors = gr.all_predecessors(states[end-1].antichain)
                for predecessor in predecessors:
                    if predecessor.stage_id is None:
                        predecessor.set_stage_id(stage_id) # assign the stage
            stage_id += 1 # advance the stage
        
        print("Total number of stages: %d" % stage_id)
        splits = new_splits # adopt the new splits
        i -= 1

    # Write the graph to a file; convert_graph_to_model.py later converts this file into a model
    for source in nodes_to_remove: # add the previously removed input nodes back into the graph
        for out_node in nodes_to_remove[source]: # the outputs each input fed
            source.stage_id = 0
            gr.add_edge(source, out_node)

    if output_directory is not None:
        total_num_machines = 1
        for num_machines in all_num_machines:
            total_num_machines *= num_machines
        gr.to_dot(os.path.join(output_directory, "gpus=%d" % total_num_machines))
        gr_str = str(gr)
        with open(os.path.join(output_directory, "gpus=%d.txt" % total_num_machines), 'w') as f:
            f.write(gr_str)

    # The following is for comparison:
    # compute the time needed by pure data parallelism, which is longer than the DP-optimized time.
    total_time = states[-1].compute_time # compute time of the last state, i.e. the unoptimized total compute time
    total_parameter_size = states[-1].parameter_size
    data_parallel_total_time = total_time # start from the total compute time
    num_machines_in_machine = 1 # machine count of the current level
    # iterate over the pipeline levels; with no optimization, this strictly follows the user's original configuration
    for (num_machines, network_bandwidth) in zip(all_num_machines, network_bandwidths):
        # communication time: num_machines is the machine count of the next pipeline level, so bandwidth is multiplied by it
        data_parallel_communication_time = (
            (4 * (num_machines - 1) * total_parameter_size) /
            (network_bandwidth * num_machines)) / num_machines_in_machine
        # total time must include the communication time
        data_parallel_total_time = sum(
            [data_parallel_total_time, data_parallel_communication_time]) / num_machines
        # in the next iteration, the current level's machine count becomes num_machines
        num_machines_in_machine = num_machines

    # the optimized time produced by the dynamic programming algorithm
    pipeline_parallel_total_time = A[0][len(states)-1][num_machines-1][0]

    # the numbers the user should pay attention to
    if verbose:
        print()
        print("Time taken by single-stage pipeline:", total_time)
        print("Time per stage in pipeline:", pipeline_parallel_total_time)
        print("Throughput increase (compared to single machine):",
              total_time / pipeline_parallel_total_time)
        dp_str = ",".join([str(elem) for elem in all_num_machines])
        print(("[Note that single-machine and (%s)-machine DP might not fit "
               "given memory constraints]") % dp_str)
        print("Throughput increase of (%s)-machine DP compared to single "
              "machine:" % dp_str, total_time / data_parallel_total_time)
        print("Throughput increase (compared to (%s)-machine DP):" % dp_str,
              data_parallel_total_time / pipeline_parallel_total_time)
    return pipeline_parallel_total_time, data_parallel_total_time                             
                           
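The compute_times/activation_sizes/parameter_sizes construction in the code above is a prefix-sum trick: each states[j] already stores values accumulated over all its predecessors, so the value of any interval of layers is a difference of two cumulative values. A minimal sketch with made-up per-layer times:

```python
per_layer = [1.0, 2.0, 3.0, 4.0]   # hypothetical per-layer compute times

# Build the cumulative values, like states[j].compute_time in the code above.
cumulative = []
total = 0.0
for t in per_layer:
    total += t
    cumulative.append(total)       # [1.0, 3.0, 6.0, 10.0]

def interval_time(i, j):
    """Compute time of layers i..j inclusive (like compute_times[i][j])."""
    if i == 0:
        return cumulative[j]
    return cumulative[j] - cumulative[i-1]

print(interval_time(0, 3))  # 10.0, the whole model
print(interval_time(1, 2))  # 5.0, layers 1 and 2 only
```

This is why the entries with j <= i-1 are None: the interval is empty, so there is nothing to subtract.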

5.2 Analysis Phase

For details of the analysis phase, see the following notes.

def analyze_partitioning(A, states, start, end, network_bandwidth, num_machines,
                         activation_compression_ratio, print_configuration, verbose):
    # start, end delimit the group of nodes handled in this call
    metadata = A[start][end-1][num_machines-1] # (min_pipeline_time, optimal_split, optimal_num_machines)
    next_split = metadata[1] # metadata[1] is optimal_split, i.e. (k, m-m_prime)
    remaining_machines_left = num_machines
    splits = []
    replication_factors = []
    prev_split = end - 1 # the previous split point

    while next_split is not None: # as long as there is a next split point
        num_machines_used = metadata[2] # optimal_num_machines
        if verbose:
            print("-------------------------------------")
            print("Number of machines used: %d..." % num_machines_used)
            print("Split between layers %d and %d..." % (next_split[0], next_split[0] + 1))
            print("Split before antichain %s..." % (states[next_split[0]+1].antichain))
        splits.append(next_split[0]+1) # record k+1, the new split point
        compute_time = states[prev_split-1].compute_time - \
            states[next_split[0]].compute_time
        parameter_size = states[prev_split-1].parameter_size - \
            states[next_split[0]].parameter_size

        dp_communication_time = (4 * (num_machines_used - 1) * parameter_size) \
            / (network_bandwidth * num_machines_used)
        pp_communication_time_input = ( # input transfer time of the next stage
            2.0 * states[next_split[0]].output_activation_size *
            (1.0 / float(num_machines_used))) / network_bandwidth
        pp_communication_time_output = ( # output transfer time of this stage
            2.0 * states[prev_split-1].output_activation_size *
            (1.0 / float(num_machines_used))) / network_bandwidth
        # apply activation compression if configured
        if activation_compression_ratio is not None:
            pp_communication_time_input /= activation_compression_ratio
            pp_communication_time_output /= activation_compression_ratio
        if activation_compression_ratio is None:
            pp_communication_time_input = 0.0
            pp_communication_time_output = 0.0

        compute_time /= num_machines_used # compute time of this stage
        dp_communication_time /= num_machines_used

        if verbose:
            print(("Compute time = %f, Data-parallel communication time = %f, "
                   "Pipeline-parallel communication time = %f...") % (
                compute_time, dp_communication_time,
                max(pp_communication_time_input, pp_communication_time_output)))

        prev_split = splits[-1] # move the split point
        # A[i][j][m] is (min_pipeline_time, optimal_split, optimal_num_machines)
        metadata = A[start][next_split[0]][next_split[1]]
        next_split = metadata[1] # optimal_split
        replication_factors.append(num_machines_used) # replication factor
        remaining_machines_left -= num_machines_used # machines left

    if verbose:
        print("-------------------------------------")
        print("Number of machines used: %d..." % metadata[2])

    num_machines_used = metadata[2]
    remaining_machines_left -= num_machines_used # machines left
    compute_time = states[prev_split-1].compute_time
    parameter_size = states[prev_split-1].parameter_size
    dp_communication_time = ((4 * (num_machines_used - 1) * parameter_size) /
                             (network_bandwidth * num_machines_used))
    compute_time /= num_machines_used # compute time of the first stage
    dp_communication_time /= num_machines_used

    if verbose:
        print("Compute time = %f, Data-parallel communication time = %f..." %
              (compute_time, dp_communication_time))
        print("-------------------------------------")
    if print_configuration:
        print("Number of machines in budget not used: %d..." %
              remaining_machines_left)
        print()
        print("(Split start, split end) / compute time taken per stage "
              "/ replication factor per stage:")
    prev_split = start
    splits.reverse() # splits were collected back to front, so reverse them
    splits.append(end)
    replication_factors.append(num_machines_used)
    replication_factors.reverse()
    for i in range(len(splits)):
        time = 0.0
        if prev_split > 0:
            time = states[splits[i]-1].compute_time - states[prev_split-1].compute_time
        else:
            time = states[splits[i]-1].compute_time
        if print_configuration:
            print((prev_split, splits[i]), time, replication_factors[i])
        prev_split = splits[i]
    if print_configuration:
        print()
    return splits[:-1] # drop the appended end
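The backtracking loop at the heart of this function can be reduced to a few lines. A toy sketch (the table entries below are made up, but have the (min_pipeline_time, optimal_split, optimal_num_machines) shape described above, with optimal_split = (k, m - m_prime)):

```python
# Toy DP table, keyed (start, j, m) for readability instead of nested lists.
# Entries are invented purely for illustration.
A = {
    (0, 9, 3): (0.9, (5, 2), 1),   # best split before antichain 6
    (0, 5, 2): (0.7, (2, 1), 1),   # best split before antichain 3
    (0, 2, 1): (0.4, None, 2),     # no further split: one replicated stage
}

def backtrack(A, start, end, num_machines):
    """Follow the optimal_split chain back to front, like analyze_partitioning."""
    splits = []
    metadata = A[(start, end - 1, num_machines - 1)]
    next_split = metadata[1]
    while next_split is not None:
        splits.append(next_split[0] + 1)           # the split point is k+1
        metadata = A[(start, next_split[0], next_split[1])]
        next_split = metadata[1]
    splits.reverse()                               # collected back to front
    return splits

print(backtrack(A, 0, 10, 4))  # [3, 6]
```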

Let’s use an example to illustrate.

The split sequence is built from back to front. In the following example analysis, the total number of machines is 10:

A[i][j][m] = (min_pipeline_time, optimal_split, optimal_num_machines), where optimal_split = (k, m - m_prime) is the optimal split point for this stage.

The initial metadata is A[0][99][10], i.e. (0.01903199999999998, (95, 8), 1), so next_split = (95, 8) and prev_split = end - 1 = 98.

next_split is the next split point, and splits is the current sequence of split points.

The first while loop:

next_split = (95, 8), so splits.append(next_split[0]+1) gives splits = [96]. compute_time and parameter_size are computed as states[prev_split-1] - states[next_split[0]], i.e. states[97] - states[95]. This divides 0~99 into 0~95 and 96~99.

Then prev_split = 96; looking up A[0][95][8] gives metadata = (0.01903199999999998, (78, 7), 1), so next_split = (78, 7).

So the next round starts at 78.

Second while loop:

next_split = (78, 7), so splits.append(79) gives splits = [96, 79]. compute_time is again states[prev_split-1] - states[next_split[0]], i.e. states[95] - states[78]. This divides 0~99 into 0~78, 79~95 and 96~99.

Then prev_split = 79; looking up A[0][78][7] gives metadata = (0.011081, (48, 6), 1), so next_split = (48, 6).

So the next round starts at 48, and so on.

When the loop ends, splits = [96, 79, 49, 15, 12, 7, 5, 3, 1].

Since the sequence was built back to front, the following code reverses it.

prev_split = start
splits.reverse()
splits.append(end)
replication_factors.append(num_machines_used)
replication_factors.reverse()

After reversing and appending end = 99, splits = [1, 3, 5, 7, 12, 15, 49, 79, 96, 99].

The function finally returns splits[:-1] = [1, 3, 5, 7, 12, 15, 49, 79, 96], i.e. the appended end is dropped.

From [1, 3, 5, 7, 12, 15, 49, 79, 96], the final stage ranges are [(0, 1), (1, 3), (3, 5), (5, 7), (7, 12), (12, 15), (15, 49), (49, 79), (79, 96), (96, 99)]. This list is used below when setting the stages.
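The conversion from the returned split points to stage ranges can be sketched as follows (to_stage_ranges is our illustrative helper, not part of PipeDream):

```python
# Split points returned by analyze_partitioning, plus the end index.
splits = [1, 3, 5, 7, 12, 15, 49, 79, 96]
end = 99

def to_stage_ranges(splits, end, start=0):
    """Turn split points into consecutive (start, end) stage ranges."""
    ranges, prev = [], start
    for s in splits + [end]:
        ranges.append((prev, s))
        prev = s
    return ranges

print(to_stage_ranges(splits, end))
# [(0, 1), (1, 3), (3, 5), (5, 7), (7, 12), (12, 15), (15, 49), (49, 79), (79, 96), (96, 99)]
```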

5.3 Setting the stage

We now have an ideal partition sequence, but that is not the end of the story. Recall the purpose of the partitioning algorithm: determine the running time of all layers based on the profile results, then use dynamic programming to partition the model into different stages, and obtain the number of replicas for each stage.

Therefore, the ultimate goal of the analysis is to assign a stage to each sub-layer of the model. Sub-layers that belong to the same stage will eventually be assigned to the same worker (node) for execution.

Since multiple levels of machines may be involved, we continue with an example.

Suppose there are two levels:

all_num_machines = [5, 5]
network_bandwidths = [800000000, 1000000000]

The initial splits = [(0, 99)].

In the first round of the while loop, i = 1:

splits = [(0, 99)] is traversed, analyze_partitioning is applied to the single segment, and partial_splits = [3, 6, 30, 75, 99] is obtained.

splits is then updated to [(0, 3), (3, 6), (6, 30), (30, 75), (75, 99)].

stage_id is not set at this point (it is only set when i == 0).

In the second round of the while loop, i = 0:

The splits from the first round, [(0, 3), (3, 6), (6, 30), (30, 75), (75, 99)], are traversed, and analyze_partitioning is applied to each segment: to (0, 3), to (3, 6), to (6, 30), and so on. The new partial splits are [1, 2, 3, 4, 5, 6, 8, 10, 13, 28, 30, 45, 49, 51, 75, 79, 96, 99].

splits is finally updated to [(0, 1), (1, 2), (2, 3), (3, 4), (4, 5), (5, 6), (6, 8), (8, 10), (10, 13), (13, 28), (28, 30), (30, 45), (45, 49), (49, 51), (51, 75), (75, 79), (79, 96), (96, 99)].

This list is the ideal split sequence.

Because i == 0 in this round, for every split point, states[split-1].antichain is used to obtain all predecessor nodes of its augmented antichain, and those nodes are assigned the stage_id corresponding to the split.

Recall the meaning of the augmented antichain:

  • The augmented antichain of a node consists of the node itself plus some of its predecessor nodes.
  • Intuitively: for a node A, its running time can only be determined by considering some other node Z together with it.

For example, split = 1: states[1-1] = states[0], whose antichain is ['node4'], so node4 is assigned stage_id = 0, meaning node4 will be trained on the worker (node) corresponding to stage_id = 0.

If in doubt, recall how states is constructed: it is an ordered list of node combinations.

antichain_gr = gr.antichain_dag()
states = antichain_gr.topological_sort()

Details are as follows.

states = {list: 99} 
 00 = {AntichainNode} antichain_0 -- ['node4']   # states[0].antichain
 01 = {AntichainNode} antichain_1 -- ['node5']
 02 = {AntichainNode} antichain_2 -- ['node6']
 03 = {AntichainNode} antichain_3 -- ['node7']
 04 = {AntichainNode} antichain_4 -- ['node8']
 05 = {AntichainNode} antichain_5 -- ['node8', 'node10']
 06 = {AntichainNode} antichain_7 -- ['node8', 'node11']
 07 = {AntichainNode} antichain_10 -- ['node8', 'node12']
 08 = {AntichainNode} antichain_6 -- ['node14']
 09 = {AntichainNode} antichain_8 -- ['node14', 'node15']
 10 = {AntichainNode} antichain_11 -- ['node14', 'node16']
 11 = {AntichainNode} antichain_13 -- ['node14', 'node17']
 12 = {AntichainNode} antichain_9 -- ['node19']
 13 = {AntichainNode} antichain_12 -- ['node20', 'node23']
 14 = {AntichainNode} antichain_18 -- ['node23', 'node20', 'node26']
 15 = {AntichainNode} antichain_17 -- ['node23', 'node20', 'node24']
 16 = {AntichainNode} antichain_32 -- ['node23', 'node20', 'node28']
 17 = {AntichainNode} antichain_31 -- ['node23', 'node20', 'node26', 'node24']
 18 = {AntichainNode} antichain_63 -- ['node23', 'node20', 'node26', 'node28']
 19 = {AntichainNode} antichain_33 -- ['node20', 'node26', 'node29']
 20 = {AntichainNode} antichain_16 -- ['node20', 'node43', 'node23']
 21 = {AntichainNode} antichain_30 -- ['node23', 'node20', 'node43', 'node26']
 22 = {AntichainNode} antichain_29 -- ['node23', 'node20', 'node43', 'node24']
 23 = {AntichainNode} antichain_59 -- ['node23', 'node20', 'node43', 'node28']

The specific code for stage setting is as follows:

splits = [(0, len(states))]
i = len(all_As) - 1
while i >= 0:
    new_splits = []
    stage_id = 0
    for (start, end) in splits:
        partial_splits = \
            analyze_partitioning(all_As[i], states, start, end,
                                 network_bandwidths[i], all_num_machines[i],
                                 activation_compression_ratio,
                                 print_configuration, verbose)
        start_point = start
        for split in partial_splits:
            new_splits.append((start_point, split))
            if i == 0: # the final round of the while loop
                # for each node, find all predecessors of its augmented antichain
                predecessors = gr.all_predecessors(states[split-1].antichain)
                for predecessor in predecessors:
                    if predecessor.stage_id is None:
                        predecessor.set_stage_id(stage_id) # assign the stage id
            start_point = split
            stage_id += 1
        new_splits.append((start_point, end))
        if i == 0: # the final round of the while loop
            predecessors = gr.all_predecessors(states[end-1].antichain)
            for predecessor in predecessors:
                if predecessor.stage_id is None:
                    predecessor.set_stage_id(stage_id) # assign the stage id
        stage_id += 1
    splits = new_splits
    i -= 1

5.4 Summary

Let’s summarize what we did to calculate and analyze partitions:

  • The antichain DAG is converted into states; an important property of each state is its augmented antichain. states is the topological sort of the antichain DAG, so training in this order is logically valid.
  • compute_partitioning runs a dynamic programming algorithm over these states, but that only yields an optimization result; analyze_partitioning still has to analyze it and carve it into stages.
  • analyze_partitioning uses the DP result to do the concrete partitioning; after sorting, the result is the ideal split sequence.
  • Based on analyze_partitioning's results, each sub-layer of the model is assigned a stage; sub-layers belonging to the same stage are eventually placed on the same worker (node).

0x06 Output

The output file is shown below (excerpt). As you can see, the key result is that a stage_id has been added to each node; this will be used in the next article. For example:

stage_id=0 corresponds to node4.

stage_id=1 corresponds to node5 and node6.

stage_id=2 corresponds to node7.

stage_id=3 corresponds to node8, node10, node11, node12.

.

Details are as follows:

node4 -- Embedding(32320, 1024, padding_idx=0) -- forward_compute_time=0.073, backward_compute_time=6.949, activation_size=6291456.0, parameter_size=132382720.000 -- stage_id=0
node5 -- EmuBidirLSTM((bidir): LSTM(1024, 1024, bidirectional=True) (layer1): LSTM(1024, 1024) (layer2): LSTM(1024, 1024)) -- forward_compute_time=5.247, backward_compute_time=0.016, activation_size=12582912.0, parameter_size=67174400.000 -- stage_id=1
node6 -- Dropout(p=0.2) -- forward_compute_time=0.077, backward_compute_time=0.196, activation_size=12582912.0, parameter_size=0.000 -- stage_id=1
node7 -- LSTM(2048, 1024) -- forward_compute_time=3.190, backward_compute_time=5.348, activation_size=6553600.0, parameter_size=50364416.000 -- stage_id=2
node8 -- __getitem__(0) -- forward_compute_time=0.000, backward_compute_time=0.000, activation_size=6291456.0, parameter_size=0.000 -- stage_id=3
node10 -- Dropout(p=0.2) -- forward_compute_time=0.064, backward_compute_time=0.128, activation_size=6291456.0, parameter_size=0.000 -- stage_id=3
node11 -- LSTM(1024, 1024) -- forward_compute_time=2.491, backward_compute_time=4.203, activation_size=6553600.0, parameter_size=33587200.000 -- stage_id=3
node12 -- __getitem__(0) -- forward_compute_time=0.000, backward_compute_time=0.000, activation_size=6291456.0, parameter_size=0.000 -- stage_id=3
node14 -- Add -- forward_compute_time=0.000, backward_compute_time=0.000, activation_size=6291456.0, parameter_size=0.000 -- stage_id=4
node15 -- Dropout(p=0.2) -- forward_compute_time=0.059, backward_compute_time=0.121, activation_size=6291456.0, parameter_size=0.000 -- stage_id=4
node16 -- LSTM(1024, 1024) -- forward_compute_time=2.492, backward_compute_time=4.201, activation_size=6553600.0, parameter_size=33587200.000 -- stage_id=4
node17 -- __getitem__(0) -- forward_compute_time=0.000, backward_compute_time=0.000, activation_size=6291456.0, parameter_size=0.000 -- stage_id=5
node19 -- Add -- forward_compute_time=0.000, backward_compute_time=0.000, activation_size=6291456.0, parameter_size=0.000 -- stage_id=5
	node1 -- node4
	node4 -- node5
	node2 -- node5
	node5 -- node6
	node6 -- node7
	node7 -- node8
	node8 -- node10
	node10 -- node11
	node11 -- node12
	node12 -- node14
	node8 -- node14
	node14 -- node15
	node15 -- node16
	node16 -- node17
	node17 -- node19
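A quick way to sanity-check such an output file is to group node ids by stage_id (a sketch; the line format is assumed from the excerpt above):

```python
import re

# A few sample lines in the output format shown above.
lines = [
    "node4 -- Embedding(32320, 1024, padding_idx=0) -- forward_compute_time=0.073, backward_compute_time=6.949, activation_size=6291456.0, parameter_size=132382720.000 -- stage_id=0",
    "node5 -- EmuBidirLSTM(...) -- forward_compute_time=5.247, backward_compute_time=0.016, activation_size=12582912.0, parameter_size=67174400.000 -- stage_id=1",
    "node6 -- Dropout(p=0.2) -- forward_compute_time=0.077, backward_compute_time=0.196, activation_size=12582912.0, parameter_size=0.000 -- stage_id=1",
]

# Group node ids by their assigned stage.
stages = {}
for line in lines:
    m = re.match(r"(node\d+) .* -- stage_id=(\d+)", line)
    if m:
        stages.setdefault(int(m.group(2)), []).append(m.group(1))

print(stages)  # {0: ['node4'], 1: ['node5', 'node6']}
```

Nodes that share a stage_id will later be generated into the same stage module by convert_graph_to_model.py.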

0xEE Personal information

★★★★ Thoughts on life and technology ★★★★★

Wechat official account: Rosie’s Thoughts

0xFF Reference

Deep learning pipeline parallel PipeDream(1)– Profile stage