From 9e96241d12f11893f4278dedca278bb01c4d6b34 Mon Sep 17 00:00:00 2001
From: Craig Vyvial
Date: Mon, 16 Nov 2015 14:11:00 -0600
Subject: [PATCH] pxc grow/shrink cluster implementation

* Add support for growing a PXC cluster.
* Add API and taskmanager support for shrinking a cluster.
* Validate that the networks given are the same for each instance
  in the cluster.
* Make sure new instances are added to the networks already used
  by the cluster.
* Add new error tasks for grow and shrink.
* Change the nova client version configuration to a string option
  rather than an int option, because nova microversions change the
  nova API output; this was needed to read the network interfaces
  on existing instances.
* Add tests for growing and shrinking a cluster.

Change-Id: I8602cad1a98970fcd99ee9793a89e1abdceb0982
Partially-Implements-Blueprint: pxc-grow-shrink-cluster
---
 tox.ini                                            |   3 +-
 trove/common/cfg.py                                |   2 +-
 trove/common/exception.py                          |  13 ++
 .../cluster/experimental/pxc/api.py                | 158 +++++++++++++---
 .../cluster/experimental/pxc/guestagent.py         |  19 +-
 .../cluster/experimental/pxc/taskmanager.py        | 170 +++++++++++++++++-
 .../cluster/experimental/redis/api.py              |   1 +
 .../datastore/experimental/pxc/manager.py          |  11 ++
 .../datastore/experimental/pxc/service.py          |  25 ++-
 trove/instance/tasks.py                            |   6 +
 trove/taskmanager/models.py                        |   5 +-
 .../runners/cluster_actions_runners.py             |   9 +-
 .../unittests/cluster/test_pxc_cluster.py          |  81 ++++++---
 .../unittests/guestagent/test_pxc_api.py           |  22 +++
 .../unittests/guestagent/test_pxc_manager.py       |  15 ++
 .../taskmanager/test_pxc_clusters.py               |  90 +++++++++-
 trove/tests/unittests/util/util.py                 |   6 +
 17 files changed, 559 insertions(+), 77 deletions(-)
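One deployment-visible change worth noting before the diffs: nova_client_version is
now a string so that a nova microversion ("major.minor") can be requested. An
illustrative trove.conf snippet follows; the [DEFAULT] placement is assumed from the
option being registered in Trove's common options, and the value simply mirrors the
new default in cfg.py below.

    [DEFAULT]
    # Must now be a string such as '2.12'; an int cannot express a nova
    # microversion, and microversions change the nova API output.
    nova_client_version = 2.12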
diff --git a/tox.ini b/tox.ini
index b79cbb5c74..824b6510c6 100644
--- a/tox.ini
+++ b/tox.ini
@@ -9,7 +9,7 @@
 usedevelop = True
 install_command = pip install -U {opts} {packages}
 deps = -r{toxinidir}/requirements.txt
        -r{toxinidir}/test-requirements.txt
-commands = find . -type f -name "*.pyc" -delete
+commands = find ./trove -type f -name "*.pyc" -delete
            pip install pymongo===3.0.3
            {envpython} run_tests.py
            python setup.py testr --slowest
@@ -33,7 +33,6 @@ commands = oslo_debug_helper {posargs}
 [testenv:cover]
 basepython = python2.7
 commands =
-    {envpython} run_tests.py --group=does_not_exist
     coverage erase
     python setup.py testr --coverage
     coverage run -a run_tests.py
diff --git a/trove/common/cfg.py b/trove/common/cfg.py
index 0a4a03d2db..eb709c028d 100644
--- a/trove/common/cfg.py
+++ b/trove/common/cfg.py
@@ -55,7 +55,7 @@ common_opts = [
                help='Service type to use when searching catalog.'),
     cfg.StrOpt('nova_compute_endpoint_type', default='publicURL',
                help='Service endpoint type to use when searching catalog.'),
-    cfg.IntOpt('nova_client_version', default=2,
+    cfg.StrOpt('nova_client_version', default='2.12',
               help="The version of the compute service client."),
     cfg.StrOpt('neutron_url', help='URL without the tenant segment.'),
     cfg.StrOpt('neutron_service_type', default='network',
diff --git a/trove/common/exception.py b/trove/common/exception.py
index 1787ed7e24..a67b2224e3 100644
--- a/trove/common/exception.py
+++ b/trove/common/exception.py
@@ -504,6 +504,14 @@ class ClusterFlavorsNotEqual(TroveError):
     message = _("The flavor for each instance in a cluster must be the same.")


+class ClusterNetworksNotEqual(TroveError):
+    message = _("The network for each instance in a cluster must be the same.")
+
+
+class NetworkNotFound(TroveError):
+    message = _("Network Resource %(uuid)s cannot be found.")
+
+
 class ClusterVolumeSizeRequired(TroveError):
     message = _("A volume size is required for each instance in the cluster.")

@@ -523,6 +531,11 @@ class ClusterNumInstancesNotLargeEnough(TroveError):
                 "be at least %(num_instances)s.")


+class ClusterShrinkMustNotLeaveClusterEmpty(TroveError):
+    message = _("Must leave at least one instance in the cluster when "
+                "shrinking.")
+
+
 class ClusterInstanceOperationNotSupported(TroveError):
     message = _("Operation not supported for instances that are part of a "
                 "cluster.")
diff --git a/trove/common/strategies/cluster/experimental/pxc/api.py b/trove/common/strategies/cluster/experimental/pxc/api.py
index c6aa403385..c13e29ab14 100644
--- a/trove/common/strategies/cluster/experimental/pxc/api.py
+++ b/trove/common/strategies/cluster/experimental/pxc/api.py
@@ -22,8 +22,10 @@ from trove.common import cfg
 from trove.common import exception
 from trove.common import remote
 from trove.common.strategies.cluster import base
+from trove.common import utils
 from trove.extensions.mgmt.clusters.views import MgmtClusterView
-from trove.instance import models as inst_models
+from trove.instance.models import DBInstance
+from trove.instance.models import Instance
 from trove.quota.quota import check_quotas
 from trove.taskmanager import api as task_api

@@ -40,7 +42,29 @@ class PXCAPIStrategy(base.BaseAPIStrategy):

     @property
     def cluster_controller_actions(self):
-        return {}
+        return {
+            'grow': self._action_grow_cluster,
+            'shrink': self._action_shrink_cluster,
+        }
+
+    def _action_grow_cluster(self, cluster, body):
+        nodes = body['grow']
+        instances = []
+        for node in nodes:
+            instance = {
+                'flavor_id': utils.get_id_from_href(node['flavorRef'])
+            }
+            if 'name' in node:
+                instance['name'] = node['name']
+            if 'volume' in node:
+                instance['volume_size'] = int(node['volume']['size'])
+            instances.append(instance)
+        return cluster.grow(instances)
+
+    def _action_shrink_cluster(self, cluster, body):
+        instances = body['shrink']
+        instance_ids = [instance['id'] for instance in instances]
+        return cluster.shrink(instance_ids)

     @property
     def cluster_view_class(self):
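For reference, the request bodies these two controller actions consume look like the
following. This is an illustrative sketch inferred from the parsing code above, not
taken from an API reference; the flavor href and instance id are invented, and per
node 'name' and 'volume' are optional.

    # Illustrative grow/shrink request bodies, inferred from
    # _action_grow_cluster and _action_shrink_cluster above.
    grow_body = {
        "grow": [
            {"flavorRef": "https://nova/v2/flavors/7",   # required
             "name": "cluster1-member-4",                # optional
             "volume": {"size": 2}},                     # optional
            {"flavorRef": "7"},  # a bare id also works via get_id_from_href
        ]
    }
    shrink_body = {
        "shrink": [
            {"id": "416b0b16-ba55-4302-bbd3-ff566032e1c2"},  # instance to remove
        ]
    }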
@@ -53,10 +77,10 @@ class PXCCluster(models.Cluster):

-    @classmethod
-    def create(cls, context, name, datastore, datastore_version,
-               instances, extended_properties):
-        LOG.debug("Initiating PXC cluster creation.")
+    @staticmethod
+    def _validate_cluster_instances(context, instances, datastore,
+                                    datastore_version):
+        """Validate the flavors, volumes, and networks of the instances."""
         pxc_conf = CONF.get(datastore_version.manager)
         num_instances = len(instances)

@@ -65,7 +89,7 @@ class PXCCluster(models.Cluster):
             raise exception.ClusterNumInstancesNotLargeEnough(
                 num_instances=pxc_conf.min_cluster_member_count)

-        # Checking flavors
+        # Check the flavors and get the instance delta for the quota check
         flavor_ids = [instance['flavor_id'] for instance in instances]
         if len(set(flavor_ids)) != 1:
             raise exception.ClusterFlavorsNotEqual()
@@ -77,7 +101,7 @@ class PXCCluster(models.Cluster):
                 raise exception.FlavorNotFound(uuid=flavor_id)
         deltas = {'instances': num_instances}

-        # Checking volumes
+        # Check the volumes and get the volume delta for the quota check
         volume_sizes = [instance['volume_size'] for instance in instances
                         if instance.get('volume_size', None)]
         volume_size = None
@@ -96,35 +120,64 @@ class PXCCluster(models.Cluster):
             if ephemeral_support and flavor.ephemeral == 0:
                 raise exception.LocalStorageNotSpecified(flavor=flavor_id)

+        # quota check
         check_quotas(context.tenant, deltas)

-        nics = [instance.get('nics', None) for instance in instances]
+        # Check that the networks are the same for all instances in the cluster
+        instance_nics = [instance.get('nics', None) for instance in instances]
+        if len(set(instance_nics)) != 1:
+            raise exception.ClusterNetworksNotEqual()
+        instance_nic = instance_nics[0]
+        if instance_nic is None:
+            return
+        try:
+            nova_client.networks.get(instance_nic)
+        except nova_exceptions.NotFound:
+            raise exception.NetworkNotFound(uuid=instance_nic)

-        azs = [instance.get('availability_zone', None)
-               for instance in instances]
+    @staticmethod
+    def _create_instances(context, db_info, datastore, datastore_version,
+                          instances):
+        member_config = {"id": db_info.id,
+                         "instance_type": "member"}
+        name_index = 1
+        for instance in instances:
+            if not instance.get("name"):
+                instance['name'] = "%s-member-%s" % (db_info.name,
+                                                     str(name_index))
+                name_index += 1
+        return map(lambda instance:
+                   Instance.create(context,
+                                   instance['name'],
+                                   instance['flavor_id'],
+                                   datastore_version.image_id,
+                                   [], [],
+                                   datastore, datastore_version,
+                                   instance.get('volume_size', None),
+                                   None,
+                                   availability_zone=instance.get(
+                                       'availability_zone', None),
+                                   nics=instance.get('nics', None),
+                                   configuration_id=None,
+                                   cluster_config=member_config
+                                   ),
+                   instances)
+
+    @classmethod
+    def create(cls, context, name, datastore, datastore_version,
+               instances, extended_properties):
+        LOG.debug("Initiating PXC cluster creation.")
+        cls._validate_cluster_instances(context, instances, datastore,
+                                        datastore_version)

         # Updating Cluster Task

         db_info = models.DBCluster.create(
             name=name, tenant_id=context.tenant,
             datastore_version_id=datastore_version.id,
             task_status=ClusterTasks.BUILDING_INITIAL)

-        member_config = {"id": db_info.id,
-                         "instance_type": "member"}
-
-        # Creating member instances
-        for i in range(0, num_instances):
-            instance_name = "%s-member-%s" % (name, str(i + 1))
-            inst_models.Instance.create(context, instance_name,
-                                        flavor_id,
-                                        datastore_version.image_id,
-                                        [], [], datastore,
-                                        datastore_version,
-                                        volume_size, None,
-                                        nics=nics[i],
-                                        availability_zone=azs[i],
-                                        configuration_id=None,
-                                        cluster_config=member_config)
+        cls._create_instances(context, db_info, datastore, datastore_version,
+                              instances)

         # Calling taskmanager to further proceed for cluster-configuration
         task_api.load(context, datastore_version.manager).create_cluster(
             context, db_info.id)

         return PXCCluster(context, db_info, datastore, datastore_version)

+    def _get_cluster_network_interfaces(self):
+        nova_client = remote.create_nova_client(self.context)
+        nova_instance_id = self.db_instances[0].compute_instance_id
+        interfaces = nova_client.virtual_interfaces.list(nova_instance_id)
+        ret = [{"net-id": getattr(interface, 'net_id')}
+               for interface in interfaces]
+        return ret
+
+    def grow(self, instances):
+        LOG.debug("Growing cluster %s." % self.id)
+
+        self.validate_cluster_available()
+
+        context = self.context
+        db_info = self.db_info
+        datastore = self.ds
+        datastore_version = self.ds_version
+
+        # Get the network of the existing cluster instances.
+        interface_ids = self._get_cluster_network_interfaces()
+        for instance in instances:
+            instance["nics"] = interface_ids
+
+        db_info.update(task_status=ClusterTasks.GROWING_CLUSTER)
+
+        new_instances = self._create_instances(context, db_info,
+                                               datastore, datastore_version,
+                                               instances)
+
+        task_api.load(context, datastore_version.manager).grow_cluster(
+            db_info.id, [instance.id for instance in new_instances])
+
+        return PXCCluster(context, db_info, datastore, datastore_version)
+
+    def shrink(self, instances):
+        """Removes instances from a cluster."""
+        LOG.debug("Shrinking cluster %s." % self.id)
+
+        self.validate_cluster_available()
+        removal_instances = [Instance.load(self.context, inst_id)
+                             for inst_id in instances]
+        db_instances = DBInstance.find_all(cluster_id=self.db_info.id).all()
+        if len(db_instances) - len(removal_instances) < 1:
+            raise exception.ClusterShrinkMustNotLeaveClusterEmpty()
+
+        self.db_info.update(task_status=ClusterTasks.SHRINKING_CLUSTER)
+        task_api.load(self.context, self.ds_version.manager).shrink_cluster(
+            self.db_info.id, [instance.id for instance in removal_instances])
+
+        return PXCCluster(self.context, self.db_info, self.ds,
+                          self.ds_version)
+

 class PXCClusterView(ClusterView):
diff --git a/trove/common/strategies/cluster/experimental/pxc/guestagent.py b/trove/common/strategies/cluster/experimental/pxc/guestagent.py
index aa67281a5b..425cd9001b 100644
--- a/trove/common/strategies/cluster/experimental/pxc/guestagent.py
+++ b/trove/common/strategies/cluster/experimental/pxc/guestagent.py
@@ -33,6 +33,7 @@ class PXCGuestAgentAPI(guest_api.API):

     def install_cluster(self, replication_user, cluster_configuration,
                         bootstrap):
+        """Install the cluster."""
         LOG.debug("Installing PXC cluster.")
         self._call("install_cluster", CONF.cluster_usage_timeout,
                    self.version_cap,
@@ -41,13 +42,27 @@
                    bootstrap=bootstrap)

     def reset_admin_password(self, admin_password):
-        """Store this password on the instance as the admin password"""
+        """Store this password on the instance as the admin password."""
         self._call("reset_admin_password", CONF.cluster_usage_timeout,
                    self.version_cap,
                    admin_password=admin_password)

     def cluster_complete(self):
-        """Set the status that the cluster is build is complete"""
+        """Set the status that the cluster build is complete."""
         LOG.debug("Notifying cluster install completion.")
         return self._call("cluster_complete", guest_api.AGENT_HIGH_TIMEOUT,
                           self.version_cap)
+
+    def get_cluster_context(self):
+        """Get the context of the cluster."""
+        LOG.debug("Getting the cluster context.")
+        return self._call("get_cluster_context", guest_api.AGENT_HIGH_TIMEOUT,
+                          self.version_cap)
+
+    def write_cluster_configuration_overrides(self, cluster_configuration):
+        """Write an updated cluster configuration."""
+        LOG.debug("Writing an updated cluster configuration.")
+        self._call("write_cluster_configuration_overrides",
+                   guest_api.AGENT_HIGH_TIMEOUT,
+                   self.version_cap,
+                   cluster_configuration=cluster_configuration)
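The taskmanager flows below drive new and remaining members using the dict that
get_cluster_context() returns over RPC. Its shape, shown here with invented values,
follows PXCApp.get_cluster_context() in service.py further down, which derives it
from the guest's wsrep settings and stored admin password.

    # Illustrative payload returned by get_cluster_context(); values invented.
    # Given a my.cnf containing:
    #   wsrep_sst_auth = "sstuser:sstpassword"
    #   wsrep_cluster_name = cluster1
    # the guest returns:
    {
        'replication_user': {'name': 'sstuser', 'password': 'sstpassword'},
        'cluster_name': 'cluster1',
        'admin_password': '<stored admin password>',
    }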
diff --git a/trove/common/strategies/cluster/experimental/pxc/taskmanager.py b/trove/common/strategies/cluster/experimental/pxc/taskmanager.py
index 2988d763e8..250fd79392 100644
--- a/trove/common/strategies/cluster/experimental/pxc/taskmanager.py
+++ b/trove/common/strategies/cluster/experimental/pxc/taskmanager.py
@@ -15,6 +15,8 @@ from eventlet.timeout import Timeout
 from oslo_log import log as logging

 from trove.common import cfg
+from trove.common.exception import PollTimeOut
+from trove.common.exception import TroveError
 from trove.common.i18n import _
 from trove.common.remote import create_nova_client
 from trove.common.strategies.cluster import base
@@ -22,6 +24,7 @@
 from trove.common.template import ClusterConfigTemplate
 from trove.common import utils
 from trove.instance.models import DBInstance
 from trove.instance.models import Instance
+from trove.instance import tasks as inst_tasks
 from trove.taskmanager import api as task_api
 import trove.taskmanager.models as task_models

@@ -74,7 +77,7 @@ class PXCClusterTasks(task_models.ClusterTasks):
             LOG.debug("Waiting for instances to get to cluster-ready status.")
             # Wait for cluster members to get to cluster-ready status.
             if not self._all_instances_ready(instance_ids, cluster_id):
-                return
+                raise TroveError("Instances in cluster did not report ACTIVE")

             LOG.debug("All members ready, proceeding for cluster setup.")
             instances = [Instance.load(context, instance_id) for instance_id
@@ -113,7 +116,8 @@ class PXCClusterTasks(task_models.ClusterTasks):
             # render the conf.d/cluster.cnf configuration
             cluster_configuration = self._render_cluster_config(
                 context,
-                instance, ",".join(cluster_ips),
+                instance,
+                ",".join(cluster_ips),
                 cluster_name,
                 replication_user)

@@ -139,7 +143,169 @@ class PXCClusterTasks(task_models.ClusterTasks):
                 raise  # not my timeout
             LOG.exception(_("Timeout for building cluster."))
             self.update_statuses_on_failure(cluster_id)
+        except TroveError:
+            LOG.exception(_("Error creating cluster %s.") % cluster_id)
+            self.update_statuses_on_failure(cluster_id)
         finally:
             timeout.cancel()

         LOG.debug("End create_cluster for id: %s." % cluster_id)
+
+    def grow_cluster(self, context, cluster_id, new_instance_ids):
+        LOG.debug("Begin pxc grow_cluster for id: %s." % cluster_id)
+
+        def _grow_cluster():
+
+            db_instances = DBInstance.find_all(cluster_id=cluster_id).all()
+            existing_instances = [Instance.load(context, db_inst.id)
+                                  for db_inst in db_instances
+                                  if db_inst.id not in new_instance_ids]
+            if not existing_instances:
+                raise TroveError("Unable to determine existing cluster "
+                                 "member(s)")
+
+            # get the list of IPs of the existing cluster members
+            existing_cluster_ips = [self.get_ip(instance) for instance in
+                                    existing_instances]
+            existing_instance_guests = [self.get_guest(instance)
+                                        for instance in existing_instances]
+
+            # get the cluster context to set up the new members
+            cluster_context = existing_instance_guests[0].get_cluster_context()
+
+            # Wait for cluster members to get to cluster-ready status.
+            if not self._all_instances_ready(new_instance_ids, cluster_id):
+                raise TroveError("Instances in cluster did not report ACTIVE")
+
+            LOG.debug("All members ready, proceeding for cluster setup.")
+
+            # Get the new instances to join the cluster
+            new_instances = [Instance.load(context, instance_id)
+                             for instance_id in new_instance_ids]
+            new_cluster_ips = [self.get_ip(instance) for instance in
+                               new_instances]
+            for instance in new_instances:
+                guest = self.get_guest(instance)
+
+                guest.reset_admin_password(cluster_context['admin_password'])
+
+                # render the conf.d/cluster.cnf configuration
+                cluster_configuration = self._render_cluster_config(
+                    context,
+                    instance,
+                    ",".join(existing_cluster_ips),
+                    cluster_context['cluster_name'],
+                    cluster_context['replication_user'])
+
+                # push the cluster config; joining members do not bootstrap
+                bootstrap = False
+                guest.install_cluster(cluster_context['replication_user'],
+                                      cluster_configuration,
+                                      bootstrap)
+
+            # apply the new config to all instances
+            for instance in existing_instances + new_instances:
+                guest = self.get_guest(instance)
+                # render the conf.d/cluster.cnf configuration
+                cluster_configuration = self._render_cluster_config(
+                    context,
+                    instance,
+                    ",".join(existing_cluster_ips + new_cluster_ips),
+                    cluster_context['cluster_name'],
+                    cluster_context['replication_user'])
+                guest.write_cluster_configuration_overrides(
+                    cluster_configuration)
+
+            for instance in new_instances:
+                guest = self.get_guest(instance)
+                guest.cluster_complete()
+
+        timeout = Timeout(CONF.cluster_usage_timeout)
+        try:
+            _grow_cluster()
+            self.reset_task()
+        except Timeout as t:
+            if t is not timeout:
+                raise  # not my timeout
+            LOG.exception(_("Timeout for growing cluster."))
+            self.update_statuses_on_failure(
+                cluster_id, status=inst_tasks.InstanceTasks.GROWING_ERROR)
+        except Exception:
+            LOG.exception(_("Error growing cluster %s.") % cluster_id)
+            self.update_statuses_on_failure(
+                cluster_id, status=inst_tasks.InstanceTasks.GROWING_ERROR)
+        finally:
+            timeout.cancel()
+
+        LOG.debug("End grow_cluster for id: %s." % cluster_id)
+
+    def shrink_cluster(self, context, cluster_id, removal_instance_ids):
+        LOG.debug("Begin pxc shrink_cluster for id: %s." % cluster_id)
+
+        def _shrink_cluster():
+            removal_instances = [Instance.load(context, instance_id)
+                                 for instance_id in removal_instance_ids]
+            for instance in removal_instances:
+                Instance.delete(instance)
+
+            # wait for the instances to be deleted
+            def all_instances_marked_deleted():
+                non_deleted_instances = DBInstance.find_all(
+                    cluster_id=cluster_id, deleted=False).all()
+                non_deleted_ids = [db_instance.id for db_instance
+                                   in non_deleted_instances]
+                return not bool(
+                    set(removal_instance_ids).intersection(
+                        set(non_deleted_ids))
+                )
+            try:
+                LOG.info(_("Deleting instances (%s)") % removal_instance_ids)
+                utils.poll_until(all_instances_marked_deleted,
+                                 sleep_time=2,
+                                 time_out=CONF.cluster_delete_time_out)
+            except PollTimeOut:
+                LOG.error(_("Timeout waiting for instances to be marked as "
+                            "deleted."))
+                return
+
+            db_instances = DBInstance.find_all(cluster_id=cluster_id).all()
+            leftover_instances = [Instance.load(context, db_inst.id)
+                                  for db_inst in db_instances
+                                  if db_inst.id not in removal_instance_ids]
+            leftover_cluster_ips = [self.get_ip(instance) for instance in
+                                    leftover_instances]
+
+            # Get the config changes for the leftover instances
+            rnd_cluster_guest = self.get_guest(leftover_instances[0])
+            cluster_context = rnd_cluster_guest.get_cluster_context()
+
+            # apply the new config to all leftover instances
+            for instance in leftover_instances:
+                guest = self.get_guest(instance)
+                # render the conf.d/cluster.cnf configuration
+                cluster_configuration = self._render_cluster_config(
+                    context,
+                    instance,
+                    ",".join(leftover_cluster_ips),
+                    cluster_context['cluster_name'],
+                    cluster_context['replication_user'])
+                guest.write_cluster_configuration_overrides(
+                    cluster_configuration)
+
+        timeout = Timeout(CONF.cluster_usage_timeout)
+        try:
+            _shrink_cluster()
+            self.reset_task()
+        except Timeout as t:
+            if t is not timeout:
+                raise  # not my timeout
+            LOG.exception(_("Timeout for shrinking cluster."))
+            self.update_statuses_on_failure(
+                cluster_id, status=inst_tasks.InstanceTasks.SHRINKING_ERROR)
+        except Exception:
+            LOG.exception(_("Error shrinking cluster %s.") % cluster_id)
+            self.update_statuses_on_failure(
+                cluster_id, status=inst_tasks.InstanceTasks.SHRINKING_ERROR)
+        finally:
+            timeout.cancel()
+
+        LOG.debug("End shrink_cluster for id: %s." % cluster_id)
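Both new taskmanager entry points wrap their work in the same scaffold: run a nested
worker under an eventlet Timeout, reset the cluster task on success, and flag the
instances with the new error task on timeout or failure. A condensed sketch of that
shared pattern follows; it is not literal patch code, and _do_work and error_task
stand in for _grow_cluster/_shrink_cluster and GROWING_ERROR/SHRINKING_ERROR.

    # Condensed error-handling scaffold shared by grow_cluster and
    # shrink_cluster above (logging elided for brevity).
    timeout = Timeout(CONF.cluster_usage_timeout)
    try:
        _do_work()                 # _grow_cluster() or _shrink_cluster()
        self.reset_task()          # cluster task back to NONE on success
    except Timeout as t:
        if t is not timeout:
            raise                  # someone else's timeout; let it propagate
        self.update_statuses_on_failure(cluster_id, status=error_task)
    except Exception:
        self.update_statuses_on_failure(cluster_id, status=error_task)
    finally:
        timeout.cancel()           # always disarm the timer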
diff --git a/trove/common/strategies/cluster/experimental/redis/api.py b/trove/common/strategies/cluster/experimental/redis/api.py
index 03de7fa7d4..4c018b7f8f 100644
--- a/trove/common/strategies/cluster/experimental/redis/api.py
+++ b/trove/common/strategies/cluster/experimental/redis/api.py
@@ -21,6 +21,7 @@ from trove.cluster.views import ClusterView
 from trove.common import cfg
 from trove.common import exception
 from trove.common.exception import TroveError
+from trove.common.i18n import _
 from trove.common import remote
 from trove.common.strategies.cluster import base
 from trove.common import utils
diff --git a/trove/guestagent/datastore/experimental/pxc/manager.py b/trove/guestagent/datastore/experimental/pxc/manager.py
index 7026382bd0..389621b2f7 100644
--- a/trove/guestagent/datastore/experimental/pxc/manager.py
+++ b/trove/guestagent/datastore/experimental/pxc/manager.py
@@ -69,3 +69,14 @@ class Manager(manager.MySqlManager):
         LOG.debug("Storing the admin password on the instance.")
         app = self.mysql_app(self.mysql_app_status.get())
         app.reset_admin_password(admin_password)
+
+    def get_cluster_context(self, context):
+        LOG.debug("Getting the cluster context.")
+        app = self.mysql_app(self.mysql_app_status.get())
+        return app.get_cluster_context()
+
+    def write_cluster_configuration_overrides(self, context,
+                                              cluster_configuration):
+        LOG.debug("Applying the updated cluster configuration.")
+        app = self.mysql_app(self.mysql_app_status.get())
+        app.write_cluster_configuration_overrides(cluster_configuration)
diff --git a/trove/guestagent/datastore/experimental/pxc/service.py b/trove/guestagent/datastore/experimental/pxc/service.py
index 9f783194e0..a61b0a7863 100644
--- a/trove/guestagent/datastore/experimental/pxc/service.py
+++ b/trove/guestagent/datastore/experimental/pxc/service.py
@@ -63,7 +63,7 @@ class PXCApp(service.BaseMySqlApp):
     def _wait_for_mysql_to_be_really_alive(self, max_time):
         utils.poll_until(self._test_mysql, sleep_time=3, time_out=max_time)

-    def secure(self, config_contents, overrides):
+    def secure(self, config_contents):
         LOG.info(_("Generating admin password."))
         admin_password = utils.generate_random_password()
         service.clear_expired_password()
@@ -74,7 +74,6 @@
             self._create_admin_user(client, admin_password)
         self.stop_db()
         self._reset_configuration(config_contents, admin_password)
-        self._apply_user_overrides(overrides)
         self.start_mysql()
         # TODO(cp16net) figure out reason for PXC not updating the password
         try:
@@ -93,7 +92,6 @@

         self.stop_db()
         self._reset_configuration(config_contents, admin_password)
-        self._apply_user_overrides(overrides)
         self.start_mysql()
         self._wait_for_mysql_to_be_really_alive(
             CONF.timeout_wait_for_service)
@@ -121,13 +119,16 @@
             LOG.exception(_("Error bootstrapping cluster."))
             raise RuntimeError(_("Service is not discovered."))

+    def write_cluster_configuration_overrides(self, cluster_configuration):
+        self.configuration_manager.apply_system_override(
+            cluster_configuration, CNF_CLUSTER)
+
     def install_cluster(self, replication_user, cluster_configuration,
                         bootstrap=False):
         LOG.info(_("Installing cluster configuration."))
         self._grant_cluster_replication_privilege(replication_user)
         self.stop_db()
-        self.configuration_manager.apply_system_override(cluster_configuration,
-                                                         CNF_CLUSTER)
+        self.write_cluster_configuration_overrides(cluster_configuration)
         self.wipe_ib_logfiles()
         LOG.debug("bootstrap the instance? 
: %s" % bootstrap) # Have to wait to sync up the joiner instances with the donor instance. @@ -136,6 +137,20 @@ class PXCApp(service.BaseMySqlApp): else: self.start_mysql(timeout=CONF.restore_usage_timeout) + def get_cluster_context(self): + auth = self.configuration_manager.get_value('mysqld').get( + "wsrep_sst_auth").replace('"', '') + cluster_name = self.configuration_manager.get_value( + 'mysqld').get("wsrep_cluster_name") + return { + 'replication_user': { + 'name': auth.split(":")[0], + 'password': auth.split(":")[1], + }, + 'cluster_name': cluster_name, + 'admin_password': self.get_auth_password() + } + class PXCRootAccess(service.BaseMySqlRootAccess): def __init__(self): diff --git a/trove/instance/tasks.py b/trove/instance/tasks.py index 734df950c5..97907ea72e 100644 --- a/trove/instance/tasks.py +++ b/trove/instance/tasks.py @@ -106,6 +106,12 @@ class InstanceTasks(object): EJECTION_ERROR = InstanceTask(0x56, 'EJECTING', 'Replica Source Ejection Error.', is_error=True) + GROWING_ERROR = InstanceTask(0x57, 'GROWING', + 'Growing Cluster Error.', + is_error=True) + SHRINKING_ERROR = InstanceTask(0x58, 'SHRINKING', + 'Shrinking Cluster Error.', + is_error=True) # Dissuade further additions at run-time. InstanceTask.__init__ = None diff --git a/trove/taskmanager/models.py b/trove/taskmanager/models.py index 03e7fca6c4..200da00548 100755 --- a/trove/taskmanager/models.py +++ b/trove/taskmanager/models.py @@ -181,7 +181,8 @@ class ConfigurationMixin(object): class ClusterTasks(Cluster): - def update_statuses_on_failure(self, cluster_id, shard_id=None): + def update_statuses_on_failure(self, cluster_id, shard_id=None, + status=None): if CONF.update_status_on_fail: if shard_id: @@ -193,7 +194,7 @@ class ClusterTasks(Cluster): for db_instance in db_instances: db_instance.set_task_status( - InstanceTasks.BUILDING_ERROR_SERVER) + status or InstanceTasks.BUILDING_ERROR_SERVER) db_instance.save() @classmethod diff --git a/trove/tests/scenario/runners/cluster_actions_runners.py b/trove/tests/scenario/runners/cluster_actions_runners.py index f2643be1d3..6a92246797 100644 --- a/trove/tests/scenario/runners/cluster_actions_runners.py +++ b/trove/tests/scenario/runners/cluster_actions_runners.py @@ -191,6 +191,8 @@ class ClusterActionsRunner(TestRunner): self._assert_cluster_action(cluster_id, expected_task_name, expected_http_code) + self._assert_cluster_states(cluster_id, ['NONE']) + cluster = self.auth_client.clusters.get(cluster_id) self.assert_equal( len(removed_instance_names), initial_instance_count - len(cluster.instances), @@ -199,7 +201,6 @@ class ClusterActionsRunner(TestRunner): cluster_instances = self._get_cluster_instances(cluster_id) self.assert_all_instance_states(cluster_instances, ['ACTIVE']) - self._assert_cluster_states(cluster_id, ['NONE']) self._assert_cluster_response(cluster_id, 'NONE') def _find_cluster_instances_by_name(self, cluster, instance_names): @@ -322,9 +323,3 @@ class PxcClusterActionsRunner(ClusterActionsRunner): num_nodes=num_nodes, expected_task_name=expected_task_name, expected_instance_states=expected_instance_states, expected_http_code=expected_http_code) - - def run_cluster_shrink(self): - raise SkipTest("Operation not supported by the datastore.") - - def run_cluster_grow(self): - raise SkipTest("Operation not supported by the datastore.") diff --git a/trove/tests/unittests/cluster/test_pxc_cluster.py b/trove/tests/unittests/cluster/test_pxc_cluster.py index b12bca1ad4..59ce95664f 100644 --- a/trove/tests/unittests/cluster/test_pxc_cluster.py +++ 
b/trove/tests/unittests/cluster/test_pxc_cluster.py @@ -196,16 +196,19 @@ class ClusterTest(trove_testtools.TestCase): instances, {} ) + @patch.object(inst_models.DBInstance, 'find_all') @patch.object(inst_models.Instance, 'create') @patch.object(DBCluster, 'create') @patch.object(task_api, 'load') @patch.object(QUOTAS, 'check_quotas') @patch.object(remote, 'create_nova_client') def test_create(self, mock_client, mock_check_quotas, mock_task_api, - mock_db_create, mock_ins_create): + mock_db_create, mock_ins_create, mock_find_all): instances = self.instances flavors = Mock() + networks = Mock() mock_client.return_value.flavors = flavors + mock_client.return_value.networks = networks self.cluster.create(Mock(), self.cluster_name, self.datastore, @@ -215,28 +218,7 @@ class ClusterTest(trove_testtools.TestCase): mock_db_create.return_value.id) self.assertEqual(3, mock_ins_create.call_count) - @patch.object(inst_models.Instance, 'create') - @patch.object(DBCluster, 'create') - @patch.object(task_api, 'load') - @patch.object(QUOTAS, 'check_quotas') - @patch.object(remote, 'create_nova_client') - def test_create_over_limit(self, mock_client, mock_check_quotas, - mock_task_api, mock_db_create, mock_ins_create): - instances = [{'volume_size': 1, 'flavor_id': '1234'}, - {'volume_size': 1, 'flavor_id': '1234'}, - {'volume_size': 1, 'flavor_id': '1234'}, - {'volume_size': 1, 'flavor_id': '1234'}] - flavors = Mock() - mock_client.return_value.flavors = flavors - self.cluster.create(Mock(), - self.cluster_name, - self.datastore, - self.datastore_version, - instances, {}) - mock_task_api.return_value.create_cluster.assert_called_with( - mock_db_create.return_value.id) - self.assertEqual(4, mock_ins_create.call_count) - + @patch.object(inst_models.DBInstance, 'find_all') @patch.object(pxc_api, 'CONF') @patch.object(inst_models.Instance, 'create') @patch.object(DBCluster, 'create') @@ -245,7 +227,8 @@ class ClusterTest(trove_testtools.TestCase): @patch.object(remote, 'create_nova_client') def test_create_with_ephemeral_flavor(self, mock_client, mock_check_quotas, mock_task_api, mock_db_create, - mock_ins_create, mock_conf): + mock_ins_create, mock_conf, + mock_find_all): class FakeFlavor: def __init__(self, flavor_id): self.flavor_id = flavor_id @@ -300,3 +283,53 @@ class ClusterTest(trove_testtools.TestCase): self.cluster.db_info.task_status = ClusterTasks.DELETING self.cluster.delete() mock_update_db.assert_called_with(task_status=ClusterTasks.DELETING) + + @patch.object(pxc_api.PXCCluster, '_get_cluster_network_interfaces') + @patch.object(DBCluster, 'update') + @patch.object(pxc_api, 'CONF') + @patch.object(inst_models.Instance, 'create') + @patch.object(task_api, 'load') + @patch.object(QUOTAS, 'check_quotas') + @patch.object(remote, 'create_nova_client') + def test_grow(self, mock_client, mock_check_quotas, mock_task_api, + mock_inst_create, mock_conf, mock_update, mock_interfaces): + mock_client.return_value.flavors = Mock() + mock_interfaces.return_value = [Mock()] + self.cluster.grow(self.instances) + mock_update.assert_called_with( + task_status=ClusterTasks.GROWING_CLUSTER) + mock_task_api.return_value.grow_cluster.assert_called_with( + self.db_info.id, + [mock_inst_create.return_value.id] * 3) + self.assertEqual(mock_inst_create.call_count, 3) + self.assertEqual(mock_interfaces.call_count, 1) + + @patch.object(inst_models.DBInstance, 'find_all') + @patch.object(inst_models.Instance, 'load') + @patch.object(Cluster, 'validate_cluster_available') + def test_shrink_empty(self, mock_validate, 
mock_load, mock_find_all):
+        instance = Mock()
+        self.assertRaises(
+            exception.ClusterShrinkMustNotLeaveClusterEmpty,
+            self.cluster.shrink, [instance])
+
+    @patch.object(pxc_api.PXCCluster, '__init__')
+    @patch.object(task_api, 'load')
+    @patch.object(DBCluster, 'update')
+    @patch.object(inst_models.DBInstance, 'find_all')
+    @patch.object(inst_models.Instance, 'load')
+    @patch.object(Cluster, 'validate_cluster_available')
+    def test_shrink(self, mock_validate, mock_load, mock_find_all,
+                    mock_update, mock_task_api, mock_init):
+        mock_init.return_value = None
+        existing_instances = [Mock(), Mock()]
+        mock_find_all.return_value.all.return_value = existing_instances
+        instance = Mock()
+        self.cluster.shrink([instance])
+        mock_validate.assert_called_with()
+        mock_update.assert_called_with(
+            task_status=ClusterTasks.SHRINKING_CLUSTER)
+        mock_task_api.return_value.shrink_cluster.assert_called_with(
+            self.db_info.id, [mock_load.return_value.id])
+        mock_init.assert_called_with(self.context, self.db_info,
+                                     self.datastore, self.datastore_version)
diff --git a/trove/tests/unittests/guestagent/test_pxc_api.py b/trove/tests/unittests/guestagent/test_pxc_api.py
index 26a0ccca85..ed03086411 100644
--- a/trove/tests/unittests/guestagent/test_pxc_api.py
+++ b/trove/tests/unittests/guestagent/test_pxc_api.py
@@ -122,3 +122,25 @@ class ApiTest(trove_testtools.TestCase):
         self._verify_rpc_prepare_before_call()
         self._verify_call('cluster_complete')
         self.assertEqual(exp_resp, resp)
+
+    def test_get_cluster_context(self):
+        exp_resp = None
+        self.call_context.call.return_value = exp_resp
+
+        resp = self.api.get_cluster_context()
+
+        self._verify_rpc_prepare_before_call()
+        self._verify_call('get_cluster_context')
+        self.assertEqual(exp_resp, resp)
+
+    def test_write_cluster_configuration_overrides(self):
+        exp_resp = None
+        self.call_context.call.return_value = exp_resp
+
+        resp = self.api.write_cluster_configuration_overrides(
+            cluster_configuration="cluster-configuration")
+
+        self._verify_rpc_prepare_before_call()
+        self._verify_call('write_cluster_configuration_overrides',
+                          cluster_configuration="cluster-configuration")
+        self.assertEqual(exp_resp, resp)
diff --git a/trove/tests/unittests/guestagent/test_pxc_manager.py b/trove/tests/unittests/guestagent/test_pxc_manager.py
index 5861cb424a..20050dd18c 100644
--- a/trove/tests/unittests/guestagent/test_pxc_manager.py
+++ b/trove/tests/unittests/guestagent/test_pxc_manager.py
@@ -57,3 +57,18 @@ class GuestAgentManagerTest(trove_testtools.TestCase):
         self.manager.reset_admin_password(self.context, admin_password)
         self.status_get_mock.assert_any_call()
         reset_admin_pwd.assert_called_with(admin_password)
+
+    @patch.object(dbaas.PXCApp, 'get_cluster_context')
+    def test_get_cluster_context(self, get_cluster_ctxt):
+        get_cluster_ctxt.return_value = {'cluster': 'info'}
+        self.manager.get_cluster_context(self.context)
+        self.status_get_mock.assert_any_call()
+        get_cluster_ctxt.assert_any_call()
+
+    @patch.object(dbaas.PXCApp, 'write_cluster_configuration_overrides')
+    def test_write_cluster_configuration_overrides(self, conf_overrides):
+        cluster_configuration = "cluster_configuration"
+        self.manager.write_cluster_configuration_overrides(
+            self.context, cluster_configuration)
+        self.status_get_mock.assert_any_call()
+        conf_overrides.assert_called_with(cluster_configuration)
diff --git a/trove/tests/unittests/taskmanager/test_pxc_clusters.py b/trove/tests/unittests/taskmanager/test_pxc_clusters.py
index 2aa5795cb2..8a96f92cfd 100644
--- 
a/trove/tests/unittests/taskmanager/test_pxc_clusters.py +++ b/trove/tests/unittests/taskmanager/test_pxc_clusters.py @@ -18,7 +18,7 @@ from mock import patch from trove.cluster.models import ClusterTasks as ClusterTaskStatus from trove.cluster.models import DBCluster -from trove.common.exception import GuestError +from trove.common import exception from trove.common.strategies.cluster.experimental.pxc.taskmanager import ( PXCClusterTasks as ClusterTasks) from trove.common.strategies.cluster.experimental.pxc.taskmanager import ( @@ -82,6 +82,14 @@ class PXCClusterTasksTest(trove_testtools.TestCase): self.db_cluster, datastore=mock_ds1, datastore_version=mock_dv1) + self.cluster_context = { + 'replication_user': { + 'name': "name", + 'password': "password", + }, + 'cluster_name': self.cluster_name, + 'admin_password': "admin_password" + } @patch.object(ClusterTasks, 'update_statuses_on_failure') @patch.object(InstanceServiceStatus, 'find_by') @@ -103,7 +111,8 @@ class PXCClusterTasksTest(trove_testtools.TestCase): self.cluster_id) self.assertTrue(ret_val) - @patch.object(ClusterTasks, 'reset_task') + @patch('trove.common.strategies.cluster.experimental.pxc.taskmanager.LOG') + @patch.object(ClusterTasks, 'update_statuses_on_failure') @patch.object(ClusterTasks, '_all_instances_ready', return_value=False) @patch.object(Instance, 'load') @patch.object(DBInstance, 'find_all') @@ -111,14 +120,15 @@ class PXCClusterTasksTest(trove_testtools.TestCase): @patch.object(datastore_models.DatastoreVersion, 'load_by_uuid') def test_create_cluster_instance_not_ready(self, mock_dv, mock_ds, mock_find_all, mock_load, - mock_ready, mock_reset_task): + mock_ready, mock_update, + mock_logging): mock_find_all.return_value.all.return_value = [self.dbinst1] mock_load.return_value = BaseInstance(Mock(), self.dbinst1, Mock(), InstanceServiceStatus( ServiceStatuses.NEW)) self.clustertasks.create_cluster(Mock(), self.cluster_id) - mock_reset_task.assert_called_with() + mock_update.assert_called_with(self.cluster_id) @patch.object(ClusterTasks, 'update_statuses_on_failure') @patch.object(ClusterTasks, 'reset_task') @@ -139,13 +149,83 @@ class PXCClusterTasksTest(trove_testtools.TestCase): ServiceStatuses.NEW)) mock_ip.return_value = "10.0.0.2" guest_client = Mock() - guest_client.install_cluster = Mock(side_effect=GuestError("Error")) + guest_client.install_cluster = Mock( + side_effect=exception.GuestError("Error")) with patch.object(ClusterTasks, 'get_guest', return_value=guest_client): self.clustertasks.create_cluster(Mock(), self.cluster_id) mock_update_status.assert_called_with('1232') mock_reset_task.assert_called_with() + @patch.object(ClusterTasks, 'update_statuses_on_failure') + @patch('trove.common.strategies.cluster.experimental.pxc.taskmanager.LOG') + def test_grow_cluster_does_not_exist(self, mock_logging, + mock_update_status): + context = Mock() + bad_cluster_id = '1234' + new_instances = [Mock(), Mock()] + self.clustertasks.grow_cluster(context, bad_cluster_id, new_instances) + mock_update_status.assert_called_with( + '1234', + status=InstanceTasks.GROWING_ERROR) + + @patch.object(ClusterTasks, 'reset_task') + @patch.object(ClusterTasks, '_render_cluster_config') + @patch.object(ClusterTasks, 'get_ip') + @patch.object(ClusterTasks, 'get_guest') + @patch.object(ClusterTasks, '_all_instances_ready', return_value=True) + @patch.object(Instance, 'load') + @patch.object(DBInstance, 'find_all') + @patch.object(datastore_models.Datastore, 'load') + @patch.object(datastore_models.DatastoreVersion, 
'load_by_uuid')
+    def test_grow_cluster_success(self, mock_dv, mock_ds, mock_find_all,
+                                  mock_load, mock_ready, mock_guest, mock_ip,
+                                  mock_render, mock_reset_task):
+        mock_find_all.return_value.all.return_value = [self.dbinst1]
+
+        mock_ip.return_value = "10.0.0.2"
+        context = Mock()
+        new_instances = [Mock(), Mock()]
+        mock_guest.get_cluster_context = Mock(
+            return_value=self.cluster_context)
+        mock_guest.reset_admin_password = Mock()
+        self.clustertasks.grow_cluster(context, self.cluster_id,
+                                       new_instances)
+        mock_reset_task.assert_called_with()
+
+    @patch.object(ClusterTasks, 'reset_task')
+    @patch.object(Instance, 'load')
+    @patch.object(Instance, 'delete')
+    @patch.object(DBInstance, 'find_all')
+    @patch.object(ClusterTasks, 'get_guest')
+    @patch.object(ClusterTasks, 'get_ip')
+    @patch.object(ClusterTasks, '_render_cluster_config')
+    def test_shrink_cluster_success(self, mock_render, mock_ip, mock_guest,
+                                    mock_find_all, mock_delete, mock_load,
+                                    mock_reset_task):
+        mock_find_all.return_value.all.return_value = [self.dbinst1]
+        context = Mock()
+        remove_instances = [Mock()]
+        mock_ip.return_value = "10.0.0.2"
+        mock_guest.get_cluster_context = Mock(
+            return_value=self.cluster_context)
+        self.clustertasks.shrink_cluster(context, self.cluster_id,
+                                         remove_instances)
+        mock_reset_task.assert_called_with()
+
+    @patch.object(ClusterTasks, 'update_statuses_on_failure')
+    @patch('trove.common.strategies.cluster.experimental.pxc.taskmanager.LOG')
+    def test_shrink_cluster_does_not_exist(self, mock_logging,
+                                           mock_update_status):
+        context = Mock()
+        bad_cluster_id = '1234'
+        remove_instances = [Mock()]
+        self.clustertasks.shrink_cluster(context, bad_cluster_id,
+                                         remove_instances)
+        mock_update_status.assert_called_with(
+            '1234',
+            status=InstanceTasks.SHRINKING_ERROR)
+

 class PXCTaskManagerStrategyTest(trove_testtools.TestCase):
diff --git a/trove/tests/unittests/util/util.py b/trove/tests/unittests/util/util.py
index 2f9b68930b..21c090aeb8 100644
--- a/trove/tests/unittests/util/util.py
+++ b/trove/tests/unittests/util/util.py
@@ -12,8 +12,13 @@
 # License for the specific language governing permissions and limitations
 # under the License.

+DB_SETUP = None
+

 def init_db():
+    global DB_SETUP
+    if DB_SETUP:
+        return
     from trove.common import cfg
     from trove.db import get_db_api
     from trove.db.sqlalchemy import session
@@ -21,3 +26,4 @@ def init_db():
     db_api = get_db_api()
     db_api.db_sync(CONF)
     session.configure_db(CONF)
+    DB_SETUP = True
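The DB_SETUP guard above makes init_db() idempotent, so tests that each call it
during setup only pay for db_sync once per process. A small behavioral sketch,
assuming the module import path shown:

    from trove.tests.unittests.util import util

    util.init_db()  # first call: runs db_sync and configures the session
    util.init_db()  # subsequent calls return immediately (DB_SETUP is set)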