#!/usr/bin/python2 # # Copyright 2013 Rackspace Australia # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain # a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. """ worker_server.py is an executable worker server that loads and runs task_plugins. """ import argparse import daemon import extras import imp import json import logging import os import signal import sys import worker_manager # as of python-daemon 1.6 it doesn't bundle pidlockfile anymore # instead it depends on lockfile-0.9.1 which uses pidfile. PID_FILE_MODULE = extras.try_imports(['daemon.pidlockfile', 'daemon.pidfile']) class Server(object): """ This is the worker server object to be daemonized """ log = logging.getLogger("worker_server.Server") def __init__(self, config): # Config init self.config = config self.manager = None self.plugins = [] self.load_plugins() # Python logging output file. self.debug_log = self.config['debug_log'] self.tasks = {} def setup_logging(self): if self.debug_log: if not os.path.isdir(os.path.dirname(self.debug_log)): os.makedirs(os.path.dirname(self.debug_log)) logging.basicConfig(format='%(asctime)s %(message)s', filename=self.debug_log, level=logging.DEBUG) else: logging.basicConfig(format='%(asctime)s %(message)s', level=logging.WARN) self.log.debug('Log pusher starting.') def load_plugins(self): """ Load the available plugins from task_plugins """ # Load plugins for plugin in self.config['plugins']: print plugin_info = imp.find_module('task', [(os.path.dirname( os.path.realpath(__file__)) + '/task_plugins/' + plugin)]) self.plugins.append(imp.load_module('task', *plugin_info)) def run_tasks(self): """ Run the tasks """ for plugin in self.plugins: self.tasks[plugin.__worker_name__] = plugin.Runner(self.config) self.tasks[plugin.__worker_name__].daemon = True self.tasks[plugin.__worker_name__].start() self.manager = worker_manager.GearmanManager(self.config, self.tasks) self.manager.daemon = True self.manager.start() def exit_handler(self, signum): signal.signal(signal.SIGUSR1, signal.SIG_IGN) for task_name, task in self.tasks.items(): task.stop() self.manager.stop() sys.exit(0) def main(self): self.setup_logging() self.run_tasks() while True: try: signal.pause() except KeyboardInterrupt: print "Ctrl + C: asking tasks to exit nicely...\n" self.exit_handler(signal.SIGINT) def main(): parser = argparse.ArgumentParser() parser.add_argument('-c', '--config', default= '/etc/turbo-hipster/config.json', help='Path to json config file.') parser.add_argument('--background', action='store_true', help='Run in the background.') parser.add_argument('-p', '--pidfile', default='/var/run/turbo-hipster/' 'sql-migrate-gearman-worker.pid', help='PID file to lock during daemonization.') args = parser.parse_args() with open(args.config, 'r') as config_stream: config = json.load(config_stream) server = Server(config) if args.background: pidfile = PID_FILE_MODULE.TimeoutPIDLockFile(args.pidfile, 10) with daemon.DaemonContext(pidfile=pidfile): server.main() else: server.main() if __name__ == '__main__': main()