feat(sqlalchemy): add shards controller

This patch focuses on providing the shards controller for sqlalchemy.

As a bonus, a decorator is added that flags all methods that handles
all methods that raise a connection error.

The sqlalchemy driver passes all unit tests for the shards
controller. The initial suite is running on the sqlite engine.

A few general fixes have been applied:

- shards listing unit test now uses characters for names rather than
  numbers as strings to avoid sorting variations between storage
  engines
- cache.get_cache no longer takes a configOpts object

Change-Id: I7400884244be94f7600a5fb489b38c7058d36b31
Partially-Implements: blueprint: sql-storage-driver
This commit is contained in:
Alejandro Cabrera 2014-02-05 13:52:01 -05:00
parent 503e67d2f0
commit 63483b790f
9 changed files with 265 additions and 6 deletions

View File

@ -0,0 +1,20 @@
# Copyright (c) 2014 Rackspace Hosting Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from marconi.queues.storage.sqlalchemy import driver
# Hoist classes into package namespace
ControlDriver = driver.ControlDriver
DataDriver = driver.DataDriver

View File

@ -0,0 +1,19 @@
# Copyright (c) 2014 Rackspace Hosting Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from marconi.queues.storage.sqlalchemy import shards
ShardsController = shards.ShardsController

View File

@ -20,6 +20,7 @@ from oslo.config import cfg
from marconi.common import decorators from marconi.common import decorators
from marconi.queues import storage from marconi.queues import storage
from marconi.queues.storage.sqlalchemy import controllers
from marconi.queues.storage.sqlalchemy import tables from marconi.queues.storage.sqlalchemy import tables
@ -95,7 +96,7 @@ class ControlDriver(storage.ControlDriverBase):
@property @property
def shards_controller(self): def shards_controller(self):
raise NotImplementedError() return controllers.ShardsController(self)
@property @property
def catalogue_controller(self): def catalogue_controller(self):

View File

@ -0,0 +1,141 @@
# Copyright (c) 2014 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
#
# See the License for the specific language governing permissions and
# limitations under the License.
"""shards: an implementation of the shard management storage
controller for sqlalchemy.
Schema:
'n': name :: six.text_type
'u': uri :: six.text_type
'w': weight :: int
'o': options :: dict
"""
import functools
import json
import sqlalchemy as sa
from marconi.common import utils as common_utils
from marconi.queues.storage import base
from marconi.queues.storage import errors
from marconi.queues.storage.sqlalchemy import tables
from marconi.queues.storage.sqlalchemy import utils
class ShardsController(base.ShardsBase):
def __init__(self, *args, **kwargs):
super(ShardsController, self).__init__(*args, **kwargs)
self._conn = self.driver.connection
@utils.raises_conn_error
def list(self, marker=None, limit=10, detailed=False):
marker = marker or ''
# TODO(cpp-cabrera): optimization - limit the columns returned
# when detailed=False by specifying them in the select()
# clause
stmt = sa.sql.select([tables.Shards]).where(
tables.Shards.c.name > marker
).limit(limit)
cursor = self._conn.execute(stmt)
normalizer = functools.partial(_normalize, detailed=detailed)
return (normalizer(v) for v in cursor)
@utils.raises_conn_error
def get(self, name, detailed=False):
stmt = sa.sql.select([tables.Shards]).where(
tables.Shards.c.name == name
)
shard = self._conn.execute(stmt).fetchone()
if shard is None:
raise errors.ShardDoesNotExist(name)
return _normalize(shard, detailed)
# TODO(cpp-cabrera): rename to upsert
@utils.raises_conn_error
def create(self, name, weight, uri, options=None):
opts = None if options is None else json.dumps(options)
try:
stmt = sa.sql.expression.insert(tables.Shards).values(
name=name, weight=weight, uri=uri, options=opts
)
self._conn.execute(stmt)
except sa.exc.IntegrityError:
# TODO(cpp-cabrera): merge update/create into a single
# method with introduction of upsert
self.update(name, weight=weight, uri=uri,
options=options)
@utils.raises_conn_error
def exists(self, name):
stmt = sa.sql.select([tables.Shards.c.name]).where(
tables.Shards.c.name == name
).limit(1)
return self._conn.execute(stmt).fetchone() is not None
@utils.raises_conn_error
def update(self, name, **kwargs):
# NOTE(cpp-cabrera): by pruning None-valued kwargs, we avoid
# overwriting the existing options field with None, since that
# one can be null.
names = ('uri', 'weight', 'options')
fields = common_utils.fields(kwargs, names,
pred=lambda x: x is not None)
assert fields, '`weight`, `uri`, or `options` not found in kwargs'
if 'options' in fields:
fields['options'] = json.dumps(fields['options'])
stmt = sa.sql.update(tables.Shards).where(
tables.Shards.c.name == name).values(**fields)
res = self._conn.execute(stmt)
if res.rowcount == 0:
raise errors.ShardDoesNotExist(name)
@utils.raises_conn_error
def delete(self, name):
stmt = sa.sql.expression.delete(tables.Shards).where(
tables.Shards.c.name == name
)
self._conn.execute(stmt)
@utils.raises_conn_error
def drop_all(self):
stmt = sa.sql.expression.delete(tables.Shards)
self._conn.execute(stmt)
def _normalize(shard, detailed=False):
ret = {
'name': shard[0],
'uri': shard[1],
'weight': shard[2],
}
if detailed:
opts = shard[3]
ret['options'] = json.loads(opts) if opts else None
return ret

