Scoped tokens

This commit is contained in:
Stan Lagun 2013-03-27 21:30:40 +04:00
parent c313c0959a
commit 8c2bf8bfe0
5 changed files with 416 additions and 407 deletions

View File

@ -27,7 +27,7 @@ def task_received(task, message_id):
reporter = reporting.Reporter(rmqclient, message_id, task['id']) reporter = reporting.Reporter(rmqclient, message_id, task['id'])
command_dispatcher = CommandDispatcher( command_dispatcher = CommandDispatcher(
task['id'], rmqclient, task['token']) task['id'], rmqclient, task['token'], task['tenant_id'])
workflows = [] workflows = []
for path in glob.glob("data/workflows/*.xml"): for path in glob.glob("data/workflows/*.xml"):
log.debug('Loading XML {0}'.format(path)) log.debug('Loading XML {0}'.format(path))
@ -79,7 +79,7 @@ class ConductorWorkflowService(service.Service):
with rabbitmq.RmqClient() as rmq: with rabbitmq.RmqClient() as rmq:
rmq.declare('tasks', 'tasks') rmq.declare('tasks', 'tasks')
rmq.declare('task-results') rmq.declare('task-results')
with rmq.open('tasks') as subscription: with rmq.open('tasks2') as subscription:
while True: while True:
msg = subscription.get_message() msg = subscription.get_message()
self.tg.add_thread( self.tg.add_thread(

View File

@ -1,146 +1,153 @@
import anyjson import anyjson
import eventlet import eventlet
from conductor.openstack.common import log as logging from conductor.openstack.common import log as logging
import conductor.helpers import conductor.helpers
from command import CommandBase from command import CommandBase
import conductor.config import conductor.config
from heatclient.client import Client from heatclient.client import Client
import heatclient.exc import heatclient.exc
import types from keystoneclient.v2_0 import client as ksclient
import types
log = logging.getLogger(__name__)
log = logging.getLogger(__name__)
class HeatExecutor(CommandBase):
def __init__(self, stack, token): class HeatExecutor(CommandBase):
self._update_pending_list = [] def __init__(self, stack, token, tenant_id):
self._delete_pending_list = [] self._update_pending_list = []
self._stack = 'e' + stack self._delete_pending_list = []
settings = conductor.config.CONF.heat self._stack = 'e' + stack
self._heat_client = Client('1', settings.url, settings = conductor.config.CONF.heat
token_only=True, token=token)
client = ksclient.Client(endpoint=settings.keystone)
def execute(self, command, callback, **kwargs): scoped_token = client.tokens.authenticate(
log.debug('Got command {0} on stack {1}'.format(command, self._stack)) tenant_id=tenant_id,
token=token).id
if command == 'CreateOrUpdate':
return self._execute_create_update( self._heat_client = Client('1', settings.url,
kwargs['template'], token_only=True, token=scoped_token)
kwargs['mappings'],
kwargs['arguments'], def execute(self, command, callback, **kwargs):
callback) log.debug('Got command {0} on stack {1}'.format(command, self._stack))
elif command == 'Delete':
return self._execute_delete(callback) if command == 'CreateOrUpdate':
return self._execute_create_update(
def _execute_create_update(self, template, mappings, arguments, callback): kwargs['template'],
with open('data/templates/cf/%s.template' % template) as template_file: kwargs['mappings'],
template_data = template_file.read() kwargs['arguments'],
callback)
template_data = conductor.helpers.transform_json( elif command == 'Delete':
anyjson.loads(template_data), mappings) return self._execute_delete(callback)
self._update_pending_list.append({ def _execute_create_update(self, template, mappings, arguments, callback):
'template': template_data, with open('data/templates/cf/%s.template' % template) as template_file:
'arguments': arguments, template_data = template_file.read()
'callback': callback
}) template_data = conductor.helpers.transform_json(
anyjson.loads(template_data), mappings)
def _execute_delete(self, callback):
self._delete_pending_list.append({ self._update_pending_list.append({
'callback': callback 'template': template_data,
}) 'arguments': arguments,
'callback': callback
def has_pending_commands(self): })
return len(self._update_pending_list) + \
len(self._delete_pending_list) > 0 def _execute_delete(self, callback):
self._delete_pending_list.append({
def execute_pending(self): 'callback': callback
r1 = self._execute_pending_updates() })
r2 = self._execute_pending_deletes()
return r1 or r2 def has_pending_commands(self):
return len(self._update_pending_list) + \
def _execute_pending_updates(self): len(self._delete_pending_list) > 0
if not len(self._update_pending_list):
return False def execute_pending(self):
r1 = self._execute_pending_updates()
template = {} r2 = self._execute_pending_deletes()
arguments = {} return r1 or r2
for t in self._update_pending_list:
template = conductor.helpers.merge_dicts( def _execute_pending_updates(self):
template, t['template'], max_levels=2) if not len(self._update_pending_list):
arguments = conductor.helpers.merge_dicts( return False
arguments, t['arguments'], max_levels=1)
template = {}
log.info( arguments = {}
'Executing heat template {0} with arguments {1} on stack {2}' for t in self._update_pending_list:
.format(anyjson.dumps(template), arguments, self._stack)) template = conductor.helpers.merge_dicts(
template, t['template'], max_levels=2)
try: arguments = conductor.helpers.merge_dicts(
self._heat_client.stacks.update( arguments, t['arguments'], max_levels=1)
stack_id=self._stack,
parameters=arguments, log.info(
template=template) 'Executing heat template {0} with arguments {1} on stack {2}'
log.debug( .format(anyjson.dumps(template), arguments, self._stack))
'Waiting for the stack {0} to be update'.format(self._stack))
self._wait_state('UPDATE_COMPLETE') try:
log.info('Stack {0} updated'.format(self._stack)) self._heat_client.stacks.update(
except heatclient.exc.HTTPNotFound: stack_id=self._stack,
self._heat_client.stacks.create( parameters=arguments,
stack_name=self._stack, template=template)
parameters=arguments, log.debug(
template=template) 'Waiting for the stack {0} to be update'.format(self._stack))
log.debug('Waiting for the stack {0} to be create'.format( self._wait_state('UPDATE_COMPLETE')
self._stack)) log.info('Stack {0} updated'.format(self._stack))
self._wait_state('CREATE_COMPLETE') except heatclient.exc.HTTPNotFound:
log.info('Stack {0} created'.format(self._stack)) self._heat_client.stacks.create(
stack_name=self._stack,
pending_list = self._update_pending_list parameters=arguments,
self._update_pending_list = [] template=template)
log.debug('Waiting for the stack {0} to be create'.format(
for item in pending_list: self._stack))
item['callback'](True) self._wait_state('CREATE_COMPLETE')
log.info('Stack {0} created'.format(self._stack))
return True
pending_list = self._update_pending_list
def _execute_pending_deletes(self): self._update_pending_list = []
if not len(self._delete_pending_list):
return False for item in pending_list:
item['callback'](True)
log.debug('Deleting stack {0}'.format(self._stack))
try: return True
self._heat_client.stacks.delete(
stack_id=self._stack) def _execute_pending_deletes(self):
log.debug( if not len(self._delete_pending_list):
'Waiting for the stack {0} to be deleted'.format(self._stack)) return False
self._wait_state(['DELETE_COMPLETE', ''])
log.info('Stack {0} deleted'.format(self._stack)) log.debug('Deleting stack {0}'.format(self._stack))
except Exception as ex: try:
log.exception(ex) self._heat_client.stacks.delete(
stack_id=self._stack)
pending_list = self._delete_pending_list log.debug(
self._delete_pending_list = [] 'Waiting for the stack {0} to be deleted'.format(self._stack))
self._wait_state(['DELETE_COMPLETE', ''])
for item in pending_list: log.info('Stack {0} deleted'.format(self._stack))
item['callback'](True) except Exception as ex:
return True log.exception(ex)
def _wait_state(self, state): pending_list = self._delete_pending_list
if isinstance(state, types.ListType): self._delete_pending_list = []
states = state
else: for item in pending_list:
states = [state] item['callback'](True)
return True
while True:
try: def _wait_state(self, state):
status = self._heat_client.stacks.get( if isinstance(state, types.ListType):
stack_id=self._stack).stack_status states = state
except heatclient.exc.HTTPNotFound: else:
status = '' states = [state]
if 'IN_PROGRESS' in status: while True:
eventlet.sleep(1) try:
continue status = self._heat_client.stacks.get(
if status not in states: stack_id=self._stack).stack_status
raise EnvironmentError() except heatclient.exc.HTTPNotFound:
return status = ''
if 'IN_PROGRESS' in status:
eventlet.sleep(1)
continue
if status not in states:
raise EnvironmentError()
return

View File

@ -1,33 +1,33 @@
import command import command
import cloud_formation import cloud_formation
import windows_agent import windows_agent
class CommandDispatcher(command.CommandBase): class CommandDispatcher(command.CommandBase):
def __init__(self, environment_id, rmqclient, token): def __init__(self, environment_id, rmqclient, token, tenant_id):
self._command_map = { self._command_map = {
'cf': cloud_formation.HeatExecutor(environment_id, token), 'cf': cloud_formation.HeatExecutor(environment_id, token, tenant_id),
'agent': windows_agent.WindowsAgentExecutor( 'agent': windows_agent.WindowsAgentExecutor(
environment_id, rmqclient) environment_id, rmqclient)
} }
def execute(self, name, **kwargs): def execute(self, name, **kwargs):
self._command_map[name].execute(**kwargs) self._command_map[name].execute(**kwargs)
def execute_pending(self): def execute_pending(self):
result = False result = False
for command in self._command_map.values(): for command in self._command_map.values():
result |= command.execute_pending() result |= command.execute_pending()
return result return result
def has_pending_commands(self): def has_pending_commands(self):
result = False result = False
for command in self._command_map.values(): for command in self._command_map.values():
result |= command.has_pending_commands() result |= command.has_pending_commands()
return result return result
def close(self): def close(self):
for t in self._command_map.values(): for t in self._command_map.values():
t.close() t.close()

View File

@ -1,213 +1,214 @@
#!/usr/bin/env python #!/usr/bin/env python
# vim: tabstop=4 shiftwidth=4 softtabstop=4 # vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack LLC. # Copyright 2011 OpenStack LLC.
# All Rights Reserved. # All Rights Reserved.
# #
# Licensed under the Apache License, Version 2.0 (the "License"); you may # Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain # not use this file except in compliance with the License. You may obtain
# a copy of the License at # a copy of the License at
# #
# http://www.apache.org/licenses/LICENSE-2.0 # http://www.apache.org/licenses/LICENSE-2.0
# #
# Unless required by applicable law or agreed to in writing, software # Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
""" """
Routines for configuring Glance Routines for configuring Glance
""" """
import logging import logging
import logging.config import logging.config
import logging.handlers import logging.handlers
import os import os
import sys import sys
from oslo.config import cfg from oslo.config import cfg
from paste import deploy from paste import deploy
from conductor.version import version_info as version from conductor.version import version_info as version
from ConfigParser import SafeConfigParser from ConfigParser import SafeConfigParser
paste_deploy_opts = [ paste_deploy_opts = [
cfg.StrOpt('flavor'), cfg.StrOpt('flavor'),
cfg.StrOpt('config_file'), cfg.StrOpt('config_file'),
] ]
rabbit_opts = [ rabbit_opts = [
cfg.StrOpt('host', default='localhost'), cfg.StrOpt('host', default='localhost'),
cfg.IntOpt('port', default=5672), cfg.IntOpt('port', default=5672),
cfg.StrOpt('login', default='guest'), cfg.StrOpt('login', default='guest'),
cfg.StrOpt('password', default='guest'), cfg.StrOpt('password', default='guest'),
cfg.StrOpt('virtual_host', default='/'), cfg.StrOpt('virtual_host', default='/'),
] ]
heat_opts = [ heat_opts = [
cfg.StrOpt('url') cfg.StrOpt('url'),
] cfg.StrOpt('keystone')
]
CONF = cfg.CONF
CONF.register_opts(paste_deploy_opts, group='paste_deploy') CONF = cfg.CONF
CONF.register_opts(rabbit_opts, group='rabbitmq') CONF.register_opts(paste_deploy_opts, group='paste_deploy')
CONF.register_opts(heat_opts, group='heat') CONF.register_opts(rabbit_opts, group='rabbitmq')
CONF.register_opts(heat_opts, group='heat')
CONF.import_opt('verbose', 'conductor.openstack.common.log')
CONF.import_opt('debug', 'conductor.openstack.common.log') CONF.import_opt('verbose', 'conductor.openstack.common.log')
CONF.import_opt('log_dir', 'conductor.openstack.common.log') CONF.import_opt('debug', 'conductor.openstack.common.log')
CONF.import_opt('log_file', 'conductor.openstack.common.log') CONF.import_opt('log_dir', 'conductor.openstack.common.log')
CONF.import_opt('log_config', 'conductor.openstack.common.log') CONF.import_opt('log_file', 'conductor.openstack.common.log')
CONF.import_opt('log_format', 'conductor.openstack.common.log') CONF.import_opt('log_config', 'conductor.openstack.common.log')
CONF.import_opt('log_date_format', 'conductor.openstack.common.log') CONF.import_opt('log_format', 'conductor.openstack.common.log')
CONF.import_opt('use_syslog', 'conductor.openstack.common.log') CONF.import_opt('log_date_format', 'conductor.openstack.common.log')
CONF.import_opt('syslog_log_facility', 'conductor.openstack.common.log') CONF.import_opt('use_syslog', 'conductor.openstack.common.log')
CONF.import_opt('syslog_log_facility', 'conductor.openstack.common.log')
def parse_args(args=None, usage=None, default_config_files=None):
CONF(args=args, def parse_args(args=None, usage=None, default_config_files=None):
project='conductor', CONF(args=args,
version=version.cached_version_string(), project='conductor',
usage=usage, version=version.cached_version_string(),
default_config_files=default_config_files) usage=usage,
default_config_files=default_config_files)
def setup_logging():
""" def setup_logging():
Sets up the logging options for a log with supplied name """
""" Sets up the logging options for a log with supplied name
"""
if CONF.log_config:
# Use a logging configuration file for all settings... if CONF.log_config:
if os.path.exists(CONF.log_config): # Use a logging configuration file for all settings...
logging.config.fileConfig(CONF.log_config) if os.path.exists(CONF.log_config):
return logging.config.fileConfig(CONF.log_config)
else: return
raise RuntimeError("Unable to locate specified logging " else:
"config file: %s" % CONF.log_config) raise RuntimeError("Unable to locate specified logging "
"config file: %s" % CONF.log_config)
root_logger = logging.root
if CONF.debug: root_logger = logging.root
root_logger.setLevel(logging.DEBUG) if CONF.debug:
elif CONF.verbose: root_logger.setLevel(logging.DEBUG)
root_logger.setLevel(logging.INFO) elif CONF.verbose:
else: root_logger.setLevel(logging.INFO)
root_logger.setLevel(logging.WARNING) else:
root_logger.setLevel(logging.WARNING)
formatter = logging.Formatter(CONF.log_format, CONF.log_date_format)
formatter = logging.Formatter(CONF.log_format, CONF.log_date_format)
if CONF.use_syslog:
try: if CONF.use_syslog:
facility = getattr(logging.handlers.SysLogHandler, try:
CONF.syslog_log_facility) facility = getattr(logging.handlers.SysLogHandler,
except AttributeError: CONF.syslog_log_facility)
raise ValueError(_("Invalid syslog facility")) except AttributeError:
raise ValueError(_("Invalid syslog facility"))
handler = logging.handlers.SysLogHandler(address='/dev/log',
facility=facility) handler = logging.handlers.SysLogHandler(address='/dev/log',
elif CONF.log_file: facility=facility)
logfile = CONF.log_file elif CONF.log_file:
if CONF.log_dir: logfile = CONF.log_file
logfile = os.path.join(CONF.log_dir, logfile) if CONF.log_dir:
handler = logging.handlers.WatchedFileHandler(logfile) logfile = os.path.join(CONF.log_dir, logfile)
else: handler = logging.handlers.WatchedFileHandler(logfile)
handler = logging.StreamHandler(sys.stdout) else:
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(formatter)
root_logger.addHandler(handler) handler.setFormatter(formatter)
root_logger.addHandler(handler)
def _get_deployment_flavor():
""" def _get_deployment_flavor():
Retrieve the paste_deploy.flavor config item, formatted appropriately """
for appending to the application name. Retrieve the paste_deploy.flavor config item, formatted appropriately
""" for appending to the application name.
flavor = CONF.paste_deploy.flavor """
return '' if not flavor else ('-' + flavor) flavor = CONF.paste_deploy.flavor
return '' if not flavor else ('-' + flavor)
def _get_paste_config_path():
paste_suffix = '-paste.ini' def _get_paste_config_path():
conf_suffix = '.conf' paste_suffix = '-paste.ini'
if CONF.config_file: conf_suffix = '.conf'
# Assume paste config is in a paste.ini file corresponding if CONF.config_file:
# to the last config file # Assume paste config is in a paste.ini file corresponding
path = CONF.config_file[-1].replace(conf_suffix, paste_suffix) # to the last config file
else: path = CONF.config_file[-1].replace(conf_suffix, paste_suffix)
path = CONF.prog + '-paste.ini' else:
return CONF.find_file(os.path.basename(path)) path = CONF.prog + '-paste.ini'
return CONF.find_file(os.path.basename(path))
def _get_deployment_config_file():
""" def _get_deployment_config_file():
Retrieve the deployment_config_file config item, formatted as an """
absolute pathname. Retrieve the deployment_config_file config item, formatted as an
""" absolute pathname.
path = CONF.paste_deploy.config_file """
if not path: path = CONF.paste_deploy.config_file
path = _get_paste_config_path() if not path:
if not path: path = _get_paste_config_path()
msg = "Unable to locate paste config file for %s." % CONF.prog if not path:
raise RuntimeError(msg) msg = "Unable to locate paste config file for %s." % CONF.prog
return os.path.abspath(path) raise RuntimeError(msg)
return os.path.abspath(path)
def load_paste_app(app_name=None):
""" def load_paste_app(app_name=None):
Builds and returns a WSGI app from a paste config file. """
Builds and returns a WSGI app from a paste config file.
We assume the last config file specified in the supplied ConfigOpts
object is the paste config file. We assume the last config file specified in the supplied ConfigOpts
object is the paste config file.
:param app_name: name of the application to load
:param app_name: name of the application to load
:raises RuntimeError when config file cannot be located or application
cannot be loaded from config file :raises RuntimeError when config file cannot be located or application
""" cannot be loaded from config file
if app_name is None: """
app_name = CONF.prog if app_name is None:
app_name = CONF.prog
# append the deployment flavor to the application name,
# in order to identify the appropriate paste pipeline # append the deployment flavor to the application name,
app_name += _get_deployment_flavor() # in order to identify the appropriate paste pipeline
app_name += _get_deployment_flavor()
conf_file = _get_deployment_config_file()
conf_file = _get_deployment_config_file()
try:
logger = logging.getLogger(__name__) try:
logger.debug(_("Loading %(app_name)s from %(conf_file)s"), logger = logging.getLogger(__name__)
{'conf_file': conf_file, 'app_name': app_name}) logger.debug(_("Loading %(app_name)s from %(conf_file)s"),
{'conf_file': conf_file, 'app_name': app_name})
app = deploy.loadapp("config:%s" % conf_file, name=app_name)
app = deploy.loadapp("config:%s" % conf_file, name=app_name)
# Log the options used when starting if we're in debug mode...
if CONF.debug: # Log the options used when starting if we're in debug mode...
CONF.log_opt_values(logger, logging.DEBUG) if CONF.debug:
CONF.log_opt_values(logger, logging.DEBUG)
return app
except (LookupError, ImportError), e: return app
msg = _("Unable to load %(app_name)s from " except (LookupError, ImportError), e:
"configuration file %(conf_file)s." msg = _("Unable to load %(app_name)s from "
"\nGot: %(e)r") % locals() "configuration file %(conf_file)s."
logger.error(msg) "\nGot: %(e)r") % locals()
raise RuntimeError(msg) logger.error(msg)
raise RuntimeError(msg)
class Config(object):
CONFIG_PATH = './etc/app.config' class Config(object):
CONFIG_PATH = './etc/app.config'
def __init__(self, filename=None):
self.config = SafeConfigParser() def __init__(self, filename=None):
self.config.read(filename or self.CONFIG_PATH) self.config = SafeConfigParser()
self.config.read(filename or self.CONFIG_PATH)
def get_setting(self, section, name, default=None):
if not self.config.has_option(section, name): def get_setting(self, section, name, default=None):
return default if not self.config.has_option(section, name):
return self.config.get(section, name) return default
return self.config.get(section, name)
def __getitem__(self, item):
parts = item.rsplit('.', 1) def __getitem__(self, item):
return self.get_setting( parts = item.rsplit('.', 1)
parts[0] if len(parts) == 2 else 'DEFAULT', parts[-1]) return self.get_setting(
parts[0] if len(parts) == 2 else 'DEFAULT', parts[-1])

View File

@ -1,14 +1,15 @@
[DEFAULT] [DEFAULT]
log_file = logs/conductor.log log_file = logs/conductor.log
debug=True debug=True
verbose=True verbose=True
[heat] [heat]
url = http://172.18.124.101:8004/v1/16eb78cbb688459c8308d89678bcef50 url = http://172.18.124.101:8004/v1/16eb78cbb688459c8308d89678bcef50
keystone = http://172.18.124.101:5000/v2.0
[rabbitmq]
host = 172.18.124.101 [rabbitmq]
port = 5672 host = 172.18.124.101
virtual_host = keero port = 5672
login = keero virtual_host = keero
login = keero
password = keero password = keero