Support for vNext Execution Plans templates

Change-Id: I4fe191d9fb3012e82ee93ad1ffec3cfa8dfc7651
This commit is contained in:
Stan Lagun 2013-10-04 12:53:38 +04:00
parent 4ee5016006
commit d9c84894aa
9 changed files with 147 additions and 53 deletions

View File

@ -33,12 +33,12 @@
</report> </report>
</success> </success>
<failure> <failure>
<report entity="unit" level="error"> <report entity="unit" level="error">
<parameter name="id"><select path="id"/></parameter> <parameter name="id"><select path="id"/></parameter>
<parameter name="text">Unable to deploy instance <select path="state.hostname"/> (<select path="name"/>) due to <select source="exception" path="message" default="unknown Heat error"/> </parameter> <parameter name="text">Unable to deploy instance <select path="state.hostname"/> (<select path="name"/>) due to <format-error error="exception"/> </parameter>
</report> </report>
<stop/> <stop/>
</failure> </failure>
</update-cf-stack> </update-cf-stack>
</rule> </rule>
@ -66,7 +66,7 @@
<failure> <failure>
<report entity="unit" level="error"> <report entity="unit" level="error">
<parameter name="id"><select path="id"/></parameter> <parameter name="id"><select path="id"/></parameter>
<parameter name="text">Unable to install demo service on <select path="state.hostname"/> (<select path="name"/>) due to <select source="exception" path="0.messages.0" default="unknown Agent error"/> </parameter> <parameter name="text">Unable to install demo service on <select path="state.hostname"/> (<select path="name"/>) due to <format-error error="exception"/></parameter>
</report> </report>
<stop/> <stop/>
</failure> </failure>

View File

@ -68,7 +68,7 @@
<failure> <failure>
<report entity="unit" level="error"> <report entity="unit" level="error">
<parameter name="id"><select path="id"/></parameter> <parameter name="id"><select path="id"/></parameter>
<parameter name="text">Unable to assign address pair and open SQL ports on instance <select path="state.hostname"/> (<select path="name"/>) due to <select source="exception" path="message" default="unknown Heat error"/> </parameter> <parameter name="text">Unable to assign address pair and open SQL ports on instance <select path="state.hostname"/> (<select path="name"/>) due to <format-error error="exception"/></parameter>
</report> </report>
<stop/> <stop/>
</failure> </failure>

View File

@ -66,7 +66,7 @@
<failure> <failure>
<report entity="unit" level="error"> <report entity="unit" level="error">
<parameter name="id"><select path="id"/></parameter> <parameter name="id"><select path="id"/></parameter>
<parameter name="text">Unable to open SQL ports on instance <select path="state.hostname"/> (<select path="name"/>) due to <select source="exception" path="message" default="unknown Heat error"/> </parameter> <parameter name="text">Unable to open SQL ports on instance <select path="state.hostname"/> (<select path="name"/>) due to <format-error error="exception"/></parameter>
</report> </report>
<stop/> <stop/>
</failure> </failure>

View File

@ -29,7 +29,7 @@ from muranocommon.messaging import MqClient, Message
from muranoconductor import config as cfg from muranoconductor import config as cfg
from muranocommon.helpers.token_sanitizer import TokenSanitizer from muranocommon.helpers.token_sanitizer import TokenSanitizer
import windows_agent import vm_agent
import cloud_formation import cloud_formation
log = logging.getLogger(__name__) log = logging.getLogger(__name__)

View File

@ -15,7 +15,7 @@
import command import command
import cloud_formation import cloud_formation
import windows_agent import vm_agent
class CommandDispatcher(command.CommandBase): class CommandDispatcher(command.CommandBase):
@ -23,7 +23,7 @@ class CommandDispatcher(command.CommandBase):
self._command_map = { self._command_map = {
'cf': cloud_formation.HeatExecutor(environment, token, tenant_id, 'cf': cloud_formation.HeatExecutor(environment, token, tenant_id,
reporter), reporter),
'agent': windows_agent.WindowsAgentExecutor( 'agent': vm_agent.VmAgentExecutor(
environment, rmqclient, reporter) environment, rmqclient, reporter)
} }

View File

