diff --git a/worker/start_workers.py b/worker/start_workers.py index cbc3edd..3654cbc 100644 --- a/worker/start_workers.py +++ b/worker/start_workers.py @@ -25,13 +25,13 @@ def _get_parent_logger(): def kill_time(signal, frame): - log_listener.end() print "dying ..." for process in processes: process.terminate() print "rose" for process in processes: process.join() + log_listener.end() print "bud" sys.exit(0) diff --git a/worker/worker.py b/worker/worker.py index 10abd19..3c3f89f 100644 --- a/worker/worker.py +++ b/worker/worker.py @@ -20,6 +20,7 @@ import datetime import sys import time import anyjson +import signal import kombu import kombu.mixins @@ -42,6 +43,7 @@ from stacktach import views from kombu.serialization import BytesIO, register stacklog.set_default_logger_name('worker') +shutdown_soon = False def _get_child_logger(): @@ -62,6 +64,7 @@ class Consumer(kombu.mixins.ConsumerMixin): self.total_processed = 0 self.topics = topics self.exchange = exchange + signal.signal(signal.SIGTERM, self._shutdown) register('bufferjson', self.loads, anyjson.dumps, content_type='application/json', @@ -144,9 +147,14 @@ class Consumer(kombu.mixins.ConsumerMixin): (e, json.loads(str(message.body)))) raise + def _shutdown(self, signal, stackframe = False): + global shutdown_soon + self.should_stop = True + shutdown_soon = True + def continue_running(): - return True + return not shutdown_soon def exit_or_sleep(exit=False): @@ -206,6 +214,10 @@ def run(deployment_config, deployment_id, exchange): "exception=%s. Retrying in 5s" logger.exception(msg % (name, exchange, e)) exit_or_sleep(exit_on_exception) + logger.info("Worker exiting.") + +signal.signal(signal.SIGINT, signal.SIG_IGN) +signal.signal(signal.SIGTERM, signal.SIG_IGN) POST_PROCESS_METHODS = { 'RawData': views.post_process_rawdata,