59aaac0cb5
This change introduces a set of functions for dynamically populating the lxc_hosts function on each inventory run. Any previous membership is overwritten each time. The output is written to the openstack_inventory.json still in order to provide parity with the values provided to the ansible executable on stdout. Previous code was fairly naive, inserting hosts into lxc_hosts when they were marked as `is_metal`. This would add hosts, such as Ceph, that were on metal but had no LXC containers. A previous attempt at fixing this changed the _build_container_hosts function to provide more information about the container build process, then used that data in the _append_host_containers function to populate the lxc_hosts group. However, this approach failed due to limited information in each pass of the loop - if a node was an AIO, it might be erroneously removed from the lxc_hosts group because a container wasn't built on a given pass, and due to ordering, that pass may be the last one of the loop. To get around such problems, this code instead processes the inventory in whole, after all containers have been made. Population into the group is determined according to whether or not a given host's `physical_host` hostvar matches the host name. Change-Id: I9f3336f77cd0ef05fe1c7edeaf7defc6d93c3111 Closes-Bug: #1660996
1390 lines
46 KiB
Python
1390 lines
46 KiB
Python
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
import collections
|
|
import copy
|
|
import json
|
|
import mock
|
|
import os
|
|
from os import path
|
|
import Queue
|
|
import sys
|
|
import unittest
|
|
import warnings
|
|
import yaml
|
|
|
|
INV_DIR = 'playbooks/inventory'
|
|
LIB_DIR = 'lib'
|
|
|
|
sys.path.append(path.join(os.getcwd(), LIB_DIR))
|
|
sys.path.append(path.join(os.getcwd(), INV_DIR))
|
|
|
|
import dictutils
|
|
import dynamic_inventory
|
|
import filesystem as fs
|
|
import generate as di
|
|
import tools
|
|
|
|
TARGET_DIR = path.join(os.getcwd(), 'tests', 'inventory')
|
|
BASE_ENV_DIR = INV_DIR
|
|
CONFIGS_DIR = path.join(os.getcwd(), 'etc', 'openstack_deploy')
|
|
CONFD = os.path.join(CONFIGS_DIR, 'conf.d')
|
|
AIO_CONFIG_FILE = path.join(CONFIGS_DIR, 'openstack_user_config.yml.aio')
|
|
USER_CONFIG_FILE = path.join(TARGET_DIR, 'openstack_user_config.yml')
|
|
|
|
# These files will be placed in TARGET_DIR by the inventory functions
|
|
# They should be cleaned up between each test.
|
|
CLEANUP = [
|
|
'openstack_inventory.json',
|
|
'openstack_hostnames_ips.yml',
|
|
'backup_openstack_inventory.tar'
|
|
]
|
|
|
|
# Base config is a global configuration accessible for convenience.
|
|
# It should *not* be mutated outside of setUpModule, which populates it.
|
|
_BASE_CONFIG = {}
|
|
|
|
|
|
def get_config():
|
|
"""Return a copy of the original config so original isn't modified."""
|
|
global _BASE_CONFIG
|
|
return copy.deepcopy(_BASE_CONFIG)
|
|
|
|
|
|
def make_config():
|
|
"""Build an inventory configuration from the sample AIO files.
|
|
|
|
Take any files specified as '.aio' and load their keys into a
|
|
configuration dict and write them out to a file for consumption by
|
|
the tests.
|
|
"""
|
|
# Allow access here so we can populate the dictionary.
|
|
global _BASE_CONFIG
|
|
|
|
_BASE_CONFIG = tools.make_example_config(AIO_CONFIG_FILE, CONFD)
|
|
|
|
tools.write_example_config(USER_CONFIG_FILE, _BASE_CONFIG)
|
|
|
|
|
|
def setUpModule():
|
|
# The setUpModule function is used by the unittest framework.
|
|
make_config()
|
|
|
|
|
|
def tearDownModule():
|
|
# This file should only be removed after all tests are run,
|
|
# thus it is excluded from cleanup.
|
|
os.remove(USER_CONFIG_FILE)
|
|
|
|
|
|
def cleanup():
|
|
for f_name in CLEANUP:
|
|
f_file = path.join(TARGET_DIR, f_name)
|
|
if os.path.exists(f_file):
|
|
os.remove(f_file)
|
|
|
|
|
|
def get_inventory(clean=True, extra_args=None):
|
|
"Return the inventory mapping in a dict."
|
|
# Use the list argument to more closely mirror
|
|
# Ansible's use of the callable.
|
|
args = {'config': TARGET_DIR, 'list': True,
|
|
'environment': BASE_ENV_DIR}
|
|
if extra_args:
|
|
args.update(extra_args)
|
|
try:
|
|
inventory_string = di.main(**args)
|
|
inventory = json.loads(inventory_string)
|
|
return inventory
|
|
finally:
|
|
if clean:
|
|
# Remove the file system artifacts since we want to force
|
|
# fresh runs
|
|
cleanup()
|
|
|
|
|
|
class TestArgParser(unittest.TestCase):
|
|
def test_no_args(self):
|
|
arg_dict = dynamic_inventory.args([])
|
|
self.assertIsNone(arg_dict['config'])
|
|
self.assertEqual(arg_dict['list'], False)
|
|
|
|
def test_list_arg(self):
|
|
arg_dict = dynamic_inventory.args(['--list'])
|
|
self.assertEqual(arg_dict['list'], True)
|
|
|
|
def test_config_arg(self):
|
|
arg_dict = dynamic_inventory.args(['--config',
|
|
'/etc/openstack_deploy'])
|
|
self.assertEqual(arg_dict['config'], '/etc/openstack_deploy')
|
|
|
|
|
|
class TestAnsibleInventoryFormatConstraints(unittest.TestCase):
|
|
inventory = None
|
|
|
|
expected_groups = [
|
|
'aio1-host_containers',
|
|
'all',
|
|
'all_containers',
|
|
'aodh_alarm_evaluator',
|
|
'aodh_alarm_notifier',
|
|
'aodh_all',
|
|
'aodh_api',
|
|
'aodh_container',
|
|
'aodh_listener',
|
|
'barbican_all',
|
|
'barbican_api',
|
|
'barbican_container',
|
|
'ceilometer_agent_central',
|
|
'ceilometer_agent_compute',
|
|
'ceilometer_agent_notification',
|
|
'ceilometer_all',
|
|
'ceilometer_api',
|
|
'ceilometer_api_container',
|
|
'ceph_all',
|
|
'ceph-mon_all',
|
|
'ceph-mon_containers',
|
|
'ceph-mon_container',
|
|
'ceph-mon_hosts',
|
|
'ceph-mon',
|
|
'ceph-osd_all',
|
|
'ceph-osd_containers',
|
|
'ceph-osd_container',
|
|
'ceph-osd_hosts',
|
|
'ceph-osd',
|
|
'cinder_all',
|
|
'cinder_api',
|
|
'cinder_api_container',
|
|
'cinder_backup',
|
|
'cinder_scheduler',
|
|
'cinder_scheduler_container',
|
|
'cinder_volume',
|
|
'cinder_volumes_container',
|
|
'compute-infra_all',
|
|
'compute-infra_containers',
|
|
'compute-infra_hosts',
|
|
'compute_all',
|
|
'compute_containers',
|
|
'compute_hosts',
|
|
'dashboard_all',
|
|
'dashboard_containers',
|
|
'dashboard_hosts',
|
|
'database_containers',
|
|
'database_hosts',
|
|
'dnsaas_all',
|
|
'dnsaas_containers',
|
|
'dnsaas_hosts',
|
|
'designate_all',
|
|
'designate_container',
|
|
'designate_api',
|
|
'designate_central',
|
|
'designate_mdns',
|
|
'designate_worker',
|
|
'designate_producer',
|
|
'designate_sink',
|
|
'galera',
|
|
'galera_all',
|
|
'galera_container',
|
|
'glance_all',
|
|
'glance_api',
|
|
'glance_container',
|
|
'glance_registry',
|
|
'gnocchi_all',
|
|
'gnocchi_api',
|
|
'gnocchi_container',
|
|
'gnocchi_metricd',
|
|
'haproxy',
|
|
'haproxy_all',
|
|
'haproxy_container',
|
|
'haproxy_containers',
|
|
'haproxy_hosts',
|
|
'heat_all',
|
|
'heat_api',
|
|
'heat_api_cfn',
|
|
'heat_api_cloudwatch',
|
|
'heat_apis_container',
|
|
'heat_engine',
|
|
'heat_engine_container',
|
|
'horizon',
|
|
'horizon_all',
|
|
'horizon_container',
|
|
'hosts',
|
|
'identity_all',
|
|
'identity_containers',
|
|
'identity_hosts',
|
|
'image_all',
|
|
'image_containers',
|
|
'image_hosts',
|
|
'ironic-infra_all',
|
|
'ironic-infra_containers',
|
|
'ironic-infra_hosts',
|
|
'ironic-server_containers',
|
|
'ironic-server_hosts',
|
|
'ironic_all',
|
|
'ironic_api',
|
|
'ironic_api_container',
|
|
'ironic_conductor',
|
|
'ironic_conductor_container',
|
|
'ironic_server',
|
|
'ironic_server_container',
|
|
'ironic_servers',
|
|
'ironic_compute',
|
|
'ironic_compute_container',
|
|
'ironic-compute_containers',
|
|
'ironic-compute_all',
|
|
'ironic-compute_hosts',
|
|
'key-manager_containers',
|
|
'key-manager_hosts',
|
|
'keystone',
|
|
'keystone_all',
|
|
'keystone_container',
|
|
'log_all',
|
|
'log_containers',
|
|
'log_hosts',
|
|
'lxc_hosts',
|
|
'magnum',
|
|
'magnum-infra_all',
|
|
'magnum-infra_containers',
|
|
'magnum-infra_hosts',
|
|
'magnum_all',
|
|
'magnum_container',
|
|
'trove_all',
|
|
'trove_api',
|
|
'trove_conductor',
|
|
'trove_taskmanager',
|
|
'trove_conductor_container',
|
|
'trove_api_container',
|
|
'trove_taskmanager_container',
|
|
'trove-infra_containers',
|
|
'trove-infra_hosts',
|
|
'trove-infra_all',
|
|
'memcached',
|
|
'memcached_all',
|
|
'memcached_container',
|
|
'memcaching_containers',
|
|
'memcaching_hosts',
|
|
'metering-alarm_all',
|
|
'metering-alarm_containers',
|
|
'metering-alarm_hosts',
|
|
'metering-compute_all',
|
|
'metering-compute_container',
|
|
'metering-compute_containers',
|
|
'metering-compute_hosts',
|
|
'metering-infra_all',
|
|
'metering-infra_containers',
|
|
'metering-infra_hosts',
|
|
'metrics_all',
|
|
'metrics_containers',
|
|
'metrics_hosts',
|
|
'mq_containers',
|
|
'mq_hosts',
|
|
'network_all',
|
|
'network_containers',
|
|
'network_hosts',
|
|
'neutron_agent',
|
|
'neutron_agents_container',
|
|
'neutron_all',
|
|
'neutron_bgp_dragent',
|
|
'neutron_dhcp_agent',
|
|
'neutron_l3_agent',
|
|
'neutron_lbaas_agent',
|
|
'neutron_linuxbridge_agent',
|
|
'neutron_metadata_agent',
|
|
'neutron_metering_agent',
|
|
'neutron_openvswitch_agent',
|
|
'neutron_sriov_nic_agent',
|
|
'neutron_server',
|
|
'neutron_server_container',
|
|
'nova_all',
|
|
'nova_api_metadata',
|
|
'nova_api_metadata_container',
|
|
'nova_api_os_compute',
|
|
'nova_api_os_compute_container',
|
|
'nova_api_placement',
|
|
'nova_api_placement_container',
|
|
'nova_compute',
|
|
'nova_compute_container',
|
|
'nova_conductor',
|
|
'nova_conductor_container',
|
|
'nova_console',
|
|
'nova_console_container',
|
|
'nova_scheduler',
|
|
'nova_scheduler_container',
|
|
'operator_containers',
|
|
'operator_hosts',
|
|
'orchestration_all',
|
|
'orchestration_containers',
|
|
'orchestration_hosts',
|
|
'os-infra_containers',
|
|
'os-infra_hosts',
|
|
'pkg_repo',
|
|
'rabbit_mq_container',
|
|
'rabbitmq',
|
|
'rabbitmq_all',
|
|
'remote',
|
|
'remote_containers',
|
|
'repo-infra_all',
|
|
'repo-infra_containers',
|
|
'repo-infra_hosts',
|
|
'repo_all',
|
|
'repo_container',
|
|
'rsyslog',
|
|
'rsyslog_all',
|
|
'rsyslog_container',
|
|
'sahara-infra_all',
|
|
'sahara-infra_containers',
|
|
'sahara-infra_hosts',
|
|
'sahara_all',
|
|
'sahara_api',
|
|
'sahara_container',
|
|
'sahara_engine',
|
|
'shared-infra_all',
|
|
'shared-infra_containers',
|
|
'shared-infra_hosts',
|
|
'storage-infra_all',
|
|
'storage-infra_containers',
|
|
'storage-infra_hosts',
|
|
'storage_all',
|
|
'storage_containers',
|
|
'storage_hosts',
|
|
'swift-proxy_all',
|
|
'swift-proxy_containers',
|
|
'swift-proxy_hosts',
|
|
'swift-remote_containers',
|
|
'swift-remote_hosts',
|
|
'swift_acc',
|
|
'swift_acc_container',
|
|
'swift_all',
|
|
'swift_cont',
|
|
'swift_cont_container',
|
|
'swift_containers',
|
|
'swift_hosts',
|
|
'swift_obj',
|
|
'swift_obj_container',
|
|
'swift_proxy',
|
|
'swift_proxy_container',
|
|
'swift_remote',
|
|
'swift_remote_all',
|
|
'swift_remote_container',
|
|
'unbound',
|
|
'unbound_all',
|
|
'unbound_container',
|
|
'unbound_containers',
|
|
'unbound_hosts',
|
|
'utility',
|
|
'utility_all',
|
|
'utility_container'
|
|
]
|
|
|
|
@classmethod
|
|
def setUpClass(cls):
|
|
cls.inventory = get_inventory()
|
|
|
|
def test_meta(self):
|
|
meta = self.inventory['_meta']
|
|
self.assertIsNotNone(meta, "_meta missing from inventory")
|
|
self.assertIsInstance(meta, dict, "_meta is not a dict")
|
|
|
|
def test_hostvars(self):
|
|
hostvars = self.inventory['_meta']['hostvars']
|
|
self.assertIsNotNone(hostvars, "hostvars missing from _meta")
|
|
self.assertIsInstance(hostvars, dict, "hostvars is not a dict")
|
|
|
|
def test_group_vars_all(self):
|
|
group_vars_all = self.inventory['all']
|
|
self.assertIsNotNone(group_vars_all,
|
|
"group vars all missing from inventory")
|
|
self.assertIsInstance(group_vars_all, dict,
|
|
"group vars all is not a dict")
|
|
|
|
the_vars = group_vars_all['vars']
|
|
self.assertIsNotNone(the_vars,
|
|
"vars missing from group vars all")
|
|
self.assertIsInstance(the_vars, dict,
|
|
"vars in group vars all is not a dict")
|
|
|
|
def test_expected_host_groups_present(self):
|
|
|
|
for group in self.expected_groups:
|
|
the_group = self.inventory[group]
|
|
self.assertIsNotNone(the_group,
|
|
"Required host group: %s is missing "
|
|
"from inventory" % group)
|
|
self.assertIsInstance(the_group, dict)
|
|
|
|
if group != 'all':
|
|
self.assertIn('hosts', the_group)
|
|
self.assertIsInstance(the_group['hosts'], list)
|
|
|
|
def test_only_expected_host_groups_present(self):
|
|
all_keys = list(self.expected_groups)
|
|
all_keys.append('_meta')
|
|
self.assertEqual(set(all_keys), set(self.inventory.keys()))
|
|
|
|
def test_configured_groups_have_hosts(self):
|
|
config = get_config()
|
|
groups = self.inventory.keys()
|
|
for group in groups:
|
|
if group in config.keys():
|
|
self.assertTrue(0 < len(self.inventory[group]['hosts']))
|
|
|
|
|
|
class TestUserConfiguration(unittest.TestCase):
|
|
def setUp(self):
|
|
self.longMessage = True
|
|
self.loaded_user_configuration = fs.load_user_configuration(TARGET_DIR)
|
|
|
|
def test_loading_user_configuration(self):
|
|
"""Test that the user configuration can be loaded"""
|
|
self.assertIsInstance(self.loaded_user_configuration, dict)
|
|
|
|
|
|
class TestEnvironments(unittest.TestCase):
|
|
def setUp(self):
|
|
self.longMessage = True
|
|
self.loaded_environment = fs.load_environment(BASE_ENV_DIR, {})
|
|
|
|
def test_loading_environment(self):
|
|
"""Test that the environment can be loaded"""
|
|
self.assertIsInstance(self.loaded_environment, dict)
|
|
|
|
def test_envd_read(self):
|
|
"""Test that the env.d contents are inserted into the environment"""
|
|
expected_keys = [
|
|
'component_skel',
|
|
'container_skel',
|
|
'physical_skel',
|
|
]
|
|
for key in expected_keys:
|
|
self.assertIn(key, self.loaded_environment)
|
|
|
|
|
|
class TestIps(unittest.TestCase):
|
|
def setUp(self):
|
|
# Allow custom assertion errors.
|
|
self.longMessage = True
|
|
self.env = fs.load_environment(BASE_ENV_DIR, {})
|
|
|
|
@mock.patch('filesystem.load_environment')
|
|
@mock.patch('filesystem.load_user_configuration')
|
|
def test_duplicates(self, mock_load_config, mock_load_env):
|
|
"""Test that no duplicate IPs are made on any network."""
|
|
|
|
# Grab our values read from the file system just once.
|
|
mock_load_config.return_value = get_config()
|
|
mock_load_env.return_value = self.env
|
|
|
|
mock_open = mock.mock_open()
|
|
|
|
for i in range(0, 99):
|
|
# tearDown is ineffective for this loop, so clean the USED_IPs
|
|
# on each run
|
|
inventory = None
|
|
di.ip.USED_IPS = set()
|
|
|
|
# Mock out the context manager being used to write files.
|
|
# We don't need to hit the file system for this test.
|
|
with mock.patch('__main__.open', mock_open):
|
|
inventory = get_inventory()
|
|
ips = collections.defaultdict(int)
|
|
hostvars = inventory['_meta']['hostvars']
|
|
|
|
for host, var_dict in hostvars.items():
|
|
nets = var_dict['container_networks']
|
|
for net, vals in nets.items():
|
|
if 'address' in vals.keys():
|
|
|
|
addr = vals['address']
|
|
ips[addr] += 1
|
|
|
|
self.assertEqual(1, ips[addr],
|
|
msg="IP %s duplicated." % addr)
|
|
|
|
def test_empty_ip_queue(self):
|
|
q = Queue.Queue()
|
|
with self.assertRaises(SystemExit) as context:
|
|
# TODO(nrb): import and use ip module directly
|
|
di.ip.get_ip_address('test', q)
|
|
expectedLog = ("Cannot retrieve requested amount of IP addresses. "
|
|
"Increase the test range in your "
|
|
"openstack_user_config.yml.")
|
|
self.assertEqual(str(context.exception), expectedLog)
|
|
|
|
def tearDown(self):
|
|
# Since the get_ip_address function touches USED_IPS,
|
|
# and USED_IPS is currently a global var, make sure we clean it out
|
|
di.ip.USED_IPS = set()
|
|
|
|
|
|
class TestConfigCheckBase(unittest.TestCase):
|
|
def setUp(self):
|
|
self.config_changed = False
|
|
self.user_defined_config = get_config()
|
|
|
|
def delete_config_key(self, user_defined_config, key):
|
|
try:
|
|
if key in user_defined_config:
|
|
del user_defined_config[key]
|
|
elif key in user_defined_config['global_overrides']:
|
|
del user_defined_config['global_overrides'][key]
|
|
else:
|
|
raise KeyError("can't find specified key in user config")
|
|
finally:
|
|
self.write_config()
|
|
|
|
def add_config_key(self, key, value):
|
|
self.user_defined_config[key] = value
|
|
self.write_config()
|
|
|
|
def delete_provider_network(self, net_name):
|
|
del self.user_defined_config['cidr_networks'][net_name]
|
|
self.write_config()
|
|
|
|
def delete_provider_network_key(self, net_name, key):
|
|
pns = self.user_defined_config['global_overrides']['provider_networks']
|
|
for net in pns:
|
|
if 'ip_from_q' in net['network']:
|
|
if net['network']['ip_from_q'] == net_name:
|
|
if key in net['network']:
|
|
del net['network'][key]
|
|
|
|
def write_config(self):
|
|
self.config_changed = True
|
|
# Save new user_config_file
|
|
with open(USER_CONFIG_FILE, 'wb') as f:
|
|
f.write(yaml.dump(self.user_defined_config))
|
|
|
|
def restore_config(self):
|
|
# get back our initial user config file
|
|
self.user_defined_config = get_config()
|
|
self.write_config()
|
|
|
|
def set_new_hostname(self, user_defined_config, group,
|
|
old_hostname, new_hostname):
|
|
# set a new name for the specified hostname
|
|
old_hostname_settings = user_defined_config[group].pop(old_hostname)
|
|
user_defined_config[group][new_hostname] = old_hostname_settings
|
|
self.write_config()
|
|
|
|
def set_new_ip(self, user_defined_config, group, hostname, ip):
|
|
# Sets an IP address for a specified host.
|
|
user_defined_config[group][hostname]['ip'] = ip
|
|
self.write_config()
|
|
|
|
def add_host(self, group, host_name, ip):
|
|
self.user_defined_config[group][host_name] = {'ip': ip}
|
|
self.write_config()
|
|
|
|
def tearDown(self):
|
|
if self.config_changed:
|
|
self.restore_config()
|
|
|
|
|
|
class TestConfigChecks(TestConfigCheckBase):
|
|
def test_missing_container_cidr_network(self):
|
|
self.delete_provider_network('container')
|
|
with self.assertRaises(SystemExit) as context:
|
|
get_inventory()
|
|
expectedLog = ("No container or management network specified in "
|
|
"user config.")
|
|
self.assertEqual(str(context.exception), expectedLog)
|
|
|
|
def test_management_network_malformed(self):
|
|
self.delete_provider_network_key('container', 'is_container_address')
|
|
self.delete_provider_network_key('container', 'is_ssh_address')
|
|
self.write_config()
|
|
|
|
with self.assertRaises(di.ProviderNetworkMisconfiguration) as context:
|
|
get_inventory()
|
|
expectedLog = ("Provider network with queue 'container' "
|
|
"requires 'is_container_address' and "
|
|
"'is_ssh_address' to be set to True.")
|
|
self.assertEqual(str(context.exception), expectedLog)
|
|
self.restore_config()
|
|
|
|
def test_missing_cidr_network_present_in_provider(self):
|
|
self.delete_provider_network('storage')
|
|
with self.assertRaises(SystemExit) as context:
|
|
get_inventory()
|
|
expectedLog = "can't find storage in cidr_networks"
|
|
self.assertEqual(str(context.exception), expectedLog)
|
|
|
|
def test_missing_cidr_networks_key(self):
|
|
del self.user_defined_config['cidr_networks']
|
|
self.write_config()
|
|
with self.assertRaises(SystemExit) as context:
|
|
get_inventory()
|
|
expectedLog = "No container CIDR specified in user config"
|
|
self.assertEqual(str(context.exception), expectedLog)
|
|
|
|
def test_provider_networks_check(self):
|
|
# create config file without provider networks
|
|
self.delete_config_key(self.user_defined_config, 'provider_networks')
|
|
# check if provider networks absence is Caught
|
|
with self.assertRaises(SystemExit) as context:
|
|
get_inventory()
|
|
expectedLog = "provider networks can't be found under global_overrides"
|
|
self.assertIn(expectedLog, str(context.exception))
|
|
|
|
def test_global_overrides_check(self):
|
|
# create config file without global_overrides
|
|
self.delete_config_key(self.user_defined_config, 'global_overrides')
|
|
# check if global_overrides absence is Caught
|
|
with self.assertRaises(SystemExit) as context:
|
|
get_inventory()
|
|
expectedLog = "global_overrides can't be found in user config"
|
|
self.assertEqual(str(context.exception), expectedLog)
|
|
|
|
def test_two_hosts_same_ip(self):
|
|
# Use an OrderedDict to be certain our testing order is preserved
|
|
# Even with the same hash seed, different OSes get different results,
|
|
# eg. local OS X vs gate's Linux
|
|
config = collections.OrderedDict()
|
|
config['shared-infra_hosts'] = {
|
|
'host1': {
|
|
'ip': '192.168.1.1'
|
|
}
|
|
}
|
|
config['compute_hosts'] = {
|
|
'host2': {
|
|
'ip': '192.168.1.1'
|
|
}
|
|
}
|
|
|
|
with self.assertRaises(di.MultipleHostsWithOneIPError) as context:
|
|
di._check_same_ip_to_multiple_host(config)
|
|
self.assertEqual(context.exception.ip, '192.168.1.1')
|
|
self.assertEqual(context.exception.assigned_host, 'host1')
|
|
self.assertEqual(context.exception.new_host, 'host2')
|
|
|
|
def test_two_hosts_same_ip_externally(self):
|
|
self.set_new_hostname(self.user_defined_config, "haproxy_hosts",
|
|
"aio1", "hap")
|
|
with self.assertRaises(di.MultipleHostsWithOneIPError) as context:
|
|
get_inventory()
|
|
expectedLog = ("Both host:aio1 and host:hap have "
|
|
"address:172.29.236.100 assigned. Cannot "
|
|
"assign same ip to both hosts")
|
|
self.assertEqual(str(context.exception), expectedLog)
|
|
|
|
def test_one_host_two_ips_externally(self):
|
|
# haproxy chosen because it was last in the config file as of
|
|
# writing
|
|
self.set_new_ip(self.user_defined_config, 'haproxy_hosts', 'aio1',
|
|
'172.29.236.101')
|
|
with self.assertRaises(di.MultipleIpForHostError) as context:
|
|
get_inventory()
|
|
expectedLog = ("Host aio1 has both 172.29.236.100 and 172.29.236.101 "
|
|
"assigned")
|
|
self.assertEqual(str(context.exception), expectedLog)
|
|
|
|
def test_two_ips(self):
|
|
# Use an OrderedDict to be certain our testing order is preserved
|
|
# Even with the same hash seed, different OSes get different results,
|
|
# eg. local OS X vs gate's Linux
|
|
config = collections.OrderedDict()
|
|
config['shared-infra_hosts'] = {
|
|
'host1': {
|
|
'ip': '192.168.1.1'
|
|
}
|
|
}
|
|
config['compute_hosts'] = {
|
|
'host1': {
|
|
'ip': '192.168.1.2'
|
|
}
|
|
}
|
|
|
|
with self.assertRaises(di.MultipleIpForHostError) as context:
|
|
di._check_multiple_ips_to_host(config)
|
|
self.assertEqual(context.exception.current_ip, '192.168.1.1')
|
|
self.assertEqual(context.exception.new_ip, '192.168.1.2')
|
|
self.assertEqual(context.exception.hostname, 'host1')
|
|
|
|
def test_correct_hostname_ip_map(self):
|
|
config = {
|
|
'shared-infra_hosts': {
|
|
'host1': {
|
|
'ip': '192.168.1.1'
|
|
}
|
|
},
|
|
'compute_hosts': {
|
|
'host2': {
|
|
'ip': '192.168.1.2'
|
|
}
|
|
},
|
|
}
|
|
ret = di._check_multiple_ips_to_host(config)
|
|
self.assertTrue(ret)
|
|
|
|
|
|
class TestStaticRouteConfig(TestConfigCheckBase):
|
|
def setUp(self):
|
|
super(TestStaticRouteConfig, self).setUp()
|
|
self.expectedMsg = ("Static route provider network with queue "
|
|
"'container' needs both 'cidr' and 'gateway' "
|
|
"values.")
|
|
|
|
def add_static_route(self, q_name, route_dict):
|
|
"""Adds a static route to a provider network."""
|
|
pn = self.user_defined_config['global_overrides']['provider_networks']
|
|
for net in pn:
|
|
net_dict = net['network']
|
|
q = net_dict.get('ip_from_q', None)
|
|
if q == q_name:
|
|
net_dict['static_routes'] = [route_dict]
|
|
self.write_config()
|
|
|
|
def test_setting_static_route(self):
|
|
route_dict = {'cidr': '10.176.0.0/12',
|
|
'gateway': '172.29.248.1'}
|
|
self.add_static_route('container', route_dict)
|
|
inventory = get_inventory()
|
|
|
|
# Use aio1 and 'container_address' since they're known keys.
|
|
hostvars = inventory['_meta']['hostvars']['aio1']
|
|
cont_add = hostvars['container_networks']['container_address']
|
|
|
|
self.assertIn('static_routes', cont_add)
|
|
|
|
first_route = cont_add['static_routes'][0]
|
|
self.assertIn('cidr', first_route)
|
|
self.assertIn('gateway', first_route)
|
|
|
|
def test_setting_bad_static_route_only_cidr(self):
|
|
route_dict = {'cidr': '10.176.0.0/12'}
|
|
self.add_static_route('container', route_dict)
|
|
|
|
with self.assertRaises(di.MissingStaticRouteInfo) as context:
|
|
get_inventory()
|
|
|
|
exception = context.exception
|
|
|
|
self.assertEqual(str(exception), self.expectedMsg)
|
|
|
|
def test_setting_bad_static_route_only_gateway(self):
|
|
route_dict = {'gateway': '172.29.248.1'}
|
|
self.add_static_route('container', route_dict)
|
|
|
|
with self.assertRaises(di.MissingStaticRouteInfo) as context:
|
|
get_inventory()
|
|
|
|
exception = context.exception
|
|
|
|
self.assertEqual(exception.message, self.expectedMsg)
|
|
|
|
def test_setting_bad_gateway_value(self):
|
|
route_dict = {'cidr': '10.176.0.0/12',
|
|
'gateway': None}
|
|
self.add_static_route('container', route_dict)
|
|
|
|
with self.assertRaises(di.MissingStaticRouteInfo) as context:
|
|
get_inventory()
|
|
|
|
exception = context.exception
|
|
|
|
self.assertEqual(exception.message, self.expectedMsg)
|
|
|
|
def test_setting_bad_cidr_value(self):
|
|
route_dict = {'cidr': None,
|
|
'gateway': '172.29.248.1'}
|
|
self.add_static_route('container', route_dict)
|
|
|
|
with self.assertRaises(di.MissingStaticRouteInfo) as context:
|
|
get_inventory()
|
|
|
|
exception = context.exception
|
|
|
|
self.assertEqual(exception.message, self.expectedMsg)
|
|
|
|
def test_setting_bad_cidr_gateway_value(self):
|
|
route_dict = {'cidr': None,
|
|
'gateway': None}
|
|
self.add_static_route('container', route_dict)
|
|
|
|
with self.assertRaises(di.MissingStaticRouteInfo) as context:
|
|
get_inventory()
|
|
|
|
exception = context.exception
|
|
|
|
self.assertEqual(exception.message, self.expectedMsg)
|
|
|
|
|
|
class TestGlobalOverridesConfigDeletion(TestConfigCheckBase):
|
|
def setUp(self):
|
|
super(TestGlobalOverridesConfigDeletion, self).setUp()
|
|
self.inventory = get_inventory()
|
|
|
|
def add_global_override(self, var_name, var_value):
|
|
"""Adds an arbitrary name and value to the global_overrides dict."""
|
|
overrides = self.user_defined_config['global_overrides']
|
|
overrides[var_name] = var_value
|
|
|
|
def remove_global_override(self, var_name):
|
|
"""Removes target key from the global_overrides dict."""
|
|
overrides = self.user_defined_config['global_overrides']
|
|
del overrides[var_name]
|
|
|
|
def test_global_overrides_delete_when_merge(self):
|
|
"""Vars removed from global overrides are removed from inventory"""
|
|
self.add_global_override('foo', 'bar')
|
|
|
|
di._parse_global_variables({}, self.inventory,
|
|
self.user_defined_config)
|
|
|
|
self.remove_global_override('foo')
|
|
|
|
di._parse_global_variables({}, self.inventory,
|
|
self.user_defined_config)
|
|
|
|
self.assertNotIn('foo', self.inventory['all']['vars'],
|
|
"foo var not removed from group_vars_all")
|
|
|
|
def test_global_overrides_merge(self):
|
|
self.add_global_override('foo', 'bar')
|
|
|
|
di._parse_global_variables({}, self.inventory,
|
|
self.user_defined_config)
|
|
|
|
self.assertEqual('bar', self.inventory['all']['vars']['foo'])
|
|
|
|
def test_container_cidr_key_retained(self):
|
|
user_cidr = self.user_defined_config['cidr_networks']['container']
|
|
di._parse_global_variables(user_cidr, self.inventory,
|
|
self.user_defined_config)
|
|
self.assertIn('container_cidr', self.inventory['all']['vars'])
|
|
self.assertEqual(self.inventory['all']['vars']['container_cidr'],
|
|
user_cidr)
|
|
|
|
def test_only_old_vars_deleted(self):
|
|
self.inventory['all']['vars']['foo'] = 'bar'
|
|
|
|
di._parse_global_variables('', self.inventory,
|
|
self.user_defined_config)
|
|
|
|
self.assertNotIn('foo', self.inventory['all']['vars'])
|
|
|
|
def test_empty_vars(self):
|
|
del self.inventory['all']
|
|
|
|
di._parse_global_variables('', self.inventory,
|
|
self.user_defined_config)
|
|
|
|
self.assertIn('container_cidr', self.inventory['all']['vars'])
|
|
|
|
for key in self.user_defined_config['global_overrides']:
|
|
self.assertIn(key, self.inventory['all']['vars'])
|
|
|
|
|
|
class TestEnsureInventoryUptoDate(unittest.TestCase):
|
|
def setUp(self):
|
|
self.env = fs.load_environment(BASE_ENV_DIR, {})
|
|
# Copy because we manipulate the structure in each test;
|
|
# not copying would modify the global var in the target code
|
|
self.inv = copy.deepcopy(di.INVENTORY_SKEL)
|
|
# Since we're not running skel_setup, add necessary keys
|
|
self.host_vars = self.inv['_meta']['hostvars']
|
|
|
|
# The _ensure_inventory_uptodate function depends on values inserted
|
|
# by the skel_setup function
|
|
di.skel_setup(self.env, self.inv)
|
|
|
|
def test_missing_required_host_vars(self):
|
|
self.host_vars['host1'] = {}
|
|
|
|
di._ensure_inventory_uptodate(self.inv, self.env['container_skel'])
|
|
|
|
for required_key in di.REQUIRED_HOSTVARS:
|
|
self.assertIn(required_key, self.host_vars['host1'])
|
|
|
|
def test_missing_container_name(self):
|
|
self.host_vars['host1'] = {}
|
|
|
|
di._ensure_inventory_uptodate(self.inv, self.env['container_skel'])
|
|
|
|
self.assertIn('container_name', self.host_vars['host1'])
|
|
self.assertEqual(self.host_vars['host1']['container_name'], 'host1')
|
|
|
|
def test_inserting_container_networks_is_dict(self):
|
|
self.host_vars['host1'] = {}
|
|
|
|
di._ensure_inventory_uptodate(self.inv, self.env['container_skel'])
|
|
|
|
self.assertIsInstance(self.host_vars['host1']['container_networks'],
|
|
dict)
|
|
|
|
def test_populating_inventory_info(self):
|
|
skel = self.env['container_skel']
|
|
|
|
di._ensure_inventory_uptodate(self.inv, skel)
|
|
|
|
for container_type, type_vars in skel.items():
|
|
hosts = self.inv[container_type]['hosts']
|
|
if hosts:
|
|
for host in hosts:
|
|
host_var_entries = self.inv['_meta']['hostvars'][host]
|
|
if 'properties' in type_vars:
|
|
self.assertEqual(host_var_entries['properties'],
|
|
type_vars['properties'])
|
|
|
|
def tearDown(self):
|
|
self.env = None
|
|
self.host_vars = None
|
|
self.inv = None
|
|
|
|
|
|
class OverridingEnvBase(unittest.TestCase):
|
|
def setUp(self):
|
|
self.base_env = fs.load_environment(BASE_ENV_DIR, {})
|
|
|
|
# Use the cinder configuration as our sample for override testing
|
|
with open(path.join(BASE_ENV_DIR, 'env.d', 'cinder.yml'), 'r') as f:
|
|
self.cinder_config = yaml.safe_load(f.read())
|
|
|
|
self.override_path = path.join(TARGET_DIR, 'env.d')
|
|
os.mkdir(self.override_path)
|
|
|
|
def write_override_env(self):
|
|
with open(path.join(self.override_path, 'cinder.yml'), 'w') as f:
|
|
f.write(yaml.safe_dump(self.cinder_config))
|
|
|
|
def tearDown(self):
|
|
os.remove(path.join(self.override_path, 'cinder.yml'))
|
|
os.rmdir(self.override_path)
|
|
|
|
|
|
class TestOverridingEnvVars(OverridingEnvBase):
|
|
|
|
def test_cinder_metal_override(self):
|
|
vol = self.cinder_config['container_skel']['cinder_volumes_container']
|
|
vol['properties']['is_metal'] = False
|
|
|
|
self.write_override_env()
|
|
|
|
fs.load_environment(TARGET_DIR, self.base_env)
|
|
|
|
test_vol = self.base_env['container_skel']['cinder_volumes_container']
|
|
self.assertFalse(test_vol['properties']['is_metal'])
|
|
|
|
def test_deleting_elements(self):
|
|
# Leave only the 'properties' dictionary attached to simulate writing
|
|
# a partial override file
|
|
|
|
vol = self.cinder_config['container_skel']['cinder_volumes_container']
|
|
for key in vol.keys():
|
|
if not key == 'properties':
|
|
del vol[key]
|
|
|
|
self.write_override_env()
|
|
|
|
fs.load_environment(TARGET_DIR, self.base_env)
|
|
|
|
test_vol = self.base_env['container_skel']['cinder_volumes_container']
|
|
|
|
self.assertIn('belongs_to', test_vol)
|
|
|
|
def test_adding_new_keys(self):
|
|
vol = self.cinder_config['container_skel']['cinder_volumes_container']
|
|
vol['a_new_key'] = 'Added'
|
|
|
|
self.write_override_env()
|
|
|
|
fs.load_environment(TARGET_DIR, self.base_env)
|
|
|
|
test_vol = self.base_env['container_skel']['cinder_volumes_container']
|
|
|
|
self.assertIn('a_new_key', test_vol)
|
|
self.assertEqual(test_vol['a_new_key'], 'Added')
|
|
|
|
def test_emptying_dictionaries(self):
|
|
self.cinder_config['container_skel']['cinder_volumes_container'] = {}
|
|
|
|
self.write_override_env()
|
|
|
|
fs.load_environment(TARGET_DIR, self.base_env)
|
|
|
|
test_vol = self.base_env['container_skel']['cinder_volumes_container']
|
|
|
|
self.assertNotIn('belongs_to', test_vol)
|
|
|
|
def test_emptying_lists(self):
|
|
vol = self.cinder_config['container_skel']['cinder_volumes_container']
|
|
vol['belongs_to'] = []
|
|
|
|
self.write_override_env()
|
|
|
|
fs.load_environment(TARGET_DIR, self.base_env)
|
|
|
|
test_vol = self.base_env['container_skel']['cinder_volumes_container']
|
|
|
|
self.assertEqual(test_vol['belongs_to'], [])
|
|
|
|
|
|
class TestOverridingEnvIntegration(OverridingEnvBase):
|
|
def setUp(self):
|
|
super(TestOverridingEnvIntegration, self).setUp()
|
|
self.user_defined_config = get_config()
|
|
|
|
# Inventory is necessary since keys are assumed present
|
|
self.inv, path = fs.load_inventory(TARGET_DIR, di.INVENTORY_SKEL)
|
|
|
|
def skel_setup(self):
|
|
self.environment = fs.load_environment(TARGET_DIR, self.base_env)
|
|
|
|
di.skel_setup(self.environment, self.inv)
|
|
|
|
di.skel_load(
|
|
self.environment.get('physical_skel'),
|
|
self.inv
|
|
)
|
|
|
|
def test_emptying_container_integration(self):
|
|
self.cinder_config = {}
|
|
self.cinder_config['container_skel'] = {'cinder_volumes_container': {}}
|
|
|
|
self.write_override_env()
|
|
self.skel_setup()
|
|
|
|
di.container_skel_load(
|
|
self.environment.get('container_skel'),
|
|
self.inv,
|
|
self.user_defined_config
|
|
)
|
|
|
|
test_vol = self.base_env['container_skel']['cinder_volumes_container']
|
|
|
|
self.assertNotIn('belongs_to', test_vol)
|
|
self.assertNotIn('contains', test_vol)
|
|
|
|
def test_empty_contains(self):
|
|
vol = self.cinder_config['container_skel']['cinder_volumes_container']
|
|
vol['contains'] = []
|
|
|
|
self.write_override_env()
|
|
self.skel_setup()
|
|
|
|
di.container_skel_load(
|
|
self.environment.get('container_skel'),
|
|
self.inv,
|
|
self.user_defined_config
|
|
)
|
|
|
|
test_vol = self.base_env['container_skel']['cinder_volumes_container']
|
|
|
|
self.assertEqual(test_vol['contains'], [])
|
|
|
|
def test_empty_belongs_to(self):
|
|
vol = self.cinder_config['container_skel']['cinder_volumes_container']
|
|
vol['belongs_to'] = []
|
|
|
|
self.write_override_env()
|
|
self.skel_setup()
|
|
|
|
di.container_skel_load(
|
|
self.environment.get('container_skel'),
|
|
self.inv,
|
|
self.user_defined_config
|
|
)
|
|
|
|
test_vol = self.base_env['container_skel']['cinder_volumes_container']
|
|
|
|
self.assertEqual(test_vol['belongs_to'], [])
|
|
|
|
def tearDown(self):
|
|
super(TestOverridingEnvIntegration, self).tearDown()
|
|
self.user_defined_config = None
|
|
self.inv = None
|
|
|
|
|
|
class TestSetUsedIPS(unittest.TestCase):
|
|
def setUp(self):
|
|
# Clean up the used ips in case other tests didn't.
|
|
di.ip.USED_IPS = set()
|
|
|
|
# Create a fake inventory just for this test.
|
|
self.inventory = {'_meta': {'hostvars': {
|
|
'host1': {'container_networks': {
|
|
'net': {'address': '172.12.1.1'}
|
|
}},
|
|
'host2': {'container_networks': {
|
|
'net': {'address': '172.12.1.2'}
|
|
}},
|
|
}}}
|
|
|
|
def test_adding_inventory_used_ips(self):
|
|
config = {'used_ips': None}
|
|
|
|
# TODO(nrb): This is a smell, needs to set more directly
|
|
|
|
di.ip.set_used_ips(config, self.inventory)
|
|
|
|
self.assertEqual(len(di.ip.USED_IPS), 2)
|
|
self.assertIn('172.12.1.1', di.ip.USED_IPS)
|
|
self.assertIn('172.12.1.2', di.ip.USED_IPS)
|
|
|
|
def tearDown(self):
|
|
di.ip.USED_IPS = set()
|
|
|
|
|
|
class TestConfigCheckFunctional(TestConfigCheckBase):
|
|
def duplicate_ip(self):
|
|
ip = self.user_defined_config['log_hosts']['aio1']
|
|
self.user_defined_config['log_hosts']['bogus'] = ip
|
|
|
|
def test_checking_good_config(self):
|
|
output = di.main(config=TARGET_DIR, check=True,
|
|
environment=BASE_ENV_DIR)
|
|
self.assertEqual(output, 'Configuration ok!')
|
|
|
|
def test_duplicated_ip(self):
|
|
self.duplicate_ip()
|
|
self.write_config()
|
|
with self.assertRaises(di.MultipleHostsWithOneIPError) as context:
|
|
di.main(config=TARGET_DIR, check=True, environment=BASE_ENV_DIR)
|
|
self.assertEqual(context.exception.ip, '172.29.236.100')
|
|
|
|
|
|
class TestNetworkEntry(unittest.TestCase):
|
|
def test_all_args_filled(self):
|
|
entry = di.network_entry(True, 'eth1', 'br-mgmt', 'my_type', '1700')
|
|
|
|
self.assertNotIn('interface', entry.keys())
|
|
self.assertEqual(entry['bridge'], 'br-mgmt')
|
|
self.assertEqual(entry['type'], 'my_type')
|
|
self.assertEqual(entry['mtu'], '1700')
|
|
|
|
def test_container_dict(self):
|
|
entry = di.network_entry(False, 'eth1', 'br-mgmt', 'my_type', '1700')
|
|
|
|
self.assertEqual(entry['interface'], 'eth1')
|
|
|
|
|
|
class TestDebugLogging(unittest.TestCase):
|
|
@mock.patch('generate.logging')
|
|
@mock.patch('generate.logger')
|
|
def test_logging_enabled(self, mock_logger, mock_logging):
|
|
# Shadow the real value so tests don't complain about it
|
|
mock_logging.DEBUG = 10
|
|
|
|
get_inventory(extra_args={"debug": True})
|
|
|
|
self.assertTrue(mock_logging.basicConfig.called)
|
|
self.assertTrue(mock_logger.info.called)
|
|
self.assertTrue(mock_logger.debug.called)
|
|
|
|
@mock.patch('generate.logging')
|
|
@mock.patch('generate.logger')
|
|
def test_logging_disabled(self, mock_logger, mock_logging):
|
|
get_inventory(extra_args={"debug": False})
|
|
|
|
self.assertFalse(mock_logging.basicConfig.called)
|
|
# Even though logging is disabled, we still call these
|
|
# all over the place; they just choose not to do anything.
|
|
# NOTE: No info messages are published when debug is False
|
|
self.assertTrue(mock_logger.debug.called)
|
|
|
|
|
|
class TestLxcHosts(TestConfigCheckBase):
|
|
|
|
def test_lxc_hosts_group_present(self):
|
|
inventory = get_inventory()
|
|
self.assertIn('lxc_hosts', inventory)
|
|
|
|
def test_lxc_hosts_only_inserted_once(self):
|
|
inventory = get_inventory()
|
|
self.assertEqual(1, len(inventory['lxc_hosts']['hosts']))
|
|
|
|
def test_lxc_hosts_members(self):
|
|
self.add_host('shared-infra_hosts', 'aio2', '172.29.236.101')
|
|
inventory = get_inventory()
|
|
self.assertIn('aio2', inventory['lxc_hosts']['hosts'])
|
|
self.assertIn('aio1', inventory['lxc_hosts']['hosts'])
|
|
|
|
def test_lxc_hosts_in_config_raises_error(self):
|
|
self.add_config_key('lxc_hosts', {})
|
|
with self.assertRaises(di.LxcHostsDefined):
|
|
get_inventory()
|
|
|
|
def test_host_without_containers(self):
|
|
self.add_host('compute_hosts', 'compute1', '172.29.236.102')
|
|
inventory = get_inventory()
|
|
self.assertNotIn('compute1', inventory['lxc_hosts']['hosts'])
|
|
|
|
def test_cleaning_bad_hosts(self):
|
|
self.add_host('compute_hosts', 'compute1', '172.29.236.102')
|
|
inventory = get_inventory()
|
|
# insert compute1 into lxc_hosts, which mimicks bug behavior
|
|
inventory['lxc_hosts']['hosts'].append('compute1')
|
|
faked_path = INV_DIR
|
|
|
|
with mock.patch('filesystem.load_inventory') as inv_mock:
|
|
inv_mock.return_value = (inventory, faked_path)
|
|
new_inventory = get_inventory()
|
|
# host should no longer be in lxc_hosts
|
|
|
|
self.assertNotIn('compute1', new_inventory['lxc_hosts']['hosts'])
|
|
|
|
def test_emptying_lxc_hosts(self):
|
|
"""If lxc_hosts is deleted between runs, it should re-populate"""
|
|
|
|
inventory = get_inventory()
|
|
original_lxc_hosts = inventory.pop('lxc_hosts')
|
|
|
|
self.assertNotIn('lxc_hosts', inventory.keys())
|
|
|
|
faked_path = INV_DIR
|
|
with mock.patch('filesystem.load_inventory') as inv_mock:
|
|
inv_mock.return_value = (inventory, faked_path)
|
|
new_inventory = get_inventory()
|
|
|
|
self.assertEqual(original_lxc_hosts, new_inventory['lxc_hosts'])
|
|
|
|
|
|
class TestConfigMatchesEnvironment(unittest.TestCase):
|
|
def setUp(self):
|
|
self.env = fs.load_environment(BASE_ENV_DIR, {})
|
|
|
|
def test_matching_keys(self):
|
|
config = get_config()
|
|
|
|
result = di._check_all_conf_groups_present(config, self.env)
|
|
self.assertTrue(result)
|
|
|
|
def test_failed_match(self):
|
|
bad_config = get_config()
|
|
bad_config['bogus_key'] = []
|
|
|
|
result = di._check_all_conf_groups_present(bad_config, self.env)
|
|
self.assertFalse(result)
|
|
|
|
def test_extra_config_key_warning(self):
|
|
bad_config = get_config()
|
|
bad_config['bogus_key'] = []
|
|
with warnings.catch_warnings(record=True) as wl:
|
|
di._check_all_conf_groups_present(bad_config, self.env)
|
|
self.assertEqual(1, len(wl))
|
|
self.assertIn('bogus_key', str(wl[0].message))
|
|
|
|
def test_multiple_extra_keys(self):
|
|
bad_config = get_config()
|
|
bad_config['bogus_key1'] = []
|
|
bad_config['bogus_key2'] = []
|
|
|
|
with warnings.catch_warnings(record=True) as wl:
|
|
di._check_all_conf_groups_present(bad_config, self.env)
|
|
self.assertEqual(2, len(wl))
|
|
warn_msgs = [str(warn.message) for warn in wl]
|
|
warn_msgs.sort()
|
|
self.assertIn('bogus_key1', warn_msgs[0])
|
|
self.assertIn('bogus_key2', warn_msgs[1])
|
|
|
|
def test_confirm_exclusions(self):
|
|
"""Ensure the excluded keys in the function are present."""
|
|
config = get_config()
|
|
excluded_keys = ('global_overrides', 'cidr_networks', 'used_ips')
|
|
|
|
for key in excluded_keys:
|
|
config[key] = 'sentinel value'
|
|
|
|
with warnings.catch_warnings(record=True) as wl:
|
|
di._check_all_conf_groups_present(config, self.env)
|
|
self.assertEqual(0, len(wl))
|
|
|
|
for key in excluded_keys:
|
|
self.assertIn(key, config.keys())
|
|
|
|
|
|
class TestInventoryGroupConstraints(unittest.TestCase):
|
|
def setUp(self):
|
|
self.env = fs.load_environment(BASE_ENV_DIR, {})
|
|
|
|
def test_group_with_hosts_dont_have_children(self):
|
|
"""Require that groups have children groups or hosts, not both."""
|
|
inventory = get_inventory()
|
|
|
|
# This should only work on groups, but stuff like '_meta' and 'all'
|
|
# are in here, too.
|
|
for key, values in inventory.items():
|
|
# The keys for children/hosts can exist, the important part is being empty lists.
|
|
has_children = bool(inventory.get('children'))
|
|
has_hosts = bool(inventory.get('hosts'))
|
|
|
|
self.assertFalse(has_children and has_hosts)
|
|
|
|
def _create_bad_env(self, env):
|
|
# This environment setup is used because it was reported with
|
|
# bug #1646136
|
|
override = """
|
|
physical_skel:
|
|
local-compute_containers:
|
|
belongs_to:
|
|
- compute_containers
|
|
local-compute_hosts:
|
|
belongs_to:
|
|
- compute_hosts
|
|
rbd-compute_containers:
|
|
belongs_to:
|
|
- compute_containers
|
|
rbd-compute_hosts:
|
|
belongs_to:
|
|
- compute_hosts
|
|
"""
|
|
|
|
bad_env = yaml.safe_load(override)
|
|
|
|
# This is essentially what load_environment does, after all the file
|
|
# system walking
|
|
dictutils.merge_dict(env, bad_env)
|
|
|
|
return env
|
|
|
|
def test_group_with_hosts_and_children_fails(self):
|
|
"""Integration test making sure the whole script fails."""
|
|
env = self._create_bad_env(self.env)
|
|
|
|
|
|
config = get_config()
|
|
|
|
kwargs = {
|
|
'load_environment': mock.DEFAULT,
|
|
'load_user_configuration': mock.DEFAULT
|
|
}
|
|
|
|
with mock.patch.multiple('filesystem', **kwargs) as mocks:
|
|
mocks['load_environment'].return_value = env
|
|
mocks['load_user_configuration'].return_value = config
|
|
|
|
with self.assertRaises(di.GroupConflict) as context:
|
|
get_inventory()
|
|
|
|
def test_group_validation_unit(self):
|
|
env = self._create_bad_env(self.env)
|
|
|
|
config = get_config()
|
|
|
|
with self.assertRaises(di.GroupConflict):
|
|
di._check_group_branches(config, env['physical_skel'])
|
|
|
|
def test_group_validation_no_config(self):
|
|
result = di._check_group_branches(None, self.env)
|
|
self.assertTrue(result)
|
|
|
|
def test_group_validation_passes_defaults(self):
|
|
config = get_config()
|
|
|
|
result = di._check_group_branches(config, self.env['physical_skel'])
|
|
|
|
self.assertTrue(result)
|
|
|
|
|
|
if __name__ == '__main__':
|
|
unittest.main()
|