From 773dcc99267d5353e58617b8ee15304aa52ce24e Mon Sep 17 00:00:00 2001 From: Alexander Tivelkov Date: Tue, 30 Jul 2013 19:22:22 +0400 Subject: [PATCH] Conductor send all the occurred exceptions to the API Also, EnvironmentException thrown in the HeatExecutor now has a meaningful message Change-Id: I8f93bdbc50ce777793e95c2d236de220d4b5c6dd --- muranoconductor/app.py | 16 +++++++++++---- muranoconductor/commands/cloud_formation.py | 22 +++++++++++++++------ muranoconductor/commands/dispatcher.py | 7 ++++--- muranoconductor/commands/windows_agent.py | 3 ++- muranoconductor/reporting.py | 13 +++++++++++- tests/conductor/test_heat_commands.py | 9 ++++++--- 6 files changed, 52 insertions(+), 18 deletions(-) diff --git a/muranoconductor/app.py b/muranoconductor/app.py index 885ecfe..f1d173f 100644 --- a/muranoconductor/app.py +++ b/muranoconductor/app.py @@ -15,6 +15,7 @@ import glob import sys +import traceback import anyjson import eventlet @@ -76,6 +77,7 @@ class ConductorWorkflowService(service.Service): def _task_received(self, message): task = message.body or {} message_id = message.id + reporter = None with self.create_rmq_client() as mq: try: log.info('Starting processing task {0}: {1}'.format( @@ -83,8 +85,10 @@ class ConductorWorkflowService(service.Service): reporter = reporting.Reporter(mq, message_id, task['id']) config = Config() - command_dispatcher = CommandDispatcher( - 'e' + task['id'], mq, task['token'], task['tenant_id']) + command_dispatcher = CommandDispatcher('e' + task['id'], mq, + task['token'], + task['tenant_id'], + reporter) workflows = [] for path in glob.glob("data/workflows/*.xml"): log.debug('Loading XML {0}'.format(path)) @@ -106,10 +110,14 @@ class ConductorWorkflowService(service.Service): except Exception as ex: log.exception(ex) break - command_dispatcher.close() + except reporting.ReportedException as e: + log.exception("Exception has occurred and was reported to API") except Exception as e: - log.exception(e) + log.exception("Unexpected exception has occurred") + if reporter: + reporter.report_generic("Unexpected error has occurred", + e.message, 'error') finally: if 'token' in task: del task['token'] diff --git a/muranoconductor/commands/cloud_formation.py b/muranoconductor/commands/cloud_formation.py index 6bd67d5..1adfb37 100644 --- a/muranoconductor/commands/cloud_formation.py +++ b/muranoconductor/commands/cloud_formation.py @@ -12,9 +12,11 @@ # implied. # See the License for the specific language governing permissions and # limitations under the License. +import sys import anyjson import eventlet +from muranoconductor.reporting import ReportedException import types import jsonpath @@ -30,10 +32,11 @@ log = logging.getLogger(__name__) class HeatExecutor(CommandBase): - def __init__(self, stack, token, tenant_id): + def __init__(self, stack, token, tenant_id, reporter): self._update_pending_list = [] self._delete_pending_list = [] self._stack = stack + self._reporter = reporter settings = muranoconductor.config.CONF.heat client = ksclient.Client(endpoint=settings.auth_url) @@ -86,12 +89,18 @@ class HeatExecutor(CommandBase): }) def has_pending_commands(self): - return len(self._update_pending_list) + \ - len(self._delete_pending_list) > 0 + return len(self._update_pending_list) + len( + self._delete_pending_list) > 0 def execute_pending(self): - r1 = self._execute_pending_updates() - r2 = self._execute_pending_deletes() + try: + r1 = self._execute_pending_updates() + r2 = self._execute_pending_deletes() + except Exception as e: + self._reporter.report_generic("Unable to execute Heat command", + e.message, "error") + trace = sys.exc_info()[2] + raise ReportedException(e.message), None, trace return r1 or r2 def _execute_pending_updates(self): @@ -201,7 +210,8 @@ class HeatExecutor(CommandBase): eventlet.sleep(1) continue if status not in states: - raise EnvironmentError() + raise EnvironmentError( + "Unexpected state {0}".format(status)) try: return dict([(t['output_key'], t['output_value']) diff --git a/muranoconductor/commands/dispatcher.py b/muranoconductor/commands/dispatcher.py index 3701445..aa43762 100644 --- a/muranoconductor/commands/dispatcher.py +++ b/muranoconductor/commands/dispatcher.py @@ -19,11 +19,12 @@ import windows_agent class CommandDispatcher(command.CommandBase): - def __init__(self, environment, rmqclient, token, tenant_id): + def __init__(self, environment, rmqclient, token, tenant_id, reporter): self._command_map = { - 'cf': cloud_formation.HeatExecutor(environment, token, tenant_id), + 'cf': cloud_formation.HeatExecutor(environment, token, tenant_id, + reporter), 'agent': windows_agent.WindowsAgentExecutor( - environment, rmqclient) + environment, rmqclient, reporter) } def execute(self, name, **kwargs): diff --git a/muranoconductor/commands/windows_agent.py b/muranoconductor/commands/windows_agent.py index 9d68946..140e732 100644 --- a/muranoconductor/commands/windows_agent.py +++ b/muranoconductor/commands/windows_agent.py @@ -10,11 +10,12 @@ log = logging.getLogger(__name__) class WindowsAgentExecutor(CommandBase): - def __init__(self, stack, rmqclient): + def __init__(self, stack, rmqclient, reporter): self._stack = stack self._rmqclient = rmqclient self._pending_list = [] self._results_queue = '-execution-results-%s' % str(stack).lower() + self._reporter = reporter rmqclient.declare(self._results_queue) def execute(self, template, mappings, unit, service, callback): diff --git a/muranoconductor/reporting.py b/muranoconductor/reporting.py index 627caae..4f68906 100644 --- a/muranoconductor/reporting.py +++ b/muranoconductor/reporting.py @@ -24,11 +24,17 @@ class Reporter(object): self._environment_id = environment_id rmqclient.declare('task-reports') - def _report_func(self, id, entity, text, **kwargs): + def report_generic(self, text, details=None, level='info'): + return self._report_func(None, None, text, details, level) + + def _report_func(self, id, entity, text, details=None, level='info', + **kwargs): body = { 'id': id, 'entity': entity, 'text': text, + 'details': details, + 'level': level, 'environment_id': self._environment_id } @@ -45,4 +51,9 @@ def _report_func(context, id, entity, text, **kwargs): reporter = context['/reporter'] return reporter._report_func(id, entity, text, **kwargs) + +class ReportedException(Exception): + pass + + xml_code_engine.XmlCodeEngine.register_function(_report_func, "report") diff --git a/tests/conductor/test_heat_commands.py b/tests/conductor/test_heat_commands.py index 5c00f3a..d41d0cf 100644 --- a/tests/conductor/test_heat_commands.py +++ b/tests/conductor/test_heat_commands.py @@ -52,7 +52,8 @@ class TestHeatExecutor(unittest.TestCase): @mock.patch('muranoconductor.config.CONF') def test_create_stack(self, config_mock, ksclient_mock, heat_mock): self._init(config_mock, ksclient_mock) - executor = HeatExecutor('stack', 'token', 'tenant_id') + reporter = mock.MagicMock() + executor = HeatExecutor('stack', 'token', 'tenant_id', reporter) callback = mock.MagicMock() executor.execute( @@ -91,7 +92,8 @@ class TestHeatExecutor(unittest.TestCase): @mock.patch('muranoconductor.config.CONF') def test_update_stack(self, config_mock, ksclient_mock, heat_mock): self._init(config_mock, ksclient_mock) - executor = HeatExecutor('stack', 'token', 'tenant_id') + reporter = mock.MagicMock() + executor = HeatExecutor('stack', 'token', 'tenant_id', reporter) callback = mock.MagicMock() executor.execute( @@ -146,7 +148,8 @@ class TestHeatExecutor(unittest.TestCase): @mock.patch('muranoconductor.config.CONF') def test_delete_stack(self, config_mock, ksclient_mock, heat_mock): self._init(config_mock, ksclient_mock) - executor = HeatExecutor('stack', 'token', 'tenant_id') + reporter = mock.MagicMock() + executor = HeatExecutor('stack', 'token', 'tenant_id', reporter) callback = mock.MagicMock() executor.execute(