From cbd1ccd06e3f18b69cbb1ee0476a84f3141918be Mon Sep 17 00:00:00 2001 From: Lisa Zangrando Date: Thu, 9 Jun 2016 15:32:27 +0200 Subject: [PATCH] :added pause() and resume() methods to the Manager class - manager.py: added pause() and resume() methods - service.py: ManagerRPC removed - service.py: fixed logging (logger.propagate=False) - service.py: used the proper HTTP Status Codes on the Synergy RESTful methods - common/service.py: fixed sigterm_handler() Change-Id: Ida87546587edac54c738c4fc01bf029ab4507c6a --- synergy/common/manager.py | 42 +++-- synergy/common/service.py | 4 +- synergy/service.py | 342 ++++++++++++++++---------------------- 3 files changed, 170 insertions(+), 218 deletions(-) diff --git a/synergy/common/manager.py b/synergy/common/manager.py index 88e216d..ce10771 100644 --- a/synergy/common/manager.py +++ b/synergy/common/manager.py @@ -2,8 +2,6 @@ from threading import Condition from threading import Event from threading import Thread -import time - __author__ = "Lisa Zangrando" __email__ = "lisa.zangrando[AT]pd.infn.it" __copyright__ = """Copyright (c) 2015 INFN - INDIGO-DataCloud @@ -35,6 +33,7 @@ class Manager(Thread): self.status = "CREATED" self.autostart = False self.rate = -1 + self.paused = True # start out paused self.managers = {} def execute(self, command, *args, **kargs): @@ -73,6 +72,7 @@ class Manager(Thread): def setAutoStart(self, autostart): self.autostart = autostart + self.paused = not self.autostart def getRate(self): return self.rate @@ -99,26 +99,38 @@ class Manager(Thread): def setStatus(self, status): with self.condition: self.status = status + self.condition.notifyAll() def stop(self): if self.isAlive(): # set event to signal thread to terminate self.stop_event.set() + self.resume() # block calling thread until thread really has terminated self.join() - def run(self): - """Periodically run the Manager task. + def pause(self): + with self.condition: + self.paused = True + self.condition.notifyAll() - Note: - This method will be automatically called by Thread.start(). - One should not manually call this method. - See https://docs.python.org/2/library/threading.html#thread-objects - """ - while not self.stop_event.is_set(): - try: - self.task() - time.sleep(self.rate * 60) - except Exception as ex: - print("task %r: %s" % (self.name, ex)) + def resume(self): + with self.condition: + self.paused = False + self.condition.notifyAll() + + def run(self): + while not self.stop_event.isSet(): + with self.condition: + if self.paused: + self.status = "ACTIVE" + self.condition.wait() + else: + self.status = "RUNNING" + + try: + self.task() + self.condition.wait(self.rate * 60) + except Exception as ex: + print("task %r: %s" % (self.name, ex)) diff --git a/synergy/common/service.py b/synergy/common/service.py index be7568e..80258a4 100644 --- a/synergy/common/service.py +++ b/synergy/common/service.py @@ -38,7 +38,9 @@ class Service(object): signal.signal(signal.SIGTERM, self.sigterm_handler) signal.signal(signal.SIGINT, self.sigterm_handler) - def sigterm_handler(self): + def sigterm_handler(self, signum, frame): + LOG.debug("Signal handler called with signal=%s" % signum) + global SIGTERM_SENT if not SIGTERM_SENT: LOG.info("Shutting down %s" % self.name) diff --git a/synergy/service.py b/synergy/service.py index 3631b96..3647d1b 100644 --- a/synergy/service.py +++ b/synergy/service.py @@ -66,6 +66,8 @@ def setLogger(name): # set logger level logger = logging.getLogger(name) + logger.propagate = False + try: logger.setLevel(cfg.CONF.Logger.level) except ValueError: # wrong level, we default to INFO @@ -74,94 +76,6 @@ def setLogger(name): logger.addHandler(handler) -class ManagerRPC(object): - - def __init__(self, managers): - self.managers = managers - - def list(self, ctx, **args): - result = [] - - for name, manager in self.managers.items(): - result.append(name) - - return result - - def start(self, ctx, **args): - manager_name = args.get("arg").get("manager", None) - result = {} - - for name, manager in self.managers.items(): - if manager.getStatus() == "ACTIVE" \ - and (not manager_name or manager_name == name): - LOG.info("starting the %s manager" % (name)) - try: - # self.managers[name].start() - self.managers[name].setStatus("RUNNING") - LOG.info("%s manager started" % (name)) - - result[name] = manager.getStatus() - except Exception as ex: - self.managers[name].setStatus("ERROR") - LOG.error("error occurred during the manager start-up %s" - % (ex)) - - result[name] = manager.getStatus() - - return result - - def stop(self, ctx, **args): - manager_name = args.get("arg").get("manager", None) - result = {} - - for name, manager in self.managers.items(): - if manager.getStatus() == "RUNNING" \ - and (not manager_name or manager_name == name): - LOG.info("stopping the %s manager" % (name)) - try: - # self.managers[name].stop() - self.managers[name].setStatus("ACTIVE") - LOG.info("%s manager stopped" % (name)) - - result[name] = manager.getStatus() - except Exception as ex: - self.managers[name].setStatus("ERROR") - LOG.error("error occurred during the manager stop %s" - % (ex)) - - result[name] = manager.getStatus() - - return result - - def execute(self, ctx, **args): - manager_name = args.get("arg").get("manager", None) - command = args.get("arg").get("command", None) - result = {} - - if not manager_name: - result["error"] = "manager name not defined!" - - if not command: - result["error"] = "command not defined!" - - if manager_name in self.managers: - manager = self.managers[manager_name] - manager.execute(cmd=command) - result["command"] = "OK" - - return result - - def status(self, ctx, **args): - manager_name = args.get("arg").get("manager", None) - result = {} - - for name, manager in self.managers.items(): - if not manager_name or manager_name == name: - result[name] = manager.getStatus() - - return result - - class Synergy(service.Service): """Service object for binaries running on hosts. @@ -207,8 +121,8 @@ class Synergy(service.Service): try: LOG.info("initializing the %r manager" % (manager.getName())) + manager.setup() - manager.setStatus("ACTIVE") LOG.info("manager %r initialized!" % (manager.getName())) except Exception as ex: @@ -243,20 +157,23 @@ class Synergy(service.Service): manager_list = parameters['manager'] else: manager_list = [parameters['manager']] + else: + manager_list = self.managers.keys() + else: + manager_list = self.managers.keys() - for manager in manager_list: - escape(manager) + for manager_name in manager_list: + manager_name = escape(manager_name) - for name, manager in self.managers.items(): - if not manager_list or name in manager_list: - result[name] = manager.getStatus() + if manager_name in self.managers: + result[manager_name] = self.managers[manager_name].getStatus() - if manager_list and len(manager_list) == 1 and len(result) == 0: + if len(manager_list) == 1 and len(result) == 0: start_response("404 NOT FOUND", [("Content-Type", "text/plain")]) return ["manager %r not found!" % manager_list[0]] - else: - start_response("200 OK", [("Content-Type", "text/html")]) - return ["%s" % json.dumps(result)] + + start_response("200 OK", [("Content-Type", "text/html")]) + return ["%s" % json.dumps(result)] def executeCommand(self, environ, start_response): manager_name = None @@ -264,98 +181,111 @@ class Synergy(service.Service): query = environ.get("QUERY_STRING", None) - if query: - parameters = parse_qs(query) - LOG.debug("execute command: parameters=%s" % parameters) + if not query: + start_response("400 BAD REQUEST", [("Content-Type", "text/plain")]) + return ["bad request"] - if "manager" in parameters: - manager_name = escape(parameters['manager'][0]) + parameters = parse_qs(query) + LOG.debug("execute command: parameters=%s" % parameters) - if "command" in parameters: - command = escape(parameters['command'][0]) + if "manager" not in parameters: + start_response("400 BAD REQUEST", [("Content-Type", "text/plain")]) + return ["manager not specified!"] - if "args" in parameters: - manager_args = escape(parameters['args'][0]) - manager_args = manager_args.replace("'", "\"") - manager_args = json.loads(manager_args) - else: - manager_args = {} + manager_name = escape(parameters['manager'][0]) - if not query or not manager_name or not command: - start_response("404 NOT FOUND", [("Content-Type", "text/plain")]) - return ["wrong command"] - - if manager_name in self.managers: - manager = self.managers[manager_name] - try: - result = manager.execute(command=command, **manager_args) - - if not isinstance(result, dict): - try: - result = result.toDict() - except Exception: - result = result.__dict__ - - LOG.debug("execute command: result=%s" % result) - - start_response("200 OK", [("Content-Type", "text/html")]) - return ["%s" % json.dumps(result)] - except Exception as ex: - LOG.debug("execute command: error=%s" % ex) - start_response("404 NOT FOUND", - [("Content-Type", "text/plain")]) - return ["error: %s" % ex] - else: + if manager_name not in self.managers: start_response("404 NOT FOUND", [("Content-Type", "text/plain")]) return ["manager %r not found!" % manager_name] + if "command" not in parameters: + start_response("400 BAD REQUEST", [("Content-Type", "text/plain")]) + return ["bad request"] + + command = escape(parameters['command'][0]) + + if "args" in parameters: + manager_args = escape(parameters['args'][0]) + manager_args = manager_args.replace("'", "\"") + manager_args = json.loads(manager_args) + else: + manager_args = {} + + manager = self.managers[manager_name] + + try: + result = manager.execute(command=command, **manager_args) + + if not isinstance(result, dict): + try: + result = result.toDict() + except Exception: + result = result.__dict__ + + LOG.debug("execute command: result=%s" % result) + + start_response("200 OK", [("Content-Type", "text/html")]) + return ["%s" % json.dumps(result)] + except Exception as ex: + LOG.debug("execute command: error=%s" % ex) + start_response("500 INTERNAL SERVER ERROR", + [("Content-Type", "text/plain")]) + return ["error: %s" % ex] + def startManager(self, environ, start_response): manager_list = None result = {} query = environ.get("QUERY_STRING", None) - if query: - parameters = parse_qs(query) + if not query: + start_response("400 BAD REQUEST", [("Content-Type", "text/plain")]) + return ["bad request"] - if "manager" in parameters: - if isinstance(parameters['manager'], (list, tuple)): - manager_list = parameters['manager'] - else: - manager_list = [parameters['manager']] + parameters = parse_qs(query) - for manager in manager_list: - escape(manager) + if "manager" not in parameters: + start_response("400 BAD REQUEST", [("Content-Type", "text/plain")]) + return ["manager not specified!"] - for name, manager in self.managers.items(): - if not manager_list or name in manager_list: - result[name] = {} + if isinstance(parameters['manager'], (list, tuple)): + manager_list = parameters['manager'] + else: + manager_list = [parameters['manager']] - if manager.getStatus() == "ACTIVE": - LOG.info("starting the %r manager" % (name)) - try: - # self.managers[name].start() - self.managers[name].setStatus("RUNNING") - LOG.info("%r manager started!" % (name)) + for manager_name in manager_list: + manager_name = escape(manager_name) - result[name]["message"] = "started successfully" - except Exception as ex: - self.managers[name].setStatus("ERROR") - LOG.error("error occurred during the manager start-up" - "%s" % (ex)) + if manager_name not in self.managers: + continue - result[name]["message"] = "ERROR: %s" % ex - else: - result[name]["message"] = "WARN: already started" + result[manager_name] = {} - result[name]["status"] = manager.getStatus() + manager = self.managers[manager_name] - if manager_list and len(manager_list) == 1 and len(result) == 0: + if manager.getStatus() == "ACTIVE": + LOG.info("starting the %r manager" % (manager_name)) + + manager.resume() + + LOG.info("%r manager started! (rate=%s min)" + % (manager_name, manager.getRate())) + + result[manager_name]["status"] = "RUNNING" + result[manager_name]["message"] = "started successfully" + elif manager.getStatus() == "RUNNING": + result[manager_name]["status"] = "RUNNING" + result[manager_name]["message"] = "WARN: already started" + elif manager.getStatus() == "ERROR": + result[manager_name]["status"] = "ERROR" + result[manager_name]["message"] = "wrong state" + + if len(manager_list) == 1 and len(result) == 0: start_response("404 NOT FOUND", [("Content-Type", "text/plain")]) return ["manager %r not found!" % manager_list[0]] - else: - start_response("200 OK", [("Content-Type", "text/html")]) - return ["%s" % json.dumps(result)] + + start_response("200 OK", [("Content-Type", "text/html")]) + return ["%s" % json.dumps(result)] def stopManager(self, environ, start_response): manager_list = None @@ -363,59 +293,65 @@ class Synergy(service.Service): query = environ.get("QUERY_STRING", None) - if query: - parameters = parse_qs(query) + if not query: + start_response("400 BAD REQUEST", [("Content-Type", "text/plain")]) + return ["bad request"] - if "manager" in parameters: - if isinstance(parameters['manager'], (list, tuple)): - manager_list = parameters['manager'] - else: - manager_list = [parameters['manager']] + parameters = parse_qs(query) - for manager in manager_list: - escape(manager) + if "manager" not in parameters: + start_response("400 BAD REQUEST", [("Content-Type", "text/plain")]) + return ["manager not specified!"] - for name, manager in self.managers.items(): - if not manager_list or name in manager_list: - result[name] = {} + if isinstance(parameters['manager'], (list, tuple)): + manager_list = parameters['manager'] + else: + manager_list = [parameters['manager']] - if manager.getStatus() == "RUNNING": - LOG.info("stopping the %r manager" % (name)) - try: - # self.managers[name].stop() - self.managers[name].setStatus("ACTIVE") - LOG.info("%r manager stopped!" % (name)) + for manager_name in manager_list: + manager_name = escape(manager_name) - result[name]["message"] = "stopped successfully" - except Exception as ex: - self.managers[name].setStatus("ERROR") - LOG.error("error occurred during the manager stop: %s" - % (ex)) + if manager_name not in self.managers: + continue - result[name]["message"] = "ERROR: %s" % ex - else: - result[name]["message"] = "WARN: already stopped" + result[manager_name] = {} - result[name]["status"] = manager.getStatus() + manager = self.managers[manager_name] - if manager_list and len(manager_list) == 1 and len(result) == 0: + if manager.getStatus() == "RUNNING": + LOG.info("stopping the %r manager" % (manager_name)) + + manager.pause() + + LOG.info("%r manager stopped!" % (manager_name)) + + result[manager_name]["status"] = "ACTIVE" + result[manager_name]["message"] = "stopped successfully" + elif manager.getStatus() == "ACTIVE": + result[manager_name]["status"] = "ACTIVE" + result[manager_name]["message"] = "WARN: already stopped" + elif manager.getStatus() == "ERROR": + result[manager_name]["status"] = "ERROR" + result[manager_name]["message"] = "wrong state" + + if len(manager_list) == 1 and len(result) == 0: start_response("404 NOT FOUND", [("Content-Type", "text/plain")]) return ["manager %r not found!" % manager_list[0]] - else: - start_response("200 OK", [("Content-Type", "text/html")]) - return ["%s" % json.dumps(result)] + + start_response("200 OK", [("Content-Type", "text/html")]) + return ["%s" % json.dumps(result)] def start(self): self.model_disconnected = False for name, manager in self.managers.items(): - if manager.getStatus() != "ERROR" and manager.isAutoStart(): + if manager.getStatus() != "ERROR": try: LOG.info("starting the %r manager" % (name)) manager.start() - manager.setStatus("RUNNING") - LOG.info("%r manager started! (rate=%s min)" - % (name, manager.getRate())) + + LOG.info("%r manager started! (rate=%s min, status=%s)" + % (name, manager.getRate(), manager.getStatus())) except Exception as ex: LOG.error("error occurred during the manager start %s" % (ex)) @@ -461,6 +397,8 @@ class Synergy(service.Service): # manager.join() # LOG.info("%s manager destroyed" % (name)) except Exception as ex: + LOG.error("Exception has occured", exc_info=1) + manager.setStatus("ERROR") LOG.error("error occurred during the manager destruction: %s" % ex)