rally-openstack/tests/unit/fakes.py
Boris Pavlovic 76a1e23df6 [plugins] Introduce <name>@<platform> syntax
Simplify and unify full naming of plugins
The new logic is pretty strightforward and simple to understand:

1) Full name provided
   -> Try to find exact match or raise NotFound
2) Only name provided
   -> Finds all plugins with exact name match:
      A) if one found -> return it
      B) if multiple found -> raise error MultiMatch with infomration
         of platforms
      c) if none found -> return error

This approach is going to force people to be explicit about platform
when they use multiple platforms in same task if there are name colisions

However, it won't be redunant in case when you are using one platform and
don't want to explicitly specify it.

Auto-magic discovering of platform should be disabled because:
1) it's unclear how it works and hard to explain which is very bad
2) is going to produce random limitations and unexpected behaviors

   One of example is if we decided to write large libs for:
   HTTP, DNS, FTP, and other kind of scenarios and context and have
   separated platforms for them e.g. @http, @dns, @ftp, ....
   they are going to be run against "default" platform which doesn't
   contain any information however they are not @default platform

Change-Id: I8ae5369bcf5ef3aba0fa39a95d8c40cf75edb4bf
2017-09-20 09:55:06 -07:00

1906 lines
55 KiB
Python

# Copyright 2013: Mirantis Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import itertools
import multiprocessing
import random
import re
import string
import uuid
from ceilometerclient import exc as ceilometer_exc
from glanceclient import exc
import mock
from neutronclient.common import exceptions as neutron_exceptions
from novaclient import exceptions as nova_exceptions
import six
from swiftclient import exceptions as swift_exceptions
from rally import api
from rally.common import utils as rally_utils
from rally import consts
from rally.task import context
from rally.task import scenario
def generate_uuid():
return str(uuid.uuid4())
def generate_name(prefix="", length=12, choices=string.ascii_lowercase):
"""Generate pseudo-random name.
:param prefix: str, custom prefix for genertated name
:param length: int, length of autogenerated part of result name
:param choices: str, chars that accurs in generated name
:returns: str, pseudo-random name
"""
return prefix + "".join(random.choice(choices) for i in range(length))
def generate_mac():
"""Generate pseudo-random MAC address.
:returns: str, MAC address
"""
rand_str = generate_name(choices="0123456789abcdef", length=12)
return ":".join(re.findall("..", rand_str))
def setup_dict(data, required=None, defaults=None):
"""Setup and validate dict scenario_base. on mandatory keys and default data.
This function reduces code that constructs dict objects
with specific schema (e.g. for API data).
:param data: dict, input data
:param required: list, mandatory keys to check
:param defaults: dict, default data
:returns: dict, with all keys set
:raises IndexError, ValueError: If input data is incorrect
"""
required = required or []
for i in set(required) - set(data):
raise IndexError("Missed: %s" % i)
defaults = defaults or {}
for i in set(data) - set(required) - set(defaults):
raise ValueError("Unexpected: %s" % i)
defaults.update(data)
return defaults
def fake_credential(**config):
m = mock.Mock()
m.to_dict.return_value = config
for key, value in config.items():
setattr(m, key, value)
return m
class FakeResource(object):
def __init__(self, manager=None, name=None, status="ACTIVE", items=None,
deployment_uuid=None, id=None):
self.name = name or generate_uuid()
self.status = status
self.manager = manager
self.uuid = generate_uuid()
self.id = id or self.uuid
self.items = items or {}
self.deployment_uuid = deployment_uuid or generate_uuid()
def __getattr__(self, name):
# NOTE(msdubov): e.g. server.delete() -> manager.delete(server)
def manager_func(*args, **kwargs):
return getattr(self.manager, name)(self, *args, **kwargs)
return manager_func
def __getitem__(self, key):
return self.items[key]
class FakeServer(FakeResource):
def suspend(self):
self.status = "SUSPENDED"
def lock(self):
setattr(self, "OS-EXT-STS:locked", True)
def unlock(self):
setattr(self, "OS-EXT-STS:locked", False)
class FakeImage(FakeResource):
def __init__(self, manager=None, id="image-id-0", min_ram=0,
size=0, min_disk=0, status="active", name=None):
super(FakeImage, self).__init__(manager, id=id, name=name)
self.min_ram = min_ram
self.size = size
self.min_disk = min_disk
self.status = status
self.update = mock.MagicMock()
class FakeStrategy(FakeResource):
pass
class FakeGoal(FakeResource):
pass
class FakeMurano(FakeResource):
pass
class FakeFloatingIP(FakeResource):
pass
class FakeFloatingIPPool(FakeResource):
pass
class FakeTenant(FakeResource):
def __init__(self, manager, name):
super(FakeTenant, self).__init__(manager, name=name)
class FakeUser(FakeResource):
pass
class FakeService(FakeResource):
pass
class FakeNetwork(FakeResource):
pass
class FakeFlavor(FakeResource):
def __init__(self, id="flavor-id-0", manager=None, ram=0, disk=0, vcpus=1,
name="flavor-name-0"):
super(FakeFlavor, self).__init__(manager, id=id)
self.ram = ram
self.disk = disk
self.vcpus = vcpus
self.name = name
class FakeKeypair(FakeResource):
pass
class FakeStack(FakeResource):
pass
class FakeDomain(FakeResource):
pass
class FakeQuotas(FakeResource):
pass
class FakeSecurityGroup(FakeResource):
def __init__(self, manager=None, rule_manager=None, id=None, name=None):
super(FakeSecurityGroup, self).__init__(manager, id=id, name=name)
self.rule_manager = rule_manager
@property
def rules(self):
return [rule for rule in self.rule_manager.list()
if rule.parent_group_id == self.id]
class FakeSecurityGroupRule(FakeResource):
def __init__(self, name, **kwargs):
super(FakeSecurityGroupRule, self).__init__(name)
if "cidr" in kwargs:
kwargs["ip_range"] = {"cidr": kwargs["cidr"]}
del kwargs["cidr"]
for key, value in kwargs.items():
self.items[key] = value
setattr(self, key, value)
class FakeMetric(FakeResource):
def __init_(self, manager=None, **kwargs):
super(FakeMetric, self).__init__(manager)
self.metric = kwargs.get("metric_name")
self.optional_args = kwargs.get("optional_args", {})
class FakeAlarm(FakeResource):
def __init__(self, manager=None, **kwargs):
super(FakeAlarm, self).__init__(manager)
self.meter_name = kwargs.get("meter_name")
self.threshold = kwargs.get("threshold")
self.state = kwargs.get("state", "fake-alarm-state")
self.alarm_id = kwargs.get("alarm_id", "fake-alarm-id")
self.state = kwargs.get("state", "ok")
self.optional_args = kwargs.get("optional_args", {})
class FakeSample(FakeResource):
def __init__(self, manager=None, **kwargs):
super(FakeSample, self).__init__(manager)
self.counter_name = kwargs.get("counter_name", "fake-counter-name")
self.counter_type = kwargs.get("counter_type", "fake-counter-type")
self.counter_unit = kwargs.get("counter_unit", "fake-counter-unit")
self.counter_volume = kwargs.get("counter_volume", 100)
@property
def resource_id(self):
return "fake-resource-id"
def to_dict(self):
return {"counter_name": self.counter_name,
"counter_type": self.counter_type,
"counter_unit": self.counter_unit,
"counter_volume": self.counter_volume,
"resource_id": self.resource_id}
class FakeVolume(FakeResource):
@property
def _info(self):
return {"id": "uuid"}
class FakeVolumeType(FakeResource):
pass
class FakeVolumeTransfer(FakeResource):
pass
class FakeVolumeSnapshot(FakeResource):
pass
class FakeVolumeBackup(FakeResource):
pass
class FakeRole(FakeResource):
pass
class FakeQueue(FakeResource):
def __init__(self, manager=None, name="myqueue"):
super(FakeQueue, self).__init__(manager, name)
self.queue_name = name
self.messages = FakeMessagesManager(name)
def post(self, messages):
for msg in messages:
self.messages.create(**msg)
def messages(self):
return self.messages.list()
class FakeDbInstance(FakeResource):
pass
class FakeMessage(FakeResource):
def __init__(self, manager=None, **kwargs):
super(FakeMessage, self).__init__(manager)
self.body = kwargs.get("body", "fake-body")
self.ttl = kwargs.get("ttl", 100)
class FakeAvailabilityZone(FakeResource):
def __init__(self, manager=None):
super(FakeAvailabilityZone, self).__init__(manager)
self.zoneName = mock.MagicMock()
self.zoneState = mock.MagicMock()
self.hosts = mock.MagicMock()
class FakeWorkbook(FakeResource):
def __init__(self, manager=None):
super(FakeWorkbook, self).__init__(manager)
self.workbook = mock.MagicMock()
class FakeWorkflow(FakeResource):
def __init__(self, manager=None):
super(FakeWorkflow, self).__init__(manager)
self.workflow = mock.MagicMock()
class FakeExecution(FakeResource):
def __init__(self, manager=None):
super(FakeExecution, self).__init__(manager)
self.execution = mock.MagicMock()
class FakeObject(FakeResource):
pass
class FakeClusterTemplate(FakeResource):
pass
class FakeManager(object):
def __init__(self):
super(FakeManager, self).__init__()
self.cache = {}
self.resources_order = []
def get(self, resource_uuid):
return self.cache.get(resource_uuid)
def delete(self, resource_uuid):
cached = self.get(resource_uuid)
if cached is not None:
cached.status = "DELETED"
del self.cache[resource_uuid]
self.resources_order.remove(resource_uuid)
def _cache(self, resource):
self.resources_order.append(resource.uuid)
self.cache[resource.uuid] = resource
return resource
def list(self, **kwargs):
return [self.cache[key] for key in self.resources_order]
def find(self, **kwargs):
for resource in self.cache.values():
match = True
for key, value in kwargs.items():
if getattr(resource, key, None) != value:
match = False
break
if match:
return resource
class FakeServerManager(FakeManager):
def __init__(self, image_mgr=None):
super(FakeServerManager, self).__init__()
self.images = image_mgr or FakeImageManager()
def get(self, resource_uuid):
server = self.cache.get(resource_uuid)
if server is not None:
return server
raise nova_exceptions.NotFound("Server %s not found" % (resource_uuid))
def _create(self, server_class=FakeServer, name=None):
server = self._cache(server_class(self))
if name is not None:
server.name = name
return server
def create(self, name, image_id, flavor_id, **kwargs):
return self._create(name=name)
def create_image(self, server, name):
image = self.images._create()
return image.uuid
def add_floating_ip(self, server, fip):
pass
def remove_floating_ip(self, server, fip):
pass
def delete(self, resource):
if not isinstance(resource, six.string_types):
resource = resource.id
cached = self.get(resource)
if cached is not None:
cached.status = "DELETED"
del self.cache[resource]
self.resources_order.remove(resource)
class FakeImageManager(FakeManager):
def __init__(self):
super(FakeImageManager, self).__init__()
def get(self, resource_uuid):
image = self.cache.get(resource_uuid)
if image is not None:
return image
raise exc.HTTPNotFound("Image %s not found" % (resource_uuid))
def _create(self, image_class=FakeImage, name=None, id=None):
image = self._cache(image_class(self))
image.owner = "dummy"
image.id = image.uuid
if name is not None:
image.name = name
return image
def create(self, name, copy_from, container_format, disk_format):
return self._create(name=name)
def delete(self, resource):
if not isinstance(resource, six.string_types):
resource = resource.id
cached = self.get(resource)
if cached is not None:
cached.status = "DELETED"
del self.cache[resource]
self.resources_order.remove(resource)
class FakeStrategyManager(FakeManager):
def get(self, resource_name):
for key in self.resources_order:
if self.cache[key].name == resource_name:
return self.cache[key]
class FakeGoalManager(FakeManager):
def get(self, resource_name):
for key in self.resources_order:
if self.cache[key].name == resource_name:
return self.cache[key]
class FakePackageManager(FakeManager):
def create(self, package_descr, package_arch, package_class=FakeMurano):
package = self._cache(package_class(self))
package.name = list(package_arch.keys())[0]
return package
class FakeFloatingIPsManager(FakeManager):
def create(self):
return FakeFloatingIP(self)
class FakeFloatingIPPoolsManager(FakeManager):
def create(self):
return FakeFloatingIPPool(self)
class FakeTenantsManager(FakeManager):
def create(self, name):
return self._cache(FakeTenant(self, name))
def update(self, tenant_id, name=None, description=None):
tenant = self.get(tenant_id)
name = name or (tenant.name + "_updated")
desc = description or (tenant.name + "_description_updated")
tenant.name = name
tenant.description = desc
return self._cache(tenant)
class FakeNetworkManager(FakeManager):
def create(self, net_id):
net = FakeNetwork(self)
net.id = net_id
return self._cache(net)
class FakeFlavorManager(FakeManager):
def create(self):
flv = FakeFlavor(self)
return self._cache(flv)
class FakeKeypairManager(FakeManager):
def create(self, name, public_key=None):
kp = FakeKeypair(self)
kp.name = name or kp.name
return self._cache(kp)
def delete(self, resource):
if not isinstance(resource, six.string_types):
resource = resource.id
cached = self.get(resource)
if cached is not None:
cached.status = "DELETED"
del self.cache[resource]
self.resources_order.remove(resource)
class FakeClusterTemplateManager(FakeManager):
def create(self, name):
cluster_template = FakeClusterTemplate(self)
cluster_template.name = name or cluster_template.name
return self._cache(cluster_template)
def delete(self, resource):
if not isinstance(resource, six.string_types):
resource = resource.id
cached = self.get(resource)
if cached is not None:
del self.cache[resource]
self.resources_order.remove(resource)
class FakeStackManager(FakeManager):
def create(self, name):
stack = FakeStack(self)
stack.name = name or stack.name
return self._cache(stack)
def delete(self, resource):
if not isinstance(resource, six.string_types):
resource = resource.id
cached = self.get(resource)
if cached is not None:
cached.status = "DELETE_COMPLETE"
del self.cache[resource]
self.resources_order.remove(resource)
class FakeDomainManager(FakeManager):
def create(self, name):
domain = FakeDomain(self)
domain.name = name or domain.name
return self._cache(domain)
def delete(self, resource):
if not isinstance(resource, six.string_types):
resource = resource.id
cached = self.get(resource)
if cached is not None:
cached.status = "DELETE_COMPLETE"
del self.cache[resource]
self.resources_order.remove(resource)
class FakeNovaQuotasManager(FakeManager):
def update(self, tenant_id, **kwargs):
fq = FakeQuotas(self)
return self._cache(fq)
def delete(self, tenant_id):
pass
class FakeCinderQuotasManager(FakeManager):
def update(self, tenant_id, **kwargs):
fq = FakeQuotas(self)
return self._cache(fq)
def delete(self, tenant_id):
pass
class FakeSecurityGroupManager(FakeManager):
def __init__(self, rule_manager=None):
super(FakeSecurityGroupManager, self).__init__()
self.rule_manager = rule_manager
self.create("default")
def create(self, name, description=""):
sg = FakeSecurityGroup(
manager=self,
rule_manager=self.rule_manager)
sg.name = name or sg.name
sg.description = description
return self._cache(sg)
def to_dict(self, obj):
return {"id": obj.id, "name": obj.name}
def find(self, name, **kwargs):
kwargs["name"] = name
for resource in self.cache.values():
match = True
for key, value in kwargs.items():
if getattr(resource, key, None) != value:
match = False
break
if match:
return resource
raise nova_exceptions.NotFound("Security Group not found")
def delete(self, resource):
if not isinstance(resource, six.string_types):
resource = resource.id
cached = self.get(resource)
if cached is not None:
cached.status = "DELETED"
del self.cache[resource]
self.resources_order.remove(resource)
class FakeSecurityGroupRuleManager(FakeManager):
def __init__(self):
super(FakeSecurityGroupRuleManager, self).__init__()
def create(self, parent_group_id, **kwargs):
kwargs["parent_group_id"] = parent_group_id
sgr = FakeSecurityGroupRule(self, **kwargs)
return self._cache(sgr)
class FakeUsersManager(FakeManager):
def create(self, username, password, email, tenant_id):
user = FakeUser(manager=self, name=username)
user.name = username or user.name
return self._cache(user)
class FakeServicesManager(FakeManager):
def list(self):
return []
class FakeVolumeManager(FakeManager):
def __init__(self):
super(FakeVolumeManager, self).__init__()
self.__volumes = {}
self.__tenant_id = generate_uuid()
def create(self, size=None, **kwargs):
volume = FakeVolume(self)
volume.size = size or 1
volume.name = kwargs.get("display_name", volume.name)
volume.status = "available"
volume.tenant_id = self.__tenant_id
self.__volumes[volume.id] = volume
return self._cache(volume)
def list(self):
return self.__volumes.values()
def delete(self, resource):
super(FakeVolumeManager, self).delete(resource.id)
del self.__volumes[resource.id]
class FakeVolumeTypeManager(FakeManager):
def create(self, name):
vol_type = FakeVolumeType(self)
vol_type.name = name or vol_type.name
return self._cache(vol_type)
class FakeVolumeTransferManager(FakeManager):
def __init__(self):
super(FakeVolumeTransferManager, self).__init__()
self.__volume_transfers = {}
def list(self):
return self.__volume_transfers.values()
def create(self, name):
transfer = FakeVolumeTransfer(self)
transfer.name = name or transfer.name
self.__volume_transfers[transfer.id] = transfer
return self._cache(transfer)
def delete(self, resource):
super(FakeVolumeTransferManager, self).delete(resource.id)
del self.__volume_transfers[resource.id]
class FakeVolumeSnapshotManager(FakeManager):
def __init__(self):
super(FakeVolumeSnapshotManager, self).__init__()
self.__snapshots = {}
self.__tenant_id = generate_uuid()
def create(self, name, force=False, display_name=None):
snapshot = FakeVolumeSnapshot(self)
snapshot.name = name or snapshot.name
snapshot.status = "available"
snapshot.tenant_id = self.__tenant_id
self.__snapshots[snapshot.id] = snapshot
return self._cache(snapshot)
def list(self):
return self.__snapshots.values()
def delete(self, resource):
super(FakeVolumeSnapshotManager, self).delete(resource.id)
del self.__snapshots[resource.id]
class FakeVolumeBackupManager(FakeManager):
def __init__(self):
super(FakeVolumeBackupManager, self).__init__()
self.__backups = {}
self.__tenant_id = generate_uuid()
def create(self, name):
backup = FakeVolumeBackup(self)
backup.name = name or backup.name
self.__backups[backup.id] = backup
return self._cache(backup)
def list(self):
return self.__backups.values()
def delete(self, resource):
super(FakeVolumeBackupManager, self).delete(resource.id)
del self.__backups[resource.id]
class FakeRolesManager(FakeManager):
def create(self, role_id, name):
role = FakeRole(self)
role.name = name
role.id = role_id
return self._cache(role)
def roles_for_user(self, user, tenant):
role = FakeRole(self)
role.name = "admin"
return [role, ]
def add_user_role(self, user, role, tenant):
pass
class FakeMetricManager(FakeManager):
def create(self, **kwargs):
metric = FakeMetric(self, **kwargs)
return self._cache(metric)
def get(self, metric_id):
metric = self.find(metric_id=metric_id)
return [metric]
class FakeAlarmManager(FakeManager):
def get(self, alarm_id):
alarm = self.find(alarm_id=alarm_id)
if alarm:
return [alarm]
raise ceilometer_exc.HTTPNotFound(
"Alarm with %s not found" % (alarm_id))
def update(self, alarm_id, **fake_alarm_dict_diff):
alarm = self.get(alarm_id)[0]
for attr, value in fake_alarm_dict_diff.items():
setattr(alarm, attr, value)
return alarm
def create(self, **kwargs):
alarm = FakeAlarm(self, **kwargs)
return self._cache(alarm)
def delete(self, alarm_id):
alarm = self.find(alarm_id=alarm_id)
if alarm is not None:
alarm.status = "DELETED"
del self.cache[alarm.id]
self.resources_order.remove(alarm.id)
def get_state(self, alarm_id):
alarm = self.find(alarm_id=alarm_id)
if alarm is not None:
return getattr(alarm, "state", "fake-alarm-state")
def get_history(self, alarm_id):
return ["fake-alarm-history"]
def set_state(self, alarm_id, state):
alarm = self.find(alarm_id=alarm_id)
if alarm is not None:
return setattr(alarm, "state", state)
class FakeSampleManager(FakeManager):
def create(self, **kwargs):
sample = FakeSample(self, **kwargs)
return [self._cache(sample)]
def list(self):
return ["fake-samples"]
class FakeMeterManager(FakeManager):
def list(self):
return ["fake-meter"]
class FakeMetricsManager(FakeManager):
def list(self):
return ["fake-metric"]
class FakeCeilometerResourceManager(FakeManager):
def get(self, resource_id):
return ["fake-resource-info"]
def list(self):
return ["fake-resource"]
class FakeStatisticsManager(FakeManager):
def list(self, meter):
return ["%s-statistics" % meter]
class FakeQueryManager(FakeManager):
def query(self, filter, orderby, limit):
return ["fake-query-result"]
class FakeQueuesManager(FakeManager):
def __init__(self):
super(FakeQueuesManager, self).__init__()
self.__queues = {}
def create(self, name):
queue = FakeQueue(self, name)
self.__queues[queue.name] = queue
return self._cache(queue)
def list(self):
return self.__queues.values()
def delete(self, queue):
super(FakeQueuesManager, self).delete(queue.name)
del self.__queues[queue.name]
class FakeDbInstanceManager(FakeManager):
def __init__(self):
super(FakeDbInstanceManager, self).__init__()
self.__db_instances = {}
def create(self, name, flavor_id, size):
instance = FakeDbInstance(self)
instance.name = name or instance.name
instance.flavor_id = flavor_id
instance.size = size
return self._cache(instance)
def list(self):
return self.__db_instances.values()
def delete(self, resource):
if not isinstance(resource, six.string_types):
resource = resource.id
cached = self.get(resource)
if cached is not None:
cached.status = "DELETE_COMPLETE"
del self.cache[resource]
self.resources_order.remove(resource)
class FakeMessagesManager(FakeManager):
def __init__(self, queue="myqueue"):
super(FakeMessagesManager, self).__init__()
self.__queue = queue
self.__messages = {}
def create(self, **kwargs):
message = FakeMessage(self, **kwargs)
self.__messages[message.id] = message
return self._cache(message)
def list(self):
return self.__messages.values()
def delete(self, message):
super(FakeMessagesManager, self).delete(message.id)
del self.__messages[message.id]
class FakeAvailabilityZonesManager(FakeManager):
def __init__(self):
super(FakeAvailabilityZonesManager, self).__init__()
self.zones = FakeAvailabilityZone()
def list(self):
return [self.zones]
class FakeWorkbookManager(FakeManager):
def __init__(self):
super(FakeWorkbookManager, self).__init__()
self.workbook = FakeWorkbook()
def list(self):
return [self.workbook]
class FakeWorkflowManager(FakeManager):
def __init__(self):
super(FakeWorkflowManager, self).__init__()
self.workflow = FakeWorkflow()
def list(self):
return [self.workflow]
class FakeExecutionManager(FakeManager):
def __init__(self):
super(FakeExecutionManager, self).__init__()
self.execution = FakeExecution()
def list(self):
return [self.execution]
def create(self):
return self.execution
class FakeObjectManager(FakeManager):
def get_account(self, **kwargs):
containers = self.list()
return (mock.MagicMock(), [{"name": con.name} for con in containers])
def get_container(self, name, **kwargs):
container = self.find(name=name)
if container is None:
raise swift_exceptions.ClientException("Container GET failed")
return (mock.MagicMock(), [{"name": obj} for obj in container.items])
def put_container(self, name, **kwargs):
if self.find(name=name):
raise swift_exceptions.ClientException("Container PUT failed")
self._cache(FakeObject(name=name))
def delete_container(self, name, **kwargs):
container = self.find(name=name)
if container is None or len(container.items.keys()) > 0:
raise swift_exceptions.ClientException("Container DELETE failed")
self.delete(container.uuid)
def get_object(self, container_name, object_name, **kwargs):
container = self.find(name=container_name)
if container is None or object_name not in container.items:
raise swift_exceptions.ClientException("Object GET failed")
return (mock.MagicMock(), container.items[object_name])
def put_object(self, container_name, object_name, content, **kwargs):
container = self.find(name=container_name)
if container is None:
raise swift_exceptions.ClientException("Object PUT failed")
container.items[object_name] = content
return mock.MagicMock()
def delete_object(self, container_name, object_name, **kwargs):
container = self.find(name=container_name)
if container is None or object_name not in container.items:
raise swift_exceptions.ClientException("Object DELETE failed")
del container.items[object_name]
class FakeServiceCatalog(object):
def get_credentials(self):
return {"image": [{"publicURL": "http://fake.to"}],
"metering": [{"publicURL": "http://fake.to"}],
"monitoring": [{"publicURL": "http://fake.to"}]}
def url_for(self, **kwargs):
return "http://fake.to"
class FakeGlanceClient(object):
def __init__(self):
self.images = FakeImageManager()
class FakeMuranoClient(object):
def __init__(self):
self.packages = FakePackageManager()
class FakeCinderClient(object):
def __init__(self):
self.volumes = FakeVolumeManager()
self.volume_types = FakeVolumeTypeManager()
self.transfers = FakeVolumeTransferManager()
self.volume_snapshots = FakeVolumeSnapshotManager()
self.backups = FakeVolumeBackupManager()
self.quotas = FakeCinderQuotasManager()
class FakeNovaClient(object):
def __init__(self, failed_server_manager=False):
self.images = FakeImageManager()
self.servers = FakeServerManager(self.images)
self.floating_ips = FakeFloatingIPsManager()
self.floating_ip_pools = FakeFloatingIPPoolsManager()
self.networks = FakeNetworkManager()
self.flavors = FakeFlavorManager()
self.keypairs = FakeKeypairManager()
self.security_group_rules = FakeSecurityGroupRuleManager()
self.security_groups = FakeSecurityGroupManager(
rule_manager=self.security_group_rules)
self.quotas = FakeNovaQuotasManager()
self.set_management_url = mock.MagicMock()
self.availability_zones = FakeAvailabilityZonesManager()
class FakeHeatClient(object):
def __init__(self):
self.stacks = FakeStackManager()
class FakeDesignateClient(object):
def __init__(self):
self.domains = FakeDomainManager()
class FakeKeystoneClient(object):
def __init__(self):
self.tenants = FakeTenantsManager()
self.users = FakeUsersManager()
self.roles = FakeRolesManager()
self.project_id = "abc123"
self.auth_url = "http://example.com:5000/v2.0/"
self.auth_token = "fake"
self.auth_user_id = generate_uuid()
self.auth_tenant_id = generate_uuid()
self.service_catalog = FakeServiceCatalog()
self.services = FakeServicesManager()
self.region_name = "RegionOne"
self.auth_ref = mock.Mock()
self.auth_ref.role_names = ["admin"]
self.version = "v2.0"
self.session = mock.MagicMock()
self.authenticate = mock.MagicMock()
def authenticate(self):
return True
def list_users(self):
return self.users.list()
def list_projects(self):
return self.tenants.list()
def list_services(self):
return self.services.list()
def list_roles(self):
return self.roles.list()
def delete_user(self, uuid):
return self.users.delete(uuid)
class FakeCeilometerClient(object):
def __init__(self):
self.alarms = FakeAlarmManager()
self.meters = FakeMeterManager()
self.resources = FakeCeilometerResourceManager()
self.statistics = FakeStatisticsManager()
self.samples = FakeSampleManager()
self.query_alarms = FakeQueryManager()
self.query_samples = FakeQueryManager()
self.query_alarm_history = FakeQueryManager()
class FakeGnocchiClient(object):
def __init__(self):
self.metric = FakeMetricManager()
class FakeMonascaClient(object):
def __init__(self):
self.metrics = FakeMetricsManager()
class FakeNeutronClient(object):
def __init__(self, **kwargs):
self.__networks = {}
self.__subnets = {}
self.__routers = {}
self.__ports = {}
self.__pools = {}
self.__vips = {}
self.__fips = {}
self.__healthmonitors = {}
self.__tenant_id = kwargs.get("tenant_id", generate_uuid())
self.format = "json"
self.version = "2.0"
@staticmethod
def _filter(resource_list, search_opts):
return [res for res in resource_list
if all(res[field] == value
for field, value in search_opts.items())]
def add_interface_router(self, router_id, data):
subnet_id = data["subnet_id"]
if (router_id not in self.__routers or
subnet_id not in self.__subnets):
raise neutron_exceptions.NeutronClientException
subnet = self.__subnets[subnet_id]
port = self.create_port(
{"port": {"network_id": subnet["network_id"]}})["port"]
port["device_id"] = router_id
port["fixed_ips"].append({"subnet_id": subnet_id,
"ip_address": subnet["gateway_ip"]})
return {"subnet_id": subnet_id,
"tenant_id": port["tenant_id"],
"port_id": port["id"],
"id": router_id}
def create_network(self, data):
network = setup_dict(data["network"],
defaults={"name": generate_name("net_"),
"admin_state_up": True})
network_id = generate_uuid()
network.update({"id": network_id,
"status": "ACTIVE",
"subnets": [],
"provider:physical_network": None,
"tenant_id": self.__tenant_id,
"provider:network_type": "local",
"router:external": True,
"shared": False,
"provider:segmentation_id": None})
self.__networks[network_id] = network
return {"network": network}
def create_pool(self, data):
pool = setup_dict(data["pool"],
required=["lb_method", "protocol", "subnet_id"],
defaults={"name": generate_name("pool_"),
"admin_state_up": True})
if pool["subnet_id"] not in self.__subnets:
raise neutron_exceptions.NeutronClientException
pool_id = generate_uuid()
pool.update({"id": pool_id,
"status": "PENDING_CREATE",
"tenant_id": self.__tenant_id})
self.__pools[pool_id] = pool
return {"pool": pool}
def create_vip(self, data):
vip = setup_dict(data["vip"],
required=["protocol_port", "protocol", "subnet_id",
"pool_id"],
defaults={"name": generate_name("vip_"),
"admin_state_up": True})
if (vip["subnet_id"] not in self.__subnets) or (vip["pool_id"] not in
self.__pools):
raise neutron_exceptions.NeutronClientException
vip_id = generate_uuid()
vip.update({"id": vip_id,
"status": "PENDING_CREATE",
"tenant_id": self.__tenant_id})
self.__vips[vip_id] = vip
return {"vip": vip}
def create_floatingip(self, data):
fip = setup_dict(data["floatingip"],
required=["floating_network"],
defaults={"admin_state_up": True})
if (fip["floating_network"] not in self.__nets):
raise neutron_exceptions.NeutronClientException
fip_id = generate_uuid()
fip.update({"id": fip_id,
"tenant_id": self.__tenant_id})
self.__fips[fip_id] = fip
return {"fip": fip}
def create_health_monitor(self, data):
healthmonitor = setup_dict(data["healthmonitor"],
required=["type", "timeout", "delay",
"max_retries"],
defaults={"admin_state_up": True})
healthmonitor_id = generate_uuid()
healthmonitor.update({"id": healthmonitor_id,
"status": "PENDING_CREATE",
"tenant_id": self.__tenant_id})
self.__healthmonitors[healthmonitor_id] = healthmonitor
return {"healthmonitor": healthmonitor}
def create_port(self, data):
port = setup_dict(data["port"],
required=["network_id"],
defaults={"name": generate_name("port_"),
"admin_state_up": True})
if port["network_id"] not in self.__networks:
raise neutron_exceptions.NeutronClientException
port_id = generate_uuid()
port.update({"id": port_id,
"status": "ACTIVE",
"binding:host_id": "fakehost",
"extra_dhcp_opts": [],
"binding:vnic_type": "normal",
"binding:vif_type": "ovs",
"device_owner": "",
"mac_address": generate_mac(),
"binding:profile": {},
"binding:vif_details": {u"port_filter": True},
"security_groups": [],
"fixed_ips": [],
"device_id": "",
"tenant_id": self.__tenant_id,
"allowed_address_pairs": []})
self.__ports[port_id] = port
return {"port": port}
def create_router(self, data):
router = setup_dict(data["router"],
defaults={"name": generate_name("router_"),
"external_gateway_info": None,
"admin_state_up": True})
router_id = generate_uuid()
router.update({"id": router_id,
"status": "ACTIVE",
"external_gateway_info": None,
"tenant_id": self.__tenant_id})
self.__routers[router_id] = router
return {"router": router}
def create_subnet(self, data):
subnet = setup_dict(
data["subnet"],
required=["network_id", "cidr", "ip_version"],
defaults={"name": generate_name("subnet_"),
"dns_nameservers": ["8.8.8.8", "8.8.4.4"]})
if subnet["network_id"] not in self.__networks:
raise neutron_exceptions.NeutronClientException
subnet_id = generate_uuid()
subnet.update({"id": subnet_id,
"enable_dhcp": True,
"tenant_id": self.__tenant_id,
"ipv6_ra_mode": None,
"allocation_pools": [],
"gateway_ip": re.sub("./.*$", "1", subnet["cidr"]),
"ipv6_address_mode": None,
"ip_version": 4,
"host_routes": []})
self.__subnets[subnet_id] = subnet
return {"subnet": subnet}
def update_resource(self, resource_id, resource_dict, data):
if resource_id not in resource_dict:
raise neutron_exceptions.NeutronClientException
self.resource_list[resource_id].update(data)
def update_network(self, network_id, data):
self.update_resource(network_id, self.__networks, data)
def update_pool(self, pool_id, data):
self.update_resource(pool_id, self.__pools, data)
def update_vip(self, vip_id, data):
self.update_resource(vip_id, self.__vips, data)
def update_health_monitor(self, healthmonitor_id, data):
self.update_resource(healthmonitor_id, self.__healthmonitors, data)
def update_subnet(self, subnet_id, data):
self.update_resource(subnet_id, self.__subnets, data)
def update_port(self, port_id, data):
self.update_resource(port_id, self.__ports, data)
def update_router(self, router_id, data):
self.update_resource(router_id, self.__routers, data)
def delete_network(self, network_id):
if network_id not in self.__networks:
raise neutron_exceptions.NeutronClientException
for port in self.__ports.values():
if port["network_id"] == network_id:
# Network is in use by port
raise neutron_exceptions.NeutronClientException
del self.__networks[network_id]
return ""
def delete_pool(self, pool_id):
if pool_id not in self.__pools:
raise neutron_exceptions.NeutronClientException
del self.__pools[pool_id]
return ""
def delete_vip(self, vip_id):
if vip_id not in self.__vips:
raise neutron_exceptions.NeutronClientException
del self.__vips[vip_id]
def delete_health_monitor(self, healthmonitor_id):
if healthmonitor_id not in self.__healthmonitors:
raise neutron_exceptions.NeutronClientException
del self.__healthmonitors[healthmonitor_id]
return ""
def delete_floatingip(self, fip_id):
if fip_id not in self.__fips:
raise neutron_exceptions.NeutronClientException
del self.__fips[fip_id]
return ""
def delete_port(self, port_id):
if port_id not in self.__ports:
raise neutron_exceptions.PortNotFoundClient
if self.__ports[port_id]["device_owner"]:
# Port is owned by some device
raise neutron_exceptions.NeutronClientException
del self.__ports[port_id]
return ""
def delete_router(self, router_id):
if router_id not in self.__routers:
raise neutron_exceptions.NeutronClientException
for port in self.__ports.values():
if port["device_id"] == router_id:
# Router has active port
raise neutron_exceptions.NeutronClientException
del self.__routers[router_id]
return ""
def delete_subnet(self, subnet_id):
if subnet_id not in self.__subnets:
raise neutron_exceptions.NeutronClientException
for port in self.__ports.values():
for fip in port["fixed_ips"]:
if fip["subnet_id"] == subnet_id:
# Subnet has IP allocation from some port
raise neutron_exceptions.NeutronClientException
del self.__subnets[subnet_id]
return ""
def list_networks(self, **search_opts):
nets = self._filter(self.__networks.values(), search_opts)
return {"networks": nets}
def list_pools(self, **search_opts):
pools = self._filter(self.__pools.values(), search_opts)
return {"pools": pools}
def list_vips(self, **search_opts):
vips = self._filter(self.__vips.values(), search_opts)
return {"vips": vips}
def list_health_monitors(self, **search_opts):
healthmonitors = self._filter(
self.__healthmonitors.values(), search_opts)
return {"healthmonitors": healthmonitors}
def list_ports(self, **search_opts):
ports = self._filter(self.__ports.values(), search_opts)
return {"ports": ports}
def list_routers(self, **search_opts):
routers = self._filter(self.__routers.values(), search_opts)
return {"routers": routers}
def list_subnets(self, **search_opts):
subnets = self._filter(self.__subnets.values(), search_opts)
return {"subnets": subnets}
def list_floatingips(self, **search_opts):
fips = self._filter(self.__fips.values(), search_opts)
return {"floatingips": fips}
def remove_interface_router(self, router_id, data):
subnet_id = data["subnet_id"]
if (router_id not in self.__routers
or subnet_id not in self.__subnets):
raise neutron_exceptions.NeutronClientException
subnet = self.__subnets[subnet_id]
for port_id, port in self.__ports.items():
if port["device_id"] == router_id:
for fip in port["fixed_ips"]:
if fip["subnet_id"] == subnet_id:
del self.__ports[port_id]
return {"subnet_id": subnet_id,
"tenant_id": subnet["tenant_id"],
"port_id": port_id,
"id": router_id}
raise neutron_exceptions.NeutronClientException
def associate_health_monitor(self, pool_id, healthmonitor_id):
if pool_id not in self.__pools:
raise neutron_exceptions.NeutronClientException
if healthmonitor_id not in self.__healthmonitors:
raise neutron_exceptions.NeutronClientException
self.__pools[pool_id]["pool"]["healthmonitors"] = healthmonitor_id
return {"pool": self.__pools[pool_id]}
def disassociate_health_monitor(self, pool_id, healthmonitor_id):
if pool_id not in self.__pools:
raise neutron_exceptions.NeutronClientException
if healthmonitor_id not in self.__healthmonitors:
raise neutron_exceptions.NeutronClientException
del self.__pools[pool_id]["pool"]["healthmonitors"][healthmonitor_id]
return ""
class FakeIronicClient(object):
def __init__(self):
# TODO(romcheg):Fake Manager subclasses to manage BM nodes.
pass
class FakeSaharaClient(object):
def __init__(self):
self.job_executions = mock.MagicMock()
self.jobs = mock.MagicMock()
self.job_binary_internals = mock.MagicMock()
self.job_binaries = mock.MagicMock()
self.data_sources = mock.MagicMock()
self.clusters = mock.MagicMock()
self.cluster_templates = mock.MagicMock()
self.node_group_templates = mock.MagicMock()
self.setup_list_methods()
def setup_list_methods(self):
mock_with_id = mock.MagicMock()
mock_with_id.id = 42
# First call of list returns a list with one object, the next should
# empty after delete.
self.job_executions.list.side_effect = [[mock_with_id], []]
self.jobs.list.side_effect = [[mock_with_id], []]
self.job_binary_internals.list.side_effect = [[mock_with_id], []]
self.job_binaries.list.side_effect = [[mock_with_id], []]
self.data_sources.list.side_effect = [[mock_with_id], []]
self.clusters.list.side_effect = [[mock_with_id], []]
self.cluster_templates.list.side_effect = [[mock_with_id], []]
self.node_group_templates.list.side_effect = [[mock_with_id], []]
class FakeZaqarClient(object):
def __init__(self):
self.queues = FakeQueuesManager()
def queue(self, name, **kwargs):
return self.queues.create(name, **kwargs)
class FakeTroveClient(object):
def __init__(self):
self.instances = FakeDbInstanceManager()
class FakeMistralClient(object):
def __init__(self):
self.workbook = FakeWorkbookManager()
self.workflow = FakeWorkflowManager()
self.execution = FakeExecutionManager()
class FakeSwiftClient(FakeObjectManager):
pass
class FakeEC2Client(object):
def __init__(self):
pass
class FakeSenlinClient(object):
def __init__(self):
# TODO(Yanyan Hu):Fake interfaces of senlinclient.
pass
class FakeMagnumClient(object):
def __init__(self):
self.cluster_templates = FakeClusterTemplateManager()
class FakeWatcherClient(object):
def __init__(self):
self.strategy = FakeStrategyManager()
self.goal = FakeGoalManager()
class FakeClients(object):
def __init__(self, credential_=None):
self._nova = None
self._glance = None
self._keystone = None
self._cinder = None
self._neutron = None
self._sahara = None
self._heat = None
self._designate = None
self._ceilometer = None
self._zaqar = None
self._trove = None
self._mistral = None
self._swift = None
self._murano = None
self._monasca = None
self._ec2 = None
self._senlin = None
self._watcher = None
self._credential = credential_ or fake_credential(
auth_url="http://fake.example.org:5000/v2.0/",
username="fake_username",
password="fake_password",
tenant_name="fake_tenant_name")
def keystone(self, version=None):
if not self._keystone:
self._keystone = FakeKeystoneClient()
return self._keystone
def verified_keystone(self):
return self.keystone()
def nova(self):
if not self._nova:
self._nova = FakeNovaClient()
return self._nova
def glance(self):
if not self._glance:
self._glance = FakeGlanceClient()
return self._glance
def cinder(self):
if not self._cinder:
self._cinder = FakeCinderClient()
return self._cinder
def neutron(self):
if not self._neutron:
self._neutron = FakeNeutronClient()
return self._neutron
def sahara(self):
if not self._sahara:
self._sahara = FakeSaharaClient()
return self._sahara
def heat(self):
if not self._heat:
self._heat = FakeHeatClient()
return self._heat
def designate(self):
if not self._designate:
self._designate = FakeDesignateClient()
return self._designate
def ceilometer(self):
if not self._ceilometer:
self._ceilometer = FakeCeilometerClient()
return self._ceilometer
def monasca(self):
if not self._monasca:
self._monasca = FakeMonascaClient()
return self._monasca
def zaqar(self):
if not self._zaqar:
self._zaqar = FakeZaqarClient()
return self._zaqar
def trove(self):
if not self._trove:
self._trove = FakeTroveClient()
return self._trove
def mistral(self):
if not self._mistral:
self._mistral = FakeMistralClient()
return self._mistral
def swift(self):
if not self._swift:
self._swift = FakeSwiftClient()
return self._swift
def murano(self):
if not self._murano:
self._murano = FakeMuranoClient()
return self._murano
def ec2(self):
if not self._ec2:
self._ec2 = FakeEC2Client()
return self._ec2
def senlin(self):
if not self._senlin:
self._senlin = FakeSenlinClient()
return self._senlin
def watcher(self):
if not self._watcher:
self._watcher = FakeWatcherClient()
return self._watcher
class FakeRunner(object):
CONFIG_SCHEMA = {
"type": "object",
"$schema": consts.JSON_SCHEMA,
"properties": {
"type": {
"type": "string",
"enum": ["fake"]
},
"a": {
"type": "string"
},
"b": {
"type": "number"
}
},
"required": ["type", "a"]
}
class FakeScenario(scenario.Scenario):
def idle_time(self):
return 0
def do_it(self, **kwargs):
pass
def with_output(self, **kwargs):
return {"data": {"a": 1}, "error": None}
def with_add_output(self):
self.add_output(additive={"title": "Additive",
"description": "Additive description",
"data": [["a", 1]],
"chart_plugin": "FooPlugin"},
complete={"title": "Complete",
"description": "Complete description",
"data": [["a", [[1, 2], [2, 3]]]],
"chart_plugin": "BarPlugin"})
def too_long(self, **kwargs):
pass
def something_went_wrong(self, **kwargs):
raise Exception("Something went wrong")
def raise_timeout(self, **kwargs):
raise multiprocessing.TimeoutError()
@scenario.configure(name="classbased.fooscenario")
class FakeClassBasedScenario(FakeScenario):
"""Fake class-based scenario."""
def run(self, *args, **kwargs):
pass
class FakeTimer(rally_utils.Timer):
def duration(self):
return 10
def timestamp(self):
return 0
def finish_timestamp(self):
return 3
@context.configure(name="fake", order=1)
class FakeContext(context.Context):
CONFIG_SCHEMA = {
"type": "object",
"$schema": consts.JSON_SCHEMA,
"properties": {
"test": {
"type": "integer"
},
},
"additionalProperties": False
}
def __init__(self, context_obj=None):
context_obj = context_obj or {}
context_obj.setdefault("config", {})
context_obj["config"].setdefault("fake", None)
context_obj.setdefault("task", mock.MagicMock())
super(FakeContext, self).__init__(context_obj)
def setup(self):
pass
def cleanup(self):
pass
@context.configure(name="fake_hidden_context", order=1, hidden=True)
class FakeHiddenContext(FakeContext):
pass
@context.configure(name="fake_user_context", order=1)
class FakeUserContext(FakeContext):
admin = {
"id": "adminuuid",
"credential": fake_credential(
auth_url="aurl",
username="aname",
password="apwd",
tenant_name="atenant")
}
user = {
"id": "uuid",
"credential": fake_credential(
auth_url="url",
username="name",
password="pwd",
tenant_name="tenant"),
"tenant_id": "uuid"
}
tenants = {"uuid": {"name": "tenant"}}
def __init__(self, ctx):
super(FakeUserContext, self).__init__(ctx)
self.context.setdefault("admin", FakeUserContext.admin)
self.context.setdefault("users", [FakeUserContext.user])
self.context.setdefault("tenants", FakeUserContext.tenants)
self.context.setdefault(
"scenario_name", "NovaServers.boot_server_from_volume_and_delete")
class FakeDeployment(dict):
def __init__(self, **kwargs):
namespace = kwargs.pop("namespace", "openstack")
kwargs["credentials"] = {
namespace: [{"admin": kwargs.pop("admin", None),
"users": kwargs.pop("users", [])}],
"default": [{"admin": None, "users": []}]}
dict.__init__(self, **kwargs)
self.update_status = mock.Mock()
def get_platforms(self):
return [platform for platform in self["credentials"]]
def get_credentials_for(self, namespace):
return self["credentials"][namespace][0]
def verify_connections(self):
pass
def get_validation_context(self):
return {}
class FakeTask(dict, object):
def __init__(self, task=None, temporary=False, **kwargs):
self.is_temporary = temporary
self.update_status = mock.Mock()
self.set_failed = mock.Mock()
self.set_validation_failed = mock.Mock()
task = task or {}
for k, v in itertools.chain(task.items(), kwargs.items()):
self[k] = v
self.task = self
def to_dict(self):
return self
class FakeAPI(object):
def __init__(self):
self._deployment = mock.create_autospec(api._Deployment)
self._task = mock.create_autospec(api._Task)
self._verifier = mock.create_autospec(api._Verifier)
self._verification = mock.create_autospec(api._Verification)
@property
def deployment(self):
return self._deployment
@property
def task(self):
return self._task
@property
def verifier(self):
return self._verifier
@property
def verification(self):
return self._verification