pxc grow/shrink cluster implementation

* This adds support for pxc to grow a cluster.
* api and taskmanager support for shrinking a cluster
* validate that the networks given are the same for each instance in
  the cluster.
* make sure to add the existing networks on an instance in the
  cluster.
* add new Error task for grow and shrink.
* nova client version configuration changed to a string option rather
  than an int option because the nova microversions change nova api
  output. This was needed for the network interfaces on existing
  instances.
* testing for grow and shrink cluster

Change-Id: I8602cad1a98970fcd99ee9793a89e1abdceb0982
Partially-Implements-Blueprint: pxc-grow-shrink-cluster
This commit is contained in:
Craig Vyvial 2015-11-16 14:11:00 -06:00
parent fac6e76b54
commit 9e96241d12
17 changed files with 559 additions and 77 deletions

View File

@ -9,7 +9,7 @@ usedevelop = True
install_command = pip install -U {opts} {packages} install_command = pip install -U {opts} {packages}
deps = -r{toxinidir}/requirements.txt deps = -r{toxinidir}/requirements.txt
-r{toxinidir}/test-requirements.txt -r{toxinidir}/test-requirements.txt
commands = find . -type f -name "*.pyc" -delete commands = find ./trove -type f -name "*.pyc" -delete
pip install pymongo===3.0.3 pip install pymongo===3.0.3
{envpython} run_tests.py {envpython} run_tests.py
python setup.py testr --slowest python setup.py testr --slowest
@ -33,7 +33,6 @@ commands = oslo_debug_helper {posargs}
[testenv:cover] [testenv:cover]
basepython = python2.7 basepython = python2.7
commands = commands =
{envpython} run_tests.py --group=does_not_exist
coverage erase coverage erase
python setup.py testr --coverage python setup.py testr --coverage
coverage run -a run_tests.py coverage run -a run_tests.py

View File

@ -55,7 +55,7 @@ common_opts = [
help='Service type to use when searching catalog.'), help='Service type to use when searching catalog.'),
cfg.StrOpt('nova_compute_endpoint_type', default='publicURL', cfg.StrOpt('nova_compute_endpoint_type', default='publicURL',
help='Service endpoint type to use when searching catalog.'), help='Service endpoint type to use when searching catalog.'),
cfg.IntOpt('nova_client_version', default=2, cfg.StrOpt('nova_client_version', default='2.12',
help="The version of the compute service client."), help="The version of the compute service client."),
cfg.StrOpt('neutron_url', help='URL without the tenant segment.'), cfg.StrOpt('neutron_url', help='URL without the tenant segment.'),
cfg.StrOpt('neutron_service_type', default='network', cfg.StrOpt('neutron_service_type', default='network',

View File

@ -504,6 +504,14 @@ class ClusterFlavorsNotEqual(TroveError):
message = _("The flavor for each instance in a cluster must be the same.") message = _("The flavor for each instance in a cluster must be the same.")
class ClusterNetworksNotEqual(TroveError):
message = _("The network for each instance in a cluster must be the same.")
class NetworkNotFound(TroveError):
message = _("Network Resource %(uuid)s cannot be found.")
class ClusterVolumeSizeRequired(TroveError): class ClusterVolumeSizeRequired(TroveError):
message = _("A volume size is required for each instance in the cluster.") message = _("A volume size is required for each instance in the cluster.")
@ -523,6 +531,11 @@ class ClusterNumInstancesNotLargeEnough(TroveError):
"be at least %(num_instances)s.") "be at least %(num_instances)s.")
class ClusterShrinkMustNotLeaveClusterEmpty(TroveError):
message = _("Must leave at least one instance in the cluster when "
"shrinking.")
class ClusterInstanceOperationNotSupported(TroveError): class ClusterInstanceOperationNotSupported(TroveError):
message = _("Operation not supported for instances that are part of a " message = _("Operation not supported for instances that are part of a "
"cluster.") "cluster.")

View File

