Source code for mcot.utils.scripts

"""Utilities to create a common interface to all the scripts."""
import argparse
import ast
import importlib
import pkgutil
import sys
from loguru import logger
from .log import setup_log


[docs]def get_docstring(filename): """Extracts the filename from the docstring. :param filename: path to python file :return: module-level docstring (None if undefined) """ with open(filename) as f: base = ast.parse(f.read()) return ast.get_docstring(base)
[docs]def run_script(module, argc=None): """Run script from command line. :param add_to_parser: function that takes an argument parser and adds information to it :param run_from_args: function that runs the script based on the arguments of the parser :param argc: command line arguments """ doc_string = module.__doc__ logger.enable('mcot') script_logger = logger.opt(depth=1) if hasattr(module, 'main'): setup_log() script_logger.info('starting script') try: module.main(argc) except: script_logger.exception('Failed script') else: prog_name = ' '.join(['mcot'] + module.__name__[module.__name__.rfind("_scripts"):].split('.')[1:]) parser = argparse.ArgumentParser(prog_name, description=doc_string, formatter_class=argparse.RawTextHelpFormatter) module.add_to_parser(parser) args = parser.parse_args(argc) setup_log() script_logger.info('starting script') try: module.run_from_args(args) except Exception: script_logger.exception('failed script') raise script_logger.info('finished script')
[docs]class ScriptDirectories(object): """All script directories that have been registered. All .py files within this directory are considered scripts (except __init__ and __main__) Any sub-directories are considered sub- scripts (as long as they contain a __init__) """
[docs] def __init__(self, names=()): self.modules = [(None, importlib.import_module(name)) for name in names]
[docs] def add(self, name: str, group): """Adds a new script directory. In the __init__ of the script directory add: mcot.utils.scripts.directories.add(__name__) :param name: __name__ of the script directory :param group: what group to put the scripts in (set to None for no group) """ self.modules.append((group, importlib.import_module(name)))
[docs] def all_scripts(self, ): scripts = {} def process(module, script_dict): for module_info in pkgutil.iter_modules(module.__path__): if module_info.name.startswith('_'): continue full_name = f'{module.__name__}.{module_info.name}' if module_info.name in script_dict: raise ValueError(f"Dual script definition for {module_info.name}") if module_info.ispkg: script_dict[module_info.name] = {} process( importlib.import_module(full_name), script_dict[module_info.name] ) else: script_dict[module_info.name] = full_name for name, module in self.modules: if name in scripts: raise ValueError(f"Dual script definition for {name}") if name is None: process(module, scripts) else: scripts[name] = {} process(module, scripts[name]) return scripts
@staticmethod def _scripts2string(scripts, indent=0): if isinstance(scripts, dict): res = "\n" for name in sorted(scripts): res = res + " " * indent + f"- {name}: " + ScriptDirectories._scripts2string(scripts[name], indent + 2) return res + "" else: fn = pkgutil.find_loader(scripts).get_filename() res = get_docstring(fn) if res is None: return "\n" return res.splitlines()[0] + "\n" def __call__(self, args=None): """Runs a script identified by the arguments. :param args: optionally group name and script name together with the script arguments (default: sys.argv[1:]) """ if args is None: args = sys.argv[1:] if len(args) == 0: choose_script, args = (), None elif '.' in args[0]: choose_script, args = args[0].split('.'), args[1:] else: choose_script, args = args, None current_group = [] scripts = self.all_scripts() while len(choose_script) != 0 and isinstance(scripts, dict) and choose_script[0] in scripts.keys(): current_group.append(choose_script[0]) scripts = scripts[choose_script[0]] choose_script = choose_script[1:] if isinstance(scripts, dict): print('Usage: mcot [<script_group>...] <script_name> <args>...') if len(current_group) == 0: print('Available scripts:') else: print(f'Available scripts in script group {".".join(current_group)}:') print(self._scripts2string(scripts)[1:]) print('') print("Error: Incomplete or invalid script name provided") exit(1) if args is None: args = choose_script elif len(choose_script) != 0: raise ValueError(f"Script already fully define before processing .{'.'.join(choose_script)}") script = importlib.import_module(scripts) run_script(script, args)
[docs]def load_all_mcot(): mcot = importlib.import_module("mcot") script_modules = [] for module_info in pkgutil.iter_modules(mcot.__path__): if module_info.ispkg: name = f"mcot.{module_info.name}._scripts" try: importlib.import_module(name) script_modules.append(name) except ModuleNotFoundError as e: if e.msg != f"No module named '{name}'": raise ValueError(f"Failed to load scripts for {name}") return script_modules
[docs]def run(argv=None): modules = load_all_mcot() directories = ScriptDirectories(modules) directories(argv)