def cast(val, typ):
    """
    Convert the option string `val` to the type named by `typ`.

    Params
    ------
    val  : str, the raw option value. For 'bool', both 'True' and ''
      are treated as True.
    typ  : str, one of 'bool', 'chr', 'str', 'int', 'float', or several
      type names joined by ' or ' (tried left to right; the first
      successful conversion wins).

    Returns
    -------
    The converted value. Note that 'chr' yields `bytes`, not `str`.

    Raises
    ------
    TqdmTypeError  : if `val` cannot be converted to `typ`
      (including unknown type names).
    """
    log.debug((val, typ))
    if " or " in typ:
        for t in typ.split(" or "):
            try:
                return cast(val, t)
            except TqdmTypeError:
                pass
        raise TqdmTypeError(f"{val} : {typ}")

    if typ == 'bool':
        if val in ('True', ''):
            return True
        if val == 'False':
            return False
        raise TqdmTypeError(f"{val} : {typ}")

    if typ == 'chr':
        if len(val) == 1:
            return val.encode()
        # NOTE(security): `eval` on command-line input. It is only reached
        # for a backslash followed by word characters (no quotes, spaces or
        # newlines), so the evaluated text is always one string literal.
        if re.fullmatch(r"\\\w+", val):
            try:
                return eval(f'"{val}"').encode()
            except (SyntaxError, ValueError) as exc:
                # malformed escape (e.g. `\x4`): report it like every other
                # failed conversion so ' or ' fallbacks keep working
                raise TqdmTypeError(f"{val} : {typ}") from exc
        raise TqdmTypeError(f"{val} : {typ}")

    if typ == 'str':
        return val

    if typ == 'int':
        try:
            return int(val)
        except ValueError as exc:
            raise TqdmTypeError(f"{val} : {typ}") from exc

    if typ == 'float':
        try:
            return float(val)
        except ValueError as exc:
            raise TqdmTypeError(f"{val} : {typ}") from exc

    raise TqdmTypeError(f"{val} : {typ}")


def posix_pipe(fin, fout, delim=b'\\n', buf_size=256,
               callback=lambda float: None, callback_len=True):
    """
    Copy `fin` to `fout`, reporting progress through `callback`.

    Params
    ------
    fin  : binary file with `read(buf_size : int)` method
    fout  : binary file with `write` (and optionally `flush`) methods.
    delim  : bytes separating items. If empty, the stream is not split
      and `callback(len(chunk))` is called for every chunk read.
      N.B.: the default is the two bytes backslash + 'n',
      not a newline character.
    buf_size  : int, number of bytes requested per `fin.read` call.
    callback  : function(float), e.g.: `tqdm.update`
    callback_len  : If (default: True) do `callback(1)` per item.
      Otherwise, do `callback(item)` with the item's bytes
      (delimiter excluded).

    Each item is written to `fout` together with its trailing delimiter
    as soon as that delimiter has been read. Any unterminated trailing
    data is written (and reported as one item) at EOF, after which
    `fout` is flushed if it has a `flush` method.
    """
    fp_write = fout.write
    flush = getattr(fout, 'flush', lambda: None)

    if not delim:
        while True:
            chunk = fin.read(buf_size)
            if not chunk:  # EOF
                flush()
                return
            fp_write(chunk)
            callback(len(chunk))

    len_delim = len(delim)
    pending = b''  # bytes read so far that belong to an unfinished item
    while True:
        chunk = fin.read(buf_size)
        if not chunk:  # EOF
            if pending:
                fp_write(pending)
                callback(1 if callback_len else pending)
            flush()
            return

        # `pending` never holds a complete delimiter, but it may end with
        # the first bytes of one. Resume the search just far enough back
        # to catch a delimiter straddling the previous and current read.
        start = max(0, len(pending) - len_delim + 1)
        data = pending + chunk
        pos = 0  # start of the current (not yet written) item
        while True:
            i = data.find(delim, start)
            if i < 0:
                break
            end = i + len_delim
            fp_write(data[pos:end])
            callback(1 if callback_len else data[pos:i])
            pos = start = end
        pending = data[pos:]
