From 8c2bf8bfe0e5f6ce911f0a57bd0590f82d0884bd Mon Sep 17 00:00:00 2001 From: Stan Lagun Date: Wed, 27 Mar 2013 21:30:40 +0400 Subject: [PATCH] Scoped tokens --- conductor/conductor/app.py | 4 +- .../conductor/commands/cloud_formation.py | 299 ++++++------ conductor/conductor/commands/dispatcher.py | 66 +-- conductor/conductor/config.py | 427 +++++++++--------- conductor/etc/conductor.conf | 27 +- 5 files changed, 416 insertions(+), 407 deletions(-) diff --git a/conductor/conductor/app.py b/conductor/conductor/app.py index 3d06e9d..b6eaf2e 100644 --- a/conductor/conductor/app.py +++ b/conductor/conductor/app.py @@ -27,7 +27,7 @@ def task_received(task, message_id): reporter = reporting.Reporter(rmqclient, message_id, task['id']) command_dispatcher = CommandDispatcher( - task['id'], rmqclient, task['token']) + task['id'], rmqclient, task['token'], task['tenant_id']) workflows = [] for path in glob.glob("data/workflows/*.xml"): log.debug('Loading XML {0}'.format(path)) @@ -79,7 +79,7 @@ class ConductorWorkflowService(service.Service): with rabbitmq.RmqClient() as rmq: rmq.declare('tasks', 'tasks') rmq.declare('task-results') - with rmq.open('tasks') as subscription: + with rmq.open('tasks2') as subscription: while True: msg = subscription.get_message() self.tg.add_thread( diff --git a/conductor/conductor/commands/cloud_formation.py b/conductor/conductor/commands/cloud_formation.py index 1d1a0e7..c9b3666 100644 --- a/conductor/conductor/commands/cloud_formation.py +++ b/conductor/conductor/commands/cloud_formation.py @@ -1,146 +1,153 @@ -import anyjson -import eventlet - -from conductor.openstack.common import log as logging -import conductor.helpers -from command import CommandBase -import conductor.config -from heatclient.client import Client -import heatclient.exc -import types - -log = logging.getLogger(__name__) - - -class HeatExecutor(CommandBase): - def __init__(self, stack, token): - self._update_pending_list = [] - self._delete_pending_list = [] - self._stack = 'e' + stack - settings = conductor.config.CONF.heat - self._heat_client = Client('1', settings.url, - token_only=True, token=token) - - def execute(self, command, callback, **kwargs): - log.debug('Got command {0} on stack {1}'.format(command, self._stack)) - - if command == 'CreateOrUpdate': - return self._execute_create_update( - kwargs['template'], - kwargs['mappings'], - kwargs['arguments'], - callback) - elif command == 'Delete': - return self._execute_delete(callback) - - def _execute_create_update(self, template, mappings, arguments, callback): - with open('data/templates/cf/%s.template' % template) as template_file: - template_data = template_file.read() - - template_data = conductor.helpers.transform_json( - anyjson.loads(template_data), mappings) - - self._update_pending_list.append({ - 'template': template_data, - 'arguments': arguments, - 'callback': callback - }) - - def _execute_delete(self, callback): - self._delete_pending_list.append({ - 'callback': callback - }) - - def has_pending_commands(self): - return len(self._update_pending_list) + \ - len(self._delete_pending_list) > 0 - - def execute_pending(self): - r1 = self._execute_pending_updates() - r2 = self._execute_pending_deletes() - return r1 or r2 - - def _execute_pending_updates(self): - if not len(self._update_pending_list): - return False - - template = {} - arguments = {} - for t in self._update_pending_list: - template = conductor.helpers.merge_dicts( - template, t['template'], max_levels=2) - arguments = conductor.helpers.merge_dicts( - arguments, t['arguments'], max_levels=1) - - log.info( - 'Executing heat template {0} with arguments {1} on stack {2}' - .format(anyjson.dumps(template), arguments, self._stack)) - - try: - self._heat_client.stacks.update( - stack_id=self._stack, - parameters=arguments, - template=template) - log.debug( - 'Waiting for the stack {0} to be update'.format(self._stack)) - self._wait_state('UPDATE_COMPLETE') - log.info('Stack {0} updated'.format(self._stack)) - except heatclient.exc.HTTPNotFound: - self._heat_client.stacks.create( - stack_name=self._stack, - parameters=arguments, - template=template) - log.debug('Waiting for the stack {0} to be create'.format( - self._stack)) - self._wait_state('CREATE_COMPLETE') - log.info('Stack {0} created'.format(self._stack)) - - pending_list = self._update_pending_list - self._update_pending_list = [] - - for item in pending_list: - item['callback'](True) - - return True - - def _execute_pending_deletes(self): - if not len(self._delete_pending_list): - return False - - log.debug('Deleting stack {0}'.format(self._stack)) - try: - self._heat_client.stacks.delete( - stack_id=self._stack) - log.debug( - 'Waiting for the stack {0} to be deleted'.format(self._stack)) - self._wait_state(['DELETE_COMPLETE', '']) - log.info('Stack {0} deleted'.format(self._stack)) - except Exception as ex: - log.exception(ex) - - pending_list = self._delete_pending_list - self._delete_pending_list = [] - - for item in pending_list: - item['callback'](True) - return True - - def _wait_state(self, state): - if isinstance(state, types.ListType): - states = state - else: - states = [state] - - while True: - try: - status = self._heat_client.stacks.get( - stack_id=self._stack).stack_status - except heatclient.exc.HTTPNotFound: - status = '' - - if 'IN_PROGRESS' in status: - eventlet.sleep(1) - continue - if status not in states: - raise EnvironmentError() - return +import anyjson +import eventlet + +from conductor.openstack.common import log as logging +import conductor.helpers +from command import CommandBase +import conductor.config +from heatclient.client import Client +import heatclient.exc +from keystoneclient.v2_0 import client as ksclient +import types + +log = logging.getLogger(__name__) + + +class HeatExecutor(CommandBase): + def __init__(self, stack, token, tenant_id): + self._update_pending_list = [] + self._delete_pending_list = [] + self._stack = 'e' + stack + settings = conductor.config.CONF.heat + + client = ksclient.Client(endpoint=settings.keystone) + scoped_token = client.tokens.authenticate( + tenant_id=tenant_id, + token=token).id + + self._heat_client = Client('1', settings.url, + token_only=True, token=scoped_token) + + def execute(self, command, callback, **kwargs): + log.debug('Got command {0} on stack {1}'.format(command, self._stack)) + + if command == 'CreateOrUpdate': + return self._execute_create_update( + kwargs['template'], + kwargs['mappings'], + kwargs['arguments'], + callback) + elif command == 'Delete': + return self._execute_delete(callback) + + def _execute_create_update(self, template, mappings, arguments, callback): + with open('data/templates/cf/%s.template' % template) as template_file: + template_data = template_file.read() + + template_data = conductor.helpers.transform_json( + anyjson.loads(template_data), mappings) + + self._update_pending_list.append({ + 'template': template_data, + 'arguments': arguments, + 'callback': callback + }) + + def _execute_delete(self, callback): + self._delete_pending_list.append({ + 'callback': callback + }) + + def has_pending_commands(self): + return len(self._update_pending_list) + \ + len(self._delete_pending_list) > 0 + + def execute_pending(self): + r1 = self._execute_pending_updates() + r2 = self._execute_pending_deletes() + return r1 or r2 + + def _execute_pending_updates(self): + if not len(self._update_pending_list): + return False + + template = {} + arguments = {} + for t in self._update_pending_list: + template = conductor.helpers.merge_dicts( + template, t['template'], max_levels=2) + arguments = conductor.helpers.merge_dicts( + arguments, t['arguments'], max_levels=1) + + log.info( + 'Executing heat template {0} with arguments {1} on stack {2}' + .format(anyjson.dumps(template), arguments, self._stack)) + + try: + self._heat_client.stacks.update( + stack_id=self._stack, + parameters=arguments, + template=template) + log.debug( + 'Waiting for the stack {0} to be update'.format(self._stack)) + self._wait_state('UPDATE_COMPLETE') + log.info('Stack {0} updated'.format(self._stack)) + except heatclient.exc.HTTPNotFound: + self._heat_client.stacks.create( + stack_name=self._stack, + parameters=arguments, + template=template) + log.debug('Waiting for the stack {0} to be create'.format( + self._stack)) + self._wait_state('CREATE_COMPLETE') + log.info('Stack {0} created'.format(self._stack)) + + pending_list = self._update_pending_list + self._update_pending_list = [] + + for item in pending_list: + item['callback'](True) + + return True + + def _execute_pending_deletes(self): + if not len(self._delete_pending_list): + return False + + log.debug('Deleting stack {0}'.format(self._stack)) + try: + self._heat_client.stacks.delete( + stack_id=self._stack) + log.debug( + 'Waiting for the stack {0} to be deleted'.format(self._stack)) + self._wait_state(['DELETE_COMPLETE', '']) + log.info('Stack {0} deleted'.format(self._stack)) + except Exception as ex: + log.exception(ex) + + pending_list = self._delete_pending_list + self._delete_pending_list = [] + + for item in pending_list: + item['callback'](True) + return True + + def _wait_state(self, state): + if isinstance(state, types.ListType): + states = state + else: + states = [state] + + while True: + try: + status = self._heat_client.stacks.get( + stack_id=self._stack).stack_status + except heatclient.exc.HTTPNotFound: + status = '' + + if 'IN_PROGRESS' in status: + eventlet.sleep(1) + continue + if status not in states: + raise EnvironmentError() + return diff --git a/conductor/conductor/commands/dispatcher.py b/conductor/conductor/commands/dispatcher.py index 3f7716a..92e1183 100644 --- a/conductor/conductor/commands/dispatcher.py +++ b/conductor/conductor/commands/dispatcher.py @@ -1,33 +1,33 @@ -import command -import cloud_formation -import windows_agent - - -class CommandDispatcher(command.CommandBase): - def __init__(self, environment_id, rmqclient, token): - self._command_map = { - 'cf': cloud_formation.HeatExecutor(environment_id, token), - 'agent': windows_agent.WindowsAgentExecutor( - environment_id, rmqclient) - } - - def execute(self, name, **kwargs): - self._command_map[name].execute(**kwargs) - - def execute_pending(self): - result = False - for command in self._command_map.values(): - result |= command.execute_pending() - - return result - - def has_pending_commands(self): - result = False - for command in self._command_map.values(): - result |= command.has_pending_commands() - - return result - - def close(self): - for t in self._command_map.values(): - t.close() +import command +import cloud_formation +import windows_agent + + +class CommandDispatcher(command.CommandBase): + def __init__(self, environment_id, rmqclient, token, tenant_id): + self._command_map = { + 'cf': cloud_formation.HeatExecutor(environment_id, token, tenant_id), + 'agent': windows_agent.WindowsAgentExecutor( + environment_id, rmqclient) + } + + def execute(self, name, **kwargs): + self._command_map[name].execute(**kwargs) + + def execute_pending(self): + result = False + for command in self._command_map.values(): + result |= command.execute_pending() + + return result + + def has_pending_commands(self): + result = False + for command in self._command_map.values(): + result |= command.has_pending_commands() + + return result + + def close(self): + for t in self._command_map.values(): + t.close() diff --git a/conductor/conductor/config.py b/conductor/conductor/config.py index f93bd78..1586918 100644 --- a/conductor/conductor/config.py +++ b/conductor/conductor/config.py @@ -1,213 +1,214 @@ -#!/usr/bin/env python -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright 2011 OpenStack LLC. -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -""" -Routines for configuring Glance -""" - -import logging -import logging.config -import logging.handlers -import os -import sys - -from oslo.config import cfg -from paste import deploy - -from conductor.version import version_info as version -from ConfigParser import SafeConfigParser - -paste_deploy_opts = [ - cfg.StrOpt('flavor'), - cfg.StrOpt('config_file'), -] - -rabbit_opts = [ - cfg.StrOpt('host', default='localhost'), - cfg.IntOpt('port', default=5672), - cfg.StrOpt('login', default='guest'), - cfg.StrOpt('password', default='guest'), - cfg.StrOpt('virtual_host', default='/'), -] - -heat_opts = [ - cfg.StrOpt('url') -] - -CONF = cfg.CONF -CONF.register_opts(paste_deploy_opts, group='paste_deploy') -CONF.register_opts(rabbit_opts, group='rabbitmq') -CONF.register_opts(heat_opts, group='heat') - - -CONF.import_opt('verbose', 'conductor.openstack.common.log') -CONF.import_opt('debug', 'conductor.openstack.common.log') -CONF.import_opt('log_dir', 'conductor.openstack.common.log') -CONF.import_opt('log_file', 'conductor.openstack.common.log') -CONF.import_opt('log_config', 'conductor.openstack.common.log') -CONF.import_opt('log_format', 'conductor.openstack.common.log') -CONF.import_opt('log_date_format', 'conductor.openstack.common.log') -CONF.import_opt('use_syslog', 'conductor.openstack.common.log') -CONF.import_opt('syslog_log_facility', 'conductor.openstack.common.log') - - -def parse_args(args=None, usage=None, default_config_files=None): - CONF(args=args, - project='conductor', - version=version.cached_version_string(), - usage=usage, - default_config_files=default_config_files) - - -def setup_logging(): - """ - Sets up the logging options for a log with supplied name - """ - - if CONF.log_config: - # Use a logging configuration file for all settings... - if os.path.exists(CONF.log_config): - logging.config.fileConfig(CONF.log_config) - return - else: - raise RuntimeError("Unable to locate specified logging " - "config file: %s" % CONF.log_config) - - root_logger = logging.root - if CONF.debug: - root_logger.setLevel(logging.DEBUG) - elif CONF.verbose: - root_logger.setLevel(logging.INFO) - else: - root_logger.setLevel(logging.WARNING) - - formatter = logging.Formatter(CONF.log_format, CONF.log_date_format) - - if CONF.use_syslog: - try: - facility = getattr(logging.handlers.SysLogHandler, - CONF.syslog_log_facility) - except AttributeError: - raise ValueError(_("Invalid syslog facility")) - - handler = logging.handlers.SysLogHandler(address='/dev/log', - facility=facility) - elif CONF.log_file: - logfile = CONF.log_file - if CONF.log_dir: - logfile = os.path.join(CONF.log_dir, logfile) - handler = logging.handlers.WatchedFileHandler(logfile) - else: - handler = logging.StreamHandler(sys.stdout) - - handler.setFormatter(formatter) - root_logger.addHandler(handler) - - -def _get_deployment_flavor(): - """ - Retrieve the paste_deploy.flavor config item, formatted appropriately - for appending to the application name. - """ - flavor = CONF.paste_deploy.flavor - return '' if not flavor else ('-' + flavor) - - -def _get_paste_config_path(): - paste_suffix = '-paste.ini' - conf_suffix = '.conf' - if CONF.config_file: - # Assume paste config is in a paste.ini file corresponding - # to the last config file - path = CONF.config_file[-1].replace(conf_suffix, paste_suffix) - else: - path = CONF.prog + '-paste.ini' - return CONF.find_file(os.path.basename(path)) - - -def _get_deployment_config_file(): - """ - Retrieve the deployment_config_file config item, formatted as an - absolute pathname. - """ - path = CONF.paste_deploy.config_file - if not path: - path = _get_paste_config_path() - if not path: - msg = "Unable to locate paste config file for %s." % CONF.prog - raise RuntimeError(msg) - return os.path.abspath(path) - - -def load_paste_app(app_name=None): - """ - Builds and returns a WSGI app from a paste config file. - - We assume the last config file specified in the supplied ConfigOpts - object is the paste config file. - - :param app_name: name of the application to load - - :raises RuntimeError when config file cannot be located or application - cannot be loaded from config file - """ - if app_name is None: - app_name = CONF.prog - - # append the deployment flavor to the application name, - # in order to identify the appropriate paste pipeline - app_name += _get_deployment_flavor() - - conf_file = _get_deployment_config_file() - - try: - logger = logging.getLogger(__name__) - logger.debug(_("Loading %(app_name)s from %(conf_file)s"), - {'conf_file': conf_file, 'app_name': app_name}) - - app = deploy.loadapp("config:%s" % conf_file, name=app_name) - - # Log the options used when starting if we're in debug mode... - if CONF.debug: - CONF.log_opt_values(logger, logging.DEBUG) - - return app - except (LookupError, ImportError), e: - msg = _("Unable to load %(app_name)s from " - "configuration file %(conf_file)s." - "\nGot: %(e)r") % locals() - logger.error(msg) - raise RuntimeError(msg) - - -class Config(object): - CONFIG_PATH = './etc/app.config' - - def __init__(self, filename=None): - self.config = SafeConfigParser() - self.config.read(filename or self.CONFIG_PATH) - - def get_setting(self, section, name, default=None): - if not self.config.has_option(section, name): - return default - return self.config.get(section, name) - - def __getitem__(self, item): - parts = item.rsplit('.', 1) - return self.get_setting( - parts[0] if len(parts) == 2 else 'DEFAULT', parts[-1]) +#!/usr/bin/env python +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2011 OpenStack LLC. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +Routines for configuring Glance +""" + +import logging +import logging.config +import logging.handlers +import os +import sys + +from oslo.config import cfg +from paste import deploy + +from conductor.version import version_info as version +from ConfigParser import SafeConfigParser + +paste_deploy_opts = [ + cfg.StrOpt('flavor'), + cfg.StrOpt('config_file'), +] + +rabbit_opts = [ + cfg.StrOpt('host', default='localhost'), + cfg.IntOpt('port', default=5672), + cfg.StrOpt('login', default='guest'), + cfg.StrOpt('password', default='guest'), + cfg.StrOpt('virtual_host', default='/'), +] + +heat_opts = [ + cfg.StrOpt('url'), + cfg.StrOpt('keystone') +] + +CONF = cfg.CONF +CONF.register_opts(paste_deploy_opts, group='paste_deploy') +CONF.register_opts(rabbit_opts, group='rabbitmq') +CONF.register_opts(heat_opts, group='heat') + + +CONF.import_opt('verbose', 'conductor.openstack.common.log') +CONF.import_opt('debug', 'conductor.openstack.common.log') +CONF.import_opt('log_dir', 'conductor.openstack.common.log') +CONF.import_opt('log_file', 'conductor.openstack.common.log') +CONF.import_opt('log_config', 'conductor.openstack.common.log') +CONF.import_opt('log_format', 'conductor.openstack.common.log') +CONF.import_opt('log_date_format', 'conductor.openstack.common.log') +CONF.import_opt('use_syslog', 'conductor.openstack.common.log') +CONF.import_opt('syslog_log_facility', 'conductor.openstack.common.log') + + +def parse_args(args=None, usage=None, default_config_files=None): + CONF(args=args, + project='conductor', + version=version.cached_version_string(), + usage=usage, + default_config_files=default_config_files) + + +def setup_logging(): + """ + Sets up the logging options for a log with supplied name + """ + + if CONF.log_config: + # Use a logging configuration file for all settings... + if os.path.exists(CONF.log_config): + logging.config.fileConfig(CONF.log_config) + return + else: + raise RuntimeError("Unable to locate specified logging " + "config file: %s" % CONF.log_config) + + root_logger = logging.root + if CONF.debug: + root_logger.setLevel(logging.DEBUG) + elif CONF.verbose: + root_logger.setLevel(logging.INFO) + else: + root_logger.setLevel(logging.WARNING) + + formatter = logging.Formatter(CONF.log_format, CONF.log_date_format) + + if CONF.use_syslog: + try: + facility = getattr(logging.handlers.SysLogHandler, + CONF.syslog_log_facility) + except AttributeError: + raise ValueError(_("Invalid syslog facility")) + + handler = logging.handlers.SysLogHandler(address='/dev/log', + facility=facility) + elif CONF.log_file: + logfile = CONF.log_file + if CONF.log_dir: + logfile = os.path.join(CONF.log_dir, logfile) + handler = logging.handlers.WatchedFileHandler(logfile) + else: + handler = logging.StreamHandler(sys.stdout) + + handler.setFormatter(formatter) + root_logger.addHandler(handler) + + +def _get_deployment_flavor(): + """ + Retrieve the paste_deploy.flavor config item, formatted appropriately + for appending to the application name. + """ + flavor = CONF.paste_deploy.flavor + return '' if not flavor else ('-' + flavor) + + +def _get_paste_config_path(): + paste_suffix = '-paste.ini' + conf_suffix = '.conf' + if CONF.config_file: + # Assume paste config is in a paste.ini file corresponding + # to the last config file + path = CONF.config_file[-1].replace(conf_suffix, paste_suffix) + else: + path = CONF.prog + '-paste.ini' + return CONF.find_file(os.path.basename(path)) + + +def _get_deployment_config_file(): + """ + Retrieve the deployment_config_file config item, formatted as an + absolute pathname. + """ + path = CONF.paste_deploy.config_file + if not path: + path = _get_paste_config_path() + if not path: + msg = "Unable to locate paste config file for %s." % CONF.prog + raise RuntimeError(msg) + return os.path.abspath(path) + + +def load_paste_app(app_name=None): + """ + Builds and returns a WSGI app from a paste config file. + + We assume the last config file specified in the supplied ConfigOpts + object is the paste config file. + + :param app_name: name of the application to load + + :raises RuntimeError when config file cannot be located or application + cannot be loaded from config file + """ + if app_name is None: + app_name = CONF.prog + + # append the deployment flavor to the application name, + # in order to identify the appropriate paste pipeline + app_name += _get_deployment_flavor() + + conf_file = _get_deployment_config_file() + + try: + logger = logging.getLogger(__name__) + logger.debug(_("Loading %(app_name)s from %(conf_file)s"), + {'conf_file': conf_file, 'app_name': app_name}) + + app = deploy.loadapp("config:%s" % conf_file, name=app_name) + + # Log the options used when starting if we're in debug mode... + if CONF.debug: + CONF.log_opt_values(logger, logging.DEBUG) + + return app + except (LookupError, ImportError), e: + msg = _("Unable to load %(app_name)s from " + "configuration file %(conf_file)s." + "\nGot: %(e)r") % locals() + logger.error(msg) + raise RuntimeError(msg) + + +class Config(object): + CONFIG_PATH = './etc/app.config' + + def __init__(self, filename=None): + self.config = SafeConfigParser() + self.config.read(filename or self.CONFIG_PATH) + + def get_setting(self, section, name, default=None): + if not self.config.has_option(section, name): + return default + return self.config.get(section, name) + + def __getitem__(self, item): + parts = item.rsplit('.', 1) + return self.get_setting( + parts[0] if len(parts) == 2 else 'DEFAULT', parts[-1]) diff --git a/conductor/etc/conductor.conf b/conductor/etc/conductor.conf index 03f9913..2b052ed 100644 --- a/conductor/etc/conductor.conf +++ b/conductor/etc/conductor.conf @@ -1,14 +1,15 @@ -[DEFAULT] -log_file = logs/conductor.log -debug=True -verbose=True - -[heat] -url = http://172.18.124.101:8004/v1/16eb78cbb688459c8308d89678bcef50 - -[rabbitmq] -host = 172.18.124.101 -port = 5672 -virtual_host = keero -login = keero +[DEFAULT] +log_file = logs/conductor.log +debug=True +verbose=True + +[heat] +url = http://172.18.124.101:8004/v1/16eb78cbb688459c8308d89678bcef50 +keystone = http://172.18.124.101:5000/v2.0 + +[rabbitmq] +host = 172.18.124.101 +port = 5672 +virtual_host = keero +login = keero password = keero \ No newline at end of file