Merge "add test_cluster.py and partial test_host.py" into dev/experimental

This commit is contained in:
Jenkins 2014-09-30 20:16:38 +00:00 committed by Gerrit Code Review
commit 5f1717cac8
4 changed files with 2345 additions and 5 deletions

View File

@ -91,10 +91,10 @@ RESP_CLUSTERHOST_STATE_FIELDS = [
'created_at', 'updated_at'
]
RESP_REVIEW_FIELDS = [
'cluster', 'clusterhosts'
'cluster', 'hosts'
]
RESP_DEPLOY_FIELDS = [
'status', 'cluster', 'clusterhosts'
'status', 'cluster', 'hosts'
]
IGNORE_FIELDS = ['id', 'created_at', 'updated_at']
ADDED_FIELDS = ['name', 'adapter_id', 'os_id']
@ -779,7 +779,7 @@ def patch_clusterhost(
):
"""Update cluster host."""
clusterhost = utils.get_db_object(
session, models.Cluster, clusterhost_id=clusterhost_id
session, models.ClusterHost, clusterhost_id=clusterhost_id
)
return _update_clusterhost(session, updater, clusterhost, **kwargs)
@ -1261,7 +1261,7 @@ def validate_cluster(session, cluster):
@utils.wrap_to_dict(
RESP_REVIEW_FIELDS,
cluster=RESP_CONFIG_FIELDS,
clusterhosts=RESP_CLUSTERHOST_CONFIG_FIELDS
hosts=RESP_CLUSTERHOST_CONFIG_FIELDS
)
def review_cluster(session, reviewer, cluster_id, review={}, **kwargs):
"""review cluster."""
@ -1335,7 +1335,7 @@ def review_cluster(session, reviewer, cluster_id, review={}, **kwargs):
@utils.wrap_to_dict(
RESP_DEPLOY_FIELDS,
cluster=RESP_CONFIG_FIELDS,
clusterhosts=RESP_CLUSTERHOST_FIELDS
hosts=RESP_CLUSTERHOST_FIELDS
)
def deploy_cluster(
session, deployer, cluster_id, deploy={}, **kwargs

View File

@ -599,6 +599,15 @@ class ClusterHost(BASE, TimestampMixin, HelperMixin):
host = self.host
host.put_os_config = value
@property
def deployed_os_config(self):
return self.host.deployed_os_config
@deployed_os_config.setter
def deployed_os_config(self, value):
host = self.host
host.deployed_os_config = value
@hybrid_property
def distributed_system_name(self):
return self.cluster.distributed_system_name

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,353 @@
# Copyright 2014 Huawei Technologies Co. Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
import logging
import os
import sys
import unittest2
os.environ['COMPASS_IGNORE_SETTING'] = 'true'
from compass.utils import setting_wrapper as setting
reload(setting)
from base import BaseTest
from compass.db.api import adapter as adapter_api
from compass.db.api import adapter_holder as adapter
from compass.db.api import cluster
from compass.db.api import database
from compass.db.api import host
from compass.db.api import machine
from compass.db.api import metadata as metadata_api
from compass.db.api import metadata_holder as metadata
from compass.db.api import network
from compass.db.api import switch
from compass.db.api import user as user_api
from compass.db import exception
from compass.utils import flags
from compass.utils import logsetting
from compass.utils import util
class HostTestCase(unittest2.TestCase):
"""Host case test case."""
def setUp(self):
super(HostTestCase, self).setUp()
reload(setting)
setting.CONFIG_DIR = os.path.join(
os.path.dirname(os.path.abspath(__file__)),
'data'
)
database.init('sqlite://')
database.create_db()
adapter.load_adapters()
metadata.load_metadatas()
self.user_object = (
user_api.get_user_object(
setting.COMPASS_ADMIN_EMAIL
)
)
# get adapter information
list_adapters = adapter.list_adapters(self.user_object)
for list_adapter in list_adapters:
for supported_os in list_adapter['supported_oses']:
self.os_id = supported_os['os_id']
break
if list_adapter['flavors']:
details = list_adapter['flavors']
for detail in details:
if detail['display_name'] == 'allinone':
roles = detail['roles']
for role in roles:
self.adapter_id = role['adapter_id']
self.flavor_id = role['flavor_id']
break
# add cluster
cluster_names = ['test_cluster1', 'test_cluster2']
for cluster_name in cluster_names:
cluster.add_cluster(
self.user_object,
adapter_id=self.adapter_id,
os_id=self.os_id,
flavor_id=self.flavor_id,
name=cluster_name
)
clusters = cluster.list_clusters(self.user_object)
self.roles = None
for list_cluster in clusters:
for item in list_cluster['flavor']['roles']:
self.roles = item
if list_cluster['name'] == 'test_cluster1':
self.cluster_id = list_cluster['id']
break
# add switch
switch.add_switch(
self.user_object,
ip='172.29.8.40'
)
switches = switch.list_switches(self.user_object)
self.switch_id = None
for item in switches:
self.switch_id = item['id']
macs = ['28:6e:d4:46:c4:25', '00:0c:29:bf:eb:1d']
for mac in macs:
switch.add_switch_machine(
self.user_object,
self.switch_id,
mac=mac,
port='1'
)
# get machine information
machines = machine.list_machines(self.user_object)
self.machine_ids = []
for item in machines:
self.machine_ids.append(item['id'])
# add cluster host
name = ['newname1', 'newname2']
for i in range(0, 2):
cluster.add_cluster_host(
self.user_object,
self.cluster_id,
machine_id=self.machine_ids[i],
name=name[i]
)
self.host_ids = []
clusterhosts = cluster.list_clusterhosts(self.user_object)
for clusterhost in clusterhosts:
self.host_ids.append(clusterhost['host_id'])
# add subnet
subnets = ['10.145.88.0/23', '192.168.100.0/23']
for subnet in subnets:
network.add_subnet(
self.user_object,
subnet=subnet
)
list_subnet = network.list_subnets(
self.user_object
)
self.subnet_ids = []
for item in list_subnet:
self.subnet_ids.append(item['id'])
# add host network
host.add_host_network(
self.user_object,
self.host_ids[0],
interface='eth0',
ip='10.145.88.0',
subnet_id=self.subnet_ids[0],
is_mgmt=True
)
host.add_host_network(
self.user_object,
self.host_ids[1],
interface='eth1',
ip='192.168.100.0',
subnet_id=self.subnet_ids[1],
is_promiscuous=True
)
class TestListHosts(HostTestCase):
"""Test list hosts."""
def setUp(self):
super(TestListHosts, self).setUp()
def tearDown(self):
super(TestListHosts, self).tearDown()
def test_list_hosts(self):
list_hosts = host.list_hosts(
self.user_object
)
result = []
for list_host in list_hosts:
result.append(list_host['name'])
for item in result:
self.assertIn(item, ['newname1', 'newname2'])
class TestListMachinesOrHosts(HostTestCase):
"""Test list machines or hosts."""
def setUp(self):
super(TestListMachinesOrHosts, self).setUp()
def tearDown(self):
super(TestListMachinesOrHosts, self).tearDown()
def test_list__hosts(self):
list_hosts = host.list_machines_or_hosts(
self.user_object
)
result = []
for list_host in list_hosts:
result.append(list_host['name'])
for item in result:
self.assertIn(item, ['newname1', 'newname2'])
def test_list_machines(self):
host.del_host(
self.user_object,
self.host_ids[0]
)
host.del_host(
self.user_object,
self.host_ids[1]
)
list_hosts = host.list_machines_or_hosts(
self.user_object
)
macs = []
names = []
for list_host in list_hosts:
for k, v in list_host.iteritems():
if k == 'mac':
macs.append(v)
if k == 'name':
names.append(v)
for mac in macs:
self.assertIn(mac, ['28:6e:d4:46:c4:25', '00:0c:29:bf:eb:1d'])
self.assertEqual(names, [])
class TestGetHost(HostTestCase):
"""Test get host."""
def setUp(self):
super(TestGetHost, self).setUp()
def tearDown(self):
super(TestGetHost, self).tearDown()
def test_get_host(self):
get_host = host.get_host(
self.user_object,
self.host_ids[0]
)
self.assertIsNotNone(get_host)
self.assertEqual(get_host['mac'], '28:6e:d4:46:c4:25')
class TestGetMachineOrHost(HostTestCase):
"""Test get machine or host."""
def setUp(self):
super(TestGetMachineOrHost, self).setUp()
def tearDown(self):
super(TestGetMachineOrHost, self).tearDown()
def test_get_host(self):
get_host = host.get_machine_or_host(
self.user_object,
self.host_ids[0]
)
self.assertIsNotNone(get_host)
self.assertEqual(get_host['mac'], '28:6e:d4:46:c4:25')
def test_get_machine(self):
host.del_host(
self.user_object,
self.host_ids[0]
)
get_machine = host.get_machine_or_host(
self.user_object,
self.host_ids[0]
)
name = []
for k, v in get_machine.items():
if k == 'name':
name.append(v)
self.assertEqual(name, [])
self.assertEqual(get_machine['mac'], '28:6e:d4:46:c4:25')
class TestGetHostClusters(HostTestCase):
"""Test get host clusters."""
def setUp(self):
super(TestGetHostClusters, self).setUp()
def tearDown(self):
super(TestGetHostClusters, self).tearDown()
def test_get_host_clusters(self):
host_clusters = host.get_host_clusters(
self.user_object,
self.host_ids[0]
)
name = None
for item in host_clusters:
name = item['name']
self.assertEqual(name, 'test_cluster1')
class TestUpdateHost(HostTestCase):
"""Test update host."""
def setUp(self):
super(TestUpdateHost, self).setUp()
def tearDown(self):
super(TestUpdateHost, self).tearDown()
def test_update_host(self):
host.update_host(
self.user_object,
self.host_ids[0],
name='update_test_name'
)
update_host = host.get_host(
self.user_object,
self.host_ids[0]
)
self.assertEqual(update_host['name'], 'update_test_name')
def test_is_host_etitable(self):
host.update_host_state(
self.user_object,
self.host_ids[0],
state='INSTALLING'
)
self.assertRaises(
exception.Forbidden,
host.update_host,
self.user_object,
self.host_ids[0],
name='invalid'
)
def test_invalid_parameter(self):
self.assertRaises(
exception.InvalidParameter,
host.update_host,
self.user_object,
self.host_ids[1],
name='newname1'
)
if __name__ == '__main__':
flags.init()
logsetting.init()
unittest2.main()