From 01ff9529415ae46a5a8b2d3fa9981cb80e99cd5c Mon Sep 17 00:00:00 2001 From: Petr Malik Date: Tue, 17 Mar 2015 15:40:06 -0400 Subject: [PATCH] Implement user functions for Cassandra datastore This patch set implements the following functionality for Cassandra datastore. create/delete/get user list users change password grant/revoke/list access update attributes create/delete database list databases Notes on Cassandra users: In Cassandra only SUPERUSERS can create other users and grant permissions to database resources. Trove uses the 'os_admin' superuser to perform its administrative tasks. It proactively removes the built-in 'cassandra' superuser on prepare. The users it creates are all 'normal' (NOSUPERUSER) accounts. The permissions it can grant are also limited to non-superuser operations. This is to prevent anybody from creating a new superuser via the Trove API. Updatable attributes include username and password. The configuration template had to be updated to enable authentication and authorization support (original configuration allowed anonymous connections). Default implementations used are: authenticator: org.apache.cassandra.auth.PasswordAuthenticator authorizer: org.apache.cassandra.auth.CassandraAuthorizer The superuser password is set to a random Trove password which is then stored in a Trove-read-only file in '~/.cassandra/cqlshrc' which is also the default location for client settings. Notes on Cassandra keyspaces: Cassandra stores replicas on multiple nodes to ensure reliability and fault tolerance. All replicas are equally important; there is no primary or master. A replication strategy determines the nodes where replicas are placed. The total number of replicas across the cluster is referred to as the replication factor. The above 'create database' implementation uses 'SimpleStrategy' with just a single replica on the guest machine. This is a very simplistic configuration only good for the most basic applications and demonstration purposes. SimpleStrategy is for a single data center only. The following system keyspaces have been included in the default 'ignore_dbs' configuration list and therefore excluded from all database operations: 'system', 'system_auth', 'system_traces' Notes on user rename: Cassandra does not have a native way for renaming users. The reason why Cassandra itself does not implement rename is apparently just lack of demand for that feature. We implement it by creating a new user, transferring permissions and dropping the old one (which also removes its existing permissions). I asked about the sanity of this rename approach on the Cassandra mailing list and IRC channel and there should not be anything inherently wrong with the proposed procedure. This method, however, requires the user to always provide a password. Additional notes: Trove uses the official open-source Python driver for Cassandra to connect to the database and execute queries. The connection is implemented in CassandraConnection. It is now also used to obtain the current database status as opposed to the original method of parsing output of the client tool. The 'common/operating_system' module was extended with two new functions for reading/writing ini-style and YAML configuration files to/from Python dicts. Unit tests were added to 'guestagent/test_operating_system'. The existing Manager unit tests were extended to include the added functionality. Also includes some minor improvements to comments and log messages. Used the existing operating_system interface to update file ownership. The system module was removed and its contents moved to the Application class. This is to reduce the number of files and help facilitate overriding. Implements: blueprint cassandra-database-user-functions Depends-On: I0faa3a4b9c7302064fb2413b572e2fc515efff0d Change-Id: I7021f6a0e9a3a933f00cfb7a5d987dc6fe2f95a6 --- test-requirements.txt | 1 + trove/common/cfg.py | 5 + .../experimental/cassandra/manager.py | 78 +- .../experimental/cassandra/service.py | 752 ++++++++++++++++-- .../experimental/cassandra/system.py | 31 - trove/guestagent/db/models.py | 69 ++ trove/templates/cassandra/config.template | 8 +- trove/tests/int_tests.py | 1 + .../guestagent/test_cassandra_manager.py | 460 ++++++++++- .../tests/unittests/guestagent/test_dbaas.py | 94 +-- 10 files changed, 1311 insertions(+), 188 deletions(-) delete mode 100644 trove/guestagent/datastore/experimental/cassandra/system.py diff --git a/test-requirements.txt b/test-requirements.txt index 5de00caad9..771b9479a6 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -22,3 +22,4 @@ testrepository>=0.0.18 # Apache-2.0/BSD pymongo>=3.0.2 # Apache-2.0 redis>=2.10.0 # MIT psycopg2>=2.5 # LGPL/ZPL +cassandra-driver>=2.1.4 # Apache-2.0 diff --git a/trove/common/cfg.py b/trove/common/cfg.py index a7e6887791..9450a4ac0e 100644 --- a/trove/common/cfg.py +++ b/trove/common/cfg.py @@ -814,6 +814,11 @@ cassandra_opts = [ cfg.StrOpt('root_controller', default='trove.extensions.common.service.DefaultRootController', help='Root controller implementation for cassandra.'), + cfg.ListOpt('ignore_users', default=['os_admin'], + help='Users to exclude when listing users.'), + cfg.ListOpt('ignore_dbs', default=['system', 'system_auth', + 'system_traces'], + help='Databases to exclude when listing databases.'), cfg.StrOpt('guest_log_exposed_logs', default='', help='List of Guest Logs to expose for publishing.'), ] diff --git a/trove/guestagent/datastore/experimental/cassandra/manager.py b/trove/guestagent/datastore/experimental/cassandra/manager.py index f2d8036de1..99a48f6ab3 100644 --- a/trove/guestagent/datastore/experimental/cassandra/manager.py +++ b/trove/guestagent/datastore/experimental/cassandra/manager.py @@ -15,10 +15,14 @@ # import os +import yaml from oslo_log import log as logging from trove.guestagent.datastore.experimental.cassandra import service +from trove.guestagent.datastore.experimental.cassandra.service import ( + CassandraAdmin +) from trove.guestagent.datastore import manager from trove.guestagent import volume @@ -29,13 +33,21 @@ LOG = logging.getLogger(__name__) class Manager(manager.Manager): def __init__(self): - self.appStatus = service.CassandraAppStatus() - self.app = service.CassandraApp(self.appStatus) + self._app = service.CassandraApp() + self.__admin = CassandraAdmin(self.app.get_current_superuser()) super(Manager, self).__init__('cassandra') @property def status(self): - return self.appStatus + return self.app.status + + @property + def app(self): + return self._app + + @property + def admin(self): + return self.__admin def restart(self, context): self.app.restart() @@ -62,25 +74,73 @@ class Manager(manager.Manager): # FIXME(amrith) Once the cassandra bug # https://issues.apache.org/jira/browse/CASSANDRA-2356 # is fixed, this code may have to be revisited. - LOG.debug("Stopping database prior to changes.") + LOG.debug("Stopping database prior to initial configuration.") self.app.stop_db() if config_contents: - LOG.debug("Processing configuration.") - self.app.write_config(config_contents) + LOG.debug("Applying configuration.") + self.app.write_config(yaml.load(config_contents)) self.app.make_host_reachable() if device_path: + LOG.debug("Preparing data volume.") device = volume.VolumeDevice(device_path) # unmount if device is already mounted device.unmount_device(device_path) device.format() if os.path.exists(mount_point): # rsync exiting data + LOG.debug("Migrating existing data.") device.migrate_data(mount_point) # mount the volume - device.mount(mount_point) LOG.debug("Mounting new volume.") + device.mount(mount_point) - LOG.debug("Restarting database after changes.") - self.app.start_db() + LOG.debug("Starting database with configuration changes.") + self.app.start_db(update_db=False) + + if not self.app.has_user_config(): + LOG.debug("Securing superuser access.") + self.app.secure() + self.app.restart() + + self.__admin = CassandraAdmin(self.app.get_current_superuser()) + + def change_passwords(self, context, users): + self.admin.change_passwords(context, users) + + def update_attributes(self, context, username, hostname, user_attrs): + self.admin.update_attributes(context, username, hostname, user_attrs) + + def create_database(self, context, databases): + self.admin.create_database(context, databases) + + def create_user(self, context, users): + self.admin.create_user(context, users) + + def delete_database(self, context, database): + self.admin.delete_database(context, database) + + def delete_user(self, context, user): + self.admin.delete_user(context, user) + + def get_user(self, context, username, hostname): + return self.admin.get_user(context, username, hostname) + + def grant_access(self, context, username, hostname, databases): + self.admin.grant_access(context, username, hostname, databases) + + def revoke_access(self, context, username, hostname, database): + self.admin.revoke_access(context, username, hostname, database) + + def list_access(self, context, username, hostname): + return self.admin.list_access(context, username, hostname) + + def list_databases(self, context, limit=None, marker=None, + include_marker=False): + return self.admin.list_databases(context, limit, marker, + include_marker) + + def list_users(self, context, limit=None, marker=None, + include_marker=False): + return self.admin.list_users(context, limit, marker, include_marker) diff --git a/trove/guestagent/datastore/experimental/cassandra/service.py b/trove/guestagent/datastore/experimental/cassandra/service.py index 2c5379cb23..5369219180 100644 --- a/trove/guestagent/datastore/experimental/cassandra/service.py +++ b/trove/guestagent/datastore/experimental/cassandra/service.py @@ -14,26 +14,35 @@ # under the License. import os -import tempfile +import re +import stat +from cassandra.auth import PlainTextAuthProvider +from cassandra.cluster import Cluster +from cassandra.cluster import NoHostAvailable +from cassandra import OperationTimedOut from oslo_log import log as logging from oslo_utils import netutils -import yaml from trove.common import cfg from trove.common import exception from trove.common.i18n import _ from trove.common import instance as rd_instance +from trove.common import pagination +from trove.common.stream_codecs import IniCodec +from trove.common.stream_codecs import SafeYamlCodec from trove.common import utils +from trove.guestagent.common import guestagent_utils from trove.guestagent.common import operating_system from trove.guestagent.common.operating_system import FileMode -from trove.guestagent.datastore.experimental.cassandra import system from trove.guestagent.datastore import service +from trove.guestagent.db import models from trove.guestagent import pkg LOG = logging.getLogger(__name__) CONF = cfg.CONF +MANAGER = CONF.datastore_manager if CONF.datastore_manager else 'cassandra' packager = pkg.Package() @@ -41,10 +50,65 @@ packager = pkg.Package() class CassandraApp(object): """Prepares DBaaS on a Guest container.""" - def __init__(self, status): + _ADMIN_USER = 'os_admin' + + _CONF_AUTH_SEC = 'authentication' + _CONF_USR_KEY = 'username' + _CONF_PWD_KEY = 'password' + _CONF_DIR_MODS = stat.S_IRWXU + _CONF_FILE_MODS = stat.S_IRUSR + + CASSANDRA_KILL_CMD = "sudo killall java || true" + + def __init__(self): """By default login with root no password for initial setup.""" self.state_change_wait_time = CONF.state_change_wait_time - self.status = status + self.status = CassandraAppStatus(self.get_current_superuser()) + + @property + def service_candidates(self): + return ['cassandra'] + + @property + def cassandra_conf(self): + return { + operating_system.REDHAT: + "/etc/cassandra/default.conf/cassandra.yaml", + operating_system.DEBIAN: + "/etc/cassandra/cassandra.yaml", + operating_system.SUSE: + "/etc/cassandra/default.conf/cassandra.yaml" + }[operating_system.get_os()] + + @property + def cassandra_owner(self): + return 'cassandra' + + @property + def cassandra_data_dir(self): + return guestagent_utils.build_file_path( + self.cassandra_working_dir, 'data') + + @property + def cassandra_working_dir(self): + return "/var/lib/cassandra" + + @property + def default_superuser_name(self): + return "cassandra" + + @property + def default_superuser_password(self): + return "cassandra" + + @property + def default_superuser_pwd_hash(self): + # Default 'salted_hash' value for 'cassandra' user on Cassandra 2.1. + return "$2a$10$wPEVuXBU7WE2Uwzqq3t19ObRJyoKztzC/Doyfr0VtDmVXC4GDAV3e" + + @property + def cqlsh_conf_path(self): + return "~/.cassandra/cqlshrc" def install_if_needed(self, packages): """Prepare the guest machine with a cassandra server installation.""" @@ -61,86 +125,125 @@ class CassandraApp(object): def start_db(self, update_db=False): self.status.start_db_service( - system.SERVICE_CANDIDATES, self.state_change_wait_time, + self.service_candidates, self.state_change_wait_time, enable_on_boot=True, update_db=update_db) def stop_db(self, update_db=False, do_not_start_on_reboot=False): self.status.stop_db_service( - system.SERVICE_CANDIDATES, self.state_change_wait_time, + self.service_candidates, self.state_change_wait_time, disable_on_boot=do_not_start_on_reboot, update_db=update_db) def restart(self): self.status.restart_db_service( - system.SERVICE_CANDIDATES, self.state_change_wait_time) + self.service_candidates, self.state_change_wait_time) def _install_db(self, packages): """Install cassandra server""" LOG.debug("Installing cassandra server.") - packager.pkg_install(packages, None, system.INSTALL_TIMEOUT) + packager.pkg_install(packages, None, 10000) LOG.debug("Finished installing Cassandra server") - def write_config(self, config_contents, - execute_function=utils.execute_with_timeout, - mkstemp_function=tempfile.mkstemp, - unlink_function=os.unlink): + def secure(self, update_user=None): + """Configure the Trove administrative user. + Update an existing user if given. + Create a new one using the default database credentials + otherwise and drop the built-in user when finished. + """ + LOG.info(_('Configuring Trove superuser.')) - # first securely create a temp file. mkstemp() will set - # os.O_EXCL on the open() call, and we get a file with - # permissions of 600 by default. - (conf_fd, conf_path) = mkstemp_function() + current_superuser = update_user or models.CassandraUser( + self.default_superuser_name, + self.default_superuser_password) - LOG.debug('Storing temporary configuration at %s.' % conf_path) + if update_user: + os_admin = models.CassandraUser(update_user.name, + utils.generate_random_password()) + CassandraAdmin(current_superuser).alter_user_password(os_admin) + else: + os_admin = models.CassandraUser(self._ADMIN_USER, + utils.generate_random_password()) + CassandraAdmin(current_superuser)._create_superuser(os_admin) + CassandraAdmin(os_admin).drop_user(current_superuser) - # write config and close the file, delete it if there is an - # error. only unlink if there is a problem. In normal course, - # we move the file. - try: - os.write(conf_fd, config_contents) - operating_system.move(conf_path, system.CASSANDRA_CONF, - as_root=True) - # TODO(denis_makogon): figure out the dynamic way to discover - # configs owner since it can cause errors if there is - # no cassandra user in operating system - operating_system.chown(system.CASSANDRA_CONF, - 'cassandra', 'cassandra', recursive=False, - as_root=True) - operating_system.chmod(system.CASSANDRA_CONF, - FileMode.ADD_READ_ALL, as_root=True) - except Exception: - LOG.exception( - _("Exception generating Cassandra configuration %s.") % - conf_path) - unlink_function(conf_path) - raise - finally: - os.close(conf_fd) + self.__create_cqlsh_config({self._CONF_AUTH_SEC: + {self._CONF_USR_KEY: os_admin.name, + self._CONF_PWD_KEY: os_admin.password}}) + + # Update the internal status with the new user. + self.status = CassandraAppStatus(os_admin) + + return os_admin + + def __create_cqlsh_config(self, sections): + config_path = self._get_cqlsh_conf_path() + config_dir = os.path.dirname(config_path) + if not os.path.exists(config_dir): + os.mkdir(config_dir, self._CONF_DIR_MODS) + else: + os.chmod(config_dir, self._CONF_DIR_MODS) + operating_system.write_file(config_path, sections, codec=IniCodec()) + os.chmod(config_path, self._CONF_FILE_MODS) + + def get_current_superuser(self): + """ + Build the Trove superuser. + Use the stored credentials. + If not available fall back to the defaults. + """ + if self.has_user_config(): + return self._load_current_superuser() + + LOG.warn(_("Trove administrative user has not been configured yet. " + "Using the built-in default: %s") + % self.default_superuser_name) + return models.CassandraUser(self.default_superuser_name, + self.default_superuser_password) + + def has_user_config(self): + """ + Return TRUE if there is a client configuration file available + on the guest. + """ + return os.path.exists(self._get_cqlsh_conf_path()) + + def _load_current_superuser(self): + config = operating_system.read_file(self._get_cqlsh_conf_path(), + codec=IniCodec()) + return models.CassandraUser( + config[self._CONF_AUTH_SEC][self._CONF_USR_KEY], + config[self._CONF_AUTH_SEC][self._CONF_PWD_KEY] + ) + + def write_config(self, config_contents): + + operating_system.write_file( + self.cassandra_conf, config_contents, codec=SafeYamlCodec(), + as_root=True) + operating_system.chown(self.cassandra_conf, + self.cassandra_owner, + self.cassandra_owner, + recursive=False, as_root=True) + operating_system.chmod(self.cassandra_conf, + FileMode.ADD_READ_ALL, as_root=True) LOG.info(_('Wrote new Cassandra configuration.')) - def read_conf(self): - """Returns cassandra.yaml in dict structure.""" - - LOG.debug("Opening cassandra.yaml.") - with open(system.CASSANDRA_CONF, 'r') as config: - LOG.debug("Preparing YAML object from cassandra.yaml.") - yamled = yaml.load(config.read()) - return yamled - def update_config_with_single(self, key, value): """Updates single key:value in 'cassandra.yaml'.""" - yamled = self.read_conf() + yamled = operating_system.read_file(self.cassandra_conf, + codec=SafeYamlCodec()) yamled.update({key: value}) LOG.debug("Updating cassandra.yaml with %(key)s: %(value)s." % {'key': key, 'value': value}) - dump = yaml.dump(yamled, default_flow_style=False) LOG.debug("Dumping YAML to stream.") - self.write_config(dump) + self.write_config(yamled) def update_conf_with_group(self, group): """Updates group of key:value in 'cassandra.yaml'.""" - yamled = self.read_conf() + yamled = operating_system.read_file(self.cassandra_conf, + codec=SafeYamlCodec()) for key, value in group.iteritems(): if key == 'seed': (yamled.get('seed_provider')[0]. @@ -150,9 +253,8 @@ class CassandraApp(object): yamled.update({key: value}) LOG.debug("Updating cassandra.yaml with %(key)s: %(value)s." % {'key': key, 'value': value}) - dump = yaml.dump(yamled, default_flow_style=False) LOG.debug("Dumping YAML to stream") - self.write_config(dump) + self.write_config(yamled) def make_host_reachable(self): updates = { @@ -180,22 +282,540 @@ class CassandraApp(object): LOG.debug("Resetting configuration") self.write_config(config_contents) + def _get_cqlsh_conf_path(self): + return os.path.expanduser(self.cqlsh_conf_path) + class CassandraAppStatus(service.BaseDbStatus): + def __init__(self, superuser): + """ + :param superuser: User account the Status uses for connecting + to the database. + :type superuser: CassandraUser + """ + super(CassandraAppStatus, self).__init__() + self.__user = superuser + + def set_superuser(self, user): + self.__user = user + def _get_actual_db_status(self): try: - # If status check would be successful, - # bot stdin and stdout would contain nothing - out, err = utils.execute_with_timeout(system.CASSANDRA_STATUS, - shell=True) - if "Connection error. Could not connect to" not in err: + with CassandraLocalhostConnection(self.__user): return rd_instance.ServiceStatuses.RUNNING - else: - return rd_instance.ServiceStatuses.SHUTDOWN - except (exception.ProcessExecutionError, OSError): - LOG.exception(_("Error getting Cassandra status")) + except NoHostAvailable: return rd_instance.ServiceStatuses.SHUTDOWN + except Exception: + LOG.exception(_("Error getting Cassandra status.")) + + return rd_instance.ServiceStatuses.SHUTDOWN def cleanup_stalled_db_services(self): - utils.execute_with_timeout(system.CASSANDRA_KILL, shell=True) + utils.execute_with_timeout(CassandraApp.CASSANDRA_KILL_CMD, shell=True) + + +class CassandraAdmin(object): + """Handles administrative tasks on the Cassandra database. + + In Cassandra only SUPERUSERS can create other users and grant permissions + to database resources. Trove uses the 'cassandra' superuser to perform its + administrative tasks. + + The users it creates are all 'normal' (NOSUPERUSER) accounts. + The permissions it can grant are also limited to non-superuser operations. + This is to prevent anybody from creating a new superuser via the Trove API. + """ + + # Non-superuser grant modifiers. + __NO_SUPERUSER_MODIFIERS = ('ALTER', 'CREATE', 'DROP', 'MODIFY', 'SELECT') + + _KS_NAME_REGEX = re.compile('^$') + + def __init__(self, user): + self.__admin_user = user + + def create_user(self, context, users): + """ + Create new non-superuser accounts. + New users are by default granted full access to all database resources. + """ + with CassandraLocalhostConnection(self.__admin_user) as client: + for item in users: + self._create_user_and_grant(client, + self._deserialize_user(item)) + + def _create_user_and_grant(self, client, user): + """ + Create new non-superuser account and grant it full access to its + databases. + """ + self._create_user(client, user) + for db in user.databases: + self._grant_full_access_on_keyspace( + client, self._deserialize_keyspace(db), user) + + def _create_user(self, client, user): + # Create only NOSUPERUSER accounts here. + LOG.debug("Creating a new user '%s'." % user.name) + client.execute("CREATE USER '{}' WITH PASSWORD %s NOSUPERUSER;", + (user.name,), (user.password,)) + + def _create_superuser(self, user): + """Create a new superuser account and grant it full superuser-level + access to all keyspaces. + """ + LOG.debug("Creating a new superuser '%s'." % user.name) + with CassandraLocalhostConnection(self.__admin_user) as client: + client.execute("CREATE USER '{}' WITH PASSWORD %s SUPERUSER;", + (user.name,), (user.password,)) + client.execute("GRANT ALL PERMISSIONS ON ALL KEYSPACES TO '{}';", + (user.name,)) + + def delete_user(self, context, user): + self.drop_user(self._deserialize_user(user)) + + def drop_user(self, user): + with CassandraLocalhostConnection(self.__admin_user) as client: + self._drop_user(client, user) + + def _drop_user(self, client, user): + LOG.debug("Deleting user '%s'." % user.name) + client.execute("DROP USER '{}';", (user.name, )) + + def get_user(self, context, username, hostname): + with CassandraLocalhostConnection(self.__admin_user) as client: + user = self._find_user(client, username) + return user.serialize() if user is not None else None + + def _find_user(self, client, username): + """ + Lookup a user with a given username. + Omit user names on the ignore list. + Return a new Cassandra user instance or None if no match is found. + """ + return next((user for user in self._get_listed_users(client) + if user.name == username), None) + + def list_users(self, context, limit=None, marker=None, + include_marker=False): + """ + List all non-superuser accounts. Omit names on the ignored list. + Return an empty set if None. + """ + with CassandraLocalhostConnection(self.__admin_user) as client: + users = [user.serialize() for user in + self._get_listed_users(client)] + return pagination.paginate_list(users, limit, marker, + include_marker) + + def _get_listed_users(self, client): + """ + Return a set of unique user instances. + Omit user names on the ignore list. + """ + return self._get_users( + client, lambda user: user.name not in self.ignore_users) + + def _get_users(self, client, matcher=None): + """ + :param matcher Filter expression. + :type matcher callable + """ + acl = self._get_acl(client) + return {self._build_user(user.name, acl) + for user in client.execute("LIST USERS;") + if not matcher or matcher(user)} + + def _load_user(self, client, username, check_reserved=True): + if check_reserved: + self._check_reserved_user_name(username) + + acl = self._get_acl(client, username=username) + return self._build_user(username, acl) + + def _build_user(self, username, acl): + user = models.CassandraUser(username) + for ks, permissions in acl.get(username, {}).items(): + if permissions: + user.databases.append(models.CassandraSchema(ks).serialize()) + return user + + def _get_acl(self, client, username=None): + """Return the ACL for a database user. + Return ACLs for all users if no particular username is specified. + + The ACL has the following format: + {username #1: + {keyspace #1: {access mod(s)...}, + keyspace #2: {...}}, + username #2: + {keyspace #1: {...}, + keyspace #3: {...}} + } + """ + + def build_list_query(username): + query_tokens = ["LIST ALL PERMISSIONS"] + if username: + query_tokens.extend(["OF", "'%s'" % username]) + query_tokens.append("NORECURSIVE;") + return ' '.join(query_tokens) + + def parse_keyspace_name(resource): + """Parse a keyspace name from a resource string. + The resource string has the following form: + + where 'object' is one of the database objects (keyspace, table...). + Return the name as a singleton set. Return an empty set if no match + is found. + """ + match = self._KS_NAME_REGEX.match(resource) + if match: + return {match.group(1)} + return {} + + def update_acl(username, keyspace, permission, acl): + permissions = acl.get(username, {}).get(keyspace) + if permissions is None: + guestagent_utils.update_dict({user: {keyspace: {permission}}}, + acl) + else: + permissions.add(permission) + + all_keyspace_names = None + acl = dict() + for item in client.execute(build_list_query(username)): + user = item.username + resource = item.resource + permission = item.permission + if user and resource and permission: + if resource == '': + # Cache the full keyspace list to improve performance and + # ensure consistent results for all users. + if all_keyspace_names is None: + all_keyspace_names = { + item.name + for item in self._get_available_keyspaces(client) + } + keyspaces = all_keyspace_names + else: + keyspaces = parse_keyspace_name(resource) + + for keyspace in keyspaces: + update_acl(user, keyspace, permission, acl) + + return acl + + def list_superusers(self): + """List all system users existing in the database.""" + with CassandraLocalhostConnection(self.__admin_user) as client: + return self._get_users(client, lambda user: user.super) + + def grant_access(self, context, username, hostname, databases): + """ + Grant full access on keyspaces to a given username. + """ + user = models.CassandraUser(username) + with CassandraLocalhostConnection(self.__admin_user) as client: + for db in databases: + self._grant_full_access_on_keyspace( + client, models.CassandraSchema(db), user) + + def revoke_access(self, context, username, hostname, database): + """ + Revoke all permissions on any database resources from a given username. + """ + user = models.CassandraUser(username) + with CassandraLocalhostConnection(self.__admin_user) as client: + self._revoke_all_access_on_keyspace( + client, models.CassandraSchema(database), user) + + def _grant_full_access_on_keyspace(self, client, keyspace, user, + check_reserved=True): + """ + Grant all non-superuser permissions on a keyspace to a given user. + """ + if check_reserved: + self._check_reserved_user_name(user.name) + self._check_reserved_keyspace_name(keyspace.name) + + for access in self.__NO_SUPERUSER_MODIFIERS: + self._grant_permission_on_keyspace(client, access, keyspace, user) + + def _grant_permission_on_keyspace(self, client, modifier, keyspace, user): + """ + Grant a non-superuser permission on a keyspace to a given user. + Raise an exception if the caller attempts to grant a superuser access. + """ + LOG.debug("Granting '%s' access on '%s' to user '%s'." + % (modifier, keyspace.name, user.name)) + if modifier in self.__NO_SUPERUSER_MODIFIERS: + client.execute("GRANT {} ON KEYSPACE \"{}\" TO '{}';", + (modifier, keyspace.name, user.name)) + else: + raise exception.UnprocessableEntity( + "Invalid permission modifier (%s). Allowed values are: '%s'" + % (modifier, ', '.join(self.__NO_SUPERUSER_MODIFIERS))) + + def _revoke_all_access_on_keyspace(self, client, keyspace, user, + check_reserved=True): + if check_reserved: + self._check_reserved_user_name(user.name) + self._check_reserved_keyspace_name(keyspace.name) + + LOG.debug("Revoking all permissions on '%s' from user '%s'." + % (keyspace.name, user.name)) + client.execute("REVOKE ALL PERMISSIONS ON KEYSPACE \"{}\" FROM '{}';", + (keyspace.name, user.name)) + + def update_attributes(self, context, username, hostname, user_attrs): + with CassandraLocalhostConnection(self.__admin_user) as client: + user = self._load_user(client, username) + new_name = user_attrs.get('name') + new_password = user_attrs.get('password') + self._update_user(client, user, new_name, new_password) + + def _update_user(self, client, user, new_username, new_password): + """ + Update a user of a given username. + Updatable attributes include username and password. + If a new username and password are given a new user with those + attributes is created and all permissions from the original + user get transfered to it. The original user is then dropped + therefore revoking its permissions. + If only new password is specified the existing user gets altered + with that password. + """ + if new_username is not None and user.name != new_username: + if new_password is not None: + self._rename_user(client, user, new_username, new_password) + else: + raise exception.UnprocessableEntity( + _("Updating username requires specifying a password " + "as well.")) + elif new_password is not None and user.password != new_password: + user.password = new_password + self._alter_user_password(client, user) + + def _rename_user(self, client, user, new_username, new_password): + """ + Rename a given user also updating its password. + Transfer the current permissions to the new username. + Drop the old username therefore revoking its permissions. + """ + LOG.debug("Renaming user '%s' to '%s'" % (user.name, new_username)) + new_user = models.CassandraUser(new_username, new_password) + new_user.databases.extend(user.databases) + self._create_user_and_grant(client, new_user) + self._drop_user(client, user) + + def alter_user_password(self, user): + with CassandraLocalhostConnection(self.__admin_user) as client: + self._alter_user_password(client, user) + + def change_passwords(self, context, users): + with CassandraLocalhostConnection(self.__admin_user) as client: + for user in users: + self._alter_user_password(client, self._deserialize_user(user)) + + def _alter_user_password(self, client, user): + LOG.debug("Changing password of user '%s'." % user.name) + client.execute("ALTER USER '{}' " + "WITH PASSWORD %s;", (user.name,), (user.password,)) + + def create_database(self, context, databases): + with CassandraLocalhostConnection(self.__admin_user) as client: + for item in databases: + self._create_single_node_keyspace( + client, self._deserialize_keyspace(item)) + + def _create_single_node_keyspace(self, client, keyspace): + """ + Create a single-replica keyspace. + + Cassandra stores replicas on multiple nodes to ensure reliability and + fault tolerance. All replicas are equally important; + there is no primary or master. + A replication strategy determines the nodes where + replicas are placed. SimpleStrategy is for a single data center only. + The total number of replicas across the cluster is referred to as the + replication factor. + + Replication Strategy: + 'SimpleStrategy' is not optimized for multiple data centers. + 'replication_factor' The number of replicas of data on multiple nodes. + Required for SimpleStrategy; otherwise, not used. + + Keyspace names are case-insensitive by default. + To make a name case-sensitive, enclose it in double quotation marks. + """ + client.execute("CREATE KEYSPACE \"{}\" WITH REPLICATION = " + "{{ 'class' : 'SimpleStrategy', " + "'replication_factor' : 1 }};", (keyspace.name,)) + + def delete_database(self, context, database): + with CassandraLocalhostConnection(self.__admin_user) as client: + self._drop_keyspace(client, self._deserialize_keyspace(database)) + + def _drop_keyspace(self, client, keyspace): + LOG.debug("Dropping keyspace '%s'." % keyspace.name) + client.execute("DROP KEYSPACE \"{}\";", (keyspace.name,)) + + def list_databases(self, context, limit=None, marker=None, + include_marker=False): + with CassandraLocalhostConnection(self.__admin_user) as client: + databases = [keyspace.serialize() for keyspace + in self._get_available_keyspaces(client)] + return pagination.paginate_list(databases, limit, marker, + include_marker) + + def _get_available_keyspaces(self, client): + """ + Return a set of unique keyspace instances. + Omit keyspace names on the ignore list. + """ + return {models.CassandraSchema(db.keyspace_name) + for db in client.execute("SELECT * FROM " + "system.schema_keyspaces;") + if db.keyspace_name not in self.ignore_dbs} + + def list_access(self, context, username, hostname): + with CassandraLocalhostConnection(self.__admin_user) as client: + user = self._find_user(client, username) + if user: + return user.databases + + raise exception.UserNotFound(username) + + def _deserialize_keyspace(self, keyspace_dict, check_reserved=True): + if keyspace_dict: + db = models.CassandraSchema.deserialize_schema(keyspace_dict) + if check_reserved: + self._check_reserved_keyspace_name(db.name) + + return db + + return None + + def _check_reserved_keyspace_name(self, name): + if name in self.ignore_dbs: + raise ValueError(_("This keyspace-name is reserved: %s") % name) + + def _deserialize_user(self, user_dict, check_reserved=True): + if user_dict: + user = models.CassandraUser.deserialize_user(user_dict) + if check_reserved: + self._check_reserved_user_name(user.name) + + return user + + return None + + def _check_reserved_user_name(self, name): + if name in self.ignore_users: + raise ValueError(_("This user-name is reserved: %s") % name) + + @property + def ignore_users(self): + return cfg.get_ignored_users(manager=MANAGER) + + @property + def ignore_dbs(self): + return cfg.get_ignored_dbs(manager=MANAGER) + + +class CassandraConnection(object): + """A wrapper to manage a Cassandra connection.""" + + # Cassandra 2.1 only supports protocol versions 3 and lower. + NATIVE_PROTOCOL_VERSION = 3 + + def __init__(self, contact_points, user): + self.__user = user + # A Cluster is initialized with a set of initial contact points. + # After the driver connects to one of the nodes it will automatically + # discover the rest. + # Will connect to '127.0.0.1' if None contact points are given. + self._cluster = Cluster( + contact_points=contact_points, + auth_provider=PlainTextAuthProvider(user.name, user.password), + protocol_version=self.NATIVE_PROTOCOL_VERSION) + self.__session = None + + def __enter__(self): + self.__connect() + return self + + def __exit__(self, exc_type, exc_value, traceback): + self.__disconnect() + + def execute(self, query, identifiers=None, data_values=None, timeout=None): + """ + Execute a query with a given sequence or dict of data values to bind. + If a sequence is used, '%s' should be used the placeholder for each + argument. If a dict is used, '%(name)s' style placeholders must + be used. + Only data values should be supplied this way. Other items, + such as keyspaces, table names, and column names should be set + ahead of time. Use the '{}' style placeholders and + 'identifiers' parameter for those. + Raise an exception if the operation exceeds the given timeout (sec). + There is no timeout if set to None. + Return a set of rows or an empty list if None. + """ + if self.__is_active(): + try: + rows = self.__session.execute(self.__bind(query, identifiers), + data_values, timeout) + return rows or [] + except OperationTimedOut: + LOG.error(_("Query execution timed out.")) + raise + + LOG.debug("Cannot perform this operation on a closed connection.") + raise exception.UnprocessableEntity() + + def __bind(self, query, identifiers): + if identifiers: + return query.format(*identifiers) + return query + + def __connect(self): + if not self._cluster.is_shutdown: + LOG.debug("Connecting to a Cassandra cluster as '%s'." + % self.__user.name) + if not self.__is_active(): + self.__session = self._cluster.connect() + else: + LOG.debug("Connection already open.") + LOG.debug("Connected to cluster: '%s'" + % self._cluster.metadata.cluster_name) + for host in self._cluster.metadata.all_hosts(): + LOG.debug("Connected to node: '%s' in rack '%s' at datacenter " + "'%s'" % (host.address, host.rack, host.datacenter)) + else: + LOG.debug("Cannot perform this operation on a terminated cluster.") + raise exception.UnprocessableEntity() + + def __disconnect(self): + if self.__is_active(): + try: + LOG.debug("Disconnecting from cluster: '%s'" + % self._cluster.metadata.cluster_name) + self._cluster.shutdown() + self.__session.shutdown() + except Exception: + LOG.debug("Failed to disconnect from a Cassandra cluster.") + + def __is_active(self): + return self.__session and not self.__session.is_shutdown + + +class CassandraLocalhostConnection(CassandraConnection): + """ + A connection to the localhost Cassandra server. + """ + + def __init__(self, user): + super(CassandraLocalhostConnection, self).__init__(None, user) diff --git a/trove/guestagent/datastore/experimental/cassandra/system.py b/trove/guestagent/datastore/experimental/cassandra/system.py deleted file mode 100644 index 481c397ac9..0000000000 --- a/trove/guestagent/datastore/experimental/cassandra/system.py +++ /dev/null @@ -1,31 +0,0 @@ -# Copyright 2013 Mirantis Inc. -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -from trove.common import cfg - -CONF = cfg.CONF - -SERVICE_CANDIDATES = ["cassandra"] - -CASSANDRA_DATA_DIR = "/var/lib/cassandra/data" -CASSANDRA_CONF = "/etc/cassandra/cassandra.yaml" -CASSANDRA_TEMP_CONF = "/tmp/cassandra.yaml" -CASSANDRA_TEMP_DIR = "/tmp/cassandra" - -CASSANDRA_STATUS = """echo "use system;" > /tmp/check; cqlsh -f /tmp/check""" - -CASSANDRA_KILL = "sudo killall java || true" -SERVICE_STOP_TIMEOUT = 60 -INSTALL_TIMEOUT = 10000 diff --git a/trove/guestagent/db/models.py b/trove/guestagent/db/models.py index 04bbe4ce08..ec5bc1bc3a 100644 --- a/trove/guestagent/db/models.py +++ b/trove/guestagent/db/models.py @@ -158,6 +158,36 @@ class MongoDBSchema(DatastoreSchema): return ['_name'] +class CassandraSchema(DatastoreSchema): + """Represents a Cassandra schema and its associated properties. + + Keyspace names are 32 or fewer alpha-numeric characters and underscores, + the first of which is an alpha character. + """ + + def __init__(self, name=None, deserializing=False): + super(CassandraSchema, self).__init__() + + if not (bool(deserializing) != bool(name)): + raise ValueError(_("Bad args. name: %(name)s, " + "deserializing %(deser)s.") + % ({'name': bool(name), + 'deser': bool(deserializing)})) + if not deserializing: + self.name = name + + @property + def _max_schema_name_length(self): + return 32 + + def _is_valid_schema_name(self, value): + return True + + @classmethod + def _dict_requirements(cls): + return ['_name'] + + class MySQLDatabase(Base): """Represents a Database and its properties.""" @@ -746,6 +776,45 @@ class MongoDBUser(DatastoreUser): return ['_name'] +class CassandraUser(DatastoreUser): + """Represents a Cassandra user and its associated properties.""" + + def __init__(self, name=None, password=None, deserializing=False): + super(CassandraUser, self).__init__() + + if ((not (bool(deserializing) != bool(name))) or + (bool(deserializing) and bool(password))): + raise ValueError(_("Bad args. name: %(name)s, " + "password %(pass)s, " + "deserializing %(deser)s.") + % ({'name': bool(name), + 'pass': bool(password), + 'deser': bool(deserializing)})) + if not deserializing: + self.name = name + self.password = password + + def _build_database_schema(self, name): + return CassandraSchema(name) + + @property + def _max_username_length(self): + return 65535 + + def _is_valid_name(self, value): + return True + + def _is_valid_host_name(self, value): + return True + + def _is_valid_password(self, value): + return True + + @classmethod + def _dict_requirements(cls): + return ['_name'] + + class MySQLUser(Base): """Represents a MySQL User and its associated properties.""" diff --git a/trove/templates/cassandra/config.template b/trove/templates/cassandra/config.template index 221009fc85..e369ff8e01 100644 --- a/trove/templates/cassandra/config.template +++ b/trove/templates/cassandra/config.template @@ -5,8 +5,8 @@ max_hint_window_in_ms: 10800000 hinted_handoff_throttle_in_kb: 1024 max_hints_delivery_threads: 2 batchlog_replay_throttle_in_kb: 1024 -authenticator: AllowAllAuthenticator -authorizer: AllowAllAuthorizer +authenticator: org.apache.cassandra.auth.PasswordAuthenticator +authorizer: org.apache.cassandra.auth.CassandraAuthorizer permissions_validity_in_ms: 2000 partitioner: org.apache.cassandra.dht.Murmur3Partitioner data_file_directories: @@ -38,11 +38,11 @@ trickle_fsync: false trickle_fsync_interval_in_kb: 10240 storage_port: 7000 ssl_storage_port: 7001 -listen_address: localhost +listen_address: 127.0.0.1 start_native_transport: true native_transport_port: 9042 start_rpc: true -rpc_address: localhost +rpc_address: 127.0.0.1 rpc_port: 9160 rpc_keepalive: true rpc_server_type: sync diff --git a/trove/tests/int_tests.py b/trove/tests/int_tests.py index 2a2b2f8cf5..76329a4619 100644 --- a/trove/tests/int_tests.py +++ b/trove/tests/int_tests.py @@ -185,6 +185,7 @@ register(["user"], user_actions_groups) register(["db2_supported"], common_groups, database_actions_groups, user_actions_groups) register(["cassandra_supported"], common_groups, + user_actions_groups, database_actions_groups, backup_groups, configuration_groups) register(["couchbase_supported"], common_groups, backup_groups, root_actions_groups) diff --git a/trove/tests/unittests/guestagent/test_cassandra_manager.py b/trove/tests/unittests/guestagent/test_cassandra_manager.py index b6019b1ff0..137e2be5a7 100644 --- a/trove/tests/unittests/guestagent/test_cassandra_manager.py +++ b/trove/tests/unittests/guestagent/test_cassandra_manager.py @@ -13,17 +13,25 @@ # under the License. import os +import random +import string +from mock import ANY +from mock import call from mock import MagicMock +from mock import NonCallableMagicMock from mock import patch from oslo_utils import netutils +from testtools import ExpectedException from trove.common.context import TroveContext +from trove.common import exception from trove.common.instance import ServiceStatuses from trove.guestagent.datastore.experimental.cassandra import ( manager as cass_manager) from trove.guestagent.datastore.experimental.cassandra import ( service as cass_service) +from trove.guestagent.db import models from trove.guestagent import pkg as pkg from trove.guestagent import volume from trove.tests.unittests import trove_testtools @@ -31,6 +39,32 @@ from trove.tests.unittests import trove_testtools class GuestAgentCassandraDBManagerTest(trove_testtools.TestCase): + __N_GAK = '_get_available_keyspaces' + __N_GLU = '_get_listed_users' + __N_BU = '_build_user' + __N_RU = '_rename_user' + __N_AUP = '_alter_user_password' + __N_CAU = 'trove.guestagent.db.models.CassandraUser' + __N_CU = '_create_user' + __N_GFA = '_grant_full_access_on_keyspace' + __N_DU = '_drop_user' + + __ACCESS_MODIFIERS = ('ALTER', 'CREATE', 'DROP', 'MODIFY', 'SELECT') + __CREATE_DB_FORMAT = ( + "CREATE KEYSPACE \"{}\" WITH REPLICATION = " + "{{ 'class' : 'SimpleStrategy', 'replication_factor' : 1 }};" + ) + __DROP_DB_FORMAT = "DROP KEYSPACE \"{}\";" + __CREATE_USR_FORMAT = "CREATE USER '{}' WITH PASSWORD %s NOSUPERUSER;" + __ALTER_USR_FORMAT = "ALTER USER '{}' WITH PASSWORD %s;" + __DROP_USR_FORMAT = "DROP USER '{}';" + __GRANT_FORMAT = "GRANT {} ON KEYSPACE \"{}\" TO '{}';" + __REVOKE_FORMAT = "REVOKE ALL PERMISSIONS ON KEYSPACE \"{}\" FROM '{}';" + __LIST_PERMISSIONS_FORMAT = "LIST ALL PERMISSIONS NORECURSIVE;" + __LIST_PERMISSIONS_OF_FORMAT = "LIST ALL PERMISSIONS OF '{}' NORECURSIVE;" + __LIST_DB_FORMAT = "SELECT * FROM system.schema_keyspaces;" + __LIST_USR_FORMAT = "LIST USERS;" + def setUp(self): super(GuestAgentCassandraDBManagerTest, self).setUp() self.real_status = cass_service.CassandraAppStatus.set_status @@ -45,6 +79,9 @@ class GuestAgentCassandraDBManagerTest(trove_testtools.TestCase): return_value=FakeInstanceServiceStatus()) self.context = TroveContext() self.manager = cass_manager.Manager() + self.manager._Manager__admin = cass_service.CassandraAdmin( + models.CassandraUser('Test')) + self.admin = self.manager._Manager__admin self.pkg = cass_service.packager self.real_db_app_status = cass_service.CassandraAppStatus self.origin_os_path_exists = os.path.exists @@ -74,10 +111,11 @@ class GuestAgentCassandraDBManagerTest(trove_testtools.TestCase): netutils.get_my_ipv4 = self.original_get_ip cass_service.CassandraApp.make_host_reachable = ( self.orig_make_host_reachable) + cass_service.CassandraAppStatus.set_status = self.real_status def test_update_status(self): mock_status = MagicMock() - self.manager.appStatus = mock_status + self.manager.app.status = mock_status self.manager.update_status(self.context) mock_status.update.assert_any_call() @@ -109,8 +147,8 @@ class GuestAgentCassandraDBManagerTest(trove_testtools.TestCase): mock_status = MagicMock() mock_app = MagicMock() - self.manager.appStatus = mock_status - self.manager.app = mock_app + mock_app.status = mock_status + self.manager._app = mock_app mock_status.begin_install = MagicMock(return_value=None) mock_app.install_if_needed = MagicMock(return_value=None) @@ -144,5 +182,419 @@ class GuestAgentCassandraDBManagerTest(trove_testtools.TestCase): mock_app.install_if_needed.assert_any_call(packages) mock_app.init_storage_structure.assert_any_call('/var/lib/cassandra') mock_app.make_host_reachable.assert_any_call() - mock_app.start_db.assert_any_call() + mock_app.start_db.assert_any_call(update_db=False) mock_app.stop_db.assert_any_call() + + def test_keyspace_validation(self): + valid_name = self._get_random_name(32) + db = models.CassandraSchema(valid_name) + self.assertEqual(valid_name, db.name) + with ExpectedException(ValueError): + models.CassandraSchema(self._get_random_name(33)) + + def test_user_validation(self): + valid_name = self._get_random_name(65535) + usr = models.CassandraUser(valid_name, 'password') + self.assertEqual(valid_name, usr.name) + self.assertEqual('password', usr.password) + with ExpectedException(ValueError): + models.CassandraUser(self._get_random_name(65536)) + + @classmethod + def _serialize_collection(self, *collection): + return [item.serialize() for item in collection] + + @classmethod + def _get_random_name(self, size, chars=string.letters + string.digits): + return ''.join(random.choice(chars) for _ in range(size)) + + @patch.object(cass_service.CassandraLocalhostConnection, '__enter__') + def test_create_database(self, conn): + db1 = models.CassandraSchema('db1') + db2 = models.CassandraSchema('db2') + db3 = models.CassandraSchema(self._get_random_name(32)) + + self.manager.create_database(self.context, + self._serialize_collection(db1, db2, db3)) + conn.return_value.execute.assert_has_calls([ + call(self.__CREATE_DB_FORMAT, (db1.name,)), + call(self.__CREATE_DB_FORMAT, (db2.name,)), + call(self.__CREATE_DB_FORMAT, (db3.name,)) + ]) + + @patch.object(cass_service.CassandraLocalhostConnection, '__enter__') + def test_delete_database(self, conn): + db = models.CassandraSchema(self._get_random_name(32)) + self.manager.delete_database(self.context, db.serialize()) + conn.return_value.execute.assert_called_once_with( + self.__DROP_DB_FORMAT, (db.name,)) + + @patch.object(cass_service.CassandraLocalhostConnection, '__enter__') + def test_create_user(self, conn): + usr1 = models.CassandraUser('usr1') + usr2 = models.CassandraUser('usr2', '') + usr3 = models.CassandraUser(self._get_random_name(1025), 'password') + + self.manager.create_user(self.context, + self._serialize_collection(usr1, usr2, usr3)) + conn.return_value.execute.assert_has_calls([ + call(self.__CREATE_USR_FORMAT, (usr1.name,), (usr1.password,)), + call(self.__CREATE_USR_FORMAT, (usr2.name,), (usr2.password,)), + call(self.__CREATE_USR_FORMAT, (usr3.name,), (usr3.password,)) + ]) + + @patch.object(cass_service.CassandraLocalhostConnection, '__enter__') + def test_delete_user(self, conn): + usr = models.CassandraUser(self._get_random_name(1025), 'password') + self.manager.delete_user(self.context, usr.serialize()) + conn.return_value.execute.assert_called_once_with( + self.__DROP_USR_FORMAT, (usr.name,)) + + @patch.object(cass_service.CassandraLocalhostConnection, '__enter__') + def test_change_passwords(self, conn): + usr1 = models.CassandraUser('usr1') + usr2 = models.CassandraUser('usr2', '') + usr3 = models.CassandraUser(self._get_random_name(1025), 'password') + + self.manager.change_passwords(self.context, self._serialize_collection( + usr1, usr2, usr3)) + conn.return_value.execute.assert_has_calls([ + call(self.__ALTER_USR_FORMAT, (usr1.name,), (usr1.password,)), + call(self.__ALTER_USR_FORMAT, (usr2.name,), (usr2.password,)), + call(self.__ALTER_USR_FORMAT, (usr3.name,), (usr3.password,)) + ]) + + @patch.object(cass_service.CassandraLocalhostConnection, '__enter__') + def test_alter_user_password(self, conn): + usr1 = models.CassandraUser('usr1') + usr2 = models.CassandraUser('usr2', '') + usr3 = models.CassandraUser(self._get_random_name(1025), 'password') + + self.admin.alter_user_password(usr1) + self.admin.alter_user_password(usr2) + self.admin.alter_user_password(usr3) + conn.return_value.execute.assert_has_calls([ + call(self.__ALTER_USR_FORMAT, (usr1.name,), (usr1.password,)), + call(self.__ALTER_USR_FORMAT, (usr2.name,), (usr2.password,)), + call(self.__ALTER_USR_FORMAT, (usr3.name,), (usr3.password,)) + ]) + + @patch.object(cass_service.CassandraLocalhostConnection, '__enter__') + def test_grant_access(self, conn): + usr1 = models.CassandraUser('usr1') + usr2 = models.CassandraUser('usr1', 'password') + db1 = models.CassandraSchema('db1') + db2 = models.CassandraSchema('db2') + db3 = models.CassandraSchema('db3') + + self.manager.grant_access(self.context, usr1.name, None, [db1.name, + db2.name]) + self.manager.grant_access(self.context, usr2.name, None, [db3.name]) + + expected = [] + for modifier in self.__ACCESS_MODIFIERS: + expected.append(call(self.__GRANT_FORMAT, + (modifier, db1.name, usr1.name))) + expected.append(call(self.__GRANT_FORMAT, + (modifier, db3.name, usr2.name))) + + conn.return_value.execute.assert_has_calls(expected, any_order=True) + + @patch.object(cass_service.CassandraLocalhostConnection, '__enter__') + def test_revoke_access(self, conn): + usr1 = models.CassandraUser('usr1') + usr2 = models.CassandraUser('usr1', 'password') + db1 = models.CassandraSchema('db1') + db2 = models.CassandraSchema('db2') + + self.manager.revoke_access(self.context, usr1.name, None, db1.name) + self.manager.revoke_access(self.context, usr2.name, None, db2.name) + conn.return_value.execute.assert_has_calls([ + call(self.__REVOKE_FORMAT, (db1.name, usr1.name)), + call(self.__REVOKE_FORMAT, (db2.name, usr2.name)) + ]) + + @patch.object(cass_service.CassandraLocalhostConnection, '__enter__') + def test_get_available_keyspaces(self, conn): + self.manager.list_databases(self.context) + conn.return_value.execute.assert_called_once_with( + self.__LIST_DB_FORMAT) + + @patch.object(cass_service.CassandraLocalhostConnection, '__enter__') + def test_list_databases(self, conn): + db1 = models.CassandraSchema('db1') + db2 = models.CassandraSchema('db2') + db3 = models.CassandraSchema(self._get_random_name(32)) + + with patch.object(self.admin, self.__N_GAK, return_value={db1, db2, + db3}): + found = self.manager.list_databases(self.context) + self.assertEqual(2, len(found)) + self.assertEqual(3, len(found[0])) + self.assertEqual(None, found[1]) + self.assertIn(db1.serialize(), found[0]) + self.assertIn(db2.serialize(), found[0]) + self.assertIn(db3.serialize(), found[0]) + + with patch.object(self.admin, self.__N_GAK, return_value=set()): + found = self.manager.list_databases(self.context) + self.assertEqual(([], None), found) + + def test_get_acl(self): + r0 = NonCallableMagicMock(username='user1', resource='', + permission='SELECT') + r1 = NonCallableMagicMock(username='user2', resource='', + permission='SELECT') + r2 = NonCallableMagicMock(username='user2', resource='', + permission='SELECT') + r3 = NonCallableMagicMock(username='user2', resource='', + permission='ALTER') + r4 = NonCallableMagicMock(username='user3', resource='', + permission='SELECT') + r5 = NonCallableMagicMock(username='user3', resource='', + permission='ALTER') + r6 = NonCallableMagicMock(username='user3', resource='', + permission='') + r7 = NonCallableMagicMock(username='user3', resource='', + permission='') + r8 = NonCallableMagicMock(username='user3', resource='', + permission='DELETE') + r9 = NonCallableMagicMock(username='user4', resource='', + permission='UPDATE') + r10 = NonCallableMagicMock(username='user4', resource='', + permission='DELETE') + + available_ks = {models.CassandraSchema('ks1'), + models.CassandraSchema('ks2'), + models.CassandraSchema('ks3')} + + mock_result_set = [r0, r1, r2, r3, r4, r5, r6, r7, r8, r9, r9, r9, r10] + execute_mock = MagicMock(return_value=mock_result_set) + mock_client = MagicMock(execute=execute_mock) + + with patch.object(self.admin, + self.__N_GAK, return_value=available_ks) as gak_mock: + acl = self.admin._get_acl(mock_client) + execute_mock.assert_called_once_with( + self.__LIST_PERMISSIONS_FORMAT) + gak_mock.assert_called_once_with(mock_client) + + self.assertEqual({'user1': {'ks1': {'SELECT'}, + 'ks2': {'SELECT'}, + 'ks3': {'SELECT'}}, + 'user2': {'ks1': {'SELECT'}, + 'ks2': {'SELECT', 'ALTER'}}, + 'user3': {'ks1': {'DELETE'}}, + 'user4': {'ks1': {'UPDATE', 'DELETE'}, + 'ks2': {'UPDATE'}, + 'ks3': {'UPDATE'}} + }, + acl) + + mock_result_set = [r1, r2, r3] + execute_mock = MagicMock(return_value=mock_result_set) + mock_client = MagicMock(execute=execute_mock) + + with patch.object(self.admin, + self.__N_GAK, return_value=available_ks) as gak_mock: + acl = self.admin._get_acl(mock_client, username='user2') + execute_mock.assert_called_once_with( + self.__LIST_PERMISSIONS_OF_FORMAT.format('user2')) + gak_mock.assert_not_called() + + self.assertEqual({'user2': {'ks1': {'SELECT'}, + 'ks2': {'SELECT', 'ALTER'}}}, acl) + + mock_result_set = [] + execute_mock = MagicMock(return_value=mock_result_set) + mock_client = MagicMock(execute=execute_mock) + + with patch.object(self.admin, + self.__N_GAK, return_value=available_ks) as gak_mock: + acl = self.admin._get_acl(mock_client, username='nonexisting') + execute_mock.assert_called_once_with( + self.__LIST_PERMISSIONS_OF_FORMAT.format('nonexisting')) + gak_mock.assert_not_called() + + self.assertEqual({}, acl) + + @patch.object(cass_service.CassandraLocalhostConnection, '__enter__') + def test_get_listed_users(self, conn): + usr1 = models.CassandraUser(self._get_random_name(1025)) + usr2 = models.CassandraUser(self._get_random_name(1025)) + usr3 = models.CassandraUser(self._get_random_name(1025)) + db1 = models.CassandraSchema('db1') + db2 = models.CassandraSchema('db2') + usr1.databases.append(db1.serialize()) + usr3.databases.append(db2.serialize()) + + rv_1 = NonCallableMagicMock() + rv_1.configure_mock(name=usr1.name, super=False) + rv_2 = NonCallableMagicMock() + rv_2.configure_mock(name=usr2.name, super=False) + rv_3 = NonCallableMagicMock() + rv_3.configure_mock(name=usr3.name, super=True) + + with patch.object(conn.return_value, 'execute', return_value=iter( + [rv_1, rv_2, rv_3])): + with patch.object(self.admin, '_get_acl', + return_value={usr1.name: {db1.name: {'SELECT'}, + db2.name: {}}, + usr3.name: {db2.name: {'SELECT'}}} + ): + usrs = self.manager.list_users(self.context) + conn.return_value.execute.assert_has_calls([ + call(self.__LIST_USR_FORMAT), + ], any_order=True) + self.assertIn(usr1.serialize(), usrs[0]) + self.assertIn(usr2.serialize(), usrs[0]) + self.assertIn(usr3.serialize(), usrs[0]) + + @patch.object(cass_service.CassandraLocalhostConnection, '__enter__') + def test_list_access(self, conn): + usr1 = models.CassandraUser('usr1') + usr2 = models.CassandraUser('usr2') + usr3 = models.CassandraUser(self._get_random_name(1025), 'password') + db1 = models.CassandraSchema('db1').serialize() + db2 = models.CassandraSchema('db2').serialize() + usr2.databases.append(db1) + usr3.databases.append(db1) + usr3.databases.append(db2) + + with patch.object(self.admin, self.__N_GLU, return_value={usr1, usr2, + usr3}): + usr1_dbs = self.manager.list_access(self.context, usr1.name, None) + usr2_dbs = self.manager.list_access(self.context, usr2.name, None) + usr3_dbs = self.manager.list_access(self.context, usr3.name, None) + self.assertEqual([], usr1_dbs) + self.assertEqual([db1], usr2_dbs) + self.assertEqual([db1, db2], usr3_dbs) + + with patch.object(self.admin, self.__N_GLU, return_value=set()): + with ExpectedException(exception.UserNotFound): + self.manager.list_access(self.context, usr3.name, None) + + @patch.object(cass_service.CassandraLocalhostConnection, '__enter__') + def test_list_users(self, conn): + usr1 = models.CassandraUser('usr1') + usr2 = models.CassandraUser('usr2') + usr3 = models.CassandraUser(self._get_random_name(1025), 'password') + + with patch.object(self.admin, self.__N_GLU, return_value={usr1, usr2, + usr3}): + found = self.manager.list_users(self.context) + self.assertEqual(2, len(found)) + self.assertEqual(3, len(found[0])) + self.assertEqual(None, found[1]) + self.assertIn(usr1.serialize(), found[0]) + self.assertIn(usr2.serialize(), found[0]) + self.assertIn(usr3.serialize(), found[0]) + + with patch.object(self.admin, self.__N_GLU, return_value=set()): + self.assertEqual(([], None), self.manager.list_users(self.context)) + + @patch.object(cass_service.CassandraLocalhostConnection, '__enter__') + def test_get_user(self, conn): + usr1 = models.CassandraUser('usr1') + usr2 = models.CassandraUser('usr2') + usr3 = models.CassandraUser(self._get_random_name(1025), 'password') + + with patch.object(self.admin, self.__N_GLU, return_value={usr1, usr2, + usr3}): + found = self.manager.get_user(self.context, usr2.name, None) + self.assertEqual(usr2.serialize(), found) + + with patch.object(self.admin, self.__N_GLU, return_value=set()): + self.assertIsNone( + self.manager.get_user(self.context, usr2.name, None)) + + @patch.object(cass_service.CassandraAdmin, '_deserialize_keyspace', + side_effect=lambda p1: p1) + @patch.object(cass_service.CassandraLocalhostConnection, '__enter__') + def test_rename_user(self, conn, ks_deserializer): + usr = models.CassandraUser('usr') + db1 = models.CassandraSchema('db1').serialize() + db2 = models.CassandraSchema('db2').serialize() + usr.databases.append(db1) + usr.databases.append(db2) + + new_user = models.CassandraUser('new_user') + with patch(self.__N_CAU, return_value=new_user): + with patch.object(self.admin, self.__N_BU, return_value=usr): + with patch.object(self.admin, self.__N_CU) as create: + with patch.object(self.admin, self.__N_GFA) as grant: + with patch.object(self.admin, self.__N_DU) as drop: + usr_attrs = {'name': 'user', 'password': 'trove'} + self.manager.update_attributes(self.context, + usr.name, None, + usr_attrs) + create.assert_called_once_with(ANY, new_user) + grant.assert_has_calls([call(ANY, db1, ANY), + call(ANY, db2, ANY)]) + drop.assert_called_once_with(ANY, usr) + + @patch.object(cass_service.CassandraLocalhostConnection, '__enter__') + def test_update_attributes(self, conn): + usr = models.CassandraUser('usr', 'pwd') + + with patch.object(self.admin, self.__N_BU, return_value=usr): + usr_attrs = {'name': usr.name, 'password': usr.password} + with patch.object(self.admin, self.__N_RU) as rename: + with patch.object(self.admin, self.__N_AUP) as alter: + self.manager.update_attributes(self.context, usr.name, + None, usr_attrs) + self.assertEqual(0, rename.call_count) + self.assertEqual(0, alter.call_count) + + usr_attrs = {'name': 'user', 'password': 'password'} + with patch.object(self.admin, self.__N_RU) as rename: + with patch.object(self.admin, self.__N_AUP) as alter: + self.manager.update_attributes(self.context, usr.name, + None, usr_attrs) + rename.assert_called_once_with(ANY, usr, usr_attrs['name'], + usr_attrs['password']) + self.assertEqual(0, alter.call_count) + + usr_attrs = {'name': 'user', 'password': usr.password} + with patch.object(self.admin, self.__N_RU) as rename: + with patch.object(self.admin, self.__N_AUP) as alter: + self.manager.update_attributes(self.context, usr.name, + None, usr_attrs) + rename.assert_called_once_with(ANY, usr, usr_attrs['name'], + usr_attrs['password']) + self.assertEqual(0, alter.call_count) + + usr_attrs = {'name': 'user'} + with patch.object(self.admin, self.__N_RU) as rename: + with patch.object(self.admin, self.__N_AUP) as alter: + with ExpectedException( + exception.UnprocessableEntity, "Updating username " + "requires specifying a password as well."): + self.manager.update_attributes(self.context, usr.name, + None, usr_attrs) + self.assertEqual(0, rename.call_count) + self.assertEqual(0, alter.call_count) + + usr_attrs = {'name': usr.name, 'password': 'password'} + with patch.object(self.admin, self.__N_RU) as rename: + with patch.object(self.admin, self.__N_AUP) as alter: + self.manager.update_attributes(self.context, usr.name, + None, usr_attrs) + alter.assert_called_once_with(ANY, usr) + self.assertEqual(0, rename.call_count) + + usr_attrs = {'password': usr.password} + with patch.object(self.admin, self.__N_RU) as rename: + with patch.object(self.admin, self.__N_AUP) as alter: + self.manager.update_attributes(self.context, usr.name, + None, usr_attrs) + self.assertEqual(0, rename.call_count) + self.assertEqual(0, alter.call_count) + + usr_attrs = {'password': 'trove'} + with patch.object(self.admin, self.__N_RU) as rename: + with patch.object(self.admin, self.__N_AUP) as alter: + self.manager.update_attributes(self.context, usr.name, + None, usr_attrs) + alter.assert_called_once_with(ANY, usr) + self.assertEqual(0, rename.call_count) diff --git a/trove/tests/unittests/guestagent/test_dbaas.py b/trove/tests/unittests/guestagent/test_dbaas.py index 37194e891d..4a9f3e9941 100644 --- a/trove/tests/unittests/guestagent/test_dbaas.py +++ b/trove/tests/unittests/guestagent/test_dbaas.py @@ -45,8 +45,6 @@ from trove.guestagent.common.operating_system import FileMode from trove.guestagent.common import sql_query from trove.guestagent.datastore.experimental.cassandra import ( service as cass_service) -from trove.guestagent.datastore.experimental.cassandra import ( - system as cass_system) from trove.guestagent.datastore.experimental.couchbase import ( service as couchservice) from trove.guestagent.datastore.experimental.couchdb import ( @@ -2354,17 +2352,14 @@ class CassandraDBAppTest(BaseAppTest.AppTestCase): def setUp(self): super(CassandraDBAppTest, self).setUp(str(uuid4())) - self.exec_patch = patch.object(utils, 'execute_with_timeout') - self.addCleanup(self.exec_patch.stop) - self.exec_mock = self.exec_patch.start() self.sleep = time.sleep self.orig_time_time = time.time self.pkg_version = cass_service.packager.pkg_version self.pkg = cass_service.packager util.init_db() - status = FakeAppStatus(self.FAKE_ID, - rd_instance.ServiceStatuses.NEW) - self.cassandra = cass_service.CassandraApp(status) + self.cassandra = cass_service.CassandraApp() + self.cassandra.status = FakeAppStatus(self.FAKE_ID, + rd_instance.ServiceStatuses.NEW) self.orig_unlink = os.unlink @property @@ -2381,14 +2376,14 @@ class CassandraDBAppTest(BaseAppTest.AppTestCase): @property def expected_service_candidates(self): - return cass_system.SERVICE_CANDIDATES + return self.cassandra.service_candidates def tearDown(self): - super(CassandraDBAppTest, self).tearDown() time.sleep = self.sleep time.time = self.orig_time_time cass_service.packager.pkg_version = self.pkg_version cass_service.packager = self.pkg + super(CassandraDBAppTest, self).tearDown() def assert_reported_status(self, expected_status): service_status = InstanceServiceStatus.find_by( @@ -2397,10 +2392,9 @@ class CassandraDBAppTest(BaseAppTest.AppTestCase): @patch.object(utils, 'execute_with_timeout') def test_service_cleanup(self, exec_mock): - cass_service.CassandraAppStatus().cleanup_stalled_db_services() - exec_mock.assert_called_once_with( - cass_system.CASSANDRA_KILL, - shell=True) + cass_service.CassandraAppStatus(Mock()).cleanup_stalled_db_services() + exec_mock.assert_called_once_with(self.cassandra.CASSANDRA_KILL_CMD, + shell=True) def test_install(self): @@ -2424,72 +2418,24 @@ class CassandraDBAppTest(BaseAppTest.AppTestCase): self.assert_reported_status(rd_instance.ServiceStatuses.NEW) - @patch('trove.guestagent.datastore.experimental.cassandra.service.LOG') - def test_cassandra_error_in_write_config_verify_unlink(self, *args): - # this test verifies not only that the write_config - # method properly invoked execute, but also that it properly - # attempted to unlink the file (as a result of the exception) - - mock_unlink = Mock(return_value=0) - - # We call tempfile.mkstemp() here and Mock() the mkstemp() - # parameter to write_config for testability. - (temp_handle, temp_config_name) = tempfile.mkstemp() - mock_mkstemp = MagicMock(return_value=(temp_handle, temp_config_name)) - - configuration = 'this is my configuration' - - with patch('trove.guestagent.common.operating_system.move', - side_effect=ProcessExecutionError('some exception')): - self.assertRaises(ProcessExecutionError, - self.cassandra.write_config, - config_contents=configuration, - execute_function=Mock(), - mkstemp_function=mock_mkstemp, - unlink_function=mock_unlink) - - self.assertEqual(1, mock_unlink.call_count) - - # really delete the temporary_config_file - os.unlink(temp_config_name) - @patch.multiple('trove.guestagent.common.operating_system', - chown=DEFAULT, chmod=DEFAULT, move=DEFAULT) - def test_cassandra_write_config(self, chown, chmod, move): - # ensure that write_config creates a temporary file, and then - # moves the file to the final place. Also validate the - # contents of the file written. - - # We call tempfile.mkstemp() here and Mock() the mkstemp() - # parameter to write_config for testability. - (temp_handle, temp_config_name) = tempfile.mkstemp() - mock_mkstemp = MagicMock(return_value=(temp_handle, temp_config_name)) - + chown=DEFAULT, chmod=DEFAULT, write_file=DEFAULT) + def test_cassandra_write_config(self, chown, chmod, write_file): configuration = 'some arbitrary configuration text' + self.cassandra.write_config(configuration) - mock_execute = MagicMock(return_value=('', '')) - - self.cassandra.write_config(configuration, - execute_function=mock_execute, - mkstemp_function=mock_mkstemp) - - move.assert_called_with(temp_config_name, cass_system.CASSANDRA_CONF, - as_root=True) - chown.assert_called_with(cass_system.CASSANDRA_CONF, + write_file.assert_called_with( + self.cassandra.cassandra_conf, + configuration, + codec=ANY, + as_root=True) + chown.assert_called_with(self.cassandra.cassandra_conf, "cassandra", "cassandra", recursive=False, as_root=True) chmod.assert_called_with( - cass_system.CASSANDRA_CONF, FileMode.ADD_READ_ALL, as_root=True) - - self.assertEqual(1, mock_mkstemp.call_count) - - with open(temp_config_name, 'r') as config_file: - configuration_data = config_file.read() - - self.assertEqual(configuration, configuration_data) - - # really delete the temporary_config_file - os.unlink(temp_config_name) + self.cassandra.cassandra_conf, + FileMode.ADD_READ_ALL, + as_root=True) class CouchbaseAppTest(BaseAppTest.AppTestCase):