Merge "Locality support for replication"

This commit is contained in:
Jenkins 2016-06-24 21:19:16 +00:00 committed by Gerrit Code Review
commit 3196d347f0
22 changed files with 641 additions and 75 deletions

View File

@ -0,0 +1,6 @@
---
features:
- A locality flag was added to the Trove REST API to
allow a user to specify whether new replicas should
be on the same hypervisor (affinity) or on different
hypervisors (anti-affinity).

View File

@ -349,7 +349,8 @@ instance = {
}
},
"nics": nics,
"modules": module_list
"modules": module_list,
"locality": non_empty_string
}
}
}

View File

@ -0,0 +1,96 @@
# Copyright 2016 Tesora, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
import six
from oslo_log import log as logging
from trove.common import cfg
from trove.common.i18n import _
from trove.common.remote import create_nova_client
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
class ServerGroup(object):
    """Helpers for managing the Nova server group that backs a Trove
    instance's locality policy ('affinity' or 'anti-affinity').
    """

    @classmethod
    def load(cls, context, compute_id):
        """Return the Nova server group containing compute_id, or None.

        Lookup failures are logged and swallowed so callers can treat a
        missing group as "no locality".
        """
        client = create_nova_client(context)
        server_group = None
        try:
            for sg in client.server_groups.list():
                if compute_id in sg.members:
                    server_group = sg
                    break  # fix: stop at the first matching group
        except Exception:
            # Lazy %-args instead of eager interpolation (oslo.log style).
            LOG.exception(_("Could not load server group for compute %s"),
                          compute_id)
        return server_group

    @classmethod
    def create(cls, context, locality, name_suffix):
        """Create and return a server group named 'locality_<name_suffix>'
        enforcing the given policy.
        """
        client = create_nova_client(context)
        server_group_name = "%s_%s" % ('locality', name_suffix)
        server_group = client.server_groups.create(
            name=server_group_name, policies=[locality])
        LOG.debug("Created '%s' server group called %s (id: %s).",
                  locality, server_group_name, server_group.id)
        return server_group

    @classmethod
    def delete(cls, context, server_group, force=False):
        """Delete server_group when forced, or when at most one member is
        left (i.e. we are the last member); otherwise leave it alone.
        """
        # Only delete the server group if we're the last member in it, or if
        # it has no members
        if server_group:
            if force or len(server_group.members) <= 1:
                client = create_nova_client(context)
                client.server_groups.delete(server_group.id)
                LOG.debug("Deleted server group %s.", server_group.id)
            else:
                LOG.debug("Skipping delete of server group %s (members: %s).",
                          server_group.id, server_group.members)

    @classmethod
    def convert_to_hint(cls, server_group, hints=None):
        """Fold server_group into a scheduler-hint dict under the 'group'
        key. Returns hints unchanged (possibly None) when there is no group.
        """
        if server_group:
            hints = hints or {}
            hints["group"] = server_group.id
        return hints

    @classmethod
    def build_scheduler_hint(cls, context, locality, name_suffix):
        """Build a scheduler hint for the given locality.

        locality may be a policy string (a new server group is created for
        it) or an already-built hint dict, which is passed through as-is.
        """
        scheduler_hint = None
        if locality:
            # Build the scheduler hint, but only if locality's a string
            if isinstance(locality, six.string_types):
                server_group = cls.create(
                    context, locality, name_suffix)
                scheduler_hint = cls.convert_to_hint(server_group)
            else:
                # otherwise assume it's already in hint form (i.e. a dict)
                scheduler_hint = locality
        return scheduler_hint

    @classmethod
    def get_locality(cls, server_group):
        """Return the group's first policy (its locality), or None."""
        locality = None
        if server_group:
            locality = server_group.policies[0]
        return locality

View File

@ -33,6 +33,7 @@ from trove.common.remote import create_cinder_client
from trove.common.remote import create_dns_client
from trove.common.remote import create_guest_client
from trove.common.remote import create_nova_client
from trove.common import server_group as srv_grp
from trove.common import template
from trove.common import utils
from trove.configuration.models import Configuration
@ -153,7 +154,7 @@ class SimpleInstance(object):
"""
def __init__(self, context, db_info, datastore_status, root_password=None,
ds_version=None, ds=None):
ds_version=None, ds=None, locality=None):
"""
:type context: trove.common.context.TroveContext
:type db_info: trove.instance.models.DBInstance
@ -170,6 +171,7 @@ class SimpleInstance(object):
if ds is None:
self.ds = (datastore_models.Datastore.
load(self.ds_version.datastore_id))
self.locality = locality
self.slave_list = None
@ -495,7 +497,7 @@ def load_instance(cls, context, id, needs_server=False,
return cls(context, db_info, server, service_status)
def load_instance_with_guest(cls, context, id, cluster_id=None):
def load_instance_with_info(cls, context, id, cluster_id=None):
db_info = get_db_info(context, id, cluster_id)
load_simple_instance_server_status(context, db_info)
service_status = InstanceServiceStatus.find_by(instance_id=id)
@ -503,6 +505,7 @@ def load_instance_with_guest(cls, context, id, cluster_id=None):
{'instance_id': id, 'service_status': service_status.status})
instance = cls(context, db_info, service_status)
load_guest_info(instance, context, id)
load_server_group_info(instance, context, db_info.compute_instance_id)
return instance
@ -518,6 +521,12 @@ def load_guest_info(instance, context, id):
return instance
def load_server_group_info(instance, context, compute_id):
    """Set instance.locality from the compute's server group, if it has one."""
    group = srv_grp.ServerGroup.load(context, compute_id)
    if group:
        instance.locality = srv_grp.ServerGroup.get_locality(group)
