let workers exit gracefully

Conflicts:
	worker/worker.py
This commit is contained in:
Bernhard K. Weisshuhn 2014-02-06 15:04:36 +01:00
parent f342567474
commit caa2732225
2 changed files with 14 additions and 2 deletions

View File

@ -25,13 +25,13 @@ def _get_parent_logger():
def kill_time(signal, frame): def kill_time(signal, frame):
log_listener.end()
print "dying ..." print "dying ..."
for process in processes: for process in processes:
process.terminate() process.terminate()
print "rose" print "rose"
for process in processes: for process in processes:
process.join() process.join()
log_listener.end()
print "bud" print "bud"
sys.exit(0) sys.exit(0)

View File

@ -20,6 +20,7 @@ import datetime
import sys import sys
import time import time
import anyjson import anyjson
import signal
import kombu import kombu
import kombu.mixins import kombu.mixins
@ -42,6 +43,7 @@ from stacktach import views
from kombu.serialization import BytesIO, register from kombu.serialization import BytesIO, register
stacklog.set_default_logger_name('worker') stacklog.set_default_logger_name('worker')
shutdown_soon = False
def _get_child_logger(): def _get_child_logger():
@ -62,6 +64,7 @@ class Consumer(kombu.mixins.ConsumerMixin):
self.total_processed = 0 self.total_processed = 0
self.topics = topics self.topics = topics
self.exchange = exchange self.exchange = exchange
signal.signal(signal.SIGTERM, self._shutdown)
register('bufferjson', self.loads, anyjson.dumps, register('bufferjson', self.loads, anyjson.dumps,
content_type='application/json', content_type='application/json',
@ -144,9 +147,14 @@ class Consumer(kombu.mixins.ConsumerMixin):
(e, json.loads(str(message.body)))) (e, json.loads(str(message.body))))
raise raise
def _shutdown(self, signal, stackframe = False):
global shutdown_soon
self.should_stop = True
shutdown_soon = True
def continue_running(): def continue_running():
return True return not shutdown_soon
def exit_or_sleep(exit=False): def exit_or_sleep(exit=False):
@ -206,6 +214,10 @@ def run(deployment_config, deployment_id, exchange):
"exception=%s. Retrying in 5s" "exception=%s. Retrying in 5s"
logger.exception(msg % (name, exchange, e)) logger.exception(msg % (name, exchange, e))
exit_or_sleep(exit_on_exception) exit_or_sleep(exit_on_exception)
logger.info("Worker exiting.")
signal.signal(signal.SIGINT, signal.SIG_IGN)
signal.signal(signal.SIGTERM, signal.SIG_IGN)
POST_PROCESS_METHODS = { POST_PROCESS_METHODS = {
'RawData': views.post_process_rawdata, 'RawData': views.post_process_rawdata,