diff --git a/setup.py b/setup.py index 05ea800..76666c2 100644 --- a/setup.py +++ b/setup.py @@ -18,6 +18,7 @@ setup(name=pname, url='https://github.com/adobdin/timmy', long_description=open('README.md').read(), packages=[pname], + install_requires=['pyyaml'], data_files=rqfiles, include_package_data=True, entry_points={'console_scripts': ['%s=%s.cli:main' % (pname, pname)]}, diff --git a/timmy/cli.py b/timmy/cli.py index 20d76b6..8dc09eb 100755 --- a/timmy/cli.py +++ b/timmy/cli.py @@ -108,8 +108,7 @@ def parse_args(): parser.add_argument('-q', '--quiet', help=('Print only command execution results and log' ' messages. Good for quick runs / "watch" wrap.' - ' Also sets default loglevel to ERROR.' - 'This option disables any -v parameters.'), + ' This option disables any -v parameters.'), action='store_true') parser.add_argument('-m', '--maxthreads', type=int, default=100, help=('Maximum simultaneous nodes for command' @@ -146,13 +145,15 @@ def main(argv=None): parser = parse_args() args = parser.parse_args(argv[1:]) loglevels = [logging.WARNING, logging.INFO, logging.DEBUG] - loglevel = loglevels[min(len(loglevels)-1, args.verbose)] if args.quiet: - loglevel = logging.ERROR - FORMAT = '%(asctime)s %(levelname)s %(module)s %(funcName)s(): %(message)s' + args.verbose = 0 + loglevel = loglevels[min(len(loglevels)-1, args.verbose)] + FORMAT = ('%(asctime)s %(levelname)s: %(module)s: ' + '%(funcName)s(): %(message)s') logging.basicConfig(filename=args.log_file, level=loglevel, format=FORMAT) + logger = logging.getLogger(__name__) conf = load_conf(args.config) if args.fuel_ip: conf['fuel_ip'] = args.fuel_ip @@ -199,8 +200,8 @@ def main(argv=None): if args.dest_file: conf['archive_dir'] = os.path.split(args.dest_file)[0] conf['archive_name'] = os.path.split(args.dest_file)[1] - logging.info('Using rqdir: %s, rqfile: %s' % - (conf['rqdir'], conf['rqfile'])) + logger.info('Using rqdir: %s, rqfile: %s' % + (conf['rqdir'], conf['rqfile'])) nm = pretty_run(args.quiet, 'Initializing node data', NodeManager, kwargs={'conf': conf, 'extended': args.extended, @@ -221,7 +222,7 @@ def main(argv=None): size = pretty_run(args.quiet, 'Calculating logs size', nm.calculate_log_size, args=(args.maxthreads,)) if size == 0: - logging.warning('Size zero - no logs to collect.') + logger.warning('Size zero - no logs to collect.') return enough = pretty_run(args.quiet, 'Checking free space', nm.is_enough_space) @@ -231,9 +232,9 @@ def main(argv=None): kwargs={'maxthreads': args.logs_maxthreads, 'fake': args.fake_logs}) else: - logging.warning(('Not enough space for logs in "%s", skipping' - 'log collection.') % nm.conf['archive_dir']) - logging.info("Nodes:\n%s" % nm) + logger.warning(('Not enough space for logs in "%s", skipping' + 'log collection.') % nm.conf['archive_dir']) + logger.info("Nodes:\n%s" % nm) if not args.quiet: print('Run complete. Node information:') print(nm) diff --git a/timmy/nodes.py b/timmy/nodes.py index dcd3c62..fef0ddc 100644 --- a/timmy/nodes.py +++ b/timmy/nodes.py @@ -50,7 +50,7 @@ class Node(object): print_template += ' {6:<6} {7}' def __init__(self, id, mac, cluster, roles, os_platform, - online, status, ip, conf): + online, status, ip, conf, logger=None): self.id = id self.mac = mac self.cluster = cluster @@ -74,6 +74,7 @@ class Node(object): self.outputs_timestamp = False self.outputs_timestamp_dir = None self.apply_conf(conf) + self.logger = logger or logging.getLogger(__name__) def __str__(self): if not self.filtered_out: @@ -157,30 +158,31 @@ class Node(object): timeout=self.timeout, prefix=self.prefix) if code != 0: - logging.warning('node: %s: could not determine' - ' MOS release' % self.id) + self.logger.warning('node: %s: could not determine' + ' MOS release' % self.id) else: self.release = release.strip('\n "\'') - logging.info('node: %s, MOS release: %s' % - (self.id, self.release)) - return self + self.logger.info('node: %s, MOS release: %s' % + (self.id, self.release)) + return release def exec_cmd(self, fake=False, ok_codes=None): sn = 'node-%s' % self.id cl = 'cluster-%s' % self.cluster - logging.debug('%s/%s/%s/%s' % (self.outdir, Node.ckey, cl, sn)) + self.logger.debug('%s/%s/%s/%s' % (self.outdir, Node.ckey, cl, sn)) ddir = os.path.join(self.outdir, Node.ckey, cl, sn) if self.cmds: tools.mdir(ddir) self.cmds = sorted(self.cmds) + mapcmds = {} for c in self.cmds: for cmd in c: dfile = os.path.join(ddir, 'node-%s-%s-%s' % (self.id, self.ip, cmd)) if self.outputs_timestamp: dfile += self.outputs_timestamp_str - logging.info('outfile: %s' % dfile) - self.mapcmds[cmd] = dfile + self.logger.info('outfile: %s' % dfile) + mapcmds[cmd] = dfile if not fake: outs, errs, code = tools.ssh_node(ip=self.ip, command=c[cmd], @@ -193,20 +195,21 @@ class Node(object): with open(dfile, 'w') as df: df.write(outs.encode('utf-8')) except: - logging.error("can't write to file %s" % - dfile) + self.logger.error("can't write to file %s" % + dfile) if self.scripts: tools.mdir(ddir) self.scripts = sorted(self.scripts) + mapscr = {} for scr in self.scripts: f = os.path.join(self.rqdir, Node.skey, scr) - logging.info('node:%s(%s), exec: %s' % (self.id, self.ip, f)) + self.logger.info('node:%s(%s), exec: %s' % (self.id, self.ip, f)) dfile = os.path.join(ddir, 'node-%s-%s-%s' % (self.id, self.ip, os.path.basename(f))) if self.outputs_timestamp: dfile += self.outputs_timestamp_str - logging.info('outfile: %s' % dfile) - self.mapscr[scr] = dfile + self.logger.info('outfile: %s' % dfile) + mapscr[scr] = dfile if not fake: outs, errs, code = tools.ssh_node(ip=self.ip, filename=f, @@ -219,12 +222,12 @@ class Node(object): with open(dfile, 'w') as df: df.write(outs.encode('utf-8')) except: - logging.error("can't write to file %s" % dfile) - return self + self.logger.error("can't write to file %s" % dfile) + return mapcmds, mapscr def exec_simple_cmd(self, cmd, timeout=15, infile=None, outfile=None, fake=False, ok_codes=None, input=None): - logging.info('node:%s(%s), exec: %s' % (self.id, self.ip, cmd)) + self.logger.info('node:%s(%s), exec: %s' % (self.id, self.ip, cmd)) if not fake: outs, errs, code = tools.ssh_node(ip=self.ip, command=cmd, @@ -238,7 +241,7 @@ class Node(object): self.check_code(code, 'exec_simple_cmd', cmd, ok_codes) def get_files(self, timeout=15): - logging.info('node: %s, IP: %s' % (self.id, self.ip)) + self.logger.info('node: %s, IP: %s' % (self.id, self.ip)) sn = 'node-%s' % self.id cl = 'cluster-%s' % self.cluster if self.files or self.filelists: @@ -261,9 +264,9 @@ class Node(object): if not line.isspace() and line[0] != '#': data += line except: - logging.error('could not read file: %s' % fname) + self.logger.error('could not read file: %s' % fname) data += '\n'.join(self.files) - logging.debug('node: %s, data:\n%s' % (self.id, data)) + self.logger.debug('node: %s, data:\n%s' % (self.id, data)) if data: o, e, c = tools.get_files_rsync(ip=self.ip, data=data, @@ -273,7 +276,7 @@ class Node(object): self.check_code(c, 'get_files', 'tools.get_files_rsync') def put_files(self): - logging.info('node: %s, IP: %s' % (self.id, self.ip)) + self.logger.info('node: %s, IP: %s' % (self.id, self.ip)) for f in self.put: outs, errs, code = tools.put_file_scp(ip=self.ip, file=f[0], @@ -295,8 +298,8 @@ class Node(object): start = '' cmd = ("find '%s' -type f%s -exec du -b {} +" % (item['path'], start)) - logging.info('node: %s, logs du-cmd: %s' % - (self.id, cmd)) + self.logger.info('node: %s, logs du-cmd: %s' % + (self.id, cmd)) outs, errs, code = tools.ssh_node(ip=self.ip, command=cmd, ssh_opts=self.ssh_opts, @@ -304,9 +307,9 @@ class Node(object): timeout=timeout, prefix=self.prefix) if code == 124: - logging.error("node: %s, ip: %s, command: %s, " - "timeout code: %s, error message: %s" % - (self.id, self.ip, cmd, code, errs)) + self.logger.error("node: %s, ip: %s, command: %s, " + "timeout code: %s, error message: %s" % + (self.id, self.ip, cmd, code, errs)) break if len(outs): item['files'] = {} @@ -315,8 +318,8 @@ class Node(object): size, f = line.split('\t') if filter_by_re(item, f): item['files'][f] = int(size) - logging.debug('logs: %s' % (item['files'])) - return self + self.logger.debug('logs: %s' % (item['files'])) + return self.logs def logs_dict(self): result = {} @@ -332,9 +335,9 @@ class Node(object): def check_code(self, code, func_name, cmd, ok_codes=None): if code: if not ok_codes or code not in ok_codes: - logging.warning("%s: got bad exit code %s," - " node: %s, ip: %s, cmd: %s" % - (func_name, code, self.id, self.ip, cmd)) + self.logger.warning("%s: got bad exit code %s," + " node: %s, ip: %s, cmd: %s" % + (func_name, code, self.id, self.ip, cmd)) def print_results(self, result_map): # result_map should be either mapcmds or mapscr @@ -348,8 +351,9 @@ class Node(object): class NodeManager(object): """Class nodes """ - def __init__(self, conf, extended=False, nodes_json=None): + def __init__(self, conf, extended=False, nodes_json=None, logger=None): self.conf = conf + self.logger = logger or logging.getLogger(__name__) if conf['outputs_timestamp'] or conf['dir_timestamp']: timestamp_str = datetime.now().strftime('_%F_%H-%M-%S') if conf['outputs_timestamp']: @@ -363,8 +367,8 @@ class NodeManager(object): if not conf['shell_mode']: self.rqdir = conf['rqdir'] if (not os.path.exists(self.rqdir)): - logging.critical(('NodeManager: directory %s does not' - ' exist') % self.rqdir) + self.logger.critical(('NodeManager: directory %s does not' + ' exist') % self.rqdir) sys.exit(1) if self.conf['rqfile']: self.import_rq() @@ -384,7 +388,7 @@ class NodeManager(object): self.nodes_reapply_conf() self.conf_assign_once() if extended: - logging.info('NodeManager: extended mode enabled') + self.logger.info('NodeManager: extended mode enabled') '''TO-DO: load smth like extended.yaml do additional apply_conf(clean=False) with this yaml. Move some stuff from rq.yaml to extended.yaml''' @@ -454,7 +458,7 @@ class NodeManager(object): def fuel_init(self): if not self.conf['fuel_ip']: - logging.critical('NodeManager: fuel_ip not set') + self.logger.critical('NodeManager: fuel_ip not set') sys.exit(7) fuelnode = Node(id=0, cluster=0, @@ -481,8 +485,8 @@ class NodeManager(object): timeout=fuelnode.timeout, prefix=fuelnode.prefix) if code != 0: - logging.critical(('NodeManager: cannot get ' - 'fuel node list: %s') % err) + self.logger.critical(('NodeManager: cannot get ' + 'fuel node list: %s') % err) sys.exit(4) return nodes_json @@ -514,7 +518,7 @@ class NodeManager(object): key=key)) result = tools.run_batch(run_items, 100, dict_result=True) for key in result: - self.nodes[key] = result[key] + self.nodes[key].release = result[key] def conf_assign_once(self): once = Node.conf_once_prefix @@ -564,7 +568,8 @@ class NodeManager(object): key=key)) result = tools.run_batch(run_items, maxthreads, dict_result=True) for key in result: - self.nodes[key] = result[key] + self.nodes[key].mapscmds = result[key][0] + self.nodes[key].mapscr = result[key][1] def calculate_log_size(self, timeout=15, maxthreads=100): total_size = 0 @@ -576,11 +581,11 @@ class NodeManager(object): key=key)) result = tools.run_batch(run_items, maxthreads, dict_result=True) for key in result: - self.nodes[key] = result[key] + self.nodes[key].logs = result[key] for node in self.nodes.values(): total_size += sum(node.logs_dict().values()) - logging.info('Full log size on nodes(with fuel): %s bytes' % - total_size) + self.logger.info('Full log size on nodes(with fuel): %s bytes' % + total_size) self.alogsize = total_size / 1024 return self.alogsize @@ -588,17 +593,18 @@ class NodeManager(object): tools.mdir(self.conf['outdir']) outs, errs, code = tools.free_space(self.conf['outdir'], timeout=1) if code != 0: - logging.error("Can't get free space: %s" % errs) + self.logger.error("Can't get free space: %s" % errs) return False try: fs = int(outs.rstrip('\n')) except: - logging.error("can't get free space\nouts: %s" % - outs) + self.logger.error("can't get free space\nouts: %s" % + outs) return False - logging.info('logsize: %s Kb, free space: %s Kb' % (self.alogsize, fs)) + self.logger.info('logsize: %s Kb, free space: %s Kb' % + (self.alogsize, fs)) if (self.alogsize*coefficient > fs): - logging.error('Not enough space on device') + self.logger.error('Not enough space on device') return False else: return True @@ -609,10 +615,10 @@ class NodeManager(object): self.conf['archive_name']) cmd = "tar zcf '%s' -C %s %s" % (outfile, self.conf['outdir'], ".") tools.mdir(self.conf['archive_dir']) - logging.debug("cmd: %s" % cmd) + self.logger.debug("cmd: %s" % cmd) outs, errs, code = tools.launch_cmd(cmd, timeout) if code != 0: - logging.error("Can't create archive %s" % (errs)) + self.logger.error("Can't create archive %s" % (errs)) def find_adm_interface_speed(self, defspeed): '''Returns interface speed through which logs will be dowloaded''' @@ -622,7 +628,7 @@ class NodeManager(object): ('cat /sys/class/net/', node.ip)) out, err, code = tools.launch_cmd(cmd, node.timeout) if code != 0: - logging.error("can't get interface speed: error: %s" % err) + self.logger.error("can't get iface speed: error: %s" % err) return defspeed try: speed = int(out) @@ -633,7 +639,7 @@ class NodeManager(object): @run_with_lock def get_logs(self, timeout, fake=False, maxthreads=10, speed=100): if fake: - logging.info('fake = True, skipping') + self.logger.info('fake = True, skipping') return txtfl = [] speed = self.find_adm_interface_speed(speed) @@ -642,8 +648,8 @@ class NodeManager(object): run_items = [] for node in [n for n in self.nodes.values() if not n.filtered_out]: if not node.logs_dict(): - logging.info(("node %s - no logs " - "to collect") % node.id) + self.logger.info(("node %s - no logs " + "to collect") % node.id) continue node.archivelogsfile = os.path.join(self.conf['archive_dir'], 'logs-node-%s.tar.gz' % @@ -669,7 +675,7 @@ class NodeManager(object): try: os.remove(tfile) except: - logging.error("can't delete file %s" % tfile) + self.logger.error("can't delete file %s" % tfile) @run_with_lock def get_files(self, timeout=15): diff --git a/timmy/tools.py b/timmy/tools.py index 46b2b41..098235a 100644 --- a/timmy/tools.py +++ b/timmy/tools.py @@ -46,27 +46,29 @@ while 1: def interrupt_wrapper(f): def wrapper(*args, **kwargs): + logger = logging.getLogger(__name__) try: f(*args, **kwargs) except KeyboardInterrupt: - logging.warning('Interrupted, exiting.') + logger.warning('Interrupted, exiting.') except Exception as e: - logging.error('Error: %s' % e, exc_info=True) + logger.error('Error: %s' % e, exc_info=True) for k in dir(e): '''debug: print all exception attrs except internal and except 'message', which is deprecated since Python 2.6''' if not k.startswith('__') and k != 'message': v = getattr(e, k) - logging.debug('Error details: %s = %s' % (k, v)) + logger.debug('Error details: %s = %s' % (k, v)) return wrapper def run_with_lock(f): def wrapper(*args, **kwargs): + logger = logging.getLogger(__name__) lock = FLock(os.path.join(gettempdir(), 'timmy_%s.lock' % f.__name__)) if not lock.lock(): - logging.warning('Unable to obtain lock, skipping "%s"' % - f.__name__) + logger.warning('Unable to obtain lock, skipping "%s"' % + f.__name__) return '' f(*args, **kwargs) lock.unlock() @@ -74,17 +76,19 @@ def run_with_lock(f): class RunItem(): - def __init__(self, target, args=None, key=None): + def __init__(self, target, args=None, key=None, logger=None): self.target = target self.args = args self.key = key self.process = None self.queue = None + self.logger = logger or logging.getLogger(__name__) class SemaphoreProcess(Process): - def __init__(self, semaphore, target, args=None, queue=None): + def __init__(self, semaphore, target, args=None, queue=None, logger=None): Process.__init__(self) + self.logger = logger or logging.getLogger(__name__) self.semaphore = semaphore self.target = target if not args: @@ -98,21 +102,23 @@ class SemaphoreProcess(Process): if self.queue: self.queue.put_nowait(result) except Exception as error: - logging.exception(error) + self.logger.exception(error) if self.queue: self.queue.put_nowait(error) finally: - logging.debug('finished call: %s' % self.target) + self.logger.debug('finished call: %s' % self.target) self.semaphore.release() - logging.debug('semaphore released') + self.logger.debug('semaphore released') def run_batch(item_list, maxthreads, dict_result=False): def cleanup(): - logging.debug('cleanup processes') + logger = logging.getLogger(__name__) + logger.debug('cleanup processes') for run_item in item_list: if run_item.process: run_item.process.terminate() + logger = logging.getLogger(__name__) semaphore = BoundedSemaphore(maxthreads) try: for run_item in item_list: @@ -127,7 +133,7 @@ def run_batch(item_list, maxthreads, dict_result=False): for run_item in item_list: run_item.result = run_item.queue.get() if isinstance(run_item.result, Exception): - logging.critical('%s, exiting' % run_item.result) + logger.critical('%s, exiting' % run_item.result) cleanup() sys.exit(42) run_item.process.join() @@ -148,6 +154,7 @@ def get_dir_structure(rootdir): """ Creates a nested dictionary that represents the folder structure of rootdir """ + logger = logging.getLogger(__name__) dir = {} try: rootdir = rootdir.rstrip(os.sep) @@ -158,8 +165,8 @@ def get_dir_structure(rootdir): parent = reduce(dict.get, folders[:-1], dir) parent[folders[-1]] = subdir except: - logging.critical('failed to create list of the directory: %s' % - rootdir) + logger.critical('failed to create list of the directory: %s' % + rootdir) sys.exit(1) return dir @@ -168,19 +175,20 @@ def load_yaml_file(filename): """ Loads yaml data from file """ + logger = logging.getLogger(__name__) try: with open(filename, 'r') as f: return yaml.load(f) except IOError as e: - logging.critical("I/O error(%s): file: %s; msg: %s" % - (e.errno, e.filename, e.strerror)) + logger.critical("I/O error(%s): file: %s; msg: %s" % + (e.errno, e.filename, e.strerror)) sys.exit(1) except ValueError: - logging.critical("Could not convert data") + logger.critical("Could not convert data") sys.exit(1) except yaml.parser.ParserError as e: - logging.critical("Could not parse %s:\n%s" % - (filename, str(e))) + logger.critical("Could not parse %s:\n%s" % + (filename, str(e))) sys.exit(1) @@ -188,12 +196,13 @@ def mdir(directory): """ Creates a directory if it doesn't exist """ + logger = logging.getLogger(__name__) if not os.path.exists(directory): - logging.debug('creating directory %s' % directory) + logger.debug('creating directory %s' % directory) try: os.makedirs(directory) except: - logging.critical("Can't create a directory: %s" % directory) + logger.critical("Can't create a directory: %s" % directory) sys.exit(3) @@ -209,13 +218,15 @@ def launch_cmd(cmd, timeout, input=None, ok_codes=None): return message def _timeout_terminate(pid): + logger = logging.getLogger(__name__) try: os.kill(pid, 15) - logging.error("launch_cmd: pid %d killed by timeout" % pid) + logger.error("launch_cmd: pid %d killed by timeout" % pid) except: pass - logging.info('cmd %s' % cmd) + logger = logging.getLogger(__name__) + logger.info('cmd %s' % cmd) p = subprocess.Popen(cmd, shell=True, stdin=subprocess.PIPE, @@ -234,27 +245,29 @@ def launch_cmd(cmd, timeout, input=None, ok_codes=None): p.kill() except: pass + p.stdin = None outs, errs = p.communicate() outs = outs.decode('utf-8') errs = errs.decode('utf-8') errs = errs.rstrip('\n') - logging.error(_log_msg(cmd, errs, p.returncode)) + logger.error(_log_msg(cmd, errs, p.returncode)) finally: if timeout_killer: timeout_killer.cancel() - logging.info(_log_msg(cmd, errs, p.returncode)) + logger.info(_log_msg(cmd, errs, p.returncode)) input = input.decode('utf-8') if input else None - logging.debug(_log_msg(cmd, errs, p.returncode, debug=True, - stdin=input, stdout=outs)) + logger.debug(_log_msg(cmd, errs, p.returncode, debug=True, + stdin=input, stdout=outs)) if p.returncode: if not ok_codes or p.returncode not in ok_codes: - logging.warning(_log_msg(cmd, errs, p.returncode)) + logger.warning(_log_msg(cmd, errs, p.returncode)) return outs, errs, p.returncode def ssh_node(ip, command='', ssh_opts=None, env_vars=None, timeout=15, filename=None, inputfile=None, outputfile=None, ok_codes=None, input=None, prefix=None): + logger = logging.getLogger(__name__) if not ssh_opts: ssh_opts = '' if not env_vars: @@ -264,11 +277,11 @@ def ssh_node(ip, command='', ssh_opts=None, env_vars=None, timeout=15, if type(env_vars) is list: env_vars = ' '.join(env_vars) if (ip in ['localhost', '127.0.0.1']) or ip.startswith('127.'): - logging.info("skip ssh") + logger.info("skip ssh") bstr = "%s timeout '%s' bash -c " % ( env_vars, timeout) else: - logging.info("exec ssh") + logger.info("exec ssh") bstr = "timeout '%s' ssh -t -T %s '%s' '%s' " % ( timeout, ssh_opts, ip, env_vars) if filename is None: @@ -280,7 +293,7 @@ def ssh_node(ip, command='', ssh_opts=None, env_vars=None, timeout=15, cmd = "%s < '%s'" % (cmd, inputfile) else: cmd = "%s'%s bash -s' < '%s'" % (bstr, prefix, filename) - logging.info("inputfile selected, cmd: %s" % cmd) + logger.info("inputfile selected, cmd: %s" % cmd) if outputfile is not None: cmd = "%s > '%s'" % (cmd, outputfile) cmd = ("input=\"$(cat | xxd -p)\"; trap 'kill $pid' 15; " + @@ -290,10 +303,11 @@ def ssh_node(ip, command='', ssh_opts=None, env_vars=None, timeout=15, def get_files_rsync(ip, data, ssh_opts, dpath, timeout=15): + logger = logging.getLogger(__name__) if type(ssh_opts) is list: ssh_opts = ' '.join(ssh_opts) if (ip in ['localhost', '127.0.0.1']) or ip.startswith('127.'): - logging.info("skip ssh rsync") + logger.info("skip ssh rsync") cmd = ("timeout '%s' rsync -avzr --files-from=- / '%s'" " --progress --partial --delete-before" % (timeout, dpath)) @@ -302,7 +316,7 @@ def get_files_rsync(ip, data, ssh_opts, dpath, timeout=15): " -oCompression=no' --files-from=- '%s':/ '%s'" " --progress --partial --delete-before" ) % (timeout, ssh_opts, ip, dpath) - logging.debug("command:%s\ndata:\n%s" % (cmd, data)) + logger.debug("command:%s\ndata:\n%s" % (cmd, data)) if data == '': return cmd, '', 127 return launch_cmd(cmd, timeout, input=data)