diff --git a/portas/portas/api/v1/environments.py b/portas/portas/api/v1/environments.py index 4129bbf..5e85fa7 100644 --- a/portas/portas/api/v1/environments.py +++ b/portas/portas/api/v1/environments.py @@ -1,10 +1,16 @@ +from amqplib.client_0_8 import Message +import anyjson +import eventlet from webob import exc +from portas.common import config from portas.api.v1 import get_env_status from portas.db.session import get_session from portas.db.models import Environment from portas.openstack.common import wsgi from portas.openstack.common import log as logging +amqp = eventlet.patcher.import_patched('amqplib.client_0_8') +rabbitmq = config.CONF.rabbitmq log = logging.getLogger(__name__) @@ -85,6 +91,24 @@ class Controller(object): with session.begin(): session.delete(environment) + #preparing data for removal from conductor + env = environment.description + env['services'] = [] + env['deleted'] = True + + connection = amqp.Connection('{0}:{1}'. + format(rabbitmq.host, rabbitmq.port), + virtual_host=rabbitmq.virtual_host, + userid=rabbitmq.userid, + password=rabbitmq.password, + ssl=rabbitmq.use_ssl, insist=True) + channel = connection.channel() + channel.exchange_declare('tasks', 'direct', durable=True, + auto_delete=False) + + channel.basic_publish(Message(body=anyjson.serialize(env)), 'tasks', + 'tasks') + return None diff --git a/portas/portas/api/v1/sessions.py b/portas/portas/api/v1/sessions.py index b1584d0..5fb30c0 100644 --- a/portas/portas/api/v1/sessions.py +++ b/portas/portas/api/v1/sessions.py @@ -1,7 +1,6 @@ from amqplib.client_0_8 import Message import anyjson import eventlet -from eventlet.semaphore import Semaphore from webob import exc from portas.common import config from portas.db.models import Session, Status, Environment @@ -15,18 +14,6 @@ log = logging.getLogger(__name__) class Controller(object): - def __init__(self): - self.write_lock = Semaphore(1) - connection = amqp.Connection('{0}:{1}'. - format(rabbitmq.host, rabbitmq.port), - virtual_host=rabbitmq.virtual_host, - userid=rabbitmq.userid, - password=rabbitmq.password, - ssl=rabbitmq.use_ssl, insist=True) - self.ch = connection.channel() - self.ch.exchange_declare('tasks', 'direct', durable=True, - auto_delete=False) - def index(self, request, environment_id): log.debug(_('Session:List '.format(environment_id))) @@ -121,14 +108,22 @@ class Controller(object): session.state = 'deploying' session.save(unit) - #Set X-Auth-Tokenconductor for conductor + #Set X-Auth-Token for conductor env = session.description env['token'] = request.context.auth_token - with self.write_lock: - self.ch.basic_publish(Message(body=anyjson. - serialize(env)), - 'tasks', 'tasks') + connection = amqp.Connection('{0}:{1}'. + format(rabbitmq.host, rabbitmq.port), + virtual_host=rabbitmq.virtual_host, + userid=rabbitmq.userid, + password=rabbitmq.password, + ssl=rabbitmq.use_ssl, insist=True) + channel = connection.channel() + channel.exchange_declare('tasks', 'direct', durable=True, + auto_delete=False) + + channel.basic_publish(Message(body=anyjson.serialize(env)), 'tasks', + 'tasks') def create_resource(): diff --git a/portas/portas/common/service.py b/portas/portas/common/service.py index 2d8f38e..238304e 100644 --- a/portas/portas/common/service.py +++ b/portas/portas/common/service.py @@ -55,7 +55,7 @@ class TaskResultHandlerService(service.Service): def handle_report(msg): log.debug(_('Got report message from orchestration engine:\n{0}'. - format(msg.body))) + format(msg.body))) params = anyjson.deserialize(msg.body) params['entity_id'] = params['id'] @@ -76,10 +76,16 @@ def handle_report(msg): def handle_result(msg): - log.debug(_('Got result message from orchestration engine:\n{0}'. - format(msg.body))) + log.debug(_('Got result message from ' + 'orchestration engine:\n{0}'.format(msg.body))) environment_result = anyjson.deserialize(msg.body) + if environment_result['deleted']: + log.debug(_('Result for environment {0} is dropped. ' + 'Environment is deleted'.format(environment_result['id']))) + + msg.channel.basic_ack(msg.delivery_tag) + return session = get_session() environment = session.query(Environment).get(environment_result['id'])