@ -1,6 +1,7 @@
import json
import uuid import uuid
import yaml
import os import os
import types
from muranoconductor.openstack.common import log as logging from muranoconductor.openstack.common import log as logging
from muranocommon.messaging import Message from muranocommon.messaging import Message
@ -11,7 +12,7 @@ from muranocommon.helpers.token_sanitizer import TokenSanitizer
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
class WindowsAgentExecutor(CommandBase): class VmAgentExecutor(CommandBase):
def __init__(self, stack, rmqclient, reporter): def __init__(self, stack, rmqclient, reporter):
self._stack = stack self._stack = stack
self._rmqclient = rmqclient self._rmqclient = rmqclient
@ -23,15 +24,16 @@ class WindowsAgentExecutor(CommandBase):
def execute(self, template, mappings, unit, service, callback, def execute(self, template, mappings, unit, service, callback,
timeout=None): timeout=None):
template_path = 'data/templates/agent/%s.template' % template template_path = 'data/templates/agent/%s.template' % template
with open(template_path) as t_file: #with open(template_path) as t_file:
template_data = t_file.read() # template_data = t_file.read()
#
#json_template = json.loads(template_data)
#json_template = self.encode_scripts(json_template, template_path)
template, msg_id = self.build_execution_plan(template_path)
json_template = json.loads(template_data) template = muranoconductor.helpers.transform_json(
json_template = self.encode_scripts(json_template, template_path) template, mappings)
template_data = muranoconductor.helpers.transform_json(
json_template, mappings)
msg_id = str(uuid.uuid4()).lower()
queue = ('%s-%s-%s' % (self._stack, service, unit)).lower() queue = ('%s-%s-%s' % (self._stack, service, unit)).lower()
self._pending_list.append({ self._pending_list.append({
'id': msg_id, 'id': msg_id,
@ -40,16 +42,28 @@ class WindowsAgentExecutor(CommandBase):
}) })
msg = Message() msg = Message()
msg.body = template_data msg.body = template
msg.id = msg_id msg.id = msg_id
self._rmqclient.declare(queue) self._rmqclient.declare(queue)
self._rmqclient.send(message=msg, key=queue) self._rmqclient.send(message=msg, key=queue)
log.info('Sending RMQ message {0} to {1} with id {2}'.format( log.info('Sending RMQ message {0} to {1} with id {2}'.format(
TokenSanitizer().sanitize(template_data), queue, msg_id)) TokenSanitizer().sanitize(template), queue, msg_id))
def encode_scripts(self, json_data, template_path): def build_execution_plan(self, path):
scripts_folder = 'data/templates/agent/scripts' with open(path) as stream:
script_files = json_data.get("Scripts", []) template = yaml.load(stream)
if not isinstance(template, types.DictionaryType):
raise ValueError('Incorrect execution plan ' + path)
format_version = template.get('FormatVersion')
if not format_version or format_version.startswith('1.'):
return self._build_v1_execution_plan(template, path)
else:
return self._build_v2_execution_plan(template, path)
def _build_v1_execution_plan(self, template, path):
scripts_folder = os.path.join(
os.path.dirname(path), 'scripts')
script_files = template.get('Scripts', [])
scripts = [] scripts = []
for script in script_files: for script in script_files:
script_path = os.path.join(scripts_folder, script) script_path = os.path.join(scripts_folder, script)
@ -57,8 +71,56 @@ class WindowsAgentExecutor(CommandBase):
with open(script_path) as script_file: with open(script_path) as script_file:
script_data = script_file.read() script_data = script_file.read()
scripts.append(script_data.encode('base64')) scripts.append(script_data.encode('base64'))
json_data["Scripts"] = scripts template['Scripts'] = scripts
return json_data return template, uuid.uuid4().hex
def _build_v2_execution_plan(self, template, path):
scripts_folder = os.path.join(
os.path.dirname(path), 'scripts')
plan_id = uuid.uuid4().hex
template['ID'] = plan_id
if 'Action' not in template:
template['Action'] = 'Execute'
if 'Files' not in template:
template['Files'] = {}
files = {}
for file_id, file_descr in template['Files'].items():
files[file_descr['Name']] = file_id
for name, script in template.get('Scripts', {}).items():
if 'EntryPoint' not in script:
raise ValueError('No entry point in script ' + name)
script['EntryPoint'] = self._place_file(
scripts_folder, script['EntryPoint'], template, files)
if 'Files' in script:
for i in range(0, len(script['Files'])):
script['Files'][i] = self._place_file(
scripts_folder, script['Files'][i], template, files)
return template, plan_id
def _place_file(self, folder, name, template, files):
use_base64 = False
if name.startswith('<') and name.endswith('>'):
use_base64 = True
name = name[1:len(name) - 1]
if name in files:
return files[name]
file_id = uuid.uuid4().hex
body_type = 'Base64' if use_base64 else 'Text'
with open(os.path.join(folder, name)) as stream:
body = stream.read()
if use_base64:
body = body.encode('base64')
template['Files'][file_id] = {
'Name': name,
'BodyType': body_type,
'Body': body
}
files[name] = file_id
return file_id
def has_pending_commands(self): def has_pending_commands(self):
return len(self._pending_list) > 0 return len(self._pending_list) > 0
@ -86,7 +148,7 @@ class WindowsAgentExecutor(CommandBase):
msg = subscription.get_message(timeout=timeout) msg = subscription.get_message(timeout=timeout)
if msg: if msg:
msg.ack() msg.ack()
msg_id = msg.id.lower() msg_id = msg.body.get('SourceID', msg.id)
item, index = muranoconductor.helpers.find( item, index = muranoconductor.helpers.find(
lambda t: t['id'] == msg_id, self._pending_list) lambda t: t['id'] == msg_id, self._pending_list)
if item: if item:

