diff --git a/marconi/queues/storage/sqlalchemy/__init__.py b/marconi/queues/storage/sqlalchemy/__init__.py index e69de29bb..47c996ec1 100644 --- a/marconi/queues/storage/sqlalchemy/__init__.py +++ b/marconi/queues/storage/sqlalchemy/__init__.py @@ -0,0 +1,20 @@ +# Copyright (c) 2014 Rackspace Hosting Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from marconi.queues.storage.sqlalchemy import driver + +# Hoist classes into package namespace +ControlDriver = driver.ControlDriver +DataDriver = driver.DataDriver diff --git a/marconi/queues/storage/sqlalchemy/controllers.py b/marconi/queues/storage/sqlalchemy/controllers.py new file mode 100644 index 000000000..fa89ae028 --- /dev/null +++ b/marconi/queues/storage/sqlalchemy/controllers.py @@ -0,0 +1,19 @@ +# Copyright (c) 2014 Rackspace Hosting Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from marconi.queues.storage.sqlalchemy import shards + + +ShardsController = shards.ShardsController diff --git a/marconi/queues/storage/sqlalchemy/driver.py b/marconi/queues/storage/sqlalchemy/driver.py index 84e7bf1b1..3890035e6 100644 --- a/marconi/queues/storage/sqlalchemy/driver.py +++ b/marconi/queues/storage/sqlalchemy/driver.py @@ -20,6 +20,7 @@ from oslo.config import cfg from marconi.common import decorators from marconi.queues import storage +from marconi.queues.storage.sqlalchemy import controllers from marconi.queues.storage.sqlalchemy import tables @@ -95,7 +96,7 @@ class ControlDriver(storage.ControlDriverBase): @property def shards_controller(self): - raise NotImplementedError() + return controllers.ShardsController(self) @property def catalogue_controller(self): diff --git a/marconi/queues/storage/sqlalchemy/shards.py b/marconi/queues/storage/sqlalchemy/shards.py new file mode 100644 index 000000000..5365d8cef --- /dev/null +++ b/marconi/queues/storage/sqlalchemy/shards.py @@ -0,0 +1,141 @@ +# Copyright (c) 2014 Rackspace, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# +# See the License for the specific language governing permissions and +# limitations under the License. + +"""shards: an implementation of the shard management storage +controller for sqlalchemy. + +Schema: + 'n': name :: six.text_type + 'u': uri :: six.text_type + 'w': weight :: int + 'o': options :: dict +""" + +import functools +import json + +import sqlalchemy as sa + +from marconi.common import utils as common_utils +from marconi.queues.storage import base +from marconi.queues.storage import errors +from marconi.queues.storage.sqlalchemy import tables +from marconi.queues.storage.sqlalchemy import utils + + +class ShardsController(base.ShardsBase): + + def __init__(self, *args, **kwargs): + super(ShardsController, self).__init__(*args, **kwargs) + + self._conn = self.driver.connection + + @utils.raises_conn_error + def list(self, marker=None, limit=10, detailed=False): + marker = marker or '' + + # TODO(cpp-cabrera): optimization - limit the columns returned + # when detailed=False by specifying them in the select() + # clause + stmt = sa.sql.select([tables.Shards]).where( + tables.Shards.c.name > marker + ).limit(limit) + cursor = self._conn.execute(stmt) + + normalizer = functools.partial(_normalize, detailed=detailed) + return (normalizer(v) for v in cursor) + + @utils.raises_conn_error + def get(self, name, detailed=False): + stmt = sa.sql.select([tables.Shards]).where( + tables.Shards.c.name == name + ) + + shard = self._conn.execute(stmt).fetchone() + if shard is None: + raise errors.ShardDoesNotExist(name) + + return _normalize(shard, detailed) + + # TODO(cpp-cabrera): rename to upsert + @utils.raises_conn_error + def create(self, name, weight, uri, options=None): + opts = None if options is None else json.dumps(options) + + try: + stmt = sa.sql.expression.insert(tables.Shards).values( + name=name, weight=weight, uri=uri, options=opts + ) + self._conn.execute(stmt) + + except sa.exc.IntegrityError: + # TODO(cpp-cabrera): merge update/create into a single + # method with introduction of upsert + self.update(name, weight=weight, uri=uri, + options=options) + + @utils.raises_conn_error + def exists(self, name): + stmt = sa.sql.select([tables.Shards.c.name]).where( + tables.Shards.c.name == name + ).limit(1) + return self._conn.execute(stmt).fetchone() is not None + + @utils.raises_conn_error + def update(self, name, **kwargs): + # NOTE(cpp-cabrera): by pruning None-valued kwargs, we avoid + # overwriting the existing options field with None, since that + # one can be null. + names = ('uri', 'weight', 'options') + fields = common_utils.fields(kwargs, names, + pred=lambda x: x is not None) + + assert fields, '`weight`, `uri`, or `options` not found in kwargs' + + if 'options' in fields: + fields['options'] = json.dumps(fields['options']) + + stmt = sa.sql.update(tables.Shards).where( + tables.Shards.c.name == name).values(**fields) + + res = self._conn.execute(stmt) + if res.rowcount == 0: + raise errors.ShardDoesNotExist(name) + + @utils.raises_conn_error + def delete(self, name): + stmt = sa.sql.expression.delete(tables.Shards).where( + tables.Shards.c.name == name + ) + self._conn.execute(stmt) + + @utils.raises_conn_error + def drop_all(self): + stmt = sa.sql.expression.delete(tables.Shards) + self._conn.execute(stmt) + + +def _normalize(shard, detailed=False): + ret = { + 'name': shard[0], + 'uri': shard[1], + 'weight': shard[2], + } + if detailed: + opts = shard[3] + ret['options'] = json.loads(opts) if opts else None + + return ret diff --git a/marconi/queues/storage/sqlalchemy/tables.py b/marconi/queues/storage/sqlalchemy/tables.py index 784ee7634..cfb33c8ee 100644 --- a/marconi/queues/storage/sqlalchemy/tables.py +++ b/marconi/queues/storage/sqlalchemy/tables.py @@ -106,3 +106,9 @@ Locked = sa.Table('Locked', metadata, sa.ForeignKey("Messages.id", ondelete="CASCADE"), nullable=False), ) + +Shards = sa.Table('Shards', metadata, + sa.Column('name', sa.String, primary_key=True), + sa.Column('uri', sa.String, nullable=False), + sa.Column('weight', sa.INTEGER, nullable=False), + sa.Column('options', sa.BINARY)) diff --git a/marconi/queues/storage/sqlalchemy/utils.py b/marconi/queues/storage/sqlalchemy/utils.py new file mode 100644 index 000000000..86211f477 --- /dev/null +++ b/marconi/queues/storage/sqlalchemy/utils.py @@ -0,0 +1,43 @@ +# Copyright (c) 2014 Rackspace, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# +# See the License for the specific language governing permissions and +# limitations under the License. +import functools + +from sqlalchemy import exc + +import marconi.openstack.common.log as logging +from marconi.queues.storage import errors + + +LOG = logging.getLogger(__name__) + + +def raises_conn_error(func): + """Handles sqlalchemy DisconnectionError + + When sqlalchemy detects a disconnect from the database server, it + retries a number of times. After failing that number of times, it + will convert the internal DisconnectionError into an + InvalidRequestError. This decorator handles that error. + """ + @functools.wraps(func) + def wrapper(*args, **kwargs): + try: + return func(*args, **kwargs) + except exc.InvalidRequestError as ex: + LOG.exception(ex) + raise errors.ConnectionError() + + return wrapper diff --git a/marconi/tests/queues/storage/base.py b/marconi/tests/queues/storage/base.py index b17b58614..601a901ad 100644 --- a/marconi/tests/queues/storage/base.py +++ b/marconi/tests/queues/storage/base.py @@ -711,25 +711,26 @@ class ShardsControllerTest(ControllerBaseTest): # NOTE(cpp-cabrera): base entry interferes with listing results self.shards_controller.delete(self.shard) + name_gen = lambda i: chr(ord('A') + i) for i in range(15): - self.shards_controller.create(str(i), i, str(i), {}) + self.shards_controller.create(name_gen(i), i, str(i), {}) res = list(self.shards_controller.list()) self.assertEqual(len(res), 10) for i, entry in enumerate(res): - self._shard_expects(entry, str(i), i, str(i)) + self._shard_expects(entry, name_gen(i), i, str(i)) self.assertNotIn('options', entry) res = list(self.shards_controller.list(limit=5)) self.assertEqual(len(res), 5) - res = next(self.shards_controller.list(marker='3')) - self._shard_expects(res, '4', 4, '4') + res = next(self.shards_controller.list(marker=name_gen(3))) + self._shard_expects(res, name_gen(4), 4, '4') res = list(self.shards_controller.list(detailed=True)) self.assertEqual(len(res), 10) for i, entry in enumerate(res): - self._shard_expects(entry, str(i), i, str(i)) + self._shard_expects(entry, name_gen(i), i, str(i)) self.assertIn('options', entry) self.assertEqual(entry['options'], {}) diff --git a/tests/etc/wsgi_sqlalchemy.conf b/tests/etc/wsgi_sqlalchemy.conf new file mode 100644 index 000000000..42649e150 --- /dev/null +++ b/tests/etc/wsgi_sqlalchemy.conf @@ -0,0 +1,13 @@ +[DEFAULT] +debug = False +verbose = False + +[drivers] +transport = wsgi +storage = sqlalchemy + +[drivers:transport:wsgi] +port = 8888 + +[drivers:storage:sqlalchemy] +uri = sqlite:///:memory: diff --git a/tests/unit/queues/storage/test_impl_sqlalchemy.py b/tests/unit/queues/storage/test_impl_sqlalchemy.py index b835b58b8..daed4a6be 100644 --- a/tests/unit/queues/storage/test_impl_sqlalchemy.py +++ b/tests/unit/queues/storage/test_impl_sqlalchemy.py @@ -18,8 +18,11 @@ import datetime import sqlalchemy as sa +from marconi.queues.storage import sqlalchemy +from marconi.queues.storage.sqlalchemy import controllers from marconi.queues.storage.sqlalchemy import tables from marconi import tests as testing +from marconi.tests.queues.storage import base class SqlalchemyTableTests(testing.TestBase): @@ -50,3 +53,15 @@ class SqlalchemyTableTests(testing.TestBase): row = rs.fetchone() self.assertIsNone(row) + + +class SqlalchemyShardsTest(base.ShardsControllerTest): + driver_class = sqlalchemy.ControlDriver + controller_class = controllers.ShardsController + + def setUp(self): + super(SqlalchemyShardsTest, self).setUp() + self.load_conf('wsgi_sqlalchemy.conf') + + def tearDown(self): + super(SqlalchemyShardsTest, self).tearDown()