diff --git a/trove/cluster/models.py b/trove/cluster/models.py index 6846617c19..92fc3735a0 100644 --- a/trove/cluster/models.py +++ b/trove/cluster/models.py @@ -15,6 +15,7 @@ from oslo_log import log as logging +from novaclient import exceptions as nova_exceptions from trove.cluster.tasks import ClusterTask from trove.cluster.tasks import ClusterTasks from trove.common import cfg @@ -274,8 +275,51 @@ class Cluster(object): def is_cluster_deleting(context, cluster_id): cluster = Cluster.load(context, cluster_id) - return (cluster.db_info.task_status == ClusterTasks.DELETING - or cluster.db_info.task_status == ClusterTasks.SHRINKING_CLUSTER) + return (cluster.db_info.task_status == ClusterTasks.DELETING or + cluster.db_info.task_status == ClusterTasks.SHRINKING_CLUSTER) + + +def get_flavors_from_instance_defs(context, instances, + volume_enabled, ephemeral_enabled): + """Load and validate flavors for given instance definitions.""" + flavors = dict() + nova_client = remote.create_nova_client(context) + for instance in instances: + flavor_id = instance['flavor_id'] + if flavor_id not in flavors: + try: + flavor = nova_client.flavors.get(flavor_id) + if (not volume_enabled and + (ephemeral_enabled and flavor.ephemeral == 0)): + raise exception.LocalStorageNotSpecified( + flavor=flavor_id) + flavors[flavor_id] = flavor + except nova_exceptions.NotFound: + raise exception.FlavorNotFound(uuid=flavor_id) + + return flavors + + +def get_required_volume_size(instances, volume_enabled): + """Calculate the total Trove volume size for given instances.""" + volume_sizes = [instance['volume_size'] for instance in instances + if instance.get('volume_size', None)] + + if volume_enabled: + if len(volume_sizes) != len(instances): + raise exception.ClusterVolumeSizeRequired() + + total_volume_size = 0 + for volume_size in volume_sizes: + validate_volume_size(volume_size) + total_volume_size += volume_size + + return total_volume_size + + if len(volume_sizes) > 0: + raise exception.VolumeNotSupported() + + return None def validate_volume_size(size): diff --git a/trove/common/cfg.py b/trove/common/cfg.py index 8abcefdebb..23e5153513 100644 --- a/trove/common/cfg.py +++ b/trove/common/cfg.py @@ -785,7 +785,7 @@ cassandra_group = cfg.OptGroup( 'cassandra', title='Cassandra options', help="Oslo option group designed for Cassandra datastore") cassandra_opts = [ - cfg.ListOpt('tcp_ports', default=["7000", "7001", "9042", "9160"], + cfg.ListOpt('tcp_ports', default=["7000", "7001", "7199", "9042", "9160"], help='List of TCP ports and/or port ranges to open ' 'in the security group (only applicable ' 'if trove_security_groups_support is True).'), @@ -835,6 +835,22 @@ cassandra_opts = [ help='Databases to exclude when listing databases.'), cfg.StrOpt('guest_log_exposed_logs', default='', help='List of Guest Logs to expose for publishing.'), + cfg.BoolOpt('cluster_support', default=True, + help='Enable clusters to be created and managed.'), + cfg.StrOpt('api_strategy', + default='trove.common.strategies.cluster.experimental.' 
+ 'cassandra.api.CassandraAPIStrategy', + help='Class that implements datastore-specific API logic.'), + cfg.StrOpt('taskmanager_strategy', + default='trove.common.strategies.cluster.experimental' + '.cassandra.taskmanager.CassandraTaskManagerStrategy', + help='Class that implements datastore-specific task manager ' + 'logic.'), + cfg.StrOpt('guestagent_strategy', + default='trove.common.strategies.cluster.experimental' + '.cassandra.guestagent.CassandraGuestAgentStrategy', + help='Class that implements datastore-specific Guest Agent API ' + 'logic.'), ] # Couchbase diff --git a/trove/common/strategies/cluster/experimental/cassandra/__init__.py b/trove/common/strategies/cluster/experimental/cassandra/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/trove/common/strategies/cluster/experimental/cassandra/api.py b/trove/common/strategies/cluster/experimental/cassandra/api.py new file mode 100644 index 0000000000..02db891fc2 --- /dev/null +++ b/trove/common/strategies/cluster/experimental/cassandra/api.py @@ -0,0 +1,212 @@ +# Copyright 2015 Tesora Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from oslo_log import log as logging + +from trove.cluster import models +from trove.cluster.tasks import ClusterTasks +from trove.cluster.views import ClusterView +from trove.common import cfg +from trove.common.strategies.cluster import base +from trove.common.strategies.cluster.experimental.cassandra.taskmanager import( + CassandraClusterTasks) +from trove.common import utils +from trove.extensions.mgmt.clusters.views import MgmtClusterView +from trove.instance import models as inst_models +from trove.quota.quota import check_quotas +from trove.taskmanager import api as task_api + + +LOG = logging.getLogger(__name__) +CONF = cfg.CONF + + +class CassandraAPIStrategy(base.BaseAPIStrategy): + + @property + def cluster_class(self): + return CassandraCluster + + @property + def cluster_controller_actions(self): + return { + 'grow': self._action_grow_cluster, + 'shrink': self._action_shrink_cluster + } + + def _action_grow_cluster(self, cluster, body): + nodes = body['grow'] + instances = [] + for node in nodes: + instance = { + 'flavor_id': utils.get_id_from_href(node['flavorRef']) + } + if 'name' in node: + instance['name'] = node['name'] + if 'volume' in node: + instance['volume_size'] = int(node['volume']['size']) + instances.append(instance) + return cluster.grow(instances) + + def _action_shrink_cluster(self, cluster, body): + nodes = body['shrink'] + instance_ids = [node['id'] for node in nodes] + return cluster.shrink(instance_ids) + + @property + def cluster_view_class(self): + return CassandraClusterView + + @property + def mgmt_cluster_view_class(self): + return CassandraMgmtClusterView + + +class CassandraCluster(models.Cluster): + + DEFAULT_DATA_CENTER = "dc1" + DEFAULT_RACK = "rack1" + + @classmethod + def create(cls, context, name, datastore, datastore_version, + instances, extended_properties): + LOG.debug("Processing a request for 
creating a new cluster.") + + # Updating Cluster Task. + db_info = models.DBCluster.create( + name=name, tenant_id=context.tenant, + datastore_version_id=datastore_version.id, + task_status=ClusterTasks.BUILDING_INITIAL) + + cls._create_cluster_instances( + context, db_info.id, db_info.name, + datastore, datastore_version, instances, extended_properties) + + # Calling taskmanager to further proceed for cluster-configuration. + task_api.load(context, datastore_version.manager).create_cluster( + db_info.id) + + return CassandraCluster(context, db_info, datastore, datastore_version) + + @classmethod + def _create_cluster_instances( + cls, context, cluster_id, cluster_name, + datastore, datastore_version, instances, extended_properties=None): + LOG.debug("Processing a request for new cluster instances.") + + cassandra_conf = CONF.get(datastore_version.manager) + eph_enabled = cassandra_conf.device_path + vol_enabled = cassandra_conf.volume_support + + # Validate instance flavors. + models.get_flavors_from_instance_defs(context, instances, + vol_enabled, eph_enabled) + + # Compute the total volume allocation. + req_volume_size = models.get_required_volume_size(instances, + vol_enabled) + + # Check requirements against quota. + num_new_instances = len(instances) + deltas = {'instances': num_new_instances, 'volumes': req_volume_size} + check_quotas(context.tenant, deltas) + + # Creating member instances. + num_instances = len( + CassandraClusterTasks.find_cluster_node_ids(cluster_id)) + new_instances = [] + for instance_idx, instance in enumerate(instances, num_instances + 1): + instance_az = instance.get('availability_zone', None) + + member_config = {"id": cluster_id, + "instance_type": "member", + "dc": cls.DEFAULT_DATA_CENTER, + "rack": instance_az or cls.DEFAULT_RACK} + + instance_name = instance.get('name') + if not instance_name: + instance_name = cls._build_instance_name( + cluster_name, member_config['dc'], member_config['rack'], + instance_idx) + + new_instance = inst_models.Instance.create( + context, instance_name, + instance['flavor_id'], + datastore_version.image_id, + [], [], + datastore, datastore_version, + instance['volume_size'], None, + nics=instance.get('nics', None), + availability_zone=instance_az, + configuration_id=None, + cluster_config=member_config) + + new_instances.append(new_instance) + + return new_instances + + @classmethod + def _build_instance_name(cls, cluster_name, dc, rack, instance_idx): + return "%s-member-%s-%s-%d" % (cluster_name, dc, rack, instance_idx) + + def grow(self, instances): + LOG.debug("Processing a request for growing cluster: %s" % self.id) + + self.validate_cluster_available() + + context = self.context + db_info = self.db_info + datastore = self.ds + datastore_version = self.ds_version + + db_info.update(task_status=ClusterTasks.GROWING_CLUSTER) + + new_instances = self._create_cluster_instances( + context, db_info.id, db_info.name, datastore, datastore_version, + instances) + + task_api.load(context, datastore_version.manager).grow_cluster( + db_info.id, [instance.id for instance in new_instances]) + + return CassandraCluster(context, db_info, datastore, datastore_version) + + def shrink(self, removal_ids): + LOG.debug("Processing a request for shrinking cluster: %s" % self.id) + + self.validate_cluster_available() + + context = self.context + db_info = self.db_info + datastore = self.ds + datastore_version = self.ds_version + + db_info.update(task_status=ClusterTasks.SHRINKING_CLUSTER) + + task_api.load(context, 
datastore_version.manager).shrink_cluster( + db_info.id, removal_ids) + + return CassandraCluster(context, db_info, datastore, datastore_version) + + +class CassandraClusterView(ClusterView): + + def build_instances(self): + return self._build_instances(['member'], ['member']) + + +class CassandraMgmtClusterView(MgmtClusterView): + + def build_instances(self): + return self._build_instances(['member'], ['member']) diff --git a/trove/common/strategies/cluster/experimental/cassandra/guestagent.py b/trove/common/strategies/cluster/experimental/cassandra/guestagent.py new file mode 100644 index 0000000000..6936245bf3 --- /dev/null +++ b/trove/common/strategies/cluster/experimental/cassandra/guestagent.py @@ -0,0 +1,96 @@ +# Copyright 2015 Tesora Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from oslo_log import log as logging + +from trove.common import cfg +from trove.common.strategies.cluster import base +from trove.guestagent import api as guest_api + + +LOG = logging.getLogger(__name__) +CONF = cfg.CONF + + +class CassandraGuestAgentStrategy(base.BaseGuestAgentStrategy): + + @property + def guest_client_class(self): + return CassandraGuestAgentAPI + + +class CassandraGuestAgentAPI(guest_api.API): + + def get_data_center(self): + LOG.debug("Retrieving the data center for node: %s" % self.id) + return self._call("get_data_center", guest_api.AGENT_LOW_TIMEOUT, + self.version_cap) + + def get_rack(self): + LOG.debug("Retrieving the rack for node: %s" % self.id) + return self._call("get_rack", guest_api.AGENT_LOW_TIMEOUT, + self.version_cap) + + def set_seeds(self, seeds): + LOG.debug("Configuring the gossip seeds for node: %s" % self.id) + return self._call("set_seeds", guest_api.AGENT_LOW_TIMEOUT, + self.version_cap, seeds=seeds) + + def get_seeds(self): + LOG.debug("Retrieving the gossip seeds for node: %s" % self.id) + return self._call("get_seeds", guest_api.AGENT_LOW_TIMEOUT, + self.version_cap) + + def set_auto_bootstrap(self, enabled): + LOG.debug("Setting the auto-bootstrap to '%s' for node: %s" + % (enabled, self.id)) + return self._call("set_auto_bootstrap", guest_api.AGENT_LOW_TIMEOUT, + self.version_cap, enabled=enabled) + + def cluster_complete(self): + LOG.debug("Sending a setup completion notification for node: %s" + % self.id) + return self._call("cluster_complete", guest_api.AGENT_LOW_TIMEOUT, + self.version_cap) + + def node_cleanup_begin(self): + LOG.debug("Signaling the node to prepare for cleanup: %s" % self.id) + return self._call("node_cleanup_begin", guest_api.AGENT_LOW_TIMEOUT, + self.version_cap) + + def node_cleanup(self): + LOG.debug("Running cleanup on node: %s" % self.id) + return self._cast('node_cleanup', self.version_cap) + + def node_decommission(self): + LOG.debug("Decommission node: %s" % self.id) + return self._cast("node_decommission", self.version_cap) + + def cluster_secure(self, password): + LOG.debug("Securing the cluster via node: %s" % self.id) + return self._call( + "cluster_secure", guest_api.AGENT_HIGH_TIMEOUT, + 
self.version_cap, password=password) + + def get_admin_credentials(self): + LOG.debug("Retrieving the admin credentials from node: %s" % self.id) + return self._call("get_admin_credentials", guest_api.AGENT_LOW_TIMEOUT, + self.version_cap) + + def store_admin_credentials(self, admin_credentials): + LOG.debug("Storing the admin credentials on node: %s" % self.id) + return self._call("store_admin_credentials", + guest_api.AGENT_LOW_TIMEOUT, self.version_cap, + admin_credentials=admin_credentials) diff --git a/trove/common/strategies/cluster/experimental/cassandra/taskmanager.py b/trove/common/strategies/cluster/experimental/cassandra/taskmanager.py new file mode 100644 index 0000000000..505601e0ae --- /dev/null +++ b/trove/common/strategies/cluster/experimental/cassandra/taskmanager.py @@ -0,0 +1,351 @@ +# Copyright 2015 Tesora Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from eventlet.timeout import Timeout +from oslo_log import log as logging + +from trove.common import cfg +from trove.common.i18n import _ +from trove.common.strategies.cluster import base +from trove.common import utils +from trove.instance.models import DBInstance +from trove.instance.models import Instance +from trove.taskmanager import api as task_api +import trove.taskmanager.models as task_models + + +LOG = logging.getLogger(__name__) +CONF = cfg.CONF +USAGE_SLEEP_TIME = CONF.usage_sleep_time # seconds. + + +class CassandraTaskManagerStrategy(base.BaseTaskManagerStrategy): + + @property + def task_manager_api_class(self): + return CassandraTaskManagerAPI + + @property + def task_manager_cluster_tasks_class(self): + return CassandraClusterTasks + + +class CassandraClusterTasks(task_models.ClusterTasks): + + def create_cluster(self, context, cluster_id): + LOG.debug("Begin create_cluster for id: %s." % cluster_id) + + def _create_cluster(): + cluster_node_ids = self.find_cluster_node_ids(cluster_id) + + # Wait for cluster nodes to get to cluster-ready status. + LOG.debug("Waiting for all nodes to become ready.") + if not self._all_instances_ready(cluster_node_ids, cluster_id): + return + + cluster_nodes = self.load_cluster_nodes(context, cluster_node_ids) + + LOG.debug("All nodes ready, proceeding with cluster setup.") + seeds = self.choose_seed_nodes(cluster_nodes) + + # Configure each cluster node with the list of seeds. + # Once all nodes are configured, start the seed nodes one at a time + # followed by the rest of the nodes. + try: + LOG.debug("Selected seed nodes: %s" % seeds) + + for node in cluster_nodes: + LOG.debug("Configuring node: %s." 
% node['id']) + node['guest'].set_seeds(seeds) + node['guest'].set_auto_bootstrap(False) + + LOG.debug("Starting seed nodes.") + for node in cluster_nodes: + if node['ip'] in seeds: + node['guest'].restart() + node['guest'].set_auto_bootstrap(True) + + LOG.debug("All seeds running, starting remaining nodes.") + for node in cluster_nodes: + if node['ip'] not in seeds: + node['guest'].restart() + node['guest'].set_auto_bootstrap(True) + + # Create the in-database user via the first node. The remaining + # nodes will replicate in-database changes automatically. + # Only update the local authentication file on the other nodes. + LOG.debug("Securing the cluster.") + key = utils.generate_random_password() + admin_creds = None + for node in cluster_nodes: + if admin_creds is None: + admin_creds = node['guest'].cluster_secure(key) + else: + node['guest'].store_admin_credentials(admin_creds) + node['guest'].cluster_complete() + + LOG.debug("Cluster configuration finished successfully.") + except Exception: + LOG.exception(_("Error creating cluster.")) + self.update_statuses_on_failure(cluster_id) + + timeout = Timeout(CONF.cluster_usage_timeout) + try: + _create_cluster() + self.reset_task() + except Timeout as t: + if t is not timeout: + raise # not my timeout + LOG.exception(_("Timeout for building cluster.")) + self.update_statuses_on_failure(cluster_id) + finally: + timeout.cancel() + + LOG.debug("End create_cluster for id: %s." % cluster_id) + + @classmethod + def find_cluster_node_ids(cls, cluster_id): + db_instances = DBInstance.find_all(cluster_id=cluster_id).all() + return [db_instance.id for db_instance in db_instances] + + @classmethod + def load_cluster_nodes(cls, context, node_ids): + return [cls.build_node_info(Instance.load(context, node_id)) + for node_id in node_ids] + + @classmethod + def build_node_info(cls, instance): + guest = cls.get_guest(instance) + return {'instance': instance, + 'guest': guest, + 'id': instance.id, + 'ip': cls.get_ip(instance), + 'dc': guest.get_data_center(), + 'rack': guest.get_rack()} + + @classmethod + def choose_seed_nodes(cls, node_info): + """Select gossip seeds. The seeds are cluster nodes from which any + new/other cluster nodes request information on the + cluster geometry. + They should include at least one node from each data center and + rack. Gossip optimization is not critical, but it is recommended + to use a small seed list. + + Select one (random) node from each dc and rack. + + :param node_info: List of cluster nodes. + :type node_info: list of dicts + """ + ips_by_affinity = cls._group_by_affinity(node_info) + return {ips_by_affinity[dc][rack][0] + for dc in ips_by_affinity + for rack in ips_by_affinity[dc]} + + @classmethod + def _group_by_affinity(cls, node_info): + """Group node IPs by affinity to data center and rack.""" + ips_by_affinity = dict() + for node in node_info: + ip = node['ip'] + dc = node['dc'] + rack = node['rack'] + if dc in ips_by_affinity: + dc_nodes = ips_by_affinity[dc] + if rack in dc_nodes: + rack_nodes = dc_nodes[rack] + rack_nodes.append(ip) + else: + dc_nodes.update({rack: [ip]}) + else: + ips_by_affinity.update({dc: {rack: [ip]}}) + + return ips_by_affinity + + def grow_cluster(self, context, cluster_id, new_instance_ids): + LOG.debug("Begin grow_cluster for id: %s." % cluster_id) + + def _grow_cluster(): + # Wait for new nodes to get to cluster-ready status. 
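
The choose_seed_nodes/_group_by_affinity helpers above reduce the cluster to one gossip seed per (data center, rack) pair. A minimal standalone sketch of that selection, with plain dicts standing in for Trove's node-info structures (the IP addresses and names are illustrative only):

# Sketch: group node IPs by (dc, rack) and keep one IP per group,
# mirroring the _group_by_affinity/choose_seed_nodes logic above.
# The node dicts are illustrative stand-ins, not Trove objects.
def pick_seeds(nodes):
    by_affinity = {}
    for node in nodes:
        by_affinity.setdefault(node['dc'], {}).setdefault(
            node['rack'], []).append(node['ip'])
    return {racks[rack][0]
            for racks in by_affinity.values()
            for rack in racks}

nodes = [{'ip': '10.0.0.1', 'dc': 'dc1', 'rack': 'rack1'},
         {'ip': '10.0.0.2', 'dc': 'dc1', 'rack': 'rack1'},
         {'ip': '10.0.0.3', 'dc': 'dc1', 'rack': 'rack2'}]
assert pick_seeds(nodes) == {'10.0.0.1', '10.0.0.3'}
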
+ LOG.debug("Waiting for new nodes to become ready.") + if not self._all_instances_ready(new_instance_ids, cluster_id): + return + + new_instances = [Instance.load(context, instance_id) + for instance_id in new_instance_ids] + added_nodes = [self.build_node_info(instance) + for instance in new_instances] + + LOG.debug("All nodes ready, proceeding with cluster setup.") + + cluster_node_ids = self.find_cluster_node_ids(cluster_id) + cluster_nodes = self.load_cluster_nodes(context, cluster_node_ids) + + # Recompute the seed nodes based on the updated cluster geometry. + seeds = self.choose_seed_nodes(cluster_nodes) + + # Configure each cluster node with the updated list of seeds. + # Since we are adding to an existing cluster, ensure that the + # new nodes have auto-bootstrapping enabled. + # Start the added nodes. + try: + LOG.debug("Selected seed nodes: %s" % seeds) + + # Update the seeds on all nodes. + # Also retrieve the superuser password from one previously + # existing node. + admin_creds = None + for node in cluster_nodes: + LOG.debug("Configuring node: %s." % node['id']) + node['guest'].set_seeds(seeds) + if (admin_creds is None) and (node not in added_nodes): + admin_creds = node['guest'].get_admin_credentials() + + # Start any seeds from the added nodes first. + LOG.debug("Starting new seed nodes.") + for node in added_nodes: + if node['ip'] in seeds: + node['guest'].set_auto_bootstrap(True) + node['guest'].store_admin_credentials(admin_creds) + node['guest'].restart() + node['guest'].cluster_complete() + + LOG.debug("All new seeds running, starting the remaining of " + "added nodes.") + for node in added_nodes: + if node['ip'] not in seeds: + node['guest'].set_auto_bootstrap(True) + node['guest'].store_admin_credentials(admin_creds) + node['guest'].restart() + node['guest'].cluster_complete() + + # Run nodetool cleanup on each of the previously existing nodes + # to remove the keys that no longer belong to those nodes. + # Wait for cleanup to complete on one node before running + # it on the next node. + LOG.debug("Cleaning up orphan data on old cluster nodes.") + for node in cluster_nodes: + if node not in added_nodes: + nid = node['id'] + node['guest'].node_cleanup_begin() + node['guest'].node_cleanup() + LOG.debug("Waiting for node to finish its " + "cleanup: %s" % nid) + if not self._all_instances_running([nid], cluster_id): + LOG.warning(_("Node did not complete cleanup " + "successfully: %s") % nid) + + LOG.debug("Cluster configuration finished successfully.") + except Exception: + LOG.exception(_("Error growing cluster.")) + self.update_statuses_on_failure(cluster_id) + + timeout = Timeout(CONF.cluster_usage_timeout) + try: + _grow_cluster() + self.reset_task() + except Timeout as t: + if t is not timeout: + raise # not my timeout + LOG.exception(_("Timeout for growing cluster.")) + self.update_statuses_on_failure(cluster_id) + finally: + timeout.cancel() + + LOG.debug("End grow_cluster for id: %s." % cluster_id) + + def shrink_cluster(self, context, cluster_id, removal_ids): + LOG.debug("Begin shrink_cluster for id: %s." % cluster_id) + + def _shrink_cluster(): + cluster_node_ids = self.find_cluster_node_ids(cluster_id) + cluster_nodes = self.load_cluster_nodes(context, cluster_node_ids) + + removed_nodes = CassandraClusterTasks.load_cluster_nodes( + context, removal_ids) + + LOG.debug("All nodes ready, proceeding with cluster setup.") + + # Update the list of seeds on remaining nodes if necessary. + # Once all nodes are configured, decommission the removed nodes. 
+ # Cassandra will stream data from decommissioned nodes to the + # remaining ones. + try: + + # All nodes should have the same seeds. + # We retrieve current seeds from the first node. + test_node = self.load_cluster_nodes( + context, cluster_node_ids[:1])[0] + current_seeds = test_node['guest'].get_seeds() + # The seeds will have to be updated on all remaining instances + # if any of the seed nodes is going to be removed. + update_seeds = any(node['ip'] in current_seeds + for node in removed_nodes) + + LOG.debug("Decommissioning removed nodes.") + for node in removed_nodes: + node['guest'].node_decommission() + node['instance'].update_db(cluster_id=None) + + # Recompute the seed nodes based on the updated cluster + # geometry if any of the existing seed nodes was removed. + if update_seeds: + LOG.debug("Updating seeds on the remaining nodes.") + cluster_nodes = self.load_cluster_nodes( + context, cluster_node_ids) + + remaining_nodes = [node for node in cluster_nodes + if node not in removed_nodes] + seeds = self.choose_seed_nodes(remaining_nodes) + LOG.debug("Selected seed nodes: %s" % seeds) + for node in remaining_nodes: + LOG.debug("Configuring node: %s." % node['id']) + node['guest'].set_seeds(seeds) + + # Wait for the removed nodes to go SHUTDOWN. + LOG.debug("Waiting for all decommissioned nodes to shutdown.") + if not self._all_instances_shutdown(removal_ids, cluster_id): + # Now detached, failed nodes will stay available + # in the list of standalone instances. + return + + # Delete decommissioned instances only when the cluster is in a + # consistent state. + LOG.debug("Deleting decommissioned instances.") + for node in removed_nodes: + Instance.delete(node['instance']) + + LOG.debug("Cluster configuration finished successfully.") + except Exception: + LOG.exception(_("Error shrinking cluster.")) + self.update_statuses_on_failure(cluster_id) + + timeout = Timeout(CONF.cluster_usage_timeout) + try: + _shrink_cluster() + self.reset_task() + except Timeout as t: + if t is not timeout: + raise # not my timeout + LOG.exception(_("Timeout for shrinking cluster.")) + self.update_statuses_on_failure(cluster_id) + finally: + timeout.cancel() + + LOG.debug("End shrink_cluster for id: %s." 
% cluster_id) + + +class CassandraTaskManagerAPI(task_api.API): + pass diff --git a/trove/guestagent/datastore/experimental/cassandra/manager.py b/trove/guestagent/datastore/experimental/cassandra/manager.py index d61d2f4550..350c5f6518 100644 --- a/trove/guestagent/datastore/experimental/cassandra/manager.py +++ b/trove/guestagent/datastore/experimental/cassandra/manager.py @@ -108,7 +108,16 @@ class Manager(manager.Manager): LOG.debug("Applying configuration.") self.app.configuration_manager.save_configuration( config_contents) - self.app.apply_initial_guestagent_configuration() + cluster_name = None + if cluster_config: + cluster_name = cluster_config.get('id', None) + self.app.apply_initial_guestagent_configuration( + cluster_name=cluster_name) + + if cluster_config: + self.app.write_cluster_topology( + cluster_config['dc'], cluster_config['rack'], + prefer_local=True) if device_path: LOG.debug("Preparing data volume.") @@ -124,20 +133,21 @@ class Manager(manager.Manager): LOG.debug("Mounting new volume.") device.mount(mount_point) - if backup_info: - self._perform_restore(backup_info, context, mount_point) + if not cluster_config: + if backup_info: + self._perform_restore(backup_info, context, mount_point) - LOG.debug("Starting database with configuration changes.") - self.app.start_db(update_db=False) + LOG.debug("Starting database with configuration changes.") + self.app.start_db(update_db=False) - if not self.app.has_user_config(): - LOG.debug("Securing superuser access.") - self.app.secure() - self.app.restart() + if not self.app.has_user_config(): + LOG.debug("Securing superuser access.") + self.app.secure() + self.app.restart() self.__admin = CassandraAdmin(self.app.get_current_superuser()) - if self.is_root_enabled(context): + if not cluster_config and self.is_root_enabled(context): self.status.report_root(context, self.app.default_superuser_name) def change_passwords(self, context, users): @@ -235,3 +245,39 @@ class Manager(manager.Manager): require restart, so this is a no-op. 
""" pass + + def get_data_center(self, context): + return self.app.get_data_center() + + def get_rack(self, context): + return self.app.get_rack() + + def set_seeds(self, context, seeds): + self.app.set_seeds(seeds) + + def get_seeds(self, context): + return self.app.get_seeds() + + def set_auto_bootstrap(self, context, enabled): + self.app.set_auto_bootstrap(enabled) + + def node_cleanup_begin(self, context): + self.app.node_cleanup_begin() + + def node_cleanup(self, context): + self.app.node_cleanup() + + def node_decommission(self, context): + self.app.node_decommission() + + def cluster_secure(self, context, password): + os_admin = self.app.cluster_secure(password) + self.__admin = CassandraAdmin(self.app.get_current_superuser()) + return os_admin + + def get_admin_credentials(self, context): + return self.app.get_admin_credentials() + + def store_admin_credentials(self, context, admin_credentials): + self.app.store_admin_credentials(admin_credentials) + self.__admin = CassandraAdmin(self.app.get_current_superuser()) diff --git a/trove/guestagent/datastore/experimental/cassandra/service.py b/trove/guestagent/datastore/experimental/cassandra/service.py index 4b0f18341a..6fd16e9fae 100644 --- a/trove/guestagent/datastore/experimental/cassandra/service.py +++ b/trove/guestagent/datastore/experimental/cassandra/service.py @@ -30,12 +30,14 @@ from trove.common.i18n import _ from trove.common import instance as rd_instance from trove.common import pagination from trove.common.stream_codecs import IniCodec +from trove.common.stream_codecs import PropertiesCodec from trove.common.stream_codecs import SafeYamlCodec from trove.common import utils from trove.guestagent.common.configuration import ConfigurationManager from trove.guestagent.common.configuration import OneFileOverrideStrategy from trove.guestagent.common import guestagent_utils from trove.guestagent.common import operating_system +from trove.guestagent.common.operating_system import FileMode from trove.guestagent.datastore import service from trove.guestagent.db import models from trove.guestagent import pkg @@ -59,6 +61,13 @@ class CassandraApp(object): _CONF_DIR_MODS = stat.S_IRWXU _CONF_FILE_MODS = stat.S_IRUSR + CASSANDRA_CONF_FILE = "cassandra.yaml" + CASSANDRA_TOPOLOGY_FILE = 'cassandra-rackdc.properties' + + _TOPOLOGY_CODEC = PropertiesCodec( + delimiter='=', unpack_singletons=True, string_mappings={ + 'true': True, 'false': False}) + CASSANDRA_KILL_CMD = "sudo killall java || true" def __init__(self): @@ -79,16 +88,23 @@ class CassandraApp(object): return ['cassandra'] @property - def cassandra_conf(self): + def cassandra_conf_dir(self): return { - operating_system.REDHAT: - "/etc/cassandra/default.conf/cassandra.yaml", - operating_system.DEBIAN: - "/etc/cassandra/cassandra.yaml", - operating_system.SUSE: - "/etc/cassandra/default.conf/cassandra.yaml" + operating_system.REDHAT: "/etc/cassandra/default.conf/", + operating_system.DEBIAN: "/etc/cassandra/", + operating_system.SUSE: "/etc/cassandra/default.conf/" }[operating_system.get_os()] + @property + def cassandra_conf(self): + return guestagent_utils.build_file_path(self.cassandra_conf_dir, + self.CASSANDRA_CONF_FILE) + + @property + def cassandra_topology(self): + return guestagent_utils.build_file_path(self.cassandra_conf_dir, + self.CASSANDRA_TOPOLOGY_FILE) + @property def cassandra_owner(self): return 'cassandra' @@ -248,7 +264,10 @@ class CassandraApp(object): finally: self.stop_db() # Always restore the initial state of the service. 
- def secure(self, update_user=None): + def cluster_secure(self, password): + return self.secure(password=password).serialize() + + def secure(self, update_user=None, password=None): """Configure the Trove administrative user. Update an existing user if given. Create a new one using the default database credentials @@ -256,29 +275,39 @@ class CassandraApp(object): """ LOG.info(_('Configuring Trove superuser.')) - current_superuser = update_user or models.CassandraUser( - self.default_superuser_name, - self.default_superuser_password) + if password is None: + password = utils.generate_random_password() + + admin_username = update_user.name if update_user else self._ADMIN_USER + os_admin = models.CassandraUser(admin_username, password) if update_user: - os_admin = models.CassandraUser(update_user.name, - utils.generate_random_password()) - CassandraAdmin(current_superuser).alter_user_password(os_admin) + CassandraAdmin(update_user).alter_user_password(os_admin) else: - os_admin = models.CassandraUser(self._ADMIN_USER, - utils.generate_random_password()) - CassandraAdmin(current_superuser)._create_superuser(os_admin) - CassandraAdmin(os_admin).drop_user(current_superuser) + cassandra = models.CassandraUser( + self.default_superuser_name, self.default_superuser_password) + CassandraAdmin(cassandra)._create_superuser(os_admin) + CassandraAdmin(os_admin).drop_user(cassandra) - self.__create_cqlsh_config({self._CONF_AUTH_SEC: - {self._CONF_USR_KEY: os_admin.name, - self._CONF_PWD_KEY: os_admin.password}}) - - # Update the internal status with the new user. - self.status = CassandraAppStatus(os_admin) + self._update_admin_credentials(os_admin) return os_admin + def _update_admin_credentials(self, user): + self.__create_cqlsh_config({self._CONF_AUTH_SEC: + {self._CONF_USR_KEY: user.name, + self._CONF_PWD_KEY: user.password}}) + + # Update the internal status with the new user. + self.status = CassandraAppStatus(user) + + def store_admin_credentials(self, admin_credentials): + user = models.CassandraUser.deserialize_user(admin_credentials) + self._update_admin_credentials(user) + + def get_admin_credentials(self): + return self.get_current_superuser().serialize() + def _reset_admin_password(self): """ Reset the password of the Trove's administrative superuser. @@ -424,6 +453,14 @@ class CassandraApp(object): {'data_file_directories': [self.cassandra_data_dir]}) self._make_host_reachable() self._update_cluster_name_property(cluster_name or CONF.guest_id) + # A single-node instance may use the SimpleSnitch + # (keyspaces use SimpleStrategy). + # A network-aware snitch has to be used otherwise. 
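
During cluster create (see CassandraClusterTasks.create_cluster earlier in this patch) only the first node runs cluster_secure(); the serialized credentials it returns are then stored on the remaining nodes. A toy sketch of that hand-off, with a plain dict standing in for CassandraUser.serialize() and fake guests in place of the real RPC clients:

# Toy stand-ins for the guest calls used in create_cluster above.
# A real node would create/alter the Cassandra superuser; this only
# models the serialize/store hand-off of the credentials.
class FakeGuest(object):
    def __init__(self):
        self.creds = None

    def cluster_secure(self, password):
        # First node: create the admin user and return it serialized.
        self.creds = {'_name': 'os_admin', '_password': password}
        return dict(self.creds)

    def store_admin_credentials(self, admin_credentials):
        # Remaining nodes: record the same credentials locally.
        self.creds = dict(admin_credentials)

nodes = [FakeGuest() for _ in range(3)]
admin_creds = None
for guest in nodes:
    if admin_creds is None:
        admin_creds = guest.cluster_secure('s3cret')
    else:
        guest.store_admin_credentials(admin_creds)

assert all(g.creds == nodes[0].creds for g in nodes)
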
+ if cluster_name is None: + updates = {'endpoint_snitch': 'SimpleSnitch'} + else: + updates = {'endpoint_snitch': 'GossipingPropertyFileSnitch'} + self.configuration_manager.apply_system_override(updates) def _make_host_reachable(self): """ @@ -498,6 +535,21 @@ class CassandraApp(object): def remove_overrides(self): self.configuration_manager.remove_user_override() + def write_cluster_topology(self, data_center, rack, prefer_local=True): + LOG.info(_('Saving Cassandra cluster topology configuration.')) + + config = {'dc': data_center, + 'rack': rack, + 'prefer_local': prefer_local} + + operating_system.write_file(self.cassandra_topology, config, + codec=self._TOPOLOGY_CODEC, as_root=True) + operating_system.chown( + self.cassandra_topology, + self.cassandra_owner, self.cassandra_owner, as_root=True) + operating_system.chmod( + self.cassandra_topology, FileMode.ADD_READ_ALL, as_root=True) + def start_db_with_conf_changes(self, config_contents): LOG.debug("Starting database with configuration changes.") if self.status.is_running: @@ -517,6 +569,106 @@ class CassandraApp(object): def _get_cqlsh_conf_path(self): return os.path.expanduser(self.cqlsh_conf_path) + def get_data_center(self): + config = operating_system.read_file(self.cassandra_topology, + codec=self._TOPOLOGY_CODEC) + return config['dc'] + + def get_rack(self): + config = operating_system.read_file(self.cassandra_topology, + codec=self._TOPOLOGY_CODEC) + return config['rack'] + + def set_seeds(self, seeds): + LOG.debug("Setting seed nodes: %s" % seeds) + updates = { + 'seed_provider': {'parameters': + [{'seeds': ','.join(seeds)}] + } + } + + self.configuration_manager.apply_system_override(updates) + + def get_seeds(self): + """Return a list of seed node IPs if any. + + The seed IPs are stored as a comma-separated string in the + seed-provider parameters: + [{'class_name': '', 'parameters': [{'seeds': ','}, ...]}] + """ + + def find_first(key, dict_list): + for item in dict_list: + if key in item: + return item[key] + return [] + + sp_property = self.configuration_manager.get_value('seed_provider', []) + seeds_str = find_first('seeds', find_first('parameters', sp_property)) + return seeds_str.split(',') if seeds_str else [] + + def set_auto_bootstrap(self, enabled): + """Auto-bootstrap makes new (non-seed) nodes automatically migrate the + right data to themselves. + The feature has to be turned OFF when initializing a fresh cluster + without data. + It must be turned back ON once the cluster is initialized. + """ + LOG.debug("Setting auto-bootstrapping: %s" % enabled) + updates = {'auto_bootstrap': enabled} + self.configuration_manager.apply_system_override(updates) + + def node_cleanup_begin(self): + """Suspend periodic status updates and mark the instance busy + throughout the operation. + """ + self.status.begin_restart() + self.status.set_status(rd_instance.ServiceStatuses.BLOCKED) + + def node_cleanup(self): + """Cassandra does not automatically remove data from nodes that + lose part of their partition range to a newly added node. + Cleans up keyspaces and partition keys no longer belonging to the node. + + Do not treat cleanup failures as fatal. Resume the heartbeat after + finishing and let it signal the true state of the instance to the + caller. 
+ """ + LOG.debug("Running node cleanup.") + # nodetool -h -p -u -pw cleanup + try: + self._run_nodetool_command('cleanup') + self.status.set_status(rd_instance.ServiceStatuses.RUNNING) + except Exception: + LOG.exception(_("The node failed to complete its cleanup.")) + finally: + self.status.end_restart() + + def node_decommission(self): + """Causes a live node to decommission itself, + streaming its data to the next node on the ring. + + Shutdown the database after successfully finishing the operation, + or leave the node in a failed state otherwise. + + Suspend periodic status updates, so that the caller can poll for the + database shutdown. + """ + LOG.debug("Decommissioning the node.") + # nodetool -h -p -u -pw decommission + self.status.begin_restart() + try: + self._run_nodetool_command('decommission') + except Exception: + LOG.exception(_("The node failed to decommission itself.")) + self.status.set_status(rd_instance.ServiceStatuses.FAILED) + return + + try: + self.stop_db(update_db=True, do_not_start_on_reboot=True) + finally: + self.status.end_restart() + def flush_tables(self, keyspace, *tables): """Flushes one or more tables from the memtable. """ @@ -528,11 +680,8 @@ class CassandraApp(object): def _run_nodetool_command(self, cmd, *args, **kwargs): """Execute a nodetool command on this node. """ - cassandra = self.get_current_superuser() - return utils.execute('nodetool', - '-h', 'localhost', - '-u', cassandra.name, - '-pw', cassandra.password, cmd, *args, **kwargs) + return utils.execute('nodetool', '-h', 'localhost', + cmd, *args, **kwargs) def enable_root(self, root_password=None): """Cassandra's 'root' user is called 'cassandra'. diff --git a/trove/taskmanager/models.py b/trove/taskmanager/models.py index 38d33da14e..2a532df3ff 100755 --- a/trove/taskmanager/models.py +++ b/trove/taskmanager/models.py @@ -208,59 +208,85 @@ class ClusterTasks(Cluster): def _all_instances_ready(self, instance_ids, cluster_id, shard_id=None): + """Wait for all instances to get READY.""" + return self._all_instances_acquire_status( + instance_ids, cluster_id, shard_id, ServiceStatuses.INSTANCE_READY, + fast_fail_statuses=[ServiceStatuses.FAILED, + ServiceStatuses.FAILED_TIMEOUT_GUESTAGENT]) - def _all_status_ready(ids): - LOG.debug("Checking service status of instance ids: %s" % ids) + def _all_instances_shutdown(self, instance_ids, cluster_id, + shard_id=None): + """Wait for all instances to go SHUTDOWN.""" + return self._all_instances_acquire_status( + instance_ids, cluster_id, shard_id, ServiceStatuses.SHUTDOWN, + fast_fail_statuses=[ServiceStatuses.FAILED, + ServiceStatuses.FAILED_TIMEOUT_GUESTAGENT]) + + def _all_instances_running(self, instance_ids, cluster_id, shard_id=None): + """Wait for all instances to become ACTIVE.""" + return self._all_instances_acquire_status( + instance_ids, cluster_id, shard_id, ServiceStatuses.RUNNING, + fast_fail_statuses=[ServiceStatuses.FAILED, + ServiceStatuses.FAILED_TIMEOUT_GUESTAGENT]) + + def _all_instances_acquire_status( + self, instance_ids, cluster_id, shard_id, expected_status, + fast_fail_statuses=None): + + def _is_fast_fail_status(status): + return ((fast_fail_statuses is not None) and + ((status == fast_fail_statuses) or + (status in fast_fail_statuses))) + + def _all_have_status(ids): for instance_id in ids: status = InstanceServiceStatus.find_by( instance_id=instance_id).get_status() - if (status == ServiceStatuses.FAILED or - status == ServiceStatuses.FAILED_TIMEOUT_GUESTAGENT): - # if one has failed, no need to continue polling - 
LOG.debug("Instance %s in %s, exiting polling." % ( - instance_id, status)) - return True - if status != ServiceStatuses.INSTANCE_READY: - # if one is not in a cluster-ready state, - # continue polling - LOG.debug("Instance %s in %s, continue polling." % ( - instance_id, status)) - return False - LOG.debug("Instances are ready, exiting polling for: %s" % ids) + if _is_fast_fail_status(status): + # if one has failed, no need to continue polling + LOG.debug("Instance %s has acquired a fast-fail status %s." + % (instance_id, status)) + return True + if status != expected_status: + # if one is not in the expected state, continue polling + LOG.debug("Instance %s was %s." % (instance_id, status)) + return False + return True def _instance_ids_with_failures(ids): - LOG.debug("Checking for service status failures for " - "instance ids: %s" % ids) + LOG.debug("Checking for service failures on instances: %s" + % ids) failed_instance_ids = [] for instance_id in ids: status = InstanceServiceStatus.find_by( instance_id=instance_id).get_status() - if (status == ServiceStatuses.FAILED or - status == ServiceStatuses.FAILED_TIMEOUT_GUESTAGENT): - failed_instance_ids.append(instance_id) + if _is_fast_fail_status(status): + failed_instance_ids.append(instance_id) return failed_instance_ids - LOG.debug("Polling until service status is ready for " - "instance ids: %s" % instance_ids) + LOG.debug("Polling until all instances acquire %s status: %s" + % (expected_status, instance_ids)) try: utils.poll_until(lambda: instance_ids, - lambda ids: _all_status_ready(ids), + lambda ids: _all_have_status(ids), sleep_time=USAGE_SLEEP_TIME, time_out=CONF.usage_timeout) except PollTimeOut: - LOG.exception(_("Timeout for all instance service statuses " - "to become ready.")) + LOG.exception(_("Timed out while waiting for all instances " + "to become %s.") % expected_status) self.update_statuses_on_failure(cluster_id, shard_id) return False failed_ids = _instance_ids_with_failures(instance_ids) if failed_ids: - LOG.error(_("Some instances failed to become ready: %s") % - failed_ids) + LOG.error(_("Some instances failed: %s") % failed_ids) self.update_statuses_on_failure(cluster_id, shard_id) return False + LOG.debug("All instances have acquired the expected status %s." 
+ % expected_status) + return True def delete_cluster(self, context, cluster_id): diff --git a/trove/tests/int_tests.py b/trove/tests/int_tests.py index c4d1558a6f..86ed581d23 100644 --- a/trove/tests/int_tests.py +++ b/trove/tests/int_tests.py @@ -200,7 +200,7 @@ register(["db2_supported"], common_groups, database_actions_groups, user_actions_groups) register(["cassandra_supported"], common_groups, user_actions_groups, database_actions_groups, - backup_groups, configuration_groups) + backup_groups, configuration_groups, cluster_actions_groups) register(["couchbase_supported"], common_groups, backup_groups, root_actions_groups) register(["couchdb_supported"], common_groups) diff --git a/trove/tests/scenario/helpers/cassandra_helper.py b/trove/tests/scenario/helpers/cassandra_helper.py index 385b08c60f..7aad13a6c4 100644 --- a/trove/tests/scenario/helpers/cassandra_helper.py +++ b/trove/tests/scenario/helpers/cassandra_helper.py @@ -57,7 +57,6 @@ class CassandraClient(object): class CassandraHelper(TestHelper): DATA_COLUMN_NAME = 'value' - cluster_node_count = 2 def __init__(self, expected_override_name): super(CassandraHelper, self).__init__(expected_override_name) diff --git a/trove/tests/scenario/helpers/mariadb_helper.py b/trove/tests/scenario/helpers/mariadb_helper.py index 19986e9a68..687b87631a 100644 --- a/trove/tests/scenario/helpers/mariadb_helper.py +++ b/trove/tests/scenario/helpers/mariadb_helper.py @@ -18,8 +18,6 @@ from trove.tests.scenario.helpers.mysql_helper import MysqlHelper class MariadbHelper(MysqlHelper): - cluster_node_count = 3 - def __init__(self, expected_override_name): super(MariadbHelper, self).__init__(expected_override_name) diff --git a/trove/tests/scenario/helpers/mongodb_helper.py b/trove/tests/scenario/helpers/mongodb_helper.py index 7710ab6c41..0fbb7de74b 100644 --- a/trove/tests/scenario/helpers/mongodb_helper.py +++ b/trove/tests/scenario/helpers/mongodb_helper.py @@ -18,8 +18,6 @@ from trove.tests.scenario.helpers.test_helper import TestHelper class MongodbHelper(TestHelper): - cluster_node_count = 2 - def __init__(self, expected_override_name): super(MongodbHelper, self).__init__(expected_override_name) diff --git a/trove/tests/scenario/helpers/pxc_helper.py b/trove/tests/scenario/helpers/pxc_helper.py index c26276d55c..7bd58c358c 100644 --- a/trove/tests/scenario/helpers/pxc_helper.py +++ b/trove/tests/scenario/helpers/pxc_helper.py @@ -18,7 +18,5 @@ from trove.tests.scenario.helpers.mysql_helper import MysqlHelper class PxcHelper(MysqlHelper): - cluster_node_count = 3 - def __init__(self, expected_override_name): super(PxcHelper, self).__init__(expected_override_name) diff --git a/trove/tests/scenario/helpers/redis_helper.py b/trove/tests/scenario/helpers/redis_helper.py index 80b5358d29..e07768ec36 100644 --- a/trove/tests/scenario/helpers/redis_helper.py +++ b/trove/tests/scenario/helpers/redis_helper.py @@ -22,8 +22,6 @@ from trove.tests.scenario.runners.test_runners import TestRunner class RedisHelper(TestHelper): - cluster_node_count = 2 - def __init__(self, expected_override_name): super(RedisHelper, self).__init__(expected_override_name) diff --git a/trove/tests/scenario/runners/cluster_actions_runners.py b/trove/tests/scenario/runners/cluster_actions_runners.py index 7d692b9b1c..8c94b674ab 100644 --- a/trove/tests/scenario/runners/cluster_actions_runners.py +++ b/trove/tests/scenario/runners/cluster_actions_runners.py @@ -55,7 +55,7 @@ class ClusterActionsRunner(TestRunner): expected_instance_states=['BUILD', 'ACTIVE'], 
expected_http_code=200): if not num_nodes: - num_nodes = self.test_helper.cluster_node_count + num_nodes = self.min_cluster_node_count instances_def = [ self.build_flavor( @@ -66,6 +66,10 @@ class ClusterActionsRunner(TestRunner): 'test_cluster', instances_def, expected_task_name, expected_instance_states, expected_http_code) + @property + def min_cluster_node_count(self): + return 2 + def assert_cluster_create( self, cluster_name, instances_def, expected_task_name, expected_instance_states, expected_http_code): @@ -337,12 +341,36 @@ class ClusterActionsRunner(TestRunner): self.assert_client_code(404) -class MariadbClusterActionsRunner(ClusterActionsRunner): +class CassandraClusterActionsRunner(ClusterActionsRunner): def run_cluster_root_enable(self): raise SkipTest("Operation is currently not supported.") +class MariadbClusterActionsRunner(ClusterActionsRunner): + + @property + def min_cluster_node_count(self): + return self.get_datastore_config_property('min_cluster_member_count') + + def run_cluster_root_enable(self): + raise SkipTest("Operation is currently not supported.") + + +class PxcClusterActionsRunner(ClusterActionsRunner): + + @property + def min_cluster_node_count(self): + return self.get_datastore_config_property('min_cluster_member_count') + + +class VerticaClusterActionsRunner(ClusterActionsRunner): + + @property + def min_cluster_node_count(self): + return self.get_datastore_config_property('cluster_member_count') + + class RedisClusterActionsRunner(ClusterActionsRunner): def run_cluster_root_enable(self): diff --git a/trove/tests/scenario/runners/negative_cluster_actions_runners.py b/trove/tests/scenario/runners/negative_cluster_actions_runners.py index 8a70f846a7..c37d767353 100644 --- a/trove/tests/scenario/runners/negative_cluster_actions_runners.py +++ b/trove/tests/scenario/runners/negative_cluster_actions_runners.py @@ -76,6 +76,15 @@ class MongodbNegativeClusterActionsRunner(NegativeClusterActionsRunner): max_nodes=3) +class CassandraNegativeClusterActionsRunner(NegativeClusterActionsRunner): + + def run_create_constrained_size_cluster(self): + raise SkipTest("No constraints apply to the number of cluster nodes.") + + def run_create_heterogeneous_cluster(self): + raise SkipTest("No constraints apply to the size of cluster nodes.") + + class RedisNegativeClusterActionsRunner(NegativeClusterActionsRunner): def run_create_constrained_size_cluster(self): diff --git a/trove/tests/unittests/cluster/test_cassandra_cluster.py b/trove/tests/unittests/cluster/test_cassandra_cluster.py new file mode 100644 index 0000000000..6ad17dae02 --- /dev/null +++ b/trove/tests/unittests/cluster/test_cassandra_cluster.py @@ -0,0 +1,99 @@ +# Copyright 2016 Tesora Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. 
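
The runner classes above pull their minimum node count from the datastore's option group (min_cluster_member_count or cluster_member_count). A minimal oslo.config sketch of registering and reading such a per-datastore option; the group name and default shown illustrate the pattern rather than Trove's actual registration in trove/common/cfg.py:

# Minimal oslo.config sketch: a per-datastore option group holding a
# cluster-size floor, similar in spirit to min_cluster_member_count.
# Group and default are illustrative.
from oslo_config import cfg

CONF = cfg.ConfigOpts()
group = cfg.OptGroup('pxc', title='PXC options')
CONF.register_group(group)
CONF.register_opts([cfg.IntOpt('min_cluster_member_count', default=3)],
                   group=group)

CONF([])  # parse an empty argv so the defaults become readable
print(CONF.pxc.min_cluster_member_count)  # 3
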
+ +from mock import ANY +from mock import MagicMock +from mock import Mock +from mock import patch + +from trove.cluster import models +from trove.common.strategies.cluster.experimental.cassandra.api \ + import CassandraCluster +from trove.common.strategies.cluster.experimental.cassandra.taskmanager \ + import CassandraClusterTasks +from trove.instance import models as inst_models +from trove.quota import quota +from trove.tests.unittests import trove_testtools + + +class ClusterTest(trove_testtools.TestCase): + + def setUp(self): + super(ClusterTest, self).setUp() + self.context = trove_testtools.TroveTestContext(self) + + def tearDown(self): + super(ClusterTest, self).tearDown() + + @patch.object(inst_models.Instance, 'create') + @patch.object(quota.QUOTAS, 'check_quotas') + @patch.object(models, 'get_flavors_from_instance_defs') + @patch.object(models, 'get_required_volume_size', return_value=3) + def test_create_cluster_instances(self, get_vol_size, _, check_quotas, + inst_create): + test_instances = [MagicMock(), MagicMock()] + num_instances = len(test_instances) + datastore = Mock(manager='cassandra') + datastore_version = Mock(manager='cassandra') + + with patch.object(CassandraClusterTasks, 'find_cluster_node_ids', + return_value=[inst.id for inst in test_instances]): + CassandraCluster._create_cluster_instances( + self.context, 'test_cluster_id', 'test_cluster', + datastore, datastore_version, + test_instances) + + check_quotas.assert_called_once_with( + ANY, instances=num_instances, volumes=get_vol_size.return_value) + self.assertEqual(num_instances, inst_create.call_count, + "Unexpected number of instances created.") + + def test_choose_seed_nodes(self): + nodes = self._build_mock_nodes(3) + + seeds = CassandraClusterTasks.choose_seed_nodes(nodes) + self.assertEqual(1, len(seeds), + "Only one seed node should be selected for a " + "single-rack-single-dc cluster.") + + nodes = self._build_mock_nodes(3) + nodes[0]['rack'] = 'rack1' + nodes[1]['rack'] = 'rack2' + seeds = CassandraClusterTasks.choose_seed_nodes(nodes) + self.assertEqual(2, len(seeds), + "There should be exactly two seed nodes. " + "One from each rack.") + + nodes = self._build_mock_nodes(3) + nodes[0]['rack'] = 'rack1' + nodes[1]['rack'] = 'rack2' + nodes[2]['dc'] = 'dc2' + seeds = CassandraClusterTasks.choose_seed_nodes(nodes) + self.assertEqual(3, len(seeds), + "There should be exactly three seed nodes. 
" + "One from each rack and data center.") + + def _build_mock_nodes(self, num_nodes): + nodes = [] + for _ in range(num_nodes): + mock_instance = MagicMock() + nodes.append({'instance': mock_instance, + 'guest': MagicMock(), + 'id': mock_instance.id, + 'ip': '%s_IP' % mock_instance.id, + 'dc': 'dc1', + 'rack': 'rack1' + }) + return nodes diff --git a/trove/tests/unittests/guestagent/test_cassandra_manager.py b/trove/tests/unittests/guestagent/test_cassandra_manager.py index 0430a6271d..e95db6730e 100644 --- a/trove/tests/unittests/guestagent/test_cassandra_manager.py +++ b/trove/tests/unittests/guestagent/test_cassandra_manager.py @@ -296,7 +296,8 @@ class GuestAgentCassandraDBManagerTest(trove_testtools.TestCase): mock_app.install_if_needed.assert_any_call(packages) mock_app._remove_system_tables.assert_any_call() mock_app.init_storage_structure.assert_any_call('/var/lib/cassandra') - mock_app.apply_initial_guestagent_configuration.assert_any_call() + mock_app.apply_initial_guestagent_configuration.assert_any_call( + cluster_name=None) mock_app.start_db.assert_any_call(update_db=False) mock_app.stop_db.assert_any_call() if backup_info: