added: new object logger
shell mode doesn't print anything, need to fix :)
parent dbe528a561
commit 46d17efd6c
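The commit replaces module-level logging.* calls with logger objects that can be injected per instance and fall back to a module logger otherwise. A minimal sketch of the pattern, with a hypothetical Worker class standing in for Node/NodeManager:

import logging


class Worker(object):
    """Hypothetical class illustrating the injectable-logger pattern."""

    def __init__(self, logger=None):
        # Use the injected logger if given, else fall back to a
        # module-level logger named after this module.
        self.logger = logger or logging.getLogger(__name__)

    def run(self):
        self.logger.info('running %s', self.__class__.__name__)

Injecting a logger lets a caller (or a test) route all of an instance's messages through its own handlers without touching global logging state.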
setup.py (1 change)
@@ -18,6 +18,7 @@ setup(name=pname,
       url='https://github.com/adobdin/timmy',
       long_description=open('README.md').read(),
       packages=[pname],
       install_requires=['pyyaml'],
       data_files=rqfiles,
       include_package_data=True,
       entry_points={'console_scripts': ['%s=%s.cli:main' % (pname, pname)]},
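The entry_points template builds the console-script name from pname. Assuming pname is 'timmy' (consistent with the repository URL; the assignment itself is outside this hunk), it expands like so:

# Hypothetical expansion of the console_scripts template in setup.py,
# assuming pname = 'timmy'.
pname = 'timmy'
entry = '%s=%s.cli:main' % (pname, pname)
assert entry == 'timmy=timmy.cli:main'  # installs a `timmy` CLI calling timmy.cli.main()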
timmy/cli.py (23 changes)
@@ -108,8 +108,7 @@ def parse_args():
     parser.add_argument('-q', '--quiet',
                         help=('Print only command execution results and log'
                               ' messages. Good for quick runs / "watch" wrap.'
-                              ' Also sets default loglevel to ERROR.'
-                              'This option disables any -v parameters.'),
+                              ' This option disables any -v parameters.'),
                         action='store_true')
     parser.add_argument('-m', '--maxthreads', type=int, default=100,
                         help=('Maximum simultaneous nodes for command'
@@ -146,13 +145,15 @@ def main(argv=None):
     parser = parse_args()
     args = parser.parse_args(argv[1:])
     loglevels = [logging.WARNING, logging.INFO, logging.DEBUG]
-    loglevel = loglevels[min(len(loglevels)-1, args.verbose)]
     if args.quiet:
-        loglevel = logging.ERROR
-    FORMAT = '%(asctime)s %(levelname)s %(module)s %(funcName)s(): %(message)s'
+        args.verbose = 0
+    loglevel = loglevels[min(len(loglevels)-1, args.verbose)]
+    FORMAT = ('%(asctime)s %(levelname)s: %(module)s: '
+              '%(funcName)s(): %(message)s')
     logging.basicConfig(filename=args.log_file,
                         level=loglevel,
                         format=FORMAT)
+    logger = logging.getLogger(__name__)
     conf = load_conf(args.config)
     if args.fuel_ip:
         conf['fuel_ip'] = args.fuel_ip
@@ -199,8 +200,8 @@ def main(argv=None):
     if args.dest_file:
         conf['archive_dir'] = os.path.split(args.dest_file)[0]
         conf['archive_name'] = os.path.split(args.dest_file)[1]
-    logging.info('Using rqdir: %s, rqfile: %s' %
-                 (conf['rqdir'], conf['rqfile']))
+    logger.info('Using rqdir: %s, rqfile: %s' %
+                (conf['rqdir'], conf['rqfile']))
     nm = pretty_run(args.quiet, 'Initializing node data',
                     NodeManager,
                     kwargs={'conf': conf, 'extended': args.extended,
@@ -221,7 +222,7 @@ def main(argv=None):
     size = pretty_run(args.quiet, 'Calculating logs size',
                       nm.calculate_log_size, args=(args.maxthreads,))
     if size == 0:
-        logging.warning('Size zero - no logs to collect.')
+        logger.warning('Size zero - no logs to collect.')
         return
     enough = pretty_run(args.quiet, 'Checking free space',
                         nm.is_enough_space)
@@ -231,9 +232,9 @@ def main(argv=None):
                    kwargs={'maxthreads': args.logs_maxthreads,
                            'fake': args.fake_logs})
     else:
-        logging.warning(('Not enough space for logs in "%s", skipping'
-                         'log collection.') % nm.conf['archive_dir'])
-    logging.info("Nodes:\n%s" % nm)
+        logger.warning(('Not enough space for logs in "%s", skipping'
+                        'log collection.') % nm.conf['archive_dir'])
+    logger.info("Nodes:\n%s" % nm)
     if not args.quiet:
         print('Run complete. Node information:')
         print(nm)
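If -q now works by zeroing the -v count (as the main() hunk above suggests), the level selection collapses to a small pure function. pick_loglevel is a hypothetical helper for illustration; cli.py inlines this logic in main():

import logging

loglevels = [logging.WARNING, logging.INFO, logging.DEBUG]


def pick_loglevel(verbose, quiet):
    # -q wins over any number of -v flags by zeroing the count,
    # so quiet runs land on the least verbose level in the list.
    if quiet:
        verbose = 0
    return loglevels[min(len(loglevels) - 1, verbose)]


assert pick_loglevel(verbose=5, quiet=False) == logging.DEBUG
assert pick_loglevel(verbose=5, quiet=True) == logging.WARNING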
timmy/nodes.py (116 changes)
@@ -50,7 +50,7 @@ class Node(object):
     print_template += ' {6:<6} {7}'

     def __init__(self, id, mac, cluster, roles, os_platform,
-                 online, status, ip, conf):
+                 online, status, ip, conf, logger=None):
         self.id = id
         self.mac = mac
         self.cluster = cluster
@@ -74,6 +74,7 @@ class Node(object):
         self.outputs_timestamp = False
         self.outputs_timestamp_dir = None
         self.apply_conf(conf)
+        self.logger = logger or logging.getLogger(__name__)

     def __str__(self):
         if not self.filtered_out:
@@ -157,30 +158,31 @@ class Node(object):
                                           timeout=self.timeout,
                                           prefix=self.prefix)
         if code != 0:
-            logging.warning('node: %s: could not determine'
-                            ' MOS release' % self.id)
+            self.logger.warning('node: %s: could not determine'
+                                ' MOS release' % self.id)
         else:
             self.release = release.strip('\n "\'')
-            logging.info('node: %s, MOS release: %s' %
-                         (self.id, self.release))
-        return self
+            self.logger.info('node: %s, MOS release: %s' %
+                             (self.id, self.release))
+        return release

     def exec_cmd(self, fake=False, ok_codes=None):
         sn = 'node-%s' % self.id
         cl = 'cluster-%s' % self.cluster
-        logging.debug('%s/%s/%s/%s' % (self.outdir, Node.ckey, cl, sn))
+        self.logger.debug('%s/%s/%s/%s' % (self.outdir, Node.ckey, cl, sn))
         ddir = os.path.join(self.outdir, Node.ckey, cl, sn)
         if self.cmds:
             tools.mdir(ddir)
             self.cmds = sorted(self.cmds)
+            mapcmds = {}
             for c in self.cmds:
                 for cmd in c:
                     dfile = os.path.join(ddir, 'node-%s-%s-%s' %
                                          (self.id, self.ip, cmd))
                     if self.outputs_timestamp:
                         dfile += self.outputs_timestamp_str
-                    logging.info('outfile: %s' % dfile)
-                    self.mapcmds[cmd] = dfile
+                    self.logger.info('outfile: %s' % dfile)
+                    mapcmds[cmd] = dfile
                     if not fake:
                         outs, errs, code = tools.ssh_node(ip=self.ip,
                                                           command=c[cmd],
@@ -193,20 +195,21 @@ class Node(object):
                             with open(dfile, 'w') as df:
                                 df.write(outs.encode('utf-8'))
                         except:
-                            logging.error("can't write to file %s" %
-                                          dfile)
+                            self.logger.error("can't write to file %s" %
+                                              dfile)
         if self.scripts:
             tools.mdir(ddir)
             self.scripts = sorted(self.scripts)
+            mapscr = {}
             for scr in self.scripts:
                 f = os.path.join(self.rqdir, Node.skey, scr)
-                logging.info('node:%s(%s), exec: %s' % (self.id, self.ip, f))
+                self.logger.info('node:%s(%s), exec: %s' % (self.id, self.ip, f))
                 dfile = os.path.join(ddir, 'node-%s-%s-%s' %
                                      (self.id, self.ip, os.path.basename(f)))
                 if self.outputs_timestamp:
                     dfile += self.outputs_timestamp_str
-                logging.info('outfile: %s' % dfile)
-                self.mapscr[scr] = dfile
+                self.logger.info('outfile: %s' % dfile)
+                mapscr[scr] = dfile
                 if not fake:
                     outs, errs, code = tools.ssh_node(ip=self.ip,
                                                       filename=f,
@@ -219,12 +222,12 @@ class Node(object):
                         with open(dfile, 'w') as df:
                             df.write(outs.encode('utf-8'))
                     except:
-                        logging.error("can't write to file %s" % dfile)
-        return self
+                        self.logger.error("can't write to file %s" % dfile)
+        return mapcmds, mapscr

     def exec_simple_cmd(self, cmd, timeout=15, infile=None, outfile=None,
                         fake=False, ok_codes=None, input=None):
-        logging.info('node:%s(%s), exec: %s' % (self.id, self.ip, cmd))
+        self.logger.info('node:%s(%s), exec: %s' % (self.id, self.ip, cmd))
         if not fake:
             outs, errs, code = tools.ssh_node(ip=self.ip,
                                               command=cmd,
@@ -238,7 +241,7 @@ class Node(object):
             self.check_code(code, 'exec_simple_cmd', cmd, ok_codes)

     def get_files(self, timeout=15):
-        logging.info('node: %s, IP: %s' % (self.id, self.ip))
+        self.logger.info('node: %s, IP: %s' % (self.id, self.ip))
         sn = 'node-%s' % self.id
         cl = 'cluster-%s' % self.cluster
         if self.files or self.filelists:
@@ -261,9 +264,9 @@ class Node(object):
                         if not line.isspace() and line[0] != '#':
                             data += line
                 except:
-                    logging.error('could not read file: %s' % fname)
+                    self.logger.error('could not read file: %s' % fname)
         data += '\n'.join(self.files)
-        logging.debug('node: %s, data:\n%s' % (self.id, data))
+        self.logger.debug('node: %s, data:\n%s' % (self.id, data))
         if data:
             o, e, c = tools.get_files_rsync(ip=self.ip,
                                             data=data,
@@ -273,7 +276,7 @@ class Node(object):
             self.check_code(c, 'get_files', 'tools.get_files_rsync')

     def put_files(self):
-        logging.info('node: %s, IP: %s' % (self.id, self.ip))
+        self.logger.info('node: %s, IP: %s' % (self.id, self.ip))
         for f in self.put:
             outs, errs, code = tools.put_file_scp(ip=self.ip,
                                                   file=f[0],
@@ -295,8 +298,8 @@ class Node(object):
                 start = ''
             cmd = ("find '%s' -type f%s -exec du -b {} +" % (item['path'],
                                                              start))
-            logging.info('node: %s, logs du-cmd: %s' %
-                         (self.id, cmd))
+            self.logger.info('node: %s, logs du-cmd: %s' %
+                             (self.id, cmd))
             outs, errs, code = tools.ssh_node(ip=self.ip,
                                               command=cmd,
                                               ssh_opts=self.ssh_opts,
@@ -304,9 +307,9 @@ class Node(object):
                                               timeout=timeout,
                                               prefix=self.prefix)
             if code == 124:
-                logging.error("node: %s, ip: %s, command: %s, "
-                              "timeout code: %s, error message: %s" %
-                              (self.id, self.ip, cmd, code, errs))
+                self.logger.error("node: %s, ip: %s, command: %s, "
+                                  "timeout code: %s, error message: %s" %
+                                  (self.id, self.ip, cmd, code, errs))
                 break
             if len(outs):
                 item['files'] = {}
@@ -315,8 +318,8 @@ class Node(object):
                     size, f = line.split('\t')
                     if filter_by_re(item, f):
                         item['files'][f] = int(size)
-            logging.debug('logs: %s' % (item['files']))
-        return self
+            self.logger.debug('logs: %s' % (item['files']))
+        return self.logs

     def logs_dict(self):
         result = {}
@@ -332,9 +335,9 @@ class Node(object):
     def check_code(self, code, func_name, cmd, ok_codes=None):
         if code:
             if not ok_codes or code not in ok_codes:
-                logging.warning("%s: got bad exit code %s,"
-                                " node: %s, ip: %s, cmd: %s" %
-                                (func_name, code, self.id, self.ip, cmd))
+                self.logger.warning("%s: got bad exit code %s,"
+                                    " node: %s, ip: %s, cmd: %s" %
+                                    (func_name, code, self.id, self.ip, cmd))

     def print_results(self, result_map):
         # result_map should be either mapcmds or mapscr
@@ -348,8 +351,9 @@ class Node(object):
 class NodeManager(object):
     """Class nodes """

-    def __init__(self, conf, extended=False, nodes_json=None):
+    def __init__(self, conf, extended=False, nodes_json=None, logger=None):
         self.conf = conf
+        self.logger = logger or logging.getLogger(__name__)
         if conf['outputs_timestamp'] or conf['dir_timestamp']:
             timestamp_str = datetime.now().strftime('_%F_%H-%M-%S')
             if conf['outputs_timestamp']:
@@ -363,8 +367,8 @@ class NodeManager(object):
         if not conf['shell_mode']:
             self.rqdir = conf['rqdir']
             if (not os.path.exists(self.rqdir)):
-                logging.critical(('NodeManager: directory %s does not'
-                                  ' exist') % self.rqdir)
+                self.logger.critical(('NodeManager: directory %s does not'
+                                      ' exist') % self.rqdir)
                 sys.exit(1)
             if self.conf['rqfile']:
                 self.import_rq()
@@ -384,7 +388,7 @@ class NodeManager(object):
         self.nodes_reapply_conf()
         self.conf_assign_once()
         if extended:
-            logging.info('NodeManager: extended mode enabled')
+            self.logger.info('NodeManager: extended mode enabled')
             '''TO-DO: load smth like extended.yaml
             do additional apply_conf(clean=False) with this yaml.
             Move some stuff from rq.yaml to extended.yaml'''
@@ -454,7 +458,7 @@ class NodeManager(object):

     def fuel_init(self):
         if not self.conf['fuel_ip']:
-            logging.critical('NodeManager: fuel_ip not set')
+            self.logger.critical('NodeManager: fuel_ip not set')
             sys.exit(7)
         fuelnode = Node(id=0,
                         cluster=0,
@@ -481,8 +485,8 @@ class NodeManager(object):
                                               timeout=fuelnode.timeout,
                                               prefix=fuelnode.prefix)
         if code != 0:
-            logging.critical(('NodeManager: cannot get '
-                              'fuel node list: %s') % err)
+            self.logger.critical(('NodeManager: cannot get '
+                                  'fuel node list: %s') % err)
             sys.exit(4)
         return nodes_json

@@ -514,7 +518,7 @@ class NodeManager(object):
                                      key=key))
         result = tools.run_batch(run_items, 100, dict_result=True)
         for key in result:
-            self.nodes[key] = result[key]
+            self.nodes[key].release = result[key]

     def conf_assign_once(self):
         once = Node.conf_once_prefix
@@ -564,7 +568,8 @@ class NodeManager(object):
                                      key=key))
         result = tools.run_batch(run_items, maxthreads, dict_result=True)
         for key in result:
-            self.nodes[key] = result[key]
+            self.nodes[key].mapscmds = result[key][0]
+            self.nodes[key].mapscr = result[key][1]

     def calculate_log_size(self, timeout=15, maxthreads=100):
         total_size = 0
@@ -576,11 +581,11 @@ class NodeManager(object):
                                      key=key))
         result = tools.run_batch(run_items, maxthreads, dict_result=True)
         for key in result:
-            self.nodes[key] = result[key]
+            self.nodes[key].logs = result[key]
         for node in self.nodes.values():
             total_size += sum(node.logs_dict().values())
-        logging.info('Full log size on nodes(with fuel): %s bytes' %
-                     total_size)
+        self.logger.info('Full log size on nodes(with fuel): %s bytes' %
+                         total_size)
         self.alogsize = total_size / 1024
         return self.alogsize

@@ -588,17 +593,18 @@ class NodeManager(object):
         tools.mdir(self.conf['outdir'])
         outs, errs, code = tools.free_space(self.conf['outdir'], timeout=1)
         if code != 0:
-            logging.error("Can't get free space: %s" % errs)
+            self.logger.error("Can't get free space: %s" % errs)
             return False
         try:
             fs = int(outs.rstrip('\n'))
         except:
-            logging.error("can't get free space\nouts: %s" %
-                          outs)
+            self.logger.error("can't get free space\nouts: %s" %
+                              outs)
             return False
-        logging.info('logsize: %s Kb, free space: %s Kb' % (self.alogsize, fs))
+        self.logger.info('logsize: %s Kb, free space: %s Kb' %
+                         (self.alogsize, fs))
         if (self.alogsize*coefficient > fs):
-            logging.error('Not enough space on device')
+            self.logger.error('Not enough space on device')
             return False
         else:
             return True
@@ -609,10 +615,10 @@ class NodeManager(object):
                                self.conf['archive_name'])
         cmd = "tar zcf '%s' -C %s %s" % (outfile, self.conf['outdir'], ".")
         tools.mdir(self.conf['archive_dir'])
-        logging.debug("cmd: %s" % cmd)
+        self.logger.debug("cmd: %s" % cmd)
         outs, errs, code = tools.launch_cmd(cmd, timeout)
         if code != 0:
-            logging.error("Can't create archive %s" % (errs))
+            self.logger.error("Can't create archive %s" % (errs))

     def find_adm_interface_speed(self, defspeed):
         '''Returns interface speed through which logs will be dowloaded'''
@@ -622,7 +628,7 @@ class NodeManager(object):
                ('cat /sys/class/net/', node.ip))
         out, err, code = tools.launch_cmd(cmd, node.timeout)
         if code != 0:
-            logging.error("can't get interface speed: error: %s" % err)
+            self.logger.error("can't get iface speed: error: %s" % err)
             return defspeed
         try:
             speed = int(out)
@@ -633,7 +639,7 @@ class NodeManager(object):
     @run_with_lock
     def get_logs(self, timeout, fake=False, maxthreads=10, speed=100):
         if fake:
-            logging.info('fake = True, skipping')
+            self.logger.info('fake = True, skipping')
             return
         txtfl = []
         speed = self.find_adm_interface_speed(speed)
@@ -642,8 +648,8 @@ class NodeManager(object):
         run_items = []
         for node in [n for n in self.nodes.values() if not n.filtered_out]:
             if not node.logs_dict():
-                logging.info(("node %s - no logs "
-                              "to collect") % node.id)
+                self.logger.info(("node %s - no logs "
+                                  "to collect") % node.id)
                 continue
             node.archivelogsfile = os.path.join(self.conf['archive_dir'],
                                                 'logs-node-%s.tar.gz' %
@@ -669,7 +675,7 @@ class NodeManager(object):
             try:
                 os.remove(tfile)
             except:
-                logging.error("can't delete file %s" % tfile)
+                self.logger.error("can't delete file %s" % tfile)

     @run_with_lock
     def get_files(self, timeout=15):
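Several return values change in this file (return release, return mapcmds, mapscr, and return self.logs instead of return self). These methods run in worker processes via tools.run_batch, so only the pickled return value reaches the parent; NodeManager now assigns that data back onto its own Node objects (e.g. self.nodes[key].logs = result[key]) instead of replacing them wholesale. A small sketch of the underlying mechanism, with hypothetical names:

import multiprocessing as mp


def child_task(queue):
    # Only the data put on the queue crosses the process boundary;
    # attribute changes made on objects inside the child are lost.
    logs = {'/var/log/messages': 1024}  # hypothetical result
    queue.put(logs)


if __name__ == '__main__':
    queue = mp.Queue()
    proc = mp.Process(target=child_task, args=(queue,))
    proc.start()
    logs = queue.get()  # parent-side equivalent of: node.logs = result[key]
    proc.join()

Returning plain data keeps the parent-side objects authoritative, at the cost of the caller knowing which attribute each result belongs to.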
timmy/tools.py
@@ -46,27 +46,29 @@ while 1:

 def interrupt_wrapper(f):
     def wrapper(*args, **kwargs):
+        logger = logging.getLogger(__name__)
         try:
             f(*args, **kwargs)
         except KeyboardInterrupt:
-            logging.warning('Interrupted, exiting.')
+            logger.warning('Interrupted, exiting.')
         except Exception as e:
-            logging.error('Error: %s' % e, exc_info=True)
+            logger.error('Error: %s' % e, exc_info=True)
             for k in dir(e):
                 '''debug: print all exception attrs except internal
                 and except 'message', which is deprecated since Python 2.6'''
                 if not k.startswith('__') and k != 'message':
                     v = getattr(e, k)
-                    logging.debug('Error details: %s = %s' % (k, v))
+                    logger.debug('Error details: %s = %s' % (k, v))
     return wrapper


 def run_with_lock(f):
     def wrapper(*args, **kwargs):
+        logger = logging.getLogger(__name__)
         lock = FLock(os.path.join(gettempdir(), 'timmy_%s.lock' % f.__name__))
         if not lock.lock():
-            logging.warning('Unable to obtain lock, skipping "%s"' %
-                            f.__name__)
+            logger.warning('Unable to obtain lock, skipping "%s"' %
+                           f.__name__)
             return ''
         f(*args, **kwargs)
         lock.unlock()
@@ -74,17 +76,19 @@ def run_with_lock(f):


 class RunItem():
-    def __init__(self, target, args=None, key=None):
+    def __init__(self, target, args=None, key=None, logger=None):
         self.target = target
         self.args = args
         self.key = key
         self.process = None
         self.queue = None
+        self.logger = logger or logging.getLogger(__name__)


 class SemaphoreProcess(Process):
-    def __init__(self, semaphore, target, args=None, queue=None):
+    def __init__(self, semaphore, target, args=None, queue=None, logger=None):
         Process.__init__(self)
+        self.logger = logger or logging.getLogger(__name__)
         self.semaphore = semaphore
         self.target = target
         if not args:
@@ -98,21 +102,23 @@ class SemaphoreProcess(Process):
             if self.queue:
                 self.queue.put_nowait(result)
         except Exception as error:
-            logging.exception(error)
+            self.logger.exception(error)
             if self.queue:
                 self.queue.put_nowait(error)
         finally:
-            logging.debug('finished call: %s' % self.target)
+            self.logger.debug('finished call: %s' % self.target)
             self.semaphore.release()
-            logging.debug('semaphore released')
+            self.logger.debug('semaphore released')


 def run_batch(item_list, maxthreads, dict_result=False):
     def cleanup():
-        logging.debug('cleanup processes')
+        logger = logging.getLogger(__name__)
+        logger.debug('cleanup processes')
         for run_item in item_list:
             if run_item.process:
                 run_item.process.terminate()
+    logger = logging.getLogger(__name__)
     semaphore = BoundedSemaphore(maxthreads)
     try:
         for run_item in item_list:
@@ -127,7 +133,7 @@ def run_batch(item_list, maxthreads, dict_result=False):
         for run_item in item_list:
             run_item.result = run_item.queue.get()
             if isinstance(run_item.result, Exception):
-                logging.critical('%s, exiting' % run_item.result)
+                logger.critical('%s, exiting' % run_item.result)
                 cleanup()
                 sys.exit(42)
             run_item.process.join()
@@ -148,6 +154,7 @@ def get_dir_structure(rootdir):
     """
     Creates a nested dictionary that represents the folder structure of rootdir
     """
+    logger = logging.getLogger(__name__)
     dir = {}
     try:
         rootdir = rootdir.rstrip(os.sep)
@@ -158,8 +165,8 @@ def get_dir_structure(rootdir):
             parent = reduce(dict.get, folders[:-1], dir)
             parent[folders[-1]] = subdir
     except:
-        logging.critical('failed to create list of the directory: %s' %
-                         rootdir)
+        logger.critical('failed to create list of the directory: %s' %
+                        rootdir)
         sys.exit(1)
     return dir

@@ -168,19 +175,20 @@ def load_yaml_file(filename):
     """
    Loads yaml data from file
     """
+    logger = logging.getLogger(__name__)
     try:
         with open(filename, 'r') as f:
             return yaml.load(f)
     except IOError as e:
-        logging.critical("I/O error(%s): file: %s; msg: %s" %
-                         (e.errno, e.filename, e.strerror))
+        logger.critical("I/O error(%s): file: %s; msg: %s" %
+                        (e.errno, e.filename, e.strerror))
         sys.exit(1)
     except ValueError:
-        logging.critical("Could not convert data")
+        logger.critical("Could not convert data")
         sys.exit(1)
     except yaml.parser.ParserError as e:
-        logging.critical("Could not parse %s:\n%s" %
-                         (filename, str(e)))
+        logger.critical("Could not parse %s:\n%s" %
+                        (filename, str(e)))
         sys.exit(1)


@@ -188,12 +196,13 @@ def mdir(directory):
     """
     Creates a directory if it doesn't exist
     """
+    logger = logging.getLogger(__name__)
     if not os.path.exists(directory):
-        logging.debug('creating directory %s' % directory)
+        logger.debug('creating directory %s' % directory)
         try:
             os.makedirs(directory)
         except:
-            logging.critical("Can't create a directory: %s" % directory)
+            logger.critical("Can't create a directory: %s" % directory)
             sys.exit(3)


@@ -209,13 +218,15 @@ def launch_cmd(cmd, timeout, input=None, ok_codes=None):
         return message

     def _timeout_terminate(pid):
+        logger = logging.getLogger(__name__)
         try:
             os.kill(pid, 15)
-            logging.error("launch_cmd: pid %d killed by timeout" % pid)
+            logger.error("launch_cmd: pid %d killed by timeout" % pid)
         except:
             pass

-    logging.info('cmd %s' % cmd)
+    logger = logging.getLogger(__name__)
+    logger.info('cmd %s' % cmd)
     p = subprocess.Popen(cmd,
                          shell=True,
                          stdin=subprocess.PIPE,
@@ -234,27 +245,29 @@ def launch_cmd(cmd, timeout, input=None, ok_codes=None):
                 p.kill()
             except:
                 pass
             p.stdin = None
             outs, errs = p.communicate()
             outs = outs.decode('utf-8')
             errs = errs.decode('utf-8')
             errs = errs.rstrip('\n')
-            logging.error(_log_msg(cmd, errs, p.returncode))
+            logger.error(_log_msg(cmd, errs, p.returncode))
     finally:
         if timeout_killer:
             timeout_killer.cancel()
-    logging.info(_log_msg(cmd, errs, p.returncode))
+    logger.info(_log_msg(cmd, errs, p.returncode))
     input = input.decode('utf-8') if input else None
-    logging.debug(_log_msg(cmd, errs, p.returncode, debug=True,
-                           stdin=input, stdout=outs))
+    logger.debug(_log_msg(cmd, errs, p.returncode, debug=True,
+                          stdin=input, stdout=outs))
     if p.returncode:
         if not ok_codes or p.returncode not in ok_codes:
-            logging.warning(_log_msg(cmd, errs, p.returncode))
+            logger.warning(_log_msg(cmd, errs, p.returncode))
     return outs, errs, p.returncode


 def ssh_node(ip, command='', ssh_opts=None, env_vars=None, timeout=15,
              filename=None, inputfile=None, outputfile=None,
              ok_codes=None, input=None, prefix=None):
+    logger = logging.getLogger(__name__)
     if not ssh_opts:
         ssh_opts = ''
     if not env_vars:
@@ -264,11 +277,11 @@ def ssh_node(ip, command='', ssh_opts=None, env_vars=None, timeout=15,
     if type(env_vars) is list:
         env_vars = ' '.join(env_vars)
     if (ip in ['localhost', '127.0.0.1']) or ip.startswith('127.'):
-        logging.info("skip ssh")
+        logger.info("skip ssh")
         bstr = "%s timeout '%s' bash -c " % (
             env_vars, timeout)
     else:
-        logging.info("exec ssh")
+        logger.info("exec ssh")
         bstr = "timeout '%s' ssh -t -T %s '%s' '%s' " % (
             timeout, ssh_opts, ip, env_vars)
     if filename is None:
@@ -280,7 +293,7 @@ def ssh_node(ip, command='', ssh_opts=None, env_vars=None, timeout=15,
             cmd = "%s < '%s'" % (cmd, inputfile)
         else:
             cmd = "%s'%s bash -s' < '%s'" % (bstr, prefix, filename)
-            logging.info("inputfile selected, cmd: %s" % cmd)
+            logger.info("inputfile selected, cmd: %s" % cmd)
     if outputfile is not None:
         cmd = "%s > '%s'" % (cmd, outputfile)
     cmd = ("input=\"$(cat | xxd -p)\"; trap 'kill $pid' 15; " +
@@ -290,10 +303,11 @@ def ssh_node(ip, command='', ssh_opts=None, env_vars=None, timeout=15,


 def get_files_rsync(ip, data, ssh_opts, dpath, timeout=15):
+    logger = logging.getLogger(__name__)
     if type(ssh_opts) is list:
         ssh_opts = ' '.join(ssh_opts)
     if (ip in ['localhost', '127.0.0.1']) or ip.startswith('127.'):
-        logging.info("skip ssh rsync")
+        logger.info("skip ssh rsync")
         cmd = ("timeout '%s' rsync -avzr --files-from=- / '%s'"
                " --progress --partial --delete-before" %
                (timeout, dpath))
@@ -302,7 +316,7 @@ def get_files_rsync(ip, data, ssh_opts, dpath, timeout=15):
               " -oCompression=no' --files-from=- '%s':/ '%s'"
               " --progress --partial --delete-before"
               ) % (timeout, ssh_opts, ip, dpath)
-    logging.debug("command:%s\ndata:\n%s" % (cmd, data))
+    logger.debug("command:%s\ndata:\n%s" % (cmd, data))
     if data == '':
         return cmd, '', 127
     return launch_cmd(cmd, timeout, input=data)
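The module-level functions above fetch their logger with logging.getLogger(__name__) on each call. That lookup is cheap: the logging module keeps a registry of named loggers and returns the same instance every time, so no handlers are re-created. A minimal sketch:

import logging


def some_tool():
    # getLogger returns the cached logger registered under this module's
    # name; repeated calls do not re-create or re-configure it.
    logger = logging.getLogger(__name__)
    logger.debug('tool invoked')


logging.basicConfig(level=logging.DEBUG)
some_tool()
assert logging.getLogger(__name__) is logging.getLogger(__name__)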