rally-openstack/tests/unit/fakes.py
Andrey Kurilin 6fb4655ca0 [core] Refactor validation
* do not save validation errors from CLI layer to database. It includes
  such redundant data as IOError (when task file doesn't exist)
* include validation at task start by default. It is a bad sign to allow
  starting tasks without validation.

Change-Id: I0d30ff31fb6fbc467ef3cda01e7de1f46c5f79fe
2017-04-12 01:03:08 +03:00

1899 lines
55 KiB
Python

# Copyright 2013: Mirantis Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import itertools
import multiprocessing
import random
import re
import string
import uuid
from ceilometerclient import exc as ceilometer_exc
from glanceclient import exc
import mock
from neutronclient.common import exceptions as neutron_exceptions
from novaclient import exceptions as nova_exceptions
import six
from swiftclient import exceptions as swift_exceptions
from rally import api
from rally.common import utils as rally_utils
from rally import consts
from rally.task import context
from rally.task import scenario
def generate_uuid():
    """Return a freshly generated random (version 4) UUID as a string."""
    fresh = uuid.uuid4()
    return str(fresh)
def generate_name(prefix="", length=12, choices=string.ascii_lowercase):
    """Generate a pseudo-random name.

    :param prefix: str, custom prefix for the generated name
    :param length: int, length of the autogenerated part of the name
    :param choices: str, characters that may occur in the generated part
    :returns: str, pseudo-random name
    """
    suffix = [random.choice(choices) for _ in range(length)]
    return prefix + "".join(suffix)
def generate_mac():
    """Generate a pseudo-random MAC address.

    :returns: str, six colon-separated pairs of lowercase hex digits
    """
    hex_digits = generate_name(choices="0123456789abcdef", length=12)
    octets = re.findall("..", hex_digits)
    return ":".join(octets)
def setup_dict(data, required=None, defaults=None):
    """Set up and validate a dict based on mandatory keys and default data.

    This function reduces code that constructs dict objects
    with specific schema (e.g. for API data).

    :param data: dict, input data
    :param required: list, mandatory keys to check
    :param defaults: dict, default data (never modified by this function)
    :returns: dict, a new dict holding the defaults overlaid with ``data``
    :raises IndexError: if a required key is missing from ``data``
    :raises ValueError: if ``data`` has a key that is neither required
                        nor present in ``defaults``
    """
    required = set(required or [])
    # Work on a copy so the caller's ``defaults`` mapping is not mutated.
    result = dict(defaults or {})

    missed = required - set(data)
    if missed:
        # Report a deterministic key instead of an arbitrary set element.
        raise IndexError("Missed: %s" % min(missed, key=str))

    unexpected = set(data) - required - set(result)
    if unexpected:
        raise ValueError("Unexpected: %s" % min(unexpected, key=str))

    result.update(data)
    return result
def fake_credential(**config):
    """Build a mock credential exposing ``config`` as attributes.

    :param config: arbitrary credential fields
    :returns: mock.Mock whose ``to_dict()`` returns ``config`` and which
              has every key of ``config`` set as an attribute
    """
    credential = mock.Mock()
    credential.to_dict.return_value = config
    for field in config:
        setattr(credential, field, config[field])
    return credential
class FakeResource(object):
    """Generic fake resource that forwards unknown calls to its manager."""

    def __init__(self, manager=None, name=None, status="ACTIVE", items=None,
                 deployment_uuid=None, id=None):
        # Falsy arguments are replaced by generated values / empty dict.
        self.name = name if name else generate_uuid()
        self.status = status
        self.manager = manager
        self.uuid = generate_uuid()
        self.id = id if id else self.uuid
        self.items = items if items else {}
        self.deployment_uuid = (deployment_uuid if deployment_uuid
                                else generate_uuid())

    def __getattr__(self, name):
        # NOTE(msdubov): e.g. server.delete() -> manager.delete(server)
        # Only consulted for attributes that normal lookup did not find;
        # the manager attribute is resolved lazily, at call time.
        def manager_func(*args, **kwargs):
            delegate = getattr(self.manager, name)
            return delegate(self, *args, **kwargs)
        return manager_func

    def __getitem__(self, key):
        """Look ``key`` up in the ``items`` dict."""
        return self.items[key]
class FakeServer(FakeResource):
    """Fake server whose suspend/lock/unlock only change local attributes."""

    def suspend(self):
        """Set the status to "SUSPENDED"."""
        self.status = "SUSPENDED"

    def lock(self):
        """Set the "OS-EXT-STS:locked" attribute to True."""
        setattr(self, "OS-EXT-STS:locked", True)

    def unlock(self):
        """Set the "OS-EXT-STS:locked" attribute to False."""
        setattr(self, "OS-EXT-STS:locked", False)
class FakeImage(FakeResource):
    """Fake image with size/requirement fields and a mocked ``update``."""

    def __init__(self, manager=None, id="image-id-0", min_ram=0,
                 size=0, min_disk=0, status="active", name=None):
        super(FakeImage, self).__init__(manager, id=id, name=name)
        # The base initializer defaults status to "ACTIVE"; override it.
        self.status = status
        self.min_ram, self.size, self.min_disk = min_ram, size, min_disk
        self.update = mock.MagicMock()
class FakeStrategy(FakeResource):
    """Fake strategy resource; adds nothing on top of FakeResource."""
class FakeGoal(FakeResource):
    """Fake goal resource; adds nothing on top of FakeResource."""
class FakeMurano(FakeResource):
    """Fake Murano resource; adds nothing on top of FakeResource."""
class FakeFloatingIP(FakeResource):
    """Fake floating IP resource; adds nothing on top of FakeResource."""
class FakeFloatingIPPool(FakeResource):
    """Fake floating IP pool resource; adds nothing on top of FakeResource."""
class FakeTenant(FakeResource):
    """Fake tenant resource that requires both a manager and a name."""

    def __init__(self, manager, name):
        base = super(FakeTenant, self)
        base.__init__(manager, name=name)
class FakeUser(FakeResource):
    """Fake user resource; adds nothing on top of FakeResource."""
class FakeService(FakeResource):
    """Fake service resource; adds nothing on top of FakeResource."""
class FakeNetwork(FakeResource):
    """Fake network resource; adds nothing on top of FakeResource."""
class FakeFlavor(FakeResource):
    """Fake flavor resource with ram/disk/vcpus fields.

    Note that ``id`` (not ``manager``) is the first positional parameter.
    """

    def __init__(self, id="flavor-id-0", manager=None, ram=0, disk=0, vcpus=1,
                 name="flavor-name-0"):
        super(FakeFlavor, self).__init__(manager, id=id)
        self.ram, self.disk, self.vcpus = ram, disk, vcpus
        # Assigned after the base initializer, which generated a name.
        self.name = name
class FakeKeypair(FakeResource):
    """Fake keypair resource; adds nothing on top of FakeResource."""
class FakeStack(FakeResource):
    """Fake stack resource; adds nothing on top of FakeResource."""
class FakeDomain(FakeResource):
    """Fake domain resource; adds nothing on top of FakeResource."""
class FakeQuotas(FakeResource):
    """Fake quotas resource; adds nothing on top of FakeResource."""
class FakeSecurityGroup(FakeResource):
    """Fake security group that looks its rules up in a rule manager."""

    def __init__(self, manager=None, rule_manager=None, id=None, name=None):
        super(FakeSecurityGroup, self).__init__(manager, id=id, name=name)
        self.rule_manager = rule_manager

    @property
    def rules(self):
        """List the rule manager's rules whose parent group is this one."""
        owned = []
        for rule in self.rule_manager.list():
            if rule.parent_group_id == self.id:
                owned.append(rule)
        return owned
class FakeSecurityGroupRule(FakeResource):
    """Fake security group rule built from arbitrary keyword fields."""

    def __init__(self, name, **kwargs):
        # NOTE(review): ``name`` is passed positionally, so the base class
        # receives it as its ``manager`` argument.
        super(FakeSecurityGroupRule, self).__init__(name)
        if "cidr" in kwargs:
            # Replace the flat "cidr" key by a nested "ip_range" dict.
            kwargs["ip_range"] = {"cidr": kwargs.pop("cidr")}
        # Each field is reachable both as an item and as an attribute.
        for field in kwargs:
            value = kwargs[field]
            self.items[field] = value
            setattr(self, field, value)
class FakeMetric(FakeResource):
    """Fake metric resource.

    Recognized keyword arguments: ``metric_name`` (stored as ``metric``)
    and ``optional_args`` (defaults to an empty dict).
    """

    # This was spelled ``__init_`` and therefore never ran as the
    # initializer; keyword arguments went straight to FakeResource.
    def __init__(self, manager=None, **kwargs):
        super(FakeMetric, self).__init__(manager)
        self.metric = kwargs.get("metric_name")
        self.optional_args = kwargs.get("optional_args", {})
class FakeAlarm(FakeResource):
    """Fake alarm resource populated from keyword arguments.

    Recognized keyword arguments: ``meter_name``, ``threshold``,
    ``alarm_id`` (default "fake-alarm-id"), ``state`` (default "ok") and
    ``optional_args`` (default empty dict).
    """

    def __init__(self, manager=None, **kwargs):
        super(FakeAlarm, self).__init__(manager)
        self.meter_name = kwargs.get("meter_name")
        self.threshold = kwargs.get("threshold")
        self.alarm_id = kwargs.get("alarm_id", "fake-alarm-id")
        # ``state`` used to be assigned twice; only the later assignment
        # (default "ok") ever took effect, so the dead one was dropped.
        self.state = kwargs.get("state", "ok")
        self.optional_args = kwargs.get("optional_args", {})
class FakeSample(FakeResource):
    """Fake sample resource with counter fields and a fixed resource id."""

    def __init__(self, manager=None, **kwargs):
        super(FakeSample, self).__init__(manager)
        read = kwargs.get
        self.counter_name = read("counter_name", "fake-counter-name")
        self.counter_type = read("counter_type", "fake-counter-type")
        self.counter_unit = read("counter_unit", "fake-counter-unit")
        self.counter_volume = read("counter_volume", 100)

    @property
    def resource_id(self):
        """Always "fake-resource-id"."""
        return "fake-resource-id"

    def to_dict(self):
        """Return the counter fields and the resource id as a dict."""
        as_dict = {}
        as_dict["counter_name"] = self.counter_name
        as_dict["counter_type"] = self.counter_type
        as_dict["counter_unit"] = self.counter_unit
        as_dict["counter_volume"] = self.counter_volume
        as_dict["resource_id"] = self.resource_id
        return as_dict
class FakeVolume(FakeResource):
    """Fake volume resource exposing a constant ``_info`` dict."""

    @property
    def _info(self):
        info = {"id": "uuid"}
        return info
class FakeVolumeType(FakeResource):
    """Fake volume type resource; adds nothing on top of FakeResource."""
class FakeVolumeTransfer(FakeResource):
    """Fake volume transfer resource; adds nothing on top of FakeResource."""
class FakeVolumeSnapshot(FakeResource):
    """Fake volume snapshot resource; adds nothing on top of FakeResource."""
class FakeVolumeBackup(FakeResource):
    """Fake volume backup resource; adds nothing on top of FakeResource."""
class FakeRole(FakeResource):
    """Fake role resource; adds nothing on top of FakeResource."""
class FakeQueue(FakeResource):
    """Fake queue resource owning its own FakeMessagesManager."""

    def __init__(self, manager=None, name="myqueue"):
        super(FakeQueue, self).__init__(manager, name)
        self.queue_name = name
        # NOTE(review): this instance attribute shadows the ``messages``
        # method defined below, so ``queue.messages`` is the manager.
        self.messages = FakeMessagesManager(name)

    def post(self, messages):
        """Create one message per dict in ``messages``."""
        for payload in messages:
            self.messages.create(**payload)

    def messages(self):
        listing = self.messages.list()
        return listing
class FakeDbInstance(FakeResource):
    """Fake database instance resource; adds nothing to FakeResource."""
class FakeMessage(FakeResource):
    """Fake message with ``body`` and ``ttl`` taken from keyword arguments."""

    def __init__(self, manager=None, **kwargs):
        super(FakeMessage, self).__init__(manager)
        body = kwargs.get("body", "fake-body")
        ttl = kwargs.get("ttl", 100)
        self.body = body
        self.ttl = ttl
class FakeAvailabilityZone(FakeResource):
    """Fake availability zone whose zone fields are MagicMock objects."""

    def __init__(self, manager=None):
        super(FakeAvailabilityZone, self).__init__(manager)
        for field in ("zoneName", "zoneState", "hosts"):
            setattr(self, field, mock.MagicMock())
class FakeWorkbook(FakeResource):
    """Fake workbook resource holding a MagicMock ``workbook`` attribute."""

    def __init__(self, manager=None):
        super(FakeWorkbook, self).__init__(manager)
        placeholder = mock.MagicMock()
        self.workbook = placeholder
class FakeWorkflow(FakeResource):
    """Fake workflow resource holding a MagicMock ``workflow`` attribute."""

    def __init__(self, manager=None):
        super(FakeWorkflow, self).__init__(manager)
        placeholder = mock.MagicMock()
        self.workflow = placeholder
class FakeExecution(FakeResource):
    """Fake execution resource holding a MagicMock ``execution`` attribute."""

    def __init__(self, manager=None):
        super(FakeExecution, self).__init__(manager)
        placeholder = mock.MagicMock()
        self.execution = placeholder
class FakeObject(FakeResource):
    """Fake object resource; adds nothing on top of FakeResource."""
class FakeClusterTemplate(FakeResource):
    """Fake cluster template resource; adds nothing to FakeResource."""
class FakeManager(object):
    """In-memory store of fake resources keyed by their ``uuid``.

    ``cache`` maps uuid -> resource; ``resources_order`` keeps the uuids
    in insertion order for ``list()``.
    """

    def __init__(self):
        super(FakeManager, self).__init__()
        self.cache = {}
        self.resources_order = []

    def get(self, resource_uuid):
        """Return the cached resource, or None when the uuid is unknown."""
        return self.cache.get(resource_uuid)

    def delete(self, resource_uuid):
        """Mark the resource "DELETED" and drop it; unknown uuids are ignored."""
        found = self.get(resource_uuid)
        if found is None:
            return
        found.status = "DELETED"
        del self.cache[resource_uuid]
        self.resources_order.remove(resource_uuid)

    def _cache(self, resource):
        """Register ``resource`` under its uuid and return it."""
        key = resource.uuid
        self.resources_order.append(key)
        self.cache[key] = resource
        return resource

    def list(self, **kwargs):
        """Return cached resources in insertion order; kwargs are ignored."""
        ordered = []
        for key in self.resources_order:
            ordered.append(self.cache[key])
        return ordered

    def find(self, **kwargs):
        """Return a resource whose attributes equal all kwargs, else None."""
        for resource in self.cache.values():
            mismatch = any(getattr(resource, key, None) != value
                           for key, value in kwargs.items())
            if not mismatch:
                return resource
class FakeServerManager(FakeManager):
    """Fake server manager backed by an image manager for snapshots."""

    def __init__(self, image_mgr=None):
        super(FakeServerManager, self).__init__()
        self.images = image_mgr or FakeImageManager()

    def get(self, resource_uuid):
        """Return the cached server or raise nova's NotFound."""
        server = self.cache.get(resource_uuid)
        if server is None:
            raise nova_exceptions.NotFound(
                "Server %s not found" % (resource_uuid))
        return server

    def _create(self, server_class=FakeServer, name=None):
        """Instantiate and cache a server, optionally overriding its name."""
        server = self._cache(server_class(self))
        if name is not None:
            server.name = name
        return server

    def create(self, name, image_id, flavor_id, **kwargs):
        """Create a server; only ``name`` is actually used."""
        return self._create(name=name)

    def create_image(self, server, name):
        """Create an image in the image manager and return its uuid."""
        return self.images._create().uuid

    def add_floating_ip(self, server, fip):
        """Do nothing."""

    def remove_floating_ip(self, server, fip):
        """Do nothing."""

    def delete(self, resource):
        """Delete by id string or by resource object (its ``id`` is used)."""
        key = (resource if isinstance(resource, six.string_types)
               else resource.id)
        found = self.get(key)
        if found is None:
            return
        found.status = "DELETED"
        del self.cache[key]
        self.resources_order.remove(key)
class FakeImageManager(FakeManager):
    """Fake image manager raising glance's HTTPNotFound for unknown ids."""

    def __init__(self):
        super(FakeImageManager, self).__init__()

    def get(self, resource_uuid):
        """Return the cached image or raise HTTPNotFound."""
        image = self.cache.get(resource_uuid)
        if image is None:
            raise exc.HTTPNotFound("Image %s not found" % (resource_uuid))
        return image

    def _create(self, image_class=FakeImage, name=None, id=None):
        """Instantiate and cache an image; the ``id`` argument is unused."""
        image = self._cache(image_class(self))
        image.owner = "dummy"
        # The image id is forced to match the uuid used as the cache key.
        image.id = image.uuid
        if name is not None:
            image.name = name
        return image

    def create(self, name, copy_from, container_format, disk_format):
        """Create an image; only ``name`` is actually used."""
        return self._create(name=name)

    def delete(self, resource):
        """Delete by id string or by resource object (its ``id`` is used)."""
        key = (resource if isinstance(resource, six.string_types)
               else resource.id)
        found = self.get(key)
        if found is None:
            return
        found.status = "DELETED"
        del self.cache[key]
        self.resources_order.remove(key)
class FakeStrategyManager(FakeManager):
    """Fake strategy manager that looks resources up by name."""

    def get(self, resource_name):
        """Return the first cached resource with that name, else None."""
        for key in self.resources_order:
            candidate = self.cache[key]
            if candidate.name == resource_name:
                return candidate
class FakeGoalManager(FakeManager):
    """Fake goal manager that looks resources up by name."""

    def get(self, resource_name):
        """Return the first cached resource with that name, else None."""
        for key in self.resources_order:
            candidate = self.cache[key]
            if candidate.name == resource_name:
                return candidate
class FakePackageManager(FakeManager):
    """Fake package manager."""

    def create(self, package_descr, package_arch, package_class=FakeMurano):
        """Cache a new package named after the first key of ``package_arch``."""
        package = self._cache(package_class(self))
        arch_keys = list(package_arch.keys())
        package.name = arch_keys[0]
        return package
class FakeFloatingIPsManager(FakeManager):
    """Fake floating IP manager."""

    def create(self):
        """Return a new FakeFloatingIP; it is not added to the cache."""
        floating_ip = FakeFloatingIP(self)
        return floating_ip
class FakeFloatingIPPoolsManager(FakeManager):
    """Fake floating IP pool manager."""

    def create(self):
        """Return a new FakeFloatingIPPool; it is not added to the cache."""
        pool = FakeFloatingIPPool(self)
        return pool
class FakeTenantsManager(FakeManager):
    """Fake tenant manager."""

    def create(self, name):
        """Create and cache a tenant with the given name."""
        return self._cache(FakeTenant(self, name))

    def update(self, tenant_id, name=None, description=None):
        """Rename a cached tenant and set its description.

        :param tenant_id: uuid of a cached tenant
        :param name: new name; defaults to "<old name>_updated"
        :param description: new description; defaults to
                            "<old name>_description_updated"
        :returns: the updated tenant
        """
        tenant = self.get(tenant_id)
        # Both defaults are derived from the name *before* the update.
        new_name = name or (tenant.name + "_updated")
        new_desc = description or (tenant.name + "_description_updated")
        tenant.name = new_name
        tenant.description = new_desc
        # The tenant is already cached (get() just found it). Caching it
        # again would add its uuid to ``resources_order`` a second time
        # and make list() return it twice.
        return tenant
class FakeNetworkManager(FakeManager):
    """Fake network manager."""

    def create(self, net_id):
        """Create and cache a network whose ``id`` is ``net_id``."""
        network = FakeNetwork(self)
        network.id = net_id
        cached = self._cache(network)
        return cached
class FakeFlavorManager(FakeManager):
    """Fake flavor manager."""

    def create(self):
        """Create and cache a flavor managed by this manager."""
        # FakeFlavor's first positional parameter is ``id``, so the manager
        # must be passed by keyword; otherwise it ends up as the flavor id
        # and the flavor has no manager.
        flv = FakeFlavor(manager=self)
        return self._cache(flv)
class FakeKeypairManager(FakeManager):
    """Fake keypair manager."""

    def create(self, name, public_key=None):
        """Create and cache a keypair; ``public_key`` is ignored."""
        keypair = FakeKeypair(self)
        if name:
            keypair.name = name
        else:
            keypair.name = keypair.name
        return self._cache(keypair)

    def delete(self, resource):
        """Delete by id string or by resource object (its ``id`` is used)."""
        key = (resource if isinstance(resource, six.string_types)
               else resource.id)
        found = self.get(key)
        if found is None:
            return
        found.status = "DELETED"
        del self.cache[key]
        self.resources_order.remove(key)
class FakeClusterTemplateManager(FakeManager):
    """Fake cluster template manager."""

    def create(self, name):
        """Create and cache a cluster template, keeping a generated name
        when ``name`` is falsy.
        """
        template = FakeClusterTemplate(self)
        template.name = name or template.name
        return self._cache(template)

    def delete(self, resource):
        """Forget the template (by id string or object); status is untouched."""
        key = (resource if isinstance(resource, six.string_types)
               else resource.id)
        if self.get(key) is None:
            return
        del self.cache[key]
        self.resources_order.remove(key)
class FakeStackManager(FakeManager):
    """Fake stack manager."""

    def create(self, name):
        """Create and cache a stack, keeping a generated name when
        ``name`` is falsy.
        """
        new_stack = FakeStack(self)
        new_stack.name = name or new_stack.name
        return self._cache(new_stack)

    def delete(self, resource):
        """Mark the stack "DELETE_COMPLETE" and forget it."""
        key = (resource if isinstance(resource, six.string_types)
               else resource.id)
        found = self.get(key)
        if found is None:
            return
        found.status = "DELETE_COMPLETE"
        del self.cache[key]
        self.resources_order.remove(key)
class FakeDomainManager(FakeManager):
    """Fake domain manager."""

    def create(self, name):
        """Create and cache a domain, keeping a generated name when
        ``name`` is falsy.
        """
        new_domain = FakeDomain(self)
        new_domain.name = name or new_domain.name
        return self._cache(new_domain)

    def delete(self, resource):
        """Mark the domain "DELETE_COMPLETE" and forget it."""
        key = (resource if isinstance(resource, six.string_types)
               else resource.id)
        found = self.get(key)
        if found is None:
            return
        found.status = "DELETE_COMPLETE"
        del self.cache[key]
        self.resources_order.remove(key)
class FakeNovaQuotasManager(FakeManager):
    """Fake Nova quotas manager."""

    def update(self, tenant_id, **kwargs):
        """Cache and return a fresh FakeQuotas; the arguments are ignored."""
        quotas = FakeQuotas(self)
        return self._cache(quotas)

    def delete(self, tenant_id):
        """Do nothing."""
class FakeCinderQuotasManager(FakeManager):
    """Fake Cinder quotas manager."""

    def update(self, tenant_id, **kwargs):
        """Cache and return a fresh FakeQuotas; the arguments are ignored."""
        quotas = FakeQuotas(self)
        return self._cache(quotas)

    def delete(self, tenant_id):
        """Do nothing."""
class FakeSecurityGroupManager(FakeManager):
    """Fake security group manager that starts with a "default" group."""

    def __init__(self, rule_manager=None):
        super(FakeSecurityGroupManager, self).__init__()
        self.rule_manager = rule_manager
        self.create("default")

    def create(self, name, description=""):
        """Create and cache a group bound to this manager's rule manager."""
        group = FakeSecurityGroup(manager=self,
                                  rule_manager=self.rule_manager)
        group.name = name or group.name
        group.description = description
        return self._cache(group)

    def to_dict(self, obj):
        """Return the id and name of ``obj`` as a dict."""
        return dict([("id", obj.id), ("name", obj.name)])

    def find(self, name, **kwargs):
        """Return a group matching ``name`` and kwargs or raise NotFound."""
        kwargs["name"] = name
        for resource in self.cache.values():
            mismatch = any(getattr(resource, key, None) != value
                           for key, value in kwargs.items())
            if not mismatch:
                return resource
        raise nova_exceptions.NotFound("Security Group not found")

    def delete(self, resource):
        """Delete by id string or by resource object (its ``id`` is used)."""
        key = (resource if isinstance(resource, six.string_types)
               else resource.id)
        found = self.get(key)
        if found is None:
            return
        found.status = "DELETED"
        del self.cache[key]
        self.resources_order.remove(key)
class FakeSecurityGroupRuleManager(FakeManager):
    """Fake security group rule manager."""

    def __init__(self):
        super(FakeSecurityGroupRuleManager, self).__init__()

    def create(self, parent_group_id, **kwargs):
        """Create and cache a rule carrying ``parent_group_id`` and kwargs."""
        kwargs["parent_group_id"] = parent_group_id
        rule = FakeSecurityGroupRule(self, **kwargs)
        return self._cache(rule)
class FakeUsersManager(FakeManager):
    """Fake user manager."""

    def create(self, username, password, email, tenant_id):
        """Create and cache a user; only ``username`` is actually used."""
        new_user = FakeUser(manager=self, name=username)
        new_user.name = username or new_user.name
        return self._cache(new_user)
class FakeServicesManager(FakeManager):
    """Fake services manager that never lists anything."""

    def list(self):
        """Return a new empty list."""
        return list()
class FakeVolumeManager(FakeManager):
    """Fake volume manager tracking volumes in a private dict by id."""

    def __init__(self):
        super(FakeVolumeManager, self).__init__()
        self.__volumes = {}
        self.__tenant_id = generate_uuid()

    def create(self, size=None, **kwargs):
        """Create an "available" volume.

        ``size`` defaults to 1 when falsy; the name comes from the
        ``display_name`` keyword when given.
        """
        new_volume = FakeVolume(self)
        new_volume.size = size or 1
        new_volume.name = kwargs.get("display_name", new_volume.name)
        new_volume.status = "available"
        new_volume.tenant_id = self.__tenant_id
        self.__volumes[new_volume.id] = new_volume
        return self._cache(new_volume)

    def list(self):
        """Return the values of the private volume dict."""
        return self.__volumes.values()

    def delete(self, resource):
        """Remove the volume from both the cache and the private dict."""
        volume_id = resource.id
        super(FakeVolumeManager, self).delete(volume_id)
        del self.__volumes[resource.id]
class FakeVolumeTypeManager(FakeManager):
    """Fake volume type manager."""

    def create(self, name):
        """Create and cache a volume type, keeping a generated name when
        ``name`` is falsy.
        """
        volume_type = FakeVolumeType(self)
        volume_type.name = name or volume_type.name
        return self._cache(volume_type)
class FakeVolumeTransferManager(FakeManager):
    """Fake volume transfer manager tracking transfers in a private dict."""

    def __init__(self):
        super(FakeVolumeTransferManager, self).__init__()
        self.__volume_transfers = {}

    def list(self):
        """Return the values of the private transfer dict."""
        return self.__volume_transfers.values()

    def create(self, name):
        """Create, register and cache a transfer."""
        new_transfer = FakeVolumeTransfer(self)
        new_transfer.name = name or new_transfer.name
        self.__volume_transfers[new_transfer.id] = new_transfer
        return self._cache(new_transfer)

    def delete(self, resource):
        """Remove the transfer from both the cache and the private dict."""
        transfer_id = resource.id
        super(FakeVolumeTransferManager, self).delete(transfer_id)
        del self.__volume_transfers[resource.id]
class FakeVolumeSnapshotManager(FakeManager):
    """Fake volume snapshot manager tracking snapshots in a private dict."""

    def __init__(self):
        super(FakeVolumeSnapshotManager, self).__init__()
        self.__snapshots = {}
        self.__tenant_id = generate_uuid()

    def create(self, name, force=False, display_name=None):
        """Create an "available" snapshot; ``force`` and ``display_name``
        are ignored.
        """
        new_snapshot = FakeVolumeSnapshot(self)
        new_snapshot.name = name or new_snapshot.name
        new_snapshot.status = "available"
        new_snapshot.tenant_id = self.__tenant_id
        self.__snapshots[new_snapshot.id] = new_snapshot
        return self._cache(new_snapshot)

    def list(self):
        """Return the values of the private snapshot dict."""
        return self.__snapshots.values()

    def delete(self, resource):
        """Remove the snapshot from both the cache and the private dict."""
        snapshot_id = resource.id
        super(FakeVolumeSnapshotManager, self).delete(snapshot_id)
        del self.__snapshots[resource.id]
class FakeVolumeBackupManager(FakeManager):
    """Fake volume backup manager tracking backups in a private dict."""

    def __init__(self):
        super(FakeVolumeBackupManager, self).__init__()
        self.__backups = {}
        self.__tenant_id = generate_uuid()

    def create(self, name):
        """Create, register and cache a backup."""
        new_backup = FakeVolumeBackup(self)
        new_backup.name = name or new_backup.name
        self.__backups[new_backup.id] = new_backup
        return self._cache(new_backup)

    def list(self):
        """Return the values of the private backup dict."""
        return self.__backups.values()

    def delete(self, resource):
        """Remove the backup from both the cache and the private dict."""
        backup_id = resource.id
        super(FakeVolumeBackupManager, self).delete(backup_id)
        del self.__backups[resource.id]
class FakeRolesManager(FakeManager):
    """Fake role manager."""

    def create(self, role_id, name):
        """Create and cache a role with the given id and name."""
        new_role = FakeRole(self)
        new_role.name = name
        new_role.id = role_id
        return self._cache(new_role)

    def roles_for_user(self, user, tenant):
        """Return a one-element list with an uncached "admin" role."""
        admin = FakeRole(self)
        admin.name = "admin"
        return [admin]

    def add_user_role(self, user, role, tenant):
        """Do nothing."""
class FakeMetricManager(FakeManager):
    """Fake metric manager."""

    def create(self, **kwargs):
        """Create and cache a FakeMetric from the keyword arguments."""
        new_metric = FakeMetric(self, **kwargs)
        return self._cache(new_metric)

    def get(self, metric_id):
        """Return a one-element list with the result of ``find``.

        The element is None when no cached resource matches.
        """
        found = self.find(metric_id=metric_id)
        return [found]
class FakeAlarmManager(FakeManager):
    """Fake alarm manager that addresses alarms by their ``alarm_id``."""

    def get(self, alarm_id):
        """Return ``[alarm]`` or raise ceilometer's HTTPNotFound."""
        alarm = self.find(alarm_id=alarm_id)
        if not alarm:
            raise ceilometer_exc.HTTPNotFound(
                "Alarm with %s not found" % (alarm_id))
        return [alarm]

    def update(self, alarm_id, **fake_alarm_dict_diff):
        """Set every given keyword as an attribute on the alarm."""
        alarm = self.get(alarm_id)[0]
        for attr in fake_alarm_dict_diff:
            setattr(alarm, attr, fake_alarm_dict_diff[attr])
        return alarm

    def create(self, **kwargs):
        """Create and cache a FakeAlarm from the keyword arguments."""
        new_alarm = FakeAlarm(self, **kwargs)
        return self._cache(new_alarm)

    def delete(self, alarm_id):
        """Mark the alarm "DELETED" and forget it; unknown ids are ignored."""
        alarm = self.find(alarm_id=alarm_id)
        if alarm is None:
            return
        alarm.status = "DELETED"
        # The cache is keyed by the resource id, not by ``alarm_id``.
        del self.cache[alarm.id]
        self.resources_order.remove(alarm.id)

    def get_state(self, alarm_id):
        """Return the alarm's state, or None when the alarm is unknown."""
        alarm = self.find(alarm_id=alarm_id)
        if alarm is None:
            return None
        return getattr(alarm, "state", "fake-alarm-state")

    def get_history(self, alarm_id):
        """Return a constant single-element history list."""
        return ["fake-alarm-history"]

    def set_state(self, alarm_id, state):
        """Set the alarm's state when the alarm exists; returns None."""
        alarm = self.find(alarm_id=alarm_id)
        if alarm is None:
            return None
        return setattr(alarm, "state", state)
class FakeSampleManager(FakeManager):
    """Fake sample manager."""

    def create(self, **kwargs):
        """Create and cache a FakeSample; return it wrapped in a list."""
        new_sample = FakeSample(self, **kwargs)
        cached = self._cache(new_sample)
        return [cached]

    def list(self):
        """Return a constant list, regardless of what was created."""
        return ["fake-samples"]
class FakeMeterManager(FakeManager):
    """Fake meter manager with a constant listing."""

    def list(self):
        """Return a constant single-element list."""
        meters = ["fake-meter"]
        return meters
class FakeMetricsManager(FakeManager):
    """Fake metrics manager with a constant listing."""

    def list(self):
        """Return a constant single-element list."""
        metrics = ["fake-metric"]
        return metrics
class FakeCeilometerResourceManager(FakeManager):
    """Fake Ceilometer resource manager returning constant data."""

    def get(self, resource_id):
        """Return a constant list; ``resource_id`` is ignored."""
        info = ["fake-resource-info"]
        return info

    def list(self):
        """Return a constant single-element list."""
        resources = ["fake-resource"]
        return resources
class FakeStatisticsManager(FakeManager):
    """Fake statistics manager."""

    def list(self, meter):
        """Return a one-element list naming the statistics of ``meter``."""
        label = "%s-statistics" % meter
        return [label]
class FakeQueryManager(FakeManager):
    """Fake query manager returning a constant result."""

    def query(self, filter, orderby, limit):
        """Return a constant list; all arguments are ignored."""
        result = ["fake-query-result"]
        return result
class FakeQueuesManager(FakeManager):
    """Fake queue manager tracking queues in a private dict by name."""

    def __init__(self):
        super(FakeQueuesManager, self).__init__()
        self.__queues = {}

    def create(self, name):
        """Create a queue, register it by name and cache it."""
        queue = FakeQueue(self, name)
        self.__queues[queue.name] = queue
        return self._cache(queue)

    def list(self):
        """Return the values of the private queue dict."""
        return self.__queues.values()

    def delete(self, queue):
        """Remove the queue from both the cache and the private dict."""
        # The base cache is keyed by uuid (see FakeManager._cache), so the
        # uuid, not the name, has to be passed; with the name the cached
        # entry was never found and therefore never removed.
        super(FakeQueuesManager, self).delete(queue.uuid)
        del self.__queues[queue.name]
class FakeDbInstanceManager(FakeManager):
    """Fake database instance manager tracking instances in a private dict."""

    def __init__(self):
        super(FakeDbInstanceManager, self).__init__()
        self.__db_instances = {}

    def create(self, name, flavor_id, size):
        """Create, register and cache a database instance."""
        instance = FakeDbInstance(self)
        instance.name = name or instance.name
        instance.flavor_id = flavor_id
        instance.size = size
        # Register the instance so that list() can report it; without
        # this, list() always returned an empty collection.
        self.__db_instances[instance.id] = instance
        return self._cache(instance)

    def list(self):
        """Return the values of the private instance dict."""
        return self.__db_instances.values()

    def delete(self, resource):
        """Mark the instance "DELETE_COMPLETE" and forget it.

        Accepts either an id string or a resource object.
        """
        if not isinstance(resource, six.string_types):
            resource = resource.id
        cached = self.get(resource)
        if cached is not None:
            cached.status = "DELETE_COMPLETE"
            del self.cache[resource]
            self.resources_order.remove(resource)
            # Keep the listing in sync with the cache.
            self.__db_instances.pop(resource, None)
class FakeMessagesManager(FakeManager):
    """Fake message manager tracking messages in a private dict by id."""

    def __init__(self, queue="myqueue"):
        super(FakeMessagesManager, self).__init__()
        self.__queue = queue
        self.__messages = {}

    def create(self, **kwargs):
        """Create, register and cache a FakeMessage."""
        new_message = FakeMessage(self, **kwargs)
        self.__messages[new_message.id] = new_message
        return self._cache(new_message)

    def list(self):
        """Return the values of the private message dict."""
        return self.__messages.values()

    def delete(self, message):
        """Remove the message from both the cache and the private dict."""
        message_id = message.id
        super(FakeMessagesManager, self).delete(message_id)
        del self.__messages[message.id]
class FakeAvailabilityZonesManager(FakeManager):
    """Fake availability zone manager holding a single zone."""

    def __init__(self):
        super(FakeAvailabilityZonesManager, self).__init__()
        self.zones = FakeAvailabilityZone()

    def list(self):
        """Return a one-element list with the single zone."""
        only_zone = self.zones
        return [only_zone]
class FakeWorkbookManager(FakeManager):
    """Fake workbook manager holding a single workbook."""

    def __init__(self):
        super(FakeWorkbookManager, self).__init__()
        self.workbook = FakeWorkbook()

    def list(self):
        """Return a one-element list with the single workbook."""
        only_workbook = self.workbook
        return [only_workbook]
class FakeWorkflowManager(FakeManager):
    """Fake workflow manager holding a single workflow."""

    def __init__(self):
        super(FakeWorkflowManager, self).__init__()
        self.workflow = FakeWorkflow()

    def list(self):
        """Return a one-element list with the single workflow."""
        only_workflow = self.workflow
        return [only_workflow]
class FakeExecutionManager(FakeManager):
    """Fake execution manager holding a single execution."""

    def __init__(self):
        super(FakeExecutionManager, self).__init__()
        self.execution = FakeExecution()

    def list(self):
        """Return a one-element list with the single execution."""
        only_execution = self.execution
        return [only_execution]

    def create(self):
        """Return the single pre-built execution."""
        only_execution = self.execution
        return only_execution
class FakeObjectManager(FakeManager):
    """Fake object-storage manager.

    Containers are cached FakeObject resources; their objects live in
    each container's ``items`` dict (object name -> content).
    """

    def get_account(self, **kwargs):
        """Return ``(MagicMock, [{"name": ...}, ...])`` for all containers."""
        listing = []
        for con in self.list():
            listing.append({"name": con.name})
        return (mock.MagicMock(), listing)

    def get_container(self, name, **kwargs):
        """Return ``(MagicMock, [{"name": ...}, ...])`` for its objects."""
        container = self.find(name=name)
        if container is None:
            raise swift_exceptions.ClientException("Container GET failed")
        listing = []
        for obj in container.items:
            listing.append({"name": obj})
        return (mock.MagicMock(), listing)

    def put_container(self, name, **kwargs):
        """Cache a new container; fail when the name is already taken."""
        existing = self.find(name=name)
        if existing:
            raise swift_exceptions.ClientException("Container PUT failed")
        self._cache(FakeObject(name=name))

    def delete_container(self, name, **kwargs):
        """Delete a container; fail when it is unknown or not empty."""
        container = self.find(name=name)
        if container is None:
            raise swift_exceptions.ClientException("Container DELETE failed")
        if len(container.items.keys()) > 0:
            raise swift_exceptions.ClientException("Container DELETE failed")
        self.delete(container.uuid)

    def get_object(self, container_name, object_name, **kwargs):
        """Return ``(MagicMock, content)`` for the stored object."""
        container = self.find(name=container_name)
        if container is None or object_name not in container.items:
            raise swift_exceptions.ClientException("Object GET failed")
        content = container.items[object_name]
        return (mock.MagicMock(), content)

    def put_object(self, container_name, object_name, content, **kwargs):
        """Store ``content`` in the container and return a MagicMock."""
        container = self.find(name=container_name)
        if container is None:
            raise swift_exceptions.ClientException("Object PUT failed")
        container.items[object_name] = content
        return mock.MagicMock()

    def delete_object(self, container_name, object_name, **kwargs):
        """Remove the object; fail when container or object is unknown."""
        container = self.find(name=container_name)
        if container is None or object_name not in container.items:
            raise swift_exceptions.ClientException("Object DELETE failed")
        del container.items[object_name]
class FakeServiceCatalog(object):
    """Fake service catalog pointing every endpoint at one fake URL."""

    def get_credentials(self):
        """Return a fake endpoint list for three service types."""
        service_types = ("image", "metering", "monitoring")
        return {service: [{"publicURL": "http://fake.to"}]
                for service in service_types}

    def url_for(self, **kwargs):
        """Return the fake URL; all keyword arguments are ignored."""
        return "http://fake.to"
class FakeGlanceClient(object):
    """Fake Glance client exposing an ``images`` manager."""

    def __init__(self):
        image_manager = FakeImageManager()
        self.images = image_manager
class FakeMuranoClient(object):
    """Fake Murano client exposing a ``packages`` manager."""

    def __init__(self):
        package_manager = FakePackageManager()
        self.packages = package_manager
class FakeCinderClient(object):
    """Fake Cinder client bundling the fake volume-related managers."""

    def __init__(self):
        # Managers are built in the same order as the attributes below.
        self.volumes = FakeVolumeManager()
        self.volume_types = FakeVolumeTypeManager()
        self.transfers = FakeVolumeTransferManager()
        self.volume_snapshots = FakeVolumeSnapshotManager()
        self.backups = FakeVolumeBackupManager()
        self.quotas = FakeCinderQuotasManager()
class FakeNovaClient(object):
    """Fake Nova client bundling the fake compute-related managers.

    ``failed_server_manager`` is accepted but not used.
    """

    def __init__(self, failed_server_manager=False):
        image_manager = FakeImageManager()
        self.images = image_manager
        # Servers share the client's image manager.
        self.servers = FakeServerManager(image_manager)
        self.floating_ips = FakeFloatingIPsManager()
        self.floating_ip_pools = FakeFloatingIPPoolsManager()
        self.networks = FakeNetworkManager()
        self.flavors = FakeFlavorManager()
        self.keypairs = FakeKeypairManager()
        rule_manager = FakeSecurityGroupRuleManager()
        self.security_group_rules = rule_manager
        self.security_groups = FakeSecurityGroupManager(
            rule_manager=rule_manager)
        self.quotas = FakeNovaQuotasManager()
        self.set_management_url = mock.MagicMock()
        self.availability_zones = FakeAvailabilityZonesManager()
class FakeHeatClient(object):
    """Fake Heat client exposing a ``stacks`` manager."""

    def __init__(self):
        stack_manager = FakeStackManager()
        self.stacks = stack_manager
class FakeDesignateClient(object):
    """Fake Designate client exposing a ``domains`` manager."""

    def __init__(self):
        domain_manager = FakeDomainManager()
        self.domains = domain_manager
class FakeKeystoneClient(object):
    """Fake Keystone client with fake managers and fixed auth data."""

    def __init__(self):
        self.tenants = FakeTenantsManager()
        self.users = FakeUsersManager()
        self.roles = FakeRolesManager()
        self.project_id = "abc123"
        self.auth_url = "http://example.com:5000/v2.0/"
        self.auth_token = "fake"
        self.auth_user_id = generate_uuid()
        self.auth_tenant_id = generate_uuid()
        self.service_catalog = FakeServiceCatalog()
        self.services = FakeServicesManager()
        self.region_name = "RegionOne"
        auth_ref = mock.Mock()
        self.auth_ref = auth_ref
        auth_ref.role_names = ["admin"]
        self.version = "v2.0"
        self.session = mock.MagicMock()
        # NOTE(review): this instance attribute shadows the
        # ``authenticate`` method defined below.
        self.authenticate = mock.MagicMock()

    def authenticate(self):
        return True

    def list_users(self):
        """Return the users manager's listing."""
        manager = self.users
        return manager.list()

    def list_projects(self):
        """Return the tenants manager's listing."""
        manager = self.tenants
        return manager.list()

    def list_services(self):
        """Return the services manager's listing."""
        manager = self.services
        return manager.list()

    def list_roles(self):
        """Return the roles manager's listing."""
        manager = self.roles
        return manager.list()

    def delete_user(self, uuid):
        """Delete the user with the given uuid via the users manager."""
        manager = self.users
        return manager.delete(uuid)
class FakeCeilometerClient(object):
    """Fake Ceilometer client bundling the fake telemetry managers."""

    def __init__(self):
        self.alarms = FakeAlarmManager()
        self.meters = FakeMeterManager()
        self.resources = FakeCeilometerResourceManager()
        self.statistics = FakeStatisticsManager()
        self.samples = FakeSampleManager()
        # Each query endpoint gets its own, independent query manager.
        self.query_alarms = FakeQueryManager()
        self.query_samples = FakeQueryManager()
        self.query_alarm_history = FakeQueryManager()
class FakeGnocchiClient(object):
    """Fake Gnocchi client exposing a ``metric`` manager."""

    def __init__(self):
        metric_manager = FakeMetricManager()
        self.metric = metric_manager
class FakeMonascaClient(object):
    """Fake Monasca client exposing a ``metrics`` manager."""

    def __init__(self):
        metrics_manager = FakeMetricsManager()
        self.metrics = metrics_manager
class FakeNeutronClient(object):
def __init__(self, **kwargs):
    """Create empty per-type resource stores.

    :param kwargs: only ``tenant_id`` is read; a generated uuid is used
                   when it is absent
    """
    # One private dict per resource type, keyed by resource id.
    self.__networks = {}
    self.__subnets = {}
    self.__routers = {}
    self.__ports = {}
    self.__pools = {}
    self.__vips = {}
    self.__fips = {}
    self.__healthmonitors = {}
    fallback_tenant = generate_uuid()
    self.__tenant_id = kwargs.get("tenant_id", fallback_tenant)
    self.format = "json"
    self.version = "2.0"
@staticmethod
def _filter(resource_list, search_opts):
return [res for res in resource_list
if all(res[field] == value
for field, value in search_opts.items())]
def add_interface_router(self, router_id, data):
    """Attach a subnet to a router by creating a port on its network.

    :param router_id: id of an existing router
    :param data: dict with a "subnet_id" key naming an existing subnet
    :returns: dict with "subnet_id", "tenant_id", "port_id" and "id"
    :raises NeutronClientException: if the router or subnet is unknown
    """
    subnet_id = data["subnet_id"]
    known = router_id in self.__routers and subnet_id in self.__subnets
    if not known:
        raise neutron_exceptions.NeutronClientException
    subnet = self.__subnets[subnet_id]
    port_request = {"port": {"network_id": subnet["network_id"]}}
    port = self.create_port(port_request)["port"]
    port["device_id"] = router_id
    # The new port takes the subnet's gateway address.
    port["fixed_ips"].append({"subnet_id": subnet_id,
                              "ip_address": subnet["gateway_ip"]})
    return {"subnet_id": subnet_id,
            "tenant_id": port["tenant_id"],
            "port_id": port["id"],
            "id": router_id}
def create_network(self, data):
    """Create and store a fake network.

    :param data: dict with a "network" key; only "name" and
                 "admin_state_up" are accepted inside it
    :returns: dict ``{"network": <network dict>}``
    """
    requested = data["network"]
    network = setup_dict(requested,
                         defaults={"name": generate_name("net_"),
                                   "admin_state_up": True})
    network_id = generate_uuid()
    server_side = {"id": network_id,
                   "status": "ACTIVE",
                   "subnets": [],
                   "provider:physical_network": None,
                   "tenant_id": self.__tenant_id,
                   "provider:network_type": "local",
                   "router:external": True,
                   "shared": False,
                   "provider:segmentation_id": None}
    network.update(server_side)
    self.__networks[network_id] = network
    return {"network": network}
def create_pool(self, data):
    """Create and store a fake pool in "PENDING_CREATE" status.

    :param data: dict with a "pool" key; "lb_method", "protocol" and
                 "subnet_id" are required inside it
    :returns: dict ``{"pool": <pool dict>}``
    :raises NeutronClientException: if the subnet is unknown
    """
    requested = data["pool"]
    pool = setup_dict(requested,
                      required=["lb_method", "protocol", "subnet_id"],
                      defaults={"name": generate_name("pool_"),
                                "admin_state_up": True})
    subnet_known = pool["subnet_id"] in self.__subnets
    if not subnet_known:
        raise neutron_exceptions.NeutronClientException
    pool_id = generate_uuid()
    server_side = {"id": pool_id,
                   "status": "PENDING_CREATE",
                   "tenant_id": self.__tenant_id}
    pool.update(server_side)
    self.__pools[pool_id] = pool
    return {"pool": pool}
def create_vip(self, data):
    """Create and store a fake VIP in "PENDING_CREATE" status.

    :param data: dict with a "vip" key; "protocol_port", "protocol",
                 "subnet_id" and "pool_id" are required inside it
    :returns: dict ``{"vip": <vip dict>}``
    :raises NeutronClientException: if the subnet or pool is unknown
    """
    requested = data["vip"]
    vip = setup_dict(requested,
                     required=["protocol_port", "protocol", "subnet_id",
                               "pool_id"],
                     defaults={"name": generate_name("vip_"),
                               "admin_state_up": True})
    known = (vip["subnet_id"] in self.__subnets and
             vip["pool_id"] in self.__pools)
    if not known:
        raise neutron_exceptions.NeutronClientException
    vip_id = generate_uuid()
    server_side = {"id": vip_id,
                   "status": "PENDING_CREATE",
                   "tenant_id": self.__tenant_id}
    vip.update(server_side)
    self.__vips[vip_id] = vip
    return {"vip": vip}
def create_floatingip(self, data):
    """Create and store a fake floating IP.

    :param data: dict with a "floatingip" key; "floating_network" is
                 required inside it and must name an existing network
    :returns: dict ``{"fip": <floating ip dict>}``
    :raises NeutronClientException: if the floating network is unknown
    """
    fip = setup_dict(data["floatingip"],
                     required=["floating_network"],
                     defaults={"admin_state_up": True})
    # Networks live in ``self.__networks``; the previously referenced
    # ``self.__nets`` is never defined and raised AttributeError.
    if fip["floating_network"] not in self.__networks:
        raise neutron_exceptions.NeutronClientException
    fip_id = generate_uuid()
    fip.update({"id": fip_id,
                "tenant_id": self.__tenant_id})
    self.__fips[fip_id] = fip
    # NOTE(review): the result key is kept as "fip"; confirm whether
    # callers expect "floatingip" as in the real client.
    return {"fip": fip}
def create_health_monitor(self, data):
    """Create and store a fake health monitor in "PENDING_CREATE" status.

    :param data: dict with a "healthmonitor" key; "type", "timeout",
                 "delay" and "max_retries" are required inside it
    :returns: dict ``{"healthmonitor": <health monitor dict>}``
    """
    requested = data["healthmonitor"]
    monitor = setup_dict(requested,
                         required=["type", "timeout", "delay",
                                   "max_retries"],
                         defaults={"admin_state_up": True})
    monitor_id = generate_uuid()
    server_side = {"id": monitor_id,
                   "status": "PENDING_CREATE",
                   "tenant_id": self.__tenant_id}
    monitor.update(server_side)
    self.__healthmonitors[monitor_id] = monitor
    return {"healthmonitor": monitor}
def create_port(self, data):
    """Create and store a fake "ACTIVE" port with a random MAC address.

    :param data: dict with a "port" key; "network_id" is required inside
                 it and must name an existing network
    :returns: dict ``{"port": <port dict>}``
    :raises NeutronClientException: if the network is unknown
    """
    requested = data["port"]
    port = setup_dict(requested,
                      required=["network_id"],
                      defaults={"name": generate_name("port_"),
                                "admin_state_up": True})
    network_known = port["network_id"] in self.__networks
    if not network_known:
        raise neutron_exceptions.NeutronClientException
    port_id = generate_uuid()
    server_side = {"id": port_id,
                   "status": "ACTIVE",
                   "binding:host_id": "fakehost",
                   "extra_dhcp_opts": [],
                   "binding:vnic_type": "normal",
                   "binding:vif_type": "ovs",
                   "device_owner": "",
                   "mac_address": generate_mac(),
                   "binding:profile": {},
                   "binding:vif_details": {u"port_filter": True},
                   "security_groups": [],
                   "fixed_ips": [],
                   "device_id": "",
                   "tenant_id": self.__tenant_id,
                   "allowed_address_pairs": []}
    port.update(server_side)
    self.__ports[port_id] = port
    return {"port": port}
def create_router(self, data):
    """Create and store a fake "ACTIVE" router.

    :param data: dict with a "router" key; "name",
                 "external_gateway_info" and "admin_state_up" are
                 accepted inside it
    :returns: dict ``{"router": <router dict>}``
    """
    requested = data["router"]
    router = setup_dict(requested,
                        defaults={"name": generate_name("router_"),
                                  "external_gateway_info": None,
                                  "admin_state_up": True})
    router_id = generate_uuid()
    # "external_gateway_info" is reset to None whatever was requested.
    server_side = {"id": router_id,
                   "status": "ACTIVE",
                   "external_gateway_info": None,
                   "tenant_id": self.__tenant_id}
    router.update(server_side)
    self.__routers[router_id] = router
    return {"router": router}
def create_subnet(self, data):
    """Create and store a fake subnet.

    :param data: dict with a "subnet" key; "network_id", "cidr" and
                 "ip_version" are required inside it
    :returns: dict ``{"subnet": <subnet dict>}``
    :raises NeutronClientException: if the network is unknown
    """
    requested = data["subnet"]
    subnet = setup_dict(
        requested,
        required=["network_id", "cidr", "ip_version"],
        defaults={"name": generate_name("subnet_"),
                  "dns_nameservers": ["8.8.8.8", "8.8.4.4"]})
    network_known = subnet["network_id"] in self.__networks
    if not network_known:
        raise neutron_exceptions.NeutronClientException
    subnet_id = generate_uuid()
    # The gateway is the CIDR with its last character before the slash,
    # and everything after it, replaced by "1".
    gateway = re.sub("./.*$", "1", subnet["cidr"])
    # "ip_version" is forced to 4 whatever was requested.
    server_side = {"id": subnet_id,
                   "enable_dhcp": True,
                   "tenant_id": self.__tenant_id,
                   "ipv6_ra_mode": None,
                   "allocation_pools": [],
                   "gateway_ip": gateway,
                   "ipv6_address_mode": None,
                   "ip_version": 4,
                   "host_routes": []}
    subnet.update(server_side)
    self.__subnets[subnet_id] = subnet
    return {"subnet": subnet}
def update_resource(self, resource_id, resource_dict, data):
    """Merge ``data`` into one stored resource, in place.

    :param resource_id: key of the resource to update
    :param resource_dict: mapping of resource id -> resource dict in which
                          the resource is looked up
    :param data: dict of fields merged into the stored resource
    :raises NeutronClientException: if resource_id is not in resource_dict
    """
    if resource_id not in resource_dict:
        raise neutron_exceptions.NeutronClientException
    # Update through the mapping the caller passed in, i.e. the same one
    # the membership check above was made against.
    resource_dict[resource_id].update(data)
def update_network(self, network_id, data):
    """Merge ``data`` into the stored network via update_resource."""
    networks = self.__networks
    self.update_resource(network_id, networks, data)
def update_pool(self, pool_id, data):
    """Merge ``data`` into the stored pool via update_resource."""
    pools = self.__pools
    self.update_resource(pool_id, pools, data)
def update_vip(self, vip_id, data):
    """Merge ``data`` into the stored VIP via update_resource."""
    vips = self.__vips
    self.update_resource(vip_id, vips, data)
def update_health_monitor(self, healthmonitor_id, data):
    """Merge ``data`` into the stored health monitor via update_resource."""
    monitors = self.__healthmonitors
    self.update_resource(healthmonitor_id, monitors, data)
def update_subnet(self, subnet_id, data):
    """Merge ``data`` into the stored subnet via update_resource."""
    subnets = self.__subnets
    self.update_resource(subnet_id, subnets, data)
def update_port(self, port_id, data):
    """Merge ``data`` into the stored port via update_resource."""
    ports = self.__ports
    self.update_resource(port_id, ports, data)
def update_router(self, router_id, data):
    """Merge ``data`` into the stored router via update_resource."""
    routers = self.__routers
    self.update_resource(router_id, routers, data)
def delete_network(self, network_id):
    """Remove a stored network and return an empty string.

    :raises NeutronClientException: if the network is unknown, or if any
                                    stored port still references it
    """
    networks = self.__networks
    if network_id not in networks:
        raise neutron_exceptions.NeutronClientException
    # A network that a port still points at counts as "in use".
    in_use = any(p["network_id"] == network_id
                 for p in self.__ports.values())
    if in_use:
        raise neutron_exceptions.NeutronClientException
    del networks[network_id]
    return ""
def delete_pool(self, pool_id):
    """Remove a stored pool and return an empty string.

    :raises NeutronClientException: if the pool is unknown
    """
    pools = self.__pools
    if pool_id not in pools:
        raise neutron_exceptions.NeutronClientException
    del pools[pool_id]
    return ""
def delete_vip(self, vip_id):
    """Remove a stored VIP.

    Unlike the sibling delete methods this one has no explicit return
    value, so callers get None.

    :raises NeutronClientException: if the VIP is unknown
    """
    vips = self.__vips
    if vip_id not in vips:
        raise neutron_exceptions.NeutronClientException
    del vips[vip_id]
def delete_health_monitor(self, healthmonitor_id):
    """Remove a stored health monitor and return an empty string.

    :raises NeutronClientException: if the health monitor is unknown
    """
    monitors = self.__healthmonitors
    if healthmonitor_id not in monitors:
        raise neutron_exceptions.NeutronClientException
    del monitors[healthmonitor_id]
    return ""
def delete_floatingip(self, fip_id):
    """Remove a stored floating IP and return an empty string.

    :raises NeutronClientException: if the floating IP is unknown
    """
    floating_ips = self.__fips
    if fip_id not in floating_ips:
        raise neutron_exceptions.NeutronClientException
    del floating_ips[fip_id]
    return ""
def delete_port(self, port_id):
    """Remove a stored port and return an empty string.

    :raises PortNotFoundClient: if the port is unknown
    :raises NeutronClientException: if the port has a non-empty
                                    "device_owner"
    """
    ports = self.__ports
    if port_id not in ports:
        raise neutron_exceptions.PortNotFoundClient
    owner = ports[port_id]["device_owner"]
    if owner:
        # A port owned by some device cannot be deleted.
        raise neutron_exceptions.NeutronClientException
    del ports[port_id]
    return ""
def delete_router(self, router_id):
    """Remove a stored router and return an empty string.

    :raises NeutronClientException: if the router is unknown, or if any
                                    stored port has it as "device_id"
    """
    routers = self.__routers
    if router_id not in routers:
        raise neutron_exceptions.NeutronClientException
    # A router with a port attached to it cannot be deleted.
    has_port = any(p["device_id"] == router_id
                   for p in self.__ports.values())
    if has_port:
        raise neutron_exceptions.NeutronClientException
    del routers[router_id]
    return ""
def delete_subnet(self, subnet_id):
    """Remove a stored subnet and return an empty string.

    :raises NeutronClientException: if the subnet is unknown, or if any
                                    stored port has a fixed IP on it
    """
    subnets = self.__subnets
    if subnet_id not in subnets:
        raise neutron_exceptions.NeutronClientException
    # A subnet with an IP allocated to some port cannot be deleted.
    allocated = any(ip["subnet_id"] == subnet_id
                    for p in self.__ports.values()
                    for ip in p["fixed_ips"])
    if allocated:
        raise neutron_exceptions.NeutronClientException
    del subnets[subnet_id]
    return ""
def list_networks(self, **search_opts):
    """Return ``{"networks": ...}`` with stored networks run through
    self._filter using ``search_opts``.
    """
    return {"networks": self._filter(self.__networks.values(),
                                     search_opts)}
def list_pools(self, **search_opts):
    """Return ``{"pools": ...}`` with stored pools run through
    self._filter using ``search_opts``.
    """
    return {"pools": self._filter(self.__pools.values(), search_opts)}
def list_vips(self, **search_opts):
    """Return ``{"vips": ...}`` with stored VIPs run through
    self._filter using ``search_opts``.
    """
    return {"vips": self._filter(self.__vips.values(), search_opts)}
def list_health_monitors(self, **search_opts):
    """Return ``{"healthmonitors": ...}`` with stored health monitors run
    through self._filter using ``search_opts``.
    """
    stored = self.__healthmonitors.values()
    return {"healthmonitors": self._filter(stored, search_opts)}
def list_ports(self, **search_opts):
    """Return ``{"ports": ...}`` with stored ports run through
    self._filter using ``search_opts``.
    """
    return {"ports": self._filter(self.__ports.values(), search_opts)}
def list_routers(self, **search_opts):
    """Return ``{"routers": ...}`` with stored routers run through
    self._filter using ``search_opts``.
    """
    return {"routers": self._filter(self.__routers.values(), search_opts)}
def list_subnets(self, **search_opts):
    """Return ``{"subnets": ...}`` with stored subnets run through
    self._filter using ``search_opts``.
    """
    return {"subnets": self._filter(self.__subnets.values(), search_opts)}
def list_floatingips(self, **search_opts):
    """Return ``{"floatingips": ...}`` with stored floating IPs run
    through self._filter using ``search_opts``.
    """
    return {"floatingips": self._filter(self.__fips.values(), search_opts)}
def remove_interface_router(self, router_id, data):
    """Detach a subnet from a router by deleting the connecting port.

    The first stored port whose "device_id" is the router and which has a
    fixed IP on the subnet is removed.

    :param router_id: id of the router
    :param data: dict with a "subnet_id" key
    :returns: dict with "subnet_id", "tenant_id" (taken from the subnet),
              "port_id" (the removed port) and "id" (the router)
    :raises NeutronClientException: if the router or subnet is unknown, or
                                    no matching port exists
    """
    subnet_id = data["subnet_id"]
    known = router_id in self.__routers and subnet_id in self.__subnets
    if not known:
        raise neutron_exceptions.NeutronClientException

    tenant_id_source = self.__subnets[subnet_id]
    for port_id, port in self.__ports.items():
        if port["device_id"] != router_id:
            continue
        for fixed_ip in port["fixed_ips"]:
            if fixed_ip["subnet_id"] != subnet_id:
                continue
            # Returning right after the deletion means the dict is never
            # iterated again after being modified.
            del self.__ports[port_id]
            return {"subnet_id": subnet_id,
                    "tenant_id": tenant_id_source["tenant_id"],
                    "port_id": port_id,
                    "id": router_id}

    raise neutron_exceptions.NeutronClientException
def associate_health_monitor(self, pool_id, healthmonitor_id):
    """Record a health monitor on a pool and return ``{"pool": entry}``.

    The monitor id is stored under the "healthmonitors" key of the pool
    entry's "pool" sub-dict.

    :raises NeutronClientException: if the pool or the health monitor is
                                    unknown
    """
    unknown = (pool_id not in self.__pools
               or healthmonitor_id not in self.__healthmonitors)
    if unknown:
        raise neutron_exceptions.NeutronClientException
    entry = self.__pools[pool_id]
    entry["pool"]["healthmonitors"] = healthmonitor_id
    return {"pool": entry}
def disassociate_health_monitor(self, pool_id, healthmonitor_id):
    """Remove a health monitor association from a pool.

    :param pool_id: id of the pool
    :param healthmonitor_id: id of the health monitor to detach
    :returns: empty string
    :raises NeutronClientException: if the pool or the health monitor is
                                    unknown
    """
    if pool_id not in self.__pools:
        raise neutron_exceptions.NeutronClientException
    if healthmonitor_id not in self.__healthmonitors:
        raise neutron_exceptions.NeutronClientException
    pool = self.__pools[pool_id]["pool"]
    monitors = pool["healthmonitors"]
    if monitors == healthmonitor_id:
        # associate_health_monitor stores the bare monitor id, which does
        # not support item deletion; drop the whole association instead.
        del pool["healthmonitors"]
    else:
        # Container of monitors: remove just the requested entry.
        del monitors[healthmonitor_id]
    return ""
class FakeIronicClient(object):
    """Placeholder fake of the Ironic client; it defines no attributes."""

    def __init__(self):
        # TODO(romcheg):Fake Manager subclasses to manage BM nodes.
        pass
class FakeSaharaClient(object):
    """Fake Sahara client whose resource managers are MagicMock objects."""

    def __init__(self):
        for manager_name in ("job_executions",
                             "jobs",
                             "job_binary_internals",
                             "job_binaries",
                             "data_sources",
                             "clusters",
                             "cluster_templates",
                             "node_group_templates"):
            setattr(self, manager_name, mock.MagicMock())
        self.setup_list_methods()

    def setup_list_methods(self):
        """Program every manager's ``list`` with a two-step side effect."""
        item = mock.MagicMock()
        item.id = 42

        # First call of list returns a list with one object, the next one
        # returns an empty list (as it would after a delete).
        managers = (self.job_executions,
                    self.jobs,
                    self.job_binary_internals,
                    self.job_binaries,
                    self.data_sources,
                    self.clusters,
                    self.cluster_templates,
                    self.node_group_templates)
        for manager in managers:
            manager.list.side_effect = [[item], []]
class FakeZaqarClient(object):
    """Fake Zaqar client backed by a FakeQueuesManager."""

    def __init__(self):
        self.queues = FakeQueuesManager()

    def queue(self, name, **kwargs):
        """Create a queue through the queues manager and return it."""
        manager = self.queues
        return manager.create(name, **kwargs)
class FakeTroveClient(object):
    """Fake Trove client exposing an ``instances`` manager."""

    def __init__(self):
        instances_manager = FakeDbInstanceManager()
        self.instances = instances_manager
class FakeMistralClient(object):
    """Fake Mistral client exposing workbook/workflow/execution managers."""

    def __init__(self):
        # Managers are created in the same order as the attributes appear.
        self.workbook, self.workflow, self.execution = (
            FakeWorkbookManager(),
            FakeWorkflowManager(),
            FakeExecutionManager(),
        )
class FakeSwiftClient(FakeObjectManager):
    """Fake Swift client; adds nothing on top of FakeObjectManager."""
class FakeEC2Client(object):
    """Placeholder fake of the EC2 client; it defines no attributes."""

    def __init__(self):
        pass
class FakeSenlinClient(object):
    """Placeholder fake of the Senlin client; it defines no attributes."""

    def __init__(self):
        # TODO(Yanyan Hu):Fake interfaces of senlinclient.
        pass
class FakeMagnumClient(object):
    """Fake Magnum client exposing a ``cluster_templates`` manager."""

    def __init__(self):
        templates_manager = FakeClusterTemplateManager()
        self.cluster_templates = templates_manager
class FakeWatcherClient(object):
    """Fake Watcher client exposing ``strategy`` and ``goal`` managers."""

    def __init__(self):
        # Managers are created in the same order as the attributes appear.
        self.strategy, self.goal = FakeStrategyManager(), FakeGoalManager()
class FakeClients(object):
    """Container that lazily creates and caches one fake client per service.

    Each accessor builds its fake client on first use (or whenever the
    cached value is falsy) and returns the cached object afterwards.
    """

    def __init__(self, credential_=None):
        """Start with no cached clients.

        :param credential_: credential to keep; when falsy a default one
                            is built with fake_credential
        """
        for service in ("nova", "glance", "keystone", "cinder", "neutron",
                        "sahara", "heat", "designate", "ceilometer",
                        "zaqar", "trove", "mistral", "swift", "murano",
                        "monasca", "ec2", "senlin", "watcher"):
            setattr(self, "_" + service, None)
        self._credential = credential_ or fake_credential(
            auth_url="http://fake.example.org:5000/v2.0/",
            username="fake_username",
            password="fake_password",
            tenant_name="fake_tenant_name")

    def keystone(self, version=None):
        """Return the cached FakeKeystoneClient; ``version`` is not used."""
        self._keystone = self._keystone or FakeKeystoneClient()
        return self._keystone

    def verified_keystone(self):
        """Return the same object as keystone()."""
        return self.keystone()

    def nova(self):
        self._nova = self._nova or FakeNovaClient()
        return self._nova

    def glance(self):
        self._glance = self._glance or FakeGlanceClient()
        return self._glance

    def cinder(self):
        self._cinder = self._cinder or FakeCinderClient()
        return self._cinder

    def neutron(self):
        self._neutron = self._neutron or FakeNeutronClient()
        return self._neutron

    def sahara(self):
        self._sahara = self._sahara or FakeSaharaClient()
        return self._sahara

    def heat(self):
        self._heat = self._heat or FakeHeatClient()
        return self._heat

    def designate(self):
        self._designate = self._designate or FakeDesignateClient()
        return self._designate

    def ceilometer(self):
        self._ceilometer = self._ceilometer or FakeCeilometerClient()
        return self._ceilometer

    def monasca(self):
        self._monasca = self._monasca or FakeMonascaClient()
        return self._monasca

    def zaqar(self):
        self._zaqar = self._zaqar or FakeZaqarClient()
        return self._zaqar

    def trove(self):
        self._trove = self._trove or FakeTroveClient()
        return self._trove

    def mistral(self):
        self._mistral = self._mistral or FakeMistralClient()
        return self._mistral

    def swift(self):
        self._swift = self._swift or FakeSwiftClient()
        return self._swift

    def murano(self):
        self._murano = self._murano or FakeMuranoClient()
        return self._murano

    def ec2(self):
        self._ec2 = self._ec2 or FakeEC2Client()
        return self._ec2

    def senlin(self):
        self._senlin = self._senlin or FakeSenlinClient()
        return self._senlin

    def watcher(self):
        self._watcher = self._watcher or FakeWatcherClient()
        return self._watcher
class FakeRunner(object):
    """Fake runner that only carries a JSON schema for its config."""

    # Config must be an object with "type" == "fake" and a string "a";
    # the numeric "b" is optional.
    CONFIG_SCHEMA = {
        "type": "object",
        "$schema": consts.JSON_SCHEMA,
        "properties": {
            "type": {"type": "string", "enum": ["fake"]},
            "a": {"type": "string"},
            "b": {"type": "number"}
        },
        "required": ["type", "a"]
    }
class FakeScenario(scenario.Scenario):
    """Scenario stub offering no-op, output-producing and failing methods."""

    def idle_time(self):
        """Always report zero idle time."""
        return 0

    def do_it(self, **kwargs):
        """Do nothing."""
        pass

    def with_output(self, **kwargs):
        """Return a fixed result dict with some data and no error."""
        return {"data": {"a": 1}, "error": None}

    def with_add_output(self):
        """Call add_output with one additive and one complete chart."""
        additive_chart = {"title": "Additive",
                          "description": "Additive description",
                          "data": [["a", 1]],
                          "chart_plugin": "FooPlugin"}
        complete_chart = {"title": "Complete",
                          "description": "Complete description",
                          "data": [["a", [[1, 2], [2, 3]]]],
                          "chart_plugin": "BarPlugin"}
        self.add_output(additive=additive_chart, complete=complete_chart)

    def too_long(self, **kwargs):
        """Do nothing."""
        pass

    def something_went_wrong(self, **kwargs):
        """Always raise a generic Exception."""
        raise Exception("Something went wrong")

    def raise_timeout(self, **kwargs):
        """Always raise multiprocessing.TimeoutError."""
        raise multiprocessing.TimeoutError()
@scenario.configure(name="classbased.fooscenario")
class FakeClassBasedScenario(FakeScenario):
    """Fake class-based scenario."""

    def run(self, *args, **kwargs):
        """Accept any arguments and do nothing."""
        pass
class FakeTimer(rally_utils.Timer):
    """Timer stub whose readings are fixed constants."""

    def duration(self):
        """Return a constant duration of 10."""
        return 10

    def timestamp(self):
        """Return a constant start timestamp of 0."""
        return 0

    def finish_timestamp(self):
        """Return a constant finish timestamp of 3."""
        return 3
@context.configure(name="fake", order=1)
class FakeContext(context.Context):
    """Context stub with no-op setup/cleanup and a minimal config schema."""

    # Only an optional integer "test" property is accepted.
    CONFIG_SCHEMA = {
        "type": "object",
        "$schema": consts.JSON_SCHEMA,
        "properties": {
            "test": {"type": "integer"},
        },
        "additionalProperties": False
    }

    def __init__(self, context_obj=None):
        """Fill in missing "config" and "task" entries, then initialise.

        :param context_obj: context dict; a falsy value is replaced by a
                            new empty dict
        """
        if not context_obj:
            context_obj = {}
        config = context_obj.setdefault("config", {})
        config.setdefault("fake", None)
        context_obj.setdefault("task", mock.MagicMock())
        super(FakeContext, self).__init__(context_obj)

    def setup(self):
        """Do nothing."""
        pass

    def cleanup(self):
        """Do nothing."""
        pass
@context.configure(name="fake_hidden_context", order=1, hidden=True)
class FakeHiddenContext(FakeContext):
    """FakeContext registered under a hidden plugin name."""
@context.configure(name="fake_user_context", order=1)
class FakeUserContext(FakeContext):
    """FakeContext pre-populated with a fake admin, one user and a tenant."""

    admin = {
        "id": "adminuuid",
        "credential": fake_credential(auth_url="aurl",
                                      username="aname",
                                      password="apwd",
                                      tenant_name="atenant")
    }
    user = {
        "id": "uuid",
        "credential": fake_credential(auth_url="url",
                                      username="name",
                                      password="pwd",
                                      tenant_name="tenant"),
        "tenant_id": "uuid"
    }
    tenants = {"uuid": {"name": "tenant"}}

    def __init__(self, ctx):
        """Initialise and add default entries that ``ctx`` did not set."""
        super(FakeUserContext, self).__init__(ctx)
        fallbacks = (
            ("admin", FakeUserContext.admin),
            ("users", [FakeUserContext.user]),
            ("tenants", FakeUserContext.tenants),
            ("scenario_name",
             "NovaServers.boot_server_from_volume_and_delete"),
        )
        for key, value in fallbacks:
            self.context.setdefault(key, value)
class FakeDeployment(dict):
    """Dict-based deployment stub carrying per-platform credentials."""

    def __init__(self, **kwargs):
        """Build the deployment dict.

        "namespace" (default "openstack"), "admin" and "users" are taken
        out of ``kwargs`` and folded into a "credentials" entry; the
        remaining keyword arguments become regular dict items.
        """
        platform = kwargs.pop("namespace", "openstack")
        admin = kwargs.pop("admin", None)
        users = kwargs.pop("users", [])
        kwargs["credentials"] = {
            platform: [{"admin": admin, "users": users}],
            "default": [{"admin": None, "users": []}]}
        super(FakeDeployment, self).__init__(**kwargs)
        self.update_status = mock.Mock()

    def get_platforms(self):
        """Return the names of all platforms that have credentials."""
        return list(self["credentials"])

    def get_credentials_for(self, namespace):
        """Return the first credentials entry stored for ``namespace``."""
        entries = self["credentials"][namespace]
        return entries[0]
class FakeTask(dict, object):
    """Dict-based task stub with mocked failure setters."""

    def __init__(self, task=None, temporary=False, **kwargs):
        """Populate the dict from ``task`` first, then from ``kwargs``.

        :param task: optional mapping of initial items
        :param temporary: stored as ``is_temporary``
        """
        self.is_temporary = temporary
        self.set_failed = mock.Mock()
        self.set_validation_failed = mock.Mock()
        # Keyword arguments come second, so they win on duplicate keys.
        for source in (task or {}, kwargs):
            for key, value in source.items():
                self[key] = value
        self.task = self

    def to_dict(self):
        """Return this object itself (it already is a dict)."""
        return self
class FakeAPI(object):
    """API stub whose sub-APIs are autospec mocks of the rally API classes."""

    def __init__(self):
        autospec = mock.create_autospec
        self._deployment = autospec(api._Deployment)
        self._task = autospec(api._Task)
        self._verifier = autospec(api._Verifier)
        self._verification = autospec(api._Verification)

    @property
    def deployment(self):
        """Autospec mock of api._Deployment."""
        return self._deployment

    @property
    def task(self):
        """Autospec mock of api._Task."""
        return self._task

    @property
    def verifier(self):
        """Autospec mock of api._Verifier."""
        return self._verifier

    @property
    def verification(self):
        """Autospec mock of api._Verification."""
        return self._verification