Merge "Vertica Cluster Grow and Shrink"
This commit is contained in:
commit
35b5ac0646
@ -1155,6 +1155,8 @@ vertica_opts = [
|
|||||||
help='Root controller implementation for Vertica.'),
|
help='Root controller implementation for Vertica.'),
|
||||||
cfg.StrOpt('guest_log_exposed_logs', default='',
|
cfg.StrOpt('guest_log_exposed_logs', default='',
|
||||||
help='List of Guest Logs to expose for publishing.'),
|
help='List of Guest Logs to expose for publishing.'),
|
||||||
|
cfg.IntOpt('min_ksafety', default=0,
|
||||||
|
help='Minimum k-safety setting permitted for vertica clusters'),
|
||||||
]
|
]
|
||||||
|
|
||||||
# DB2
|
# DB2
|
||||||
|
@ -557,6 +557,12 @@ class ClusterNumInstancesNotLargeEnough(TroveError):
|
|||||||
"be at least %(num_instances)s.")
|
"be at least %(num_instances)s.")
|
||||||
|
|
||||||
|
|
||||||
|
class ClusterNumInstancesBelowSafetyThreshold(TroveError):
|
||||||
|
message = _("The number of instances in your cluster cannot "
|
||||||
|
"safely be lowered below the current level based"
|
||||||
|
"on your current fault-tolerance settings.")
|
||||||
|
|
||||||
|
|
||||||
class ClusterShrinkMustNotLeaveClusterEmpty(TroveError):
|
class ClusterShrinkMustNotLeaveClusterEmpty(TroveError):
|
||||||
message = _("Must leave at least one instance in the cluster when "
|
message = _("Must leave at least one instance in the cluster when "
|
||||||
"shrinking.")
|
"shrinking.")
|
||||||
|
@ -21,6 +21,7 @@ from trove.common import cfg
|
|||||||
from trove.common import exception
|
from trove.common import exception
|
||||||
from trove.common import remote
|
from trove.common import remote
|
||||||
from trove.common.strategies.cluster import base
|
from trove.common.strategies.cluster import base
|
||||||
|
from trove.common import utils
|
||||||
from trove.extensions.mgmt.clusters.views import MgmtClusterView
|
from trove.extensions.mgmt.clusters.views import MgmtClusterView
|
||||||
from trove.instance import models as inst_models
|
from trove.instance import models as inst_models
|
||||||
from trove.quota.quota import check_quotas
|
from trove.quota.quota import check_quotas
|
||||||
@ -37,6 +38,25 @@ class VerticaAPIStrategy(base.BaseAPIStrategy):
|
|||||||
def cluster_class(self):
|
def cluster_class(self):
|
||||||
return VerticaCluster
|
return VerticaCluster
|
||||||
|
|
||||||
|
def _action_grow(self, cluster, body):
|
||||||
|
nodes = body['grow']
|
||||||
|
instances = []
|
||||||
|
for node in nodes:
|
||||||
|
instance = {
|
||||||
|
'flavor_id': utils.get_id_from_href(node['flavorRef'])
|
||||||
|
}
|
||||||
|
if 'name' in node:
|
||||||
|
instance['name'] = node['name']
|
||||||
|
if 'volume' in node:
|
||||||
|
instance['volume_size'] = int(node['volume']['size'])
|
||||||
|
instances.append(instance)
|
||||||
|
return cluster.grow(instances)
|
||||||
|
|
||||||
|
def _action_shrink(self, cluster, body):
|
||||||
|
nodes = body['shrink']
|
||||||
|
instance_ids = [node['id'] for node in nodes]
|
||||||
|
return cluster.shrink(instance_ids)
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def cluster_view_class(self):
|
def cluster_view_class(self):
|
||||||
return VerticaClusterView
|
return VerticaClusterView
|
||||||
@ -48,15 +68,18 @@ class VerticaAPIStrategy(base.BaseAPIStrategy):
|
|||||||
|
|
||||||
class VerticaCluster(models.Cluster):
|
class VerticaCluster(models.Cluster):
|
||||||
|
|
||||||
@classmethod
|
@staticmethod
|
||||||
def create(cls, context, name, datastore, datastore_version,
|
def _create_instances(context, db_info, datastore, datastore_version,
|
||||||
instances, extended_properties):
|
instances, new_cluster):
|
||||||
LOG.debug("Initiating cluster creation.")
|
|
||||||
vertica_conf = CONF.get(datastore_version.manager)
|
vertica_conf = CONF.get(datastore_version.manager)
|
||||||
num_instances = len(instances)
|
num_instances = len(instances)
|
||||||
|
|
||||||
|
existing = inst_models.DBInstance.find_all(cluster_id=db_info.id).all()
|
||||||
|
num_existing = len(existing)
|
||||||
|
|
||||||
# Matching number of instances with configured cluster_member_count
|
# Matching number of instances with configured cluster_member_count
|
||||||
if num_instances != vertica_conf.cluster_member_count:
|
if new_cluster \
|
||||||
|
and num_instances != vertica_conf.cluster_member_count:
|
||||||
raise exception.ClusterNumInstancesNotSupported(
|
raise exception.ClusterNumInstancesNotSupported(
|
||||||
num_instances=vertica_conf.cluster_member_count)
|
num_instances=vertica_conf.cluster_member_count)
|
||||||
|
|
||||||
@ -98,36 +121,119 @@ class VerticaCluster(models.Cluster):
|
|||||||
azs = [instance.get('availability_zone', None)
|
azs = [instance.get('availability_zone', None)
|
||||||
for instance in instances]
|
for instance in instances]
|
||||||
|
|
||||||
# Updating Cluster Task
|
# Creating member instances
|
||||||
|
minstances = []
|
||||||
|
for i in range(0, num_instances):
|
||||||
|
if i == 0 and new_cluster:
|
||||||
|
member_config = {"id": db_info.id, "instance_type": "master"}
|
||||||
|
else:
|
||||||
|
member_config = {"id": db_info.id, "instance_type": "member"}
|
||||||
|
instance_name = "%s-member-%s" % (db_info.name,
|
||||||
|
str(i + num_existing + 1))
|
||||||
|
minstances.append(
|
||||||
|
inst_models.Instance.create(context, instance_name,
|
||||||
|
flavor_id,
|
||||||
|
datastore_version.image_id,
|
||||||
|
[], [], datastore,
|
||||||
|
datastore_version,
|
||||||
|
volume_size, None,
|
||||||
|
nics=nics[i],
|
||||||
|
availability_zone=azs[i],
|
||||||
|
configuration_id=None,
|
||||||
|
cluster_config=member_config)
|
||||||
|
)
|
||||||
|
return minstances
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def create(cls, context, name, datastore, datastore_version,
|
||||||
|
instances, extended_properties):
|
||||||
|
LOG.debug("Initiating cluster creation.")
|
||||||
|
|
||||||
|
vertica_conf = CONF.get(datastore_version.manager)
|
||||||
|
num_instances = len(instances)
|
||||||
|
|
||||||
|
# Matching number of instances with configured cluster_member_count
|
||||||
|
if num_instances != vertica_conf.cluster_member_count:
|
||||||
|
raise exception.ClusterNumInstancesNotSupported(
|
||||||
|
num_instances=vertica_conf.cluster_member_count)
|
||||||
|
|
||||||
db_info = models.DBCluster.create(
|
db_info = models.DBCluster.create(
|
||||||
name=name, tenant_id=context.tenant,
|
name=name, tenant_id=context.tenant,
|
||||||
datastore_version_id=datastore_version.id,
|
datastore_version_id=datastore_version.id,
|
||||||
task_status=ClusterTasks.BUILDING_INITIAL)
|
task_status=ClusterTasks.BUILDING_INITIAL)
|
||||||
|
|
||||||
# Creating member instances
|
cls._create_instances(context, db_info, datastore, datastore_version,
|
||||||
for i in range(0, num_instances):
|
instances, new_cluster=True)
|
||||||
if i == 0:
|
|
||||||
member_config = {"id": db_info.id, "instance_type": "master"}
|
|
||||||
else:
|
|
||||||
member_config = {"id": db_info.id, "instance_type": "member"}
|
|
||||||
instance_name = "%s-member-%s" % (name, str(i + 1))
|
|
||||||
inst_models.Instance.create(context, instance_name,
|
|
||||||
flavor_id,
|
|
||||||
datastore_version.image_id,
|
|
||||||
[], [], datastore,
|
|
||||||
datastore_version,
|
|
||||||
volume_size, None,
|
|
||||||
nics=nics[i],
|
|
||||||
availability_zone=azs[i],
|
|
||||||
configuration_id=None,
|
|
||||||
cluster_config=member_config)
|
|
||||||
|
|
||||||
# Calling taskmanager to further proceed for cluster-configuration
|
# Calling taskmanager to further proceed for cluster-configuration
|
||||||
task_api.load(context, datastore_version.manager).create_cluster(
|
task_api.load(context, datastore_version.manager).create_cluster(
|
||||||
db_info.id)
|
db_info.id)
|
||||||
|
|
||||||
return VerticaCluster(context, db_info, datastore, datastore_version)
|
return VerticaCluster(context, db_info, datastore, datastore_version)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def k_safety(n):
|
||||||
|
"""
|
||||||
|
Vertica defines k-safety values of 0, 1 or 2:
|
||||||
|
https://my.vertica.com/docs/7.1.x/HTML/Content/Authoring/Glossary/
|
||||||
|
K-Safety.htm
|
||||||
|
"""
|
||||||
|
if n < 3:
|
||||||
|
return 0
|
||||||
|
elif n < 5:
|
||||||
|
return 1
|
||||||
|
else:
|
||||||
|
return 2
|
||||||
|
|
||||||
|
def grow(self, instances):
|
||||||
|
LOG.debug("Growing cluster.")
|
||||||
|
|
||||||
|
self.validate_cluster_available()
|
||||||
|
|
||||||
|
context = self.context
|
||||||
|
db_info = self.db_info
|
||||||
|
datastore = self.ds
|
||||||
|
datastore_version = self.ds_version
|
||||||
|
|
||||||
|
db_info.update(task_status=ClusterTasks.GROWING_CLUSTER)
|
||||||
|
|
||||||
|
new_instances = self._create_instances(context, db_info, datastore,
|
||||||
|
datastore_version, instances,
|
||||||
|
new_cluster=False)
|
||||||
|
|
||||||
|
task_api.load(context, datastore_version.manager).grow_cluster(
|
||||||
|
db_info.id, [instance.id for instance in new_instances])
|
||||||
|
|
||||||
|
return VerticaCluster(context, db_info, datastore, datastore_version)
|
||||||
|
|
||||||
|
def shrink(self, instance_ids):
|
||||||
|
self.validate_cluster_available()
|
||||||
|
|
||||||
|
context = self.context
|
||||||
|
db_info = self.db_info
|
||||||
|
datastore_version = self.ds_version
|
||||||
|
|
||||||
|
db_instances = inst_models.DBInstance.find_all(cluster_id=db_info.id,
|
||||||
|
deleted=False).all()
|
||||||
|
|
||||||
|
all_instance_ids = [db_instance.id for db_instance in db_instances]
|
||||||
|
|
||||||
|
left_instances = [instance_id for instance_id
|
||||||
|
in all_instance_ids
|
||||||
|
if instance_id not in instance_ids]
|
||||||
|
|
||||||
|
k = self.k_safety(len(left_instances))
|
||||||
|
|
||||||
|
vertica_conf = CONF.get(datastore_version.manager)
|
||||||
|
if k < vertica_conf.min_ksafety:
|
||||||
|
raise exception.ClusterNumInstancesBelowSafetyThreshold()
|
||||||
|
|
||||||
|
db_info.update(task_status=ClusterTasks.SHRINKING_CLUSTER)
|
||||||
|
|
||||||
|
task_api.load(context, datastore_version.manager).shrink_cluster(
|
||||||
|
self.db_info.id, instance_ids)
|
||||||
|
return VerticaCluster(self.context, db_info,
|
||||||
|
self.ds, self.ds_version)
|
||||||
|
|
||||||
|
|
||||||
class VerticaClusterView(ClusterView):
|
class VerticaClusterView(ClusterView):
|
||||||
|
|
||||||
|
@ -47,6 +47,21 @@ class VerticaGuestAgentAPI(guest_api.API):
|
|||||||
return self._call("install_cluster", CONF.cluster_usage_timeout,
|
return self._call("install_cluster", CONF.cluster_usage_timeout,
|
||||||
self.version_cap, members=members)
|
self.version_cap, members=members)
|
||||||
|
|
||||||
|
def grow_cluster(self, members):
|
||||||
|
LOG.debug("Growing Vertica cluster with members: %s." % members)
|
||||||
|
return self._call("grow_cluster", CONF.cluster_usage_timeout,
|
||||||
|
self.version_cap, members=members)
|
||||||
|
|
||||||
|
def shrink_cluster(self, members):
|
||||||
|
LOG.debug("Shrinking Vertica cluster with members: %s." % members)
|
||||||
|
return self._call("shrink_cluster", CONF.cluster_usage_timeout,
|
||||||
|
self.version_cap, members=members)
|
||||||
|
|
||||||
|
def mark_design_ksafe(self, k):
|
||||||
|
LOG.debug("Setting vertica k-safety level to : %s." % k)
|
||||||
|
return self._call("mark_design_ksafe", CONF.cluster_usage_timeout,
|
||||||
|
self.version_cap, k=k)
|
||||||
|
|
||||||
def cluster_complete(self):
|
def cluster_complete(self):
|
||||||
LOG.debug("Notifying cluster install completion.")
|
LOG.debug("Notifying cluster install completion.")
|
||||||
return self._call("cluster_complete", guest_api.AGENT_HIGH_TIMEOUT,
|
return self._call("cluster_complete", guest_api.AGENT_HIGH_TIMEOUT,
|
||||||
|
@ -17,6 +17,8 @@ from oslo_log import log as logging
|
|||||||
from trove.common import cfg
|
from trove.common import cfg
|
||||||
from trove.common.i18n import _
|
from trove.common.i18n import _
|
||||||
from trove.common.strategies.cluster import base
|
from trove.common.strategies.cluster import base
|
||||||
|
from trove.common.strategies.cluster.experimental.vertica.api import \
|
||||||
|
VerticaCluster
|
||||||
from trove.instance.models import DBInstance
|
from trove.instance.models import DBInstance
|
||||||
from trove.instance.models import Instance
|
from trove.instance.models import Instance
|
||||||
from trove.taskmanager import api as task_api
|
from trove.taskmanager import api as task_api
|
||||||
@ -47,7 +49,8 @@ class VerticaClusterTasks(task_models.ClusterTasks):
|
|||||||
def _create_cluster():
|
def _create_cluster():
|
||||||
|
|
||||||
# Fetch instances by cluster_id against instances table.
|
# Fetch instances by cluster_id against instances table.
|
||||||
db_instances = DBInstance.find_all(cluster_id=cluster_id).all()
|
db_instances = DBInstance.find_all(cluster_id=cluster_id,
|
||||||
|
deleted=False).all()
|
||||||
instance_ids = [db_instance.id for db_instance in db_instances]
|
instance_ids = [db_instance.id for db_instance in db_instances]
|
||||||
|
|
||||||
# Wait for cluster members to get to cluster-ready status.
|
# Wait for cluster members to get to cluster-ready status.
|
||||||
@ -106,6 +109,116 @@ class VerticaClusterTasks(task_models.ClusterTasks):
|
|||||||
|
|
||||||
LOG.debug("End create_cluster for id: %s." % cluster_id)
|
LOG.debug("End create_cluster for id: %s." % cluster_id)
|
||||||
|
|
||||||
|
def grow_cluster(self, context, cluster_id, new_instance_ids):
|
||||||
|
|
||||||
|
def _grow_cluster():
|
||||||
|
LOG.debug("begin grow_cluster for Vertica cluster %s" % cluster_id)
|
||||||
|
|
||||||
|
db_instances = DBInstance.find_all(cluster_id=cluster_id,
|
||||||
|
deleted=False).all()
|
||||||
|
|
||||||
|
instance_ids = [db_instance.id for db_instance in db_instances]
|
||||||
|
|
||||||
|
# Wait for new cluster members to get to cluster-ready status.
|
||||||
|
if not self._all_instances_ready(new_instance_ids, cluster_id):
|
||||||
|
return
|
||||||
|
|
||||||
|
new_insts = [Instance.load(context, instance_id)
|
||||||
|
for instance_id in new_instance_ids]
|
||||||
|
|
||||||
|
existing_instances = [Instance.load(context, instance_id)
|
||||||
|
for instance_id
|
||||||
|
in instance_ids
|
||||||
|
if instance_id not in new_instance_ids]
|
||||||
|
|
||||||
|
existing_guests = [self.get_guest(i) for i in existing_instances]
|
||||||
|
new_guests = [self.get_guest(i) for i in new_insts]
|
||||||
|
all_guests = new_guests + existing_guests
|
||||||
|
|
||||||
|
authorized_users_without_password = ['root', 'dbadmin']
|
||||||
|
new_ips = [self.get_ip(instance) for instance in new_insts]
|
||||||
|
|
||||||
|
for user in authorized_users_without_password:
|
||||||
|
pub_key = [guest.get_public_keys(user) for guest in all_guests]
|
||||||
|
for guest in all_guests:
|
||||||
|
guest.authorize_public_keys(user, pub_key)
|
||||||
|
|
||||||
|
for db_instance in db_instances:
|
||||||
|
if db_instance['type'] == 'master':
|
||||||
|
LOG.debug("Found 'master' instance, calling grow on guest")
|
||||||
|
master_instance = Instance.load(context,
|
||||||
|
db_instance.id)
|
||||||
|
self.get_guest(master_instance).grow_cluster(new_ips)
|
||||||
|
break
|
||||||
|
|
||||||
|
for guest in new_guests:
|
||||||
|
guest.cluster_complete()
|
||||||
|
|
||||||
|
timeout = Timeout(CONF.cluster_usage_timeout)
|
||||||
|
|
||||||
|
try:
|
||||||
|
_grow_cluster()
|
||||||
|
self.reset_task()
|
||||||
|
except Timeout as t:
|
||||||
|
if t is not timeout:
|
||||||
|
raise # not my timeout
|
||||||
|
LOG.exception(_("Timeout for growing cluster."))
|
||||||
|
self.update_statuses_on_failure(cluster_id)
|
||||||
|
except Exception:
|
||||||
|
LOG.exception(_("Error growing cluster %s.") % cluster_id)
|
||||||
|
self.update_statuses_on_failure(cluster_id)
|
||||||
|
finally:
|
||||||
|
timeout.cancel()
|
||||||
|
|
||||||
|
def shrink_cluster(self, context, cluster_id, instance_ids):
|
||||||
|
def _shrink_cluster():
|
||||||
|
db_instances = DBInstance.find_all(cluster_id=cluster_id,
|
||||||
|
deleted=False).all()
|
||||||
|
|
||||||
|
all_instance_ids = [db_instance.id for db_instance in db_instances]
|
||||||
|
|
||||||
|
remove_instances = [Instance.load(context, instance_id)
|
||||||
|
for instance_id in instance_ids]
|
||||||
|
|
||||||
|
left_instances = [Instance.load(context, instance_id)
|
||||||
|
for instance_id
|
||||||
|
in all_instance_ids
|
||||||
|
if instance_id not in instance_ids]
|
||||||
|
|
||||||
|
remove_member_ips = [self.get_ip(instance)
|
||||||
|
for instance in remove_instances]
|
||||||
|
|
||||||
|
k = VerticaCluster.k_safety(len(left_instances))
|
||||||
|
|
||||||
|
for db_instance in db_instances:
|
||||||
|
if db_instance['type'] == 'master':
|
||||||
|
master_instance = Instance.load(context,
|
||||||
|
db_instance.id)
|
||||||
|
if self.get_ip(master_instance) in remove_member_ips:
|
||||||
|
raise RuntimeError(_("Cannot remove master instance!"))
|
||||||
|
LOG.debug(_("Marking cluster k-safety: %s") % k)
|
||||||
|
self.get_guest(master_instance).mark_design_ksafe(k)
|
||||||
|
self.get_guest(master_instance).shrink_cluster(
|
||||||
|
remove_member_ips)
|
||||||
|
break
|
||||||
|
|
||||||
|
for r in remove_instances:
|
||||||
|
Instance.delete(r)
|
||||||
|
|
||||||
|
timeout = Timeout(CONF.cluster_usage_timeout)
|
||||||
|
try:
|
||||||
|
_shrink_cluster()
|
||||||
|
self.reset_task()
|
||||||
|
except Timeout as t:
|
||||||
|
if t is not timeout:
|
||||||
|
raise
|
||||||
|
LOG.exception(_("Timeout for shrinking cluster."))
|
||||||
|
self.update_statuses_on_failure(cluster_id)
|
||||||
|
finally:
|
||||||
|
timeout.cancel()
|
||||||
|
|
||||||
|
LOG.debug("end shrink_cluster for Vertica cluster id %s" % self.id)
|
||||||
|
|
||||||
|
|
||||||
class VerticaTaskManagerAPI(task_api.API):
|
class VerticaTaskManagerAPI(task_api.API):
|
||||||
|
|
||||||
|
@ -109,3 +109,32 @@ class Manager(manager.Manager):
|
|||||||
LOG.exception(_('Cluster installation failed.'))
|
LOG.exception(_('Cluster installation failed.'))
|
||||||
self.appStatus.set_status(rd_ins.ServiceStatuses.FAILED)
|
self.appStatus.set_status(rd_ins.ServiceStatuses.FAILED)
|
||||||
raise
|
raise
|
||||||
|
|
||||||
|
def grow_cluster(self, context, members):
|
||||||
|
try:
|
||||||
|
LOG.debug("Growing cluster to members: %s." % members)
|
||||||
|
self.app.grow_cluster(members)
|
||||||
|
LOG.debug("grow_cluster call has finished.")
|
||||||
|
except Exception:
|
||||||
|
LOG.exception(_('Cluster grow failed.'))
|
||||||
|
self.appStatus.set_status(rd_ins.ServiceStatuses.FAILED)
|
||||||
|
raise
|
||||||
|
|
||||||
|
def shrink_cluster(self, context, members):
|
||||||
|
try:
|
||||||
|
LOG.debug("Shrinking cluster members: %s." % members)
|
||||||
|
self.app.shrink_cluster(members)
|
||||||
|
LOG.debug("shrink_cluster call has finished.")
|
||||||
|
except Exception:
|
||||||
|
LOG.exception(_('Cluster shrink failed.'))
|
||||||
|
self.appStatus.set_status(rd_ins.ServiceStatuses.FAILED)
|
||||||
|
raise
|
||||||
|
|
||||||
|
def mark_design_ksafe(self, context, k):
|
||||||
|
try:
|
||||||
|
LOG.debug("Setting vertica k-safety to %s." % k)
|
||||||
|
self.app.mark_design_ksafe(k)
|
||||||
|
except Exception:
|
||||||
|
LOG.exception(_('K-safety setting failed.'))
|
||||||
|
self.appStatus.set_status(rd_ins.ServiceStatuses.FAILED)
|
||||||
|
raise
|
||||||
|
@ -153,6 +153,43 @@ class VerticaApp(object):
|
|||||||
finally:
|
finally:
|
||||||
self.status.end_restart()
|
self.status.end_restart()
|
||||||
|
|
||||||
|
def add_db_to_node(self, members=netutils.get_my_ipv4()):
|
||||||
|
"""Add db to host with admintools"""
|
||||||
|
LOG.info(_("Calling admintools to add DB to host"))
|
||||||
|
try:
|
||||||
|
# Create db after install
|
||||||
|
db_password = self._get_database_password()
|
||||||
|
create_db_command = (system.ADD_DB_TO_NODE % (members,
|
||||||
|
DB_NAME,
|
||||||
|
db_password))
|
||||||
|
system.shell_execute(create_db_command, "dbadmin")
|
||||||
|
except exception.ProcessExecutionError:
|
||||||
|
# Give vertica some time to get the the node up, won't be available
|
||||||
|
# by the time adminTools -t db_add_node completes
|
||||||
|
LOG.info(_("adminTools failed as expected - wait for node"))
|
||||||
|
self.wait_for_node_status()
|
||||||
|
LOG.info(_("Vertica add db to host completed."))
|
||||||
|
|
||||||
|
def remove_db_from_node(self, members=netutils.get_my_ipv4()):
|
||||||
|
"""Remove db from node with admintools"""
|
||||||
|
LOG.info(_("Removing db from node"))
|
||||||
|
try:
|
||||||
|
# Create db after install
|
||||||
|
db_password = self._get_database_password()
|
||||||
|
create_db_command = (system.REMOVE_DB_FROM_NODE % (members,
|
||||||
|
DB_NAME,
|
||||||
|
db_password))
|
||||||
|
system.shell_execute(create_db_command, "dbadmin")
|
||||||
|
except exception.ProcessExecutionError:
|
||||||
|
# Give vertica some time to get the the node up, won't be available
|
||||||
|
# by the time adminTools -t db_add_node completes
|
||||||
|
LOG.info(_("adminTools failed as expected - wait for node"))
|
||||||
|
|
||||||
|
# Give vertica some time to take the node down - it won't be available
|
||||||
|
# by the time adminTools -t db_add_node completes
|
||||||
|
self.wait_for_node_status()
|
||||||
|
LOG.info(_("Vertica remove host from db completed."))
|
||||||
|
|
||||||
def create_db(self, members=netutils.get_my_ipv4()):
|
def create_db(self, members=netutils.get_my_ipv4()):
|
||||||
"""Prepare the guest machine with a Vertica db creation."""
|
"""Prepare the guest machine with a Vertica db creation."""
|
||||||
LOG.info(_("Creating database on Vertica host."))
|
LOG.info(_("Creating database on Vertica host."))
|
||||||
@ -182,6 +219,18 @@ class VerticaApp(object):
|
|||||||
self._generate_database_password()
|
self._generate_database_password()
|
||||||
LOG.info(_("install_vertica completed."))
|
LOG.info(_("install_vertica completed."))
|
||||||
|
|
||||||
|
def update_vertica(self, command, members=netutils.get_my_ipv4()):
|
||||||
|
LOG.info(_("Calling update_vertica with command %s") % command)
|
||||||
|
try:
|
||||||
|
update_vertica_cmd = (system.UPDATE_VERTICA % (command, members,
|
||||||
|
MOUNT_POINT))
|
||||||
|
system.shell_execute(update_vertica_cmd)
|
||||||
|
except exception.ProcessExecutionError:
|
||||||
|
LOG.exception(_("update_vertica failed."))
|
||||||
|
raise RuntimeError(_("update_vertica failed."))
|
||||||
|
# self._generate_database_password()
|
||||||
|
LOG.info(_("update_vertica completed."))
|
||||||
|
|
||||||
def add_udls(self):
|
def add_udls(self):
|
||||||
"""Load the user defined load libraries into the database."""
|
"""Load the user defined load libraries into the database."""
|
||||||
LOG.info(_("Adding configured user defined load libraries."))
|
LOG.info(_("Adding configured user defined load libraries."))
|
||||||
@ -287,6 +336,17 @@ class VerticaApp(object):
|
|||||||
LOG.exception(_("Failed to prepare for install_vertica."))
|
LOG.exception(_("Failed to prepare for install_vertica."))
|
||||||
raise
|
raise
|
||||||
|
|
||||||
|
def mark_design_ksafe(self, k):
|
||||||
|
"""Wrapper for mark_design_ksafe function for setting k-safety """
|
||||||
|
LOG.info(_("Setting Vertica k-safety to %s") % str(k))
|
||||||
|
out, err = system.exec_vsql_command(self._get_database_password(),
|
||||||
|
system.MARK_DESIGN_KSAFE % k)
|
||||||
|
# Only fail if we get an ERROR as opposed to a warning complaining
|
||||||
|
# about setting k = 0
|
||||||
|
if "ERROR" in err:
|
||||||
|
LOG.error(err)
|
||||||
|
raise RuntimeError(_("Failed to set k-safety level %s.") % k)
|
||||||
|
|
||||||
def _create_user(self, username, password, role):
|
def _create_user(self, username, password, role):
|
||||||
"""Creates a user, granting and enabling the given role for it."""
|
"""Creates a user, granting and enabling the given role for it."""
|
||||||
LOG.info(_("Creating user in Vertica database."))
|
LOG.info(_("Creating user in Vertica database."))
|
||||||
@ -418,3 +478,41 @@ class VerticaApp(object):
|
|||||||
LOG.debug("Creating database with members: %s." % cluster_members)
|
LOG.debug("Creating database with members: %s." % cluster_members)
|
||||||
self.create_db(cluster_members)
|
self.create_db(cluster_members)
|
||||||
LOG.debug("Cluster configured on members: %s." % cluster_members)
|
LOG.debug("Cluster configured on members: %s." % cluster_members)
|
||||||
|
|
||||||
|
def grow_cluster(self, members):
|
||||||
|
"""Adds nodes to cluster."""
|
||||||
|
cluster_members = ','.join(members)
|
||||||
|
LOG.debug("Growing cluster with members: %s." % cluster_members)
|
||||||
|
self.update_vertica("--add-hosts", cluster_members)
|
||||||
|
self._export_conf_to_members(members)
|
||||||
|
LOG.debug("Creating database with members: %s." % cluster_members)
|
||||||
|
self.add_db_to_node(cluster_members)
|
||||||
|
LOG.debug("Cluster configured on members: %s." % cluster_members)
|
||||||
|
|
||||||
|
def shrink_cluster(self, members):
|
||||||
|
"""Removes nodes from cluster."""
|
||||||
|
cluster_members = ','.join(members)
|
||||||
|
LOG.debug("Shrinking cluster with members: %s." % cluster_members)
|
||||||
|
self.remove_db_from_node(cluster_members)
|
||||||
|
self.update_vertica("--remove-hosts", cluster_members)
|
||||||
|
|
||||||
|
def wait_for_node_status(self, status='UP'):
|
||||||
|
"""Wait until all nodes are the same status"""
|
||||||
|
# select node_state from nodes where node_state <> 'UP'
|
||||||
|
def _wait_for_node_status():
|
||||||
|
out, err = system.exec_vsql_command(self._get_database_password(),
|
||||||
|
system.NODE_STATUS % status)
|
||||||
|
LOG.debug("Polled vertica node states: %s" % out)
|
||||||
|
|
||||||
|
if err:
|
||||||
|
LOG.error(err)
|
||||||
|
raise RuntimeError(_("Failed to query for root user."))
|
||||||
|
|
||||||
|
return "0 rows" in out
|
||||||
|
|
||||||
|
try:
|
||||||
|
utils.poll_until(_wait_for_node_status, time_out=600,
|
||||||
|
sleep_time=15)
|
||||||
|
except exception.PollTimeOut:
|
||||||
|
raise RuntimeError(_("Timed out waiting for cluster to"
|
||||||
|
"change to status %s") % status)
|
||||||
|
@ -14,6 +14,10 @@
|
|||||||
from trove.common import utils
|
from trove.common import utils
|
||||||
|
|
||||||
ALTER_USER_PASSWORD = "ALTER USER %s IDENTIFIED BY '%s'"
|
ALTER_USER_PASSWORD = "ALTER USER %s IDENTIFIED BY '%s'"
|
||||||
|
ADD_DB_TO_NODE = ("/opt/vertica/bin/adminTools -t db_add_node -a"
|
||||||
|
" %s -d %s -p '%s'")
|
||||||
|
REMOVE_DB_FROM_NODE = ("/opt/vertica/bin/adminTools -t db_remove_node -s"
|
||||||
|
" %s -d %s -i -p '%s'")
|
||||||
CREATE_DB = ("/opt/vertica/bin/adminTools -t create_db -s"
|
CREATE_DB = ("/opt/vertica/bin/adminTools -t create_db -s"
|
||||||
" %s -d %s -c %s -D %s -p '%s'")
|
" %s -d %s -c %s -D %s -p '%s'")
|
||||||
CREATE_USER = "CREATE USER %s IDENTIFIED BY '%s'"
|
CREATE_USER = "CREATE USER %s IDENTIFIED BY '%s'"
|
||||||
@ -21,7 +25,10 @@ ENABLE_FOR_USER = "ALTER USER %s DEFAULT ROLE %s"
|
|||||||
GRANT_TO_USER = "GRANT %s to %s"
|
GRANT_TO_USER = "GRANT %s to %s"
|
||||||
INSTALL_VERTICA = ("/opt/vertica/sbin/install_vertica -s %s"
|
INSTALL_VERTICA = ("/opt/vertica/sbin/install_vertica -s %s"
|
||||||
" -d %s -X -N -S default -r"
|
" -d %s -X -N -S default -r"
|
||||||
" /vertica.deb -L CE -Y --no-system-checks")
|
" /vertica.deb -L CE -Y --no-system-checks"
|
||||||
|
" --ignore-aws-instance-type")
|
||||||
|
MARK_DESIGN_KSAFE = "SELECT MARK_DESIGN_KSAFE(%s)"
|
||||||
|
NODE_STATUS = "SELECT node_state FROM nodes where node_state <> '%s'"
|
||||||
STOP_DB = "/opt/vertica/bin/adminTools -t stop_db -F -d %s -p '%s'"
|
STOP_DB = "/opt/vertica/bin/adminTools -t stop_db -F -d %s -p '%s'"
|
||||||
START_DB = "/opt/vertica/bin/adminTools -t start_db -d %s -p '%s'"
|
START_DB = "/opt/vertica/bin/adminTools -t start_db -d %s -p '%s'"
|
||||||
STATUS_ACTIVE_DB = "/opt/vertica/bin/adminTools -t show_active_db"
|
STATUS_ACTIVE_DB = "/opt/vertica/bin/adminTools -t show_active_db"
|
||||||
@ -33,6 +40,18 @@ SEND_CONF_TO_SERVER = ("rsync -v -e 'ssh -o "
|
|||||||
"StrictHostKeyChecking=no' --perms --owner --group "
|
"StrictHostKeyChecking=no' --perms --owner --group "
|
||||||
"%s %s:%s")
|
"%s %s:%s")
|
||||||
SSH_KEY_GEN = "ssh-keygen -f %s/.ssh/id_rsa -t rsa -N ''"
|
SSH_KEY_GEN = "ssh-keygen -f %s/.ssh/id_rsa -t rsa -N ''"
|
||||||
|
UPDATE_VERTICA = ("/opt/vertica/sbin/update_vertica %s %s "
|
||||||
|
" -d %s -X -N -S default -r"
|
||||||
|
" /vertica.deb -L CE -Y --no-system-checks"
|
||||||
|
" --ignore-aws-instance-type")
|
||||||
|
UPDATE_REMOVE = ("/opt/vertica/sbin/update_vertica --remove-hosts %s "
|
||||||
|
" -d %s -X -N -S default -r"
|
||||||
|
" /vertica.deb -L CE -Y --no-system-checks"
|
||||||
|
" --ignore-aws-instance-type")
|
||||||
|
UPDATE_ADD = ("/opt/vertica/sbin/update_vertica --add-hosts %s "
|
||||||
|
" -d %s -X -N -S default -r"
|
||||||
|
" /vertica.deb -L CE -Y --no-system-checks"
|
||||||
|
" --ignore-aws-instance-type")
|
||||||
USER_EXISTS = ("/opt/vertica/bin/vsql -w '%s' -c "
|
USER_EXISTS = ("/opt/vertica/bin/vsql -w '%s' -c "
|
||||||
"\"select 1 from users where user_name = '%s'\" "
|
"\"select 1 from users where user_name = '%s'\" "
|
||||||
"| grep row | awk '{print $1}' | cut -c2-")
|
"| grep row | awk '{print $1}' | cut -c2-")
|
||||||
|
@ -70,11 +70,13 @@ class ClusterTest(trove_testtools.TestCase):
|
|||||||
'instance_type': 'member'},
|
'instance_type': 'member'},
|
||||||
{'volume_size': 1, 'flavor_id': '1234',
|
{'volume_size': 1, 'flavor_id': '1234',
|
||||||
'instance_type': 'member'}]
|
'instance_type': 'member'}]
|
||||||
|
self.db_instances = [1, 2, 3]
|
||||||
|
|
||||||
def tearDown(self):
|
def tearDown(self):
|
||||||
super(ClusterTest, self).tearDown()
|
super(ClusterTest, self).tearDown()
|
||||||
|
|
||||||
def test_create_empty_instances(self):
|
@patch.object(inst_models.DBInstance, 'find_all')
|
||||||
|
def test_create_empty_instances(self, *args):
|
||||||
self.assertRaises(exception.ClusterNumInstancesNotSupported,
|
self.assertRaises(exception.ClusterNumInstancesNotSupported,
|
||||||
Cluster.create,
|
Cluster.create,
|
||||||
Mock(),
|
Mock(),
|
||||||
@ -83,7 +85,9 @@ class ClusterTest(trove_testtools.TestCase):
|
|||||||
self.datastore_version,
|
self.datastore_version,
|
||||||
[], None)
|
[], None)
|
||||||
|
|
||||||
def test_create_flavor_not_specified(self):
|
@patch.object(DBCluster, 'create')
|
||||||
|
@patch.object(inst_models.DBInstance, 'find_all')
|
||||||
|
def test_create_flavor_not_specified(self, *args):
|
||||||
instances = self.instances
|
instances = self.instances
|
||||||
instances[0]['flavor_id'] = None
|
instances[0]['flavor_id'] = None
|
||||||
self.assertRaises(exception.ClusterFlavorsNotEqual,
|
self.assertRaises(exception.ClusterFlavorsNotEqual,
|
||||||
@ -96,9 +100,11 @@ class ClusterTest(trove_testtools.TestCase):
|
|||||||
None
|
None
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@patch.object(DBCluster, 'create')
|
||||||
|
@patch.object(inst_models.DBInstance, 'find_all')
|
||||||
@patch.object(remote, 'create_nova_client')
|
@patch.object(remote, 'create_nova_client')
|
||||||
def test_create_invalid_flavor_specified(self,
|
def test_create_invalid_flavor_specified(self, mock_client,
|
||||||
mock_client):
|
mock_find_all, mock_create):
|
||||||
instances = [{'flavor_id': '1234'},
|
instances = [{'flavor_id': '1234'},
|
||||||
{'flavor_id': '1234'},
|
{'flavor_id': '1234'},
|
||||||
{'flavor_id': '1234'}]
|
{'flavor_id': '1234'}]
|
||||||
@ -117,9 +123,11 @@ class ClusterTest(trove_testtools.TestCase):
|
|||||||
None
|
None
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@patch.object(DBCluster, 'create')
|
||||||
|
@patch.object(inst_models.DBInstance, 'find_all')
|
||||||
@patch.object(remote, 'create_nova_client')
|
@patch.object(remote, 'create_nova_client')
|
||||||
def test_create_volume_no_specified(self,
|
def test_create_volume_no_specified(self, mock_client, mock_find_all,
|
||||||
mock_client):
|
mock_create):
|
||||||
instances = self.instances
|
instances = self.instances
|
||||||
instances[0]['volume_size'] = None
|
instances[0]['volume_size'] = None
|
||||||
flavors = Mock()
|
flavors = Mock()
|
||||||
@ -134,11 +142,15 @@ class ClusterTest(trove_testtools.TestCase):
|
|||||||
None
|
None
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@patch.object(DBCluster, 'create')
|
||||||
|
@patch.object(inst_models.DBInstance, 'find_all')
|
||||||
@patch.object(remote, 'create_nova_client')
|
@patch.object(remote, 'create_nova_client')
|
||||||
@patch.object(vertica_api, 'CONF')
|
@patch.object(vertica_api, 'CONF')
|
||||||
def test_create_storage_specified_with_no_volume_support(self,
|
def test_create_storage_specified_with_no_volume_support(self,
|
||||||
mock_conf,
|
mock_conf,
|
||||||
mock_client):
|
mock_client,
|
||||||
|
mock_find_all,
|
||||||
|
mock_create):
|
||||||
mock_conf.get = Mock(
|
mock_conf.get = Mock(
|
||||||
return_value=FakeOptGroup(volume_support=False))
|
return_value=FakeOptGroup(volume_support=False))
|
||||||
instances = self.instances
|
instances = self.instances
|
||||||
@ -155,11 +167,15 @@ class ClusterTest(trove_testtools.TestCase):
|
|||||||
None
|
None
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@patch.object(DBCluster, 'create')
|
||||||
|
@patch.object(inst_models.DBInstance, 'find_all')
|
||||||
@patch.object(remote, 'create_nova_client')
|
@patch.object(remote, 'create_nova_client')
|
||||||
@patch.object(vertica_api, 'CONF')
|
@patch.object(vertica_api, 'CONF')
|
||||||
def test_create_storage_not_specified_and_no_ephemeral_flavor(self,
|
def test_create_storage_not_specified_and_no_ephemeral_flavor(self,
|
||||||
mock_conf,
|
mock_conf,
|
||||||
mock_client):
|
mock_client,
|
||||||
|
m_find_all,
|
||||||
|
mock_create):
|
||||||
class FakeFlavor:
|
class FakeFlavor:
|
||||||
def __init__(self, flavor_id):
|
def __init__(self, flavor_id):
|
||||||
self.flavor_id = flavor_id
|
self.flavor_id = flavor_id
|
||||||
@ -188,8 +204,11 @@ class ClusterTest(trove_testtools.TestCase):
|
|||||||
None
|
None
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@patch.object(DBCluster, 'create')
|
||||||
|
@patch.object(inst_models.DBInstance, 'find_all')
|
||||||
@patch.object(remote, 'create_nova_client')
|
@patch.object(remote, 'create_nova_client')
|
||||||
def test_create_volume_not_equal(self, mock_client):
|
def test_create_volume_not_equal(self, mock_client, mock_find_all,
|
||||||
|
mock_create):
|
||||||
instances = self.instances
|
instances = self.instances
|
||||||
instances[0]['volume_size'] = 2
|
instances[0]['volume_size'] = 2
|
||||||
flavors = Mock()
|
flavors = Mock()
|
||||||
@ -204,13 +223,14 @@ class ClusterTest(trove_testtools.TestCase):
|
|||||||
None
|
None
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@patch.object(inst_models.DBInstance, 'find_all')
|
||||||
@patch.object(inst_models.Instance, 'create')
|
@patch.object(inst_models.Instance, 'create')
|
||||||
@patch.object(DBCluster, 'create')
|
@patch.object(DBCluster, 'create')
|
||||||
@patch.object(task_api, 'load')
|
@patch.object(task_api, 'load')
|
||||||
@patch.object(QUOTAS, 'check_quotas')
|
@patch.object(QUOTAS, 'check_quotas')
|
||||||
@patch.object(remote, 'create_nova_client')
|
@patch.object(remote, 'create_nova_client')
|
||||||
def test_create(self, mock_client, mock_check_quotas, mock_task_api,
|
def test_create(self, mock_client, mock_check_quotas, mock_task_api,
|
||||||
mock_db_create, mock_ins_create):
|
mock_db_create, mock_ins_create, mock_find_all):
|
||||||
instances = self.instances
|
instances = self.instances
|
||||||
flavors = Mock()
|
flavors = Mock()
|
||||||
mock_client.return_value.flavors = flavors
|
mock_client.return_value.flavors = flavors
|
||||||
@ -224,6 +244,7 @@ class ClusterTest(trove_testtools.TestCase):
|
|||||||
mock_db_create.return_value.id)
|
mock_db_create.return_value.id)
|
||||||
self.assertEqual(3, mock_ins_create.call_count)
|
self.assertEqual(3, mock_ins_create.call_count)
|
||||||
|
|
||||||
|
@patch.object(inst_models.DBInstance, 'find_all')
|
||||||
@patch.object(vertica_api, 'CONF')
|
@patch.object(vertica_api, 'CONF')
|
||||||
@patch.object(inst_models.Instance, 'create')
|
@patch.object(inst_models.Instance, 'create')
|
||||||
@patch.object(DBCluster, 'create')
|
@patch.object(DBCluster, 'create')
|
||||||
@ -232,7 +253,8 @@ class ClusterTest(trove_testtools.TestCase):
|
|||||||
@patch.object(remote, 'create_nova_client')
|
@patch.object(remote, 'create_nova_client')
|
||||||
def test_create_with_ephemeral_flavor(self, mock_client, mock_check_quotas,
|
def test_create_with_ephemeral_flavor(self, mock_client, mock_check_quotas,
|
||||||
mock_task_api, mock_db_create,
|
mock_task_api, mock_db_create,
|
||||||
mock_ins_create, mock_conf):
|
mock_ins_create, mock_conf,
|
||||||
|
mock_find_all):
|
||||||
class FakeFlavor:
|
class FakeFlavor:
|
||||||
def __init__(self, flavor_id):
|
def __init__(self, flavor_id):
|
||||||
self.flavor_id = flavor_id
|
self.flavor_id = flavor_id
|
||||||
|
Loading…
x
Reference in New Issue
Block a user