@ -22,8 +22,10 @@ from trove.common import cfg
from trove.common import exception from trove.common import exception
from trove.common import remote from trove.common import remote
from trove.common.strategies.cluster import base from trove.common.strategies.cluster import base
from trove.common import utils
from trove.extensions.mgmt.clusters.views import MgmtClusterView from trove.extensions.mgmt.clusters.views import MgmtClusterView
from trove.instance import models as inst_models from trove.instance.models import DBInstance
from trove.instance.models import Instance
from trove.quota.quota import check_quotas from trove.quota.quota import check_quotas
from trove.taskmanager import api as task_api from trove.taskmanager import api as task_api
@ -40,7 +42,29 @@ class PXCAPIStrategy(base.BaseAPIStrategy):
@property @property
def cluster_controller_actions(self): def cluster_controller_actions(self):
return {} return {
'grow': self._action_grow_cluster,
'shrink': self._action_shrink_cluster,
}
def _action_grow_cluster(self, cluster, body):
nodes = body['grow']
instances = []
for node in nodes:
instance = {
'flavor_id': utils.get_id_from_href(node['flavorRef'])
}
if 'name' in node:
instance['name'] = node['name']
if 'volume' in node:
instance['volume_size'] = int(node['volume']['size'])
instances.append(instance)
return cluster.grow(instances)
def _action_shrink_cluster(self, cluster, body):
instances = body['shrink']
instance_ids = [instance['id'] for instance in instances]
return cluster.shrink(instance_ids)
@property @property
def cluster_view_class(self): def cluster_view_class(self):
@ -53,10 +77,10 @@ class PXCAPIStrategy(base.BaseAPIStrategy):
class PXCCluster(models.Cluster): class PXCCluster(models.Cluster):
@classmethod @staticmethod
def create(cls, context, name, datastore, datastore_version, def _validate_cluster_instances(context, instances, datastore,
instances, extended_properties): datastore_version):
LOG.debug("Initiating PXC cluster creation.") """Validate the flavor and volume"""
pxc_conf = CONF.get(datastore_version.manager) pxc_conf = CONF.get(datastore_version.manager)
num_instances = len(instances) num_instances = len(instances)
@ -65,7 +89,7 @@ class PXCCluster(models.Cluster):
raise exception.ClusterNumInstancesNotLargeEnough( raise exception.ClusterNumInstancesNotLargeEnough(
num_instances=pxc_conf.min_cluster_member_count) num_instances=pxc_conf.min_cluster_member_count)
# Checking flavors # Checking flavors and get delta for quota check
flavor_ids = [instance['flavor_id'] for instance in instances] flavor_ids = [instance['flavor_id'] for instance in instances]
if len(set(flavor_ids)) != 1: if len(set(flavor_ids)) != 1:
raise exception.ClusterFlavorsNotEqual() raise exception.ClusterFlavorsNotEqual()
@ -77,7 +101,7 @@ class PXCCluster(models.Cluster):
raise exception.FlavorNotFound(uuid=flavor_id) raise exception.FlavorNotFound(uuid=flavor_id)
deltas = {'instances': num_instances} deltas = {'instances': num_instances}
# Checking volumes # Checking volumes and get delta for quota check
volume_sizes = [instance['volume_size'] for instance in instances volume_sizes = [instance['volume_size'] for instance in instances
if instance.get('volume_size', None)] if instance.get('volume_size', None)]
volume_size = None volume_size = None
@ -96,35 +120,64 @@ class PXCCluster(models.Cluster):
if ephemeral_support and flavor.ephemeral == 0: if ephemeral_support and flavor.ephemeral == 0:
raise exception.LocalStorageNotSpecified(flavor=flavor_id) raise exception.LocalStorageNotSpecified(flavor=flavor_id)
# quota check
check_quotas(context.tenant, deltas) check_quotas(context.tenant, deltas)
nics = [instance.get('nics', None) for instance in instances] # Checking networks are same for the cluster
instance_nics = [instance.get('nics', None) for instance in instances]
if len(set(instance_nics)) != 1:
raise exception.ClusterNetworksNotEqual()
instance_nic = instance_nics[0]
if instance_nic is None:
return
try:
nova_client.networks.get(instance_nic)
except nova_exceptions.NotFound:
raise exception.NetworkNotFound(uuid=instance_nic)
azs = [instance.get('availability_zone', None) @staticmethod
for instance in instances] def _create_instances(context, db_info, datastore, datastore_version,
instances):
member_config = {"id": db_info.id,
"instance_type": "member"}
name_index = 1
for instance in instances:
if not instance.get("name"):
instance['name'] = "%s-member-%s" % (db_info.name,
str(name_index))
name_index += 1
return map(lambda instance:
Instance.create(context,
instance['name'],
instance['flavor_id'],
datastore_version.image_id,
[], [],
datastore, datastore_version,
instance.get('volume_size', None),
None,
availability_zone=instance.get(
'availability_zone', None),
nics=instance.get('nics', None),
configuration_id=None,
cluster_config=member_config
),
instances)
@classmethod
def create(cls, context, name, datastore, datastore_version,
instances, extended_properties):
LOG.debug("Initiating PXC cluster creation.")
cls._validate_cluster_instances(context, instances, datastore,
datastore_version)
# Updating Cluster Task # Updating Cluster Task
db_info = models.DBCluster.create( db_info = models.DBCluster.create(
name=name, tenant_id=context.tenant, name=name, tenant_id=context.tenant,
datastore_version_id=datastore_version.id, datastore_version_id=datastore_version.id,
task_status=ClusterTasks.BUILDING_INITIAL) task_status=ClusterTasks.BUILDING_INITIAL)
member_config = {"id": db_info.id, cls._create_instances(context, db_info, datastore, datastore_version,
"instance_type": "member"} instances)
# Creating member instances
for i in range(0, num_instances):
instance_name = "%s-member-%s" % (name, str(i + 1))
inst_models.Instance.create(context, instance_name,
flavor_id,
datastore_version.image_id,
[], [], datastore,
datastore_version,
volume_size, None,
nics=nics[i],
availability_zone=azs[i],
configuration_id=None,
cluster_config=member_config)
# Calling taskmanager to further proceed for cluster-configuration # Calling taskmanager to further proceed for cluster-configuration
task_api.load(context, datastore_version.manager).create_cluster( task_api.load(context, datastore_version.manager).create_cluster(
@ -132,6 +185,57 @@ class PXCCluster(models.Cluster):
return PXCCluster(context, db_info, datastore, datastore_version) return PXCCluster(context, db_info, datastore, datastore_version)
def _get_cluster_network_interfaces(self):
nova_client = remote.create_nova_client(self.context)
nova_instance_id = self.db_instances[0].compute_instance_id
interfaces = nova_client.virtual_interfaces.list(nova_instance_id)
ret = [{"net-id": getattr(interface, 'net_id')}
for interface in interfaces]
return ret
def grow(self, instances):
LOG.debug("Growing cluster %s." % self.id)
self.validate_cluster_available()
context = self.context
db_info = self.db_info
datastore = self.ds
datastore_version = self.ds_version
# Get the network of the existing cluster instances.
interface_ids = self._get_cluster_network_interfaces()
for instance in instances:
instance["nics"] = interface_ids
db_info.update(task_status=ClusterTasks.GROWING_CLUSTER)
new_instances = self._create_instances(context, db_info,
datastore, datastore_version,
instances)
task_api.load(context, datastore_version.manager).grow_cluster(
db_info.id, [instance.id for instance in new_instances])
return PXCCluster(context, db_info, datastore, datastore_version)
def shrink(self, instances):
"""Removes instances from a cluster."""
LOG.debug("Shrinking cluster %s." % self.id)
self.validate_cluster_available()
removal_instances = [Instance.load(self.context, inst_id)
for inst_id in instances]
db_instances = DBInstance.find_all(cluster_id=self.db_info.id).all()
if len(db_instances) - len(removal_instances) < 1:
raise exception.ClusterShrinkMustNotLeaveClusterEmpty()
self.db_info.update(task_status=ClusterTasks.SHRINKING_CLUSTER)
task_api.load(self.context, self.ds_version.manager).shrink_cluster(
self.db_info.id, [instance.id for instance in removal_instances])
return PXCCluster(self.context, self.db_info, self.ds, self.ds_version)
class PXCClusterView(ClusterView): class PXCClusterView(ClusterView):

View File

@ -33,6 +33,7 @@ class PXCGuestAgentAPI(guest_api.API):
def install_cluster(self, replication_user, cluster_configuration, def install_cluster(self, replication_user, cluster_configuration,
bootstrap): bootstrap):
"""Install the cluster."""
LOG.debug("Installing PXC cluster.") LOG.debug("Installing PXC cluster.")
self._call("install_cluster", CONF.cluster_usage_timeout, self._call("install_cluster", CONF.cluster_usage_timeout,
self.version_cap, self.version_cap,
@ -41,13 +42,27 @@ class PXCGuestAgentAPI(guest_api.API):
bootstrap=bootstrap) bootstrap=bootstrap)
def reset_admin_password(self, admin_password): def reset_admin_password(self, admin_password):
"""Store this password on the instance as the admin password""" """Store this password on the instance as the admin password."""
self._call("reset_admin_password", CONF.cluster_usage_timeout, self._call("reset_admin_password", CONF.cluster_usage_timeout,
self.version_cap, self.version_cap,
admin_password=admin_password) admin_password=admin_password)
def cluster_complete(self): def cluster_complete(self):
"""Set the status that the cluster is build is complete""" """Set the status that the cluster is build is complete."""
LOG.debug("Notifying cluster install completion.") LOG.debug("Notifying cluster install completion.")
return self._call("cluster_complete", guest_api.AGENT_HIGH_TIMEOUT, return self._call("cluster_complete", guest_api.AGENT_HIGH_TIMEOUT,
self.version_cap) self.version_cap)
def get_cluster_context(self):
"""Get the context of the cluster."""
LOG.debug("Getting the cluster context.")
return self._call("get_cluster_context", guest_api.AGENT_HIGH_TIMEOUT,
self.version_cap)
def write_cluster_configuration_overrides(self, cluster_configuration):
"""Write an updated cluster configuration."""
LOG.debug("Writing an updated cluster configuration.")
self._call("write_cluster_configuration_overrides",
guest_api.AGENT_HIGH_TIMEOUT,
self.version_cap,
cluster_configuration=cluster_configuration)

View File

@ -15,6 +15,8 @@ from eventlet.timeout import Timeout
from oslo_log import log as logging from oslo_log import log as logging
from trove.common import cfg from trove.common import cfg
from trove.common.exception import PollTimeOut
from trove.common.exception import TroveError
from trove.common.i18n import _ from trove.common.i18n import _
from trove.common.remote import create_nova_client from trove.common.remote import create_nova_client
from trove.common.strategies.cluster import base from trove.common.strategies.cluster import base
@ -22,6 +24,7 @@ from trove.common.template import ClusterConfigTemplate
from trove.common import utils from trove.common import utils
from trove.instance.models import DBInstance from trove.instance.models import DBInstance
from trove.instance.models import Instance from trove.instance.models import Instance
from trove.instance import tasks as inst_tasks
from trove.taskmanager import api as task_api from trove.taskmanager import api as task_api
import trove.taskmanager.models as task_models import trove.taskmanager.models as task_models
@ -74,7 +77,7 @@ class PXCClusterTasks(task_models.ClusterTasks):
LOG.debug("Waiting for instances to get to cluster-ready status.") LOG.debug("Waiting for instances to get to cluster-ready status.")
# Wait for cluster members to get to cluster-ready status. # Wait for cluster members to get to cluster-ready status.
if not self._all_instances_ready(instance_ids, cluster_id): if not self._all_instances_ready(instance_ids, cluster_id):
return raise TroveError("Instances in cluster did not report ACTIVE")
LOG.debug("All members ready, proceeding for cluster setup.") LOG.debug("All members ready, proceeding for cluster setup.")
instances = [Instance.load(context, instance_id) for instance_id instances = [Instance.load(context, instance_id) for instance_id
@ -113,7 +116,8 @@ class PXCClusterTasks(task_models.ClusterTasks):
# render the conf.d/cluster.cnf configuration # render the conf.d/cluster.cnf configuration
cluster_configuration = self._render_cluster_config( cluster_configuration = self._render_cluster_config(
context, context,
instance, ",".join(cluster_ips), instance,
",".join(cluster_ips),
cluster_name, cluster_name,
replication_user) replication_user)
@ -139,7 +143,169 @@ class PXCClusterTasks(task_models.ClusterTasks):
raise # not my timeout raise # not my timeout
LOG.exception(_("Timeout for building cluster.")) LOG.exception(_("Timeout for building cluster."))
self.update_statuses_on_failure(cluster_id) self.update_statuses_on_failure(cluster_id)
except TroveError:
LOG.exception(_("Error creating cluster %s.") % cluster_id)
self.update_statuses_on_failure(cluster_id)
finally: finally:
timeout.cancel() timeout.cancel()
LOG.debug("End create_cluster for id: %s." % cluster_id) LOG.debug("End create_cluster for id: %s." % cluster_id)
def grow_cluster(self, context, cluster_id, new_instance_ids):
LOG.debug("Begin pxc grow_cluster for id: %s." % cluster_id)
def _grow_cluster():
db_instances = DBInstance.find_all(cluster_id=cluster_id).all()
existing_instances = [Instance.load(context, db_inst.id)
for db_inst in db_instances
if db_inst.id not in new_instance_ids]
if not existing_instances:
raise TroveError("Unable to determine existing cluster "
"member(s)")
# get list of ips of existing cluster members
existing_cluster_ips = [self.get_ip(instance) for instance in
existing_instances]
existing_instance_guests = [self.get_guest(instance)
for instance in existing_instances]
# get the cluster context to setup new members
cluster_context = existing_instance_guests[0].get_cluster_context()
# Wait for cluster members to get to cluster-ready status.
if not self._all_instances_ready(new_instance_ids, cluster_id):
raise TroveError("Instances in cluster did not report ACTIVE")
LOG.debug("All members ready, proceeding for cluster setup.")
# Get the new instances to join the cluster
new_instances = [Instance.load(context, instance_id)
for instance_id in new_instance_ids]
new_cluster_ips = [self.get_ip(instance) for instance in
new_instances]
for instance in new_instances:
guest = self.get_guest(instance)
guest.reset_admin_password(cluster_context['admin_password'])
# render the conf.d/cluster.cnf configuration
cluster_configuration = self._render_cluster_config(
context,
instance,
",".join(existing_cluster_ips),
cluster_context['cluster_name'],
cluster_context['replication_user'])
# push the cluster config and bootstrap the first instance
bootstrap = False
guest.install_cluster(cluster_context['replication_user'],
cluster_configuration,
bootstrap)
# apply the new config to all instances
for instance in existing_instances + new_instances:
guest = self.get_guest(instance)
# render the conf.d/cluster.cnf configuration
cluster_configuration = self._render_cluster_config(
context,
instance,
",".join(existing_cluster_ips + new_cluster_ips),
cluster_context['cluster_name'],
cluster_context['replication_user'])
guest.write_cluster_configuration_overrides(
cluster_configuration)
for instance in new_instances:
guest = self.get_guest(instance)
guest.cluster_complete()
timeout = Timeout(CONF.cluster_usage_timeout)
try:
_grow_cluster()
self.reset_task()
except Timeout as t:
if t is not timeout:
raise # not my timeout
LOG.exception(_("Timeout for growing cluster."))
self.update_statuses_on_failure(
cluster_id, status=inst_tasks.InstanceTasks.GROWING_ERROR)
except Exception:
LOG.exception(_("Error growing cluster %s.") % cluster_id)
self.update_statuses_on_failure(
cluster_id, status=inst_tasks.InstanceTasks.GROWING_ERROR)
finally:
timeout.cancel()
LOG.debug("End grow_cluster for id: %s." % cluster_id)
def shrink_cluster(self, context, cluster_id, removal_instance_ids):
LOG.debug("Begin pxc shrink_cluster for id: %s." % cluster_id)
def _shrink_cluster():
removal_instances = [Instance.load(context, instance_id)
for instance_id in removal_instance_ids]
for instance in removal_instances:
Instance.delete(instance)
# wait for instances to be deleted
def all_instances_marked_deleted():
non_deleted_instances = DBInstance.find_all(
cluster_id=cluster_id, deleted=False).all()
non_deleted_ids = [db_instance.id for db_instance
in non_deleted_instances]
return not bool(
set(removal_instance_ids).intersection(
set(non_deleted_ids))
)
try:
LOG.info(_("Deleting instances (%s)") % removal_instance_ids)
utils.poll_until(all_instances_marked_deleted,
sleep_time=2,
time_out=CONF.cluster_delete_time_out)
except PollTimeOut:
LOG.error(_("Timeout for instances to be marked as deleted."))
return
db_instances = DBInstance.find_all(cluster_id=cluster_id).all()
leftover_instances = [Instance.load(context, db_inst.id)
for db_inst in db_instances
if db_inst.id not in removal_instance_ids]
leftover_cluster_ips = [self.get_ip(instance) for instance in
leftover_instances]
# Get config changes for left over instances
rnd_cluster_guest = self.get_guest(leftover_instances[0])
cluster_context = rnd_cluster_guest.get_cluster_context()
# apply the new config to all leftover instances
for instance in leftover_instances:
guest = self.get_guest(instance)
# render the conf.d/cluster.cnf configuration
cluster_configuration = self._render_cluster_config(
context,
instance,
",".join(leftover_cluster_ips),
cluster_context['cluster_name'],
cluster_context['replication_user'])
guest.write_cluster_configuration_overrides(
cluster_configuration)
timeout = Timeout(CONF.cluster_usage_timeout)
try:
_shrink_cluster()
self.reset_task()
except Timeout as t:
if t is not timeout:
raise # not my timeout
LOG.exception(_("Timeout for shrinking cluster."))
self.update_statuses_on_failure(
cluster_id, status=inst_tasks.InstanceTasks.SHRINKING_ERROR)
except Exception:
LOG.exception(_("Error shrinking cluster %s.") % cluster_id)
self.update_statuses_on_failure(
cluster_id, status=inst_tasks.InstanceTasks.SHRINKING_ERROR)
finally:
timeout.cancel()
LOG.debug("End shrink_cluster for id: %s." % cluster_id)

View File

@ -21,6 +21,7 @@ from trove.cluster.views import ClusterView
from trove.common import cfg from trove.common import cfg
from trove.common import exception from trove.common import exception
from trove.common.exception import TroveError from trove.common.exception import TroveError
from trove.common.i18n import _
from trove.common import remote from trove.common import remote
from trove.common.strategies.cluster import base from trove.common.strategies.cluster import base
from trove.common import utils from trove.common import utils

View File

@ -69,3 +69,14 @@ class Manager(manager.MySqlManager):
LOG.debug("Storing the admin password on the instance.") LOG.debug("Storing the admin password on the instance.")
app = self.mysql_app(self.mysql_app_status.get()) app = self.mysql_app(self.mysql_app_status.get())
app.reset_admin_password(admin_password) app.reset_admin_password(admin_password)
def get_cluster_context(self, context):
LOG.debug("Getting the cluster context.")
app = self.mysql_app(self.mysql_app_status.get())
return app.get_cluster_context()
def write_cluster_configuration_overrides(self, context,
cluster_configuration):
LOG.debug("Apply the updated cluster configuration.")
app = self.mysql_app(self.mysql_app_status.get())
app.write_cluster_configuration_overrides(cluster_configuration)

View File

@ -63,7 +63,7 @@ class PXCApp(service.BaseMySqlApp):
def _wait_for_mysql_to_be_really_alive(self, max_time): def _wait_for_mysql_to_be_really_alive(self, max_time):
utils.poll_until(self._test_mysql, sleep_time=3, time_out=max_time) utils.poll_until(self._test_mysql, sleep_time=3, time_out=max_time)
def secure(self, config_contents, overrides): def secure(self, config_contents):
LOG.info(_("Generating admin password.")) LOG.info(_("Generating admin password."))
admin_password = utils.generate_random_password() admin_password = utils.generate_random_password()
service.clear_expired_password() service.clear_expired_password()
@ -74,7 +74,6 @@ class PXCApp(service.BaseMySqlApp):
self._create_admin_user(client, admin_password) self._create_admin_user(client, admin_password)
self.stop_db() self.stop_db()
self._reset_configuration(config_contents, admin_password) self._reset_configuration(config_contents, admin_password)
self._apply_user_overrides(overrides)
self.start_mysql() self.start_mysql()
# TODO(cp16net) figure out reason for PXC not updating the password # TODO(cp16net) figure out reason for PXC not updating the password
try: try:
@ -93,7 +92,6 @@ class PXCApp(service.BaseMySqlApp):
self.stop_db() self.stop_db()
self._reset_configuration(config_contents, admin_password) self._reset_configuration(config_contents, admin_password)
self._apply_user_overrides(overrides)
self.start_mysql() self.start_mysql()
self._wait_for_mysql_to_be_really_alive( self._wait_for_mysql_to_be_really_alive(
CONF.timeout_wait_for_service) CONF.timeout_wait_for_service)
@ -121,13 +119,16 @@ class PXCApp(service.BaseMySqlApp):
LOG.exception(_("Error bootstrapping cluster.")) LOG.exception(_("Error bootstrapping cluster."))
raise RuntimeError(_("Service is not discovered.")) raise RuntimeError(_("Service is not discovered."))
def write_cluster_configuration_overrides(self, cluster_configuration):
self.configuration_manager.apply_system_override(
cluster_configuration, CNF_CLUSTER)
def install_cluster(self, replication_user, cluster_configuration, def install_cluster(self, replication_user, cluster_configuration,
bootstrap=False): bootstrap=False):
LOG.info(_("Installing cluster configuration.")) LOG.info(_("Installing cluster configuration."))
self._grant_cluster_replication_privilege(replication_user) self._grant_cluster_replication_privilege(replication_user)
self.stop_db() self.stop_db()
self.configuration_manager.apply_system_override(cluster_configuration, self.write_cluster_configuration_overrides(cluster_configuration)
CNF_CLUSTER)
self.wipe_ib_logfiles() self.wipe_ib_logfiles()
LOG.debug("bootstrap the instance? : %s" % bootstrap) LOG.debug("bootstrap the instance? : %s" % bootstrap)
# Have to wait to sync up the joiner instances with the donor instance. # Have to wait to sync up the joiner instances with the donor instance.
@ -136,6 +137,20 @@ class PXCApp(service.BaseMySqlApp):
else: else:
self.start_mysql(timeout=CONF.restore_usage_timeout) self.start_mysql(timeout=CONF.restore_usage_timeout)
def get_cluster_context(self):
auth = self.configuration_manager.get_value('mysqld').get(
"wsrep_sst_auth").replace('"', '')
cluster_name = self.configuration_manager.get_value(
'mysqld').get("wsrep_cluster_name")
return {
'replication_user': {
'name': auth.split(":")[0],
'password': auth.split(":")[1],
},
'cluster_name': cluster_name,
'admin_password': self.get_auth_password()
}
class PXCRootAccess(service.BaseMySqlRootAccess): class PXCRootAccess(service.BaseMySqlRootAccess):
def __init__(self): def __init__(self):

View File

@ -106,6 +106,12 @@ class InstanceTasks(object):
EJECTION_ERROR = InstanceTask(0x56, 'EJECTING', EJECTION_ERROR = InstanceTask(0x56, 'EJECTING',
'Replica Source Ejection Error.', 'Replica Source Ejection Error.',
is_error=True) is_error=True)
GROWING_ERROR = InstanceTask(0x57, 'GROWING',
'Growing Cluster Error.',
is_error=True)
SHRINKING_ERROR = InstanceTask(0x58, 'SHRINKING',
'Shrinking Cluster Error.',
is_error=True)
# Dissuade further additions at run-time. # Dissuade further additions at run-time.
InstanceTask.__init__ = None InstanceTask.__init__ = None

View File

@ -181,7 +181,8 @@ class ConfigurationMixin(object):
class ClusterTasks(Cluster): class ClusterTasks(Cluster):
def update_statuses_on_failure(self, cluster_id, shard_id=None): def update_statuses_on_failure(self, cluster_id, shard_id=None,
status=None):
if CONF.update_status_on_fail: if CONF.update_status_on_fail:
if shard_id: if shard_id:
@ -193,7 +194,7 @@ class ClusterTasks(Cluster):
for db_instance in db_instances: for db_instance in db_instances:
db_instance.set_task_status( db_instance.set_task_status(
InstanceTasks.BUILDING_ERROR_SERVER) status or InstanceTasks.BUILDING_ERROR_SERVER)
db_instance.save() db_instance.save()
@classmethod @classmethod

View File

@ -191,6 +191,8 @@ class ClusterActionsRunner(TestRunner):
self._assert_cluster_action(cluster_id, expected_task_name, self._assert_cluster_action(cluster_id, expected_task_name,
expected_http_code) expected_http_code)
self._assert_cluster_states(cluster_id, ['NONE'])
cluster = self.auth_client.clusters.get(cluster_id)
self.assert_equal( self.assert_equal(
len(removed_instance_names), len(removed_instance_names),
initial_instance_count - len(cluster.instances), initial_instance_count - len(cluster.instances),
@ -199,7 +201,6 @@ class ClusterActionsRunner(TestRunner):
cluster_instances = self._get_cluster_instances(cluster_id) cluster_instances = self._get_cluster_instances(cluster_id)
self.assert_all_instance_states(cluster_instances, ['ACTIVE']) self.assert_all_instance_states(cluster_instances, ['ACTIVE'])
self._assert_cluster_states(cluster_id, ['NONE'])
self._assert_cluster_response(cluster_id, 'NONE') self._assert_cluster_response(cluster_id, 'NONE')
def _find_cluster_instances_by_name(self, cluster, instance_names): def _find_cluster_instances_by_name(self, cluster, instance_names):
@ -322,9 +323,3 @@ class PxcClusterActionsRunner(ClusterActionsRunner):
num_nodes=num_nodes, expected_task_name=expected_task_name, num_nodes=num_nodes, expected_task_name=expected_task_name,
expected_instance_states=expected_instance_states, expected_instance_states=expected_instance_states,
expected_http_code=expected_http_code) expected_http_code=expected_http_code)
def run_cluster_shrink(self):
raise SkipTest("Operation not supported by the datastore.")
def run_cluster_grow(self):
raise SkipTest("Operation not supported by the datastore.")

View File

@ -196,16 +196,19 @@ class ClusterTest(trove_testtools.TestCase):
instances, {} instances, {}
) )
@patch.object(inst_models.DBInstance, 'find_all')
@patch.object(inst_models.Instance, 'create') @patch.object(inst_models.Instance, 'create')
@patch.object(DBCluster, 'create') @patch.object(DBCluster, 'create')
@patch.object(task_api, 'load') @patch.object(task_api, 'load')
@patch.object(QUOTAS, 'check_quotas') @patch.object(QUOTAS, 'check_quotas')
@patch.object(remote, 'create_nova_client') @patch.object(remote, 'create_nova_client')
def test_create(self, mock_client, mock_check_quotas, mock_task_api, def test_create(self, mock_client, mock_check_quotas, mock_task_api,
mock_db_create, mock_ins_create): mock_db_create, mock_ins_create, mock_find_all):
instances = self.instances instances = self.instances
flavors = Mock() flavors = Mock()
networks = Mock()
mock_client.return_value.flavors = flavors mock_client.return_value.flavors = flavors
mock_client.return_value.networks = networks
self.cluster.create(Mock(), self.cluster.create(Mock(),
self.cluster_name, self.cluster_name,
self.datastore, self.datastore,
@ -215,28 +218,7 @@ class ClusterTest(trove_testtools.TestCase):
mock_db_create.return_value.id) mock_db_create.return_value.id)
self.assertEqual(3, mock_ins_create.call_count) self.assertEqual(3, mock_ins_create.call_count)
@patch.object(inst_models.Instance, 'create') @patch.object(inst_models.DBInstance, 'find_all')
@patch.object(DBCluster, 'create')
@patch.object(task_api, 'load')
@patch.object(QUOTAS, 'check_quotas')
@patch.object(remote, 'create_nova_client')
def test_create_over_limit(self, mock_client, mock_check_quotas,
mock_task_api, mock_db_create, mock_ins_create):
instances = [{'volume_size': 1, 'flavor_id': '1234'},
{'volume_size': 1, 'flavor_id': '1234'},
{'volume_size': 1, 'flavor_id': '1234'},
{'volume_size': 1, 'flavor_id': '1234'}]
flavors = Mock()
mock_client.return_value.flavors = flavors
self.cluster.create(Mock(),
self.cluster_name,
self.datastore,
self.datastore_version,
instances, {})
mock_task_api.return_value.create_cluster.assert_called_with(
mock_db_create.return_value.id)
self.assertEqual(4, mock_ins_create.call_count)
@patch.object(pxc_api, 'CONF') @patch.object(pxc_api, 'CONF')
@patch.object(inst_models.Instance, 'create') @patch.object(inst_models.Instance, 'create')
@patch.object(DBCluster, 'create') @patch.object(DBCluster, 'create')
@ -245,7 +227,8 @@ class ClusterTest(trove_testtools.TestCase):
@patch.object(remote, 'create_nova_client') @patch.object(remote, 'create_nova_client')
def test_create_with_ephemeral_flavor(self, mock_client, mock_check_quotas, def test_create_with_ephemeral_flavor(self, mock_client, mock_check_quotas,
mock_task_api, mock_db_create, mock_task_api, mock_db_create,
mock_ins_create, mock_conf): mock_ins_create, mock_conf,
mock_find_all):
class FakeFlavor: class FakeFlavor:
def __init__(self, flavor_id): def __init__(self, flavor_id):
self.flavor_id = flavor_id self.flavor_id = flavor_id
@ -300,3 +283,53 @@ class ClusterTest(trove_testtools.TestCase):
self.cluster.db_info.task_status = ClusterTasks.DELETING self.cluster.db_info.task_status = ClusterTasks.DELETING
self.cluster.delete() self.cluster.delete()
mock_update_db.assert_called_with(task_status=ClusterTasks.DELETING) mock_update_db.assert_called_with(task_status=ClusterTasks.DELETING)
@patch.object(pxc_api.PXCCluster, '_get_cluster_network_interfaces')
@patch.object(DBCluster, 'update')
@patch.object(pxc_api, 'CONF')
@patch.object(inst_models.Instance, 'create')
@patch.object(task_api, 'load')
@patch.object(QUOTAS, 'check_quotas')
@patch.object(remote, 'create_nova_client')
def test_grow(self, mock_client, mock_check_quotas, mock_task_api,
mock_inst_create, mock_conf, mock_update, mock_interfaces):
mock_client.return_value.flavors = Mock()
mock_interfaces.return_value = [Mock()]
self.cluster.grow(self.instances)
mock_update.assert_called_with(
task_status=ClusterTasks.GROWING_CLUSTER)
mock_task_api.return_value.grow_cluster.assert_called_with(
self.db_info.id,
[mock_inst_create.return_value.id] * 3)
self.assertEqual(mock_inst_create.call_count, 3)
self.assertEqual(mock_interfaces.call_count, 1)
@patch.object(inst_models.DBInstance, 'find_all')
@patch.object(inst_models.Instance, 'load')
@patch.object(Cluster, 'validate_cluster_available')
def test_shrink_empty(self, mock_validate, mock_load, mock_find_all):
instance = Mock()
self.assertRaises(
exception.ClusterShrinkMustNotLeaveClusterEmpty,
self.cluster.shrink, [instance])
@patch.object(pxc_api.PXCCluster, '__init__')
@patch.object(task_api, 'load')
@patch.object(DBCluster, 'update')
@patch.object(inst_models.DBInstance, 'find_all')
@patch.object(inst_models.Instance, 'load')
@patch.object(Cluster, 'validate_cluster_available')
def test_shrink(self, mock_validate, mock_load, mock_find_all,
mock_update, mock_task_api, mock_init):
mock_init.return_value = None
existing_instances = [Mock(), Mock()]
mock_find_all.return_value.all.return_value = existing_instances
instance = Mock()
self.cluster.shrink([instance])
mock_validate.assert_called_with()
mock_update.assert_called_with(
task_status=ClusterTasks.SHRINKING_CLUSTER)
mock_task_api.return_value.shrink_cluster.assert_called_with(
self.db_info.id, [mock_load.return_value.id])
mock_init.assert_called_with(self.context, self.db_info,
self.datastore, self.datastore_version)

View File

@ -122,3 +122,25 @@ class ApiTest(trove_testtools.TestCase):
self._verify_rpc_prepare_before_call() self._verify_rpc_prepare_before_call()
self._verify_call('cluster_complete') self._verify_call('cluster_complete')
self.assertEqual(exp_resp, resp) self.assertEqual(exp_resp, resp)
def test_get_cluster_context(self):
exp_resp = None
self.call_context.call.return_value = exp_resp
resp = self.api.get_cluster_context()
self._verify_rpc_prepare_before_call()
self._verify_call('get_cluster_context')
self.assertEqual(exp_resp, resp)
def test_write_cluster_configuration_overrides(self):
exp_resp = None
self.call_context.call.return_value = exp_resp
resp = self.api.write_cluster_configuration_overrides(
cluster_configuration="cluster-configuration")
self._verify_rpc_prepare_before_call()
self._verify_call('write_cluster_configuration_overrides',
cluster_configuration="cluster-configuration",)
self.assertEqual(exp_resp, resp)

View File

@ -57,3 +57,18 @@ class GuestAgentManagerTest(trove_testtools.TestCase):
self.manager.reset_admin_password(self.context, admin_password) self.manager.reset_admin_password(self.context, admin_password)
self.status_get_mock.assert_any_call() self.status_get_mock.assert_any_call()
reset_admin_pwd.assert_called_with(admin_password) reset_admin_pwd.assert_called_with(admin_password)
@patch.object(dbaas.PXCApp, 'get_cluster_context')
def test_get_cluster_context(self, get_cluster_ctxt):
get_cluster_ctxt.return_value = {'cluster': 'info'}
self.manager.get_cluster_context(self.context)
self.status_get_mock.assert_any_call()
get_cluster_ctxt.assert_any_call()
@patch.object(dbaas.PXCApp, 'write_cluster_configuration_overrides')
def test_write_cluster_configuration_overrides(self, conf_overries):
cluster_configuration = "cluster_configuration"
self.manager.write_cluster_configuration_overrides(
self.context, cluster_configuration)
self.status_get_mock.assert_any_call()
conf_overries.assert_called_with(cluster_configuration)

View File

@ -18,7 +18,7 @@ from mock import patch
from trove.cluster.models import ClusterTasks as ClusterTaskStatus from trove.cluster.models import ClusterTasks as ClusterTaskStatus
from trove.cluster.models import DBCluster from trove.cluster.models import DBCluster
from trove.common.exception import GuestError from trove.common import exception
from trove.common.strategies.cluster.experimental.pxc.taskmanager import ( from trove.common.strategies.cluster.experimental.pxc.taskmanager import (
PXCClusterTasks as ClusterTasks) PXCClusterTasks as ClusterTasks)
from trove.common.strategies.cluster.experimental.pxc.taskmanager import ( from trove.common.strategies.cluster.experimental.pxc.taskmanager import (
@ -82,6 +82,14 @@ class PXCClusterTasksTest(trove_testtools.TestCase):
self.db_cluster, self.db_cluster,
datastore=mock_ds1, datastore=mock_ds1,
datastore_version=mock_dv1) datastore_version=mock_dv1)
self.cluster_context = {
'replication_user': {
'name': "name",
'password': "password",
},
'cluster_name': self.cluster_name,
'admin_password': "admin_password"
}
@patch.object(ClusterTasks, 'update_statuses_on_failure') @patch.object(ClusterTasks, 'update_statuses_on_failure')
@patch.object(InstanceServiceStatus, 'find_by') @patch.object(InstanceServiceStatus, 'find_by')
@ -103,7 +111,8 @@ class PXCClusterTasksTest(trove_testtools.TestCase):
self.cluster_id) self.cluster_id)
self.assertTrue(ret_val) self.assertTrue(ret_val)
@patch.object(ClusterTasks, 'reset_task') @patch('trove.common.strategies.cluster.experimental.pxc.taskmanager.LOG')
@patch.object(ClusterTasks, 'update_statuses_on_failure')
@patch.object(ClusterTasks, '_all_instances_ready', return_value=False) @patch.object(ClusterTasks, '_all_instances_ready', return_value=False)
@patch.object(Instance, 'load') @patch.object(Instance, 'load')
@patch.object(DBInstance, 'find_all') @patch.object(DBInstance, 'find_all')
@ -111,14 +120,15 @@ class PXCClusterTasksTest(trove_testtools.TestCase):
@patch.object(datastore_models.DatastoreVersion, 'load_by_uuid') @patch.object(datastore_models.DatastoreVersion, 'load_by_uuid')
def test_create_cluster_instance_not_ready(self, mock_dv, mock_ds, def test_create_cluster_instance_not_ready(self, mock_dv, mock_ds,
mock_find_all, mock_load, mock_find_all, mock_load,
mock_ready, mock_reset_task): mock_ready, mock_update,
mock_logging):
mock_find_all.return_value.all.return_value = [self.dbinst1] mock_find_all.return_value.all.return_value = [self.dbinst1]
mock_load.return_value = BaseInstance(Mock(), mock_load.return_value = BaseInstance(Mock(),
self.dbinst1, Mock(), self.dbinst1, Mock(),
InstanceServiceStatus( InstanceServiceStatus(
ServiceStatuses.NEW)) ServiceStatuses.NEW))
self.clustertasks.create_cluster(Mock(), self.cluster_id) self.clustertasks.create_cluster(Mock(), self.cluster_id)
mock_reset_task.assert_called_with() mock_update.assert_called_with(self.cluster_id)
@patch.object(ClusterTasks, 'update_statuses_on_failure') @patch.object(ClusterTasks, 'update_statuses_on_failure')
@patch.object(ClusterTasks, 'reset_task') @patch.object(ClusterTasks, 'reset_task')
@ -139,13 +149,83 @@ class PXCClusterTasksTest(trove_testtools.TestCase):
ServiceStatuses.NEW)) ServiceStatuses.NEW))
mock_ip.return_value = "10.0.0.2" mock_ip.return_value = "10.0.0.2"
guest_client = Mock() guest_client = Mock()
guest_client.install_cluster = Mock(side_effect=GuestError("Error")) guest_client.install_cluster = Mock(
side_effect=exception.GuestError("Error"))
with patch.object(ClusterTasks, 'get_guest', with patch.object(ClusterTasks, 'get_guest',
return_value=guest_client): return_value=guest_client):
self.clustertasks.create_cluster(Mock(), self.cluster_id) self.clustertasks.create_cluster(Mock(), self.cluster_id)
mock_update_status.assert_called_with('1232') mock_update_status.assert_called_with('1232')
mock_reset_task.assert_called_with() mock_reset_task.assert_called_with()
@patch.object(ClusterTasks, 'update_statuses_on_failure')
@patch('trove.common.strategies.cluster.experimental.pxc.taskmanager.LOG')
def test_grow_cluster_does_not_exist(self, mock_logging,
mock_update_status):
context = Mock()
bad_cluster_id = '1234'
new_instances = [Mock(), Mock()]
self.clustertasks.grow_cluster(context, bad_cluster_id, new_instances)
mock_update_status.assert_called_with(
'1234',
status=InstanceTasks.GROWING_ERROR)
@patch.object(ClusterTasks, 'reset_task')
@patch.object(ClusterTasks, '_render_cluster_config')
@patch.object(ClusterTasks, 'get_ip')
@patch.object(ClusterTasks, 'get_guest')
@patch.object(ClusterTasks, '_all_instances_ready', return_value=True)
@patch.object(Instance, 'load')
@patch.object(DBInstance, 'find_all')
@patch.object(datastore_models.Datastore, 'load')
@patch.object(datastore_models.DatastoreVersion, 'load_by_uuid')
def test_grow_cluster_successs(self, mock_dv, mock_ds, mock_find_all,
mock_load, mock_ready, mock_guest, mock_ip,
mock_render, mock_reset_task):
mock_find_all.return_value.all.return_value = [self.dbinst1]
mock_ip.return_value = "10.0.0.2"
context = Mock()
new_instances = [Mock(), Mock()]
mock_guest.get_cluster_context = Mock(
return_value=self.cluster_context)
mock_guest.reset_admin_password = Mock()
self.clustertasks.grow_cluster(context, self.cluster_id,
new_instances)
mock_reset_task.assert_called_with()
@patch.object(ClusterTasks, 'reset_task')
@patch.object(Instance, 'load')
@patch.object(Instance, 'delete')
@patch.object(DBInstance, 'find_all')
@patch.object(ClusterTasks, 'get_guest')
@patch.object(ClusterTasks, 'get_ip')
@patch.object(ClusterTasks, '_render_cluster_config')
def test_shrink_cluster_success(self, mock_render, mock_ip, mock_guest,
mock_find_all, mock_delete, mock_load,
mock_reset_task):
mock_find_all.return_value.all.return_value = [self.dbinst1]
context = Mock()
remove_instances = [Mock()]
mock_ip.return_value = "10.0.0.2"
mock_guest.get_cluster_context = Mock(
return_value=self.cluster_context)
self.clustertasks.shrink_cluster(context, self.cluster_id,
remove_instances)
mock_reset_task.assert_called_with()
@patch.object(ClusterTasks, 'update_statuses_on_failure')
@patch('trove.common.strategies.cluster.experimental.pxc.taskmanager.LOG')
def test_shrink_cluster_does_not_exist(self, mock_logging,
mock_update_status):
context = Mock()
bad_cluster_id = '1234'
remove_instances = [Mock()]
self.clustertasks.shrink_cluster(context, bad_cluster_id,
remove_instances)
mock_update_status.assert_called_with(
'1234',
status=InstanceTasks.SHRINKING_ERROR)
class PXCTaskManagerStrategyTest(trove_testtools.TestCase): class PXCTaskManagerStrategyTest(trove_testtools.TestCase):

View File

@ -12,8 +12,13 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
DB_SETUP = None
def init_db(): def init_db():
global DB_SETUP
if DB_SETUP:
return
from trove.common import cfg from trove.common import cfg
from trove.db import get_db_api from trove.db import get_db_api
from trove.db.sqlalchemy import session from trove.db.sqlalchemy import session
@ -21,3 +26,4 @@ def init_db():
db_api = get_db_api() db_api = get_db_api()
db_api.db_sync(CONF) db_api.db_sync(CONF)
session.configure_db(CONF) session.configure_db(CONF)
DB_SETUP = True