class BaseInstance(SimpleInstance):
"""Represents an instance.
-----------
@ -557,6 +566,8 @@ class BaseInstance(SimpleInstance):
self._guest = None
self._nova_client = None
self._volume_client = None
self._server_group = None
self._server_group_loaded = False
def get_guest(self):
return create_guest_client(self.context, self.db_info.id)
@ -640,6 +651,15 @@ class BaseInstance(SimpleInstance):
self.id)
self.update_db(task_status=InstanceTasks.NONE)
@property
def server_group(self):
    """Nova server group for this instance's compute, loaded at most once.

    A separate 'loaded' flag is kept because the cached value may
    legitimately be None (no group), so None cannot mean "not loaded yet".
    """
    if self._server_group_loaded:
        return self._server_group
    self._server_group = srv_grp.ServerGroup.load(
        self.context, self.db_info.compute_instance_id)
    self._server_group_loaded = True
    return self._server_group
class FreshInstance(BaseInstance):
@classmethod
@ -677,7 +697,8 @@ class Instance(BuiltInstance):
datastore, datastore_version, volume_size, backup_id,
availability_zone=None, nics=None,
configuration_id=None, slave_of_id=None, cluster_config=None,
replica_count=None, volume_type=None, modules=None):
replica_count=None, volume_type=None, modules=None,
locality=None):
call_args = {
'name': name,
@ -792,6 +813,8 @@ class Instance(BuiltInstance):
"create %(count)d instances.") % {'count': replica_count})
multi_replica = slave_of_id and replica_count and replica_count > 1
instance_count = replica_count if multi_replica else 1
if locality:
call_args['locality'] = locality
if not nics:
nics = []
@ -889,10 +912,11 @@ class Instance(BuiltInstance):
datastore_version.manager, datastore_version.packages,
volume_size, backup_id, availability_zone, root_password,
nics, overrides, slave_of_id, cluster_config,
volume_type=volume_type, modules=module_list)
volume_type=volume_type, modules=module_list,
locality=locality)
return SimpleInstance(context, db_info, service_status,
root_password)
root_password, locality=locality)
with StartNotification(context, **call_args):
return run_with_quotas(context.tenant, deltas, _create_resources)

View File

@ -196,7 +196,7 @@ class InstanceController(wsgi.Controller):
LOG.debug("req : '%s'\n\n", req)
context = req.environ[wsgi.CONTEXT_KEY]
server = models.load_instance_with_guest(models.DetailInstance,
server = models.load_instance_with_info(models.DetailInstance,
context, id)
return wsgi.Result(views.InstanceDetailView(server,
req=req).data(), 200)
@ -275,6 +275,21 @@ class InstanceController(wsgi.Controller):
body['instance'].get('slave_of'))
replica_count = body['instance'].get('replica_count')
modules = body['instance'].get('modules')
locality = body['instance'].get('locality')
if locality:
locality_domain = ['affinity', 'anti-affinity']
locality_domain_msg = ("Invalid locality '%s'. "
"Must be one of ['%s']" %
(locality,
"', '".join(locality_domain)))
if locality not in locality_domain:
raise exception.BadRequest(msg=locality_domain_msg)
if slave_of_id:
dupe_locality_msg = (
'Cannot specify locality when adding replicas to existing '
'master.')
raise exception.BadRequest(msg=dupe_locality_msg)
instance = models.Instance.create(context, name, flavor_id,
image_id, databases, users,
datastore, datastore_version,
@ -283,7 +298,8 @@ class InstanceController(wsgi.Controller):
configuration, slave_of_id,
replica_count=replica_count,
volume_type=volume_type,
modules=modules)
modules=modules,
locality=locality)
view = views.InstanceDetailView(instance, req=req)
return wsgi.Result(view.data(), 200)

View File

@ -99,6 +99,9 @@ class InstanceDetailView(InstanceView):
result['instance']['configuration'] = (self.
_build_configuration_info())
if self.instance.locality:
result['instance']['locality'] = self.instance.locality
if (isinstance(self.instance, models.DetailInstance) and
self.instance.volume_used):
used = self.instance.volume_used

View File

@ -152,7 +152,7 @@ class API(object):
availability_zone=None, root_password=None,
nics=None, overrides=None, slave_of_id=None,
cluster_config=None, volume_type=None,
modules=None):
modules=None, locality=None):
LOG.debug("Making async call to create instance %s " % instance_id)
self._cast("create_instance", self.version_cap,
@ -172,7 +172,7 @@ class API(object):
slave_of_id=slave_of_id,
cluster_config=cluster_config,
volume_type=volume_type,
modules=modules)
modules=modules, locality=locality)
def create_cluster(self, cluster_id):
LOG.debug("Making async call to create cluster %s " % cluster_id)

View File

@ -28,6 +28,7 @@ from trove.common.i18n import _
from trove.common.notification import DBaaSQuotas, EndNotification
from trove.common import remote
import trove.common.rpc.version as rpc_version
from trove.common import server_group as srv_grp
from trove.common.strategies.cluster import strategy
import trove.extensions.mgmt.instances.models as mgmtmodels
from trove.instance.tasks import InstanceTasks
@ -288,6 +289,11 @@ class Manager(periodic_task.PeriodicTasks):
replica_backup_created = False
replicas = []
master_instance_tasks = BuiltInstanceTasks.load(context, slave_of_id)
server_group = master_instance_tasks.server_group
scheduler_hints = srv_grp.ServerGroup.convert_to_hint(server_group)
LOG.debug("Using scheduler hints for locality: %s" % scheduler_hints)
try:
for replica_index in range(0, len(ids)):
try:
@ -306,7 +312,7 @@ class Manager(periodic_task.PeriodicTasks):
packages, volume_size, replica_backup_id,
availability_zone, root_passwords[replica_index],
nics, overrides, None, snapshot, volume_type,
modules)
modules, scheduler_hints)
replicas.append(instance_tasks)
except Exception:
# if it's the first replica, then we shouldn't continue
@ -327,7 +333,7 @@ class Manager(periodic_task.PeriodicTasks):
image_id, databases, users, datastore_manager,
packages, volume_size, backup_id, availability_zone,
root_password, nics, overrides, slave_of_id,
cluster_config, volume_type, modules):
cluster_config, volume_type, modules, locality):
if slave_of_id:
self._create_replication_slave(context, instance_id, name,
flavor, image_id, databases, users,
@ -341,12 +347,16 @@ class Manager(periodic_task.PeriodicTasks):
raise AttributeError(_(
"Cannot create multiple non-replica instances."))
instance_tasks = FreshInstanceTasks.load(context, instance_id)
scheduler_hints = srv_grp.ServerGroup.build_scheduler_hint(
context, locality, instance_id)
instance_tasks.create_instance(flavor, image_id, databases, users,
datastore_manager, packages,
volume_size, backup_id,
availability_zone, root_password,
nics, overrides, cluster_config,
None, volume_type, modules)
None, volume_type, modules,
scheduler_hints)
timeout = (CONF.restore_usage_timeout if backup_id
else CONF.usage_timeout)
instance_tasks.wait_for_instance(timeout, flavor)
@ -355,7 +365,7 @@ class Manager(periodic_task.PeriodicTasks):
image_id, databases, users, datastore_manager,
packages, volume_size, backup_id, availability_zone,
root_password, nics, overrides, slave_of_id,
cluster_config, volume_type, modules):
cluster_config, volume_type, modules, locality):
with EndNotification(context,
instance_id=(instance_id[0]
if type(instance_id) is list
@ -365,7 +375,8 @@ class Manager(periodic_task.PeriodicTasks):
datastore_manager, packages, volume_size,
backup_id, availability_zone,
root_password, nics, overrides, slave_of_id,
cluster_config, volume_type, modules)
cluster_config, volume_type, modules,
locality)
def update_overrides(self, context, instance_id, overrides):
instance_tasks = models.BuiltInstanceTasks.load(context, instance_id)

View File

@ -52,6 +52,7 @@ import trove.common.remote as remote
from trove.common.remote import create_cinder_client
from trove.common.remote import create_dns_client
from trove.common.remote import create_heat_client
from trove.common import server_group as srv_grp
from trove.common.strategies.cluster import strategy
from trove.common import template
from trove.common import utils
@ -367,7 +368,7 @@ class FreshInstanceTasks(FreshInstance, NotifyMixin, ConfigurationMixin):
datastore_manager, packages, volume_size,
backup_id, availability_zone, root_password, nics,
overrides, cluster_config, snapshot, volume_type,
modules):
modules, scheduler_hints):
# It is the caller's responsibility to ensure that
# FreshInstanceTasks.wait_for_instance is called after
# create_instance to ensure that the proper usage event gets sent
@ -413,7 +414,8 @@ class FreshInstanceTasks(FreshInstance, NotifyMixin, ConfigurationMixin):
volume_size,
availability_zone,
nics,
files)
files,
scheduler_hints)
else:
volume_info = self._create_server_volume_individually(
flavor['id'],
@ -424,7 +426,8 @@ class FreshInstanceTasks(FreshInstance, NotifyMixin, ConfigurationMixin):
availability_zone,
nics,
files,
cinder_volume_type)
cinder_volume_type,
scheduler_hints)
config = self._render_config(flavor)
@ -626,7 +629,8 @@ class FreshInstanceTasks(FreshInstance, NotifyMixin, ConfigurationMixin):
def _create_server_volume(self, flavor_id, image_id, security_groups,
datastore_manager, volume_size,
availability_zone, nics, files):
availability_zone, nics, files,
scheduler_hints):
LOG.debug("Begin _create_server_volume for id: %s" % self.id)
try:
userdata = self._prepare_userdata(datastore_manager)
@ -642,7 +646,7 @@ class FreshInstanceTasks(FreshInstance, NotifyMixin, ConfigurationMixin):
security_groups=security_groups,
availability_zone=availability_zone,
nics=nics, config_drive=config_drive,
userdata=userdata)
userdata=userdata, scheduler_hints=scheduler_hints)
server_dict = server._info
LOG.debug("Created new compute instance %(server_id)s "
"for id: %(id)s\nServer response: %(response)s" %
@ -778,7 +782,8 @@ class FreshInstanceTasks(FreshInstance, NotifyMixin, ConfigurationMixin):
def _create_server_volume_individually(self, flavor_id, image_id,
security_groups, datastore_manager,
volume_size, availability_zone,
nics, files, volume_type):
nics, files, volume_type,
scheduler_hints):
LOG.debug("Begin _create_server_volume_individually for id: %s" %
self.id)
server = None
@ -790,7 +795,8 @@ class FreshInstanceTasks(FreshInstance, NotifyMixin, ConfigurationMixin):
server = self._create_server(flavor_id, image_id, security_groups,
datastore_manager,
block_device_mapping,
availability_zone, nics, files)
availability_zone, nics, files,
scheduler_hints)
server_id = server.id
# Save server ID.
self.update_db(compute_instance_id=server_id)
@ -904,7 +910,8 @@ class FreshInstanceTasks(FreshInstance, NotifyMixin, ConfigurationMixin):
def _create_server(self, flavor_id, image_id, security_groups,
datastore_manager, block_device_mapping,
availability_zone, nics, files={}):
availability_zone, nics, files={},
scheduler_hints=None):
userdata = self._prepare_userdata(datastore_manager)
name = self.hostname or self.name
bdmap = block_device_mapping
@ -914,7 +921,7 @@ class FreshInstanceTasks(FreshInstance, NotifyMixin, ConfigurationMixin):
name, image_id, flavor_id, files=files, userdata=userdata,
security_groups=security_groups, block_device_mapping=bdmap,
availability_zone=availability_zone, nics=nics,
config_drive=config_drive)
config_drive=config_drive, scheduler_hints=scheduler_hints)
LOG.debug("Created new compute instance %(server_id)s "
"for instance %(id)s" %
{'server_id': server.id, 'id': self.id})
@ -1079,6 +1086,11 @@ class BuiltInstanceTasks(BuiltInstance, NotifyMixin, ConfigurationMixin):
except Exception as ex:
LOG.exception(_("Error during dns entry of instance %(id)s: "
"%(ex)s") % {'id': self.db_info.id, 'ex': ex})
try:
srv_grp.ServerGroup.delete(self.context, self.server_group)
except Exception:
LOG.exception(_("Error during delete server group for %s")
% self.id)
# Poll until the server is gone.
def server_is_finished():

View File

@ -268,7 +268,8 @@ class FakeServers(object):
def create(self, name, image_id, flavor_ref, files=None, userdata=None,
block_device_mapping=None, volume=None, security_groups=None,
availability_zone=None, nics=None, config_drive=False):
availability_zone=None, nics=None, config_drive=False,
scheduler_hints=None):
id = "FAKE_%s" % uuid.uuid4()
if volume:
volume = self.volumes.create(volume['size'], volume['name'],
@ -794,6 +795,43 @@ class FakeSecurityGroupRules(object):
del self.securityGroupRules[id]
class FakeServerGroup(object):
    """In-memory stand-in for a Nova server group object."""

    def __init__(self, name=None, policies=None, context=None):
        self.name = name
        # Bug fix: 'description' was referenced without being defined,
        # raising NameError on every instantiation; the fake has none.
        self.description = None
        self.id = "FAKE_SRVGRP_%s" % uuid.uuid4()
        self.policies = policies or {}
        # Member server ids; ServerGroup.load() iterates 'sg.members',
        # so the attribute must exist on the fake.
        self.members = []

    def get_id(self):
        return self.id

    def data(self):
        """Return the group's attributes as a dict, API-response style."""
        return {
            'id': self.id,
            'name': self.name,
            'policies': self.policies
        }
