diff --git a/marconi/queues/storage/sqlalchemy/claims.py b/marconi/queues/storage/sqlalchemy/claims.py new file mode 100644 index 000000000..46a30d4f2 --- /dev/null +++ b/marconi/queues/storage/sqlalchemy/claims.py @@ -0,0 +1,177 @@ +# Copyright (c) 2014 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import sqlalchemy as sa +from sqlalchemy.sql import func as sfunc + +from marconi.openstack.common import timeutils +from marconi.queues import storage +from marconi.queues.storage import errors +from marconi.queues.storage.sqlalchemy import tables +from marconi.queues.storage.sqlalchemy import utils + + +class ClaimController(storage.Claim): + + def __get(self, cid): + # NOTE(flaper87): This probably needs to + # join on `Claim` to check the claim ttl. + sel = sa.sql.select([tables.Messages.c.id, + tables.Messages.c.body, + tables.Messages.c.ttl, + tables.Messages.c.created], + sa.and_( + tables.Messages.c.ttl > + utils.get_age(tables.Messages.c.created), + #tables.Messages.c.ttl > + #utils.get_age(tables.Claims.c.created), + tables.Messages.c.cid == cid + )) + records = self.driver.run(sel) + + for id, body, ttl, created in records: + yield { + 'id': utils.msgid_encode(id), + 'ttl': ttl, + 'age': (timeutils.utcnow() - created).seconds, + 'body': body, + } + + def get(self, queue, claim_id, project=None): + if project is None: + project = '' + + cid = utils.cid_decode(claim_id) + if cid is None: + raise errors.ClaimDoesNotExist(claim_id, queue, project) + + with self.driver.trans() as trans: + sel = sa.sql.select([tables.Claims.c.id, + tables.Claims.c.ttl, + tables.Claims.c.created], + sa.and_(tables.Claims.c.ttl > + utils.get_age(tables.Claims.c.created), + tables.Claims.c.id == cid, + tables.Queues.c.project == project, + tables.Queues.c.name == queue), + from_obj=[tables.Queues.join(tables.Claims)]) + + res = trans.execute(sel).fetchone() + if res is None: + raise errors.ClaimDoesNotExist(claim_id, queue, project) + + cid, ttl, created = res + return ( + {'id': claim_id, + 'ttl': ttl, + 'age': (timeutils.utcnow() - created).seconds}, + self.__get(cid) + ) + + def create(self, queue, metadata, project=None, + limit=storage.DEFAULT_MESSAGES_PER_CLAIM): + + if project is None: + project = '' + + with self.driver.trans() as trans: + try: + qid = utils.get_qid(self.driver, queue, project) + except errors.QueueDoesNotExist: + return None, iter([]) + + # Clean up all expired claims in this queue + + dlt = tables.Claims.delete().where(sa.and_( + tables.Claims.c.ttl <= + utils.get_age(tables.Claims.c.created), + tables.Claims.c.qid == qid)) + trans.execute(dlt) + + ins = tables.Claims.insert().values(qid=qid, ttl=metadata['ttl']) + res = trans.execute(ins) + + cid = res.lastrowid + + and_stmt = sa.and_(tables.Messages.c.cid == (None), + tables.Messages.c.ttl > + sfunc.now() - tables.Messages.c.created, + tables.Messages.c.qid == qid) + sel = sa.sql.select([tables.Messages.c.id], and_stmt).limit(limit) + + records = [t[0] for t in trans.execute(sel)] + and_stmt = sa.and_(tables.Messages.c.id.in_(records)) + update = tables.Messages.update().values(cid=cid).where(and_stmt) + trans.execute(update) + + # NOTE(flaper87): I bet there's a better way + # to do this. + messages_ttl = metadata['ttl'] + metadata['grace'] + update = (tables.Messages.update().values(ttl=messages_ttl). + where(sa.and_( + tables.Messages.c.ttl < messages_ttl, + tables.Messages.c.cid == cid))) + trans.execute(update) + + return (utils.cid_encode(cid), self.__get(cid)) + + def update(self, queue, claim_id, metadata, project=None): + if project is None: + project = '' + + cid = utils.cid_decode(claim_id) + if cid is None: + raise errors.ClaimDoesNotExist(claim_id, queue, project) + + age = utils.get_age(tables.Claims.c.created) + with self.driver.trans() as trans: + qid = utils.get_qid(self.driver, queue, project) + + update = tables.Claims.update().where(sa.and_( + tables.Claims.c.ttl > age, + tables.Claims.c.id == cid, + tables.Claims.c.id == qid)).\ + values(ttl=metadata['ttl']) + + res = trans.execute(update) + if res.rowcount != 1: + raise errors.ClaimDoesNotExist(claim_id, queue, project) + + update = (tables.Messages.update(). + values(ttl=metadata['ttl']). + where(sa.and_( + tables.Messages.c.ttl < metadata['ttl'], + tables.Messages.c.cid == cid))) + trans.execute(update) + + def delete(self, queue, claim_id, project=None): + if project is None: + project = '' + + cid = utils.cid_decode(claim_id) + if cid is None: + return + + with self.driver.trans() as trans: + qid = utils.get_qid(self.driver, queue, project) + and_stmt = sa.and_(tables.Claims.c.id == cid, + tables.Claims.c.qid == qid) + dlt = tables.Claims.delete().where(and_stmt) + trans.execute(dlt) + + update = (tables.Messages.update().values(cid=None). + where(tables.Messages.c.cid == cid)) + + trans.execute(update) diff --git a/marconi/queues/storage/sqlalchemy/controllers.py b/marconi/queues/storage/sqlalchemy/controllers.py index 64b4ba80f..dd84a4424 100644 --- a/marconi/queues/storage/sqlalchemy/controllers.py +++ b/marconi/queues/storage/sqlalchemy/controllers.py @@ -1,3 +1,4 @@ +# Copyright (c) 2014 Red Hat, Inc. # Copyright (c) 2014 Rackspace Hosting Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -10,12 +11,19 @@ # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or # implied. +# # See the License for the specific language governing permissions and # limitations under the License. from marconi.queues.storage.sqlalchemy import catalogue +from marconi.queues.storage.sqlalchemy import claims +from marconi.queues.storage.sqlalchemy import messages +from marconi.queues.storage.sqlalchemy import queues from marconi.queues.storage.sqlalchemy import shards +QueueController = queues.QueueController +ClaimController = claims.ClaimController +MessageController = messages.MessageController CatalogueController = catalogue.CatalogueController ShardsController = shards.ShardsController diff --git a/marconi/queues/storage/sqlalchemy/driver.py b/marconi/queues/storage/sqlalchemy/driver.py index dd565d1d7..5636bca85 100644 --- a/marconi/queues/storage/sqlalchemy/driver.py +++ b/marconi/queues/storage/sqlalchemy/driver.py @@ -14,15 +14,16 @@ # See the License for the specific language governing permissions and # limitations under the License. -import sqlalchemy as sa +import contextlib from oslo.config import cfg +import sqlalchemy as sa from marconi.common import decorators from marconi.queues import storage from marconi.queues.storage.sqlalchemy import controllers from marconi.queues.storage.sqlalchemy import tables - +from marconi.queues.storage.sqlalchemy import utils _SQLALCHEMY_OPTIONS = [ cfg.StrOpt('uri', default='sqlite:///:memory:', @@ -57,17 +58,46 @@ class DataDriver(storage.DataDriverBase): def close_connection(self): self.connection.close() + @contextlib.contextmanager + def trans(self): + with self.engine.begin() as connection: + yield connection + + def run(self, statement): + """Performs a SQL query. + + :param sql: a query string with the '?' placeholders + :param args: the arguments to substitute the placeholders + """ + return self.connection.execute(statement) + + def get(self, statement): + """Runs sql and returns the first entry in the results. + + :raises: utils.NoResult if the result set is empty + """ + res = self.run(statement) + r = res.fetchone() + if r is None: + raise utils.NoResult() + else: + res.close() + return r + @decorators.lazy_property(write=False) def queue_controller(self): - raise NotImplementedError() + return controllers.QueueController(self) @decorators.lazy_property(write=False) def message_controller(self): - raise NotImplementedError() + return controllers.MessageController(self) @decorators.lazy_property(write=False) def claim_controller(self): - raise NotImplementedError() + return controllers.ClaimController(self) + + def is_alive(self): + return True class ControlDriver(storage.ControlDriverBase): diff --git a/marconi/queues/storage/sqlalchemy/messages.py b/marconi/queues/storage/sqlalchemy/messages.py new file mode 100644 index 000000000..caeab96ba --- /dev/null +++ b/marconi/queues/storage/sqlalchemy/messages.py @@ -0,0 +1,270 @@ +# Copyright (c) 2014 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import calendar +import json + +import sqlalchemy as sa +from sqlalchemy.sql import func as sfunc + +from marconi.openstack.common import timeutils +from marconi.queues import storage +from marconi.queues.storage import errors +from marconi.queues.storage.sqlalchemy import tables +from marconi.queues.storage.sqlalchemy import utils + + +class MessageController(storage.Message): + + def get(self, queue, message_id, project, count=False): + + if project is None: + project = '' + + mid = utils.msgid_decode(message_id) + + if mid is None: + raise errors.MessageDoesNotExist(message_id, queue, project) + + try: + j = sa.join(tables.Messages, tables.Queues, + tables.Messages.c.qid == tables.Queues.c.id) + + sel = sa.sql.select([tables.Messages.c.body, + tables.Messages.c.ttl, + tables.Messages.c.created]) + + if count: + sel = sa.sql.select([sfunc.count(tables.Messages.c.id)]) + + sel = sel.select_from(j) + sel = sel.where(sa.and_(tables.Messages.c.id == mid, + tables.Queues.c.project == project, + tables.Queues.c.name == queue, + tables.Messages.c.ttl > + sfunc.now() - tables.Messages.c.created)) + + return self.driver.get(sel)[0] + except utils.NoResult: + raise errors.MessageDoesNotExist(message_id, queue, project) + + def bulk_get(self, queue, message_ids, project): + if project is None: + project = '' + + message_ids = [id for id in + map(utils.msgid_decode, message_ids) + if id is not None] + + statement = sa.sql.select([tables.Messages.c.id, + tables.Messages.c.body, + tables.Messages.c.ttl, + tables.Messages.c.created]) + + and_stmt = [tables.Messages.c.id.in_(message_ids), + tables.Queues.c.name == queue, + tables.Queues.c.project == project, + tables.Messages.c.ttl > + sfunc.now() - tables.Messages.c.created] + + j = sa.join(tables.Messages, tables.Queues, + tables.Messages.c.qid == tables.Queues.c.id) + + statement = statement.select_from(j).where(sa.and_(*and_stmt)) + + now = timeutils.utcnow_ts() + records = self.driver.run(statement) + for id, body, ttl, created in records: + yield { + 'id': utils.msgid_encode(id), + 'ttl': ttl, + 'age': now - calendar.timegm(created.timetuple()), + 'body': json.loads(body), + } + + def first(self, queue, project=None, sort=1): + if project is None: + project = '' + + qid = utils.get_qid(self.driver, queue, project) + + sel = sa.sql.select([tables.Messages.c.id, + tables.Messages.c.body, + tables.Messages.c.ttl, + tables.Messages.c.created], + sa.and_( + tables.Messages.c.ttl > + sfunc.now() - tables.Messages.c.created, + tables.Messages.c.qid == qid)) + if sort not in (1, -1): + raise ValueError(u'sort must be either 1 (ascending) ' + u'or -1 (descending)') + + order = sa.asc + if sort == -1: + order = sa.desc + + sel = sel.order_by(order(tables.Messages.c.id)) + + try: + id, body, ttl, created = self.driver.get(sel) + except utils.NoResult: + raise errors.QueueIsEmpty(queue, project) + + created_iso = timeutils.isotime(created) + return { + 'id': utils.msgid_encode(id), + 'ttl': ttl, + 'created': created_iso, + 'age': int((timeutils.utcnow() - created).seconds), + 'body': body, + } + + def list(self, queue, project, marker=None, + limit=storage.DEFAULT_MESSAGES_PER_PAGE, + echo=False, client_uuid=None, include_claimed=False): + + if project is None: + project = '' + + with self.driver.trans() as trans: + sel = sa.sql.select([tables.Messages.c.id, + tables.Messages.c.body, + tables.Messages.c.ttl, + tables.Messages.c.created]) + + j = sa.join(tables.Messages, tables.Queues, + tables.Messages.c.qid == tables.Queues.c.id) + + sel = sel.select_from(j) + and_clause = [tables.Queues.c.name == queue, + tables.Queues.c.project == project] + + if not echo: + and_clause.append(tables.Messages.c.client != str(client_uuid)) + + if marker: + mark = utils.marker_decode(marker) + if mark: + and_clause.append(tables.Messages.c.id > mark) + + if not include_claimed: + and_clause.append(tables.Messages.c.cid == (None)) + + sel = sel.where(sa.and_(*and_clause)) + sel = sel.limit(limit) + + records = trans.execute(sel) + marker_id = {} + + def it(): + now = timeutils.utcnow_ts() + for id, body, ttl, created in records: + marker_id['next'] = id + yield { + 'id': utils.msgid_encode(id), + 'ttl': ttl, + 'age': now - calendar.timegm(created.timetuple()), + 'body': json.loads(body), + } + + yield it() + yield utils.marker_encode(marker_id['next']) + + def post(self, queue, messages, client_uuid, project): + if project is None: + project = '' + + with self.driver.trans() as trans: + qid = utils.get_qid(self.driver, queue, project) + + # cleanup all expired messages in this queue + #self.driver.run(''' + # delete from Messages + # where ttl <= julianday() * 86400.0 - created + # and qid = ?''', qid) + + # executemany() sets lastrowid to None, so no matter we manually + # generate the IDs or not, we still need to query for it. + + def it(): + for m in messages: + yield dict(qid=qid, + ttl=m['ttl'], + body=json.dumps(m['body']), + client=str(client_uuid)) + + result = trans.execute(tables.Messages.insert(), list(it())) + + statement = sa.sql.select([tables.Messages.c.id]) + statement = statement.limit(result.rowcount) + statement = statement.order_by(tables.Messages.c.id.desc()) + result = trans.execute(statement).fetchall() + + return map(utils.msgid_encode, [i[0] for i in reversed(result)]) + + def delete(self, queue, message_id, project, claim=None): + if project is None: + project = '' + + mid = utils.msgid_decode(message_id) + if mid is None: + return + + with self.driver.trans() as trans: + try: + self.get(queue, message_id, project, count=True) + except errors.MessageDoesNotExist: + return + + statement = tables.Messages.delete() + and_stmt = [tables.Messages.c.id == mid] + + exists = sa.sql.select([tables.Messages.c.id], sa.and_(*and_stmt)) + + if not trans.execute(exists).first(): + return + + cid = claim and utils.cid_decode(claim) or None + + if claim and cid is None: + return + + and_stmt.append(tables.Messages.c.cid == cid) + + statement = statement.where(sa.and_(*and_stmt)) + res = trans.execute(statement) + + if res.rowcount == 0: + raise errors.MessageIsClaimed(mid) + + def bulk_delete(self, queue, message_ids, project): + if project is None: + project = '' + + message_ids = ','.join( + ["'%s'" % id for id in + map(utils.msgid_decode, message_ids) if id] + ) + + with self.driver.trans() as trans: + qid = utils.get_qid(self.driver, queue, project) + + statement = tables.Messages.delete() + + and_stmt = [tables.Messages.c.id.in_(message_ids), + tables.Messages.c.qid == qid] + + trans.execute(statement.where(sa.and_(*and_stmt))) diff --git a/marconi/queues/storage/sqlalchemy/queues.py b/marconi/queues/storage/sqlalchemy/queues.py new file mode 100644 index 000000000..b02ac79a0 --- /dev/null +++ b/marconi/queues/storage/sqlalchemy/queues.py @@ -0,0 +1,169 @@ +# Copyright (c) 2014 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# +# See the License for the specific language governing permissions and +# limitations under the License. + +import json + +import sqlalchemy as sa +from sqlalchemy.sql import func as sfunc + +from marconi.queues import storage +from marconi.queues.storage import errors +from marconi.queues.storage.sqlalchemy import tables +from marconi.queues.storage.sqlalchemy import utils + + +class QueueController(storage.Queue): + + def list(self, project, marker=None, + limit=storage.DEFAULT_QUEUES_PER_PAGE, detailed=False): + + if project is None: + project = '' + + fields = [tables.Queues.c.name] + if detailed: + fields.append(tables.Queues.c.metadata) + + if marker: + sel = sa.sql.select(fields, sa.and_( + tables.Queues.c.project == project, + tables.Queues.c.name > marker)) + else: + sel = sa.sql.select(fields, tables.Queues.c.project == project) + + sel = sel.order_by(sa.asc(tables.Queues.c.name)).limit(limit) + records = self.driver.run(sel) + + marker_name = {} + + def it(): + for rec in records: + marker_name['next'] = rec[0] + yield ({'name': rec[0]} if not detailed + else + {'name': rec[0], 'metadata': json.loads(rec[1])}) + + yield it() + yield marker_name['next'] + + def get_metadata(self, name, project): + if project is None: + project = '' + + try: + sel = sa.sql.select([tables.Queues.c.metadata], sa.and_( + tables.Queues.c.project == project, + tables.Queues.c.name == name + )) + return json.loads(self.driver.get(sel)[0]) + except utils.NoResult: + raise errors.QueueDoesNotExist(name, project) + + def create(self, name, project): + if project is None: + project = '' + + try: + ins = tables.Queues.insert().values(project=project, name=name, + metadata=json.dumps({})) + res = self.driver.run(ins) + except sa.exc.IntegrityError: + return False + + return res.rowcount == 1 + + def exists(self, name, project): + if project is None: + project = '' + + sel = sa.sql.select([tables.Queues.c.id], sa.and_( + tables.Queues.c.project == project, + tables.Queues.c.name == name + )) + res = self.driver.run(sel) + r = res.fetchone() + res.close() + return r is not None + + def set_metadata(self, name, metadata, project): + if project is None: + project = '' + + update = tables.Queues.update().\ + where(sa.and_( + tables.Queues.c.project == project, + tables.Queues.c.name == name)).\ + values(metadata=json.dumps(metadata)) + res = self.driver.run(update) + + try: + if res.rowcount != 1: + raise errors.QueueDoesNotExist(name, project) + finally: + res.close() + + def delete(self, name, project): + if project is None: + project = '' + + dlt = tables.Queues.delete().where(sa.and_( + tables.Queues.c.project == project, + tables.Queues.c.name == name)) + self.driver.run(dlt) + + def stats(self, name, project): + if project is None: + project = '' + + qid = utils.get_qid(self.driver, name, project) + sel = sa.sql.select([ + sa.sql.select([sa.func.count(tables.Messages.c.id)], + sa.and_( + tables.Messages.c.qid == qid, + tables.Messages.c.cid != (None), + tables.Messages.c.ttl > + sfunc.now() - tables.Messages.c.created, + )), + sa.sql.select([sa.func.count(tables.Messages.c.id)], + sa.and_( + tables.Messages.c.qid == qid, + tables.Messages.c.cid == (None), + tables.Messages.c.ttl > + sfunc.now() - tables.Messages.c.created, + )) + ]) + + claimed, free = self.driver.get(sel) + + total = free + claimed + + message_stats = { + 'claimed': claimed, + 'free': free, + 'total': total, + } + + try: + message_controller = self.driver.message_controller + oldest = message_controller.first(name, project, sort=1) + newest = message_controller.first(name, project, sort=-1) + except errors.QueueIsEmpty: + pass + else: + message_stats['oldest'] = utils.stat_message(oldest) + message_stats['newest'] = utils.stat_message(newest) + + return {'messages': message_stats} diff --git a/marconi/queues/storage/sqlalchemy/tables.py b/marconi/queues/storage/sqlalchemy/tables.py index ebdc37645..7aeca3aa7 100644 --- a/marconi/queues/storage/sqlalchemy/tables.py +++ b/marconi/queues/storage/sqlalchemy/tables.py @@ -16,23 +16,13 @@ import sqlalchemy as sa +from marconi.openstack.common import timeutils + metadata = sa.MetaData() +now = timeutils.utcnow + -''' -create table -if not exists -Messages ( - id INTEGER, - qid INTEGER, - ttl INTEGER, - body DOCUMENT, - client TEXT, - created DATETIME, - PRIMARY KEY(id), - FOREIGN KEY(qid) references Queues(id) on delete cascade -) -''' Messages = sa.Table('Messages', metadata, sa.Column('id', sa.INTEGER, primary_key=True), sa.Column('qid', sa.INTEGER, @@ -41,21 +31,13 @@ Messages = sa.Table('Messages', metadata, sa.Column('ttl', sa.INTEGER), sa.Column('body', sa.LargeBinary), sa.Column('client', sa.TEXT), - sa.Column('created', sa.DATETIME), + sa.Column('created', sa.TIMESTAMP, + default=now, onupdate=now), + sa.Column('cid', sa.INTEGER, + sa.ForeignKey("Claims.id", ondelete='SET NULL')), ) -''' -create table -if not exists -Claims ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - qid INTEGER, - ttl INTEGER, - created DATETIME, - FOREIGN KEY(qid) references Queues(id) on delete cascade -) -''' Claims = sa.Table('Claims', metadata, sa.Column('id', sa.INTEGER, primary_key=True, autoincrement=True), @@ -63,22 +45,11 @@ Claims = sa.Table('Claims', metadata, sa.ForeignKey("Queues.id", ondelete="CASCADE"), nullable=False), sa.Column('ttl', sa.INTEGER), - sa.Column('created', sa.DATETIME), + sa.Column('created', sa.TIMESTAMP, + default=now, onupdate=now), ) -''' -create table -if not exists -Queues ( - id INTEGER, - project TEXT, - name TEXT, - metadata DOCUMENT, - PRIMARY KEY(id), - UNIQUE(project, name) -) -''' Queues = sa.Table('Queues', metadata, sa.Column('id', sa.INTEGER, primary_key=True), sa.Column('project', sa.String), @@ -88,31 +59,13 @@ Queues = sa.Table('Queues', metadata, ) -''' -create table -if not exists -Locked ( - cid INTEGER, - msgid INTEGER, - FOREIGN KEY(cid) references Claims(id) on delete cascade, - FOREIGN KEY(msgid) references Messages(id) on delete cascade -) -''' -Locked = sa.Table('Locked', metadata, - sa.Column('cid', sa.INTEGER, - sa.ForeignKey("Claims.id", ondelete="CASCADE"), - nullable=False), - sa.Column('msgid', sa.INTEGER, - sa.ForeignKey("Messages.id", ondelete="CASCADE"), - nullable=False), - ) - Shards = sa.Table('Shards', metadata, sa.Column('name', sa.String, primary_key=True), sa.Column('uri', sa.String, nullable=False), sa.Column('weight', sa.INTEGER, nullable=False), sa.Column('options', sa.BINARY)) + Catalogue = sa.Table('Catalogue', metadata, sa.Column('shard', sa.String, sa.ForeignKey('Shards.name', diff --git a/marconi/queues/storage/sqlalchemy/utils.py b/marconi/queues/storage/sqlalchemy/utils.py index 86211f477..f04a926ba 100644 --- a/marconi/queues/storage/sqlalchemy/utils.py +++ b/marconi/queues/storage/sqlalchemy/utils.py @@ -1,3 +1,4 @@ +# Copyright (c) 2014 Red Hat, Inc. # Copyright (c) 2014 Rackspace, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -15,13 +16,17 @@ # limitations under the License. import functools +import sqlalchemy as sa from sqlalchemy import exc +from sqlalchemy.sql import func as sfunc -import marconi.openstack.common.log as logging +from marconi.openstack.common import log as logging from marconi.queues.storage import errors +from marconi.queues.storage.sqlalchemy import tables LOG = logging.getLogger(__name__) +UNIX_EPOCH_AS_JULIAN_SEC = 2440587.5 * 86400.0 def raises_conn_error(func): @@ -41,3 +46,79 @@ def raises_conn_error(func): raise errors.ConnectionError() return wrapper + + +class NoResult(Exception): + pass + + +def get_qid(driver, queue, project): + sel = sa.sql.select([tables.Queues.c.id], sa.and_( + tables.Queues.c.project == project, + tables.Queues.c.name == queue)) + try: + return driver.get(sel)[0] + except NoResult: + raise errors.QueueDoesNotExist(queue, project) + + +def get_age(created): + return sfunc.now() - created + +# The utilities below make the database IDs opaque to the users +# of Marconi API. The only purpose is to advise the users NOT to +# make assumptions on the implementation of and/or relationship +# between the message IDs, the markers, and claim IDs. +# +# The magic numbers are arbitrarily picked; the numbers themselves +# come with no special functionalities. + + +def msgid_encode(id): + return hex(id ^ 0x5c693a53)[2:] + + +def msgid_decode(id): + try: + return int(id, 16) ^ 0x5c693a53 + + except ValueError: + return None + + +def marker_encode(id): + return oct(id ^ 0x3c96a355)[1:] + + +def marker_decode(id): + try: + return int(id, 8) ^ 0x3c96a355 + + except ValueError: + return None + + +def cid_encode(id): + return hex(id ^ 0x63c9a59c)[2:] + + +def cid_decode(id): + try: + return int(id, 16) ^ 0x63c9a59c + + except ValueError: + return None + + +def julian_to_unix(julian_sec): + """Converts Julian timestamp, in seconds, to a UNIX timestamp.""" + return int(round(julian_sec - UNIX_EPOCH_AS_JULIAN_SEC)) + + +def stat_message(message): + """Creates a stat document based on a message.""" + return { + 'id': message['id'], + 'age': message['age'], + 'created': message['created'], + } diff --git a/marconi/tests/queues/storage/base.py b/marconi/tests/queues/storage/base.py index 601a901ad..b58784641 100644 --- a/marconi/tests/queues/storage/base.py +++ b/marconi/tests/queues/storage/base.py @@ -81,6 +81,7 @@ class QueueControllerTest(ControllerBaseTest): def setUp(self): super(QueueControllerTest, self).setUp() + self.queue_controller = self.driver.queue_controller self.message_controller = self.driver.message_controller self.claim_controller = self.driver.claim_controller @@ -395,10 +396,6 @@ class MessageControllerTest(ControllerBaseTest): messages = [{'body': 3.14, 'ttl': 0}] client_uuid = uuid.uuid4() - [msgid] = self.controller.post(self.queue_name, messages, - project=self.project, - client_uuid=client_uuid) - [msgid] = self.controller.post(self.queue_name, messages, project=self.project, client_uuid=client_uuid) diff --git a/tests/unit/queues/storage/test_impl_sqlalchemy.py b/tests/unit/queues/storage/test_impl_sqlalchemy.py index 27f1d0a29..7a534039d 100644 --- a/tests/unit/queues/storage/test_impl_sqlalchemy.py +++ b/tests/unit/queues/storage/test_impl_sqlalchemy.py @@ -18,6 +18,7 @@ import datetime import sqlalchemy as sa +from marconi.queues.storage import errors from marconi.queues.storage import sqlalchemy from marconi.queues.storage.sqlalchemy import controllers from marconi.queues.storage.sqlalchemy import tables @@ -55,6 +56,29 @@ class SqlalchemyTableTests(testing.TestBase): self.assertIsNone(row) +class SqlalchemyQueueTests(base.QueueControllerTest): + driver_class = sqlalchemy.DataDriver + controller_class = controllers.QueueController + + +class SqlalchemyMessageTests(base.MessageControllerTest): + driver_class = sqlalchemy.DataDriver + controller_class = controllers.MessageController + + def test_empty_queue_exception(self): + queue_name = 'empty-queue-test' + self.queue_controller.create(queue_name, None) + + self.assertRaises(errors.QueueIsEmpty, + self.controller.first, + queue_name, None, sort=1) + + +class SqlalchemyClaimTests(base.ClaimControllerTest): + driver_class = sqlalchemy.DataDriver + controller_class = controllers.ClaimController + + class SqlalchemyShardsTest(base.ShardsControllerTest): driver_class = sqlalchemy.ControlDriver controller_class = controllers.ShardsController