Merge "Implement user functions for Cassandra datastore"
This commit is contained in:
commit
a090f16fbc
@ -22,3 +22,4 @@ testrepository>=0.0.18 # Apache-2.0/BSD
|
||||
pymongo>=3.0.2 # Apache-2.0
|
||||
redis>=2.10.0 # MIT
|
||||
psycopg2>=2.5 # LGPL/ZPL
|
||||
cassandra-driver>=2.1.4 # Apache-2.0
|
||||
|
@ -814,6 +814,11 @@ cassandra_opts = [
|
||||
cfg.StrOpt('root_controller',
|
||||
default='trove.extensions.common.service.DefaultRootController',
|
||||
help='Root controller implementation for cassandra.'),
|
||||
cfg.ListOpt('ignore_users', default=['os_admin'],
|
||||
help='Users to exclude when listing users.'),
|
||||
cfg.ListOpt('ignore_dbs', default=['system', 'system_auth',
|
||||
'system_traces'],
|
||||
help='Databases to exclude when listing databases.'),
|
||||
cfg.StrOpt('guest_log_exposed_logs', default='',
|
||||
help='List of Guest Logs to expose for publishing.'),
|
||||
]
|
||||
|
@ -15,10 +15,14 @@
|
||||
#
|
||||
|
||||
import os
|
||||
import yaml
|
||||
|
||||
from oslo_log import log as logging
|
||||
|
||||
from trove.guestagent.datastore.experimental.cassandra import service
|
||||
from trove.guestagent.datastore.experimental.cassandra.service import (
|
||||
CassandraAdmin
|
||||
)
|
||||
from trove.guestagent.datastore import manager
|
||||
from trove.guestagent import volume
|
||||
|
||||
@ -29,13 +33,21 @@ LOG = logging.getLogger(__name__)
|
||||
class Manager(manager.Manager):
|
||||
|
||||
def __init__(self):
|
||||
self.appStatus = service.CassandraAppStatus()
|
||||
self.app = service.CassandraApp(self.appStatus)
|
||||
self._app = service.CassandraApp()
|
||||
self.__admin = CassandraAdmin(self.app.get_current_superuser())
|
||||
super(Manager, self).__init__('cassandra')
|
||||
|
||||
@property
|
||||
def status(self):
|
||||
return self.appStatus
|
||||
return self.app.status
|
||||
|
||||
@property
|
||||
def app(self):
|
||||
return self._app
|
||||
|
||||
@property
|
||||
def admin(self):
|
||||
return self.__admin
|
||||
|
||||
def restart(self, context):
|
||||
self.app.restart()
|
||||
@ -62,25 +74,73 @@ class Manager(manager.Manager):
|
||||
# FIXME(amrith) Once the cassandra bug
|
||||
# https://issues.apache.org/jira/browse/CASSANDRA-2356
|
||||
# is fixed, this code may have to be revisited.
|
||||
LOG.debug("Stopping database prior to changes.")
|
||||
LOG.debug("Stopping database prior to initial configuration.")
|
||||
self.app.stop_db()
|
||||
|
||||
if config_contents:
|
||||
LOG.debug("Processing configuration.")
|
||||
self.app.write_config(config_contents)
|
||||
LOG.debug("Applying configuration.")
|
||||
self.app.write_config(yaml.load(config_contents))
|
||||
self.app.make_host_reachable()
|
||||
|
||||
if device_path:
|
||||
LOG.debug("Preparing data volume.")
|
||||
device = volume.VolumeDevice(device_path)
|
||||
# unmount if device is already mounted
|
||||
device.unmount_device(device_path)
|
||||
device.format()
|
||||
if os.path.exists(mount_point):
|
||||
# rsync exiting data
|
||||
LOG.debug("Migrating existing data.")
|
||||
device.migrate_data(mount_point)
|
||||
# mount the volume
|
||||
device.mount(mount_point)
|
||||
LOG.debug("Mounting new volume.")
|
||||
device.mount(mount_point)
|
||||
|
||||
LOG.debug("Restarting database after changes.")
|
||||
self.app.start_db()
|
||||
LOG.debug("Starting database with configuration changes.")
|
||||
self.app.start_db(update_db=False)
|
||||
|
||||
if not self.app.has_user_config():
|
||||
LOG.debug("Securing superuser access.")
|
||||
self.app.secure()
|
||||
self.app.restart()
|
||||
|
||||
self.__admin = CassandraAdmin(self.app.get_current_superuser())
|
||||
|
||||
def change_passwords(self, context, users):
|
||||
self.admin.change_passwords(context, users)
|
||||
|
||||
def update_attributes(self, context, username, hostname, user_attrs):
|
||||
self.admin.update_attributes(context, username, hostname, user_attrs)
|
||||
|
||||
def create_database(self, context, databases):
|
||||
self.admin.create_database(context, databases)
|
||||
|
||||
def create_user(self, context, users):
|
||||
self.admin.create_user(context, users)
|
||||
|
||||
def delete_database(self, context, database):
|
||||
self.admin.delete_database(context, database)
|
||||
|
||||
def delete_user(self, context, user):
|
||||
self.admin.delete_user(context, user)
|
||||
|
||||
def get_user(self, context, username, hostname):
|
||||
return self.admin.get_user(context, username, hostname)
|
||||
|
||||
def grant_access(self, context, username, hostname, databases):
|
||||
self.admin.grant_access(context, username, hostname, databases)
|
||||
|
||||
def revoke_access(self, context, username, hostname, database):
|
||||
self.admin.revoke_access(context, username, hostname, database)
|
||||
|
||||
def list_access(self, context, username, hostname):
|
||||
return self.admin.list_access(context, username, hostname)
|
||||
|
||||
def list_databases(self, context, limit=None, marker=None,
|
||||
include_marker=False):
|
||||
return self.admin.list_databases(context, limit, marker,
|
||||
include_marker)
|
||||
|
||||
def list_users(self, context, limit=None, marker=None,
|
||||
include_marker=False):
|
||||
return self.admin.list_users(context, limit, marker, include_marker)
|
||||
|
@ -14,26 +14,35 @@
|
||||
# under the License.
|
||||
|
||||
import os
|
||||
import tempfile
|
||||
import re
|
||||
import stat
|
||||
|
||||
from cassandra.auth import PlainTextAuthProvider
|
||||
from cassandra.cluster import Cluster
|
||||
from cassandra.cluster import NoHostAvailable
|
||||
from cassandra import OperationTimedOut
|
||||
from oslo_log import log as logging
|
||||
from oslo_utils import netutils
|
||||
import yaml
|
||||
|
||||
from trove.common import cfg
|
||||
from trove.common import exception
|
||||
from trove.common.i18n import _
|
||||
from trove.common import instance as rd_instance
|
||||
from trove.common import pagination
|
||||
from trove.common.stream_codecs import IniCodec
|
||||
from trove.common.stream_codecs import SafeYamlCodec
|
||||
from trove.common import utils
|
||||
from trove.guestagent.common import guestagent_utils
|
||||
from trove.guestagent.common import operating_system
|
||||
from trove.guestagent.common.operating_system import FileMode
|
||||
from trove.guestagent.datastore.experimental.cassandra import system
|
||||
from trove.guestagent.datastore import service
|
||||
from trove.guestagent.db import models
|
||||
from trove.guestagent import pkg
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
CONF = cfg.CONF
|
||||
MANAGER = CONF.datastore_manager if CONF.datastore_manager else 'cassandra'
|
||||
|
||||
packager = pkg.Package()
|
||||
|
||||
@ -41,10 +50,65 @@ packager = pkg.Package()
|
||||
class CassandraApp(object):
|
||||
"""Prepares DBaaS on a Guest container."""
|
||||
|
||||
def __init__(self, status):
|
||||
_ADMIN_USER = 'os_admin'
|
||||
|
||||
_CONF_AUTH_SEC = 'authentication'
|
||||
_CONF_USR_KEY = 'username'
|
||||
_CONF_PWD_KEY = 'password'
|
||||
_CONF_DIR_MODS = stat.S_IRWXU
|
||||
_CONF_FILE_MODS = stat.S_IRUSR
|
||||
|
||||
CASSANDRA_KILL_CMD = "sudo killall java || true"
|
||||
|
||||
def __init__(self):
|
||||
"""By default login with root no password for initial setup."""
|
||||
self.state_change_wait_time = CONF.state_change_wait_time
|
||||
self.status = status
|
||||
self.status = CassandraAppStatus(self.get_current_superuser())
|
||||
|
||||
@property
|
||||
def service_candidates(self):
|
||||
return ['cassandra']
|
||||
|
||||
@property
|
||||
def cassandra_conf(self):
|
||||
return {
|
||||
operating_system.REDHAT:
|
||||
"/etc/cassandra/default.conf/cassandra.yaml",
|
||||
operating_system.DEBIAN:
|
||||
"/etc/cassandra/cassandra.yaml",
|
||||
operating_system.SUSE:
|
||||
"/etc/cassandra/default.conf/cassandra.yaml"
|
||||
}[operating_system.get_os()]
|
||||
|
||||
@property
|
||||
def cassandra_owner(self):
|
||||
return 'cassandra'
|
||||
|
||||
@property
|
||||
def cassandra_data_dir(self):
|
||||
return guestagent_utils.build_file_path(
|
||||
self.cassandra_working_dir, 'data')
|
||||
|
||||
@property
|
||||
def cassandra_working_dir(self):
|
||||
return "/var/lib/cassandra"
|
||||
|
||||
@property
|
||||
def default_superuser_name(self):
|
||||
return "cassandra"
|
||||
|
||||
@property
|
||||
def default_superuser_password(self):
|
||||
return "cassandra"
|
||||
|
||||
@property
|
||||
def default_superuser_pwd_hash(self):
|
||||
# Default 'salted_hash' value for 'cassandra' user on Cassandra 2.1.
|
||||
return "$2a$10$wPEVuXBU7WE2Uwzqq3t19ObRJyoKztzC/Doyfr0VtDmVXC4GDAV3e"
|
||||
|
||||
@property
|
||||
def cqlsh_conf_path(self):
|
||||
return "~/.cassandra/cqlshrc"
|
||||
|
||||
def install_if_needed(self, packages):
|
||||
"""Prepare the guest machine with a cassandra server installation."""
|
||||
@ -61,86 +125,125 @@ class CassandraApp(object):
|
||||
|
||||
def start_db(self, update_db=False):
|
||||
self.status.start_db_service(
|
||||
system.SERVICE_CANDIDATES, self.state_change_wait_time,
|
||||
self.service_candidates, self.state_change_wait_time,
|
||||
enable_on_boot=True, update_db=update_db)
|
||||
|
||||
def stop_db(self, update_db=False, do_not_start_on_reboot=False):
|
||||
self.status.stop_db_service(
|
||||
system.SERVICE_CANDIDATES, self.state_change_wait_time,
|
||||
self.service_candidates, self.state_change_wait_time,
|
||||
disable_on_boot=do_not_start_on_reboot, update_db=update_db)
|
||||
|
||||
def restart(self):
|
||||
self.status.restart_db_service(
|
||||
system.SERVICE_CANDIDATES, self.state_change_wait_time)
|
||||
self.service_candidates, self.state_change_wait_time)
|
||||
|
||||
def _install_db(self, packages):
|
||||
"""Install cassandra server"""
|
||||
LOG.debug("Installing cassandra server.")
|
||||
packager.pkg_install(packages, None, system.INSTALL_TIMEOUT)
|
||||
packager.pkg_install(packages, None, 10000)
|
||||
LOG.debug("Finished installing Cassandra server")
|
||||
|
||||
def write_config(self, config_contents,
|
||||
execute_function=utils.execute_with_timeout,
|
||||
mkstemp_function=tempfile.mkstemp,
|
||||
unlink_function=os.unlink):
|
||||
def secure(self, update_user=None):
|
||||
"""Configure the Trove administrative user.
|
||||
Update an existing user if given.
|
||||
Create a new one using the default database credentials
|
||||
otherwise and drop the built-in user when finished.
|
||||
"""
|
||||
LOG.info(_('Configuring Trove superuser.'))
|
||||
|
||||
# first securely create a temp file. mkstemp() will set
|
||||
# os.O_EXCL on the open() call, and we get a file with
|
||||
# permissions of 600 by default.
|
||||
(conf_fd, conf_path) = mkstemp_function()
|
||||
current_superuser = update_user or models.CassandraUser(
|
||||
self.default_superuser_name,
|
||||
self.default_superuser_password)
|
||||
|
||||
LOG.debug('Storing temporary configuration at %s.' % conf_path)
|
||||
if update_user:
|
||||
os_admin = models.CassandraUser(update_user.name,
|
||||
utils.generate_random_password())
|
||||
CassandraAdmin(current_superuser).alter_user_password(os_admin)
|
||||
else:
|
||||
os_admin = models.CassandraUser(self._ADMIN_USER,
|
||||
utils.generate_random_password())
|
||||
CassandraAdmin(current_superuser)._create_superuser(os_admin)
|
||||
CassandraAdmin(os_admin).drop_user(current_superuser)
|
||||
|
||||
# write config and close the file, delete it if there is an
|
||||
# error. only unlink if there is a problem. In normal course,
|
||||
# we move the file.
|
||||
try:
|
||||
os.write(conf_fd, config_contents)
|
||||
operating_system.move(conf_path, system.CASSANDRA_CONF,
|
||||
as_root=True)
|
||||
# TODO(denis_makogon): figure out the dynamic way to discover
|
||||
# configs owner since it can cause errors if there is
|
||||
# no cassandra user in operating system
|
||||
operating_system.chown(system.CASSANDRA_CONF,
|
||||
'cassandra', 'cassandra', recursive=False,
|
||||
as_root=True)
|
||||
operating_system.chmod(system.CASSANDRA_CONF,
|
||||
FileMode.ADD_READ_ALL, as_root=True)
|
||||
except Exception:
|
||||
LOG.exception(
|
||||
_("Exception generating Cassandra configuration %s.") %
|
||||
conf_path)
|
||||
unlink_function(conf_path)
|
||||
raise
|
||||
finally:
|
||||
os.close(conf_fd)
|
||||
self.__create_cqlsh_config({self._CONF_AUTH_SEC:
|
||||
{self._CONF_USR_KEY: os_admin.name,
|
||||
self._CONF_PWD_KEY: os_admin.password}})
|
||||
|
||||
# Update the internal status with the new user.
|
||||
self.status = CassandraAppStatus(os_admin)
|
||||
|
||||
return os_admin
|
||||
|
||||
def __create_cqlsh_config(self, sections):
|
||||
config_path = self._get_cqlsh_conf_path()
|
||||
config_dir = os.path.dirname(config_path)
|
||||
if not os.path.exists(config_dir):
|
||||
os.mkdir(config_dir, self._CONF_DIR_MODS)
|
||||
else:
|
||||
os.chmod(config_dir, self._CONF_DIR_MODS)
|
||||
operating_system.write_file(config_path, sections, codec=IniCodec())
|
||||
os.chmod(config_path, self._CONF_FILE_MODS)
|
||||
|
||||
def get_current_superuser(self):
|
||||
"""
|
||||
Build the Trove superuser.
|
||||
Use the stored credentials.
|
||||
If not available fall back to the defaults.
|
||||
"""
|
||||
if self.has_user_config():
|
||||
return self._load_current_superuser()
|
||||
|
||||
LOG.warn(_("Trove administrative user has not been configured yet. "
|
||||
"Using the built-in default: %s")
|
||||
% self.default_superuser_name)
|
||||
return models.CassandraUser(self.default_superuser_name,
|
||||
self.default_superuser_password)
|
||||
|
||||
def has_user_config(self):
|
||||
"""
|
||||
Return TRUE if there is a client configuration file available
|
||||
on the guest.
|
||||
"""
|
||||
return os.path.exists(self._get_cqlsh_conf_path())
|
||||
|
||||
def _load_current_superuser(self):
|
||||
config = operating_system.read_file(self._get_cqlsh_conf_path(),
|
||||
codec=IniCodec())
|
||||
return models.CassandraUser(
|
||||
config[self._CONF_AUTH_SEC][self._CONF_USR_KEY],
|
||||
config[self._CONF_AUTH_SEC][self._CONF_PWD_KEY]
|
||||
)
|
||||
|
||||
def write_config(self, config_contents):
|
||||
|
||||
operating_system.write_file(
|
||||
self.cassandra_conf, config_contents, codec=SafeYamlCodec(),
|
||||
as_root=True)
|
||||
operating_system.chown(self.cassandra_conf,
|
||||
self.cassandra_owner,
|
||||
self.cassandra_owner,
|
||||
recursive=False, as_root=True)
|
||||
operating_system.chmod(self.cassandra_conf,
|
||||
FileMode.ADD_READ_ALL, as_root=True)
|
||||
|
||||
LOG.info(_('Wrote new Cassandra configuration.'))
|
||||
|
||||
def read_conf(self):
|
||||
"""Returns cassandra.yaml in dict structure."""
|
||||
|
||||
LOG.debug("Opening cassandra.yaml.")
|
||||
with open(system.CASSANDRA_CONF, 'r') as config:
|
||||
LOG.debug("Preparing YAML object from cassandra.yaml.")
|
||||
yamled = yaml.load(config.read())
|
||||
return yamled
|
||||
|
||||
def update_config_with_single(self, key, value):
|
||||
"""Updates single key:value in 'cassandra.yaml'."""
|
||||
|
||||
yamled = self.read_conf()
|
||||
yamled = operating_system.read_file(self.cassandra_conf,
|
||||
codec=SafeYamlCodec())
|
||||
yamled.update({key: value})
|
||||
LOG.debug("Updating cassandra.yaml with %(key)s: %(value)s."
|
||||
% {'key': key, 'value': value})
|
||||
dump = yaml.dump(yamled, default_flow_style=False)
|
||||
LOG.debug("Dumping YAML to stream.")
|
||||
self.write_config(dump)
|
||||
self.write_config(yamled)
|
||||
|
||||
def update_conf_with_group(self, group):
|
||||
"""Updates group of key:value in 'cassandra.yaml'."""
|
||||
|
||||
yamled = self.read_conf()
|
||||
yamled = operating_system.read_file(self.cassandra_conf,
|
||||
codec=SafeYamlCodec())
|
||||
for key, value in group.iteritems():
|
||||
if key == 'seed':
|
||||
(yamled.get('seed_provider')[0].
|
||||
@ -150,9 +253,8 @@ class CassandraApp(object):
|
||||
yamled.update({key: value})
|
||||
LOG.debug("Updating cassandra.yaml with %(key)s: %(value)s."
|
||||
% {'key': key, 'value': value})
|
||||
dump = yaml.dump(yamled, default_flow_style=False)
|
||||
LOG.debug("Dumping YAML to stream")
|
||||
self.write_config(dump)
|
||||
self.write_config(yamled)
|
||||
|
||||
def make_host_reachable(self):
|
||||
updates = {
|
||||
@ -180,22 +282,540 @@ class CassandraApp(object):
|
||||
LOG.debug("Resetting configuration")
|
||||
self.write_config(config_contents)
|
||||
|
||||
def _get_cqlsh_conf_path(self):
|
||||
return os.path.expanduser(self.cqlsh_conf_path)
|
||||
|
||||
|
||||
class CassandraAppStatus(service.BaseDbStatus):
|
||||
|
||||
def __init__(self, superuser):
|
||||
"""
|
||||
:param superuser: User account the Status uses for connecting
|
||||
to the database.
|
||||
:type superuser: CassandraUser
|
||||
"""
|
||||
super(CassandraAppStatus, self).__init__()
|
||||
self.__user = superuser
|
||||
|
||||
def set_superuser(self, user):
|
||||
self.__user = user
|
||||
|
||||
def _get_actual_db_status(self):
|
||||
try:
|
||||
# If status check would be successful,
|
||||
# bot stdin and stdout would contain nothing
|
||||
out, err = utils.execute_with_timeout(system.CASSANDRA_STATUS,
|
||||
shell=True)
|
||||
if "Connection error. Could not connect to" not in err:
|
||||
with CassandraLocalhostConnection(self.__user):
|
||||
return rd_instance.ServiceStatuses.RUNNING
|
||||
else:
|
||||
return rd_instance.ServiceStatuses.SHUTDOWN
|
||||
except (exception.ProcessExecutionError, OSError):
|
||||
LOG.exception(_("Error getting Cassandra status"))
|
||||
except NoHostAvailable:
|
||||
return rd_instance.ServiceStatuses.SHUTDOWN
|
||||
except Exception:
|
||||
LOG.exception(_("Error getting Cassandra status."))
|
||||
|
||||
return rd_instance.ServiceStatuses.SHUTDOWN
|
||||
|
||||
def cleanup_stalled_db_services(self):
|
||||
utils.execute_with_timeout(system.CASSANDRA_KILL, shell=True)
|
||||
utils.execute_with_timeout(CassandraApp.CASSANDRA_KILL_CMD, shell=True)
|
||||
|
||||
|
||||
class CassandraAdmin(object):
|
||||
"""Handles administrative tasks on the Cassandra database.
|
||||
|
||||
In Cassandra only SUPERUSERS can create other users and grant permissions
|
||||
to database resources. Trove uses the 'cassandra' superuser to perform its
|
||||
administrative tasks.
|
||||
|
||||
The users it creates are all 'normal' (NOSUPERUSER) accounts.
|
||||
The permissions it can grant are also limited to non-superuser operations.
|
||||
This is to prevent anybody from creating a new superuser via the Trove API.
|
||||
"""
|
||||
|
||||
# Non-superuser grant modifiers.
|
||||
__NO_SUPERUSER_MODIFIERS = ('ALTER', 'CREATE', 'DROP', 'MODIFY', 'SELECT')
|
||||
|
||||
_KS_NAME_REGEX = re.compile('^<keyspace (.+)>$')
|
||||
|
||||
def __init__(self, user):
|
||||
self.__admin_user = user
|
||||
|
||||
def create_user(self, context, users):
|
||||
"""
|
||||
Create new non-superuser accounts.
|
||||
New users are by default granted full access to all database resources.
|
||||
"""
|
||||
with CassandraLocalhostConnection(self.__admin_user) as client:
|
||||
for item in users:
|
||||
self._create_user_and_grant(client,
|
||||
self._deserialize_user(item))
|
||||
|
||||
def _create_user_and_grant(self, client, user):
|
||||
"""
|
||||
Create new non-superuser account and grant it full access to its
|
||||
databases.
|
||||
"""
|
||||
self._create_user(client, user)
|
||||
for db in user.databases:
|
||||
self._grant_full_access_on_keyspace(
|
||||
client, self._deserialize_keyspace(db), user)
|
||||
|
||||
def _create_user(self, client, user):
|
||||
# Create only NOSUPERUSER accounts here.
|
||||
LOG.debug("Creating a new user '%s'." % user.name)
|
||||
client.execute("CREATE USER '{}' WITH PASSWORD %s NOSUPERUSER;",
|
||||
(user.name,), (user.password,))
|
||||
|
||||
def _create_superuser(self, user):
|
||||
"""Create a new superuser account and grant it full superuser-level
|
||||
access to all keyspaces.
|
||||
"""
|
||||
LOG.debug("Creating a new superuser '%s'." % user.name)
|
||||
with CassandraLocalhostConnection(self.__admin_user) as client:
|
||||
client.execute("CREATE USER '{}' WITH PASSWORD %s SUPERUSER;",
|
||||
(user.name,), (user.password,))
|
||||
client.execute("GRANT ALL PERMISSIONS ON ALL KEYSPACES TO '{}';",
|
||||
(user.name,))
|
||||
|
||||
def delete_user(self, context, user):
|
||||
self.drop_user(self._deserialize_user(user))
|
||||
|
||||
def drop_user(self, user):
|
||||
with CassandraLocalhostConnection(self.__admin_user) as client:
|
||||
self._drop_user(client, user)
|
||||
|
||||
def _drop_user(self, client, user):
|
||||
LOG.debug("Deleting user '%s'." % user.name)
|
||||
client.execute("DROP USER '{}';", (user.name, ))
|
||||
|
||||
def get_user(self, context, username, hostname):
|
||||
with CassandraLocalhostConnection(self.__admin_user) as client:
|
||||
user = self._find_user(client, username)
|
||||
return user.serialize() if user is not None else None
|
||||
|
||||
def _find_user(self, client, username):
|
||||
"""
|
||||
Lookup a user with a given username.
|
||||
Omit user names on the ignore list.
|
||||
Return a new Cassandra user instance or None if no match is found.
|
||||
"""
|
||||
return next((user for user in self._get_listed_users(client)
|
||||
if user.name == username), None)
|
||||
|
||||
def list_users(self, context, limit=None, marker=None,
|
||||
include_marker=False):
|
||||
"""
|
||||
List all non-superuser accounts. Omit names on the ignored list.
|
||||
Return an empty set if None.
|
||||
"""
|
||||
with CassandraLocalhostConnection(self.__admin_user) as client:
|
||||
users = [user.serialize() for user in
|
||||
self._get_listed_users(client)]
|
||||
return pagination.paginate_list(users, limit, marker,
|
||||
include_marker)
|
||||
|
||||
def _get_listed_users(self, client):
|
||||
"""
|
||||
Return a set of unique user instances.
|
||||
Omit user names on the ignore list.
|
||||
"""
|
||||
return self._get_users(
|
||||
client, lambda user: user.name not in self.ignore_users)
|
||||
|
||||
def _get_users(self, client, matcher=None):
|
||||
"""
|
||||
:param matcher Filter expression.
|
||||
:type matcher callable
|
||||
"""
|
||||
acl = self._get_acl(client)
|
||||
return {self._build_user(user.name, acl)
|
||||
for user in client.execute("LIST USERS;")
|
||||
if not matcher or matcher(user)}
|
||||
|
||||
def _load_user(self, client, username, check_reserved=True):
|
||||
if check_reserved:
|
||||
self._check_reserved_user_name(username)
|
||||
|
||||
acl = self._get_acl(client, username=username)
|
||||
return self._build_user(username, acl)
|
||||
|
||||
def _build_user(self, username, acl):
|
||||
user = models.CassandraUser(username)
|
||||
for ks, permissions in acl.get(username, {}).items():
|
||||
if permissions:
|
||||
user.databases.append(models.CassandraSchema(ks).serialize())
|
||||
return user
|
||||
|
||||
def _get_acl(self, client, username=None):
|
||||
"""Return the ACL for a database user.
|
||||
Return ACLs for all users if no particular username is specified.
|
||||
|
||||
The ACL has the following format:
|
||||
{username #1:
|
||||
{keyspace #1: {access mod(s)...},
|
||||
keyspace #2: {...}},
|
||||
username #2:
|
||||
{keyspace #1: {...},
|
||||
keyspace #3: {...}}
|
||||
}
|
||||
"""
|
||||
|
||||
def build_list_query(username):
|
||||
query_tokens = ["LIST ALL PERMISSIONS"]
|
||||
if username:
|
||||
query_tokens.extend(["OF", "'%s'" % username])
|
||||
query_tokens.append("NORECURSIVE;")
|
||||
return ' '.join(query_tokens)
|
||||
|
||||
def parse_keyspace_name(resource):
|
||||
"""Parse a keyspace name from a resource string.
|
||||
The resource string has the following form:
|
||||
<object name>
|
||||
where 'object' is one of the database objects (keyspace, table...).
|
||||
Return the name as a singleton set. Return an empty set if no match
|
||||
is found.
|
||||
"""
|
||||
match = self._KS_NAME_REGEX.match(resource)
|
||||
if match:
|
||||
return {match.group(1)}
|
||||
return {}
|
||||
|
||||
def update_acl(username, keyspace, permission, acl):
|
||||
permissions = acl.get(username, {}).get(keyspace)
|
||||
if permissions is None:
|
||||
guestagent_utils.update_dict({user: {keyspace: {permission}}},
|
||||
acl)
|
||||
else:
|
||||
permissions.add(permission)
|
||||
|
||||
all_keyspace_names = None
|
||||
acl = dict()
|
||||
for item in client.execute(build_list_query(username)):
|
||||
user = item.username
|
||||
resource = item.resource
|
||||
permission = item.permission
|
||||
if user and resource and permission:
|
||||
if resource == '<all keyspaces>':
|
||||
# Cache the full keyspace list to improve performance and
|
||||
# ensure consistent results for all users.
|
||||
if all_keyspace_names is None:
|
||||
all_keyspace_names = {
|
||||
item.name
|
||||
for item in self._get_available_keyspaces(client)
|
||||
}
|
||||
keyspaces = all_keyspace_names
|
||||
else:
|
||||
keyspaces = parse_keyspace_name(resource)
|
||||
|
||||
for keyspace in keyspaces:
|
||||
update_acl(user, keyspace, permission, acl)
|
||||
|
||||
return acl
|
||||
|
||||
def list_superusers(self):
|
||||
"""List all system users existing in the database."""
|
||||
with CassandraLocalhostConnection(self.__admin_user) as client:
|
||||
return self._get_users(client, lambda user: user.super)
|
||||
|
||||
def grant_access(self, context, username, hostname, databases):
|
||||
"""
|
||||
Grant full access on keyspaces to a given username.
|
||||
"""
|
||||
user = models.CassandraUser(username)
|
||||
with CassandraLocalhostConnection(self.__admin_user) as client:
|
||||
for db in databases:
|
||||
self._grant_full_access_on_keyspace(
|
||||
client, models.CassandraSchema(db), user)
|
||||
|
||||
def revoke_access(self, context, username, hostname, database):
|
||||
"""
|
||||
Revoke all permissions on any database resources from a given username.
|
||||
"""
|
||||
user = models.CassandraUser(username)
|
||||
with CassandraLocalhostConnection(self.__admin_user) as client:
|
||||
self._revoke_all_access_on_keyspace(
|
||||
client, models.CassandraSchema(database), user)
|
||||
|
||||
def _grant_full_access_on_keyspace(self, client, keyspace, user,
|
||||
check_reserved=True):
|
||||
"""
|
||||
Grant all non-superuser permissions on a keyspace to a given user.
|
||||
"""
|
||||
if check_reserved:
|
||||
self._check_reserved_user_name(user.name)
|
||||
self._check_reserved_keyspace_name(keyspace.name)
|
||||
|
||||
for access in self.__NO_SUPERUSER_MODIFIERS:
|
||||
self._grant_permission_on_keyspace(client, access, keyspace, user)
|
||||
|
||||
def _grant_permission_on_keyspace(self, client, modifier, keyspace, user):
|
||||
"""
|
||||
Grant a non-superuser permission on a keyspace to a given user.
|
||||
Raise an exception if the caller attempts to grant a superuser access.
|
||||
"""
|
||||
LOG.debug("Granting '%s' access on '%s' to user '%s'."
|
||||
% (modifier, keyspace.name, user.name))
|
||||
if modifier in self.__NO_SUPERUSER_MODIFIERS:
|
||||
client.execute("GRANT {} ON KEYSPACE \"{}\" TO '{}';",
|
||||
(modifier, keyspace.name, user.name))
|
||||
else:
|
||||
raise exception.UnprocessableEntity(
|
||||
"Invalid permission modifier (%s). Allowed values are: '%s'"
|
||||
% (modifier, ', '.join(self.__NO_SUPERUSER_MODIFIERS)))
|
||||
|
||||
def _revoke_all_access_on_keyspace(self, client, keyspace, user,
|
||||
check_reserved=True):
|
||||
if check_reserved:
|
||||
self._check_reserved_user_name(user.name)
|
||||
self._check_reserved_keyspace_name(keyspace.name)
|
||||
|
||||
LOG.debug("Revoking all permissions on '%s' from user '%s'."
|
||||
% (keyspace.name, user.name))
|
||||
client.execute("REVOKE ALL PERMISSIONS ON KEYSPACE \"{}\" FROM '{}';",
|
||||
(keyspace.name, user.name))
|
||||
|
||||
def update_attributes(self, context, username, hostname, user_attrs):
|
||||
with CassandraLocalhostConnection(self.__admin_user) as client:
|
||||
user = self._load_user(client, username)
|
||||
new_name = user_attrs.get('name')
|
||||
new_password = user_attrs.get('password')
|
||||
self._update_user(client, user, new_name, new_password)
|
||||
|
||||
def _update_user(self, client, user, new_username, new_password):
|
||||
"""
|
||||
Update a user of a given username.
|
||||
Updatable attributes include username and password.
|
||||
If a new username and password are given a new user with those
|
||||
attributes is created and all permissions from the original
|
||||
user get transfered to it. The original user is then dropped
|
||||
therefore revoking its permissions.
|
||||
If only new password is specified the existing user gets altered
|
||||
with that password.
|
||||
"""
|
||||
if new_username is not None and user.name != new_username:
|
||||
if new_password is not None:
|
||||
self._rename_user(client, user, new_username, new_password)
|
||||
else:
|
||||
raise exception.UnprocessableEntity(
|
||||
_("Updating username requires specifying a password "
|
||||
"as well."))
|
||||
elif new_password is not None and user.password != new_password:
|
||||
user.password = new_password
|
||||
self._alter_user_password(client, user)
|
||||
|
||||
def _rename_user(self, client, user, new_username, new_password):
|
||||
"""
|
||||
Rename a given user also updating its password.
|
||||
Transfer the current permissions to the new username.
|
||||
Drop the old username therefore revoking its permissions.
|
||||
"""
|
||||
LOG.debug("Renaming user '%s' to '%s'" % (user.name, new_username))
|
||||
new_user = models.CassandraUser(new_username, new_password)
|
||||
new_user.databases.extend(user.databases)
|
||||
self._create_user_and_grant(client, new_user)
|
||||
self._drop_user(client, user)
|
||||
|
||||
def alter_user_password(self, user):
|
||||
with CassandraLocalhostConnection(self.__admin_user) as client:
|
||||
self._alter_user_password(client, user)
|
||||
|
||||
def change_passwords(self, context, users):
|
||||
with CassandraLocalhostConnection(self.__admin_user) as client:
|
||||
for user in users:
|
||||
self._alter_user_password(client, self._deserialize_user(user))
|
||||
|
||||
def _alter_user_password(self, client, user):
|
||||
LOG.debug("Changing password of user '%s'." % user.name)
|
||||
client.execute("ALTER USER '{}' "
|
||||
"WITH PASSWORD %s;", (user.name,), (user.password,))
|
||||
|
||||
def create_database(self, context, databases):
|
||||
with CassandraLocalhostConnection(self.__admin_user) as client:
|
||||
for item in databases:
|
||||
self._create_single_node_keyspace(
|
||||
client, self._deserialize_keyspace(item))
|
||||
|
||||
def _create_single_node_keyspace(self, client, keyspace):
|
||||
"""
|
||||
Create a single-replica keyspace.
|
||||
|
||||
Cassandra stores replicas on multiple nodes to ensure reliability and
|
||||
fault tolerance. All replicas are equally important;
|
||||
there is no primary or master.
|
||||
A replication strategy determines the nodes where
|
||||
replicas are placed. SimpleStrategy is for a single data center only.
|
||||
The total number of replicas across the cluster is referred to as the
|
||||
replication factor.
|
||||
|
||||
Replication Strategy:
|
||||
'SimpleStrategy' is not optimized for multiple data centers.
|
||||
'replication_factor' The number of replicas of data on multiple nodes.
|
||||
Required for SimpleStrategy; otherwise, not used.
|
||||
|
||||
Keyspace names are case-insensitive by default.
|
||||
To make a name case-sensitive, enclose it in double quotation marks.
|
||||
"""
|
||||
client.execute("CREATE KEYSPACE \"{}\" WITH REPLICATION = "
|
||||
"{{ 'class' : 'SimpleStrategy', "
|
||||
"'replication_factor' : 1 }};", (keyspace.name,))
|
||||
|
||||
def delete_database(self, context, database):
|
||||
with CassandraLocalhostConnection(self.__admin_user) as client:
|
||||
self._drop_keyspace(client, self._deserialize_keyspace(database))
|
||||
|
||||
def _drop_keyspace(self, client, keyspace):
|
||||
LOG.debug("Dropping keyspace '%s'." % keyspace.name)
|
||||
client.execute("DROP KEYSPACE \"{}\";", (keyspace.name,))
|
||||
|
||||
def list_databases(self, context, limit=None, marker=None,
|
||||
include_marker=False):
|
||||
with CassandraLocalhostConnection(self.__admin_user) as client:
|
||||
databases = [keyspace.serialize() for keyspace
|
||||
in self._get_available_keyspaces(client)]
|
||||
return pagination.paginate_list(databases, limit, marker,
|
||||
include_marker)
|
||||
|
||||
def _get_available_keyspaces(self, client):
|
||||
"""
|
||||
Return a set of unique keyspace instances.
|
||||
Omit keyspace names on the ignore list.
|
||||
"""
|
||||
return {models.CassandraSchema(db.keyspace_name)
|
||||
for db in client.execute("SELECT * FROM "
|
||||
"system.schema_keyspaces;")
|
||||
if db.keyspace_name not in self.ignore_dbs}
|
||||
|
||||
def list_access(self, context, username, hostname):
|
||||
with CassandraLocalhostConnection(self.__admin_user) as client:
|
||||
user = self._find_user(client, username)
|
||||
if user:
|
||||
return user.databases
|
||||
|
||||
raise exception.UserNotFound(username)
|
||||
|
||||
def _deserialize_keyspace(self, keyspace_dict, check_reserved=True):
|
||||
if keyspace_dict:
|
||||
db = models.CassandraSchema.deserialize_schema(keyspace_dict)
|
||||
if check_reserved:
|
||||
self._check_reserved_keyspace_name(db.name)
|
||||
|
||||
return db
|
||||
|
||||
return None
|
||||
|
||||
def _check_reserved_keyspace_name(self, name):
|
||||
if name in self.ignore_dbs:
|
||||
raise ValueError(_("This keyspace-name is reserved: %s") % name)
|
||||
|
||||
def _deserialize_user(self, user_dict, check_reserved=True):
|
||||
if user_dict:
|
||||
user = models.CassandraUser.deserialize_user(user_dict)
|
||||
if check_reserved:
|
||||
self._check_reserved_user_name(user.name)
|
||||
|
||||
return user
|
||||
|
||||
return None
|
||||
|
||||
def _check_reserved_user_name(self, name):
|
||||
if name in self.ignore_users:
|
||||
raise ValueError(_("This user-name is reserved: %s") % name)
|
||||
|
||||
@property
|
||||
def ignore_users(self):
|
||||
return cfg.get_ignored_users(manager=MANAGER)
|
||||
|
||||
@property
|
||||
def ignore_dbs(self):
|
||||
return cfg.get_ignored_dbs(manager=MANAGER)
|
||||
|
||||
|
||||
class CassandraConnection(object):
|
||||
"""A wrapper to manage a Cassandra connection."""
|
||||
|
||||
# Cassandra 2.1 only supports protocol versions 3 and lower.
|
||||
NATIVE_PROTOCOL_VERSION = 3
|
||||
|
||||
def __init__(self, contact_points, user):
|
||||
self.__user = user
|
||||
# A Cluster is initialized with a set of initial contact points.
|
||||
# After the driver connects to one of the nodes it will automatically
|
||||
# discover the rest.
|
||||
# Will connect to '127.0.0.1' if None contact points are given.
|
||||
self._cluster = Cluster(
|
||||
contact_points=contact_points,
|
||||
auth_provider=PlainTextAuthProvider(user.name, user.password),
|
||||
protocol_version=self.NATIVE_PROTOCOL_VERSION)
|
||||
self.__session = None
|
||||
|
||||
def __enter__(self):
|
||||
self.__connect()
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc_value, traceback):
|
||||
self.__disconnect()
|
||||
|
||||
def execute(self, query, identifiers=None, data_values=None, timeout=None):
|
||||
"""
|
||||
Execute a query with a given sequence or dict of data values to bind.
|
||||
If a sequence is used, '%s' should be used the placeholder for each
|
||||
argument. If a dict is used, '%(name)s' style placeholders must
|
||||
be used.
|
||||
Only data values should be supplied this way. Other items,
|
||||
such as keyspaces, table names, and column names should be set
|
||||
ahead of time. Use the '{}' style placeholders and
|
||||
'identifiers' parameter for those.
|
||||
Raise an exception if the operation exceeds the given timeout (sec).
|
||||
There is no timeout if set to None.
|
||||
Return a set of rows or an empty list if None.
|
||||
"""
|
||||
if self.__is_active():
|
||||
try:
|
||||
rows = self.__session.execute(self.__bind(query, identifiers),
|
||||
data_values, timeout)
|
||||
return rows or []
|
||||
except OperationTimedOut:
|
||||
LOG.error(_("Query execution timed out."))
|
||||
raise
|
||||
|
||||
LOG.debug("Cannot perform this operation on a closed connection.")
|
||||
raise exception.UnprocessableEntity()
|
||||
|
||||
def __bind(self, query, identifiers):
|
||||
if identifiers:
|
||||
return query.format(*identifiers)
|
||||
return query
|
||||
|
||||
def __connect(self):
|
||||
if not self._cluster.is_shutdown:
|
||||
LOG.debug("Connecting to a Cassandra cluster as '%s'."
|
||||
% self.__user.name)
|
||||
if not self.__is_active():
|
||||
self.__session = self._cluster.connect()
|
||||
else:
|
||||
LOG.debug("Connection already open.")
|
||||
LOG.debug("Connected to cluster: '%s'"
|
||||
% self._cluster.metadata.cluster_name)
|
||||
for host in self._cluster.metadata.all_hosts():
|
||||
LOG.debug("Connected to node: '%s' in rack '%s' at datacenter "
|
||||
"'%s'" % (host.address, host.rack, host.datacenter))
|
||||
else:
|
||||
LOG.debug("Cannot perform this operation on a terminated cluster.")
|
||||
raise exception.UnprocessableEntity()
|
||||
|
||||
def __disconnect(self):
|
||||
if self.__is_active():
|
||||
try:
|
||||
LOG.debug("Disconnecting from cluster: '%s'"
|
||||
% self._cluster.metadata.cluster_name)
|
||||
self._cluster.shutdown()
|
||||
self.__session.shutdown()
|
||||
except Exception:
|
||||
LOG.debug("Failed to disconnect from a Cassandra cluster.")
|
||||
|
||||
def __is_active(self):
|
||||
return self.__session and not self.__session.is_shutdown
|
||||
|
||||
|
||||
class CassandraLocalhostConnection(CassandraConnection):
|
||||
"""
|
||||
A connection to the localhost Cassandra server.
|
||||
"""
|
||||
|
||||
def __init__(self, user):
|
||||
super(CassandraLocalhostConnection, self).__init__(None, user)
|
||||
|
@ -1,31 +0,0 @@
|
||||
# Copyright 2013 Mirantis Inc.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from trove.common import cfg
|
||||
|
||||
CONF = cfg.CONF
|
||||
|
||||
SERVICE_CANDIDATES = ["cassandra"]
|
||||
|
||||
CASSANDRA_DATA_DIR = "/var/lib/cassandra/data"
|
||||
CASSANDRA_CONF = "/etc/cassandra/cassandra.yaml"
|
||||
CASSANDRA_TEMP_CONF = "/tmp/cassandra.yaml"
|
||||
CASSANDRA_TEMP_DIR = "/tmp/cassandra"
|
||||
|
||||
CASSANDRA_STATUS = """echo "use system;" > /tmp/check; cqlsh -f /tmp/check"""
|
||||
|
||||
CASSANDRA_KILL = "sudo killall java || true"
|
||||
SERVICE_STOP_TIMEOUT = 60
|
||||
INSTALL_TIMEOUT = 10000
|
@ -158,6 +158,36 @@ class MongoDBSchema(DatastoreSchema):
|
||||
return ['_name']
|
||||
|
||||
|
||||
class CassandraSchema(DatastoreSchema):
|
||||
"""Represents a Cassandra schema and its associated properties.
|
||||
|
||||
Keyspace names are 32 or fewer alpha-numeric characters and underscores,
|
||||
the first of which is an alpha character.
|
||||
"""
|
||||
|
||||
def __init__(self, name=None, deserializing=False):
|
||||
super(CassandraSchema, self).__init__()
|
||||
|
||||
if not (bool(deserializing) != bool(name)):
|
||||
raise ValueError(_("Bad args. name: %(name)s, "
|
||||
"deserializing %(deser)s.")
|
||||
% ({'name': bool(name),
|
||||
'deser': bool(deserializing)}))
|
||||
if not deserializing:
|
||||
self.name = name
|
||||
|
||||
@property
|
||||
def _max_schema_name_length(self):
|
||||
return 32
|
||||
|
||||
def _is_valid_schema_name(self, value):
|
||||
return True
|
||||
|
||||
@classmethod
|
||||
def _dict_requirements(cls):
|
||||
return ['_name']
|
||||
|
||||
|
||||
class MySQLDatabase(Base):
|
||||
"""Represents a Database and its properties."""
|
||||
|
||||
@ -746,6 +776,45 @@ class MongoDBUser(DatastoreUser):
|
||||
return ['_name']
|
||||
|
||||
|
||||
class CassandraUser(DatastoreUser):
|
||||
"""Represents a Cassandra user and its associated properties."""
|
||||
|
||||
def __init__(self, name=None, password=None, deserializing=False):
|
||||
super(CassandraUser, self).__init__()
|
||||
|
||||
if ((not (bool(deserializing) != bool(name))) or
|
||||
(bool(deserializing) and bool(password))):
|
||||
raise ValueError(_("Bad args. name: %(name)s, "
|
||||
"password %(pass)s, "
|
||||
"deserializing %(deser)s.")
|
||||
% ({'name': bool(name),
|
||||
'pass': bool(password),
|
||||
'deser': bool(deserializing)}))
|
||||
if not deserializing:
|
||||
self.name = name
|
||||
self.password = password
|
||||
|
||||
def _build_database_schema(self, name):
|
||||
return CassandraSchema(name)
|
||||
|
||||
@property
|
||||
def _max_username_length(self):
|
||||
return 65535
|
||||
|
||||
def _is_valid_name(self, value):
|
||||
return True
|
||||
|
||||
def _is_valid_host_name(self, value):
|
||||
return True
|
||||
|
||||
def _is_valid_password(self, value):
|
||||
return True
|
||||
|
||||
@classmethod
|
||||
def _dict_requirements(cls):
|
||||
return ['_name']
|
||||
|
||||
|
||||
class MySQLUser(Base):
|
||||
"""Represents a MySQL User and its associated properties."""
|
||||
|
||||
|
@ -5,8 +5,8 @@ max_hint_window_in_ms: 10800000
|
||||
hinted_handoff_throttle_in_kb: 1024
|
||||
max_hints_delivery_threads: 2
|
||||
batchlog_replay_throttle_in_kb: 1024
|
||||
authenticator: AllowAllAuthenticator
|
||||
authorizer: AllowAllAuthorizer
|
||||
authenticator: org.apache.cassandra.auth.PasswordAuthenticator
|
||||
authorizer: org.apache.cassandra.auth.CassandraAuthorizer
|
||||
permissions_validity_in_ms: 2000
|
||||
partitioner: org.apache.cassandra.dht.Murmur3Partitioner
|
||||
data_file_directories:
|
||||
@ -38,11 +38,11 @@ trickle_fsync: false
|
||||
trickle_fsync_interval_in_kb: 10240
|
||||
storage_port: 7000
|
||||
ssl_storage_port: 7001
|
||||
listen_address: localhost
|
||||
listen_address: 127.0.0.1
|
||||
start_native_transport: true
|
||||
native_transport_port: 9042
|
||||
start_rpc: true
|
||||
rpc_address: localhost
|
||||
rpc_address: 127.0.0.1
|
||||
rpc_port: 9160
|
||||
rpc_keepalive: true
|
||||
rpc_server_type: sync
|
||||
|
@ -185,6 +185,7 @@ register(["user"], user_actions_groups)
|
||||
register(["db2_supported"], common_groups,
|
||||
database_actions_groups, user_actions_groups)
|
||||
register(["cassandra_supported"], common_groups,
|
||||
user_actions_groups, database_actions_groups,
|
||||
backup_groups, configuration_groups)
|
||||
register(["couchbase_supported"], common_groups, backup_groups,
|
||||
root_actions_groups)
|
||||
|
@ -13,17 +13,25 @@
|
||||
# under the License.
|
||||
|
||||
import os
|
||||
import random
|
||||
import string
|
||||
|
||||
from mock import ANY
|
||||
from mock import call
|
||||
from mock import MagicMock
|
||||
from mock import NonCallableMagicMock
|
||||
from mock import patch
|
||||
from oslo_utils import netutils
|
||||
from testtools import ExpectedException
|
||||
|
||||
from trove.common.context import TroveContext
|
||||
from trove.common import exception
|
||||
from trove.common.instance import ServiceStatuses
|
||||
from trove.guestagent.datastore.experimental.cassandra import (
|
||||
manager as cass_manager)
|
||||
from trove.guestagent.datastore.experimental.cassandra import (
|
||||
service as cass_service)
|
||||
from trove.guestagent.db import models
|
||||
from trove.guestagent import pkg as pkg
|
||||
from trove.guestagent import volume
|
||||
from trove.tests.unittests import trove_testtools
|
||||
@ -31,6 +39,32 @@ from trove.tests.unittests import trove_testtools
|
||||
|
||||
class GuestAgentCassandraDBManagerTest(trove_testtools.TestCase):
|
||||
|
||||
__N_GAK = '_get_available_keyspaces'
|
||||
__N_GLU = '_get_listed_users'
|
||||
__N_BU = '_build_user'
|
||||
__N_RU = '_rename_user'
|
||||
__N_AUP = '_alter_user_password'
|
||||
__N_CAU = 'trove.guestagent.db.models.CassandraUser'
|
||||
__N_CU = '_create_user'
|
||||
__N_GFA = '_grant_full_access_on_keyspace'
|
||||
__N_DU = '_drop_user'
|
||||
|
||||
__ACCESS_MODIFIERS = ('ALTER', 'CREATE', 'DROP', 'MODIFY', 'SELECT')
|
||||
__CREATE_DB_FORMAT = (
|
||||
"CREATE KEYSPACE \"{}\" WITH REPLICATION = "
|
||||
"{{ 'class' : 'SimpleStrategy', 'replication_factor' : 1 }};"
|
||||
)
|
||||
__DROP_DB_FORMAT = "DROP KEYSPACE \"{}\";"
|
||||
__CREATE_USR_FORMAT = "CREATE USER '{}' WITH PASSWORD %s NOSUPERUSER;"
|
||||
__ALTER_USR_FORMAT = "ALTER USER '{}' WITH PASSWORD %s;"
|
||||
__DROP_USR_FORMAT = "DROP USER '{}';"
|
||||
__GRANT_FORMAT = "GRANT {} ON KEYSPACE \"{}\" TO '{}';"
|
||||
__REVOKE_FORMAT = "REVOKE ALL PERMISSIONS ON KEYSPACE \"{}\" FROM '{}';"
|
||||
__LIST_PERMISSIONS_FORMAT = "LIST ALL PERMISSIONS NORECURSIVE;"
|
||||
__LIST_PERMISSIONS_OF_FORMAT = "LIST ALL PERMISSIONS OF '{}' NORECURSIVE;"
|
||||
__LIST_DB_FORMAT = "SELECT * FROM system.schema_keyspaces;"
|
||||
__LIST_USR_FORMAT = "LIST USERS;"
|
||||
|
||||
def setUp(self):
|
||||
super(GuestAgentCassandraDBManagerTest, self).setUp()
|
||||
self.real_status = cass_service.CassandraAppStatus.set_status
|
||||
@ -45,6 +79,9 @@ class GuestAgentCassandraDBManagerTest(trove_testtools.TestCase):
|
||||
return_value=FakeInstanceServiceStatus())
|
||||
self.context = TroveContext()
|
||||
self.manager = cass_manager.Manager()
|
||||
self.manager._Manager__admin = cass_service.CassandraAdmin(
|
||||
models.CassandraUser('Test'))
|
||||
self.admin = self.manager._Manager__admin
|
||||
self.pkg = cass_service.packager
|
||||
self.real_db_app_status = cass_service.CassandraAppStatus
|
||||
self.origin_os_path_exists = os.path.exists
|
||||
@ -74,10 +111,11 @@ class GuestAgentCassandraDBManagerTest(trove_testtools.TestCase):
|
||||
netutils.get_my_ipv4 = self.original_get_ip
|
||||
cass_service.CassandraApp.make_host_reachable = (
|
||||
self.orig_make_host_reachable)
|
||||
cass_service.CassandraAppStatus.set_status = self.real_status
|
||||
|
||||
def test_update_status(self):
|
||||
mock_status = MagicMock()
|
||||
self.manager.appStatus = mock_status
|
||||
self.manager.app.status = mock_status
|
||||
self.manager.update_status(self.context)
|
||||
mock_status.update.assert_any_call()
|
||||
|
||||
@ -109,8 +147,8 @@ class GuestAgentCassandraDBManagerTest(trove_testtools.TestCase):
|
||||
|
||||
mock_status = MagicMock()
|
||||
mock_app = MagicMock()
|
||||
self.manager.appStatus = mock_status
|
||||
self.manager.app = mock_app
|
||||
mock_app.status = mock_status
|
||||
self.manager._app = mock_app
|
||||
|
||||
mock_status.begin_install = MagicMock(return_value=None)
|
||||
mock_app.install_if_needed = MagicMock(return_value=None)
|
||||
@ -144,5 +182,419 @@ class GuestAgentCassandraDBManagerTest(trove_testtools.TestCase):
|
||||
mock_app.install_if_needed.assert_any_call(packages)
|
||||
mock_app.init_storage_structure.assert_any_call('/var/lib/cassandra')
|
||||
mock_app.make_host_reachable.assert_any_call()
|
||||
mock_app.start_db.assert_any_call()
|
||||
mock_app.start_db.assert_any_call(update_db=False)
|
||||
mock_app.stop_db.assert_any_call()
|
||||
|
||||
def test_keyspace_validation(self):
|
||||
valid_name = self._get_random_name(32)
|
||||
db = models.CassandraSchema(valid_name)
|
||||
self.assertEqual(valid_name, db.name)
|
||||
with ExpectedException(ValueError):
|
||||
models.CassandraSchema(self._get_random_name(33))
|
||||
|
||||
def test_user_validation(self):
|
||||
valid_name = self._get_random_name(65535)
|
||||
usr = models.CassandraUser(valid_name, 'password')
|
||||
self.assertEqual(valid_name, usr.name)
|
||||
self.assertEqual('password', usr.password)
|
||||
with ExpectedException(ValueError):
|
||||
models.CassandraUser(self._get_random_name(65536))
|
||||
|
||||
@classmethod
|
||||
def _serialize_collection(self, *collection):
|
||||
return [item.serialize() for item in collection]
|
||||
|
||||
@classmethod
|
||||
def _get_random_name(self, size, chars=string.letters + string.digits):
|
||||
return ''.join(random.choice(chars) for _ in range(size))
|
||||
|
||||
@patch.object(cass_service.CassandraLocalhostConnection, '__enter__')
|
||||
def test_create_database(self, conn):
|
||||
db1 = models.CassandraSchema('db1')
|
||||
db2 = models.CassandraSchema('db2')
|
||||
db3 = models.CassandraSchema(self._get_random_name(32))
|
||||
|
||||
self.manager.create_database(self.context,
|
||||
self._serialize_collection(db1, db2, db3))
|
||||
conn.return_value.execute.assert_has_calls([
|
||||
call(self.__CREATE_DB_FORMAT, (db1.name,)),
|
||||
call(self.__CREATE_DB_FORMAT, (db2.name,)),
|
||||
call(self.__CREATE_DB_FORMAT, (db3.name,))
|
||||
])
|
||||
|
||||
@patch.object(cass_service.CassandraLocalhostConnection, '__enter__')
|
||||
def test_delete_database(self, conn):
|
||||
db = models.CassandraSchema(self._get_random_name(32))
|
||||
self.manager.delete_database(self.context, db.serialize())
|
||||
conn.return_value.execute.assert_called_once_with(
|
||||
self.__DROP_DB_FORMAT, (db.name,))
|
||||
|
||||
@patch.object(cass_service.CassandraLocalhostConnection, '__enter__')
|
||||
def test_create_user(self, conn):
|
||||
usr1 = models.CassandraUser('usr1')
|
||||
usr2 = models.CassandraUser('usr2', '')
|
||||
usr3 = models.CassandraUser(self._get_random_name(1025), 'password')
|
||||
|
||||
self.manager.create_user(self.context,
|
||||
self._serialize_collection(usr1, usr2, usr3))
|
||||
conn.return_value.execute.assert_has_calls([
|
||||
call(self.__CREATE_USR_FORMAT, (usr1.name,), (usr1.password,)),
|
||||
call(self.__CREATE_USR_FORMAT, (usr2.name,), (usr2.password,)),
|
||||
call(self.__CREATE_USR_FORMAT, (usr3.name,), (usr3.password,))
|
||||
])
|
||||
|
||||
@patch.object(cass_service.CassandraLocalhostConnection, '__enter__')
|
||||
def test_delete_user(self, conn):
|
||||
usr = models.CassandraUser(self._get_random_name(1025), 'password')
|
||||
self.manager.delete_user(self.context, usr.serialize())
|
||||
conn.return_value.execute.assert_called_once_with(
|
||||
self.__DROP_USR_FORMAT, (usr.name,))
|
||||
|
||||
@patch.object(cass_service.CassandraLocalhostConnection, '__enter__')
|
||||
def test_change_passwords(self, conn):
|
||||
usr1 = models.CassandraUser('usr1')
|
||||
usr2 = models.CassandraUser('usr2', '')
|
||||
usr3 = models.CassandraUser(self._get_random_name(1025), 'password')
|
||||
|
||||
self.manager.change_passwords(self.context, self._serialize_collection(
|
||||
usr1, usr2, usr3))
|
||||
conn.return_value.execute.assert_has_calls([
|
||||
call(self.__ALTER_USR_FORMAT, (usr1.name,), (usr1.password,)),
|
||||
call(self.__ALTER_USR_FORMAT, (usr2.name,), (usr2.password,)),
|
||||
call(self.__ALTER_USR_FORMAT, (usr3.name,), (usr3.password,))
|
||||
])
|
||||
|
||||
@patch.object(cass_service.CassandraLocalhostConnection, '__enter__')
|
||||
def test_alter_user_password(self, conn):
|
||||
usr1 = models.CassandraUser('usr1')
|
||||
usr2 = models.CassandraUser('usr2', '')
|
||||
usr3 = models.CassandraUser(self._get_random_name(1025), 'password')
|
||||
|
||||
self.admin.alter_user_password(usr1)
|
||||
self.admin.alter_user_password(usr2)
|
||||
self.admin.alter_user_password(usr3)
|
||||
conn.return_value.execute.assert_has_calls([
|
||||
call(self.__ALTER_USR_FORMAT, (usr1.name,), (usr1.password,)),
|
||||
call(self.__ALTER_USR_FORMAT, (usr2.name,), (usr2.password,)),
|
||||
call(self.__ALTER_USR_FORMAT, (usr3.name,), (usr3.password,))
|
||||
])
|
||||
|
||||
@patch.object(cass_service.CassandraLocalhostConnection, '__enter__')
|
||||
def test_grant_access(self, conn):
|
||||
usr1 = models.CassandraUser('usr1')
|
||||
usr2 = models.CassandraUser('usr1', 'password')
|
||||
db1 = models.CassandraSchema('db1')
|
||||
db2 = models.CassandraSchema('db2')
|
||||
db3 = models.CassandraSchema('db3')
|
||||
|
||||
self.manager.grant_access(self.context, usr1.name, None, [db1.name,
|
||||
db2.name])
|
||||
self.manager.grant_access(self.context, usr2.name, None, [db3.name])
|
||||
|
||||
expected = []
|
||||
for modifier in self.__ACCESS_MODIFIERS:
|
||||
expected.append(call(self.__GRANT_FORMAT,
|
||||
(modifier, db1.name, usr1.name)))
|
||||
expected.append(call(self.__GRANT_FORMAT,
|
||||
(modifier, db3.name, usr2.name)))
|
||||
|
||||
conn.return_value.execute.assert_has_calls(expected, any_order=True)
|
||||
|
||||
@patch.object(cass_service.CassandraLocalhostConnection, '__enter__')
|
||||
def test_revoke_access(self, conn):
|
||||
usr1 = models.CassandraUser('usr1')
|
||||
usr2 = models.CassandraUser('usr1', 'password')
|
||||
db1 = models.CassandraSchema('db1')
|
||||
db2 = models.CassandraSchema('db2')
|
||||
|
||||
self.manager.revoke_access(self.context, usr1.name, None, db1.name)
|
||||
self.manager.revoke_access(self.context, usr2.name, None, db2.name)
|
||||
conn.return_value.execute.assert_has_calls([
|
||||
call(self.__REVOKE_FORMAT, (db1.name, usr1.name)),
|
||||
call(self.__REVOKE_FORMAT, (db2.name, usr2.name))
|
||||
])
|
||||
|
||||
@patch.object(cass_service.CassandraLocalhostConnection, '__enter__')
|
||||
def test_get_available_keyspaces(self, conn):
|
||||
self.manager.list_databases(self.context)
|
||||
conn.return_value.execute.assert_called_once_with(
|
||||
self.__LIST_DB_FORMAT)
|
||||
|
||||
@patch.object(cass_service.CassandraLocalhostConnection, '__enter__')
|
||||
def test_list_databases(self, conn):
|
||||
db1 = models.CassandraSchema('db1')
|
||||
db2 = models.CassandraSchema('db2')
|
||||
db3 = models.CassandraSchema(self._get_random_name(32))
|
||||
|
||||
with patch.object(self.admin, self.__N_GAK, return_value={db1, db2,
|
||||
db3}):
|
||||
found = self.manager.list_databases(self.context)
|
||||
self.assertEqual(2, len(found))
|
||||
self.assertEqual(3, len(found[0]))
|
||||
self.assertEqual(None, found[1])
|
||||
self.assertIn(db1.serialize(), found[0])
|
||||
self.assertIn(db2.serialize(), found[0])
|
||||
self.assertIn(db3.serialize(), found[0])
|
||||
|
||||
with patch.object(self.admin, self.__N_GAK, return_value=set()):
|
||||
found = self.manager.list_databases(self.context)
|
||||
self.assertEqual(([], None), found)
|
||||
|
||||
def test_get_acl(self):
|
||||
r0 = NonCallableMagicMock(username='user1', resource='<all keyspaces>',
|
||||
permission='SELECT')
|
||||
r1 = NonCallableMagicMock(username='user2', resource='<keyspace ks1>',
|
||||
permission='SELECT')
|
||||
r2 = NonCallableMagicMock(username='user2', resource='<keyspace ks2>',
|
||||
permission='SELECT')
|
||||
r3 = NonCallableMagicMock(username='user2', resource='<keyspace ks2>',
|
||||
permission='ALTER')
|
||||
r4 = NonCallableMagicMock(username='user3', resource='<table ks2.t1>',
|
||||
permission='SELECT')
|
||||
r5 = NonCallableMagicMock(username='user3', resource='',
|
||||
permission='ALTER')
|
||||
r6 = NonCallableMagicMock(username='user3', resource='<keyspace ks2>',
|
||||
permission='')
|
||||
r7 = NonCallableMagicMock(username='user3', resource='',
|
||||
permission='')
|
||||
r8 = NonCallableMagicMock(username='user3', resource='<keyspace ks1>',
|
||||
permission='DELETE')
|
||||
r9 = NonCallableMagicMock(username='user4', resource='<all keyspaces>',
|
||||
permission='UPDATE')
|
||||
r10 = NonCallableMagicMock(username='user4', resource='<keyspace ks1>',
|
||||
permission='DELETE')
|
||||
|
||||
available_ks = {models.CassandraSchema('ks1'),
|
||||
models.CassandraSchema('ks2'),
|
||||
models.CassandraSchema('ks3')}
|
||||
|
||||
mock_result_set = [r0, r1, r2, r3, r4, r5, r6, r7, r8, r9, r9, r9, r10]
|
||||
execute_mock = MagicMock(return_value=mock_result_set)
|
||||
mock_client = MagicMock(execute=execute_mock)
|
||||
|
||||
with patch.object(self.admin,
|
||||
self.__N_GAK, return_value=available_ks) as gak_mock:
|
||||
acl = self.admin._get_acl(mock_client)
|
||||
execute_mock.assert_called_once_with(
|
||||
self.__LIST_PERMISSIONS_FORMAT)
|
||||
gak_mock.assert_called_once_with(mock_client)
|
||||
|
||||
self.assertEqual({'user1': {'ks1': {'SELECT'},
|
||||
'ks2': {'SELECT'},
|
||||
'ks3': {'SELECT'}},
|
||||
'user2': {'ks1': {'SELECT'},
|
||||
'ks2': {'SELECT', 'ALTER'}},
|
||||
'user3': {'ks1': {'DELETE'}},
|
||||
'user4': {'ks1': {'UPDATE', 'DELETE'},
|
||||
'ks2': {'UPDATE'},
|
||||
'ks3': {'UPDATE'}}
|
||||
},
|
||||
acl)
|
||||
|
||||
mock_result_set = [r1, r2, r3]
|
||||
execute_mock = MagicMock(return_value=mock_result_set)
|
||||
mock_client = MagicMock(execute=execute_mock)
|
||||
|
||||
with patch.object(self.admin,
|
||||
self.__N_GAK, return_value=available_ks) as gak_mock:
|
||||
acl = self.admin._get_acl(mock_client, username='user2')
|
||||
execute_mock.assert_called_once_with(
|
||||
self.__LIST_PERMISSIONS_OF_FORMAT.format('user2'))
|
||||
gak_mock.assert_not_called()
|
||||
|
||||
self.assertEqual({'user2': {'ks1': {'SELECT'},
|
||||
'ks2': {'SELECT', 'ALTER'}}}, acl)
|
||||
|
||||
mock_result_set = []
|
||||
execute_mock = MagicMock(return_value=mock_result_set)
|
||||
mock_client = MagicMock(execute=execute_mock)
|
||||
|
||||
with patch.object(self.admin,
|
||||
self.__N_GAK, return_value=available_ks) as gak_mock:
|
||||
acl = self.admin._get_acl(mock_client, username='nonexisting')
|
||||
execute_mock.assert_called_once_with(
|
||||
self.__LIST_PERMISSIONS_OF_FORMAT.format('nonexisting'))
|
||||
gak_mock.assert_not_called()
|
||||
|
||||
self.assertEqual({}, acl)
|
||||
|
||||
@patch.object(cass_service.CassandraLocalhostConnection, '__enter__')
|
||||
def test_get_listed_users(self, conn):
|
||||
usr1 = models.CassandraUser(self._get_random_name(1025))
|
||||
usr2 = models.CassandraUser(self._get_random_name(1025))
|
||||
usr3 = models.CassandraUser(self._get_random_name(1025))
|
||||
db1 = models.CassandraSchema('db1')
|
||||
db2 = models.CassandraSchema('db2')
|
||||
usr1.databases.append(db1.serialize())
|
||||
usr3.databases.append(db2.serialize())
|
||||
|
||||
rv_1 = NonCallableMagicMock()
|
||||
rv_1.configure_mock(name=usr1.name, super=False)
|
||||
rv_2 = NonCallableMagicMock()
|
||||
rv_2.configure_mock(name=usr2.name, super=False)
|
||||
rv_3 = NonCallableMagicMock()
|
||||
rv_3.configure_mock(name=usr3.name, super=True)
|
||||
|
||||
with patch.object(conn.return_value, 'execute', return_value=iter(
|
||||
[rv_1, rv_2, rv_3])):
|
||||
with patch.object(self.admin, '_get_acl',
|
||||
return_value={usr1.name: {db1.name: {'SELECT'},
|
||||
db2.name: {}},
|
||||
usr3.name: {db2.name: {'SELECT'}}}
|
||||
):
|
||||
usrs = self.manager.list_users(self.context)
|
||||
conn.return_value.execute.assert_has_calls([
|
||||
call(self.__LIST_USR_FORMAT),
|
||||
], any_order=True)
|
||||
self.assertIn(usr1.serialize(), usrs[0])
|
||||
self.assertIn(usr2.serialize(), usrs[0])
|
||||
self.assertIn(usr3.serialize(), usrs[0])
|
||||
|
||||
@patch.object(cass_service.CassandraLocalhostConnection, '__enter__')
|
||||
def test_list_access(self, conn):
|
||||
usr1 = models.CassandraUser('usr1')
|
||||
usr2 = models.CassandraUser('usr2')
|
||||
usr3 = models.CassandraUser(self._get_random_name(1025), 'password')
|
||||
db1 = models.CassandraSchema('db1').serialize()
|
||||
db2 = models.CassandraSchema('db2').serialize()
|
||||
usr2.databases.append(db1)
|
||||
usr3.databases.append(db1)
|
||||
usr3.databases.append(db2)
|
||||
|
||||
with patch.object(self.admin, self.__N_GLU, return_value={usr1, usr2,
|
||||
usr3}):
|
||||
usr1_dbs = self.manager.list_access(self.context, usr1.name, None)
|
||||
usr2_dbs = self.manager.list_access(self.context, usr2.name, None)
|
||||
usr3_dbs = self.manager.list_access(self.context, usr3.name, None)
|
||||
self.assertEqual([], usr1_dbs)
|
||||
self.assertEqual([db1], usr2_dbs)
|
||||
self.assertEqual([db1, db2], usr3_dbs)
|
||||
|
||||
with patch.object(self.admin, self.__N_GLU, return_value=set()):
|
||||
with ExpectedException(exception.UserNotFound):
|
||||
self.manager.list_access(self.context, usr3.name, None)
|
||||
|
||||
@patch.object(cass_service.CassandraLocalhostConnection, '__enter__')
|
||||
def test_list_users(self, conn):
|
||||
usr1 = models.CassandraUser('usr1')
|
||||
usr2 = models.CassandraUser('usr2')
|
||||
usr3 = models.CassandraUser(self._get_random_name(1025), 'password')
|
||||
|
||||
with patch.object(self.admin, self.__N_GLU, return_value={usr1, usr2,
|
||||
usr3}):
|
||||
found = self.manager.list_users(self.context)
|
||||
self.assertEqual(2, len(found))
|
||||
self.assertEqual(3, len(found[0]))
|
||||
self.assertEqual(None, found[1])
|
||||
self.assertIn(usr1.serialize(), found[0])
|
||||
self.assertIn(usr2.serialize(), found[0])
|
||||
self.assertIn(usr3.serialize(), found[0])
|
||||
|
||||
with patch.object(self.admin, self.__N_GLU, return_value=set()):
|
||||
self.assertEqual(([], None), self.manager.list_users(self.context))
|
||||
|
||||
@patch.object(cass_service.CassandraLocalhostConnection, '__enter__')
|
||||
def test_get_user(self, conn):
|
||||
usr1 = models.CassandraUser('usr1')
|
||||
usr2 = models.CassandraUser('usr2')
|
||||
usr3 = models.CassandraUser(self._get_random_name(1025), 'password')
|
||||
|
||||
with patch.object(self.admin, self.__N_GLU, return_value={usr1, usr2,
|
||||
usr3}):
|
||||
found = self.manager.get_user(self.context, usr2.name, None)
|
||||
self.assertEqual(usr2.serialize(), found)
|
||||
|
||||
with patch.object(self.admin, self.__N_GLU, return_value=set()):
|
||||
self.assertIsNone(
|
||||
self.manager.get_user(self.context, usr2.name, None))
|
||||
|
||||
@patch.object(cass_service.CassandraAdmin, '_deserialize_keyspace',
|
||||
side_effect=lambda p1: p1)
|
||||
@patch.object(cass_service.CassandraLocalhostConnection, '__enter__')
|
||||
def test_rename_user(self, conn, ks_deserializer):
|
||||
usr = models.CassandraUser('usr')
|
||||
db1 = models.CassandraSchema('db1').serialize()
|
||||
db2 = models.CassandraSchema('db2').serialize()
|
||||
usr.databases.append(db1)
|
||||
usr.databases.append(db2)
|
||||
|
||||
new_user = models.CassandraUser('new_user')
|
||||
with patch(self.__N_CAU, return_value=new_user):
|
||||
with patch.object(self.admin, self.__N_BU, return_value=usr):
|
||||
with patch.object(self.admin, self.__N_CU) as create:
|
||||
with patch.object(self.admin, self.__N_GFA) as grant:
|
||||
with patch.object(self.admin, self.__N_DU) as drop:
|
||||
usr_attrs = {'name': 'user', 'password': 'trove'}
|
||||
self.manager.update_attributes(self.context,
|
||||
usr.name, None,
|
||||
usr_attrs)
|
||||
create.assert_called_once_with(ANY, new_user)
|
||||
grant.assert_has_calls([call(ANY, db1, ANY),
|
||||
call(ANY, db2, ANY)])
|
||||
drop.assert_called_once_with(ANY, usr)
|
||||
|
||||
@patch.object(cass_service.CassandraLocalhostConnection, '__enter__')
|
||||
def test_update_attributes(self, conn):
|
||||
usr = models.CassandraUser('usr', 'pwd')
|
||||
|
||||
with patch.object(self.admin, self.__N_BU, return_value=usr):
|
||||
usr_attrs = {'name': usr.name, 'password': usr.password}
|
||||
with patch.object(self.admin, self.__N_RU) as rename:
|
||||
with patch.object(self.admin, self.__N_AUP) as alter:
|
||||
self.manager.update_attributes(self.context, usr.name,
|
||||
None, usr_attrs)
|
||||
self.assertEqual(0, rename.call_count)
|
||||
self.assertEqual(0, alter.call_count)
|
||||
|
||||
usr_attrs = {'name': 'user', 'password': 'password'}
|
||||
with patch.object(self.admin, self.__N_RU) as rename:
|
||||
with patch.object(self.admin, self.__N_AUP) as alter:
|
||||
self.manager.update_attributes(self.context, usr.name,
|
||||
None, usr_attrs)
|
||||
rename.assert_called_once_with(ANY, usr, usr_attrs['name'],
|
||||
usr_attrs['password'])
|
||||
self.assertEqual(0, alter.call_count)
|
||||
|
||||
usr_attrs = {'name': 'user', 'password': usr.password}
|
||||
with patch.object(self.admin, self.__N_RU) as rename:
|
||||
with patch.object(self.admin, self.__N_AUP) as alter:
|
||||
self.manager.update_attributes(self.context, usr.name,
|
||||
None, usr_attrs)
|
||||
rename.assert_called_once_with(ANY, usr, usr_attrs['name'],
|
||||
usr_attrs['password'])
|
||||
self.assertEqual(0, alter.call_count)
|
||||
|
||||
usr_attrs = {'name': 'user'}
|
||||
with patch.object(self.admin, self.__N_RU) as rename:
|
||||
with patch.object(self.admin, self.__N_AUP) as alter:
|
||||
with ExpectedException(
|
||||
exception.UnprocessableEntity, "Updating username "
|
||||
"requires specifying a password as well."):
|
||||
self.manager.update_attributes(self.context, usr.name,
|
||||
None, usr_attrs)
|
||||
self.assertEqual(0, rename.call_count)
|
||||
self.assertEqual(0, alter.call_count)
|
||||
|
||||
usr_attrs = {'name': usr.name, 'password': 'password'}
|
||||
with patch.object(self.admin, self.__N_RU) as rename:
|
||||
with patch.object(self.admin, self.__N_AUP) as alter:
|
||||
self.manager.update_attributes(self.context, usr.name,
|
||||
None, usr_attrs)
|
||||
alter.assert_called_once_with(ANY, usr)
|
||||
self.assertEqual(0, rename.call_count)
|
||||
|
||||
usr_attrs = {'password': usr.password}
|
||||
with patch.object(self.admin, self.__N_RU) as rename:
|
||||
with patch.object(self.admin, self.__N_AUP) as alter:
|
||||
self.manager.update_attributes(self.context, usr.name,
|
||||
None, usr_attrs)
|
||||
self.assertEqual(0, rename.call_count)
|
||||
self.assertEqual(0, alter.call_count)
|
||||
|
||||
usr_attrs = {'password': 'trove'}
|
||||
with patch.object(self.admin, self.__N_RU) as rename:
|
||||
with patch.object(self.admin, self.__N_AUP) as alter:
|
||||
self.manager.update_attributes(self.context, usr.name,
|
||||
None, usr_attrs)
|
||||
alter.assert_called_once_with(ANY, usr)
|
||||
self.assertEqual(0, rename.call_count)
|
||||
|
@ -45,8 +45,6 @@ from trove.guestagent.common.operating_system import FileMode
|
||||
from trove.guestagent.common import sql_query
|
||||
from trove.guestagent.datastore.experimental.cassandra import (
|
||||
service as cass_service)
|
||||
from trove.guestagent.datastore.experimental.cassandra import (
|
||||
system as cass_system)
|
||||
from trove.guestagent.datastore.experimental.couchbase import (
|
||||
service as couchservice)
|
||||
from trove.guestagent.datastore.experimental.couchdb import (
|
||||
@ -2354,17 +2352,14 @@ class CassandraDBAppTest(BaseAppTest.AppTestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(CassandraDBAppTest, self).setUp(str(uuid4()))
|
||||
self.exec_patch = patch.object(utils, 'execute_with_timeout')
|
||||
self.addCleanup(self.exec_patch.stop)
|
||||
self.exec_mock = self.exec_patch.start()
|
||||
self.sleep = time.sleep
|
||||
self.orig_time_time = time.time
|
||||
self.pkg_version = cass_service.packager.pkg_version
|
||||
self.pkg = cass_service.packager
|
||||
util.init_db()
|
||||
status = FakeAppStatus(self.FAKE_ID,
|
||||
rd_instance.ServiceStatuses.NEW)
|
||||
self.cassandra = cass_service.CassandraApp(status)
|
||||
self.cassandra = cass_service.CassandraApp()
|
||||
self.cassandra.status = FakeAppStatus(self.FAKE_ID,
|
||||
rd_instance.ServiceStatuses.NEW)
|
||||
self.orig_unlink = os.unlink
|
||||
|
||||
@property
|
||||
@ -2381,14 +2376,14 @@ class CassandraDBAppTest(BaseAppTest.AppTestCase):
|
||||
|
||||
@property
|
||||
def expected_service_candidates(self):
|
||||
return cass_system.SERVICE_CANDIDATES
|
||||
return self.cassandra.service_candidates
|
||||
|
||||
def tearDown(self):
|
||||
super(CassandraDBAppTest, self).tearDown()
|
||||
time.sleep = self.sleep
|
||||
time.time = self.orig_time_time
|
||||
cass_service.packager.pkg_version = self.pkg_version
|
||||
cass_service.packager = self.pkg
|
||||
super(CassandraDBAppTest, self).tearDown()
|
||||
|
||||
def assert_reported_status(self, expected_status):
|
||||
service_status = InstanceServiceStatus.find_by(
|
||||
@ -2397,10 +2392,9 @@ class CassandraDBAppTest(BaseAppTest.AppTestCase):
|
||||
|
||||
@patch.object(utils, 'execute_with_timeout')
|
||||
def test_service_cleanup(self, exec_mock):
|
||||
cass_service.CassandraAppStatus().cleanup_stalled_db_services()
|
||||
exec_mock.assert_called_once_with(
|
||||
cass_system.CASSANDRA_KILL,
|
||||
shell=True)
|
||||
cass_service.CassandraAppStatus(Mock()).cleanup_stalled_db_services()
|
||||
exec_mock.assert_called_once_with(self.cassandra.CASSANDRA_KILL_CMD,
|
||||
shell=True)
|
||||
|
||||
def test_install(self):
|
||||
|
||||
@ -2424,72 +2418,24 @@ class CassandraDBAppTest(BaseAppTest.AppTestCase):
|
||||
|
||||
self.assert_reported_status(rd_instance.ServiceStatuses.NEW)
|
||||
|
||||
@patch('trove.guestagent.datastore.experimental.cassandra.service.LOG')
|
||||
def test_cassandra_error_in_write_config_verify_unlink(self, *args):
|
||||
# this test verifies not only that the write_config
|
||||
# method properly invoked execute, but also that it properly
|
||||
# attempted to unlink the file (as a result of the exception)
|
||||
|
||||
mock_unlink = Mock(return_value=0)
|
||||
|
||||
# We call tempfile.mkstemp() here and Mock() the mkstemp()
|
||||
# parameter to write_config for testability.
|
||||
(temp_handle, temp_config_name) = tempfile.mkstemp()
|
||||
mock_mkstemp = MagicMock(return_value=(temp_handle, temp_config_name))
|
||||
|
||||
configuration = 'this is my configuration'
|
||||
|
||||
with patch('trove.guestagent.common.operating_system.move',
|
||||
side_effect=ProcessExecutionError('some exception')):
|
||||
self.assertRaises(ProcessExecutionError,
|
||||
self.cassandra.write_config,
|
||||
config_contents=configuration,
|
||||
execute_function=Mock(),
|
||||
mkstemp_function=mock_mkstemp,
|
||||
unlink_function=mock_unlink)
|
||||
|
||||
self.assertEqual(1, mock_unlink.call_count)
|
||||
|
||||
# really delete the temporary_config_file
|
||||
os.unlink(temp_config_name)
|
||||
|
||||
@patch.multiple('trove.guestagent.common.operating_system',
|
||||
chown=DEFAULT, chmod=DEFAULT, move=DEFAULT)
|
||||
def test_cassandra_write_config(self, chown, chmod, move):
|
||||
# ensure that write_config creates a temporary file, and then
|
||||
# moves the file to the final place. Also validate the
|
||||
# contents of the file written.
|
||||
|
||||
# We call tempfile.mkstemp() here and Mock() the mkstemp()
|
||||
# parameter to write_config for testability.
|
||||
(temp_handle, temp_config_name) = tempfile.mkstemp()
|
||||
mock_mkstemp = MagicMock(return_value=(temp_handle, temp_config_name))
|
||||
|
||||
chown=DEFAULT, chmod=DEFAULT, write_file=DEFAULT)
|
||||
def test_cassandra_write_config(self, chown, chmod, write_file):
|
||||
configuration = 'some arbitrary configuration text'
|
||||
self.cassandra.write_config(configuration)
|
||||
|
||||
mock_execute = MagicMock(return_value=('', ''))
|
||||
|
||||
self.cassandra.write_config(configuration,
|
||||
execute_function=mock_execute,
|
||||
mkstemp_function=mock_mkstemp)
|
||||
|
||||
move.assert_called_with(temp_config_name, cass_system.CASSANDRA_CONF,
|
||||
as_root=True)
|
||||
chown.assert_called_with(cass_system.CASSANDRA_CONF,
|
||||
write_file.assert_called_with(
|
||||
self.cassandra.cassandra_conf,
|
||||
configuration,
|
||||
codec=ANY,
|
||||
as_root=True)
|
||||
chown.assert_called_with(self.cassandra.cassandra_conf,
|
||||
"cassandra", "cassandra", recursive=False,
|
||||
as_root=True)
|
||||
chmod.assert_called_with(
|
||||
cass_system.CASSANDRA_CONF, FileMode.ADD_READ_ALL, as_root=True)
|
||||
|
||||
self.assertEqual(1, mock_mkstemp.call_count)
|
||||
|
||||
with open(temp_config_name, 'r') as config_file:
|
||||
configuration_data = config_file.read()
|
||||
|
||||
self.assertEqual(configuration, configuration_data)
|
||||
|
||||
# really delete the temporary_config_file
|
||||
os.unlink(temp_config_name)
|
||||
self.cassandra.cassandra_conf,
|
||||
FileMode.ADD_READ_ALL,
|
||||
as_root=True)
|
||||
|
||||
|
||||
class CouchbaseAppTest(BaseAppTest.AppTestCase):
|
||||
|
Loading…
x
Reference in New Issue
Block a user