class FakeServerGroups(object):
    """In-memory registry of fake server groups, keyed by group id."""

    def __init__(self, context=None):
        self.context = context
        self.server_groups = {}

    def create(self, name=None, policies=None):
        """Create, register and return a new fake server group."""
        server_group = FakeServerGroup(name, policies, context=self.context)
        self.server_groups[server_group.get_id()] = server_group
        return server_group

    def delete(self, group_id):
        # Bug fix: delete was a no-op, so "deleted" groups were still
        # returned by list(). Ignore unknown ids, as best-effort cleanup
        # paths may delete the same group twice.
        self.server_groups.pop(group_id, None)

    def list(self):
        # Bug fix: returning the dict itself made iteration yield group-id
        # strings; callers expect group objects (e.g. they read sg.members).
        return list(self.server_groups.values())
class FakeClient(object):
def __init__(self, context):
@ -808,6 +846,7 @@ class FakeClient(object):
self.rdservers = FakeRdServers(self.servers)
self.security_groups = FakeSecurityGroups(context)
self.security_group_rules = FakeSecurityGroupRules(context)
self.server_groups = FakeServerGroups(context)
def get_server_volumes(self, server_id):
return self.servers.get_server_volumes(server_id)

View File

@ -44,10 +44,15 @@ class ReplicationGroup(TestGroup):
@test(depends_on=[add_data_for_replication])
def verify_data_for_replication(self):
"""Verify data exists on master."""
"""Verify initial data exists on master."""
self.test_runner.run_verify_data_for_replication()
@test(runs_after=[verify_data_for_replication])
def create_non_affinity_master(self):
"""Test creating a non-affinity master."""
self.test_runner.run_create_non_affinity_master()
@test(runs_after=[create_non_affinity_master])
def create_single_replica(self):
"""Test creating a single replica."""
self.test_runner.run_create_single_replica()
@ -63,18 +68,50 @@ class ReplicationGroup(TestGroup):
self.test_runner.run_verify_replica_data_after_single()
@test(runs_after=[verify_replica_data_after_single])
def wait_for_non_affinity_master(self):
"""Wait for non-affinity master to complete."""
self.test_runner.run_wait_for_non_affinity_master()
@test(runs_after=[wait_for_non_affinity_master])
def create_non_affinity_replica(self):
"""Test creating a non-affinity replica."""
self.test_runner.run_create_non_affinity_replica()
@test(runs_after=[create_non_affinity_replica])
def create_multiple_replicas(self):
"""Test creating multiple replicas."""
self.test_runner.run_create_multiple_replicas()
@test(depends_on=[create_single_replica, create_multiple_replicas])
@test(runs_after=[create_multiple_replicas])
def wait_for_non_affinity_replica_fail(self):
"""Wait for non-affinity replica to fail."""
self.test_runner.run_wait_for_non_affinity_replica_fail()
@test(runs_after=[wait_for_non_affinity_replica_fail])
def delete_non_affinity_repl(self):
"""Test deleting non-affinity replica."""
self.test_runner.run_delete_non_affinity_repl()
@test(runs_after=[delete_non_affinity_repl])
def delete_non_affinity_master(self):
"""Test deleting non-affinity master."""
self.test_runner.run_delete_non_affinity_master()
@test(depends_on=[create_single_replica, create_multiple_replicas],
runs_after=[delete_non_affinity_master])
def verify_replica_data_orig(self):
"""Verify original data was transferred to replicas."""
self.test_runner.run_verify_replica_data_orig()
@test(depends_on=[create_single_replica, create_multiple_replicas],
runs_after=[verify_replica_data_orig])
def add_data_to_replicate(self):
"""Add data to master to verify replication."""
"""Add new data to master to verify replication."""
self.test_runner.run_add_data_to_replicate()
@test(depends_on=[add_data_to_replicate])
def verify_data_to_replicate(self):
"""Verify data exists on master."""
"""Verify new data exists on master."""
self.test_runner.run_verify_data_to_replicate()
@test(depends_on=[create_single_replica, create_multiple_replicas,
@ -87,13 +124,6 @@ class ReplicationGroup(TestGroup):
@test(depends_on=[create_single_replica, create_multiple_replicas,
add_data_to_replicate],
runs_after=[wait_for_data_to_replicate])
def verify_replica_data_orig(self):
"""Verify original data was transferred to replicas."""
self.test_runner.run_verify_replica_data_orig()
@test(depends_on=[create_single_replica, create_multiple_replicas,
add_data_to_replicate],
runs_after=[verify_replica_data_orig])
def verify_replica_data_new(self):
"""Verify new data was transferred to replicas."""
self.test_runner.run_verify_replica_data_new()
@ -128,8 +158,14 @@ class ReplicationGroup(TestGroup):
"""Test promoting a replica to replica source (master)."""
self.test_runner.run_promote_to_replica_source()
@test(depends_on=[promote_to_replica_source])
def verify_replica_data_new_master(self):
"""Verify data is still on new master."""
self.test_runner.run_verify_replica_data_new_master()
@test(depends_on=[create_single_replica, create_multiple_replicas,
promote_to_replica_source])
promote_to_replica_source],
runs_after=[verify_replica_data_new_master])
def add_data_to_replicate2(self):
"""Add data to new master to verify replication."""
self.test_runner.run_add_data_to_replicate2()