View File

@ -14,8 +14,8 @@
# limitations under the License. # limitations under the License.
import os.path import os.path
import datetime import datetime
from muranoconductor.commands.windows_agent import AgentTimeoutException from muranoconductor.commands.vm_agent import AgentTimeoutException
from muranoconductor.commands.windows_agent import UnhandledAgentException from muranoconductor.commands.vm_agent import UnhandledAgentException
import xml_code_engine import xml_code_engine
@ -24,6 +24,53 @@ from openstack.common import log as logging
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
def _extract_results(result_value, ok, errors):
if isinstance(result_value, AgentTimeoutException):
errors.append({
'source': 'timeout',
'message': result_value.message,
'timeout': result_value.timeout,
'timestamp': datetime.datetime.now().isoformat()
})
elif isinstance(result_value, dict):
if result_value.get('FormatVersion', '1.0.0').startswith('1.'):
_extract_v1_results(result_value, ok, errors)
else:
_extract_v2_results(result_value, ok, errors)
def _extract_v1_results(result_value, ok, errors):
if result_value['IsException']:
errors.append(dict(_get_exception_info(
result_value.get('Result', [])), source='execution_plan'))
else:
for res in result_value.get('Result', []):
if res['IsException']:
errors.append(dict(_get_exception_info(
res.get('Result', [])), source='command'))
else:
ok.append(res)
def _extract_v2_results(result_value, ok, errors):
error_code = result_value.get('ErrorCode', 0)
if not error_code:
ok.append(result_value.get('Body'))
else:
body = result_value.get('Body') or {}
err = {
'message': body.get('Message'),
'details': body.get('AdditionalInfo'),
'errorCode': error_code,
'time': result_value.get('Time')
}
for attr in ('Message', 'AdditionalInfo'):
if attr in body:
del attr[body]
err['extra'] = body if body else None
errors.append(err)
def send_command(engine, context, body, template, service, unit, def send_command(engine, context, body, template, service, unit,
mappings=None, result=None, error=None, timeout=None, mappings=None, result=None, error=None, timeout=None,
osVersion=None, **kwargs): osVersion=None, **kwargs):
@ -41,24 +88,7 @@ def send_command(engine, context, body, template, service, unit,
template, result_value, unit)) template, result_value, unit))
ok = [] ok = []
errors = [] errors = []
if isinstance(result_value, AgentTimeoutException): _extract_results(result_value, ok, errors)
errors.append({
'source': 'timeout',
'message': result_value.message,
'timeout': result_value.timeout,
'timestamp': datetime.datetime.now().isoformat()
})
else:
if result_value['IsException']:
errors.append(dict(_get_exception_info(
result_value.get('Result', [])), source='execution_plan'))
else:
for res in result_value.get('Result', []):
if res['IsException']:
errors.append(dict(_get_exception_info(
res.get('Result', [])), source='command'))
else:
ok.append(res)
if ok: if ok:
if result is not None: if result is not None:

View File

@ -12,3 +12,4 @@ netaddr
oslo.config oslo.config
deep deep
murano-common>=0.2.2 murano-common>=0.2.2
PyYAML>=3.1.0

View File

@ -18,10 +18,10 @@ import mock
import mockfs import mockfs
import json import json
from muranoconductor.commands.windows_agent import WindowsAgentExecutor from muranoconductor.commands.vm_agent import VmAgentExecutor
class TestWindowsAgent(unittest.TestCase): class TestVmAgent(unittest.TestCase):
def setUp(self): def setUp(self):
self.mfs = mockfs.replace_builtins() self.mfs = mockfs.replace_builtins()
self.template = { self.template = {
@ -53,8 +53,9 @@ class TestWindowsAgent(unittest.TestCase):
reporter = mock.MagicMock() reporter = mock.MagicMock()
rmqclient.declare = mock.Mock() rmqclient.declare = mock.Mock()
executor = WindowsAgentExecutor(stack, rmqclient, reporter) executor = VmAgentExecutor(stack, rmqclient, reporter)
result = executor.encode_scripts(self.template, self.template_path) result, plan_id = executor.build_execution_plan(
self.template_path)
encoded = [ encoded = [
'ZnVuY3Rpb24gR2V0RE5TaXAoKXsKdGVzdAp9Cg==\n', 'ZnVuY3Rpb24gR2V0RE5TaXAoKXsKdGVzdAp9Cg==\n',
'ZnVuY3Rpb24gSm9pbkRvbWFpbigpewp0ZXN0Cn0K\n' 'ZnVuY3Rpb24gSm9pbkRvbWFpbigpewp0ZXN0Cn0K\n'