Reuse Cassandra connections
Cache and reuse the connections in the Admin and Status objects to avoid leaking resources. Initialize the App and Admin objects themselves lazily at the first access. The Connection object has been changed to act as both an instance object and context manager. Change-Id: I4263b870a1099d345ab1940c61ccdf4680a706ca Closes-Bug: 1566946
This commit is contained in:
parent
6a447af625
commit
728df1640f
@ -0,0 +1,4 @@
|
||||
---
|
||||
fixes:
|
||||
- Make guestagent reuse Cassandra connections to eliminate resource
|
||||
leaks. Bug 1566946.
|
@ -24,9 +24,6 @@ from trove.common import instance as trove_instance
|
||||
from trove.common.notification import EndNotification
|
||||
from trove.guestagent import backup
|
||||
from trove.guestagent.datastore.experimental.cassandra import service
|
||||
from trove.guestagent.datastore.experimental.cassandra.service import (
|
||||
CassandraAdmin
|
||||
)
|
||||
from trove.guestagent.datastore import manager
|
||||
from trove.guestagent import volume
|
||||
|
||||
@ -37,10 +34,10 @@ CONF = cfg.CONF
|
||||
|
||||
class Manager(manager.Manager):
|
||||
|
||||
def __init__(self):
|
||||
self._app = service.CassandraApp()
|
||||
self.__admin = CassandraAdmin(self.app.get_current_superuser())
|
||||
super(Manager, self).__init__('cassandra')
|
||||
def __init__(self, manager_name='cassandra'):
|
||||
super(Manager, self).__init__(manager_name)
|
||||
self._app = None
|
||||
self._admin = None
|
||||
|
||||
@property
|
||||
def status(self):
|
||||
@ -48,11 +45,18 @@ class Manager(manager.Manager):
|
||||
|
||||
@property
|
||||
def app(self):
|
||||
if self._app is None:
|
||||
self._app = self.build_app()
|
||||
return self._app
|
||||
|
||||
def build_app(self):
|
||||
return service.CassandraApp()
|
||||
|
||||
@property
|
||||
def admin(self):
|
||||
return self.__admin
|
||||
if self._admin is None:
|
||||
self._admin = self.app.build_admin()
|
||||
return self._admin
|
||||
|
||||
@property
|
||||
def configuration_manager(self):
|
||||
@ -145,7 +149,7 @@ class Manager(manager.Manager):
|
||||
self.app.secure()
|
||||
self.app.restart()
|
||||
|
||||
self.__admin = CassandraAdmin(self.app.get_current_superuser())
|
||||
self._admin = self.app.build_admin()
|
||||
|
||||
if not cluster_config and self.is_root_enabled(context):
|
||||
self.status.report_root(context, self.app.default_superuser_name)
|
||||
@ -272,7 +276,7 @@ class Manager(manager.Manager):
|
||||
|
||||
def cluster_secure(self, context, password):
|
||||
os_admin = self.app.cluster_secure(password)
|
||||
self.__admin = CassandraAdmin(self.app.get_current_superuser())
|
||||
self._admin = self.app.build_admin()
|
||||
return os_admin
|
||||
|
||||
def get_admin_credentials(self, context):
|
||||
@ -280,4 +284,4 @@ class Manager(manager.Manager):
|
||||
|
||||
def store_admin_credentials(self, context, admin_credentials):
|
||||
self.app.store_admin_credentials(admin_credentials)
|
||||
self.__admin = CassandraAdmin(self.app.get_current_superuser())
|
||||
self._admin = self.app.build_admin()
|
||||
|
@ -21,6 +21,7 @@ from cassandra.auth import PlainTextAuthProvider
|
||||
from cassandra.cluster import Cluster
|
||||
from cassandra.cluster import NoHostAvailable
|
||||
from cassandra import OperationTimedOut
|
||||
from cassandra.policies import ConstantReconnectionPolicy
|
||||
from oslo_log import log as logging
|
||||
from oslo_utils import netutils
|
||||
|
||||
@ -45,7 +46,6 @@ from trove.guestagent import pkg
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
CONF = cfg.CONF
|
||||
MANAGER = CONF.datastore_manager if CONF.datastore_manager else 'cassandra'
|
||||
|
||||
packager = pkg.Package()
|
||||
|
||||
@ -135,6 +135,9 @@ class CassandraApp(object):
|
||||
def cqlsh_conf_path(self):
|
||||
return "~/.cassandra/cqlshrc"
|
||||
|
||||
def build_admin(self):
|
||||
return CassandraAdmin(self.get_current_superuser())
|
||||
|
||||
def install_if_needed(self, packages):
|
||||
"""Prepare the guest machine with a Cassandra server installation."""
|
||||
LOG.info(_("Preparing Guest as a Cassandra Server"))
|
||||
@ -664,6 +667,12 @@ class CassandraApp(object):
|
||||
LOG.exception(_("The node failed to decommission itself."))
|
||||
self.status.set_status(rd_instance.ServiceStatuses.FAILED)
|
||||
return
|
||||
finally:
|
||||
# Cassandra connections have ability to automatically discover and
|
||||
# fallback to other cluster nodes whenever a node goes down.
|
||||
# Reset the status after decomissioning to ensure the heartbeat
|
||||
# connection talks to this node only.
|
||||
self.status = CassandraAppStatus(self.get_current_superuser())
|
||||
|
||||
try:
|
||||
self.stop_db(update_db=True, do_not_start_on_reboot=True)
|
||||
@ -690,7 +699,7 @@ class CassandraApp(object):
|
||||
superuser-level access to all keyspaces.
|
||||
"""
|
||||
cassandra = models.CassandraRootUser(password=root_password)
|
||||
admin = CassandraAdmin(self.get_current_superuser())
|
||||
admin = self.build_admin()
|
||||
if self.is_root_enabled():
|
||||
admin.alter_user_password(cassandra)
|
||||
else:
|
||||
@ -702,7 +711,7 @@ class CassandraApp(object):
|
||||
"""The Trove administrative user ('os_admin') should normally be the
|
||||
only superuser in the system.
|
||||
"""
|
||||
found = CassandraAdmin(self.get_current_superuser()).list_superusers()
|
||||
found = self.build_admin().list_superusers()
|
||||
return len([user for user in found
|
||||
if user.name != self._ADMIN_USER]) > 0
|
||||
|
||||
@ -717,11 +726,18 @@ class CassandraAppStatus(service.BaseDbStatus):
|
||||
"""
|
||||
super(CassandraAppStatus, self).__init__()
|
||||
self.__user = superuser
|
||||
self.__client = None
|
||||
|
||||
@property
|
||||
def client(self):
|
||||
if self.__client is None:
|
||||
self.__client = CassandraLocalhostConnection(self.__user)
|
||||
return self.__client
|
||||
|
||||
def _get_actual_db_status(self):
|
||||
try:
|
||||
with CassandraLocalhostConnection(self.__user):
|
||||
return rd_instance.ServiceStatuses.RUNNING
|
||||
self.client.execute('SELECT now() FROM system.local;')
|
||||
return rd_instance.ServiceStatuses.RUNNING
|
||||
except NoHostAvailable:
|
||||
return rd_instance.ServiceStatuses.SHUTDOWN
|
||||
except Exception:
|
||||
@ -752,16 +768,22 @@ class CassandraAdmin(object):
|
||||
|
||||
def __init__(self, user):
|
||||
self.__admin_user = user
|
||||
self.__client = None
|
||||
|
||||
@property
|
||||
def client(self):
|
||||
if self.__client is None:
|
||||
self.__client = CassandraLocalhostConnection(self.__admin_user)
|
||||
return self.__client
|
||||
|
||||
def create_user(self, context, users):
|
||||
"""
|
||||
Create new non-superuser accounts.
|
||||
New users are by default granted full access to all database resources.
|
||||
"""
|
||||
with CassandraLocalhostConnection(self.__admin_user) as client:
|
||||
for item in users:
|
||||
self._create_user_and_grant(client,
|
||||
self._deserialize_user(item))
|
||||
for item in users:
|
||||
self._create_user_and_grant(self.client,
|
||||
self._deserialize_user(item))
|
||||
|
||||
def _create_user_and_grant(self, client, user):
|
||||
"""
|
||||
@ -784,27 +806,24 @@ class CassandraAdmin(object):
|
||||
access to all keyspaces.
|
||||
"""
|
||||
LOG.debug("Creating a new superuser '%s'." % user.name)
|
||||
with CassandraLocalhostConnection(self.__admin_user) as client:
|
||||
client.execute("CREATE USER '{}' WITH PASSWORD %s SUPERUSER;",
|
||||
(user.name,), (user.password,))
|
||||
client.execute("GRANT ALL PERMISSIONS ON ALL KEYSPACES TO '{}';",
|
||||
(user.name,))
|
||||
self.client.execute("CREATE USER '{}' WITH PASSWORD %s SUPERUSER;",
|
||||
(user.name,), (user.password,))
|
||||
self.client.execute(
|
||||
"GRANT ALL PERMISSIONS ON ALL KEYSPACES TO '{}';", (user.name,))
|
||||
|
||||
def delete_user(self, context, user):
|
||||
self.drop_user(self._deserialize_user(user))
|
||||
|
||||
def drop_user(self, user):
|
||||
with CassandraLocalhostConnection(self.__admin_user) as client:
|
||||
self._drop_user(client, user)
|
||||
self._drop_user(self.client, user)
|
||||
|
||||
def _drop_user(self, client, user):
|
||||
LOG.debug("Deleting user '%s'." % user.name)
|
||||
client.execute("DROP USER '{}';", (user.name, ))
|
||||
|
||||
def get_user(self, context, username, hostname):
|
||||
with CassandraLocalhostConnection(self.__admin_user) as client:
|
||||
user = self._find_user(client, username)
|
||||
return user.serialize() if user is not None else None
|
||||
user = self._find_user(self.client, username)
|
||||
return user.serialize() if user is not None else None
|
||||
|
||||
def _find_user(self, client, username):
|
||||
"""
|
||||
@ -821,11 +840,9 @@ class CassandraAdmin(object):
|
||||
List all non-superuser accounts. Omit names on the ignored list.
|
||||
Return an empty set if None.
|
||||
"""
|
||||
with CassandraLocalhostConnection(self.__admin_user) as client:
|
||||
users = [user.serialize() for user in
|
||||
self._get_listed_users(client)]
|
||||
return pagination.paginate_list(users, limit, marker,
|
||||
include_marker)
|
||||
users = [user.serialize() for user in
|
||||
self._get_listed_users(self.client)]
|
||||
return pagination.paginate_list(users, limit, marker, include_marker)
|
||||
|
||||
def _get_listed_users(self, client):
|
||||
"""
|
||||
@ -927,27 +944,24 @@ class CassandraAdmin(object):
|
||||
|
||||
def list_superusers(self):
|
||||
"""List all system users existing in the database."""
|
||||
with CassandraLocalhostConnection(self.__admin_user) as client:
|
||||
return self._get_users(client, lambda user: user.super)
|
||||
return self._get_users(self.client, lambda user: user.super)
|
||||
|
||||
def grant_access(self, context, username, hostname, databases):
|
||||
"""
|
||||
Grant full access on keyspaces to a given username.
|
||||
"""
|
||||
user = models.CassandraUser(username)
|
||||
with CassandraLocalhostConnection(self.__admin_user) as client:
|
||||
for db in databases:
|
||||
self._grant_full_access_on_keyspace(
|
||||
client, models.CassandraSchema(db), user)
|
||||
for db in databases:
|
||||
self._grant_full_access_on_keyspace(
|
||||
self.client, models.CassandraSchema(db), user)
|
||||
|
||||
def revoke_access(self, context, username, hostname, database):
|
||||
"""
|
||||
Revoke all permissions on any database resources from a given username.
|
||||
"""
|
||||
user = models.CassandraUser(username)
|
||||
with CassandraLocalhostConnection(self.__admin_user) as client:
|
||||
self._revoke_all_access_on_keyspace(
|
||||
client, models.CassandraSchema(database), user)
|
||||
self._revoke_all_access_on_keyspace(
|
||||
self.client, models.CassandraSchema(database), user)
|
||||
|
||||
def _grant_full_access_on_keyspace(self, client, keyspace, user,
|
||||
check_reserved=True):
|
||||
@ -988,11 +1002,10 @@ class CassandraAdmin(object):
|
||||
(keyspace.name, user.name))
|
||||
|
||||
def update_attributes(self, context, username, hostname, user_attrs):
|
||||
with CassandraLocalhostConnection(self.__admin_user) as client:
|
||||
user = self._load_user(client, username)
|
||||
new_name = user_attrs.get('name')
|
||||
new_password = user_attrs.get('password')
|
||||
self._update_user(client, user, new_name, new_password)
|
||||
user = self._load_user(self.client, username)
|
||||
new_name = user_attrs.get('name')
|
||||
new_password = user_attrs.get('password')
|
||||
self._update_user(self.client, user, new_name, new_password)
|
||||
|
||||
def _update_user(self, client, user, new_username, new_password):
|
||||
"""
|
||||
@ -1029,13 +1042,12 @@ class CassandraAdmin(object):
|
||||
self._drop_user(client, user)
|
||||
|
||||
def alter_user_password(self, user):
|
||||
with CassandraLocalhostConnection(self.__admin_user) as client:
|
||||
self._alter_user_password(client, user)
|
||||
self._alter_user_password(self.client, user)
|
||||
|
||||
def change_passwords(self, context, users):
|
||||
with CassandraLocalhostConnection(self.__admin_user) as client:
|
||||
for user in users:
|
||||
self._alter_user_password(client, self._deserialize_user(user))
|
||||
for user in users:
|
||||
self._alter_user_password(self.client,
|
||||
self._deserialize_user(user))
|
||||
|
||||
def _alter_user_password(self, client, user):
|
||||
LOG.debug("Changing password of user '%s'." % user.name)
|
||||
@ -1043,10 +1055,9 @@ class CassandraAdmin(object):
|
||||
"WITH PASSWORD %s;", (user.name,), (user.password,))
|
||||
|
||||
def create_database(self, context, databases):
|
||||
with CassandraLocalhostConnection(self.__admin_user) as client:
|
||||
for item in databases:
|
||||
self._create_single_node_keyspace(
|
||||
client, self._deserialize_keyspace(item))
|
||||
for item in databases:
|
||||
self._create_single_node_keyspace(
|
||||
self.client, self._deserialize_keyspace(item))
|
||||
|
||||
def _create_single_node_keyspace(self, client, keyspace):
|
||||
"""
|
||||
@ -1073,8 +1084,8 @@ class CassandraAdmin(object):
|
||||
"'replication_factor' : 1 }};", (keyspace.name,))
|
||||
|
||||
def delete_database(self, context, database):
|
||||
with CassandraLocalhostConnection(self.__admin_user) as client:
|
||||
self._drop_keyspace(client, self._deserialize_keyspace(database))
|
||||
self._drop_keyspace(self.client,
|
||||
self._deserialize_keyspace(database))
|
||||
|
||||
def _drop_keyspace(self, client, keyspace):
|
||||
LOG.debug("Dropping keyspace '%s'." % keyspace.name)
|
||||
@ -1082,11 +1093,10 @@ class CassandraAdmin(object):
|
||||
|
||||
def list_databases(self, context, limit=None, marker=None,
|
||||
include_marker=False):
|
||||
with CassandraLocalhostConnection(self.__admin_user) as client:
|
||||
databases = [keyspace.serialize() for keyspace
|
||||
in self._get_available_keyspaces(client)]
|
||||
return pagination.paginate_list(databases, limit, marker,
|
||||
include_marker)
|
||||
databases = [keyspace.serialize() for keyspace
|
||||
in self._get_available_keyspaces(self.client)]
|
||||
return pagination.paginate_list(databases, limit, marker,
|
||||
include_marker)
|
||||
|
||||
def _get_available_keyspaces(self, client):
|
||||
"""
|
||||
@ -1099,10 +1109,9 @@ class CassandraAdmin(object):
|
||||
if db.keyspace_name not in self.ignore_dbs}
|
||||
|
||||
def list_access(self, context, username, hostname):
|
||||
with CassandraLocalhostConnection(self.__admin_user) as client:
|
||||
user = self._find_user(client, username)
|
||||
if user:
|
||||
return user.databases
|
||||
user = self._find_user(self.client, username)
|
||||
if user:
|
||||
return user.databases
|
||||
|
||||
raise exception.UserNotFound(username)
|
||||
|
||||
@ -1136,11 +1145,11 @@ class CassandraAdmin(object):
|
||||
|
||||
@property
|
||||
def ignore_users(self):
|
||||
return cfg.get_ignored_users(manager=MANAGER)
|
||||
return cfg.get_ignored_users()
|
||||
|
||||
@property
|
||||
def ignore_dbs(self):
|
||||
return cfg.get_ignored_dbs(manager=MANAGER)
|
||||
return cfg.get_ignored_dbs()
|
||||
|
||||
|
||||
class CassandraConnection(object):
|
||||
@ -1148,6 +1157,8 @@ class CassandraConnection(object):
|
||||
|
||||
# Cassandra 2.1 only supports protocol versions 3 and lower.
|
||||
NATIVE_PROTOCOL_VERSION = 3
|
||||
CONNECTION_TIMEOUT_SEC = CONF.agent_call_high_timeout
|
||||
RECONNECT_DELAY_SEC = 3
|
||||
|
||||
def __init__(self, contact_points, user):
|
||||
self.__user = user
|
||||
@ -1155,18 +1166,25 @@ class CassandraConnection(object):
|
||||
# After the driver connects to one of the nodes it will automatically
|
||||
# discover the rest.
|
||||
# Will connect to '127.0.0.1' if None contact points are given.
|
||||
#
|
||||
# Set the 'reconnection_policy' so that dead connections recover fast.
|
||||
self._cluster = Cluster(
|
||||
contact_points=contact_points,
|
||||
auth_provider=PlainTextAuthProvider(user.name, user.password),
|
||||
protocol_version=self.NATIVE_PROTOCOL_VERSION)
|
||||
protocol_version=self.NATIVE_PROTOCOL_VERSION,
|
||||
connect_timeout=self.CONNECTION_TIMEOUT_SEC,
|
||||
control_connection_timeout=self.CONNECTION_TIMEOUT_SEC,
|
||||
reconnection_policy=ConstantReconnectionPolicy(
|
||||
self.RECONNECT_DELAY_SEC, max_attempts=None))
|
||||
self.__session = None
|
||||
|
||||
self._connect()
|
||||
|
||||
def __enter__(self):
|
||||
self.__connect()
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc_value, traceback):
|
||||
self.__disconnect()
|
||||
self._disconnect()
|
||||
|
||||
def execute(self, query, identifiers=None, data_values=None, timeout=None):
|
||||
"""
|
||||
@ -1182,7 +1200,7 @@ class CassandraConnection(object):
|
||||
There is no timeout if set to None.
|
||||
Return a set of rows or an empty list if None.
|
||||
"""
|
||||
if self.__is_active():
|
||||
if self.is_active():
|
||||
try:
|
||||
rows = self.__session.execute(self.__bind(query, identifiers),
|
||||
data_values, timeout)
|
||||
@ -1199,11 +1217,11 @@ class CassandraConnection(object):
|
||||
return query.format(*identifiers)
|
||||
return query
|
||||
|
||||
def __connect(self):
|
||||
def _connect(self):
|
||||
if not self._cluster.is_shutdown:
|
||||
LOG.debug("Connecting to a Cassandra cluster as '%s'."
|
||||
% self.__user.name)
|
||||
if not self.__is_active():
|
||||
if not self.is_active():
|
||||
self.__session = self._cluster.connect()
|
||||
else:
|
||||
LOG.debug("Connection already open.")
|
||||
@ -1216,19 +1234,23 @@ class CassandraConnection(object):
|
||||
LOG.debug("Cannot perform this operation on a terminated cluster.")
|
||||
raise exception.UnprocessableEntity()
|
||||
|
||||
def __disconnect(self):
|
||||
if self.__is_active():
|
||||
def _disconnect(self):
|
||||
if self.is_active():
|
||||
try:
|
||||
LOG.debug("Disconnecting from cluster: '%s'"
|
||||
% self._cluster.metadata.cluster_name)
|
||||
self._cluster.shutdown()
|
||||
self.__session.shutdown()
|
||||
except Exception:
|
||||
LOG.debug("Failed to disconnect from a Cassandra cluster.")
|
||||
|
||||
def __is_active(self):
|
||||
def is_active(self):
|
||||
return self.__session and not self.__session.is_shutdown
|
||||
|
||||
def __del__(self):
|
||||
# The connections would survive the parent object's GC.
|
||||
# We need to close it explicitly.
|
||||
self._disconnect()
|
||||
|
||||
|
||||
class CassandraLocalhostConnection(CassandraConnection):
|
||||
"""
|
||||
|
@ -75,6 +75,13 @@ class GuestAgentCassandraDBManagerTest(trove_testtools.TestCase):
|
||||
@patch('trove.guestagent.datastore.experimental.cassandra.service.LOG')
|
||||
def setUp(self, *args, **kwargs):
|
||||
super(GuestAgentCassandraDBManagerTest, self).setUp()
|
||||
|
||||
conn_patcher = patch.multiple(cass_service.CassandraConnection,
|
||||
_connect=DEFAULT,
|
||||
is_active=Mock(return_value=True))
|
||||
self.addCleanup(conn_patcher.stop)
|
||||
conn_patcher.start()
|
||||
|
||||
self.real_status = cass_service.CassandraAppStatus.set_status
|
||||
|
||||
class FakeInstanceServiceStatus(object):
|
||||
@ -87,9 +94,12 @@ class GuestAgentCassandraDBManagerTest(trove_testtools.TestCase):
|
||||
return_value=FakeInstanceServiceStatus())
|
||||
self.context = trove_testtools.TroveTestContext(self)
|
||||
self.manager = cass_manager.Manager()
|
||||
self.manager._Manager__admin = cass_service.CassandraAdmin(
|
||||
self.manager._app = cass_service.CassandraApp()
|
||||
self.manager._admin = cass_service.CassandraAdmin(
|
||||
models.CassandraUser('Test'))
|
||||
self.admin = self.manager._Manager__admin
|
||||
self.admin = self.manager._admin
|
||||
self.admin._CassandraAdmin__client = MagicMock()
|
||||
self.conn = self.admin._CassandraAdmin__client
|
||||
self.pkg = cass_service.packager
|
||||
self.origin_os_path_exists = os.path.exists
|
||||
self.origin_format = volume.VolumeDevice.format
|
||||
@ -328,64 +338,58 @@ class GuestAgentCassandraDBManagerTest(trove_testtools.TestCase):
|
||||
chars=string.ascii_letters + string.digits):
|
||||
return ''.join(random.choice(chars) for _ in range(size))
|
||||
|
||||
@patch.object(cass_service.CassandraLocalhostConnection, '__enter__')
|
||||
def test_create_database(self, conn):
|
||||
def test_create_database(self):
|
||||
db1 = models.CassandraSchema('db1')
|
||||
db2 = models.CassandraSchema('db2')
|
||||
db3 = models.CassandraSchema(self._get_random_name(32))
|
||||
|
||||
self.manager.create_database(self.context,
|
||||
self._serialize_collection(db1, db2, db3))
|
||||
conn.return_value.execute.assert_has_calls([
|
||||
self.conn.execute.assert_has_calls([
|
||||
call(self.__CREATE_DB_FORMAT, (db1.name,)),
|
||||
call(self.__CREATE_DB_FORMAT, (db2.name,)),
|
||||
call(self.__CREATE_DB_FORMAT, (db3.name,))
|
||||
])
|
||||
|
||||
@patch.object(cass_service.CassandraLocalhostConnection, '__enter__')
|
||||
def test_delete_database(self, conn):
|
||||
def test_delete_database(self):
|
||||
db = models.CassandraSchema(self._get_random_name(32))
|
||||
self.manager.delete_database(self.context, db.serialize())
|
||||
conn.return_value.execute.assert_called_once_with(
|
||||
self.conn.execute.assert_called_once_with(
|
||||
self.__DROP_DB_FORMAT, (db.name,))
|
||||
|
||||
@patch.object(cass_service.CassandraLocalhostConnection, '__enter__')
|
||||
def test_create_user(self, conn):
|
||||
def test_create_user(self):
|
||||
usr1 = models.CassandraUser('usr1')
|
||||
usr2 = models.CassandraUser('usr2', '')
|
||||
usr3 = models.CassandraUser(self._get_random_name(1025), 'password')
|
||||
|
||||
self.manager.create_user(self.context,
|
||||
self._serialize_collection(usr1, usr2, usr3))
|
||||
conn.return_value.execute.assert_has_calls([
|
||||
self.conn.execute.assert_has_calls([
|
||||
call(self.__CREATE_USR_FORMAT, (usr1.name,), (usr1.password,)),
|
||||
call(self.__CREATE_USR_FORMAT, (usr2.name,), (usr2.password,)),
|
||||
call(self.__CREATE_USR_FORMAT, (usr3.name,), (usr3.password,))
|
||||
])
|
||||
|
||||
@patch.object(cass_service.CassandraLocalhostConnection, '__enter__')
|
||||
def test_delete_user(self, conn):
|
||||
def test_delete_user(self):
|
||||
usr = models.CassandraUser(self._get_random_name(1025), 'password')
|
||||
self.manager.delete_user(self.context, usr.serialize())
|
||||
conn.return_value.execute.assert_called_once_with(
|
||||
self.conn.execute.assert_called_once_with(
|
||||
self.__DROP_USR_FORMAT, (usr.name,))
|
||||
|
||||
@patch.object(cass_service.CassandraLocalhostConnection, '__enter__')
|
||||
def test_change_passwords(self, conn):
|
||||
def test_change_passwords(self):
|
||||
usr1 = models.CassandraUser('usr1')
|
||||
usr2 = models.CassandraUser('usr2', '')
|
||||
usr3 = models.CassandraUser(self._get_random_name(1025), 'password')
|
||||
|
||||
self.manager.change_passwords(self.context, self._serialize_collection(
|
||||
usr1, usr2, usr3))
|
||||
conn.return_value.execute.assert_has_calls([
|
||||
self.conn.execute.assert_has_calls([
|
||||
call(self.__ALTER_USR_FORMAT, (usr1.name,), (usr1.password,)),
|
||||
call(self.__ALTER_USR_FORMAT, (usr2.name,), (usr2.password,)),
|
||||
call(self.__ALTER_USR_FORMAT, (usr3.name,), (usr3.password,))
|
||||
])
|
||||
|
||||
@patch.object(cass_service.CassandraLocalhostConnection, '__enter__')
|
||||
def test_alter_user_password(self, conn):
|
||||
def test_alter_user_password(self):
|
||||
usr1 = models.CassandraUser('usr1')
|
||||
usr2 = models.CassandraUser('usr2', '')
|
||||
usr3 = models.CassandraUser(self._get_random_name(1025), 'password')
|
||||
@ -393,14 +397,13 @@ class GuestAgentCassandraDBManagerTest(trove_testtools.TestCase):
|
||||
self.admin.alter_user_password(usr1)
|
||||
self.admin.alter_user_password(usr2)
|
||||
self.admin.alter_user_password(usr3)
|
||||
conn.return_value.execute.assert_has_calls([
|
||||
self.conn.execute.assert_has_calls([
|
||||
call(self.__ALTER_USR_FORMAT, (usr1.name,), (usr1.password,)),
|
||||
call(self.__ALTER_USR_FORMAT, (usr2.name,), (usr2.password,)),
|
||||
call(self.__ALTER_USR_FORMAT, (usr3.name,), (usr3.password,))
|
||||
])
|
||||
|
||||
@patch.object(cass_service.CassandraLocalhostConnection, '__enter__')
|
||||
def test_grant_access(self, conn):
|
||||
def test_grant_access(self):
|
||||
usr1 = models.CassandraUser('usr1')
|
||||
usr2 = models.CassandraUser('usr1', 'password')
|
||||
db1 = models.CassandraSchema('db1')
|
||||
@ -418,10 +421,11 @@ class GuestAgentCassandraDBManagerTest(trove_testtools.TestCase):
|
||||
expected.append(call(self.__GRANT_FORMAT,
|
||||
(modifier, db3.name, usr2.name)))
|
||||
|
||||
conn.return_value.execute.assert_has_calls(expected, any_order=True)
|
||||
self.conn.execute.assert_has_calls(
|
||||
expected,
|
||||
any_order=True)
|
||||
|
||||
@patch.object(cass_service.CassandraLocalhostConnection, '__enter__')
|
||||
def test_revoke_access(self, conn):
|
||||
def test_revoke_access(self):
|
||||
usr1 = models.CassandraUser('usr1')
|
||||
usr2 = models.CassandraUser('usr1', 'password')
|
||||
db1 = models.CassandraSchema('db1')
|
||||
@ -429,19 +433,17 @@ class GuestAgentCassandraDBManagerTest(trove_testtools.TestCase):
|
||||
|
||||
self.manager.revoke_access(self.context, usr1.name, None, db1.name)
|
||||
self.manager.revoke_access(self.context, usr2.name, None, db2.name)
|
||||
conn.return_value.execute.assert_has_calls([
|
||||
self.conn.execute.assert_has_calls([
|
||||
call(self.__REVOKE_FORMAT, (db1.name, usr1.name)),
|
||||
call(self.__REVOKE_FORMAT, (db2.name, usr2.name))
|
||||
])
|
||||
|
||||
@patch.object(cass_service.CassandraLocalhostConnection, '__enter__')
|
||||
def test_get_available_keyspaces(self, conn):
|
||||
def test_get_available_keyspaces(self):
|
||||
self.manager.list_databases(self.context)
|
||||
conn.return_value.execute.assert_called_once_with(
|
||||
self.conn.execute.assert_called_once_with(
|
||||
self.__LIST_DB_FORMAT)
|
||||
|
||||
@patch.object(cass_service.CassandraLocalhostConnection, '__enter__')
|
||||
def test_list_databases(self, conn):
|
||||
def test_list_databases(self):
|
||||
db1 = models.CassandraSchema('db1')
|
||||
db2 = models.CassandraSchema('db2')
|
||||
db3 = models.CassandraSchema(self._get_random_name(32))
|
||||
@ -538,8 +540,7 @@ class GuestAgentCassandraDBManagerTest(trove_testtools.TestCase):
|
||||
|
||||
self.assertEqual({}, acl)
|
||||
|
||||
@patch.object(cass_service.CassandraLocalhostConnection, '__enter__')
|
||||
def test_get_listed_users(self, conn):
|
||||
def test_get_listed_users(self):
|
||||
usr1 = models.CassandraUser(self._get_random_name(1025))
|
||||
usr2 = models.CassandraUser(self._get_random_name(1025))
|
||||
usr3 = models.CassandraUser(self._get_random_name(1025))
|
||||
@ -555,7 +556,7 @@ class GuestAgentCassandraDBManagerTest(trove_testtools.TestCase):
|
||||
rv_3 = NonCallableMagicMock()
|
||||
rv_3.configure_mock(name=usr3.name, super=True)
|
||||
|
||||
with patch.object(conn.return_value, 'execute', return_value=iter(
|
||||
with patch.object(self.conn, 'execute', return_value=iter(
|
||||
[rv_1, rv_2, rv_3])):
|
||||
with patch.object(self.admin, '_get_acl',
|
||||
return_value={usr1.name: {db1.name: {'SELECT'},
|
||||
@ -563,15 +564,14 @@ class GuestAgentCassandraDBManagerTest(trove_testtools.TestCase):
|
||||
usr3.name: {db2.name: {'SELECT'}}}
|
||||
):
|
||||
usrs = self.manager.list_users(self.context)
|
||||
conn.return_value.execute.assert_has_calls([
|
||||
self.conn.execute.assert_has_calls([
|
||||
call(self.__LIST_USR_FORMAT),
|
||||
], any_order=True)
|
||||
self.assertIn(usr1.serialize(), usrs[0])
|
||||
self.assertIn(usr2.serialize(), usrs[0])
|
||||
self.assertIn(usr3.serialize(), usrs[0])
|
||||
|
||||
@patch.object(cass_service.CassandraLocalhostConnection, '__enter__')
|
||||
def test_list_access(self, conn):
|
||||
def test_list_access(self):
|
||||
usr1 = models.CassandraUser('usr1')
|
||||
usr2 = models.CassandraUser('usr2')
|
||||
usr3 = models.CassandraUser(self._get_random_name(1025), 'password')
|
||||
@ -594,8 +594,7 @@ class GuestAgentCassandraDBManagerTest(trove_testtools.TestCase):
|
||||
with ExpectedException(exception.UserNotFound):
|
||||
self.manager.list_access(self.context, usr3.name, None)
|
||||
|
||||
@patch.object(cass_service.CassandraLocalhostConnection, '__enter__')
|
||||
def test_list_users(self, conn):
|
||||
def test_list_users(self):
|
||||
usr1 = models.CassandraUser('usr1')
|
||||
usr2 = models.CassandraUser('usr2')
|
||||
usr3 = models.CassandraUser(self._get_random_name(1025), 'password')
|
||||
@ -613,8 +612,7 @@ class GuestAgentCassandraDBManagerTest(trove_testtools.TestCase):
|
||||
with patch.object(self.admin, self.__N_GLU, return_value=set()):
|
||||
self.assertEqual(([], None), self.manager.list_users(self.context))
|
||||
|
||||
@patch.object(cass_service.CassandraLocalhostConnection, '__enter__')
|
||||
def test_get_user(self, conn):
|
||||
def test_get_user(self):
|
||||
usr1 = models.CassandraUser('usr1')
|
||||
usr2 = models.CassandraUser('usr2')
|
||||
usr3 = models.CassandraUser(self._get_random_name(1025), 'password')
|
||||
@ -630,8 +628,7 @@ class GuestAgentCassandraDBManagerTest(trove_testtools.TestCase):
|
||||
|
||||
@patch.object(cass_service.CassandraAdmin, '_deserialize_keyspace',
|
||||
side_effect=lambda p1: p1)
|
||||
@patch.object(cass_service.CassandraLocalhostConnection, '__enter__')
|
||||
def test_rename_user(self, conn, ks_deserializer):
|
||||
def test_rename_user(self, ks_deserializer):
|
||||
usr = models.CassandraUser('usr')
|
||||
db1 = models.CassandraSchema('db1').serialize()
|
||||
db2 = models.CassandraSchema('db2').serialize()
|
||||
@ -653,8 +650,7 @@ class GuestAgentCassandraDBManagerTest(trove_testtools.TestCase):
|
||||
call(ANY, db2, ANY)])
|
||||
drop.assert_called_once_with(ANY, usr)
|
||||
|
||||
@patch.object(cass_service.CassandraLocalhostConnection, '__enter__')
|
||||
def test_update_attributes(self, conn):
|
||||
def test_update_attributes(self):
|
||||
usr = models.CassandraUser('usr', 'pwd')
|
||||
|
||||
with patch.object(self.admin, self.__N_BU, return_value=usr):
|
||||
|
Loading…
Reference in New Issue
Block a user