View File

@ -45,7 +45,8 @@ class InstanceCreateRunner(TestRunner):
instance_info = self.assert_instance_create(
name, flavor, trove_volume_size, [], [], None, None,
CONFIG.dbaas_datastore, CONFIG.dbaas_datastore_version,
expected_states, expected_http_code, create_helper_user=True)
expected_states, expected_http_code, create_helper_user=True,
locality='affinity')
# Update the shared instance info.
self.instance_info.id = instance_info.id
@ -57,6 +58,8 @@ class InstanceCreateRunner(TestRunner):
instance_info.dbaas_datastore_version)
self.instance_info.dbaas_flavor_href = instance_info.dbaas_flavor_href
self.instance_info.volume = instance_info.volume
self.instance_info.srv_grp_id = self.assert_server_group_exists(
self.instance_info.id)
def run_initial_configuration_create(self, expected_http_code=200):
dynamic_config = self.test_helper.get_dynamic_group()
@ -126,7 +129,8 @@ class InstanceCreateRunner(TestRunner):
self, name, flavor, trove_volume_size,
database_definitions, user_definitions,
configuration_id, root_password, datastore, datastore_version,
expected_states, expected_http_code, create_helper_user=False):
expected_states, expected_http_code, create_helper_user=False,
locality=None):
"""This assert method executes a 'create' call and verifies the server
response. It neither waits for the instance to become available
nor it performs any other validations itself.
@ -134,7 +138,6 @@ class InstanceCreateRunner(TestRunner):
(other tests may run while the instance is building) and also to allow
its reuse in other runners.
"""
databases = database_definitions
users = [{'name': item['name'], 'password': item['password']}
for item in user_definitions]
@ -199,7 +202,8 @@ class InstanceCreateRunner(TestRunner):
configuration=configuration_id,
availability_zone="nova",
datastore=instance_info.dbaas_datastore,
datastore_version=instance_info.dbaas_datastore_version)
datastore_version=instance_info.dbaas_datastore_version,
locality=locality)
self.assert_instance_action(
instance.id, expected_states[0:1], expected_http_code)
@ -227,6 +231,9 @@ class InstanceCreateRunner(TestRunner):
instance._info['datastore']['version'],
"Unexpected instance datastore version")
self.assert_configuration_group(instance_info.id, configuration_id)
if locality:
self.assert_equal(locality, instance._info['locality'],
"Unexpected locality")
return instance_info

