## @package muji
# Module caffe2.python.muji
"""muji.py does multi-gpu training for caffe2 with no need to change the c++
side code. Everything is defined on the computation graph level.

We support the following use cases:
- 2 gpus, where peer access is enabled between them.
- 4 gpus, where peer access is enabled between all of them.
- 4 gpus, where peer access is enabled in two groups,
  between {0, 1} and {2, 3}.
- 8 gpus, where peer access is enabled in two groups,
  between {0, 1, 2, 3} and {4, 5, 6, 7}.
If none of the above cases applies, a fallback function that does not rely on
peer access will be called.
"""

import numpy as np

from caffe2.proto import caffe2_pb2
from caffe2.python import workspace


def OnGPU(gpu_id):
    """A utility function that returns a device option protobuf for the
    specified gpu id.
    """
    device_option = caffe2_pb2.DeviceOption()
    device_option.device_type = workspace.GpuDeviceType
    device_option.device_id = gpu_id
    return device_option


def OnCPU():
    """A utility function that returns a device option protobuf for the CPU."""
    device_option = caffe2_pb2.DeviceOption()
    device_option.device_type = caffe2_pb2.CPU
    return device_option


def Allreduce(net, blobs, reduced_affix="_reduced", gpu_indices=None):
    """The general Allreduce interface that reroutes the function calls.

    CPUs and AMD GPUs are not supported, because GetGpuPeerAccessPattern is
    called to query the gpu peer access pattern.
    """
    if gpu_indices is None:
        gpu_indices = list(range(len(blobs)))
    if len(gpu_indices) != len(blobs):
        raise RuntimeError(
            "gpu_indices length and blobs length mismatch: %d vs %d" %
            (len(gpu_indices), len(blobs))
        )
    pattern = workspace.GetGpuPeerAccessPattern()
    if len(blobs) == 2 and pattern.shape[0] >= 2 and np.all(pattern[:2, :2]):
        return Allreduce2(net, blobs, reduced_affix, gpu_indices)
    elif len(blobs) == 4 and pattern.shape[0] >= 4 and np.all(pattern[:4, :4]):
        return Allreduce4(net, blobs, reduced_affix, gpu_indices)
    elif (len(blobs) == 4 and pattern.shape[0] >= 4 and
          np.all(pattern[:2, :2]) and np.all(pattern[2:4, 2:4])):
        return Allreduce4Group2(net, blobs, reduced_affix, gpu_indices)
    elif len(blobs) == 8 and pattern.shape[0] >= 8 and np.all(pattern[:8, :8]):
        return Allreduce8(net, blobs, reduced_affix, gpu_indices)
    else:
        return AllreduceFallback(net, blobs, reduced_affix, gpu_indices)


def Allreduce2(net, blobs, reduced_affix, gpu_indices):
    """Allreduce for 2 gpus.

    Algorithm: 0r <- 0 + 1, 1r <- 0r, where r means "reduced"
    """
    a, b = blobs
    gpu_a, gpu_b = gpu_indices
    a_reduced = net.Add([a, b], a + reduced_affix, device_option=OnGPU(gpu_a))
    b_reduced = a_reduced.Copy(
        [],
        b + reduced_affix,
        device_option=OnGPU(gpu_b)
    )
    return a_reduced, b_reduced
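# The sketch below illustrates how the dispatcher above might be used: it
# builds a net that allreduces two per-GPU gradient blobs. It is only a usage
# sketch; the net name and the blob names "grad_gpu0"/"grad_gpu1" are
# hypothetical, and the call assumes a CUDA build of caffe2 where
# GetGpuPeerAccessPattern is available.
def _example_allreduce_two_gpus():
    from caffe2.python import core

    net = core.Net("muji_allreduce_example")
    # The blobs are assumed to already live on gpus 0 and 1, e.g. as outputs
    # of per-GPU backward passes.
    reduced = Allreduce(net, ["grad_gpu0", "grad_gpu1"], gpu_indices=[0, 1])
    return net, reduced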
def Allreduce4(net, blobs, reduced_affix, gpu_indices):
    """Allreduce for 4 gpus.

    Algorithm: 2 level reduction.
        0r <- 0 + 1, 2r <- 2 + 3
        0r <- 0r + 2r
        2r <- 0r, 1r <- 0r, 3r <- 2r
    """
    a, b, c, d = blobs
    gpu_a, gpu_b, gpu_c, gpu_d = gpu_indices
    # a_reduced <- a + b, c_reduced <- c + d
    a_reduced = net.Add(
        [a, b],
        str(a) + reduced_affix,
        device_option=OnGPU(gpu_a)
    )
    c_reduced = net.Add(
        [c, d],
        str(c) + reduced_affix,
        device_option=OnGPU(gpu_c)
    )
    # a_reduced <- a_reduced + c_reduced
    a_reduced = a_reduced.Add(c_reduced, a_reduced, device_option=OnGPU(gpu_a))
    # broadcast a_reduced to c_reduced
    c_reduced = a_reduced.Copy([], c_reduced, device_option=OnGPU(gpu_c))
    # broadcast to b and d
    b_reduced = a_reduced.Copy(
        [],
        str(b) + reduced_affix,
        device_option=OnGPU(gpu_b)
    )
    d_reduced = c_reduced.Copy(
        [],
        str(d) + reduced_affix,
        device_option=OnGPU(gpu_d)
    )
    return a_reduced, b_reduced, c_reduced, d_reduced


def Allreduce4Group2(net, blobs, reduced_affix, gpu_indices):
    """Allreduce for 4 gpus where peer access is enabled within {0, 1} and
    within {2, 3}.

    Algorithm: 2 level reduction.
        0r <- 0 + 1, 2r <- 2 + 3
        0r <- 0r + 2r
        2r <- 0r, 1r <- 0r, 3r <- 2r
    """
    a, b, c, d = blobs
    gpu_a, gpu_b, gpu_c, gpu_d = gpu_indices
    # a_reduced <- a + b, c_reduced <- c + d
    a_reduced = net.Add(
        [a, b],
        str(a) + reduced_affix,
        device_option=OnGPU(gpu_a)
    )
    c_reduced = net.Add(
        [c, d],
        str(c) + reduced_affix,
        device_option=OnGPU(gpu_c)
    )
    # copy c_reduced (on gpu_c) to c_reduced_copy (on gpu_a)
    c_reduced_copy = c_reduced.Copy(
        [],
        str(c_reduced) + '_copy',
        device_option=OnGPU(gpu_a)
    )
    # a_reduced <- a_reduced + c_reduced_copy
    a_reduced = a_reduced.Add(
        c_reduced_copy,
        a_reduced,
        device_option=OnGPU(gpu_a)
    )
    # broadcast a_reduced to c_reduced
    c_reduced = a_reduced.Copy([], c_reduced, device_option=OnGPU(gpu_c))
    # broadcast to b and d
    b_reduced = a_reduced.Copy(
        [],
        str(b) + reduced_affix,
        device_option=OnGPU(gpu_b)
    )
    d_reduced = c_reduced.Copy(
        [],
        str(d) + reduced_affix,
        device_option=OnGPU(gpu_d)
    )
    return a_reduced, b_reduced, c_reduced, d_reduced


def Allreduce8(net, blobs, reduced_affix, gpu_indices):
    """Allreduce for 8 gpus.

    Algorithm: 3 level reduction.
        0r <- 0 + 1, 2r <- 2 + 3, 4r <- 4 + 5, 6r <- 6 + 7
        0r <- 0r + 2r, 4r <- 4r + 6r
        0r <- 0r + 4r
        4r <- 0r
        2r <- 0r, 6r <- 4r
        1r <- 0r, 3r <- 2r, 5r <- 4r, 7r <- 6r
    """
    reduced = [None] * 8
    # Reduction level 1
    for i in [0, 2, 4, 6]:
        reduced[i] = net.Add(
            [blobs[i], blobs[i + 1]],
            blobs[i] + reduced_affix,
            device_option=OnGPU(gpu_indices[i])
        )
    # Reduction level 2
    for i in [0, 4]:
        reduced[i] = net.Add(
            [reduced[i], reduced[i + 2]],
            str(blobs[i]) + reduced_affix,
            device_option=OnGPU(gpu_indices[i])
        )
    # Reduction level 3: this involves a copy.
    reduced_4_copy = reduced[4].Copy(
        [],
        str(reduced[4]) + '_copy',
        device_option=OnGPU(gpu_indices[0])
    )
    reduced[0] = reduced[0].Add(
        reduced_4_copy,
        reduced[0],
        device_option=OnGPU(gpu_indices[0])
    )
    # Broadcast level 1
    reduced[4] = reduced[0].Copy(
        [],
        reduced[4],
        device_option=OnGPU(gpu_indices[4])
    )
    # Broadcast level 2
    for i in [2, 6]:
        reduced[i] = reduced[i - 2].Copy(
            [],
            reduced[i],
            device_option=OnGPU(gpu_indices[i])
        )
    # Broadcast level 3
    for i in [1, 3, 5, 7]:
        reduced[i] = reduced[i - 1].Copy(
            [],
            blobs[i] + reduced_affix,
            device_option=OnGPU(gpu_indices[i])
        )
    return reduced
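# When the device topology is known in advance, the specialized routines can
# also be called directly instead of going through Allreduce(). This sketch
# uses the two-group variant; the net and blob names and the gpu indices are
# hypothetical, and "_reduced" simply mirrors the default affix of Allreduce().
def _example_allreduce_four_gpus_two_groups():
    from caffe2.python import core

    net = core.Net("muji_allreduce4group2_example")
    blobs = ["grad_gpu%d" % i for i in range(4)]
    # Peer access is assumed within {0, 1} and within {2, 3} only; the
    # cross-group hop goes through an explicit Copy onto gpu 0.
    reduced = Allreduce4Group2(
        net, blobs, "_reduced", gpu_indices=[0, 1, 2, 3]
    )
    return net, reduced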
def AllreduceFallback(net, blobs, reduced_affix, gpu_indices):
    """A fallback option for Allreduce with no assumption on p2p.

    Algorithm: a flat operation on gpu 0
        0r <- 0
        0r <- 0r + i for i in gpu_indices[1:]
        ir <- 0r for i in gpu_indices[1:]
    """
    reduced = [None] * len(gpu_indices)
    if reduced_affix != '':
        # copy first
        reduced[0] = net.Copy(
            blobs[0],
            blobs[0] + reduced_affix,
            device_option=OnGPU(gpu_indices[0])
        )
    else:
        reduced[0] = blobs[0]
    # do temp copy and add
    temp_name = reduced[0] + '_temp_copy'
    for i in range(1, len(gpu_indices)):
        temp = net.Copy(
            blobs[i],
            temp_name,
            device_option=OnGPU(gpu_indices[0])
        )
        reduced[0] = net.Add(
            [temp, reduced[0]],
            reduced[0],
            device_option=OnGPU(gpu_indices[0])
        )
    # Broadcast to everyone else
    for i in range(1, len(gpu_indices)):
        reduced[i] = net.Copy(
            reduced[0],
            blobs[i] + reduced_affix,
            device_option=OnGPU(gpu_indices[i])
        )
    return reduced
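# A final sketch for the fallback path, which makes no peer-access assumptions
# and therefore works for any number of GPUs. The names below are
# hypothetical; running the resulting net still requires a CUDA build with
# the listed GPUs present.
def _example_allreduce_fallback():
    from caffe2.python import core

    net = core.Net("muji_allreduce_fallback_example")
    blobs = ["grad_gpu%d" % i for i in range(3)]
    # All cross-device traffic is staged through gpu_indices[0] (gpu 0 here).
    reduced = AllreduceFallback(net, blobs, "_reduced", gpu_indices=[0, 1, 2])
    return net, reduced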