diff --git a/portas/bin/portas-api b/portas/bin/portas-api index 103269b..bcf7b31 100755 --- a/portas/bin/portas-api +++ b/portas/bin/portas-api @@ -21,6 +21,7 @@ import sys # If ../portas/__init__.py exists, add ../ to Python search path, so that # it will override what happens to be installed in /usr/(local/)lib/python... +from portas.common.service import TaskResultHandlerService possible_topdir = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]), os.pardir, @@ -40,11 +41,15 @@ if __name__ == '__main__': config.parse_args() log.setup('portas') + launcher = service.ServiceLauncher() + api_service = wsgi.Service(config.load_paste_app(), port=config.CONF.bind_port, host=config.CONF.bind_host) - launcher = service.Launcher() - launcher.run_service(api_service) + + launcher.launch_service(api_service) + launcher.launch_service(TaskResultHandlerService()) + launcher.wait() except RuntimeError, e: sys.stderr.write("ERROR: %s\n" % e) sys.exit(1) diff --git a/portas/etc/portas-api.conf b/portas/etc/portas-api.conf index fc991f2..e494aaa 100644 --- a/portas/etc/portas-api.conf +++ b/portas/etc/portas-api.conf @@ -16,4 +16,19 @@ bind_port = 8082 log_file = /tmp/portas-api.log #A valid SQLAlchemy connection string for the metadata database -sql_connection = sqlite:///portas.sqlite \ No newline at end of file +sql_connection = sqlite:///portas.sqlite + +[reports] +results_exchange = task-results +results_queue = task-results +reports_exchange = task-reports +reports_queue = task-reports + + +[rabbitmq] +host = localhost +port = 5672 +use_ssl = false +userid = guest +password = guest +virtual_host = / \ No newline at end of file diff --git a/portas/portas/common/config.py b/portas/portas/common/config.py index cae0d3c..c0ea7d0 100644 --- a/portas/portas/common/config.py +++ b/portas/portas/common/config.py @@ -41,9 +41,28 @@ bind_opts = [ cfg.IntOpt('bind_port'), ] +reports_opts = [ + cfg.StrOpt('results_exchange', default='task-results'), + cfg.StrOpt('results_queue', default='task-results'), + cfg.StrOpt('reports_exchange', default='task-reports'), + cfg.StrOpt('reports_queue', default='task-reports') +] + +rabbit_opts = [ + cfg.StrOpt('host', default='localhost'), + cfg.IntOpt('port', default=5672), + cfg.BoolOpt('use_ssl', default=False), + cfg.StrOpt('userid', default='guest'), + cfg.StrOpt('password', default='guest'), + cfg.StrOpt('virtual_host', default='/'), +] + CONF = cfg.CONF CONF.register_opts(paste_deploy_opts, group='paste_deploy') CONF.register_opts(bind_opts) +CONF.register_opts(reports_opts, group='reports') +CONF.register_opts(rabbit_opts, group='rabbitmq') + CONF.import_opt('verbose', 'portas.openstack.common.log') CONF.import_opt('debug', 'portas.openstack.common.log') diff --git a/portas/portas/common/service.py b/portas/portas/common/service.py new file mode 100644 index 0000000..037d79f --- /dev/null +++ b/portas/portas/common/service.py @@ -0,0 +1,52 @@ +from eventlet import patcher + +amqp = patcher.import_patched('amqplib.client_0_8') + +from portas.openstack.common import service +from portas.openstack.common import log as logging +from portas.common import config + +conf = config.CONF.reports +rabbitmq = config.CONF.rabbitmq +log = logging.getLogger(__name__) +channel = None + + +class TaskResultHandlerService(service.Service): + def __init__(self, threads=1000): + super(TaskResultHandlerService, self).__init__(threads) + + def start(self): + super(TaskResultHandlerService, self).start() + self.tg.add_thread(self._handle_results) + + def stop(self): + super(TaskResultHandlerService, self).stop() + + def _handle_results(self): + connection = amqp.Connection(rabbitmq.host, virtual_host=rabbitmq.virtual_host, + userid=rabbitmq.userid, password=rabbitmq.password, + ssl=rabbitmq.use_ssl, insist=True) + ch = connection.channel() + + def bind(exchange, queue): + ch.exchange_declare(exchange, 'direct') + ch.queue_declare(queue) + ch.queue_bind(queue, exchange, queue) + + bind(conf.results_exchange, conf.results_queue) + bind(conf.reports_exchange, conf.reports_queue) + + ch.basic_consume('task-results', callback=handle_result) + ch.basic_consume('task-reports', callback=handle_report) + while ch.callbacks: + ch.wait() + + +def handle_report(msg): + msg.channel.basic_ack(msg.delivery_tag) + log.debug(_('Got report message from orchestration engine:\n{0}'.format(msg.body))) + +def handle_result(msg): + msg.channel.basic_ack(msg.delivery_tag) + log.debug(_('Got result message from orchestration engine:\n{0}'.format(msg.body))) \ No newline at end of file diff --git a/portas/tools/pip-requires b/portas/tools/pip-requires index c5a332f..2b71bd9 100644 --- a/portas/tools/pip-requires +++ b/portas/tools/pip-requires @@ -13,6 +13,7 @@ httplib2 kombu pycrypto>=2.1.0alpha1 iso8601>=0.1.4 +amqplib # Note you will need gcc buildtools installed and must # have installed libxml headers for lxml to be successfully