""" + d.strip('\n') + '\n' # opts = docopt(d, version=__version__) if any(v in argv for v in ('-v', '--version')): sys.stdout.write(__version__ + '\n') sys.exit(0) elif any(v in argv for v in ('-h', '--help')): sys.stdout.write(d + '\n') sys.exit(0) elif argv and argv[0][:2] != '--': sys.stderr.write(f"Error:Unknown argument:{argv[0]}\n{help_short}") argv = RE_SHLEX.split(' '.join(["tqdm"] + argv)) opts = dict(zip(argv[1::3], argv[3::3])) log.debug(opts) opts.pop('log', True) tqdm_args = {'file': fp} try: for (o, v) in opts.items(): o = o.replace('-', '_') try: tqdm_args[o] = cast(v, opt_types[o]) except KeyError as e: raise TqdmKeyError(str(e)) log.debug('args:' + str(tqdm_args)) delim_per_char = tqdm_args.pop('bytes', False) update = tqdm_args.pop('update', False) update_to = tqdm_args.pop('update_to', False) if sum((delim_per_char, update, update_to)) > 1: raise TqdmKeyError("Can only have one of --bytes --update --update_to") except Exception: fp.write("\nError:\n" + help_short) stdin, stdout_write = sys.stdin, sys.stdout.write for i in stdin: stdout_write(i) raise else: buf_size = tqdm_args.pop('buf_size', 256) delim = tqdm_args.pop('delim', b'\\n') tee = tqdm_args.pop('tee', False) manpath = tqdm_args.pop('manpath', None) comppath = tqdm_args.pop('comppath', None) if tqdm_args.pop('null', False): class stdout(object): @staticmethod def write(_): pass else: stdout = sys.stdout stdout = getattr(stdout, 'buffer', stdout) stdin = getattr(sys.stdin, 'buffer', sys.stdin) if manpath or comppath: from importlib import resources from os import path from shutil import copyfile def cp(name, dst): """copy resource `name` to `dst`""" if hasattr(resources, 'files'): copyfile(str(resources.files('tqdm') / name), dst) else: # py<3.9 with resources.path('tqdm', name) as src: copyfile(str(src), dst) log.info("written:%s", dst) if manpath is not None: cp('tqdm.1', path.join(manpath, 'tqdm.1')) if comppath is not None: cp('completion.sh', path.join(comppath, 
'tqdm_completion.sh')) sys.exit(0) if tee: stdout_write = stdout.write fp_write = getattr(fp, 'buffer', fp).write class stdout(object): # pylint: disable=function-redefined @staticmethod def write(x): with tqdm.external_write_mode(file=fp): fp_write(x) stdout_write(x) if delim_per_char: tqdm_args.setdefault('unit', 'B') tqdm_args.setdefault('unit_scale', True) tqdm_args.setdefault('unit_divisor', 1024) log.debug(tqdm_args) with tqdm(**tqdm_args) as t: posix_pipe(stdin, stdout, '', buf_size, t.update) elif delim == b'\\n': log.debug(tqdm_args) write = stdout.write if update or update_to: with tqdm(**tqdm_args) as t: if update: def callback(i): t.update(numeric(i.decode())) else: # update_to def callback(i): t.update(numeric(i.decode()) - t.n) for i in stdin: write(i) callback(i) else: for i in tqdm(stdin, **tqdm_args): write(i) else: log.debug(tqdm_args) with tqdm(**tqdm_args) as t: callback_len = False if update: def callback(i): t.update(numeric(i.decode())) elif update_to: def callback(i): t.update(numeric(i.decode()) - t.n) else: callback = t.update callback_len = True posix_pipe(stdin, stdout, delim, buf_size, callback, callback_len)