:added pause() and resume() methods to the Manager class
- manager.py: added pause() and resume() methods - service.py: ManagerRPC removed - service.py: fixed logging (logger.propagate=False) - service.py: used the proper HTTP Status Codes on the Synergy RESTful methods - common/service.py: fixed sigterm_handler() Change-Id: Ida87546587edac54c738c4fc01bf029ab4507c6a
This commit is contained in:
parent
ee05df7f55
commit
cbd1ccd06e
@ -2,8 +2,6 @@ from threading import Condition
|
||||
from threading import Event
|
||||
from threading import Thread
|
||||
|
||||
import time
|
||||
|
||||
__author__ = "Lisa Zangrando"
|
||||
__email__ = "lisa.zangrando[AT]pd.infn.it"
|
||||
__copyright__ = """Copyright (c) 2015 INFN - INDIGO-DataCloud
|
||||
@ -35,6 +33,7 @@ class Manager(Thread):
|
||||
self.status = "CREATED"
|
||||
self.autostart = False
|
||||
self.rate = -1
|
||||
self.paused = True # start out paused
|
||||
self.managers = {}
|
||||
|
||||
def execute(self, command, *args, **kargs):
|
||||
@ -73,6 +72,7 @@ class Manager(Thread):
|
||||
|
||||
def setAutoStart(self, autostart):
|
||||
self.autostart = autostart
|
||||
self.paused = not self.autostart
|
||||
|
||||
def getRate(self):
|
||||
return self.rate
|
||||
@ -99,26 +99,38 @@ class Manager(Thread):
|
||||
def setStatus(self, status):
|
||||
with self.condition:
|
||||
self.status = status
|
||||
|
||||
self.condition.notifyAll()
|
||||
|
||||
def stop(self):
|
||||
if self.isAlive():
|
||||
# set event to signal thread to terminate
|
||||
self.stop_event.set()
|
||||
self.resume()
|
||||
# block calling thread until thread really has terminated
|
||||
self.join()
|
||||
|
||||
def run(self):
|
||||
"""Periodically run the Manager task.
|
||||
def pause(self):
|
||||
with self.condition:
|
||||
self.paused = True
|
||||
self.condition.notifyAll()
|
||||
|
||||
Note:
|
||||
This method will be automatically called by Thread.start().
|
||||
One should not manually call this method.
|
||||
See https://docs.python.org/2/library/threading.html#thread-objects
|
||||
"""
|
||||
while not self.stop_event.is_set():
|
||||
try:
|
||||
self.task()
|
||||
time.sleep(self.rate * 60)
|
||||
except Exception as ex:
|
||||
print("task %r: %s" % (self.name, ex))
|
||||
def resume(self):
|
||||
with self.condition:
|
||||
self.paused = False
|
||||
self.condition.notifyAll()
|
||||
|
||||
def run(self):
|
||||
while not self.stop_event.isSet():
|
||||
with self.condition:
|
||||
if self.paused:
|
||||
self.status = "ACTIVE"
|
||||
self.condition.wait()
|
||||
else:
|
||||
self.status = "RUNNING"
|
||||
|
||||
try:
|
||||
self.task()
|
||||
self.condition.wait(self.rate * 60)
|
||||
except Exception as ex:
|
||||
print("task %r: %s" % (self.name, ex))
|
||||
|
@ -38,7 +38,9 @@ class Service(object):
|
||||
signal.signal(signal.SIGTERM, self.sigterm_handler)
|
||||
signal.signal(signal.SIGINT, self.sigterm_handler)
|
||||
|
||||
def sigterm_handler(self):
|
||||
def sigterm_handler(self, signum, frame):
|
||||
LOG.debug("Signal handler called with signal=%s" % signum)
|
||||
|
||||
global SIGTERM_SENT
|
||||
if not SIGTERM_SENT:
|
||||
LOG.info("Shutting down %s" % self.name)
|
||||
|
@ -66,6 +66,8 @@ def setLogger(name):
|
||||
|
||||
# set logger level
|
||||
logger = logging.getLogger(name)
|
||||
logger.propagate = False
|
||||
|
||||
try:
|
||||
logger.setLevel(cfg.CONF.Logger.level)
|
||||
except ValueError: # wrong level, we default to INFO
|
||||
@ -74,94 +76,6 @@ def setLogger(name):
|
||||
logger.addHandler(handler)
|
||||
|
||||
|
||||
class ManagerRPC(object):
|
||||
|
||||
def __init__(self, managers):
|
||||
self.managers = managers
|
||||
|
||||
def list(self, ctx, **args):
|
||||
result = []
|
||||
|
||||
for name, manager in self.managers.items():
|
||||
result.append(name)
|
||||
|
||||
return result
|
||||
|
||||
def start(self, ctx, **args):
|
||||
manager_name = args.get("arg").get("manager", None)
|
||||
result = {}
|
||||
|
||||
for name, manager in self.managers.items():
|
||||
if manager.getStatus() == "ACTIVE" \
|
||||
and (not manager_name or manager_name == name):
|
||||
LOG.info("starting the %s manager" % (name))
|
||||
try:
|
||||
# self.managers[name].start()
|
||||
self.managers[name].setStatus("RUNNING")
|
||||
LOG.info("%s manager started" % (name))
|
||||
|
||||
result[name] = manager.getStatus()
|
||||
except Exception as ex:
|
||||
self.managers[name].setStatus("ERROR")
|
||||
LOG.error("error occurred during the manager start-up %s"
|
||||
% (ex))
|
||||
|
||||
result[name] = manager.getStatus()
|
||||
|
||||
return result
|
||||
|
||||
def stop(self, ctx, **args):
|
||||
manager_name = args.get("arg").get("manager", None)
|
||||
result = {}
|
||||
|
||||
for name, manager in self.managers.items():
|
||||
if manager.getStatus() == "RUNNING" \
|
||||
and (not manager_name or manager_name == name):
|
||||
LOG.info("stopping the %s manager" % (name))
|
||||
try:
|
||||
# self.managers[name].stop()
|
||||
self.managers[name].setStatus("ACTIVE")
|
||||
LOG.info("%s manager stopped" % (name))
|
||||
|
||||
result[name] = manager.getStatus()
|
||||
except Exception as ex:
|
||||
self.managers[name].setStatus("ERROR")
|
||||
LOG.error("error occurred during the manager stop %s"
|
||||
% (ex))
|
||||
|
||||
result[name] = manager.getStatus()
|
||||
|
||||
return result
|
||||
|
||||
def execute(self, ctx, **args):
|
||||
manager_name = args.get("arg").get("manager", None)
|
||||
command = args.get("arg").get("command", None)
|
||||
result = {}
|
||||
|
||||
if not manager_name:
|
||||
result["error"] = "manager name not defined!"
|
||||
|
||||
if not command:
|
||||
result["error"] = "command not defined!"
|
||||
|
||||
if manager_name in self.managers:
|
||||
manager = self.managers[manager_name]
|
||||
manager.execute(cmd=command)
|
||||
result["command"] = "OK"
|
||||
|
||||
return result
|
||||
|
||||
def status(self, ctx, **args):
|
||||
manager_name = args.get("arg").get("manager", None)
|
||||
result = {}
|
||||
|
||||
for name, manager in self.managers.items():
|
||||
if not manager_name or manager_name == name:
|
||||
result[name] = manager.getStatus()
|
||||
|
||||
return result
|
||||
|
||||
|
||||
class Synergy(service.Service):
|
||||
"""Service object for binaries running on hosts.
|
||||
|
||||
@ -207,8 +121,8 @@ class Synergy(service.Service):
|
||||
|
||||
try:
|
||||
LOG.info("initializing the %r manager" % (manager.getName()))
|
||||
|
||||
manager.setup()
|
||||
manager.setStatus("ACTIVE")
|
||||
|
||||
LOG.info("manager %r initialized!" % (manager.getName()))
|
||||
except Exception as ex:
|
||||
@ -243,20 +157,23 @@ class Synergy(service.Service):
|
||||
manager_list = parameters['manager']
|
||||
else:
|
||||
manager_list = [parameters['manager']]
|
||||
else:
|
||||
manager_list = self.managers.keys()
|
||||
else:
|
||||
manager_list = self.managers.keys()
|
||||
|
||||
for manager in manager_list:
|
||||
escape(manager)
|
||||
for manager_name in manager_list:
|
||||
manager_name = escape(manager_name)
|
||||
|
||||
for name, manager in self.managers.items():
|
||||
if not manager_list or name in manager_list:
|
||||
result[name] = manager.getStatus()
|
||||
if manager_name in self.managers:
|
||||
result[manager_name] = self.managers[manager_name].getStatus()
|
||||
|
||||
if manager_list and len(manager_list) == 1 and len(result) == 0:
|
||||
if len(manager_list) == 1 and len(result) == 0:
|
||||
start_response("404 NOT FOUND", [("Content-Type", "text/plain")])
|
||||
return ["manager %r not found!" % manager_list[0]]
|
||||
else:
|
||||
start_response("200 OK", [("Content-Type", "text/html")])
|
||||
return ["%s" % json.dumps(result)]
|
||||
|
||||
start_response("200 OK", [("Content-Type", "text/html")])
|
||||
return ["%s" % json.dumps(result)]
|
||||
|
||||
def executeCommand(self, environ, start_response):
|
||||
manager_name = None
|
||||
@ -264,98 +181,111 @@ class Synergy(service.Service):
|
||||
|
||||
query = environ.get("QUERY_STRING", None)
|
||||
|
||||
if query:
|
||||
parameters = parse_qs(query)
|
||||
LOG.debug("execute command: parameters=%s" % parameters)
|
||||
if not query:
|
||||
start_response("400 BAD REQUEST", [("Content-Type", "text/plain")])
|
||||
return ["bad request"]
|
||||
|
||||
if "manager" in parameters:
|
||||
manager_name = escape(parameters['manager'][0])
|
||||
parameters = parse_qs(query)
|
||||
LOG.debug("execute command: parameters=%s" % parameters)
|
||||
|
||||
if "command" in parameters:
|
||||
command = escape(parameters['command'][0])
|
||||
if "manager" not in parameters:
|
||||
start_response("400 BAD REQUEST", [("Content-Type", "text/plain")])
|
||||
return ["manager not specified!"]
|
||||
|
||||
if "args" in parameters:
|
||||
manager_args = escape(parameters['args'][0])
|
||||
manager_args = manager_args.replace("'", "\"")
|
||||
manager_args = json.loads(manager_args)
|
||||
else:
|
||||
manager_args = {}
|
||||
manager_name = escape(parameters['manager'][0])
|
||||
|
||||
if not query or not manager_name or not command:
|
||||
start_response("404 NOT FOUND", [("Content-Type", "text/plain")])
|
||||
return ["wrong command"]
|
||||
|
||||
if manager_name in self.managers:
|
||||
manager = self.managers[manager_name]
|
||||
try:
|
||||
result = manager.execute(command=command, **manager_args)
|
||||
|
||||
if not isinstance(result, dict):
|
||||
try:
|
||||
result = result.toDict()
|
||||
except Exception:
|
||||
result = result.__dict__
|
||||
|
||||
LOG.debug("execute command: result=%s" % result)
|
||||
|
||||
start_response("200 OK", [("Content-Type", "text/html")])
|
||||
return ["%s" % json.dumps(result)]
|
||||
except Exception as ex:
|
||||
LOG.debug("execute command: error=%s" % ex)
|
||||
start_response("404 NOT FOUND",
|
||||
[("Content-Type", "text/plain")])
|
||||
return ["error: %s" % ex]
|
||||
else:
|
||||
if manager_name not in self.managers:
|
||||
start_response("404 NOT FOUND", [("Content-Type", "text/plain")])
|
||||
return ["manager %r not found!" % manager_name]
|
||||
|
||||
if "command" not in parameters:
|
||||
start_response("400 BAD REQUEST", [("Content-Type", "text/plain")])
|
||||
return ["bad request"]
|
||||
|
||||
command = escape(parameters['command'][0])
|
||||
|
||||
if "args" in parameters:
|
||||
manager_args = escape(parameters['args'][0])
|
||||
manager_args = manager_args.replace("'", "\"")
|
||||
manager_args = json.loads(manager_args)
|
||||
else:
|
||||
manager_args = {}
|
||||
|
||||
manager = self.managers[manager_name]
|
||||
|
||||
try:
|
||||
result = manager.execute(command=command, **manager_args)
|
||||
|
||||
if not isinstance(result, dict):
|
||||
try:
|
||||
result = result.toDict()
|
||||
except Exception:
|
||||
result = result.__dict__
|
||||
|
||||
LOG.debug("execute command: result=%s" % result)
|
||||
|
||||
start_response("200 OK", [("Content-Type", "text/html")])
|
||||
return ["%s" % json.dumps(result)]
|
||||
except Exception as ex:
|
||||
LOG.debug("execute command: error=%s" % ex)
|
||||
start_response("500 INTERNAL SERVER ERROR",
|
||||
[("Content-Type", "text/plain")])
|
||||
return ["error: %s" % ex]
|
||||
|
||||
def startManager(self, environ, start_response):
|
||||
manager_list = None
|
||||
result = {}
|
||||
|
||||
query = environ.get("QUERY_STRING", None)
|
||||
|
||||
if query:
|
||||
parameters = parse_qs(query)
|
||||
if not query:
|
||||
start_response("400 BAD REQUEST", [("Content-Type", "text/plain")])
|
||||
return ["bad request"]
|
||||
|
||||
if "manager" in parameters:
|
||||
if isinstance(parameters['manager'], (list, tuple)):
|
||||
manager_list = parameters['manager']
|
||||
else:
|
||||
manager_list = [parameters['manager']]
|
||||
parameters = parse_qs(query)
|
||||
|
||||
for manager in manager_list:
|
||||
escape(manager)
|
||||
if "manager" not in parameters:
|
||||
start_response("400 BAD REQUEST", [("Content-Type", "text/plain")])
|
||||
return ["manager not specified!"]
|
||||
|
||||
for name, manager in self.managers.items():
|
||||
if not manager_list or name in manager_list:
|
||||
result[name] = {}
|
||||
if isinstance(parameters['manager'], (list, tuple)):
|
||||
manager_list = parameters['manager']
|
||||
else:
|
||||
manager_list = [parameters['manager']]
|
||||
|
||||
if manager.getStatus() == "ACTIVE":
|
||||
LOG.info("starting the %r manager" % (name))
|
||||
try:
|
||||
# self.managers[name].start()
|
||||
self.managers[name].setStatus("RUNNING")
|
||||
LOG.info("%r manager started!" % (name))
|
||||
for manager_name in manager_list:
|
||||
manager_name = escape(manager_name)
|
||||
|
||||
result[name]["message"] = "started successfully"
|
||||
except Exception as ex:
|
||||
self.managers[name].setStatus("ERROR")
|
||||
LOG.error("error occurred during the manager start-up"
|
||||
"%s" % (ex))
|
||||
if manager_name not in self.managers:
|
||||
continue
|
||||
|
||||
result[name]["message"] = "ERROR: %s" % ex
|
||||
else:
|
||||
result[name]["message"] = "WARN: already started"
|
||||
result[manager_name] = {}
|
||||
|
||||
result[name]["status"] = manager.getStatus()
|
||||
manager = self.managers[manager_name]
|
||||
|
||||
if manager_list and len(manager_list) == 1 and len(result) == 0:
|
||||
if manager.getStatus() == "ACTIVE":
|
||||
LOG.info("starting the %r manager" % (manager_name))
|
||||
|
||||
manager.resume()
|
||||
|
||||
LOG.info("%r manager started! (rate=%s min)"
|
||||
% (manager_name, manager.getRate()))
|
||||
|
||||
result[manager_name]["status"] = "RUNNING"
|
||||
result[manager_name]["message"] = "started successfully"
|
||||
elif manager.getStatus() == "RUNNING":
|
||||
result[manager_name]["status"] = "RUNNING"
|
||||
result[manager_name]["message"] = "WARN: already started"
|
||||
elif manager.getStatus() == "ERROR":
|
||||
result[manager_name]["status"] = "ERROR"
|
||||
result[manager_name]["message"] = "wrong state"
|
||||
|
||||
if len(manager_list) == 1 and len(result) == 0:
|
||||
start_response("404 NOT FOUND", [("Content-Type", "text/plain")])
|
||||
return ["manager %r not found!" % manager_list[0]]
|
||||
else:
|
||||
start_response("200 OK", [("Content-Type", "text/html")])
|
||||
return ["%s" % json.dumps(result)]
|
||||
|
||||
start_response("200 OK", [("Content-Type", "text/html")])
|
||||
return ["%s" % json.dumps(result)]
|
||||
|
||||
def stopManager(self, environ, start_response):
|
||||
manager_list = None
|
||||
@ -363,59 +293,65 @@ class Synergy(service.Service):
|
||||
|
||||
query = environ.get("QUERY_STRING", None)
|
||||
|
||||
if query:
|
||||
parameters = parse_qs(query)
|
||||
if not query:
|
||||
start_response("400 BAD REQUEST", [("Content-Type", "text/plain")])
|
||||
return ["bad request"]
|
||||
|
||||
if "manager" in parameters:
|
||||
if isinstance(parameters['manager'], (list, tuple)):
|
||||
manager_list = parameters['manager']
|
||||
else:
|
||||
manager_list = [parameters['manager']]
|
||||
parameters = parse_qs(query)
|
||||
|
||||
for manager in manager_list:
|
||||
escape(manager)
|
||||
if "manager" not in parameters:
|
||||
start_response("400 BAD REQUEST", [("Content-Type", "text/plain")])
|
||||
return ["manager not specified!"]
|
||||
|
||||
for name, manager in self.managers.items():
|
||||
if not manager_list or name in manager_list:
|
||||
result[name] = {}
|
||||
if isinstance(parameters['manager'], (list, tuple)):
|
||||
manager_list = parameters['manager']
|
||||
else:
|
||||
manager_list = [parameters['manager']]
|
||||
|
||||
if manager.getStatus() == "RUNNING":
|
||||
LOG.info("stopping the %r manager" % (name))
|
||||
try:
|
||||
# self.managers[name].stop()
|
||||
self.managers[name].setStatus("ACTIVE")
|
||||
LOG.info("%r manager stopped!" % (name))
|
||||
for manager_name in manager_list:
|
||||
manager_name = escape(manager_name)
|
||||
|
||||
result[name]["message"] = "stopped successfully"
|
||||
except Exception as ex:
|
||||
self.managers[name].setStatus("ERROR")
|
||||
LOG.error("error occurred during the manager stop: %s"
|
||||
% (ex))
|
||||
if manager_name not in self.managers:
|
||||
continue
|
||||
|
||||
result[name]["message"] = "ERROR: %s" % ex
|
||||
else:
|
||||
result[name]["message"] = "WARN: already stopped"
|
||||
result[manager_name] = {}
|
||||
|
||||
result[name]["status"] = manager.getStatus()
|
||||
manager = self.managers[manager_name]
|
||||
|
||||
if manager_list and len(manager_list) == 1 and len(result) == 0:
|
||||
if manager.getStatus() == "RUNNING":
|
||||
LOG.info("stopping the %r manager" % (manager_name))
|
||||
|
||||
manager.pause()
|
||||
|
||||
LOG.info("%r manager stopped!" % (manager_name))
|
||||
|
||||
result[manager_name]["status"] = "ACTIVE"
|
||||
result[manager_name]["message"] = "stopped successfully"
|
||||
elif manager.getStatus() == "ACTIVE":
|
||||
result[manager_name]["status"] = "ACTIVE"
|
||||
result[manager_name]["message"] = "WARN: already stopped"
|
||||
elif manager.getStatus() == "ERROR":
|
||||
result[manager_name]["status"] = "ERROR"
|
||||
result[manager_name]["message"] = "wrong state"
|
||||
|
||||
if len(manager_list) == 1 and len(result) == 0:
|
||||
start_response("404 NOT FOUND", [("Content-Type", "text/plain")])
|
||||
return ["manager %r not found!" % manager_list[0]]
|
||||
else:
|
||||
start_response("200 OK", [("Content-Type", "text/html")])
|
||||
return ["%s" % json.dumps(result)]
|
||||
|
||||
start_response("200 OK", [("Content-Type", "text/html")])
|
||||
return ["%s" % json.dumps(result)]
|
||||
|
||||
def start(self):
|
||||
self.model_disconnected = False
|
||||
|
||||
for name, manager in self.managers.items():
|
||||
if manager.getStatus() != "ERROR" and manager.isAutoStart():
|
||||
if manager.getStatus() != "ERROR":
|
||||
try:
|
||||
LOG.info("starting the %r manager" % (name))
|
||||
manager.start()
|
||||
manager.setStatus("RUNNING")
|
||||
LOG.info("%r manager started! (rate=%s min)"
|
||||
% (name, manager.getRate()))
|
||||
|
||||
LOG.info("%r manager started! (rate=%s min, status=%s)"
|
||||
% (name, manager.getRate(), manager.getStatus()))
|
||||
except Exception as ex:
|
||||
LOG.error("error occurred during the manager start %s"
|
||||
% (ex))
|
||||
@ -461,6 +397,8 @@ class Synergy(service.Service):
|
||||
# manager.join()
|
||||
# LOG.info("%s manager destroyed" % (name))
|
||||
except Exception as ex:
|
||||
LOG.error("Exception has occured", exc_info=1)
|
||||
|
||||
manager.setStatus("ERROR")
|
||||
LOG.error("error occurred during the manager destruction: %s"
|
||||
% ex)
|
||||
|
Loading…
Reference in New Issue
Block a user