Workflow unit tests

This commit is contained in:
Stan Lagun 2013-04-10 12:57:05 +04:00
parent f50bda351e
commit 2cc13744c0
3 changed files with 145 additions and 2 deletions

View File

@ -0,0 +1,42 @@
{
"name": "MyDataCenter",
"id": "adc6d143f9584d10808c7ef4d07e4802",
"token": "token",
"services": {
"activeDirectories": [
{
"id": "9571747991184642B95F430A014616F9",
"domain": "acme.loc",
"adminPassword": "SuperP@ssw0rd!",
"units": [
{
"id": "273c9183b6e74c9c9db7fdd532c5eb25",
"name": "dc01",
"isMaster": true,
"recoveryPassword": "2SuperP@ssw0rd2"
},
{
"id": "377c6f16d17a416791f80724dab360c6",
"name": "dc02",
"isMaster": false,
"adminPassword": "SuperP@ssw0rd",
"recoveryPassword": "2SuperP@ssw0rd2"
}
]
}
],
"webServers": [
{
"id": "e9657ceef84a4e669e31795040080262",
"domain": "acme.loc",
"units": [
{
"id": "e6f9cfd07ced48fba64e6bd9e65aba64",
"name": "iis01",
"adminPassword": "SuperP@ssw0rd"
}
]
}
]
}
}

View File

@ -0,0 +1,99 @@
# Copyright (c) 2013 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import os
import unittest
import deep
import mock
import mockfs
from conductor.workflow import Workflow
def load_sample(name):
with mockfs.storage.original_open(os.path.join(
os.path.dirname(__file__),
'sample_data',
name)) as sample_file:
return sample_file.read()
class TestWorkflow(unittest.TestCase):
def setUp(self):
self.mfs = mockfs.replace_builtins()
self.model = json.loads(load_sample('objectModel1.json'))
self.original_model = json.loads(load_sample('objectModel1.json'))
def tearDown(self):
mockfs.restore_builtins()
def _execute_workflow(self, xml):
self.mfs.add_entries({'test': xml})
stub = mock.MagicMock()
stub.side_effect = RuntimeError
workflow = Workflow('test', self.model, stub, stub, stub)
workflow.execute()
def test_empty_workflow_leaves_object_model_unchanged(self):
xml = '<workflow/>'
self._execute_workflow(xml)
self.assertIsNone(deep.diff(self.original_model, self.model))
def test_modifying_object_model_from_workflow(self):
xml = '''
<workflow>
<rule match="$.services[*][?(@.id ==
'9571747991184642B95F430A014616F9'
and not @.state.invalid)]">
<set path="state.invalid">value</set>
</rule>
</workflow>
'''
self.assertNotIn(
'state',
self.model['services']['activeDirectories'][0])
self._execute_workflow(xml)
self.assertEqual(
self.model['services']['activeDirectories'][0]['state']['invalid'],
'value')
self.assertIsNotNone(deep.diff(self.original_model, self.model))
del self.model['services']['activeDirectories'][0]['state']
self.assertIsNone(deep.diff(self.original_model, self.model))
def test_selecting_properties_from_object_model_within_workflow(self):
xml = '''
<workflow>
<rule match="$.services[*][?(@.id ==
'9571747991184642B95F430A014616F9'
and not @.test)]">
<set path="test">
Domain <select
path="domain"/> with primary DC <select
path="units.0.name"/>
</set>
</rule>
</workflow>
'''
self._execute_workflow(xml)
self.assertEqual(
self.model['services']['activeDirectories'][0]['test'],
'Domain acme.loc with primary DC dc01')

View File

@ -1,8 +1,10 @@
unittest2
mock==0.8.0
mock
nose
nose-exclude
nosexcover
#openstack.nose_plugin
pep8==1.0.1
pep8
sphinx>=1.1.2
mockfs
deep