Merge "Validate databases for user grants"
This commit is contained in:
commit
195d501a24
@ -376,13 +376,20 @@ class MySqlAdmin(object):
|
||||
def grant_access(self, username, hostname, databases):
|
||||
"""Grant a user permission to use a given database."""
|
||||
user = self._get_user(username, hostname)
|
||||
mydb = models.ValidatedMySQLDatabase()
|
||||
with LocalSqlClient(get_engine()) as client:
|
||||
for database in databases:
|
||||
g = sql_query.Grant(permissions='ALL', database=database,
|
||||
user=user.name, host=user.host,
|
||||
hashed=user.password)
|
||||
t = text(str(g))
|
||||
client.execute(t)
|
||||
try:
|
||||
mydb.name = database
|
||||
except ValueError:
|
||||
LOG.info(_(
|
||||
"Grant access to %s is not allowed") % database)
|
||||
|
||||
g = sql_query.Grant(permissions='ALL', database=mydb.name,
|
||||
user=user.name, host=user.host,
|
||||
hashed=user.password)
|
||||
t = text(str(g))
|
||||
client.execute(t)
|
||||
|
||||
def is_root_enabled(self):
|
||||
"""Return True if root access is enabled; False otherwise."""
|
||||
|
@ -105,6 +105,12 @@ class UserAccessBase(object):
|
||||
access = [db.name for db in access]
|
||||
asserts.assert_equal(set(access), set(databases))
|
||||
|
||||
def _test_ignore_access(self, users, databases, expected_response=200):
|
||||
databases = [d for d in databases if d not in ['lost+found',
|
||||
'mysql',
|
||||
'information_schema']]
|
||||
self._test_access(users, databases, expected_response)
|
||||
|
||||
def _reset_access(self):
|
||||
for user in self.users:
|
||||
for database in self.databases + self.ghostdbs:
|
||||
@ -277,6 +283,16 @@ class TestUserAccessPositive(UserAccessBase):
|
||||
self._grant_access_plural(self.users, self.databases)
|
||||
self._test_access(self.users, self.databases)
|
||||
|
||||
@test(depends_on=[test_no_access])
|
||||
def test_grant_full_access_ignore_databases(self):
|
||||
# The users are granted access to all test databases.
|
||||
all_dbs = []
|
||||
all_dbs.extend(self.databases)
|
||||
all_dbs.extend(['lost+found', 'mysql', 'information_schema'])
|
||||
self._reset_access()
|
||||
self._grant_access_plural(self.users, self.databases)
|
||||
self._test_ignore_access(self.users, self.databases)
|
||||
|
||||
@test(depends_on=[test_grant_full_access])
|
||||
def test_grant_idempotence(self):
|
||||
# Grant operations can be repeated with no ill effects.
|
||||
|
@ -85,6 +85,21 @@ class GuestAgentManagerTest(testtools.TestCase):
|
||||
self.manager.delete_user(self.context, user)
|
||||
verify(dbaas.MySqlAdmin).delete_user(user)
|
||||
|
||||
def test_grant_access(self):
|
||||
username = "test_user"
|
||||
hostname = "test_host"
|
||||
databases = ["test_database"]
|
||||
when(dbaas.MySqlAdmin).grant_access(username,
|
||||
hostname,
|
||||
databases).thenReturn(None)
|
||||
|
||||
self.manager.grant_access(self.context,
|
||||
username,
|
||||
hostname,
|
||||
databases)
|
||||
|
||||
verify(dbaas.MySqlAdmin).grant_access(username, hostname, databases)
|
||||
|
||||
def test_list_databases(self):
|
||||
when(dbaas.MySqlAdmin).list_databases(None, None,
|
||||
False).thenReturn(['database1'])
|
||||
|
Loading…
x
Reference in New Issue
Block a user