Why redesign a distributed deep learning framework like OneFlow?
An obvious starting point is that the original mainstream deep learning frameworks are inherently deficient.
Especially at the abstraction and API levels, they are designed with various shortcomings that greatly inconvenience developers when using them.
Although some of the deficiencies are being addressed, there are still some important issues being overlooked.
Therefore, we are launching a series of articles to discuss in detail the major shortcomings of the runtime systems of mainstream DL frameworks. The first article, The Limitations of Existing Deep Learning Frameworks: Resource Dependency, has been published. This article discusses whether data movement should be put into the computation graph and the flaws of using callback functions.
At the end of this article, we will introduce the concept of representing data movement as first-class citizens.
Written by Yuan Jinhui; Translated by Wang Kaiyan, Dong Wenwen
Dependency Management Based on Computation Graph
In the last article, The Limitations of Existing Deep Learning Frameworks: Resource Dependency, we discussed three typical dependencies among ops in DL frameworks: data dependencies, control dependencies, and resource dependencies. Proper and efficient management of these dependencies is essential for DL frameworks.
As is common with big data systems, practitioners of DL frameworks have found the computation graph to be an effective tool for expressing and managing complex dependencies.
Theoretically, if all ops and all dependencies are represented in a computation graph, then only one mechanism, the graph executor, is needed to manage the execution order of the ops.
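To make the idea of a single graph executor concrete, here is a minimal sketch in Python. It assumes every op is a zero-argument function and every dependency is listed explicitly; the name run_graph and the data layout are our own illustration, not the executor of any particular framework.

```python
from collections import deque


def run_graph(ops, deps):
    """Run every op once, only after all ops it depends on have finished.

    ops:  dict mapping op name -> zero-argument callable
    deps: dict mapping op name -> list of op names it depends on
    """
    # Number of unfinished dependencies for each op.
    pending = {name: len(deps.get(name, [])) for name in ops}
    # Reverse edges: which ops are waiting for a given op.
    consumers = {name: [] for name in ops}
    for name, parents in deps.items():
        for parent in parents:
            consumers[parent].append(name)

    ready = deque(name for name, count in pending.items() if count == 0)
    order = []
    while ready:
        name = ready.popleft()
        ops[name]()
        order.append(name)
        # Finishing an op may unblock the ops that consume its output.
        for child in consumers[name]:
            pending[child] -= 1
            if pending[child] == 0:
                ready.append(child)
    if len(order) != len(ops):
        raise ValueError("the graph contains a cycle")
    return order


log = []
ops = {name: (lambda name=name: log.append(name)) for name in ["f1", "f2", "f3"]}
order = run_graph(ops, {"f2": ["f1"], "f3": ["f1", "f2"]})
```

The executor never needs to know why one op depends on another; data, control, and resource dependencies all look like edges to it.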
It may seem that managing the computation graph is all you need for everything to work. In actual implementations, however, this is not the case.
Should Data Movement Be Put into the Computation Graph?
The critical question is, does the computation graph only describe “computation”?
Are only computations performed on the CPU or GPU considered as ops?
Unfortunately, the traditional DL frameworks think so.
As shown in Figure 1, in a non-distributed scenario, all the computation is executed inside the GPU: the input data is copied to the GPU, and the result is copied back from the GPU after the computation is done.
In this scenario, data movement is straightforward (load from disk, preprocess, and copy to GPU) and does not cause much trouble. In existing DL frameworks, the computation graph encompasses and manages all the “computation” ops, and the execution engine built on the computation graph solves most of the problems elegantly. The computation graph only describes “computation”; data movement is managed by other functional modules.
However, in the distributed scenario, the situation is different.
In distributed training, multiple GPUs are connected via high-speed interconnects. Regardless of whether the model is trained with data parallelism, model parallelism, or pipeline parallelism, data must be moved between GPUs (across devices or machines) multiple times to process every batch of data.
Figure 2 illustrates an example of a simple distributed training DL model. This is a three-layer neural network consisting of 3 ops f₁, f₂, f₃. Assume that the task is assigned to four GPUs d₁, d₂, d₃, d₄; f₁, f₂ are executed on d₁, d₂ using data parallelism, while f₃ is run on d₃, d₄ using model parallelism.
Then, the physical graph (or execution plan), whether generated by manual programming or by a smart compiler, needs to insert data movement and data routing operations between the computation ops, as shown in the lower part of Figure 2. For example, an all-gather collective communication op g has to be inserted between f₁₂, f₂₂ and f₁₃, f₂₃ in the forward pass, while a reduce-scatter collective communication op s that reverses the operation of g is required between b₁₃, b₂₃ and b₁₂, b₂₂ in the backward pass. The all-reduce collective communication ops r₁ and r₂ are used to synchronize the model parameters, since f₁ and f₂ both adopt data parallelism.
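For readers less familiar with the three collectives mentioned here, the following sketch simulates their semantics on plain Python lists, where the i-th list stands for the data held by the i-th device. It only illustrates what each device ends up with; it does not model how any communication library actually moves the data, and it assumes the tensor length is divisible by the number of devices.

```python
def all_gather(shards):
    """Each device contributes one shard; every device receives all shards concatenated."""
    full = [x for shard in shards for x in shard]
    return [list(full) for _ in shards]


def reduce_scatter(tensors):
    """Each device contributes a full-size tensor; device i receives the
    i-th chunk of the elementwise sum."""
    n = len(tensors)
    summed = [sum(values) for values in zip(*tensors)]
    chunk = len(summed) // n  # assumes the length is divisible by n
    return [summed[i * chunk:(i + 1) * chunk] for i in range(n)]


def all_reduce(tensors):
    """Every device receives the elementwise sum of all contributions."""
    summed = [sum(values) for values in zip(*tensors)]
    return [list(summed) for _ in tensors]


gathered = all_gather([[1, 2], [3, 4]])
scattered = reduce_scatter([[1, 2, 3, 4], [10, 20, 30, 40]])
reduced = all_reduce([[1, 2], [3, 4]])
```

All-gather turns per-device shards into a full copy on every device, and reduce-scatter turns full-size contributions back into per-device shards, which is why the two appear as a forward and backward pair.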
As you can imagine, the process of generating physical graphs from logical graphs is complex. In OneFlow, the compiler automatically maps logical graphs to physical graphs, while other frameworks still require manual programming to insert data movement and data routing operations.
Sometimes, the data movement time is on the same order of magnitude as the computation time (e.g., moving 100 MB of data over a 10 Gbit/s link takes on the order of 100 ms), and if the data movement time cannot be hidden by pipelining, horizontal scalability will be severely limited. Even if more GPUs are added, the resulting speedup will be much lower than the ideal linear speedup.
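A back-of-the-envelope check of this kind of estimate is easy to write down: transfer time is data size divided by bandwidth, ignoring latency and protocol overhead. Note that reading the bandwidth in gigabits or in gigabytes per second changes the answer by a factor of 8. The helper name and unit constants below are our own, for illustration only.

```python
def transfer_time_ms(size_bytes, bandwidth_bytes_per_s):
    """Ideal transfer time in milliseconds, ignoring latency and overhead."""
    return size_bytes * 1000.0 / bandwidth_bytes_per_s


MB = 10**6                  # bytes in a megabyte (decimal)
GBIT_PER_S = 10**9 / 8      # bytes per second carried by a 1 Gbit/s link
GBYTE_PER_S = 10**9         # bytes per second carried by a 1 GB/s link

# 100 MB over a 10 Gbit/s link versus over a 10 GB/s link.
t_gbit = transfer_time_ms(100 * MB, 10 * GBIT_PER_S)
t_gbyte = transfer_time_ms(100 * MB, 10 * GBYTE_PER_S)
print(f"10 Gbit/s: {t_gbit:.0f} ms, 10 GB/s: {t_gbyte:.0f} ms")
```

Either way, the result is tens of milliseconds for a single transfer, which is comparable to the time of one training step in many models.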
(However, how to automate the conversion from logical to physical graphs through the compiler and how to improve the computational efficiency of the whole distributed system are not what this article plans to discuss in depth. Interested readers can follow other technical information shared by the OneFlow team. The problem to be explored in this article is how to design and implement distributed DL systems more elegantly, given that data movement is critical.)
In summary, although there are plenty of data movement operations in distributed scenarios and the time they take is nearly the same as that of computation, the computation graph of the original DL frameworks only treats the computational logic executed inside a single GPU as ops, and schedules and manages the logic of “computation” through the graph executor.
That is, although there is a data production and consumption relationship between data movement and computation, data movement is not treated as an op in the computation graph, so how is the dependency relationship between data movement and computation managed? The answer is that it is managed by another set of mechanisms, i.e., there are two dependency management mechanisms in the whole system.
What are the disadvantages of using two sets of dependency management mechanisms?
The whole system’s concept and implementation are complex, especially at the interface and interaction between the two mechanisms.
TensorFlow as an Example
Readers who are familiar with the paper and code of TensorFlow must have seen the following diagram:
As shown in Figure 3, when data needs to be moved between devices, TensorFlow inserts the send and recv ops on each producer and consumer side, and completes the data movement through this pair of ops.
You may wonder why the existing frameworks have two sets of dependency management mechanisms since TensorFlow uses ops to represent data movement.
The answer is that TensorFlow only does this at the logical level, but not thoroughly at the physical level.
Suppose the two devices A and B are inside one machine and support peer-to-peer transfers. Then a single memCopyAsync is enough to move the data from A to B.
However, if devices A and B do not support peer-to-peer transfers, then at the physical level the data must first be copied from A to the main memory (this operation is denoted as m₁) and then from the main memory to device B (this operation is denoted as m₂). In TensorFlow, m₁ and m₂ are not represented as ops in the computation graph, the dependency between m₁ and m₂ is not expressed at the computation graph level, and their triggering is not managed by the graph executor. In the concrete implementation, m₁ and m₂ are linked in a chain of callback functions: m₂ is passed to m₁ as a callback function and is called when m₁ finishes.
If the two devices A and B are not on the same machine, the chain of data movement behind the send and recv ops is much longer. It may consist of a copy from device A to the main memory of the machine on which device A is located (m₁), then a transfer from that main memory to the main memory of the machine on which device B is located (m₂), and finally a copy from that main memory to device B (m₃). As you can imagine, the whole process is driven by a chain of callback functions.
TensorFlow’s chain of callback functions is thus a second dependency management mechanism, which collaborates with the computation graph’s dependency management to complete distributed training.
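The chain m₁, m₂, m₃ can be sketched in a few lines of Python. This is only an illustration of the callback-chain pattern, not TensorFlow’s code: copy_async is a hypothetical stand-in that fires its callback immediately, whereas a real asynchronous copy would return at once and invoke the callback from a completion event.

```python
def copy_async(src, dst, on_done, log):
    """Hypothetical stand-in for an asynchronous copy.

    It records the hop and then invokes the callback right away, which
    keeps the example deterministic.
    """
    log.append(f"{src} -> {dst}")
    on_done()


def send_across_machines(log, on_arrival):
    """Move data from device A to device B on another machine via m1, m2, m3."""
    def m3():
        copy_async("host B", "device B", on_arrival, log)

    def m2():
        copy_async("host A", "host B", m3, log)

    # m1 starts the chain; m2 is its callback, and m3 is the callback of m2.
    copy_async("device A", "host A", m2, log)


log = []
send_across_machines(log, lambda: log.append("recv done"))
print(log)
```

The order of the three copies is fixed by how the callbacks are nested, and nothing outside this function can see or reschedule the individual hops.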
Defects of Using Callback Functions
Having two dependency management systems makes the design more complicated than it needs to be. Moreover, callback functions handle a “single dependency” well but are not suitable for managing “multiple dependencies”.
For example, if there are two functions O₁ and O₂, and O₂ depends on O₁, then it is convenient to treat O₂ as a callback function executed in O₁.
Now suppose there are three functions O₁, O₂, O₃, and O₃ depends on both O₁ and O₂, i.e., O₃ can only start executing after O₁ and O₂ are completed. If we know with certainty which of O₁ and O₂ will be executed first, constructing a chain of callback functions is still an excellent solution.
However, if there is no dependency between O₁ and O₂, and there is no certainty which one will complete first, it is not clear when programming whether O₃ should be wrapped in the callback function of O₁ or O₂. In this case, a counter must be introduced to record whether both O₁ and O₂ have been executed, and O₃ can be executed only when these two functions have been executed.
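A minimal sketch of such a counter in Python is shown below. The class name CountdownCallback is our own; the lock is there on the assumption that O₁ and O₂ may finish on different threads.

```python
import threading


class CountdownCallback:
    """Runs `action` once `done()` has been called `count` times, in any order."""

    def __init__(self, count, action):
        self._remaining = count
        self._action = action
        self._lock = threading.Lock()

    def done(self):
        # Decrement under the lock so concurrent completions are counted once each.
        with self._lock:
            self._remaining -= 1
            fire = self._remaining == 0
        if fire:
            self._action()


log = []
o3 = CountdownCallback(2, lambda: log.append("O3"))


def o1():
    log.append("O1")
    o3.done()  # report completion of O1


def o2():
    log.append("O2")
    o3.done()  # report completion of O2


# O2 happens to finish first here; O3 still runs only after both are done.
o2()
o1()
```

The counter plays the same role as the number of unresolved incoming edges of a node in a computation graph.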
Relying on counters in this way follows essentially the same principle as the computation graph, in which an op becomes ready once all of its inputs are available.
In real systems, “multiple dependencies” are commonplace, such as data dependencies, additional control dependencies, and even dependencies caused by resource sharing. It is not convenient to use callback functions to express such “multiple dependencies”.
The situation becomes more complicated if part of the “multiple dependencies” of the same op is represented by callback functions and part of it is represented by the computation graph. The following is an example.
As discussed above, the send and recv ops in TensorFlow only describe the logical relationships; the physical data movement operations are placed in callback functions and the dependencies between operations are reflected by inserting callback functions where necessary.
In the example illustrated in Figure 4, O₂ depends on O₁, and O₂ is wrapped in a callback function which is expected to be invoked on the completion of O₁. However, if O₂ has other dependencies, such as the output data of other ops or control dependencies inserted by the framework, the completion of O₁ does not necessarily suffice to resolve all the dependencies of O₂.
To correctly and safely schedule O₂, the callback function should notify the scheduler of the completion of O₁ so that the scheduler can update the dependencies of O₂. If the scheduler reports that all the dependencies have been resolved, O₂ can be scheduled immediately. Otherwise, O₂ is inserted into a waiting list and will be scheduled later, when its other dependencies are resolved.
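The notification protocol described above can be sketched as follows. This Scheduler class is a hypothetical illustration of the idea in Figure 4, not the interface of any existing framework; the dependency names "O1" and "ctrl" are placeholders.

```python
class Scheduler:
    """Tracks unresolved dependencies and runs an op once none remain."""

    def __init__(self):
        self._unresolved = {}  # op name -> set of dependencies still pending
        self._actions = {}     # op name -> callable that runs the op
        self.waiting = set()   # ops notified at least once but not yet runnable

    def register(self, op, deps, action):
        self._unresolved[op] = set(deps)
        self._actions[op] = action

    def notify(self, finished, op):
        """Called from a callback: `finished` is complete, try to schedule `op`.

        Returns True if `op` was scheduled, False if it was put on the waiting list.
        """
        self._unresolved[op].discard(finished)
        if self._unresolved[op]:
            self.waiting.add(op)  # other dependencies are still pending
            return False
        self.waiting.discard(op)
        action = self._actions.pop(op, None)
        if action is not None:  # run the op at most once
            action()
        return True


log = []
scheduler = Scheduler()
# O2 depends on O1 (expressed via a callback) and on a control dependency.
scheduler.register("O2", ["O1", "ctrl"], lambda: log.append("O2 runs"))
first = scheduler.notify("O1", "O2")     # callback of O1 fires; O2 must wait
second = scheduler.notify("ctrl", "O2")  # control dependency resolved; O2 runs
```

The callback no longer runs O₂ directly. It only reports a fact to the scheduler, which remains the single place where readiness is decided.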
In practice, none of the existing DL frameworks supports such “multi-dependency” callback functions. When the dependencies of an op are not all tracked and managed by the scheduler (e.g., partly in callback functions and partly in the scheduler), they can only be handled with an approach like the one in Figure 4.
This requires the framework to expose its internal scheduler to users, because callback functions are sometimes inserted manually by users rather than automatically by the framework (as is often the case in secondary development on top of PyTorch, such as Megatron-LM and DeepSpeed). Exposing the scheduler in this way is not ideal practice.
In any case, none of the existing DL frameworks has exposed the underlying scheduler to users yet, which means that the approach described in Figure 4 cannot be implemented easily.
Treat Data Movement as First-Class Citizens
In summary, we want all dependencies, whether they involve data movement or computation, to be managed and scheduled through one mechanism. This can be achieved by “treating data movement as a first-class citizen”, as OneFlow puts it. Rather than treating only computation operations as ops as traditional DL frameworks do, OneFlow’s compiler explicitly represents all data movement operations as ops in the computation graph, and all dependencies between ops are also represented in the computation graph.
In other words, all callback functions that are inserted by the system and hard-coded to be triggered at runtime are extracted and converted into ordinary ops in the computation graph. Put another way, the chain of callback functions is expanded and statically represented as part of the computation graph.
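As a rough illustration of what “expanding the callback chain” means, the sketch below turns one logical producer-to-consumer transfer into explicit copy ops with explicit dependency edges. The function expand_transfer and the op names are hypothetical and chosen for this example; this is not OneFlow’s compiler, only the shape of its output under simplifying assumptions.

```python
def expand_transfer(producer, consumer, same_machine, peer_to_peer):
    """Return (ops, deps) in which every physical copy is an explicit op.

    ops:  list of op names in chain order
    deps: dict mapping op name -> list of op names it depends on
    """
    if same_machine and peer_to_peer:
        hops = ["copy_A_to_B"]
    elif same_machine:
        hops = ["copy_A_to_host", "copy_host_to_B"]
    else:
        hops = ["copy_A_to_hostA", "copy_hostA_to_hostB", "copy_hostB_to_B"]
    chain = [producer] + hops + [consumer]
    # Each op depends on the one before it in the chain.
    deps = {later: [earlier] for earlier, later in zip(chain, chain[1:])}
    return chain, deps


ops, deps = expand_transfer("f1", "f2", same_machine=False, peer_to_peer=False)
# Because the copies are ordinary ops, extra dependencies are just extra edges.
deps["f2"].append("ctrl")
```

Once the hops are nodes in the graph, a consumer with several dependencies needs no special handling, since each dependency is one more incoming edge.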
The advantage of this change is that the whole system has only one “dependency management” mechanism, namely the computation graph (which at runtime in OneFlow takes the form of production and consumption relationships between actors). As a result, the abstraction and implementation are cleaner, callback hell is avoided, running efficiency may improve (the compiler can analyze and optimize the execution order of ops, whereas the execution order described by callback functions is fixed), and some advanced features (e.g., flow control) can be supported.