Add a pool_group to pools in v1.1

Pools can't be grouped together based on type, region or anything. This
is bad for a couple of things but probably the most critical effect is
that when adding pools of different types, the algorithm will balance
across all existing pools regardless of their type.

This patch adds a group field to pools. The group field's default value
is None so that it is backwards compatible for v1 and to avoid forcing
operators to set a group when they simply don't care.

In addition to the above, this patch makes flavors use the pool group
instead of the pool name. This allow us to have multiple pools baking a
specific flavor. Moreover, this also allows us to balance a flavor
across of registered pools and it's the base of the future queue
migrations work.

Change-Id: Ibeba39725ddbc1bc82d4e9d5bceff8b9d69d5839
Closes-bug: #1373994
This commit is contained in:
Flavio Percoco 2014-09-25 16:58:35 +02:00
parent 346ccd1cb0
commit 2d3ee77efe
14 changed files with 159 additions and 64 deletions

View File

@ -401,7 +401,7 @@ class MongodbPoolsTests(base.PoolsControllerTest):
super(MongodbPoolsTests, self).tearDown()
def test_delete_pool_used_by_flavor(self):
self.flavors_controller.create('durable', self.pool,
self.flavors_controller.create('durable', self.pool_group,
project=self.project,
capabilities={})

View File

@ -47,14 +47,19 @@ class PoolCatalogTest(testing.TestBase):
# NOTE(cpp-cabrera): populate catalogue
self.pool = str(uuid.uuid1())
self.pool2 = str(uuid.uuid1())
self.pool_group = 'pool-group'
self.queue = str(uuid.uuid1())
self.flavor = str(uuid.uuid1())
self.project = str(uuid.uuid1())
self.pools_ctrl.create(self.pool, 100, 'sqlite://:memory:')
self.pools_ctrl.create(self.pool2, 100,
'sqlite://:memory:',
group=self.pool_group)
self.catalogue_ctrl.insert(self.project, self.queue, self.pool)
self.catalog = pooling.Catalog(self.conf, cache, control)
self.flavors_ctrl.create(self.flavor, self.pool,
self.flavors_ctrl.create(self.flavor, self.pool_group,
project=self.project)
def tearDown(self):

View File

@ -38,6 +38,16 @@ patch_uri = {
}
}
patch_group = {
'type': 'object', 'properties': {
'uri': {
'type': 'string'
},
'additionalProperties': False
}
}
patch_weight = {
'type': 'object', 'properties': {
'weight': {
@ -50,6 +60,7 @@ patch_weight = {
create = {
'type': 'object', 'properties': {
'weight': patch_weight['properties']['weight'],
'group': patch_group['properties']['uri'],
'uri': patch_uri['properties']['uri'],
'options': patch_options['properties']['options']
},

View File

@ -183,12 +183,15 @@ class ResponseSchema(api.Api):
'uri': {
'type': 'string'
},
'group': {
'type': ['string', 'null']
},
'options': {
'type': 'object',
'additionalProperties': True
}
},
'required': ['href', 'weight', 'uri'],
'required': ['href', 'weight', 'uri', 'group'],
'additionalProperties': False,
},
}
@ -232,6 +235,9 @@ class ResponseSchema(api.Api):
'uri': {
'type': 'string'
},
'group': {
'type': ['string', 'null']
},
'weight': {
'type': 'number',
'minimum': -1

View File

@ -561,7 +561,7 @@ class PoolsBase(ControllerBase):
raise NotImplementedError
@abc.abstractmethod
def create(self, name, weight, uri, options=None):
def create(self, name, weight, uri, group=None, options=None):
"""Registers a pool entry.
:param name: The name of this pool
@ -571,12 +571,30 @@ class PoolsBase(ControllerBase):
:param uri: A URI that can be used by a storage client
(e.g., pymongo) to access this pool.
:type uri: six.text_type
:param group: The group of this pool
:type group: six.text_type
:param options: Options used to configure this pool
:type options: dict
"""
raise NotImplementedError
@abc.abstractmethod
def get_group(self, group=None, detailed=False):
"""Returns a single pool entry.
:param group: The group to filter on. `None` returns
pools that are not assigned to any pool.
:type group: six.text_type
:param detailed: Should the options data be included?
:type detailed: bool
:returns: weight, uri, and options for this pool
:rtype: {}
:raises: PoolDoesNotExist if not found
"""
raise NotImplementedError
@abc.abstractmethod
def get(self, name, detailed=False):
"""Returns a single pool entry.

View File

@ -94,10 +94,9 @@ class FlavorsController(base.FlavorsBase):
@utils.raises_conn_error
def create(self, name, pool, project=None, capabilities=None):
# NOTE(flaper87): It's faster to call exists and raise an
# error than calling get and letting the controller raise
# the exception.
if not self._pools_ctrl.exists(pool):
# NOTE(flaper87): Check if there are pools in this group.
# Should there be a `group_exists` method?
if not list(self._pools_ctrl.get_group(pool)):
raise errors.PoolDoesNotExist(pool)
capabilities = {} if capabilities is None else capabilities

View File

@ -74,10 +74,19 @@ class PoolsController(base.PoolsBase):
return _normalize(res, detailed)
@utils.raises_conn_error
def create(self, name, weight, uri, options=None):
def get_group(self, group=None, detailed=False):
cursor = self._col.find({'g': group}, fields=_field_spec(detailed))
normalizer = functools.partial(_normalize, detailed=detailed)
return utils.HookedCursor(cursor, normalizer)
@utils.raises_conn_error
def create(self, name, weight, uri, group=None, options=None):
options = {} if options is None else options
self._col.update({'n': name},
{'$set': {'n': name, 'w': weight, 'u': uri,
{'$set': {'n': name,
'w': weight,
'u': uri,
'g': group,
'o': options}},
upsert=True)
@ -87,11 +96,13 @@ class PoolsController(base.PoolsBase):
@utils.raises_conn_error
def update(self, name, **kwargs):
names = ('uri', 'weight', 'options')
names = ('uri', 'weight', 'group', 'options')
fields = common_utils.fields(kwargs, names,
pred=lambda x: x is not None,
key_transform=lambda x: x[0])
assert fields, '`weight`, `uri`, or `options` not found in kwargs'
assert fields, ('`weight`, `uri`, `group`, '
'or `options` not found in kwargs')
res = self._col.update({'n': name},
{'$set': fields},
upsert=False)
@ -103,14 +114,22 @@ class PoolsController(base.PoolsBase):
# NOTE(wpf): Initializing the Flavors controller here instead of
# doing so in __init__ is required to avoid falling in a maximum
# recursion error.
flavor_ctl = self.driver.flavors_controller
res = list(flavor_ctl._list_by_pool(name))
try:
pool = self.get(name)
pools_group = self.get_group(pool['group'])
flavor_ctl = self.driver.flavors_controller
res = list(flavor_ctl._list_by_pool(pool['group']))
if res:
flavors = ', '.join([x['name'] for x in res])
raise errors.PoolInUseByFlavor(name, flavors)
# NOTE(flaper87): If this is the only pool in the
# group and it's being used by a flavor, don't allow
# it to be deleted.
if res and len(pools_group) == 1:
flavors = ', '.join([x['name'] for x in res])
raise errors.PoolInUseByFlavor(name, flavors)
self._col.remove({'n': name}, w=0)
self._col.remove({'n': name}, w=0)
except errors.PoolDoesNotExist:
pass
@utils.raises_conn_error
def drop_all(self):
@ -121,6 +140,7 @@ class PoolsController(base.PoolsBase):
def _normalize(pool, detailed=False):
ret = {
'name': pool['n'],
'group': pool['g'],
'uri': pool['u'],
'weight': pool['w'],
}

View File

@ -445,11 +445,16 @@ class Catalog(object):
if flavor is not None:
flavor = self._flavor_ctrl.get(flavor, project=project)
pool = flavor['pool']
pools = self._pools_ctrl.get_group(group=flavor['pool'],
detailed=True)
pool = select.weighted(pools)
pool = pool and pool['name'] or None
else:
# NOTE(cpp-cabrera): limit=0 implies unlimited - select from
# all pools
pool = select.weighted(self._pools_ctrl.list(limit=0))
# NOTE(flaper87): Get pools assigned to the default
# group `None`. We should consider adding a `default_group`
# option in the future.
pools = self._pools_ctrl.get_group(detailed=True)
pool = select.weighted(pools)
pool = pool and pool['name'] or None
if not pool:

View File

@ -57,6 +57,16 @@ class PoolsController(base.PoolsBase):
normalizer = functools.partial(_normalize, detailed=detailed)
return (normalizer(v) for v in cursor)
@utils.raises_conn_error
def get_group(self, group=None, detailed=False):
stmt = sa.sql.select([tables.Pools]).where(
tables.Pools.c.group == group
)
cursor = self._conn.execute(stmt)
normalizer = functools.partial(_normalize, detailed=detailed)
return (normalizer(v) for v in cursor)
@utils.raises_conn_error
def get(self, name, detailed=False):
stmt = sa.sql.select([tables.Pools]).where(
@ -71,12 +81,12 @@ class PoolsController(base.PoolsBase):
# TODO(cpp-cabrera): rename to upsert
@utils.raises_conn_error
def create(self, name, weight, uri, options=None):
def create(self, name, weight, uri, group=None, options=None):
opts = None if options is None else utils.json_encode(options)
try:
stmt = sa.sql.expression.insert(tables.Pools).values(
name=name, weight=weight, uri=uri, options=opts
name=name, weight=weight, uri=uri, group=group, options=opts
)
self._conn.execute(stmt)
@ -84,7 +94,7 @@ class PoolsController(base.PoolsBase):
# TODO(cpp-cabrera): merge update/create into a single
# method with introduction of upsert
self.update(name, weight=weight, uri=uri,
options=options)
group=group, options=options)
@utils.raises_conn_error
def exists(self, name):
@ -98,11 +108,12 @@ class PoolsController(base.PoolsBase):
# NOTE(cpp-cabrera): by pruning None-valued kwargs, we avoid
# overwriting the existing options field with None, since that
# one can be null.
names = ('uri', 'weight', 'options')
names = ('uri', 'weight', 'group', 'options')
fields = common_utils.fields(kwargs, names,
pred=lambda x: x is not None)
assert fields, '`weight`, `uri`, or `options` not found in kwargs'
assert fields, ('`weight`, `uri`, `group`, '
'or `options` not found in kwargs')
if 'options' in fields:
fields['options'] = utils.json_encode(fields['options'])
@ -130,11 +141,12 @@ class PoolsController(base.PoolsBase):
def _normalize(pool, detailed=False):
ret = {
'name': pool[0],
'uri': pool[1],
'weight': pool[2],
'group': pool[1],
'uri': pool[2],
'weight': pool[3],
}
if detailed:
opts = pool[3]
opts = pool[4]
ret['options'] = utils.json_decode(opts) if opts else {}
return ret

View File

@ -58,6 +58,7 @@ Queues = sa.Table('Queues', metadata,
Pools = sa.Table('Pools', metadata,
sa.Column('name', sa.String(64), primary_key=True),
sa.Column('group', sa.String(64), nullable=True),
sa.Column('uri', sa.String(255), nullable=False),
sa.Column('weight', sa.INTEGER, nullable=False),
sa.Column('options', sa.BINARY))

View File

@ -106,6 +106,7 @@ class Resource(object):
self._validators = {
'weight': validator_type(schema.patch_weight),
'uri': validator_type(schema.patch_uri),
'group': validator_type(schema.patch_uri),
'options': validator_type(schema.patch_options),
'create': validator_type(schema.create)
}
@ -159,6 +160,7 @@ class Resource(object):
)
self._ctrl.create(pool, weight=data['weight'],
uri=data['uri'],
group=data.get('group'),
options=data.get('options', {}))
response.status = falcon.HTTP_201
response.location = request.path
@ -187,7 +189,7 @@ class Resource(object):
"""Allows one to update a pool's weight, uri, and/or options.
This method expects the user to submit a JSON object
containing at least one of: 'uri', 'weight', 'options'. If
containing at least one of: 'uri', 'weight', 'group', 'options'. If
none are found, the request is flagged as bad. There is also
strict format checking through the use of
jsonschema. Appropriate errors are returned in each case for
@ -199,11 +201,11 @@ class Resource(object):
LOG.debug(u'PATCH pool - name: %s', pool)
data = wsgi_utils.load(request)
EXPECT = ('weight', 'uri', 'options')
EXPECT = ('weight', 'uri', 'group', 'options')
if not any([(field in data) for field in EXPECT]):
LOG.debug(u'PATCH pool, bad params')
raise wsgi_errors.HTTPBadRequestBody(
'One of `uri`, `weight`, or `options` needs '
'One of `uri`, `weight`, `group`, or `options` needs '
'to be specified'
)

View File

@ -919,7 +919,9 @@ class PoolsControllerTest(ControllerBaseTest):
# Let's create one pool
self.pool = str(uuid.uuid1())
self.pools_controller.create(self.pool, 100, 'localhost', {})
self.pool_group = str(uuid.uuid1())
self.pools_controller.create(self.pool, 100, 'localhost',
group=self.pool_group, options={})
def tearDown(self):
self.pools_controller.drop_all()
@ -927,14 +929,17 @@ class PoolsControllerTest(ControllerBaseTest):
def test_create_succeeds(self):
self.pools_controller.create(str(uuid.uuid1()),
100, 'localhost', {})
100, 'localhost',
options={})
def test_create_replaces_on_duplicate_insert(self):
name = str(uuid.uuid1())
self.pools_controller.create(name,
100, 'localhost', {})
100, 'localhost',
options={})
self.pools_controller.create(name,
111, 'localhost2', {})
111, 'localhost2',
options={})
entry = self.pools_controller.get(name)
self._pool_expects(entry, xname=name, xweight=111,
xlocation='localhost2')
@ -1004,7 +1009,7 @@ class PoolsControllerTest(ControllerBaseTest):
if n > marker:
marker = n
self.pools_controller.create(n, w, str(i), {})
self.pools_controller.create(n, w, str(i), options={})
# Get the target pool
def _pool(name):
@ -1033,7 +1038,7 @@ class PoolsControllerTest(ControllerBaseTest):
self.assertEqual(len(res), 15)
next_name = marker + 'n'
self.pools_controller.create(next_name, 123, '123', {})
self.pools_controller.create(next_name, 123, '123', options={})
res = next(self.pools_controller.list(marker=marker))
self._pool_expects(res, next_name, 123, '123')
self.pools_controller.delete(next_name)
@ -1174,7 +1179,9 @@ class FlavorsControllerTest(ControllerBaseTest):
# Let's create one pool
self.pool = str(uuid.uuid1())
self.pools_controller.create(self.pool, 100, 'localhost', {})
self.pool_group = str(uuid.uuid1())
self.pools_controller.create(self.pool, 100, 'localhost',
group=self.pool_group, options={})
self.addCleanup(self.pools_controller.delete, self.pool)
def tearDown(self):
@ -1182,7 +1189,7 @@ class FlavorsControllerTest(ControllerBaseTest):
super(FlavorsControllerTest, self).tearDown()
def test_create_succeeds(self):
self.flavors_controller.create('durable', self.pool,
self.flavors_controller.create('durable', self.pool_group,
project=self.project,
capabilities={})
@ -1196,12 +1203,13 @@ class FlavorsControllerTest(ControllerBaseTest):
def test_create_replaces_on_duplicate_insert(self):
name = str(uuid.uuid1())
self.flavors_controller.create(name, self.pool,
self.flavors_controller.create(name, self.pool_group,
project=self.project,
capabilities={})
pool2 = 'another_pool'
self.pools_controller.create(pool2, 100, 'localhost', {})
self.pools_controller.create(pool2, 100, 'localhost',
group=pool2, options={})
self.addCleanup(self.pools_controller.delete, pool2)
self.flavors_controller.create(name, pool2,
@ -1213,22 +1221,22 @@ class FlavorsControllerTest(ControllerBaseTest):
def test_get_returns_expected_content(self):
name = 'durable'
capabilities = {'fifo': True}
self.flavors_controller.create(name, self.pool,
self.flavors_controller.create(name, self.pool_group,
project=self.project,
capabilities=capabilities)
res = self.flavors_controller.get(name, project=self.project)
self._flavors_expects(res, name, self.project, self.pool)
self._flavors_expects(res, name, self.project, self.pool_group)
self.assertNotIn('capabilities', res)
def test_detailed_get_returns_expected_content(self):
name = 'durable'
capabilities = {'fifo': True}
self.flavors_controller.create(name, self.pool,
self.flavors_controller.create(name, self.pool_group,
project=self.project,
capabilities=capabilities)
res = self.flavors_controller.get(name, project=self.project,
detailed=True)
self._flavors_expects(res, name, self.project, self.pool)
self._flavors_expects(res, name, self.project, self.pool_group)
self.assertIn('capabilities', res)
self.assertEqual(res['capabilities'], capabilities)
@ -1237,7 +1245,7 @@ class FlavorsControllerTest(ControllerBaseTest):
self.flavors_controller.get, 'notexists')
def test_exists(self):
self.flavors_controller.create('exists', self.pool,
self.flavors_controller.create('exists', self.pool_group,
project=self.project,
capabilities={})
self.assertTrue(self.flavors_controller.exists('exists',
@ -1247,11 +1255,11 @@ class FlavorsControllerTest(ControllerBaseTest):
def test_update_raises_assertion_error_on_bad_fields(self):
self.assertRaises(AssertionError, self.pools_controller.update,
self.pool)
self.pool_group)
def test_update_works(self):
name = 'yummy'
self.flavors_controller.create(name, self.pool,
self.flavors_controller.create(name, self.pool_group,
project=self.project,
capabilities={})
@ -1269,7 +1277,7 @@ class FlavorsControllerTest(ControllerBaseTest):
def test_delete_works(self):
name = 'puke'
self.flavors_controller.create(name, self.pool,
self.flavors_controller.create(name, self.pool_group,
project=self.project,
capabilities={})
self.flavors_controller.delete(name, project=self.project)
@ -1287,7 +1295,8 @@ class FlavorsControllerTest(ControllerBaseTest):
name_gen = lambda i: chr(ord('A') + i)
for i in range(15):
pool = str(i)
self.pools_controller.create(pool, 100, 'localhost', {})
self.pools_controller.create(pool, 100, 'localhost',
group=pool, options={})
self.addCleanup(self.pools_controller.delete, pool)
self.flavors_controller.create(name_gen(i), project=self.project,

View File

@ -88,12 +88,15 @@ class FlavorsBaseTest(base.V1_1Base):
self.queue_path = self.url_prefix + '/queues/' + self.queue
self.pool = 'mypool'
self.pool_group = 'mypool-group'
self.pool_path = self.url_prefix + '/pools/' + self.pool
self.pool_doc = {'weight': 100, 'uri': 'sqlite://:memory:'}
self.pool_doc = {'weight': 100,
'group': self.pool_group,
'uri': 'sqlite://:memory:'}
self.simulate_put(self.pool_path, body=jsonutils.dumps(self.pool_doc))
self.flavor = 'test-flavor'
self.doc = {'capabilities': {}, 'pool': 'mypool'}
self.doc = {'capabilities': {}, 'pool': self.pool_group}
self.flavor_path = self.url_prefix + '/flavors/' + self.flavor
self.simulate_put(self.flavor_path, body=jsonutils.dumps(self.doc))
self.assertEqual(self.srmock.status, falcon.HTTP_201)

View File

@ -24,7 +24,7 @@ from zaqar.tests.queues.transport.wsgi import base
@contextlib.contextmanager
def pool(test, name, weight, uri, options={}):
def pool(test, name, weight, uri, group=None, options={}):
"""A context manager for constructing a pool for use in testing.
Deletes the pool after exiting the context.
@ -38,20 +38,21 @@ def pool(test, name, weight, uri, options={}):
:returns: (name, weight, uri, options)
:rtype: see above
"""
doc = {'weight': weight, 'uri': uri, 'options': options}
doc = {'weight': weight, 'uri': uri,
'group': group, 'options': options}
path = test.url_prefix + '/pools/' + name
test.simulate_put(path, body=jsonutils.dumps(doc))
try:
yield name, weight, uri, options
yield name, weight, uri, group, options
finally:
test.simulate_delete(path)
@contextlib.contextmanager
def pools(test, count, uri):
def pools(test, count, uri, group):
"""A context manager for constructing pools for use in testing.
Deletes the pools after exiting the context.
@ -67,7 +68,8 @@ def pools(test, count, uri):
{str(i): i})
for i in range(count)]
for path, weight, option in args:
doc = {'weight': weight, 'uri': uri, 'options': option}
doc = {'weight': weight, 'uri': uri,
'group': group, 'options': option}
test.simulate_put(path, body=jsonutils.dumps(doc))
try:
@ -82,7 +84,9 @@ class PoolsBaseTest(base.V1_1Base):
def setUp(self):
super(PoolsBaseTest, self).setUp()
self.doc = {'weight': 100, 'uri': 'sqlite://:memory:'}
self.doc = {'weight': 100,
'group': 'mygroup',
'uri': 'sqlite://:memory:'}
self.pool = self.url_prefix + '/pools/' + str(uuid.uuid1())
self.simulate_put(self.pool, body=jsonutils.dumps(self.doc))
self.assertEqual(self.srmock.status, falcon.HTTP_201)
@ -95,7 +99,7 @@ class PoolsBaseTest(base.V1_1Base):
def test_put_pool_works(self):
name = str(uuid.uuid1())
weight, uri = self.doc['weight'], self.doc['uri']
with pool(self, name, weight, uri):
with pool(self, name, weight, uri, group='my-group'):
self.assertEqual(self.srmock.status, falcon.HTTP_201)
def test_put_raises_if_missing_fields(self):
@ -243,7 +247,7 @@ class PoolsBaseTest(base.V1_1Base):
if marker:
query += '&marker={2}'.format(marker)
with pools(self, count, self.doc['uri']) as expected:
with pools(self, count, self.doc['uri'], 'my-group') as expected:
result = self.simulate_get(self.url_prefix + '/pools',
query_string=query)
self.assertEqual(self.srmock.status, falcon.HTTP_200)
@ -260,7 +264,7 @@ class PoolsBaseTest(base.V1_1Base):
# pool weight and the index of pools fixture to get the
# right pool to verify.
expect = expected[s['weight']]
path, weight = expect[:2]
path, weight, group = expect[:3]
self._pool_expect(s, path, weight, self.doc['uri'])
if detailed:
self.assertIn('options', s)
@ -281,7 +285,7 @@ class PoolsBaseTest(base.V1_1Base):
def test_listing_marker_is_respected(self):
self.simulate_delete(self.pool)
with pools(self, 10, self.doc['uri']) as expected:
with pools(self, 10, self.doc['uri'], 'my-group') as expected:
result = self.simulate_get(self.url_prefix + '/pools',
query_string='?marker=3')
self.assertEqual(self.srmock.status, falcon.HTTP_200)