Merge "User and Database List in Create Not Validated"

This commit is contained in:
Jenkins 2013-09-24 17:44:51 +00:00 committed by Gerrit Code Review
commit 40179e971d
4 changed files with 184 additions and 6 deletions

View File

@ -293,3 +293,21 @@ class BackupFileNotFound(NotFound):
class SwiftAuthError(TroveError):
message = _("Swift account not accessible for tenant %(tenant_id)s.")
class DatabaseForUserNotInDatabaseListError(TroveError):
message = _("The request indicates that user %(user)s should have access "
"to database %(database)s, but database %(database)s is not "
"included in the initial databases list.")
class DatabaseInitialDatabaseDuplicateError(TroveError):
message = _("Two or more databases share the same name in the initial "
"databases list. Please correct the names or remove the "
"duplicate entries.")
class DatabaseInitialUserDuplicateError(TroveError):
message = _("Two or more users share the same name and host in the "
"initial users list. Please correct the names or remove the "
"duplicate entries.")

View File

@ -24,9 +24,13 @@ def populate_validated_databases(dbs):
"""
try:
databases = []
unique_identities = set()
for database in dbs:
mydb = guest_models.ValidatedMySQLDatabase()
mydb.name = database.get('name', '')
if mydb.name in unique_identities:
raise exception.DatabaseInitialDatabaseDuplicateError()
unique_identities.add(mydb.name)
mydb.character_set = database.get('character_set', '')
mydb.collate = database.get('collate', '')
databases.append(mydb.serialize())
@ -39,18 +43,28 @@ def populate_validated_databases(dbs):
raise exception.BadRequest(safe_string)
def populate_users(users):
def populate_users(users, initial_databases=None):
"""Create a serializable request containing users"""
users_data = []
unique_identities = set()
for user in users:
u = guest_models.MySQLUser()
u.name = user.get('name', '')
u.host = user.get('host')
user_identity = (u.name, u.host)
if user_identity in unique_identities:
raise exception.DatabaseInitialUserDuplicateError()
unique_identities.add(user_identity)
u.password = user.get('password', '')
dbs = user.get('databases', '')
if dbs:
for db in dbs:
u.databases = db.get('name', '')
user_dbs = user.get('databases', '')
# user_db_names guaranteed unique and non-empty by apischema
user_db_names = [user_db.get('name', '') for user_db in user_dbs]
for user_db_name in user_db_names:
if (initial_databases is not None and user_db_name not in
initial_databases):
raise exception.DatabaseForUserNotInDatabaseListError(
user=u.name, database=user_db_name)
u.databases = user_db_name
users_data.append(u.serialize())
return users_data

View File

@ -188,9 +188,11 @@ class InstanceController(wsgi.Controller):
flavor_id = utils.get_id_from_href(flavor_ref)
databases = populate_validated_databases(
body['instance'].get('databases', []))
database_names = [database.get('_name', '') for database in databases]
users = None
try:
users = populate_users(body['instance'].get('users', []))
users = populate_users(body['instance'].get('users', []),
database_names)
except ValueError as ve:
raise exception.BadRequest(msg=ve)

View File

@ -0,0 +1,144 @@
# Copyright 2013 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
from testtools import TestCase
from testtools.matchers import Is
from testtools.matchers import Equals
from trove.common.exception import DatabaseInitialDatabaseDuplicateError
from trove.common.exception import DatabaseForUserNotInDatabaseListError
from trove.common.exception import DatabaseInitialUserDuplicateError
from trove.extensions.mysql.common import populate_validated_databases
from trove.extensions.mysql.common import populate_users
class MySqlCommonTest(TestCase):
def setUp(self):
super(MySqlCommonTest, self).setUp()
def tearDown(self):
super(MySqlCommonTest, self).tearDown()
def test_initial_databases_none(self):
databases = []
result = populate_validated_databases(databases)
self.assertThat(len(result), Is(0))
def test_initial_databases_single(self):
databases = [{'name': 'one_db'}]
result = populate_validated_databases(databases)
self.assertThat(len(result), Is(1))
self.assertThat(result[0]['_name'], Equals('one_db'))
def test_initial_databases_unique(self):
databases = [{'name': 'one_db'}, {'name': 'diff_db'}]
result = populate_validated_databases(databases)
self.assertThat(len(result), Is(2))
def test_initial_databases_duplicate(self):
databases = [{'name': 'same_db'}, {'name': 'same_db'}]
self.assertRaises(DatabaseInitialDatabaseDuplicateError,
populate_validated_databases, databases)
def test_initial_databases_intermingled(self):
databases = [{'name': 'a_db'}, {'name': 'b_db'}, {'name': 'a_db'}]
self.assertRaises(DatabaseInitialDatabaseDuplicateError,
populate_validated_databases, databases)
def test_populate_users_single(self):
users = [{'name': 'bob', 'password': 'x'}]
result = populate_users(users)
self.assertThat(len(result), Is(1))
self.assertThat(result[0]['_name'], Equals('bob'))
self.assertThat(result[0]['_password'], Equals('x'))
def test_populate_users_unique_host(self):
users = [{'name': 'bob', 'password': 'x', 'host': '127.0.0.1'},
{'name': 'bob', 'password': 'x', 'host': '128.0.0.1'}]
result = populate_users(users)
self.assertThat(len(result), Is(2))
def test_populate_users_unique_name(self):
users = [{'name': 'bob', 'password': 'x', 'host': '127.0.0.1'},
{'name': 'tom', 'password': 'x', 'host': '127.0.0.1'}]
result = populate_users(users)
self.assertThat(len(result), Is(2))
def test_populate_users_duplicate(self):
users = [{'name': 'bob', 'password': 'x', 'host': '127.0.0.1'},
{'name': 'bob', 'password': 'y', 'host': '127.0.0.1'}]
self.assertRaises(DatabaseInitialUserDuplicateError,
populate_users, users)
def test_populate_users_intermingled(self):
users = [{'name': 'bob', 'password': 'x', 'host': '127.0.0.1'},
{'name': 'tom', 'password': 'y', 'host': '128.0.0.1'},
{'name': 'bob', 'password': 'z', 'host': '127.0.0.1'}]
self.assertRaises(DatabaseInitialUserDuplicateError,
populate_users, users)
def test_populate_users_both_db_list_empty(self):
initial_databases = []
users = [{"name": "bob", "password": "x"}]
result = populate_users(users, initial_databases)
self.assertThat(len(result), Is(1))
def test_populate_users_initial_db_list_empty(self):
initial_databases = []
users = [{"name": "bob", "password": "x",
"databases": [{"name": "my_db"}]}]
self.assertRaises(DatabaseForUserNotInDatabaseListError,
populate_users, users, initial_databases)
def test_populate_users_user_db_list_empty(self):
initial_databases = ['my_db']
users = [{"name": "bob", "password": "x"}]
result = populate_users(users, initial_databases)
self.assertThat(len(result), Is(1))
def test_populate_users_db_in_list(self):
initial_databases = ['my_db']
users = [{"name": "bob", "password": "x",
"databases": [{"name": "my_db"}]}]
result = populate_users(users, initial_databases)
self.assertThat(len(result), Is(1))
def test_populate_users_db_multi_in_list(self):
initial_databases = ['a_db', 'b_db', 'c_db', 'd_db']
users = [{"name": "bob", "password": "x",
"databases": [{"name": "a_db"}]},
{"name": "tom", "password": "y",
"databases": [{"name": "c_db"}]},
{"name": "sue", "password": "z",
"databases": [{"name": "c_db"}]}]
result = populate_users(users, initial_databases)
self.assertThat(len(result), Is(3))
def test_populate_users_db_not_in_list(self):
initial_databases = ['a_db', 'b_db', 'c_db', 'd_db']
users = [{"name": "bob", "password": "x",
"databases": [{"name": "fake_db"}]}]
self.assertRaises(DatabaseForUserNotInDatabaseListError,
populate_users, users, initial_databases)
def test_populate_users_db_multi_not_in_list(self):
initial_databases = ['a_db', 'b_db', 'c_db', 'd_db']
users = [{"name": "bob", "password": "x",
"databases": [{"name": "a_db"}]},
{"name": "tom", "password": "y",
"databases": [{"name": "fake_db"}]},
{"name": "sue", "password": "z",
"databases": [{"name": "d_db"}]}]
self.assertRaises(DatabaseForUserNotInDatabaseListError,
populate_users, users, initial_databases)