View File

@ -34,6 +34,7 @@ class InstanceDeleteRunner(TestRunner):
self.assert_instance_delete(self.instance_info.id, expected_states,
expected_http_code)
self.assert_server_group_gone(self.instance_info.srv_grp_id)
def assert_instance_delete(self, instance_id, expected_states,
expected_http_code):

View File

@ -32,6 +32,10 @@ class ReplicationRunner(TestRunner):
self.replica_1_host = None
self.master_backup_count = None
self.used_data_sets = set()
self.non_affinity_master_id = None
self.non_affinity_srv_grp_id = None
self.non_affinity_repl_id = None
self.locality = 'affinity'
def run_add_data_for_replication(self, data_type=DataType.small):
self.assert_add_replication_data(data_type, self.master_host)
@ -55,6 +59,16 @@ class ReplicationRunner(TestRunner):
"""
self.test_helper.verify_data(data_type, host)
def run_create_non_affinity_master(self, expected_http_code=200):
    """Create a master instance with an 'anti-affinity' locality."""
    master = self.auth_client.instances.create(
        self.instance_info.name + 'non-affinity',
        self.instance_info.dbaas_flavor_href,
        self.instance_info.volume,
        datastore=self.instance_info.dbaas_datastore,
        datastore_version=self.instance_info.dbaas_datastore_version,
        locality='anti-affinity')
    self.non_affinity_master_id = master.id
    self.assert_client_code(expected_http_code)
def run_create_single_replica(self, expected_states=['BUILD', 'ACTIVE'],
expected_http_code=200):
master_id = self.instance_info.id
@ -81,6 +95,7 @@ class ReplicationRunner(TestRunner):
expected_http_code)
self._assert_is_master(master_id, [replica_id])
self._assert_is_replica(replica_id, master_id)
self._assert_locality(master_id)
return replica_id
def _assert_is_master(self, instance_id, replica_ids):
@ -103,12 +118,75 @@ class ReplicationRunner(TestRunner):
'Unexpected replication master ID')
self._validate_replica(instance_id)
def _assert_locality(self, instance_id):
    """Verify the instance and every replica report the expected locality."""
    ids_to_check = [instance_id] + list(self._get_replica_set(instance_id))
    for checked_id in ids_to_check:
        checked = self.get_instance(checked_id)
        self.assert_equal(self.locality, checked.locality,
                          "Unexpected locality for instance '%s'" %
                          checked_id)
def run_wait_for_non_affinity_master(self,
                                     expected_states=['BUILD', 'ACTIVE']):
    """Wait for the anti-affinity master to build; record its server group."""
    master_id = self.non_affinity_master_id
    self._assert_instance_states(master_id, expected_states)
    self.non_affinity_srv_grp_id = self.assert_server_group_exists(master_id)
def run_create_non_affinity_replica(self, expected_http_code=200):
    """Request a single replica of the anti-affinity master."""
    replica = self.auth_client.instances.create(
        self.instance_info.name + 'non-affinity-repl',
        self.instance_info.dbaas_flavor_href,
        self.instance_info.volume,
        datastore=self.instance_info.dbaas_datastore,
        datastore_version=self.instance_info.dbaas_datastore_version,
        replica_of=self.non_affinity_master_id,
        replica_count=1)
    self.non_affinity_repl_id = replica.id
    self.assert_client_code(expected_http_code)
def run_create_multiple_replicas(self, expected_states=['BUILD', 'ACTIVE'],
expected_http_code=200):
master_id = self.instance_info.id
self.replica_2_id = self.assert_replica_create(
master_id, 'replica2', 2, expected_states, expected_http_code)
def run_wait_for_non_affinity_replica_fail(
        self, expected_states=['BUILD', 'FAILED']):
    """Wait for the anti-affinity replica to end up FAILED."""
    # ACTIVE is passed as a fast-fail status: if the replica ever goes
    # ACTIVE the wait aborts immediately instead of timing out.
    self._assert_instance_states(self.non_affinity_repl_id,
                                 expected_states,
                                 fast_fail_status=['ACTIVE'])
def run_delete_non_affinity_repl(self,
                                 expected_last_state=['SHUTDOWN'],
                                 expected_http_code=202):
    """Delete the (failed) anti-affinity replica."""
    self.assert_delete_instances(self.non_affinity_repl_id,
                                 expected_last_state, expected_http_code)
def assert_delete_instances(
        self, instance_ids, expected_last_state, expected_http_code):
    """Delete one instance (or a collection of them) and wait until gone."""
    if not utils.is_collection(instance_ids):
        instance_ids = [instance_ids]
    for instance_id in instance_ids:
        self.auth_client.instances.delete(instance_id)
        self.assert_client_code(expected_http_code)
    self.assert_all_gone(instance_ids, expected_last_state)
def run_delete_non_affinity_master(self,
                                   expected_last_state=['SHUTDOWN'],
                                   expected_http_code=202):
    """Delete the anti-affinity master and check its server group is gone."""
    self.assert_delete_instances(self.non_affinity_master_id,
                                 expected_last_state, expected_http_code)
    self.assert_server_group_gone(self.non_affinity_srv_grp_id)
def run_add_data_to_replicate(self):
self.assert_add_replication_data(DataType.tiny, self.master_host)
@ -191,6 +269,12 @@ class ReplicationRunner(TestRunner):
self.assert_instance_action(new_master_id, expected_states,
expected_http_code)
def run_verify_replica_data_new_master(self):
    """Check both existing data sets are still present on the new master."""
    for data_type in (DataType.small, DataType.tiny):
        self.assert_verify_replication_data(data_type, self.replica_1_host)
def run_add_data_to_replicate2(self):
self.assert_add_replication_data(DataType.tiny2, self.replica_1_host)
@ -266,16 +350,6 @@ class ReplicationRunner(TestRunner):
self.replica_1_id, expected_last_state=expected_last_state,
expected_http_code=expected_http_code)
def assert_delete_instances(
self, instance_ids, expected_last_state, expected_http_code):
instance_ids = (instance_ids if utils.is_collection(instance_ids)
else [instance_ids])
for instance_id in instance_ids:
self.auth_client.instances.delete(instance_id)
self.assert_client_code(expected_http_code)
self.assert_all_gone(instance_ids, expected_last_state)
def run_delete_all_replicas(self, expected_last_state=['SHUTDOWN'],
expected_http_code=202):
self.assert_delete_all_replicas(

View File

@ -30,6 +30,7 @@ from trove.common.utils import poll_until, build_polling_task
from trove.tests.config import CONFIG
from trove.tests.util.check import AttrCheck
from trove.tests.util import create_dbaas_client
from trove.tests.util import create_nova_client
from trove.tests.util.users import Requirements
CONF = cfg.CONF
@ -213,7 +214,9 @@ class TestRunner(object):
self._unauth_client = None
self._admin_client = None
self._swift_client = None
self._nova_client = None
self._test_helper = None
self._servers = {}
@classmethod
def fail(cls, message):
@ -338,6 +341,12 @@ class TestRunner(object):
auth_version='2.0',
os_options=os_options)
@property
def nova_client(self):
    """Lazily-created Nova client authenticated as the test user."""
    if self._nova_client:
        return self._nova_client
    self._nova_client = create_nova_client(self.instance_info.user)
    return self._nova_client
def get_client_tenant(self, client):
tenant_name = client.real_client.client.tenant
service_url = client.real_client.client.service_url
@ -518,6 +527,50 @@ class TestRunner(object):
% (instance_id, instance.status))
return instance.status == status
def get_server(self, instance_id):
    """Return the Nova server backing the given Trove instance, or None.

    Successful lookups are cached in self._servers so repeated calls for
    the same instance avoid listing Nova servers again.
    """
    server = None
    if instance_id in self._servers:
        server = self._servers[instance_id]
    else:
        instance = self.get_instance(instance_id)
        self.report.log("Getting server for instance: %s" % instance)
        # Match by name: the Trove instance name is compared against each
        # Nova server name; the first match wins.
        for nova_server in self.nova_client.servers.list():
            if str(nova_server.name) == instance.name:
                server = nova_server
                break
        # Only cache a hit; a miss stays uncached so it can be retried.
        if server:
            self._servers[instance_id] = server
    return server
def assert_server_group_exists(self, instance_id):
    """Check that the Nova instance associated with instance_id
    belongs to a server group, and return the id.
    """
    server = self.get_server(instance_id)
    self.assert_is_not_none(server, "Could not find Nova server for '%s'" %
                            instance_id)
    server_group = next(
        (sg for sg in self.nova_client.server_groups.list()
         if server.id in sg.members),
        None)
    if server_group is None:
        self.fail("Could not find server group for Nova instance %s" %
                  server.id)
    return server_group.id
def assert_server_group_gone(self, srv_grp_id):
    """Ensure that the server group is no longer present."""
    leftovers = [sg for sg in self.nova_client.server_groups.list()
                 if sg.id == srv_grp_id]
    if leftovers:
        self.fail("Found left-over server group: %s" % leftovers[0])
def get_instance(self, instance_id):
    """Fetch the Trove instance record through the authenticated client."""
    client = self.auth_client
    return client.instances.get(instance_id)

View File

@ -0,0 +1,111 @@
# Copyright 2016 Tesora, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
import copy
from mock import Mock, patch
from trove.common import server_group as srv_grp
from trove.tests.unittests import trove_testtools
class TestServerGroup(trove_testtools.TestCase):
    """Unit tests for trove.common.server_group.ServerGroup."""

    def setUp(self):
        super(TestServerGroup, self).setUp()
        self.ServerGroup = srv_grp.ServerGroup()
        self.context = trove_testtools.TroveTestContext(self)
        self.sg_id = 'sg-1234'
        self.locality = 'affinity'
        # The scheduler hint Nova expects when placing an instance into
        # an existing server group.
        self.expected_hints = {'group': self.sg_id}
        # A group with more than one member is still in use and must not
        # be deleted without force=True.
        self.server_group = Mock()
        self.server_group.id = self.sg_id
        self.server_group.policies = [self.locality]
        self.server_group.members = ['id-1', 'id-2']
        # A group whose only member is the instance being removed is
        # eligible for deletion.
        self.empty_server_group = copy.copy(self.server_group)
        self.empty_server_group.members = ['id-1']

    @patch.object(srv_grp, 'create_nova_client')
    def test_create(self, mock_client):
        mock_create = Mock(return_value=self.server_group)
        mock_client.return_value.server_groups.create = mock_create
        server_group = self.ServerGroup.create(
            self.context, self.locality, "name_suffix")
        mock_create.assert_called_with(name="locality_name_suffix",
                                       policies=[self.locality])
        self.assertEqual(self.server_group, server_group)

    @patch.object(srv_grp, 'create_nova_client')
    def test_delete(self, mock_client):
        mock_delete = Mock()
        mock_client.return_value.server_groups.delete = mock_delete
        self.ServerGroup.delete(self.context, self.empty_server_group)
        mock_delete.assert_called_with(self.sg_id)

    @patch.object(srv_grp, 'create_nova_client')
    def test_delete_non_empty(self, mock_client):
        # Deleting a group that still has other members must be a no-op.
        mock_delete = Mock()
        mock_client.return_value.server_groups.delete = mock_delete
        srv_grp.ServerGroup.delete(self.context, self.server_group)
        mock_delete.assert_not_called()

    @patch.object(srv_grp, 'create_nova_client')
    def test_delete_force(self, mock_client):
        # force=True removes the group even though it still has members.
        mock_delete = Mock()
        mock_client.return_value.server_groups.delete = mock_delete
        self.ServerGroup.delete(self.context, self.server_group, force=True)
        mock_delete.assert_called_with(self.sg_id)

    def test_convert_to_hint(self):
        hint = srv_grp.ServerGroup.convert_to_hint(self.server_group)
        self.assertEqual(self.expected_hints, hint, "Unexpected hint")

    def test_convert_to_hints(self):
        # NOTE(review): keep the input hints in their own variable. The
        # original test rebound 'hints' to the returned value and then
        # built the expected dict from that same value, which made the
        # final assertion compare the result against itself.
        extra_hints = {'hint': 'myhint'}
        hints = srv_grp.ServerGroup.convert_to_hint(self.server_group,
                                                    extra_hints)
        self.expected_hints.update({'hint': 'myhint'})
        self.assertEqual(self.expected_hints, hints, "Unexpected hints")

    def test_convert_to_hint_none(self):
        self.assertIsNone(srv_grp.ServerGroup.convert_to_hint(None))

    @patch.object(srv_grp, 'create_nova_client')
    def test_build_scheduler_hint(self, mock_client):
        # An existing hint dict is passed through unchanged.
        mock_create = Mock(return_value=self.server_group)
        mock_client.return_value.server_groups.create = mock_create
        expected_hint = {'get_back': 'same_dict'}
        scheduler_hint = self.ServerGroup.build_scheduler_hint(
            self.context, expected_hint, "name_suffix")
        self.assertEqual(expected_hint, scheduler_hint, "Unexpected hint")

    @patch.object(srv_grp, 'create_nova_client')
    def test_build_scheduler_hint_from_locality(self, mock_client):
        # A bare locality string creates a new server group and converts
        # it into a 'group' hint.
        mock_create = Mock(return_value=self.server_group)
        mock_client.return_value.server_groups.create = mock_create
        expected_hint = {'group': 'sg-1234'}
        scheduler_hint = self.ServerGroup.build_scheduler_hint(
            self.context, self.locality, "name_suffix")
        self.assertEqual(expected_hint, scheduler_hint, "Unexpected hint")

    def test_build_scheduler_hint_none(self):
        self.assertIsNone(srv_grp.ServerGroup.build_scheduler_hint(
            self.context, None, None))

    def test_get_locality(self):
        locality = srv_grp.ServerGroup.get_locality(self.server_group)
        self.assertEqual(self.locality, locality, "Unexpected locality")

    def test_get_locality_none(self):
        self.assertIsNone(srv_grp.ServerGroup.get_locality(None))

View File

@ -27,6 +27,7 @@ class TestInstanceController(trove_testtools.TestCase):
def setUp(self):
super(TestInstanceController, self).setUp()
self.controller = InstanceController()
self.locality = 'affinity'
self.instance = {
"instance": {
"volume": {"size": "1"},
@ -46,7 +47,8 @@ class TestInstanceController(trove_testtools.TestCase):
{
"name": "db2"
}
]
],
"locality": self.locality
}
}
self.context = trove_testtools.TroveTestContext(self)
@ -149,6 +151,20 @@ class TestInstanceController(trove_testtools.TestCase):
self.assertIn("'$#$%^^' does not match '^.*[0-9a-zA-Z]+.*$'",
errors[0].message)
def test_validate_create_invalid_locality(self):
    """A locality value that fails the name pattern must be rejected."""
    body = self.instance
    body['instance']['locality'] = "$%^"
    schema = self.controller.get_schema('create', body)
    validator = jsonschema.Draft4Validator(schema)
    self.assertFalse(validator.is_valid(body))
    errors = sorted(validator.iter_errors(body), key=lambda e: e.path)
    self.assertEqual(1, len(errors))
    messages = [err.message for err in errors]
    paths = [err.path.pop() for err in errors]
    self.assertIn("'$%^' does not match '^.*[0-9a-zA-Z]+.*$'",
                  messages)
    self.assertIn("locality", paths)
def test_validate_restart(self):
body = {"restart": {}}
schema = self.controller.get_schema('action', body)

View File

@ -43,7 +43,8 @@ class SimpleInstanceTest(trove_testtools.TestCase):
InstanceTasks.BUILDING, name="TestInstance")
self.instance = SimpleInstance(
None, db_info, InstanceServiceStatus(
ServiceStatuses.BUILDING), ds_version=Mock(), ds=Mock())
ServiceStatuses.BUILDING), ds_version=Mock(), ds=Mock(),
locality='affinity')
db_info.addresses = {"private": [{"addr": "123.123.123.123"}],
"internal": [{"addr": "10.123.123.123"}],
"public": [{"addr": "15.123.123.123"}]}
@ -102,6 +103,9 @@ class SimpleInstanceTest(trove_testtools.TestCase):
self.assertTrue('123.123.123.123' in ip)
self.assertTrue('15.123.123.123' in ip)
def test_locality(self):
self.assertEqual('affinity', self.instance.locality)
class CreateInstanceTest(trove_testtools.TestCase):
@ -172,6 +176,7 @@ class CreateInstanceTest(trove_testtools.TestCase):
self.check = backup_models.DBBackup.check_swift_object_exist
backup_models.DBBackup.check_swift_object_exist = Mock(
return_value=True)
self.locality = 'affinity'
super(CreateInstanceTest, self).setUp()
@patch.object(task_api.API, 'get_client', Mock(return_value=Mock()))
@ -213,6 +218,19 @@ class CreateInstanceTest(trove_testtools.TestCase):
self.az, self.nics, self.configuration)
self.assertIsNotNone(instance)
def test_can_instantiate_with_locality(self):
    """Instance.create accepts and honors a locality keyword."""
    # Shrink the backup so it fits within the requested volume size.
    self.backup.size = 0.2
    self.backup.save()
    created = models.Instance.create(
        self.context, self.name, self.flavor_id, self.image_id,
        self.databases, self.users, self.datastore,
        self.datastore_version, self.volume_size, self.backup_id,
        self.az, self.nics, self.configuration,
        locality=self.locality)
    self.assertIsNotNone(created)
class TestReplication(trove_testtools.TestCase):

View File

@ -62,6 +62,7 @@ class InstanceDetailViewTest(trove_testtools.TestCase):
self.instance.get_visible_ip_addresses = lambda: ["1.2.3.4"]
self.instance.slave_of_id = None
self.instance.slaves = []
self.instance.locality = 'affinity'
def tearDown(self):
super(InstanceDetailViewTest, self).tearDown()
@ -90,3 +91,10 @@ class InstanceDetailViewTest(trove_testtools.TestCase):
result['instance']['datastore']['version'])
self.assertNotIn('hostname', result['instance'])
self.assertEqual([self.ip], result['instance']['ip'])
def test_locality(self):
    """The detail view payload must carry the instance's locality."""
    self.instance.hostname = None
    view = InstanceDetailView(self.instance, Mock())
    payload = view.data()
    observed = payload['instance']['locality']
    self.assertEqual(self.instance.locality, observed)

