Merge "Stopping of vitrage-graph"
This commit is contained in:
commit
129a23b522
@ -14,6 +14,7 @@
|
|||||||
import abc
|
import abc
|
||||||
import cotyledon
|
import cotyledon
|
||||||
import multiprocessing
|
import multiprocessing
|
||||||
|
from six.moves import _thread
|
||||||
|
|
||||||
from oslo_concurrency import processutils as ps
|
from oslo_concurrency import processutils as ps
|
||||||
from oslo_log import log
|
from oslo_log import log
|
||||||
@ -44,7 +45,6 @@ GRAPH_UPDATE = 'graph_update'
|
|||||||
START_EVALUATION = 'start_evaluation'
|
START_EVALUATION = 'start_evaluation'
|
||||||
RELOAD_TEMPLATES = 'reload_templates'
|
RELOAD_TEMPLATES = 'reload_templates'
|
||||||
TEMPLATE_ACTION = 'template_action'
|
TEMPLATE_ACTION = 'template_action'
|
||||||
POISON_PILL = None
|
|
||||||
|
|
||||||
ADD = 'add'
|
ADD = 'add'
|
||||||
DELETE = 'delete'
|
DELETE = 'delete'
|
||||||
@ -66,10 +66,10 @@ class GraphWorkersManager(cotyledon.ServiceManager):
|
|||||||
self._template_queues = []
|
self._template_queues = []
|
||||||
self._api_queues = []
|
self._api_queues = []
|
||||||
self._all_queues = []
|
self._all_queues = []
|
||||||
|
self.register_hooks(on_terminate=self._stop)
|
||||||
self.add_evaluator_workers()
|
self.add_evaluator_workers()
|
||||||
self.add_template_workers()
|
self.add_template_workers()
|
||||||
self.add_api_workers()
|
self.add_api_workers()
|
||||||
self.register_hooks(on_terminate=self.submit_stop_workers)
|
|
||||||
|
|
||||||
def add_evaluator_workers(self):
|
def add_evaluator_workers(self):
|
||||||
"""Add evaluator workers
|
"""Add evaluator workers
|
||||||
@ -188,10 +188,6 @@ class GraphWorkersManager(cotyledon.ServiceManager):
|
|||||||
for t in templates:
|
for t in templates:
|
||||||
self._db.templates.update(t.uuid, 'status', new_status)
|
self._db.templates.update(t.uuid, 'status', new_status)
|
||||||
|
|
||||||
def submit_stop_workers(self):
|
|
||||||
for q in self._all_queues:
|
|
||||||
q.put(POISON_PILL)
|
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _submit_and_wait(queues, payload):
|
def _submit_and_wait(queues, payload):
|
||||||
for q in queues:
|
for q in queues:
|
||||||
@ -199,6 +195,11 @@ class GraphWorkersManager(cotyledon.ServiceManager):
|
|||||||
for q in queues:
|
for q in queues:
|
||||||
q.join()
|
q.join()
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _stop():
|
||||||
|
# send SEGINT (instant exit) instead of SIGTERM
|
||||||
|
_thread.interrupt_main()
|
||||||
|
|
||||||
|
|
||||||
class GraphCloneWorkerBase(cotyledon.Service):
|
class GraphCloneWorkerBase(cotyledon.Service):
|
||||||
def __init__(self,
|
def __init__(self,
|
||||||
@ -220,7 +221,6 @@ class GraphCloneWorkerBase(cotyledon.Service):
|
|||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
LOG.info("%s - Starting %s", self.__class__.__name__, self.worker_id)
|
LOG.info("%s - Starting %s", self.__class__.__name__, self.worker_id)
|
||||||
self._running = True
|
|
||||||
self._entity_graph.notifier._subscriptions = [] # Quick n dirty
|
self._entity_graph.notifier._subscriptions = [] # Quick n dirty
|
||||||
self._init_instance()
|
self._init_instance()
|
||||||
self._read_queue()
|
self._read_queue()
|
||||||
@ -231,13 +231,10 @@ class GraphCloneWorkerBase(cotyledon.Service):
|
|||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
next_task = self._task_queue.get()
|
next_task = self._task_queue.get()
|
||||||
if next_task is POISON_PILL:
|
|
||||||
break
|
|
||||||
self.do_task(next_task)
|
self.do_task(next_task)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
LOG.exception("Graph may not be in sync: exception %s", e)
|
LOG.exception("Graph may not be in sync: exception %s", e)
|
||||||
self._task_queue.task_done()
|
self._task_queue.task_done()
|
||||||
LOG.info("%s - Stopped!", self.__class__.__name__)
|
|
||||||
|
|
||||||
def do_task(self, task):
|
def do_task(self, task):
|
||||||
action = task[0]
|
action = task[0]
|
||||||
|
Loading…
Reference in New Issue
Block a user