# @package utils # Module caffe2.python.utils from caffe2.proto import caffe2_pb2 from future.utils import viewitems from google.protobuf.message import DecodeError, Message from google.protobuf import text_format import sys import collections import copy import functools import numpy as np from six import integer_types, binary_type, text_type, string_types OPTIMIZER_ITERATION_NAME = "optimizer_iteration" ITERATION_MUTEX_NAME = "iteration_mutex" def OpAlmostEqual(op_a, op_b, ignore_fields=None): ''' Two ops are identical except for each field in the `ignore_fields`. ''' ignore_fields = ignore_fields or [] if not isinstance(ignore_fields, list): ignore_fields = [ignore_fields] assert all(isinstance(f, text_type) for f in ignore_fields), ( 'Expect each field is text type, but got {}'.format(ignore_fields)) def clean_op(op): op = copy.deepcopy(op) for field in ignore_fields: if op.HasField(field): op.ClearField(field) return op op_a = clean_op(op_a) op_b = clean_op(op_b) return op_a == op_b or str(op_a) == str(op_b) def CaffeBlobToNumpyArray(blob): if (blob.num != 0): # old style caffe blob. return (np.asarray(blob.data, dtype=np.float32) .reshape(blob.num, blob.channels, blob.height, blob.width)) else: # new style caffe blob. return (np.asarray(blob.data, dtype=np.float32) .reshape(blob.shape.dim)) def Caffe2TensorToNumpyArray(tensor): if tensor.data_type == caffe2_pb2.TensorProto.FLOAT: return np.asarray( tensor.float_data, dtype=np.float32).reshape(tensor.dims) elif tensor.data_type == caffe2_pb2.TensorProto.DOUBLE: return np.asarray( tensor.double_data, dtype=np.float64).reshape(tensor.dims) elif tensor.data_type == caffe2_pb2.TensorProto.INT64: return np.asarray( tensor.int64_data, dtype=np.int64).reshape(tensor.dims) elif tensor.data_type == caffe2_pb2.TensorProto.INT32: return np.asarray( tensor.int32_data, dtype=np.int).reshape(tensor.dims) # pb.INT32=>np.int use int32_data elif tensor.data_type == caffe2_pb2.TensorProto.INT16: return np.asarray( tensor.int32_data, dtype=np.int16).reshape(tensor.dims) # pb.INT16=>np.int16 use int32_data elif tensor.data_type == caffe2_pb2.TensorProto.UINT16: return np.asarray( tensor.int32_data, dtype=np.uint16).reshape(tensor.dims) # pb.UINT16=>np.uint16 use int32_data elif tensor.data_type == caffe2_pb2.TensorProto.INT8: return np.asarray( tensor.int32_data, dtype=np.int8).reshape(tensor.dims) # pb.INT8=>np.int8 use int32_data elif tensor.data_type == caffe2_pb2.TensorProto.UINT8: return np.asarray( tensor.int32_data, dtype=np.uint8).reshape(tensor.dims) # pb.UINT8=>np.uint8 use int32_data else: # TODO: complete the data type: bool, float16, byte, int64, string raise RuntimeError( "Tensor data type not supported yet: " + str(tensor.data_type)) def NumpyArrayToCaffe2Tensor(arr, name=None): tensor = caffe2_pb2.TensorProto() tensor.dims.extend(arr.shape) if name: tensor.name = name if arr.dtype == np.float32: tensor.data_type = caffe2_pb2.TensorProto.FLOAT tensor.float_data.extend(list(arr.flatten().astype(float))) elif arr.dtype == np.float64: tensor.data_type = caffe2_pb2.TensorProto.DOUBLE tensor.double_data.extend(list(arr.flatten().astype(np.float64))) elif arr.dtype == np.int64: tensor.data_type = caffe2_pb2.TensorProto.INT64 tensor.int64_data.extend(list(arr.flatten().astype(np.int64))) elif arr.dtype == np.int or arr.dtype == np.int32: tensor.data_type = caffe2_pb2.TensorProto.INT32 tensor.int32_data.extend(arr.flatten().astype(np.int).tolist()) elif arr.dtype == np.int16: tensor.data_type = caffe2_pb2.TensorProto.INT16 tensor.int32_data.extend(list(arr.flatten().astype(np.int16))) # np.int16=>pb.INT16 use int32_data elif arr.dtype == np.uint16: tensor.data_type = caffe2_pb2.TensorProto.UINT16 tensor.int32_data.extend(list(arr.flatten().astype(np.uint16))) # np.uint16=>pb.UNIT16 use int32_data elif arr.dtype == np.int8: tensor.data_type = caffe2_pb2.TensorProto.INT8 tensor.int32_data.extend(list(arr.flatten().astype(np.int8))) # np.int8=>pb.INT8 use int32_data elif arr.dtype == np.uint8: tensor.data_type = caffe2_pb2.TensorProto.UINT8 tensor.int32_data.extend(list(arr.flatten().astype(np.uint8))) # np.uint8=>pb.UNIT8 use int32_data else: # TODO: complete the data type: bool, float16, byte, string raise RuntimeError( "Numpy data type not supported yet: " + str(arr.dtype)) return tensor def MakeArgument(key, value): """Makes an argument based on the value type.""" argument = caffe2_pb2.Argument() argument.name = key iterable = isinstance(value, collections.abc.Iterable) # Fast tracking common use case where a float32 array of tensor parameters # needs to be serialized. The entire array is guaranteed to have the same # dtype, so no per-element checking necessary and no need to convert each # element separately. if isinstance(value, np.ndarray) and value.dtype.type is np.float32: argument.floats.extend(value.flatten().tolist()) return argument if isinstance(value, np.ndarray): value = value.flatten().tolist() elif isinstance(value, np.generic): # convert numpy scalar to native python type value = np.asscalar(value) if type(value) is float: argument.f = value elif type(value) in integer_types or type(value) is bool: # We make a relaxation that a boolean variable will also be stored as # int. argument.i = value elif isinstance(value, binary_type): argument.s = value elif isinstance(value, text_type): argument.s = value.encode('utf-8') elif isinstance(value, caffe2_pb2.NetDef): argument.n.CopyFrom(value) elif isinstance(value, Message): argument.s = value.SerializeToString() elif iterable and all(type(v) in [float, np.float_] for v in value): argument.floats.extend( v.item() if type(v) is np.float_ else v for v in value ) elif iterable and all( type(v) in integer_types or type(v) in [bool, np.int_] for v in value ): argument.ints.extend( v.item() if type(v) is np.int_ else v for v in value ) elif iterable and all( isinstance(v, binary_type) or isinstance(v, text_type) for v in value ): argument.strings.extend( v.encode('utf-8') if isinstance(v, text_type) else v for v in value ) elif iterable and all(isinstance(v, caffe2_pb2.NetDef) for v in value): argument.nets.extend(value) elif iterable and all(isinstance(v, Message) for v in value): argument.strings.extend(v.SerializeToString() for v in value) else: if iterable: raise ValueError( "Unknown iterable argument type: key={} value={}, value " "type={}[{}]".format( key, value, type(value), set(type(v) for v in value) ) ) else: raise ValueError( "Unknown argument type: key={} value={}, value type={}".format( key, value, type(value) ) ) return argument def TryReadProtoWithClass(cls, s): """Reads a protobuffer with the given proto class. Inputs: cls: a protobuffer class. s: a string of either binary or text protobuffer content. Outputs: proto: the protobuffer of cls Throws: google.protobuf.message.DecodeError: if we cannot decode the message. """ obj = cls() try: text_format.Parse(s, obj) return obj except (text_format.ParseError, UnicodeDecodeError): obj.ParseFromString(s) return obj def GetContentFromProto(obj, function_map): """Gets a specific field from a protocol buffer that matches the given class """ for cls, func in viewitems(function_map): if type(obj) is cls: return func(obj) def GetContentFromProtoString(s, function_map): for cls, func in viewitems(function_map): try: obj = TryReadProtoWithClass(cls, s) return func(obj) except DecodeError: continue else: raise DecodeError("Cannot find a fit protobuffer class.") def ConvertProtoToBinary(proto_class, filename, out_filename): """Convert a text file of the given protobuf class to binary.""" with open(filename) as f: proto = TryReadProtoWithClass(proto_class, f.read()) with open(out_filename, 'w') as fid: fid.write(proto.SerializeToString()) def GetGPUMemoryUsageStats(): """Get GPU memory usage stats from CUDAContext/HIPContext. This requires flag --caffe2_gpu_memory_tracking to be enabled""" from caffe2.python import workspace, core workspace.RunOperatorOnce( core.CreateOperator( "GetGPUMemoryUsage", [], ["____mem____"], device_option=core.DeviceOption(workspace.GpuDeviceType, 0), ), ) b = workspace.FetchBlob("____mem____") return { 'total_by_gpu': b[0, :], 'max_by_gpu': b[1, :], 'total': np.sum(b[0, :]), 'max_total': np.sum(b[1, :]) } def ResetBlobs(blobs): from caffe2.python import workspace, core workspace.RunOperatorOnce( core.CreateOperator( "Free", list(blobs), list(blobs), device_option=core.DeviceOption(caffe2_pb2.CPU), ), ) class DebugMode(object): ''' This class allows to drop you into an interactive debugger if there is an unhandled exception in your python script Example of usage: def main(): # your code here pass if __name__ == '__main__': from caffe2.python.utils import DebugMode DebugMode.run(main) ''' @classmethod def run(cls, func): try: return func() except KeyboardInterrupt: raise except Exception: import pdb print( 'Entering interactive debugger. Type "bt" to print ' 'the full stacktrace. Type "help" to see command listing.') print(sys.exc_info()[1]) print pdb.post_mortem() sys.exit(1) raise def raiseIfNotEqual(a, b, msg): if a != b: raise Exception("{}. {} != {}".format(msg, a, b)) def debug(f): ''' Use this method to decorate your function with DebugMode's functionality Example: @debug def test_foo(self): raise Exception("Bar") ''' @functools.wraps(f) def wrapper(*args, **kwargs): def func(): return f(*args, **kwargs) return DebugMode.run(func) return wrapper def BuildUniqueMutexIter( init_net, net, iter=None, iter_mutex=None, iter_val=0 ): ''' Often, a mutex guarded iteration counter is needed. This function creates a mutex iter in the net uniquely (if the iter already existing, it does nothing) This function returns the iter blob ''' iter = iter if iter is not None else OPTIMIZER_ITERATION_NAME iter_mutex = iter_mutex if iter_mutex is not None else ITERATION_MUTEX_NAME from caffe2.python import core if not init_net.BlobIsDefined(iter): # Add training operators. with core.DeviceScope( core.DeviceOption(caffe2_pb2.CPU, extra_info=["device_type_override:cpu"]) ): iteration = init_net.ConstantFill( [], iter, shape=[1], value=iter_val, dtype=core.DataType.INT64, ) iter_mutex = init_net.CreateMutex([], [iter_mutex]) net.AtomicIter([iter_mutex, iteration], [iteration]) else: iteration = init_net.GetBlobRef(iter) return iteration def EnumClassKeyVals(cls): # cls can only be derived from object assert type(cls) == type # Enum attribute keys are all capitalized and values are strings enum = {} for k in dir(cls): if k == k.upper(): v = getattr(cls, k) if isinstance(v, string_types): assert v not in enum.values(), ( "Failed to resolve {} as Enum: " "duplicate entries {}={}, {}={}".format( cls, k, v, [key for key in enum if enum[key] == v][0], v ) ) enum[k] = v return enum def ArgsToDict(args): """ Convert a list of arguments to a name, value dictionary. Assumes that each argument has a name. Otherwise, the argument is skipped. """ ans = {} for arg in args: if not arg.HasField("name"): continue for d in arg.DESCRIPTOR.fields: if d.name == "name": continue if d.label == d.LABEL_OPTIONAL and arg.HasField(d.name): ans[arg.name] = getattr(arg, d.name) break elif d.label == d.LABEL_REPEATED: list_ = getattr(arg, d.name) if len(list_) > 0: ans[arg.name] = list_ break else: ans[arg.name] = None return ans def NHWC2NCHW(tensor): assert tensor.ndim >= 1 return tensor.transpose((0, tensor.ndim - 1) + tuple(range(1, tensor.ndim - 1))) def NCHW2NHWC(tensor): assert tensor.ndim >= 2 return tensor.transpose((0,) + tuple(range(2, tensor.ndim)) + (1,))