Merge "Implement Cassandra clustering"
This commit is contained in:
commit 560cb8e0a2
@@ -15,6 +15,7 @@
 from oslo_log import log as logging
+from novaclient import exceptions as nova_exceptions
 from trove.cluster.tasks import ClusterTask
 from trove.cluster.tasks import ClusterTasks
 from trove.common import cfg
@@ -274,8 +275,51 @@ class Cluster(object):
 
 def is_cluster_deleting(context, cluster_id):
     cluster = Cluster.load(context, cluster_id)
-    return (cluster.db_info.task_status == ClusterTasks.DELETING
-            or cluster.db_info.task_status == ClusterTasks.SHRINKING_CLUSTER)
+    return (cluster.db_info.task_status == ClusterTasks.DELETING or
+            cluster.db_info.task_status == ClusterTasks.SHRINKING_CLUSTER)
+
+
+def get_flavors_from_instance_defs(context, instances,
+                                   volume_enabled, ephemeral_enabled):
+    """Load and validate flavors for given instance definitions."""
+    flavors = dict()
+    nova_client = remote.create_nova_client(context)
+    for instance in instances:
+        flavor_id = instance['flavor_id']
+        if flavor_id not in flavors:
+            try:
+                flavor = nova_client.flavors.get(flavor_id)
+                if (not volume_enabled and
+                        (ephemeral_enabled and flavor.ephemeral == 0)):
+                    raise exception.LocalStorageNotSpecified(
+                        flavor=flavor_id)
+                flavors[flavor_id] = flavor
+            except nova_exceptions.NotFound:
+                raise exception.FlavorNotFound(uuid=flavor_id)
+
+    return flavors
+
+
+def get_required_volume_size(instances, volume_enabled):
+    """Calculate the total Trove volume size for given instances."""
+    volume_sizes = [instance['volume_size'] for instance in instances
+                    if instance.get('volume_size', None)]
+
+    if volume_enabled:
+        if len(volume_sizes) != len(instances):
+            raise exception.ClusterVolumeSizeRequired()
+
+        total_volume_size = 0
+        for volume_size in volume_sizes:
+            validate_volume_size(volume_size)
+            total_volume_size += volume_size
+
+        return total_volume_size
+
+    if len(volume_sizes) > 0:
+        raise exception.VolumeNotSupported()
+
+    return None
+
+
 def validate_volume_size(size):
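# --- Editor's illustration (not part of the patch) --------------------------
# A minimal standalone sketch of the volume-size rules enforced by
# get_required_volume_size() above: when volume support is enabled every node
# definition must carry a volume size, and when it is disabled none may.
# Exception names are stand-ins and per-size validation is omitted.
class ClusterVolumeSizeRequired(Exception):
    pass


class VolumeNotSupported(Exception):
    pass


def required_volume_size(instances, volume_enabled):
    sizes = [i['volume_size'] for i in instances if i.get('volume_size')]
    if volume_enabled:
        if len(sizes) != len(instances):
            raise ClusterVolumeSizeRequired()
        return sum(sizes)        # total size in GB, used as the quota delta
    if sizes:
        raise VolumeNotSupported()
    return None                  # local/ephemeral storage only


# Example: three nodes with 2 GB volumes each -> a quota delta of 6 GB.
print(required_volume_size(
    [{'flavor_id': '7', 'volume_size': 2} for _ in range(3)], True))
# -----------------------------------------------------------------------------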
@@ -785,7 +785,7 @@ cassandra_group = cfg.OptGroup(
     'cassandra', title='Cassandra options',
     help="Oslo option group designed for Cassandra datastore")
 cassandra_opts = [
-    cfg.ListOpt('tcp_ports', default=["7000", "7001", "9042", "9160"],
+    cfg.ListOpt('tcp_ports', default=["7000", "7001", "7199", "9042", "9160"],
                 help='List of TCP ports and/or port ranges to open '
                      'in the security group (only applicable '
                      'if trove_security_groups_support is True).'),
@@ -835,6 +835,22 @@ cassandra_opts = [
                help='Databases to exclude when listing databases.'),
     cfg.StrOpt('guest_log_exposed_logs', default='',
                help='List of Guest Logs to expose for publishing.'),
+    cfg.BoolOpt('cluster_support', default=True,
+                help='Enable clusters to be created and managed.'),
+    cfg.StrOpt('api_strategy',
+               default='trove.common.strategies.cluster.experimental.'
+                       'cassandra.api.CassandraAPIStrategy',
+               help='Class that implements datastore-specific API logic.'),
+    cfg.StrOpt('taskmanager_strategy',
+               default='trove.common.strategies.cluster.experimental'
+                       '.cassandra.taskmanager.CassandraTaskManagerStrategy',
+               help='Class that implements datastore-specific task manager '
+                    'logic.'),
+    cfg.StrOpt('guestagent_strategy',
+               default='trove.common.strategies.cluster.experimental'
+                       '.cassandra.guestagent.CassandraGuestAgentStrategy',
+               help='Class that implements datastore-specific Guest Agent API '
+                    'logic.'),
 ]

 # Couchbase
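# --- Editor's illustration (not part of the patch) --------------------------
# The new options follow the usual oslo.config pattern.  A rough sketch of how
# such a group is registered and read; this mirrors, but is not, Trove's
# actual wiring:
from oslo_config import cfg

CONF = cfg.CONF

group = cfg.OptGroup('cassandra', title='Cassandra options')
opts = [
    cfg.ListOpt('tcp_ports',
                default=['7000', '7001', '7199', '9042', '9160'],
                help='TCP ports to open in the security group.'),
    cfg.BoolOpt('cluster_support', default=True,
                help='Enable clusters to be created and managed.'),
    cfg.StrOpt('api_strategy',
               default='trove.common.strategies.cluster.experimental.'
                       'cassandra.api.CassandraAPIStrategy',
               help='Class that implements datastore-specific API logic.'),
]
CONF.register_group(group)
CONF.register_opts(opts, group=group)

print(CONF.cassandra.tcp_ports)
# With trove installed, the pluggable strategy class is typically resolved by
# its dotted path, e.g.
# oslo_utils.importutils.import_class(CONF.cassandra.api_strategy).
# -----------------------------------------------------------------------------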
trove/common/strategies/cluster/experimental/cassandra/api.py (new file, 212 lines)
@@ -0,0 +1,212 @@
|
||||
# Copyright 2015 Tesora Inc.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from oslo_log import log as logging
|
||||
|
||||
from trove.cluster import models
|
||||
from trove.cluster.tasks import ClusterTasks
|
||||
from trove.cluster.views import ClusterView
|
||||
from trove.common import cfg
|
||||
from trove.common.strategies.cluster import base
|
||||
from trove.common.strategies.cluster.experimental.cassandra.taskmanager import(
|
||||
CassandraClusterTasks)
|
||||
from trove.common import utils
|
||||
from trove.extensions.mgmt.clusters.views import MgmtClusterView
|
||||
from trove.instance import models as inst_models
|
||||
from trove.quota.quota import check_quotas
|
||||
from trove.taskmanager import api as task_api
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
CONF = cfg.CONF
|
||||
|
||||
|
||||
class CassandraAPIStrategy(base.BaseAPIStrategy):
|
||||
|
||||
@property
|
||||
def cluster_class(self):
|
||||
return CassandraCluster
|
||||
|
||||
@property
|
||||
def cluster_controller_actions(self):
|
||||
return {
|
||||
'grow': self._action_grow_cluster,
|
||||
'shrink': self._action_shrink_cluster
|
||||
}
|
||||
|
||||
def _action_grow_cluster(self, cluster, body):
|
||||
nodes = body['grow']
|
||||
instances = []
|
||||
for node in nodes:
|
||||
instance = {
|
||||
'flavor_id': utils.get_id_from_href(node['flavorRef'])
|
||||
}
|
||||
if 'name' in node:
|
||||
instance['name'] = node['name']
|
||||
if 'volume' in node:
|
||||
instance['volume_size'] = int(node['volume']['size'])
|
||||
instances.append(instance)
|
||||
return cluster.grow(instances)
|
||||
|
||||
def _action_shrink_cluster(self, cluster, body):
|
||||
nodes = body['shrink']
|
||||
instance_ids = [node['id'] for node in nodes]
|
||||
return cluster.shrink(instance_ids)
|
||||
|
||||
@property
|
||||
def cluster_view_class(self):
|
||||
return CassandraClusterView
|
||||
|
||||
@property
|
||||
def mgmt_cluster_view_class(self):
|
||||
return CassandraMgmtClusterView
|
||||
|
||||
|
||||
class CassandraCluster(models.Cluster):
|
||||
|
||||
DEFAULT_DATA_CENTER = "dc1"
|
||||
DEFAULT_RACK = "rack1"
|
||||
|
||||
@classmethod
|
||||
def create(cls, context, name, datastore, datastore_version,
|
||||
instances, extended_properties):
|
||||
LOG.debug("Processing a request for creating a new cluster.")
|
||||
|
||||
# Updating Cluster Task.
|
||||
db_info = models.DBCluster.create(
|
||||
name=name, tenant_id=context.tenant,
|
||||
datastore_version_id=datastore_version.id,
|
||||
task_status=ClusterTasks.BUILDING_INITIAL)
|
||||
|
||||
cls._create_cluster_instances(
|
||||
context, db_info.id, db_info.name,
|
||||
datastore, datastore_version, instances, extended_properties)
|
||||
|
||||
# Calling taskmanager to further proceed for cluster-configuration.
|
||||
task_api.load(context, datastore_version.manager).create_cluster(
|
||||
db_info.id)
|
||||
|
||||
return CassandraCluster(context, db_info, datastore, datastore_version)
|
||||
|
||||
@classmethod
|
||||
def _create_cluster_instances(
|
||||
cls, context, cluster_id, cluster_name,
|
||||
datastore, datastore_version, instances, extended_properties=None):
|
||||
LOG.debug("Processing a request for new cluster instances.")
|
||||
|
||||
cassandra_conf = CONF.get(datastore_version.manager)
|
||||
eph_enabled = cassandra_conf.device_path
|
||||
vol_enabled = cassandra_conf.volume_support
|
||||
|
||||
# Validate instance flavors.
|
||||
models.get_flavors_from_instance_defs(context, instances,
|
||||
vol_enabled, eph_enabled)
|
||||
|
||||
# Compute the total volume allocation.
|
||||
req_volume_size = models.get_required_volume_size(instances,
|
||||
vol_enabled)
|
||||
|
||||
# Check requirements against quota.
|
||||
num_new_instances = len(instances)
|
||||
deltas = {'instances': num_new_instances, 'volumes': req_volume_size}
|
||||
check_quotas(context.tenant, deltas)
|
||||
|
||||
# Creating member instances.
|
||||
num_instances = len(
|
||||
CassandraClusterTasks.find_cluster_node_ids(cluster_id))
|
||||
new_instances = []
|
||||
for instance_idx, instance in enumerate(instances, num_instances + 1):
|
||||
instance_az = instance.get('availability_zone', None)
|
||||
|
||||
member_config = {"id": cluster_id,
|
||||
"instance_type": "member",
|
||||
"dc": cls.DEFAULT_DATA_CENTER,
|
||||
"rack": instance_az or cls.DEFAULT_RACK}
|
||||
|
||||
instance_name = instance.get('name')
|
||||
if not instance_name:
|
||||
instance_name = cls._build_instance_name(
|
||||
cluster_name, member_config['dc'], member_config['rack'],
|
||||
instance_idx)
|
||||
|
||||
new_instance = inst_models.Instance.create(
|
||||
context, instance_name,
|
||||
instance['flavor_id'],
|
||||
datastore_version.image_id,
|
||||
[], [],
|
||||
datastore, datastore_version,
|
||||
instance['volume_size'], None,
|
||||
nics=instance.get('nics', None),
|
||||
availability_zone=instance_az,
|
||||
configuration_id=None,
|
||||
cluster_config=member_config)
|
||||
|
||||
new_instances.append(new_instance)
|
||||
|
||||
return new_instances
|
||||
|
||||
@classmethod
|
||||
def _build_instance_name(cls, cluster_name, dc, rack, instance_idx):
|
||||
return "%s-member-%s-%s-%d" % (cluster_name, dc, rack, instance_idx)
|
||||
|
||||
def grow(self, instances):
|
||||
LOG.debug("Processing a request for growing cluster: %s" % self.id)
|
||||
|
||||
self.validate_cluster_available()
|
||||
|
||||
context = self.context
|
||||
db_info = self.db_info
|
||||
datastore = self.ds
|
||||
datastore_version = self.ds_version
|
||||
|
||||
db_info.update(task_status=ClusterTasks.GROWING_CLUSTER)
|
||||
|
||||
new_instances = self._create_cluster_instances(
|
||||
context, db_info.id, db_info.name, datastore, datastore_version,
|
||||
instances)
|
||||
|
||||
task_api.load(context, datastore_version.manager).grow_cluster(
|
||||
db_info.id, [instance.id for instance in new_instances])
|
||||
|
||||
return CassandraCluster(context, db_info, datastore, datastore_version)
|
||||
|
||||
def shrink(self, removal_ids):
|
||||
LOG.debug("Processing a request for shrinking cluster: %s" % self.id)
|
||||
|
||||
self.validate_cluster_available()
|
||||
|
||||
context = self.context
|
||||
db_info = self.db_info
|
||||
datastore = self.ds
|
||||
datastore_version = self.ds_version
|
||||
|
||||
db_info.update(task_status=ClusterTasks.SHRINKING_CLUSTER)
|
||||
|
||||
task_api.load(context, datastore_version.manager).shrink_cluster(
|
||||
db_info.id, removal_ids)
|
||||
|
||||
return CassandraCluster(context, db_info, datastore, datastore_version)
|
||||
|
||||
|
||||
class CassandraClusterView(ClusterView):
|
||||
|
||||
def build_instances(self):
|
||||
return self._build_instances(['member'], ['member'])
|
||||
|
||||
|
||||
class CassandraMgmtClusterView(MgmtClusterView):
|
||||
|
||||
def build_instances(self):
|
||||
return self._build_instances(['member'], ['member'])
|
trove/common/strategies/cluster/experimental/cassandra/guestagent.py (new file, 96 lines)
@@ -0,0 +1,96 @@
|
||||
# Copyright 2015 Tesora Inc.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from oslo_log import log as logging
|
||||
|
||||
from trove.common import cfg
|
||||
from trove.common.strategies.cluster import base
|
||||
from trove.guestagent import api as guest_api
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
CONF = cfg.CONF
|
||||
|
||||
|
||||
class CassandraGuestAgentStrategy(base.BaseGuestAgentStrategy):
|
||||
|
||||
@property
|
||||
def guest_client_class(self):
|
||||
return CassandraGuestAgentAPI
|
||||
|
||||
|
||||
class CassandraGuestAgentAPI(guest_api.API):
|
||||
|
||||
def get_data_center(self):
|
||||
LOG.debug("Retrieving the data center for node: %s" % self.id)
|
||||
return self._call("get_data_center", guest_api.AGENT_LOW_TIMEOUT,
|
||||
self.version_cap)
|
||||
|
||||
def get_rack(self):
|
||||
LOG.debug("Retrieving the rack for node: %s" % self.id)
|
||||
return self._call("get_rack", guest_api.AGENT_LOW_TIMEOUT,
|
||||
self.version_cap)
|
||||
|
||||
def set_seeds(self, seeds):
|
||||
LOG.debug("Configuring the gossip seeds for node: %s" % self.id)
|
||||
return self._call("set_seeds", guest_api.AGENT_LOW_TIMEOUT,
|
||||
self.version_cap, seeds=seeds)
|
||||
|
||||
def get_seeds(self):
|
||||
LOG.debug("Retrieving the gossip seeds for node: %s" % self.id)
|
||||
return self._call("get_seeds", guest_api.AGENT_LOW_TIMEOUT,
|
||||
self.version_cap)
|
||||
|
||||
def set_auto_bootstrap(self, enabled):
|
||||
LOG.debug("Setting the auto-bootstrap to '%s' for node: %s"
|
||||
% (enabled, self.id))
|
||||
return self._call("set_auto_bootstrap", guest_api.AGENT_LOW_TIMEOUT,
|
||||
self.version_cap, enabled=enabled)
|
||||
|
||||
def cluster_complete(self):
|
||||
LOG.debug("Sending a setup completion notification for node: %s"
|
||||
% self.id)
|
||||
return self._call("cluster_complete", guest_api.AGENT_LOW_TIMEOUT,
|
||||
self.version_cap)
|
||||
|
||||
def node_cleanup_begin(self):
|
||||
LOG.debug("Signaling the node to prepare for cleanup: %s" % self.id)
|
||||
return self._call("node_cleanup_begin", guest_api.AGENT_LOW_TIMEOUT,
|
||||
self.version_cap)
|
||||
|
||||
def node_cleanup(self):
|
||||
LOG.debug("Running cleanup on node: %s" % self.id)
|
||||
return self._cast('node_cleanup', self.version_cap)
|
||||
|
||||
def node_decommission(self):
|
||||
LOG.debug("Decommission node: %s" % self.id)
|
||||
return self._cast("node_decommission", self.version_cap)
|
||||
|
||||
def cluster_secure(self, password):
|
||||
LOG.debug("Securing the cluster via node: %s" % self.id)
|
||||
return self._call(
|
||||
"cluster_secure", guest_api.AGENT_HIGH_TIMEOUT,
|
||||
self.version_cap, password=password)
|
||||
|
||||
def get_admin_credentials(self):
|
||||
LOG.debug("Retrieving the admin credentials from node: %s" % self.id)
|
||||
return self._call("get_admin_credentials", guest_api.AGENT_LOW_TIMEOUT,
|
||||
self.version_cap)
|
||||
|
||||
def store_admin_credentials(self, admin_credentials):
|
||||
LOG.debug("Storing the admin credentials on node: %s" % self.id)
|
||||
return self._call("store_admin_credentials",
|
||||
guest_api.AGENT_LOW_TIMEOUT, self.version_cap,
|
||||
admin_credentials=admin_credentials)
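# --- Editor's illustration (not part of the patch) --------------------------
# The guest API above mixes two RPC styles: _call() blocks for a reply within
# AGENT_LOW_TIMEOUT/AGENT_HIGH_TIMEOUT, while _cast() is fire-and-forget,
# which is why long-running operations such as node_cleanup and
# node_decommission are cast and their completion is observed by polling the
# instance status instead.  A toy sketch of the two semantics (not
# oslo.messaging, just the idea):
import queue
import threading


class ToyGuestChannel(object):

    def __init__(self):
        self._replies = queue.Queue()

    def _serve(self, method):
        # Pretend the guest agent executed the method and produced a reply.
        self._replies.put('%s: done' % method)

    def call(self, method, timeout):
        """Send a request and block until a reply arrives (or time out)."""
        threading.Thread(target=self._serve, args=(method,)).start()
        return self._replies.get(timeout=timeout)

    def cast(self, method):
        """Send a request and return immediately; no reply is awaited."""
        threading.Thread(target=self._serve, args=(method,)).start()


channel = ToyGuestChannel()
print(channel.call('get_data_center', timeout=5))  # synchronous, returns data
channel.cast('node_decommission')                  # async; poll status later
# -----------------------------------------------------------------------------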
|
trove/common/strategies/cluster/experimental/cassandra/taskmanager.py (new file, 351 lines)
@@ -0,0 +1,351 @@
|
||||
# Copyright 2015 Tesora Inc.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from eventlet.timeout import Timeout
|
||||
from oslo_log import log as logging
|
||||
|
||||
from trove.common import cfg
|
||||
from trove.common.i18n import _
|
||||
from trove.common.strategies.cluster import base
|
||||
from trove.common import utils
|
||||
from trove.instance.models import DBInstance
|
||||
from trove.instance.models import Instance
|
||||
from trove.taskmanager import api as task_api
|
||||
import trove.taskmanager.models as task_models
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
CONF = cfg.CONF
|
||||
USAGE_SLEEP_TIME = CONF.usage_sleep_time # seconds.
|
||||
|
||||
|
||||
class CassandraTaskManagerStrategy(base.BaseTaskManagerStrategy):
|
||||
|
||||
@property
|
||||
def task_manager_api_class(self):
|
||||
return CassandraTaskManagerAPI
|
||||
|
||||
@property
|
||||
def task_manager_cluster_tasks_class(self):
|
||||
return CassandraClusterTasks
|
||||
|
||||
|
||||
class CassandraClusterTasks(task_models.ClusterTasks):
|
||||
|
||||
def create_cluster(self, context, cluster_id):
|
||||
LOG.debug("Begin create_cluster for id: %s." % cluster_id)
|
||||
|
||||
def _create_cluster():
|
||||
cluster_node_ids = self.find_cluster_node_ids(cluster_id)
|
||||
|
||||
# Wait for cluster nodes to get to cluster-ready status.
|
||||
LOG.debug("Waiting for all nodes to become ready.")
|
||||
if not self._all_instances_ready(cluster_node_ids, cluster_id):
|
||||
return
|
||||
|
||||
cluster_nodes = self.load_cluster_nodes(context, cluster_node_ids)
|
||||
|
||||
LOG.debug("All nodes ready, proceeding with cluster setup.")
|
||||
seeds = self.choose_seed_nodes(cluster_nodes)
|
||||
|
||||
# Configure each cluster node with the list of seeds.
|
||||
# Once all nodes are configured, start the seed nodes one at a time
|
||||
# followed by the rest of the nodes.
|
||||
try:
|
||||
LOG.debug("Selected seed nodes: %s" % seeds)
|
||||
|
||||
for node in cluster_nodes:
|
||||
LOG.debug("Configuring node: %s." % node['id'])
|
||||
node['guest'].set_seeds(seeds)
|
||||
node['guest'].set_auto_bootstrap(False)
|
||||
|
||||
LOG.debug("Starting seed nodes.")
|
||||
for node in cluster_nodes:
|
||||
if node['ip'] in seeds:
|
||||
node['guest'].restart()
|
||||
node['guest'].set_auto_bootstrap(True)
|
||||
|
||||
LOG.debug("All seeds running, starting remaining nodes.")
|
||||
for node in cluster_nodes:
|
||||
if node['ip'] not in seeds:
|
||||
node['guest'].restart()
|
||||
node['guest'].set_auto_bootstrap(True)
|
||||
|
||||
# Create the in-database user via the first node. The remaining
|
||||
# nodes will replicate in-database changes automatically.
|
||||
# Only update the local authentication file on the other nodes.
|
||||
LOG.debug("Securing the cluster.")
|
||||
key = utils.generate_random_password()
|
||||
admin_creds = None
|
||||
for node in cluster_nodes:
|
||||
if admin_creds is None:
|
||||
admin_creds = node['guest'].cluster_secure(key)
|
||||
else:
|
||||
node['guest'].store_admin_credentials(admin_creds)
|
||||
node['guest'].cluster_complete()
|
||||
|
||||
LOG.debug("Cluster configuration finished successfully.")
|
||||
except Exception:
|
||||
LOG.exception(_("Error creating cluster."))
|
||||
self.update_statuses_on_failure(cluster_id)
|
||||
|
||||
timeout = Timeout(CONF.cluster_usage_timeout)
|
||||
try:
|
||||
_create_cluster()
|
||||
self.reset_task()
|
||||
except Timeout as t:
|
||||
if t is not timeout:
|
||||
raise # not my timeout
|
||||
LOG.exception(_("Timeout for building cluster."))
|
||||
self.update_statuses_on_failure(cluster_id)
|
||||
finally:
|
||||
timeout.cancel()
|
||||
|
||||
LOG.debug("End create_cluster for id: %s." % cluster_id)
|
||||
|
||||
@classmethod
|
||||
def find_cluster_node_ids(cls, cluster_id):
|
||||
db_instances = DBInstance.find_all(cluster_id=cluster_id).all()
|
||||
return [db_instance.id for db_instance in db_instances]
|
||||
|
||||
@classmethod
|
||||
def load_cluster_nodes(cls, context, node_ids):
|
||||
return [cls.build_node_info(Instance.load(context, node_id))
|
||||
for node_id in node_ids]
|
||||
|
||||
@classmethod
|
||||
def build_node_info(cls, instance):
|
||||
guest = cls.get_guest(instance)
|
||||
return {'instance': instance,
|
||||
'guest': guest,
|
||||
'id': instance.id,
|
||||
'ip': cls.get_ip(instance),
|
||||
'dc': guest.get_data_center(),
|
||||
'rack': guest.get_rack()}
|
||||
|
||||
@classmethod
|
||||
def choose_seed_nodes(cls, node_info):
|
||||
"""Select gossip seeds. The seeds are cluster nodes from which any
|
||||
new/other cluster nodes request information on the
|
||||
cluster geometry.
|
||||
They should include at least one node from each data center and
|
||||
rack. Gossip optimization is not critical, but it is recommended
|
||||
to use a small seed list.
|
||||
|
||||
Select one (random) node from each dc and rack.
|
||||
|
||||
:param node_info: List of cluster nodes.
|
||||
:type node_info: list of dicts
|
||||
"""
|
||||
ips_by_affinity = cls._group_by_affinity(node_info)
|
||||
return {ips_by_affinity[dc][rack][0]
|
||||
for dc in ips_by_affinity
|
||||
for rack in ips_by_affinity[dc]}
|
||||
|
||||
@classmethod
|
||||
def _group_by_affinity(cls, node_info):
|
||||
"""Group node IPs by affinity to data center and rack."""
|
||||
ips_by_affinity = dict()
|
||||
for node in node_info:
|
||||
ip = node['ip']
|
||||
dc = node['dc']
|
||||
rack = node['rack']
|
||||
if dc in ips_by_affinity:
|
||||
dc_nodes = ips_by_affinity[dc]
|
||||
if rack in dc_nodes:
|
||||
rack_nodes = dc_nodes[rack]
|
||||
rack_nodes.append(ip)
|
||||
else:
|
||||
dc_nodes.update({rack: [ip]})
|
||||
else:
|
||||
ips_by_affinity.update({dc: {rack: [ip]}})
|
||||
|
||||
return ips_by_affinity
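# --- Editor's illustration (not part of the patch) --------------------------
# Taken together, choose_seed_nodes() and _group_by_affinity() pick one IP per
# (data center, rack) pair.  A standalone sketch of that selection rule with a
# small worked example (IPs and names are illustrative only):
def choose_seeds(node_info):
    by_affinity = {}
    for node in node_info:
        by_affinity.setdefault(node['dc'], {}).setdefault(
            node['rack'], []).append(node['ip'])
    return {racks[rack][0]
            for racks in by_affinity.values()
            for rack in racks}


nodes = [
    {'ip': '10.0.0.1', 'dc': 'dc1', 'rack': 'rack1'},
    {'ip': '10.0.0.2', 'dc': 'dc1', 'rack': 'rack1'},
    {'ip': '10.0.0.3', 'dc': 'dc1', 'rack': 'rack2'},
    {'ip': '10.0.0.4', 'dc': 'dc2', 'rack': 'rack1'},
]
# One seed per (dc, rack): three seeds for the layout above.
print(sorted(choose_seeds(nodes)))  # ['10.0.0.1', '10.0.0.3', '10.0.0.4']
# -----------------------------------------------------------------------------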
|
||||
|
||||
def grow_cluster(self, context, cluster_id, new_instance_ids):
|
||||
LOG.debug("Begin grow_cluster for id: %s." % cluster_id)
|
||||
|
||||
def _grow_cluster():
|
||||
# Wait for new nodes to get to cluster-ready status.
|
||||
LOG.debug("Waiting for new nodes to become ready.")
|
||||
if not self._all_instances_ready(new_instance_ids, cluster_id):
|
||||
return
|
||||
|
||||
new_instances = [Instance.load(context, instance_id)
|
||||
for instance_id in new_instance_ids]
|
||||
added_nodes = [self.build_node_info(instance)
|
||||
for instance in new_instances]
|
||||
|
||||
LOG.debug("All nodes ready, proceeding with cluster setup.")
|
||||
|
||||
cluster_node_ids = self.find_cluster_node_ids(cluster_id)
|
||||
cluster_nodes = self.load_cluster_nodes(context, cluster_node_ids)
|
||||
|
||||
# Recompute the seed nodes based on the updated cluster geometry.
|
||||
seeds = self.choose_seed_nodes(cluster_nodes)
|
||||
|
||||
# Configure each cluster node with the updated list of seeds.
|
||||
# Since we are adding to an existing cluster, ensure that the
|
||||
# new nodes have auto-bootstrapping enabled.
|
||||
# Start the added nodes.
|
||||
try:
|
||||
LOG.debug("Selected seed nodes: %s" % seeds)
|
||||
|
||||
# Update the seeds on all nodes.
|
||||
# Also retrieve the superuser password from one previously
|
||||
# existing node.
|
||||
admin_creds = None
|
||||
for node in cluster_nodes:
|
||||
LOG.debug("Configuring node: %s." % node['id'])
|
||||
node['guest'].set_seeds(seeds)
|
||||
if (admin_creds is None) and (node not in added_nodes):
|
||||
admin_creds = node['guest'].get_admin_credentials()
|
||||
|
||||
# Start any seeds from the added nodes first.
|
||||
LOG.debug("Starting new seed nodes.")
|
||||
for node in added_nodes:
|
||||
if node['ip'] in seeds:
|
||||
node['guest'].set_auto_bootstrap(True)
|
||||
node['guest'].store_admin_credentials(admin_creds)
|
||||
node['guest'].restart()
|
||||
node['guest'].cluster_complete()
|
||||
|
||||
LOG.debug("All new seeds running, starting the remaining of "
|
||||
"added nodes.")
|
||||
for node in added_nodes:
|
||||
if node['ip'] not in seeds:
|
||||
node['guest'].set_auto_bootstrap(True)
|
||||
node['guest'].store_admin_credentials(admin_creds)
|
||||
node['guest'].restart()
|
||||
node['guest'].cluster_complete()
|
||||
|
||||
# Run nodetool cleanup on each of the previously existing nodes
|
||||
# to remove the keys that no longer belong to those nodes.
|
||||
# Wait for cleanup to complete on one node before running
|
||||
# it on the next node.
|
||||
LOG.debug("Cleaning up orphan data on old cluster nodes.")
|
||||
for node in cluster_nodes:
|
||||
if node not in added_nodes:
|
||||
nid = node['id']
|
||||
node['guest'].node_cleanup_begin()
|
||||
node['guest'].node_cleanup()
|
||||
LOG.debug("Waiting for node to finish its "
|
||||
"cleanup: %s" % nid)
|
||||
if not self._all_instances_running([nid], cluster_id):
|
||||
LOG.warning(_("Node did not complete cleanup "
|
||||
"successfully: %s") % nid)
|
||||
|
||||
LOG.debug("Cluster configuration finished successfully.")
|
||||
except Exception:
|
||||
LOG.exception(_("Error growing cluster."))
|
||||
self.update_statuses_on_failure(cluster_id)
|
||||
|
||||
timeout = Timeout(CONF.cluster_usage_timeout)
|
||||
try:
|
||||
_grow_cluster()
|
||||
self.reset_task()
|
||||
except Timeout as t:
|
||||
if t is not timeout:
|
||||
raise # not my timeout
|
||||
LOG.exception(_("Timeout for growing cluster."))
|
||||
self.update_statuses_on_failure(cluster_id)
|
||||
finally:
|
||||
timeout.cancel()
|
||||
|
||||
LOG.debug("End grow_cluster for id: %s." % cluster_id)
|
||||
|
||||
def shrink_cluster(self, context, cluster_id, removal_ids):
|
||||
LOG.debug("Begin shrink_cluster for id: %s." % cluster_id)
|
||||
|
||||
def _shrink_cluster():
|
||||
cluster_node_ids = self.find_cluster_node_ids(cluster_id)
|
||||
cluster_nodes = self.load_cluster_nodes(context, cluster_node_ids)
|
||||
|
||||
removed_nodes = CassandraClusterTasks.load_cluster_nodes(
|
||||
context, removal_ids)
|
||||
|
||||
LOG.debug("All nodes ready, proceeding with cluster setup.")
|
||||
|
||||
# Update the list of seeds on remaining nodes if necessary.
|
||||
# Once all nodes are configured, decommission the removed nodes.
|
||||
# Cassandra will stream data from decommissioned nodes to the
|
||||
# remaining ones.
|
||||
try:
|
||||
|
||||
# All nodes should have the same seeds.
|
||||
# We retrieve current seeds from the first node.
|
||||
test_node = self.load_cluster_nodes(
|
||||
context, cluster_node_ids[:1])[0]
|
||||
current_seeds = test_node['guest'].get_seeds()
|
||||
# The seeds will have to be updated on all remaining instances
|
||||
# if any of the seed nodes is going to be removed.
|
||||
update_seeds = any(node['ip'] in current_seeds
|
||||
for node in removed_nodes)
|
||||
|
||||
LOG.debug("Decommissioning removed nodes.")
|
||||
for node in removed_nodes:
|
||||
node['guest'].node_decommission()
|
||||
node['instance'].update_db(cluster_id=None)
|
||||
|
||||
# Recompute the seed nodes based on the updated cluster
|
||||
# geometry if any of the existing seed nodes was removed.
|
||||
if update_seeds:
|
||||
LOG.debug("Updating seeds on the remaining nodes.")
|
||||
cluster_nodes = self.load_cluster_nodes(
|
||||
context, cluster_node_ids)
|
||||
|
||||
remaining_nodes = [node for node in cluster_nodes
|
||||
if node not in removed_nodes]
|
||||
seeds = self.choose_seed_nodes(remaining_nodes)
|
||||
LOG.debug("Selected seed nodes: %s" % seeds)
|
||||
for node in remaining_nodes:
|
||||
LOG.debug("Configuring node: %s." % node['id'])
|
||||
node['guest'].set_seeds(seeds)
|
||||
|
||||
# Wait for the removed nodes to go SHUTDOWN.
|
||||
LOG.debug("Waiting for all decommissioned nodes to shutdown.")
|
||||
if not self._all_instances_shutdown(removal_ids, cluster_id):
|
||||
# Now detached, failed nodes will stay available
|
||||
# in the list of standalone instances.
|
||||
return
|
||||
|
||||
# Delete decommissioned instances only when the cluster is in a
|
||||
# consistent state.
|
||||
LOG.debug("Deleting decommissioned instances.")
|
||||
for node in removed_nodes:
|
||||
Instance.delete(node['instance'])
|
||||
|
||||
LOG.debug("Cluster configuration finished successfully.")
|
||||
except Exception:
|
||||
LOG.exception(_("Error shrinking cluster."))
|
||||
self.update_statuses_on_failure(cluster_id)
|
||||
|
||||
timeout = Timeout(CONF.cluster_usage_timeout)
|
||||
try:
|
||||
_shrink_cluster()
|
||||
self.reset_task()
|
||||
except Timeout as t:
|
||||
if t is not timeout:
|
||||
raise # not my timeout
|
||||
LOG.exception(_("Timeout for shrinking cluster."))
|
||||
self.update_statuses_on_failure(cluster_id)
|
||||
finally:
|
||||
timeout.cancel()
|
||||
|
||||
LOG.debug("End shrink_cluster for id: %s." % cluster_id)
|
||||
|
||||
|
||||
class CassandraTaskManagerAPI(task_api.API):
|
||||
pass
|
@@ -108,7 +108,16 @@ class Manager(manager.Manager):
|
||||
LOG.debug("Applying configuration.")
|
||||
self.app.configuration_manager.save_configuration(
|
||||
config_contents)
|
||||
self.app.apply_initial_guestagent_configuration()
|
||||
cluster_name = None
|
||||
if cluster_config:
|
||||
cluster_name = cluster_config.get('id', None)
|
||||
self.app.apply_initial_guestagent_configuration(
|
||||
cluster_name=cluster_name)
|
||||
|
||||
if cluster_config:
|
||||
self.app.write_cluster_topology(
|
||||
cluster_config['dc'], cluster_config['rack'],
|
||||
prefer_local=True)
|
||||
|
||||
if device_path:
|
||||
LOG.debug("Preparing data volume.")
|
||||
@@ -124,20 +133,21 @@ class Manager(manager.Manager):
|
||||
LOG.debug("Mounting new volume.")
|
||||
device.mount(mount_point)
|
||||
|
||||
if backup_info:
|
||||
self._perform_restore(backup_info, context, mount_point)
|
||||
if not cluster_config:
|
||||
if backup_info:
|
||||
self._perform_restore(backup_info, context, mount_point)
|
||||
|
||||
LOG.debug("Starting database with configuration changes.")
|
||||
self.app.start_db(update_db=False)
|
||||
LOG.debug("Starting database with configuration changes.")
|
||||
self.app.start_db(update_db=False)
|
||||
|
||||
if not self.app.has_user_config():
|
||||
LOG.debug("Securing superuser access.")
|
||||
self.app.secure()
|
||||
self.app.restart()
|
||||
if not self.app.has_user_config():
|
||||
LOG.debug("Securing superuser access.")
|
||||
self.app.secure()
|
||||
self.app.restart()
|
||||
|
||||
self.__admin = CassandraAdmin(self.app.get_current_superuser())
|
||||
|
||||
if self.is_root_enabled(context):
|
||||
if not cluster_config and self.is_root_enabled(context):
|
||||
self.status.report_root(context, self.app.default_superuser_name)
|
||||
|
||||
def change_passwords(self, context, users):
|
||||
@@ -235,3 +245,39 @@ class Manager(manager.Manager):
|
||||
require restart, so this is a no-op.
|
||||
"""
|
||||
pass
|
||||
|
||||
def get_data_center(self, context):
|
||||
return self.app.get_data_center()
|
||||
|
||||
def get_rack(self, context):
|
||||
return self.app.get_rack()
|
||||
|
||||
def set_seeds(self, context, seeds):
|
||||
self.app.set_seeds(seeds)
|
||||
|
||||
def get_seeds(self, context):
|
||||
return self.app.get_seeds()
|
||||
|
||||
def set_auto_bootstrap(self, context, enabled):
|
||||
self.app.set_auto_bootstrap(enabled)
|
||||
|
||||
def node_cleanup_begin(self, context):
|
||||
self.app.node_cleanup_begin()
|
||||
|
||||
def node_cleanup(self, context):
|
||||
self.app.node_cleanup()
|
||||
|
||||
def node_decommission(self, context):
|
||||
self.app.node_decommission()
|
||||
|
||||
def cluster_secure(self, context, password):
|
||||
os_admin = self.app.cluster_secure(password)
|
||||
self.__admin = CassandraAdmin(self.app.get_current_superuser())
|
||||
return os_admin
|
||||
|
||||
def get_admin_credentials(self, context):
|
||||
return self.app.get_admin_credentials()
|
||||
|
||||
def store_admin_credentials(self, context, admin_credentials):
|
||||
self.app.store_admin_credentials(admin_credentials)
|
||||
self.__admin = CassandraAdmin(self.app.get_current_superuser())
|
||||
|
@@ -30,12 +30,14 @@ from trove.common.i18n import _
|
||||
from trove.common import instance as rd_instance
|
||||
from trove.common import pagination
|
||||
from trove.common.stream_codecs import IniCodec
|
||||
from trove.common.stream_codecs import PropertiesCodec
|
||||
from trove.common.stream_codecs import SafeYamlCodec
|
||||
from trove.common import utils
|
||||
from trove.guestagent.common.configuration import ConfigurationManager
|
||||
from trove.guestagent.common.configuration import OneFileOverrideStrategy
|
||||
from trove.guestagent.common import guestagent_utils
|
||||
from trove.guestagent.common import operating_system
|
||||
from trove.guestagent.common.operating_system import FileMode
|
||||
from trove.guestagent.datastore import service
|
||||
from trove.guestagent.db import models
|
||||
from trove.guestagent import pkg
|
||||
@@ -59,6 +61,13 @@ class CassandraApp(object):
|
||||
_CONF_DIR_MODS = stat.S_IRWXU
|
||||
_CONF_FILE_MODS = stat.S_IRUSR
|
||||
|
||||
CASSANDRA_CONF_FILE = "cassandra.yaml"
|
||||
CASSANDRA_TOPOLOGY_FILE = 'cassandra-rackdc.properties'
|
||||
|
||||
_TOPOLOGY_CODEC = PropertiesCodec(
|
||||
delimiter='=', unpack_singletons=True, string_mappings={
|
||||
'true': True, 'false': False})
|
||||
|
||||
CASSANDRA_KILL_CMD = "sudo killall java || true"
|
||||
|
||||
def __init__(self):
|
||||
@@ -79,16 +88,23 @@ class CassandraApp(object):
|
||||
return ['cassandra']
|
||||
|
||||
@property
|
||||
def cassandra_conf(self):
|
||||
def cassandra_conf_dir(self):
|
||||
return {
|
||||
operating_system.REDHAT:
|
||||
"/etc/cassandra/default.conf/cassandra.yaml",
|
||||
operating_system.DEBIAN:
|
||||
"/etc/cassandra/cassandra.yaml",
|
||||
operating_system.SUSE:
|
||||
"/etc/cassandra/default.conf/cassandra.yaml"
|
||||
operating_system.REDHAT: "/etc/cassandra/default.conf/",
|
||||
operating_system.DEBIAN: "/etc/cassandra/",
|
||||
operating_system.SUSE: "/etc/cassandra/default.conf/"
|
||||
}[operating_system.get_os()]
|
||||
|
||||
@property
|
||||
def cassandra_conf(self):
|
||||
return guestagent_utils.build_file_path(self.cassandra_conf_dir,
|
||||
self.CASSANDRA_CONF_FILE)
|
||||
|
||||
@property
|
||||
def cassandra_topology(self):
|
||||
return guestagent_utils.build_file_path(self.cassandra_conf_dir,
|
||||
self.CASSANDRA_TOPOLOGY_FILE)
|
||||
|
||||
@property
|
||||
def cassandra_owner(self):
|
||||
return 'cassandra'
|
||||
@@ -248,7 +264,10 @@ class CassandraApp(object):
|
||||
finally:
|
||||
self.stop_db() # Always restore the initial state of the service.
|
||||
|
||||
def secure(self, update_user=None):
|
||||
def cluster_secure(self, password):
|
||||
return self.secure(password=password).serialize()
|
||||
|
||||
def secure(self, update_user=None, password=None):
|
||||
"""Configure the Trove administrative user.
|
||||
Update an existing user if given.
|
||||
Create a new one using the default database credentials
|
||||
@@ -256,29 +275,39 @@ class CassandraApp(object):
|
||||
"""
|
||||
LOG.info(_('Configuring Trove superuser.'))
|
||||
|
||||
current_superuser = update_user or models.CassandraUser(
|
||||
self.default_superuser_name,
|
||||
self.default_superuser_password)
|
||||
if password is None:
|
||||
password = utils.generate_random_password()
|
||||
|
||||
admin_username = update_user.name if update_user else self._ADMIN_USER
|
||||
os_admin = models.CassandraUser(admin_username, password)
|
||||
|
||||
if update_user:
|
||||
os_admin = models.CassandraUser(update_user.name,
|
||||
utils.generate_random_password())
|
||||
CassandraAdmin(current_superuser).alter_user_password(os_admin)
|
||||
CassandraAdmin(update_user).alter_user_password(os_admin)
|
||||
else:
|
||||
os_admin = models.CassandraUser(self._ADMIN_USER,
|
||||
utils.generate_random_password())
|
||||
CassandraAdmin(current_superuser)._create_superuser(os_admin)
|
||||
CassandraAdmin(os_admin).drop_user(current_superuser)
|
||||
cassandra = models.CassandraUser(
|
||||
self.default_superuser_name, self.default_superuser_password)
|
||||
CassandraAdmin(cassandra)._create_superuser(os_admin)
|
||||
CassandraAdmin(os_admin).drop_user(cassandra)
|
||||
|
||||
self.__create_cqlsh_config({self._CONF_AUTH_SEC:
|
||||
{self._CONF_USR_KEY: os_admin.name,
|
||||
self._CONF_PWD_KEY: os_admin.password}})
|
||||
|
||||
# Update the internal status with the new user.
|
||||
self.status = CassandraAppStatus(os_admin)
|
||||
self._update_admin_credentials(os_admin)
|
||||
|
||||
return os_admin
|
||||
|
||||
def _update_admin_credentials(self, user):
|
||||
self.__create_cqlsh_config({self._CONF_AUTH_SEC:
|
||||
{self._CONF_USR_KEY: user.name,
|
||||
self._CONF_PWD_KEY: user.password}})
|
||||
|
||||
# Update the internal status with the new user.
|
||||
self.status = CassandraAppStatus(user)
|
||||
|
||||
def store_admin_credentials(self, admin_credentials):
|
||||
user = models.CassandraUser.deserialize_user(admin_credentials)
|
||||
self._update_admin_credentials(user)
|
||||
|
||||
def get_admin_credentials(self):
|
||||
return self.get_current_superuser().serialize()
|
||||
|
||||
def _reset_admin_password(self):
|
||||
"""
|
||||
Reset the password of the Trove's administrative superuser.
|
||||
@@ -424,6 +453,14 @@ class CassandraApp(object):
|
||||
{'data_file_directories': [self.cassandra_data_dir]})
|
||||
self._make_host_reachable()
|
||||
self._update_cluster_name_property(cluster_name or CONF.guest_id)
|
||||
# A single-node instance may use the SimpleSnitch
|
||||
# (keyspaces use SimpleStrategy).
|
||||
# A network-aware snitch has to be used otherwise.
|
||||
if cluster_name is None:
|
||||
updates = {'endpoint_snitch': 'SimpleSnitch'}
|
||||
else:
|
||||
updates = {'endpoint_snitch': 'GossipingPropertyFileSnitch'}
|
||||
self.configuration_manager.apply_system_override(updates)
|
||||
|
||||
def _make_host_reachable(self):
|
||||
"""
|
||||
@@ -498,6 +535,21 @@ class CassandraApp(object):
|
||||
def remove_overrides(self):
|
||||
self.configuration_manager.remove_user_override()
|
||||
|
||||
def write_cluster_topology(self, data_center, rack, prefer_local=True):
|
||||
LOG.info(_('Saving Cassandra cluster topology configuration.'))
|
||||
|
||||
config = {'dc': data_center,
|
||||
'rack': rack,
|
||||
'prefer_local': prefer_local}
|
||||
|
||||
operating_system.write_file(self.cassandra_topology, config,
|
||||
codec=self._TOPOLOGY_CODEC, as_root=True)
|
||||
operating_system.chown(
|
||||
self.cassandra_topology,
|
||||
self.cassandra_owner, self.cassandra_owner, as_root=True)
|
||||
operating_system.chmod(
|
||||
self.cassandra_topology, FileMode.ADD_READ_ALL, as_root=True)
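# --- Editor's illustration (not part of the patch) --------------------------
# write_cluster_topology() serializes {'dc', 'rack', 'prefer_local'} into
# cassandra-rackdc.properties through a '='-delimited properties codec with
# true/false string mappings.  A minimal hand-rolled equivalent, assuming a
# writable path:
def render_rackdc_properties(data_center, rack, prefer_local=True):
    entries = {'dc': data_center,
               'rack': rack,
               'prefer_local': 'true' if prefer_local else 'false'}
    return '\n'.join('%s=%s' % (k, v) for k, v in entries.items()) + '\n'


with open('/tmp/cassandra-rackdc.properties', 'w') as f:
    f.write(render_rackdc_properties('dc1', 'rack1'))
# Resulting file:
#   dc=dc1
#   rack=rack1
#   prefer_local=true
# -----------------------------------------------------------------------------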
|
||||
|
||||
def start_db_with_conf_changes(self, config_contents):
|
||||
LOG.debug("Starting database with configuration changes.")
|
||||
if self.status.is_running:
|
||||
@@ -517,6 +569,106 @@ class CassandraApp(object):
|
||||
def _get_cqlsh_conf_path(self):
|
||||
return os.path.expanduser(self.cqlsh_conf_path)
|
||||
|
||||
def get_data_center(self):
|
||||
config = operating_system.read_file(self.cassandra_topology,
|
||||
codec=self._TOPOLOGY_CODEC)
|
||||
return config['dc']
|
||||
|
||||
def get_rack(self):
|
||||
config = operating_system.read_file(self.cassandra_topology,
|
||||
codec=self._TOPOLOGY_CODEC)
|
||||
return config['rack']
|
||||
|
||||
def set_seeds(self, seeds):
|
||||
LOG.debug("Setting seed nodes: %s" % seeds)
|
||||
updates = {
|
||||
'seed_provider': {'parameters':
|
||||
[{'seeds': ','.join(seeds)}]
|
||||
}
|
||||
}
|
||||
|
||||
self.configuration_manager.apply_system_override(updates)
|
||||
|
||||
def get_seeds(self):
|
||||
"""Return a list of seed node IPs if any.
|
||||
|
||||
The seed IPs are stored as a comma-separated string in the
|
||||
seed-provider parameters:
|
||||
[{'class_name': '<name>', 'parameters': [{'seeds': '<ip>,<ip>'}, ...]}]
|
||||
"""
|
||||
|
||||
def find_first(key, dict_list):
|
||||
for item in dict_list:
|
||||
if key in item:
|
||||
return item[key]
|
||||
return []
|
||||
|
||||
sp_property = self.configuration_manager.get_value('seed_provider', [])
|
||||
seeds_str = find_first('seeds', find_first('parameters', sp_property))
|
||||
return seeds_str.split(',') if seeds_str else []
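# --- Editor's illustration (not part of the patch) --------------------------
# The seed list round-trips through the cassandra.yaml 'seed_provider' entry,
# a list of dicts whose 'parameters' hold a comma-separated 'seeds' string (the
# 'class_name' key is omitted here).  A standalone version of the same
# pack/unpack logic; set_seeds() applies a similar structure as an override:
def pack_seeds(seeds):
    return {'seed_provider': [{'parameters': [{'seeds': ','.join(seeds)}]}]}


def unpack_seeds(config):
    def find_first(key, dict_list):
        for item in dict_list:
            if key in item:
                return item[key]
        return []

    provider = config.get('seed_provider', [])
    seeds_str = find_first('seeds', find_first('parameters', provider))
    return seeds_str.split(',') if seeds_str else []


cfg_snippet = pack_seeds(['10.0.0.1', '10.0.0.2'])
print(unpack_seeds(cfg_snippet))  # ['10.0.0.1', '10.0.0.2']
# -----------------------------------------------------------------------------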
|
||||
|
||||
def set_auto_bootstrap(self, enabled):
|
||||
"""Auto-bootstrap makes new (non-seed) nodes automatically migrate the
|
||||
right data to themselves.
|
||||
The feature has to be turned OFF when initializing a fresh cluster
|
||||
without data.
|
||||
It must be turned back ON once the cluster is initialized.
|
||||
"""
|
||||
LOG.debug("Setting auto-bootstrapping: %s" % enabled)
|
||||
updates = {'auto_bootstrap': enabled}
|
||||
self.configuration_manager.apply_system_override(updates)
|
||||
|
||||
def node_cleanup_begin(self):
|
||||
"""Suspend periodic status updates and mark the instance busy
|
||||
throughout the operation.
|
||||
"""
|
||||
self.status.begin_restart()
|
||||
self.status.set_status(rd_instance.ServiceStatuses.BLOCKED)
|
||||
|
||||
def node_cleanup(self):
|
||||
"""Cassandra does not automatically remove data from nodes that
|
||||
lose part of their partition range to a newly added node.
|
||||
Cleans up keyspaces and partition keys no longer belonging to the node.
|
||||
|
||||
Do not treat cleanup failures as fatal. Resume the heartbeat after
|
||||
finishing and let it signal the true state of the instance to the
|
||||
caller.
|
||||
"""
|
||||
LOG.debug("Running node cleanup.")
|
||||
# nodetool -h <HOST> -p <PORT> -u <USER> -pw <PASSWORD> cleanup
|
||||
try:
|
||||
self._run_nodetool_command('cleanup')
|
||||
self.status.set_status(rd_instance.ServiceStatuses.RUNNING)
|
||||
except Exception:
|
||||
LOG.exception(_("The node failed to complete its cleanup."))
|
||||
finally:
|
||||
self.status.end_restart()
|
||||
|
||||
def node_decommission(self):
|
||||
"""Causes a live node to decommission itself,
|
||||
streaming its data to the next node on the ring.
|
||||
|
||||
Shutdown the database after successfully finishing the operation,
|
||||
or leave the node in a failed state otherwise.
|
||||
|
||||
Suspend periodic status updates, so that the caller can poll for the
|
||||
database shutdown.
|
||||
"""
|
||||
LOG.debug("Decommissioning the node.")
|
||||
# nodetool -h <HOST> -p <PORT> -u <USER> -pw <PASSWORD> decommission
|
||||
self.status.begin_restart()
|
||||
try:
|
||||
self._run_nodetool_command('decommission')
|
||||
except Exception:
|
||||
LOG.exception(_("The node failed to decommission itself."))
|
||||
self.status.set_status(rd_instance.ServiceStatuses.FAILED)
|
||||
return
|
||||
|
||||
try:
|
||||
self.stop_db(update_db=True, do_not_start_on_reboot=True)
|
||||
finally:
|
||||
self.status.end_restart()
|
||||
|
||||
def flush_tables(self, keyspace, *tables):
|
||||
"""Flushes one or more tables from the memtable.
|
||||
"""
|
||||
@@ -528,11 +680,8 @@ class CassandraApp(object):
|
||||
def _run_nodetool_command(self, cmd, *args, **kwargs):
|
||||
"""Execute a nodetool command on this node.
|
||||
"""
|
||||
cassandra = self.get_current_superuser()
|
||||
return utils.execute('nodetool',
|
||||
'-h', 'localhost',
|
||||
'-u', cassandra.name,
|
||||
'-pw', cassandra.password, cmd, *args, **kwargs)
|
||||
return utils.execute('nodetool', '-h', 'localhost',
|
||||
cmd, *args, **kwargs)
|
||||
|
||||
def enable_root(self, root_password=None):
|
||||
"""Cassandra's 'root' user is called 'cassandra'.
|
||||
|
@@ -208,59 +208,85 @@ class ClusterTasks(Cluster):
|
||||
|
||||
def _all_instances_ready(self, instance_ids, cluster_id,
|
||||
shard_id=None):
|
||||
"""Wait for all instances to get READY."""
|
||||
return self._all_instances_acquire_status(
|
||||
instance_ids, cluster_id, shard_id, ServiceStatuses.INSTANCE_READY,
|
||||
fast_fail_statuses=[ServiceStatuses.FAILED,
|
||||
ServiceStatuses.FAILED_TIMEOUT_GUESTAGENT])
|
||||
|
||||
def _all_status_ready(ids):
|
||||
LOG.debug("Checking service status of instance ids: %s" % ids)
|
||||
def _all_instances_shutdown(self, instance_ids, cluster_id,
|
||||
shard_id=None):
|
||||
"""Wait for all instances to go SHUTDOWN."""
|
||||
return self._all_instances_acquire_status(
|
||||
instance_ids, cluster_id, shard_id, ServiceStatuses.SHUTDOWN,
|
||||
fast_fail_statuses=[ServiceStatuses.FAILED,
|
||||
ServiceStatuses.FAILED_TIMEOUT_GUESTAGENT])
|
||||
|
||||
def _all_instances_running(self, instance_ids, cluster_id, shard_id=None):
|
||||
"""Wait for all instances to become ACTIVE."""
|
||||
return self._all_instances_acquire_status(
|
||||
instance_ids, cluster_id, shard_id, ServiceStatuses.RUNNING,
|
||||
fast_fail_statuses=[ServiceStatuses.FAILED,
|
||||
ServiceStatuses.FAILED_TIMEOUT_GUESTAGENT])
|
||||
|
||||
def _all_instances_acquire_status(
|
||||
self, instance_ids, cluster_id, shard_id, expected_status,
|
||||
fast_fail_statuses=None):
|
||||
|
||||
def _is_fast_fail_status(status):
|
||||
return ((fast_fail_statuses is not None) and
|
||||
((status == fast_fail_statuses) or
|
||||
(status in fast_fail_statuses)))
|
||||
|
||||
def _all_have_status(ids):
|
||||
for instance_id in ids:
|
||||
status = InstanceServiceStatus.find_by(
|
||||
instance_id=instance_id).get_status()
|
||||
if (status == ServiceStatuses.FAILED or
|
||||
status == ServiceStatuses.FAILED_TIMEOUT_GUESTAGENT):
|
||||
# if one has failed, no need to continue polling
|
||||
LOG.debug("Instance %s in %s, exiting polling." % (
|
||||
instance_id, status))
|
||||
return True
|
||||
if status != ServiceStatuses.INSTANCE_READY:
|
||||
# if one is not in a cluster-ready state,
|
||||
# continue polling
|
||||
LOG.debug("Instance %s in %s, continue polling." % (
|
||||
instance_id, status))
|
||||
return False
|
||||
LOG.debug("Instances are ready, exiting polling for: %s" % ids)
|
||||
if _is_fast_fail_status(status):
|
||||
# if one has failed, no need to continue polling
|
||||
LOG.debug("Instance %s has acquired a fast-fail status %s."
|
||||
% (instance_id, status))
|
||||
return True
|
||||
if status != expected_status:
|
||||
# if one is not in the expected state, continue polling
|
||||
LOG.debug("Instance %s was %s." % (instance_id, status))
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
def _instance_ids_with_failures(ids):
|
||||
LOG.debug("Checking for service status failures for "
|
||||
"instance ids: %s" % ids)
|
||||
LOG.debug("Checking for service failures on instances: %s"
|
||||
% ids)
|
||||
failed_instance_ids = []
|
||||
for instance_id in ids:
|
||||
status = InstanceServiceStatus.find_by(
|
||||
instance_id=instance_id).get_status()
|
||||
if (status == ServiceStatuses.FAILED or
|
||||
status == ServiceStatuses.FAILED_TIMEOUT_GUESTAGENT):
|
||||
failed_instance_ids.append(instance_id)
|
||||
if _is_fast_fail_status(status):
|
||||
failed_instance_ids.append(instance_id)
|
||||
return failed_instance_ids
|
||||
|
||||
LOG.debug("Polling until service status is ready for "
|
||||
"instance ids: %s" % instance_ids)
|
||||
LOG.debug("Polling until all instances acquire %s status: %s"
|
||||
% (expected_status, instance_ids))
|
||||
try:
|
||||
utils.poll_until(lambda: instance_ids,
|
||||
lambda ids: _all_status_ready(ids),
|
||||
lambda ids: _all_have_status(ids),
|
||||
sleep_time=USAGE_SLEEP_TIME,
|
||||
time_out=CONF.usage_timeout)
|
||||
except PollTimeOut:
|
||||
LOG.exception(_("Timeout for all instance service statuses "
|
||||
"to become ready."))
|
||||
LOG.exception(_("Timed out while waiting for all instances "
|
||||
"to become %s.") % expected_status)
|
||||
self.update_statuses_on_failure(cluster_id, shard_id)
|
||||
return False
|
||||
|
||||
failed_ids = _instance_ids_with_failures(instance_ids)
|
||||
if failed_ids:
|
||||
LOG.error(_("Some instances failed to become ready: %s") %
|
||||
failed_ids)
|
||||
LOG.error(_("Some instances failed: %s") % failed_ids)
|
||||
self.update_statuses_on_failure(cluster_id, shard_id)
|
||||
return False
|
||||
|
||||
LOG.debug("All instances have acquired the expected status %s."
|
||||
% expected_status)
|
||||
|
||||
return True
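# --- Editor's illustration (not part of the patch) --------------------------
# The generalized helper above polls every instance until all of them reach an
# expected status, bailing out early on a fast-fail status or on timeout.  A
# standalone sketch of that pattern; get_status is a stand-in for the real
# InstanceServiceStatus lookup:
import time


def wait_for_status(ids, get_status, expected, fast_fail=(), timeout=300,
                    sleep_time=5):
    deadline = time.time() + timeout
    while time.time() < deadline:
        statuses = {i: get_status(i) for i in ids}
        if any(s in fast_fail for s in statuses.values()):
            return False                   # someone failed; stop early
        if all(s == expected for s in statuses.values()):
            return True                    # everyone reached the target
        time.sleep(sleep_time)
    return False                           # timed out


# Example with canned statuses:
canned = {'node-1': 'READY', 'node-2': 'READY'}
print(wait_for_status(['node-1', 'node-2'], canned.get, 'READY',
                      fast_fail=('FAILED',), timeout=10, sleep_time=1))
# -----------------------------------------------------------------------------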
|
||||
|
||||
def delete_cluster(self, context, cluster_id):
|
||||
|
@@ -200,7 +200,7 @@ register(["db2_supported"], common_groups,
|
||||
database_actions_groups, user_actions_groups)
|
||||
register(["cassandra_supported"], common_groups,
|
||||
user_actions_groups, database_actions_groups,
|
||||
backup_groups, configuration_groups)
|
||||
backup_groups, configuration_groups, cluster_actions_groups)
|
||||
register(["couchbase_supported"], common_groups, backup_groups,
|
||||
root_actions_groups)
|
||||
register(["couchdb_supported"], common_groups)
|
||||
|
@@ -57,7 +57,6 @@ class CassandraClient(object):
|
||||
class CassandraHelper(TestHelper):
|
||||
|
||||
DATA_COLUMN_NAME = 'value'
|
||||
cluster_node_count = 2
|
||||
|
||||
def __init__(self, expected_override_name):
|
||||
super(CassandraHelper, self).__init__(expected_override_name)
|
||||
|
@@ -18,8 +18,6 @@ from trove.tests.scenario.helpers.mysql_helper import MysqlHelper
|
||||
|
||||
class MariadbHelper(MysqlHelper):
|
||||
|
||||
cluster_node_count = 3
|
||||
|
||||
def __init__(self, expected_override_name):
|
||||
super(MariadbHelper, self).__init__(expected_override_name)
|
||||
|
||||
|
@@ -18,8 +18,6 @@ from trove.tests.scenario.helpers.test_helper import TestHelper
|
||||
|
||||
class MongodbHelper(TestHelper):
|
||||
|
||||
cluster_node_count = 2
|
||||
|
||||
def __init__(self, expected_override_name):
|
||||
super(MongodbHelper, self).__init__(expected_override_name)
|
||||
|
||||
|
@@ -18,7 +18,5 @@ from trove.tests.scenario.helpers.mysql_helper import MysqlHelper
|
||||
|
||||
class PxcHelper(MysqlHelper):
|
||||
|
||||
cluster_node_count = 3
|
||||
|
||||
def __init__(self, expected_override_name):
|
||||
super(PxcHelper, self).__init__(expected_override_name)
|
||||
|
@@ -22,8 +22,6 @@ from trove.tests.scenario.runners.test_runners import TestRunner
|
||||
|
||||
class RedisHelper(TestHelper):
|
||||
|
||||
cluster_node_count = 2
|
||||
|
||||
def __init__(self, expected_override_name):
|
||||
super(RedisHelper, self).__init__(expected_override_name)
|
||||
|
||||
|
@@ -55,7 +55,7 @@ class ClusterActionsRunner(TestRunner):
|
||||
expected_instance_states=['BUILD', 'ACTIVE'],
|
||||
expected_http_code=200):
|
||||
if not num_nodes:
|
||||
num_nodes = self.test_helper.cluster_node_count
|
||||
num_nodes = self.min_cluster_node_count
|
||||
|
||||
instances_def = [
|
||||
self.build_flavor(
|
||||
@@ -66,6 +66,10 @@ class ClusterActionsRunner(TestRunner):
|
||||
'test_cluster', instances_def, expected_task_name,
|
||||
expected_instance_states, expected_http_code)
|
||||
|
||||
@property
|
||||
def min_cluster_node_count(self):
|
||||
return 2
|
||||
|
||||
def assert_cluster_create(
|
||||
self, cluster_name, instances_def, expected_task_name,
|
||||
expected_instance_states, expected_http_code):
|
||||
@@ -337,12 +341,36 @@ class ClusterActionsRunner(TestRunner):
|
||||
self.assert_client_code(404)
|
||||
|
||||
|
||||
class MariadbClusterActionsRunner(ClusterActionsRunner):
|
||||
class CassandraClusterActionsRunner(ClusterActionsRunner):
|
||||
|
||||
def run_cluster_root_enable(self):
|
||||
raise SkipTest("Operation is currently not supported.")
|
||||
|
||||
|
||||
class MariadbClusterActionsRunner(ClusterActionsRunner):
|
||||
|
||||
@property
|
||||
def min_cluster_node_count(self):
|
||||
return self.get_datastore_config_property('min_cluster_member_count')
|
||||
|
||||
def run_cluster_root_enable(self):
|
||||
raise SkipTest("Operation is currently not supported.")
|
||||
|
||||
|
||||
class PxcClusterActionsRunner(ClusterActionsRunner):
|
||||
|
||||
@property
|
||||
def min_cluster_node_count(self):
|
||||
return self.get_datastore_config_property('min_cluster_member_count')
|
||||
|
||||
|
||||
class VerticaClusterActionsRunner(ClusterActionsRunner):
|
||||
|
||||
@property
|
||||
def min_cluster_node_count(self):
|
||||
return self.get_datastore_config_property('cluster_member_count')
|
||||
|
||||
|
||||
class RedisClusterActionsRunner(ClusterActionsRunner):
|
||||
|
||||
def run_cluster_root_enable(self):
|
||||
|
@@ -76,6 +76,15 @@ class MongodbNegativeClusterActionsRunner(NegativeClusterActionsRunner):
|
||||
max_nodes=3)
|
||||
|
||||
|
||||
class CassandraNegativeClusterActionsRunner(NegativeClusterActionsRunner):
|
||||
|
||||
def run_create_constrained_size_cluster(self):
|
||||
raise SkipTest("No constraints apply to the number of cluster nodes.")
|
||||
|
||||
def run_create_heterogeneous_cluster(self):
|
||||
raise SkipTest("No constraints apply to the size of cluster nodes.")
|
||||
|
||||
|
||||
class RedisNegativeClusterActionsRunner(NegativeClusterActionsRunner):
|
||||
|
||||
def run_create_constrained_size_cluster(self):
|
||||
|
trove/tests/unittests/cluster/test_cassandra_cluster.py (new file, 99 lines)
@@ -0,0 +1,99 @@
|
||||
# Copyright 2016 Tesora Inc.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from mock import ANY
|
||||
from mock import MagicMock
|
||||
from mock import Mock
|
||||
from mock import patch
|
||||
|
||||
from trove.cluster import models
|
||||
from trove.common.strategies.cluster.experimental.cassandra.api \
|
||||
import CassandraCluster
|
||||
from trove.common.strategies.cluster.experimental.cassandra.taskmanager \
|
||||
import CassandraClusterTasks
|
||||
from trove.instance import models as inst_models
|
||||
from trove.quota import quota
|
||||
from trove.tests.unittests import trove_testtools
|
||||
|
||||
|
||||
class ClusterTest(trove_testtools.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(ClusterTest, self).setUp()
|
||||
self.context = trove_testtools.TroveTestContext(self)
|
||||
|
||||
def tearDown(self):
|
||||
super(ClusterTest, self).tearDown()
|
||||
|
||||
@patch.object(inst_models.Instance, 'create')
|
||||
@patch.object(quota.QUOTAS, 'check_quotas')
|
||||
@patch.object(models, 'get_flavors_from_instance_defs')
|
||||
@patch.object(models, 'get_required_volume_size', return_value=3)
|
||||
def test_create_cluster_instances(self, get_vol_size, _, check_quotas,
|
||||
inst_create):
|
||||
test_instances = [MagicMock(), MagicMock()]
|
||||
num_instances = len(test_instances)
|
||||
datastore = Mock(manager='cassandra')
|
||||
datastore_version = Mock(manager='cassandra')
|
||||
|
||||
with patch.object(CassandraClusterTasks, 'find_cluster_node_ids',
|
||||
return_value=[inst.id for inst in test_instances]):
|
||||
CassandraCluster._create_cluster_instances(
|
||||
self.context, 'test_cluster_id', 'test_cluster',
|
||||
datastore, datastore_version,
|
||||
test_instances)
|
||||
|
||||
check_quotas.assert_called_once_with(
|
||||
ANY, instances=num_instances, volumes=get_vol_size.return_value)
|
||||
self.assertEqual(num_instances, inst_create.call_count,
|
||||
"Unexpected number of instances created.")
|
||||
|
||||
def test_choose_seed_nodes(self):
|
||||
nodes = self._build_mock_nodes(3)
|
||||
|
||||
seeds = CassandraClusterTasks.choose_seed_nodes(nodes)
|
||||
self.assertEqual(1, len(seeds),
|
||||
"Only one seed node should be selected for a "
|
||||
"single-rack-single-dc cluster.")
|
||||
|
||||
nodes = self._build_mock_nodes(3)
|
||||
nodes[0]['rack'] = 'rack1'
|
||||
nodes[1]['rack'] = 'rack2'
|
||||
seeds = CassandraClusterTasks.choose_seed_nodes(nodes)
|
||||
self.assertEqual(2, len(seeds),
|
||||
"There should be exactly two seed nodes. "
|
||||
"One from each rack.")
|
||||
|
||||
nodes = self._build_mock_nodes(3)
|
||||
nodes[0]['rack'] = 'rack1'
|
||||
nodes[1]['rack'] = 'rack2'
|
||||
nodes[2]['dc'] = 'dc2'
|
||||
seeds = CassandraClusterTasks.choose_seed_nodes(nodes)
|
||||
self.assertEqual(3, len(seeds),
|
||||
"There should be exactly three seed nodes. "
|
||||
"One from each rack and data center.")
|
||||
|
||||
def _build_mock_nodes(self, num_nodes):
|
||||
nodes = []
|
||||
for _ in range(num_nodes):
|
||||
mock_instance = MagicMock()
|
||||
nodes.append({'instance': mock_instance,
|
||||
'guest': MagicMock(),
|
||||
'id': mock_instance.id,
|
||||
'ip': '%s_IP' % mock_instance.id,
|
||||
'dc': 'dc1',
|
||||
'rack': 'rack1'
|
||||
})
|
||||
return nodes
|
@@ -296,7 +296,8 @@ class GuestAgentCassandraDBManagerTest(trove_testtools.TestCase):
|
||||
mock_app.install_if_needed.assert_any_call(packages)
|
||||
mock_app._remove_system_tables.assert_any_call()
|
||||
mock_app.init_storage_structure.assert_any_call('/var/lib/cassandra')
|
||||
mock_app.apply_initial_guestagent_configuration.assert_any_call()
|
||||
mock_app.apply_initial_guestagent_configuration.assert_any_call(
|
||||
cluster_name=None)
|
||||
mock_app.start_db.assert_any_call(update_db=False)
|
||||
mock_app.stop_db.assert_any_call()
|
||||
if backup_info:
|
||||
|