Conductor send all the occurred exceptions to the API

Also, EnvironmentException thrown in the HeatExecutor now has a meaningful message

Change-Id: I8f93bdbc50ce777793e95c2d236de220d4b5c6dd
This commit is contained in:
Alexander Tivelkov 2013-07-30 19:22:22 +04:00
parent 7ec019f7bb
commit 773dcc9926
6 changed files with 52 additions and 18 deletions

View File

@ -15,6 +15,7 @@
import glob
import sys
import traceback
import anyjson
import eventlet
@ -76,6 +77,7 @@ class ConductorWorkflowService(service.Service):
def _task_received(self, message):
task = message.body or {}
message_id = message.id
reporter = None
with self.create_rmq_client() as mq:
try:
log.info('Starting processing task {0}: {1}'.format(
@ -83,8 +85,10 @@ class ConductorWorkflowService(service.Service):
reporter = reporting.Reporter(mq, message_id, task['id'])
config = Config()
command_dispatcher = CommandDispatcher(
'e' + task['id'], mq, task['token'], task['tenant_id'])
command_dispatcher = CommandDispatcher('e' + task['id'], mq,
task['token'],
task['tenant_id'],
reporter)
workflows = []
for path in glob.glob("data/workflows/*.xml"):
log.debug('Loading XML {0}'.format(path))
@ -106,10 +110,14 @@ class ConductorWorkflowService(service.Service):
except Exception as ex:
log.exception(ex)
break
command_dispatcher.close()
except reporting.ReportedException as e:
log.exception("Exception has occurred and was reported to API")
except Exception as e:
log.exception(e)
log.exception("Unexpected exception has occurred")
if reporter:
reporter.report_generic("Unexpected error has occurred",
e.message, 'error')
finally:
if 'token' in task:
del task['token']

View File

@ -12,9 +12,11 @@
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
import anyjson
import eventlet
from muranoconductor.reporting import ReportedException
import types
import jsonpath
@ -30,10 +32,11 @@ log = logging.getLogger(__name__)
class HeatExecutor(CommandBase):
def __init__(self, stack, token, tenant_id):
def __init__(self, stack, token, tenant_id, reporter):
self._update_pending_list = []
self._delete_pending_list = []
self._stack = stack
self._reporter = reporter
settings = muranoconductor.config.CONF.heat
client = ksclient.Client(endpoint=settings.auth_url)
@ -86,12 +89,18 @@ class HeatExecutor(CommandBase):
})
def has_pending_commands(self):
return len(self._update_pending_list) + \
len(self._delete_pending_list) > 0
return len(self._update_pending_list) + len(
self._delete_pending_list) > 0
def execute_pending(self):
r1 = self._execute_pending_updates()
r2 = self._execute_pending_deletes()
try:
r1 = self._execute_pending_updates()
r2 = self._execute_pending_deletes()
except Exception as e:
self._reporter.report_generic("Unable to execute Heat command",
e.message, "error")
trace = sys.exc_info()[2]
raise ReportedException(e.message), None, trace
return r1 or r2
def _execute_pending_updates(self):
@ -201,7 +210,8 @@ class HeatExecutor(CommandBase):
eventlet.sleep(1)
continue
if status not in states:
raise EnvironmentError()
raise EnvironmentError(
"Unexpected state {0}".format(status))
try:
return dict([(t['output_key'], t['output_value'])

View File

@ -19,11 +19,12 @@ import windows_agent
class CommandDispatcher(command.CommandBase):
def __init__(self, environment, rmqclient, token, tenant_id):
def __init__(self, environment, rmqclient, token, tenant_id, reporter):
self._command_map = {
'cf': cloud_formation.HeatExecutor(environment, token, tenant_id),
'cf': cloud_formation.HeatExecutor(environment, token, tenant_id,
reporter),
'agent': windows_agent.WindowsAgentExecutor(
environment, rmqclient)
environment, rmqclient, reporter)
}
def execute(self, name, **kwargs):

View File

@ -10,11 +10,12 @@ log = logging.getLogger(__name__)
class WindowsAgentExecutor(CommandBase):
def __init__(self, stack, rmqclient):
def __init__(self, stack, rmqclient, reporter):
self._stack = stack
self._rmqclient = rmqclient
self._pending_list = []
self._results_queue = '-execution-results-%s' % str(stack).lower()
self._reporter = reporter
rmqclient.declare(self._results_queue)
def execute(self, template, mappings, unit, service, callback):

View File

@ -24,11 +24,17 @@ class Reporter(object):
self._environment_id = environment_id
rmqclient.declare('task-reports')
def _report_func(self, id, entity, text, **kwargs):
def report_generic(self, text, details=None, level='info'):
return self._report_func(None, None, text, details, level)
def _report_func(self, id, entity, text, details=None, level='info',
**kwargs):
body = {
'id': id,
'entity': entity,
'text': text,
'details': details,
'level': level,
'environment_id': self._environment_id
}
@ -45,4 +51,9 @@ def _report_func(context, id, entity, text, **kwargs):
reporter = context['/reporter']
return reporter._report_func(id, entity, text, **kwargs)
class ReportedException(Exception):
pass
xml_code_engine.XmlCodeEngine.register_function(_report_func, "report")

View File

@ -52,7 +52,8 @@ class TestHeatExecutor(unittest.TestCase):
@mock.patch('muranoconductor.config.CONF')
def test_create_stack(self, config_mock, ksclient_mock, heat_mock):
self._init(config_mock, ksclient_mock)
executor = HeatExecutor('stack', 'token', 'tenant_id')
reporter = mock.MagicMock()
executor = HeatExecutor('stack', 'token', 'tenant_id', reporter)
callback = mock.MagicMock()
executor.execute(
@ -91,7 +92,8 @@ class TestHeatExecutor(unittest.TestCase):
@mock.patch('muranoconductor.config.CONF')
def test_update_stack(self, config_mock, ksclient_mock, heat_mock):
self._init(config_mock, ksclient_mock)
executor = HeatExecutor('stack', 'token', 'tenant_id')
reporter = mock.MagicMock()
executor = HeatExecutor('stack', 'token', 'tenant_id', reporter)
callback = mock.MagicMock()
executor.execute(
@ -146,7 +148,8 @@ class TestHeatExecutor(unittest.TestCase):
@mock.patch('muranoconductor.config.CONF')
def test_delete_stack(self, config_mock, ksclient_mock, heat_mock):
self._init(config_mock, ksclient_mock)
executor = HeatExecutor('stack', 'token', 'tenant_id')
reporter = mock.MagicMock()
executor = HeatExecutor('stack', 'token', 'tenant_id', reporter)
callback = mock.MagicMock()
executor.execute(