View File

@ -106,3 +106,9 @@ Locked = sa.Table('Locked', metadata,
sa.ForeignKey("Messages.id", ondelete="CASCADE"), sa.ForeignKey("Messages.id", ondelete="CASCADE"),
nullable=False), nullable=False),
) )
Shards = sa.Table('Shards', metadata,
sa.Column('name', sa.String, primary_key=True),
sa.Column('uri', sa.String, nullable=False),
sa.Column('weight', sa.INTEGER, nullable=False),
sa.Column('options', sa.BINARY))

View File

@ -0,0 +1,43 @@
# Copyright (c) 2014 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
#
# See the License for the specific language governing permissions and
# limitations under the License.
import functools
from sqlalchemy import exc
import marconi.openstack.common.log as logging
from marconi.queues.storage import errors
LOG = logging.getLogger(__name__)
def raises_conn_error(func):
"""Handles sqlalchemy DisconnectionError
When sqlalchemy detects a disconnect from the database server, it
retries a number of times. After failing that number of times, it
will convert the internal DisconnectionError into an
InvalidRequestError. This decorator handles that error.
"""
@functools.wraps(func)
def wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except exc.InvalidRequestError as ex:
LOG.exception(ex)
raise errors.ConnectionError()
return wrapper

View File

@ -711,25 +711,26 @@ class ShardsControllerTest(ControllerBaseTest):
# NOTE(cpp-cabrera): base entry interferes with listing results # NOTE(cpp-cabrera): base entry interferes with listing results
self.shards_controller.delete(self.shard) self.shards_controller.delete(self.shard)
name_gen = lambda i: chr(ord('A') + i)
for i in range(15): for i in range(15):
self.shards_controller.create(str(i), i, str(i), {}) self.shards_controller.create(name_gen(i), i, str(i), {})
res = list(self.shards_controller.list()) res = list(self.shards_controller.list())
self.assertEqual(len(res), 10) self.assertEqual(len(res), 10)
for i, entry in enumerate(res): for i, entry in enumerate(res):
self._shard_expects(entry, str(i), i, str(i)) self._shard_expects(entry, name_gen(i), i, str(i))
self.assertNotIn('options', entry) self.assertNotIn('options', entry)
res = list(self.shards_controller.list(limit=5)) res = list(self.shards_controller.list(limit=5))
self.assertEqual(len(res), 5) self.assertEqual(len(res), 5)
res = next(self.shards_controller.list(marker='3')) res = next(self.shards_controller.list(marker=name_gen(3)))
self._shard_expects(res, '4', 4, '4') self._shard_expects(res, name_gen(4), 4, '4')
res = list(self.shards_controller.list(detailed=True)) res = list(self.shards_controller.list(detailed=True))
self.assertEqual(len(res), 10) self.assertEqual(len(res), 10)
for i, entry in enumerate(res): for i, entry in enumerate(res):
self._shard_expects(entry, str(i), i, str(i)) self._shard_expects(entry, name_gen(i), i, str(i))
self.assertIn('options', entry) self.assertIn('options', entry)
self.assertEqual(entry['options'], {}) self.assertEqual(entry['options'], {})

View File

@ -0,0 +1,13 @@
[DEFAULT]
debug = False
verbose = False
[drivers]
transport = wsgi
storage = sqlalchemy
[drivers:transport:wsgi]
port = 8888
[drivers:storage:sqlalchemy]
uri = sqlite:///:memory:

View File

@ -18,8 +18,11 @@ import datetime
import sqlalchemy as sa import sqlalchemy as sa
from marconi.queues.storage import sqlalchemy
from marconi.queues.storage.sqlalchemy import controllers
from marconi.queues.storage.sqlalchemy import tables from marconi.queues.storage.sqlalchemy import tables
from marconi import tests as testing from marconi import tests as testing
from marconi.tests.queues.storage import base
class SqlalchemyTableTests(testing.TestBase): class SqlalchemyTableTests(testing.TestBase):
@ -50,3 +53,15 @@ class SqlalchemyTableTests(testing.TestBase):
row = rs.fetchone() row = rs.fetchone()
self.assertIsNone(row) self.assertIsNone(row)
class SqlalchemyShardsTest(base.ShardsControllerTest):
driver_class = sqlalchemy.ControlDriver
controller_class = controllers.ShardsController
def setUp(self):
super(SqlalchemyShardsTest, self).setUp()
self.load_conf('wsgi_sqlalchemy.conf')
def tearDown(self):
super(SqlalchemyShardsTest, self).tearDown()