From caa2732225f918781c55b761870e188505a2bc71 Mon Sep 17 00:00:00 2001 From: "Bernhard K. Weisshuhn" Date: Thu, 6 Feb 2014 15:04:36 +0100 Subject: [PATCH] let workers exit gracefully Conflicts: worker/worker.py --- worker/start_workers.py | 2 +- worker/worker.py | 14 +++++++++++++- 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/worker/start_workers.py b/worker/start_workers.py index cbc3edd..3654cbc 100644 --- a/worker/start_workers.py +++ b/worker/start_workers.py @@ -25,13 +25,13 @@ def _get_parent_logger(): def kill_time(signal, frame): - log_listener.end() print "dying ..." for process in processes: process.terminate() print "rose" for process in processes: process.join() + log_listener.end() print "bud" sys.exit(0) diff --git a/worker/worker.py b/worker/worker.py index 10abd19..3c3f89f 100644 --- a/worker/worker.py +++ b/worker/worker.py @@ -20,6 +20,7 @@ import datetime import sys import time import anyjson +import signal import kombu import kombu.mixins @@ -42,6 +43,7 @@ from stacktach import views from kombu.serialization import BytesIO, register stacklog.set_default_logger_name('worker') +shutdown_soon = False def _get_child_logger(): @@ -62,6 +64,7 @@ class Consumer(kombu.mixins.ConsumerMixin): self.total_processed = 0 self.topics = topics self.exchange = exchange + signal.signal(signal.SIGTERM, self._shutdown) register('bufferjson', self.loads, anyjson.dumps, content_type='application/json', @@ -144,9 +147,14 @@ class Consumer(kombu.mixins.ConsumerMixin): (e, json.loads(str(message.body)))) raise + def _shutdown(self, signal, stackframe = False): + global shutdown_soon + self.should_stop = True + shutdown_soon = True + def continue_running(): - return True + return not shutdown_soon def exit_or_sleep(exit=False): @@ -206,6 +214,10 @@ def run(deployment_config, deployment_id, exchange): "exception=%s. Retrying in 5s" logger.exception(msg % (name, exchange, e)) exit_or_sleep(exit_on_exception) + logger.info("Worker exiting.") + +signal.signal(signal.SIGINT, signal.SIG_IGN) +signal.signal(signal.SIGTERM, signal.SIG_IGN) POST_PROCESS_METHODS = { 'RawData': views.post_process_rawdata,