View File

@ -48,6 +48,25 @@ class ApiTest(trove_testtools.TestCase):
self.api.client.prepare = Mock(return_value=self.call_context)
self.call_context.cast = Mock()
@patch.object(task_api.API, '_transform_obj', Mock(return_value='flv-id'))
def test_create_instance(self):
    """create_instance should cast the expected RPC message fields."""
    flavor = Mock()
    self.api.create_instance(
        'inst-id', 'inst-name', flavor, 'img-id', {'name': 'db1'},
        {'name': 'usr1'}, 'mysql', None, 1, backup_id='bk-id',
        availability_zone='az', root_password='pwd', nics=['nic-id'],
        overrides={}, slave_of_id='slv-id', cluster_config={},
        volume_type='type', modules=['mod-id'], locality='affinity')

    self._verify_rpc_prepare_before_cast()
    # All message fields, including the new 'locality', must make it
    # into the cast unchanged (flavor is transformed to its id).
    expected = dict(
        availability_zone='az', backup_id='bk-id', cluster_config={},
        databases={'name': 'db1'}, datastore_manager='mysql',
        flavor='flv-id', image_id='img-id', instance_id='inst-id',
        locality='affinity', modules=['mod-id'], name='inst-name',
        nics=['nic-id'], overrides={}, packages=None, root_password='pwd',
        slave_of_id='slv-id', users={'name': 'usr1'}, volume_size=1,
        volume_type='type')
    self._verify_cast('create_instance', **expected)
def test_detach_replica(self):
self.api.detach_replica('some-instance-id')

View File

@ -15,14 +15,15 @@
# under the License.
from mock import Mock, patch, PropertyMock
from proboscis.asserts import assert_equal
from trove.backup.models import Backup
from trove.common.exception import TroveError, ReplicationSlaveAttachError
from trove.common import server_group as srv_grp
from trove.instance.tasks import InstanceTasks
from trove.taskmanager.manager import Manager
from trove.taskmanager import models
from trove.taskmanager import service
from trove.common.exception import TroveError, ReplicationSlaveAttachError
from proboscis.asserts import assert_equal
from trove.tests.unittests import trove_testtools
@ -189,7 +190,8 @@ class TestManager(trove_testtools.TestCase):
self.context, 'some-inst-id')
@patch.object(Backup, 'delete')
def test_create_replication_slave(self, mock_backup_delete):
@patch.object(models.BuiltInstanceTasks, 'load')
def test_create_replication_slave(self, mock_load, mock_backup_delete):
mock_tasks = Mock()
mock_snapshot = {'dataset': {'snapshot_id': 'test-id'}}
mock_tasks.get_replication_master_snapshot = Mock(
@ -203,7 +205,7 @@ class TestManager(trove_testtools.TestCase):
'temp-backup-id', None,
'some_password', None, Mock(),
'some-master-id', None, None,
None)
None, None)
mock_tasks.get_replication_master_snapshot.assert_called_with(
self.context, 'some-master-id', mock_flavor, 'temp-backup-id',
replica_number=1)
@ -211,15 +213,16 @@ class TestManager(trove_testtools.TestCase):
@patch.object(models.FreshInstanceTasks, 'load')
@patch.object(Backup, 'delete')
@patch.object(models.BuiltInstanceTasks, 'load')
@patch('trove.taskmanager.manager.LOG')
def test_exception_create_replication_slave(self, mock_logging,
def test_exception_create_replication_slave(self, mock_logging, mock_tasks,
mock_delete, mock_load):
mock_load.return_value.create_instance = Mock(side_effect=TroveError)
self.assertRaises(TroveError, self.manager.create_instance,
self.context, ['id1', 'id2'], Mock(), Mock(),
Mock(), None, None, 'mysql', 'mysql-server', 2,
'temp-backup-id', None, 'some_password', None,
Mock(), 'some-master-id', None, None, None)
Mock(), 'some-master-id', None, None, None, None)
def test_AttributeError_create_instance(self):
self.assertRaisesRegexp(
@ -227,20 +230,23 @@ class TestManager(trove_testtools.TestCase):
self.manager.create_instance, self.context, ['id1', 'id2'],
Mock(), Mock(), Mock(), None, None, 'mysql', 'mysql-server', 2,
'temp-backup-id', None, 'some_password', None, Mock(), None, None,
None, None)
None, None, None)
def test_create_instance(self):
mock_tasks = Mock()
mock_flavor = Mock()
mock_override = Mock()
mock_csg = Mock()
type(mock_csg.return_value).id = PropertyMock(
return_value='sg-id')
with patch.object(models.FreshInstanceTasks, 'load',
return_value=mock_tasks):
self.manager.create_instance(self.context, 'id1', 'inst1',
mock_flavor, 'mysql-image-id', None,
None, 'mysql', 'mysql-server', 2,
'temp-backup-id', None, 'password',
None, mock_override, None, None, None,
None)
with patch.object(srv_grp.ServerGroup, 'create', mock_csg):
self.manager.create_instance(
self.context, 'id1', 'inst1', mock_flavor,
'mysql-image-id', None, None, 'mysql', 'mysql-server', 2,
'temp-backup-id', None, 'password', None, mock_override,
None, None, None, None, 'affinity')
mock_tasks.create_instance.assert_called_with(mock_flavor,
'mysql-image-id', None,
None, 'mysql',
@ -248,7 +254,8 @@ class TestManager(trove_testtools.TestCase):
'temp-backup-id', None,
'password', None,
mock_override,
None, None, None, None)
None, None, None, None,
{'group': 'sg-id'})
mock_tasks.wait_for_instance.assert_called_with(36000, mock_flavor)
def test_create_cluster(self):

View File

@ -81,7 +81,8 @@ class fake_Server:
class fake_ServerManager:
def create(self, name, image_id, flavor_id, files, userdata,
security_groups, block_device_mapping, availability_zone=None,
nics=None, config_drive=False):
nics=None, config_drive=False,
scheduler_hints=None):
server = fake_Server()
server.id = "server_id"
server.name = name
@ -380,7 +381,7 @@ class FreshInstanceTasksTest(trove_testtools.TestCase):
'Error creating security group for instance',
self.freshinstancetasks.create_instance, mock_flavor,
'mysql-image-id', None, None, 'mysql', 'mysql-server', 2,
None, None, None, None, Mock(), None, None, None, None)
None, None, None, None, Mock(), None, None, None, None, None)
@patch.object(BaseInstance, 'update_db')
@patch.object(backup_models.Backup, 'get_by_id')
@ -402,7 +403,7 @@ class FreshInstanceTasksTest(trove_testtools.TestCase):
self.freshinstancetasks.create_instance, mock_flavor,
'mysql-image-id', None, None, 'mysql', 'mysql-server',
2, Mock(), None, 'root_password', None, Mock(), None, None, None,
None)
None, None)
@patch.object(BaseInstance, 'update_db')
@patch.object(taskmanager_models.FreshInstanceTasks, '_create_dns_entry')
@ -417,6 +418,8 @@ class FreshInstanceTasksTest(trove_testtools.TestCase):
mock_guest_prepare,
mock_build_volume_info,
mock_create_secgroup,
mock_create_server,
mock_get_injected_files,
*args):
mock_flavor = {'id': 8, 'ram': 768, 'name': 'bigger_flavor'}
config_content = {'config_contents': 'some junk'}
@ -428,13 +431,18 @@ class FreshInstanceTasksTest(trove_testtools.TestCase):
'mysql-server', 2,
None, None, None, None,
overrides, None, None,
'volume_type', None)
'volume_type', None,
{'group': 'sg-id'})
mock_create_secgroup.assert_called_with('mysql')
mock_build_volume_info.assert_called_with('mysql', volume_size=2,
volume_type='volume_type')
mock_guest_prepare.assert_called_with(
768, mock_build_volume_info(), 'mysql-server', None, None, None,
config_content, None, overrides, None, None, None)
mock_create_server.assert_called_with(
8, 'mysql-image-id', mock_create_secgroup(),
'mysql', mock_build_volume_info()['block_device'], None,
None, mock_get_injected_files(), {'group': 'sg-id'})
@patch.object(trove.guestagent.api.API, 'attach_replication_slave')
@patch.object(rpc, 'get_client')