diff --git a/doc/source/api/autoindex.rst b/doc/source/api/autoindex.rst index e13d6cee7..e2a3792cc 100644 --- a/doc/source/api/autoindex.rst +++ b/doc/source/api/autoindex.rst @@ -1,97 +1,5 @@ .. toctree:: + :glob: :maxdepth: 1 - zaqar.api.v1_1.request.rst - zaqar.api.v1_1.response.rst - zaqar.api.v1.request.rst - zaqar.api.v1.response.rst - zaqar.bootstrap.rst - zaqar.cmd.gc.rst - zaqar.cmd.server.rst - zaqar.common.access.rst - zaqar.common.api.api.rst - zaqar.common.api.request.rst - zaqar.common.api.response.rst - zaqar.common.api.schemas.flavors.rst - zaqar.common.api.schemas.pools.rst - zaqar.common.cli.rst - zaqar.common.decorators.rst - zaqar.common.errors.rst - zaqar.common.pipeline.rst - zaqar.common.storage.select.rst - zaqar.common.transport.wsgi.helpers.rst - zaqar.common.utils.rst - zaqar.context.rst - zaqar.i18n.rst - zaqar.openstack.common.cache._backends.memory.rst - zaqar.openstack.common.cache.backends.rst - zaqar.openstack.common.cache.cache.rst - zaqar.openstack.common.config.generator.rst - zaqar.openstack.common.excutils.rst - zaqar.openstack.common.fileutils.rst - zaqar.openstack.common.gettextutils.rst - zaqar.openstack.common.importutils.rst - zaqar.openstack.common.local.rst - zaqar.openstack.common.lockutils.rst - zaqar.openstack.common.strutils.rst - zaqar.openstack.common.timeutils.rst - zaqar.storage.base.rst - zaqar.storage.errors.rst - zaqar.storage.mongodb.catalogue.rst - zaqar.storage.mongodb.claims.rst - zaqar.storage.mongodb.controllers.rst - zaqar.storage.mongodb.driver.rst - zaqar.storage.mongodb.flavors.rst - zaqar.storage.mongodb.messages.rst - zaqar.storage.mongodb.options.rst - zaqar.storage.mongodb.pools.rst - zaqar.storage.mongodb.queues.rst - zaqar.storage.mongodb.utils.rst - zaqar.storage.pipeline.rst - zaqar.storage.pooling.rst - zaqar.storage.redis.claims.rst - zaqar.storage.redis.controllers.rst - zaqar.storage.redis.driver.rst - zaqar.storage.redis.messages.rst - zaqar.storage.redis.models.rst - zaqar.storage.redis.options.rst - zaqar.storage.redis.queues.rst - zaqar.storage.redis.scripting.rst - zaqar.storage.redis.utils.rst - zaqar.storage.sqlalchemy.catalogue.rst - zaqar.storage.sqlalchemy.claims.rst - zaqar.storage.sqlalchemy.controllers.rst - zaqar.storage.sqlalchemy.driver.rst - zaqar.storage.sqlalchemy.messages.rst - zaqar.storage.sqlalchemy.options.rst - zaqar.storage.sqlalchemy.pools.rst - zaqar.storage.sqlalchemy.queues.rst - zaqar.storage.sqlalchemy.tables.rst - zaqar.storage.sqlalchemy.utils.rst - zaqar.storage.utils.rst - zaqar.transport.auth.rst - zaqar.transport.base.rst - zaqar.transport.utils.rst - zaqar.transport.validation.rst - zaqar.transport.wsgi.app.rst - zaqar.transport.wsgi.driver.rst - zaqar.transport.wsgi.errors.rst - zaqar.transport.wsgi.utils.rst - zaqar.transport.wsgi.v1_0.claims.rst - zaqar.transport.wsgi.v1_0.health.rst - zaqar.transport.wsgi.v1_0.homedoc.rst - zaqar.transport.wsgi.v1_0.messages.rst - zaqar.transport.wsgi.v1_0.metadata.rst - zaqar.transport.wsgi.v1_0.pools.rst - zaqar.transport.wsgi.v1_0.queues.rst - zaqar.transport.wsgi.v1_0.stats.rst - zaqar.transport.wsgi.v1_1.claims.rst - zaqar.transport.wsgi.v1_1.flavors.rst - zaqar.transport.wsgi.v1_1.health.rst - zaqar.transport.wsgi.v1_1.homedoc.rst - zaqar.transport.wsgi.v1_1.messages.rst - zaqar.transport.wsgi.v1_1.ping.rst - zaqar.transport.wsgi.v1_1.pools.rst - zaqar.transport.wsgi.v1_1.queues.rst - zaqar.transport.wsgi.v1_1.stats.rst - zaqar.version.rst + * diff --git a/doc/source/api/zaqar.storage.sqlalchemy.claims.rst b/doc/source/api/zaqar.storage.sqlalchemy.claims.rst deleted file mode 100644 index c8b68a914..000000000 --- a/doc/source/api/zaqar.storage.sqlalchemy.claims.rst +++ /dev/null @@ -1,7 +0,0 @@ -The :mod:`zaqar.storage.sqlalchemy.claims` module -================================================== - -.. automodule:: zaqar.storage.sqlalchemy.claims - :members: - :undoc-members: - :show-inheritance: diff --git a/doc/source/api/zaqar.storage.sqlalchemy.messages.rst b/doc/source/api/zaqar.storage.sqlalchemy.messages.rst deleted file mode 100644 index f56ed4bf0..000000000 --- a/doc/source/api/zaqar.storage.sqlalchemy.messages.rst +++ /dev/null @@ -1,7 +0,0 @@ -The :mod:`zaqar.storage.sqlalchemy.messages` module -==================================================== - -.. automodule:: zaqar.storage.sqlalchemy.messages - :members: - :undoc-members: - :show-inheritance: diff --git a/zaqar/storage/sqlalchemy/claims.py b/zaqar/storage/sqlalchemy/claims.py deleted file mode 100644 index 7400fecd6..000000000 --- a/zaqar/storage/sqlalchemy/claims.py +++ /dev/null @@ -1,183 +0,0 @@ -# Copyright (c) 2014 Red Hat, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from oslo_utils import timeutils -import sqlalchemy as sa -from sqlalchemy.sql import func as sfunc - -from zaqar import storage -from zaqar.storage import errors -from zaqar.storage.sqlalchemy import tables -from zaqar.storage.sqlalchemy import utils - - -class ClaimController(storage.Claim): - - def __get(self, cid, trans): - # NOTE(flaper87): This probably needs to - # join on `Claim` to check the claim ttl. - sel = sa.sql.select([tables.Messages.c.id, - tables.Messages.c.body, - tables.Messages.c.ttl, - tables.Messages.c.created], - sa.and_( - tables.Messages.c.ttl > - utils.get_age(tables.Messages.c.created), - # tables.Messages.c.ttl > - # utils.get_age(tables.Claims.c.created), - tables.Messages.c.cid == cid)) - - records = trans.execute(sel) - - for id, body, ttl, created in records: - yield { - 'id': utils.msgid_encode(int(id)), - 'ttl': ttl, - 'age': (timeutils.utcnow() - created).seconds, - 'body': utils.json_decode(body), - } - - def get(self, queue, claim_id, project=None): - if project is None: - project = '' - - cid = utils.cid_decode(claim_id) - if cid is None: - raise errors.ClaimDoesNotExist(claim_id, queue, project) - - with self.driver.trans() as trans: - sel = sa.sql.select([tables.Claims.c.id, - tables.Claims.c.ttl, - tables.Claims.c.created], - sa.and_(tables.Claims.c.ttl > - utils.get_age(tables.Claims.c.created), - tables.Claims.c.id == cid, - tables.Queues.c.project == project, - tables.Queues.c.name == queue), - from_obj=[tables.Queues.join(tables.Claims)]) - - res = trans.execute(sel).fetchone() - if res is None: - raise errors.ClaimDoesNotExist(claim_id, queue, project) - - cid, ttl, created = res - return ( - {'id': claim_id, - 'ttl': ttl, - 'age': (timeutils.utcnow() - created).seconds}, - list(self.__get(cid, trans)) - ) - - def create(self, queue, metadata, project=None, - limit=storage.DEFAULT_MESSAGES_PER_CLAIM): - - if project is None: - project = '' - - with self.driver.trans() as trans: - try: - qid = utils.get_qid(self.driver.control_driver, queue, project) - except errors.QueueDoesNotExist: - return None, iter([]) - - # Clean up all expired claims in this queue - dlt = tables.Claims.delete().where(sa.and_( - tables.Claims.c.ttl <= - utils.get_age(tables.Claims.c.created), - tables.Claims.c.qid == qid)) - trans.execute(dlt) - - ins = tables.Claims.insert().values(qid=qid, ttl=metadata['ttl']) - res = trans.execute(ins) - - cid = res.lastrowid - - and_stmt = sa.and_(tables.Messages.c.cid == (None), - tables.Messages.c.ttl > - sfunc.now() - tables.Messages.c.created, - tables.Messages.c.qid == qid) - sel = sa.sql.select([tables.Messages.c.id], and_stmt).limit(limit) - - records = [t[0] for t in trans.execute(sel)] - and_stmt = sa.and_(tables.Messages.c.id.in_(records)) - update = tables.Messages.update().values(cid=cid).where(and_stmt) - trans.execute(update) - - # NOTE(flaper87): I bet there's a better way - # to do this. - messages_ttl = metadata['ttl'] + metadata['grace'] - update = (tables.Messages.update().values(ttl=messages_ttl). - where(sa.and_( - tables.Messages.c.ttl < messages_ttl, - tables.Messages.c.cid == cid))) - trans.execute(update) - - return utils.cid_encode(int(cid)), list(self.__get(cid, trans)) - - def update(self, queue, claim_id, metadata, project=None): - if project is None: - project = '' - - cid = utils.cid_decode(claim_id) - if cid is None: - raise errors.ClaimDoesNotExist(claim_id, queue, project) - - age = utils.get_age(tables.Claims.c.created) - with self.driver.trans() as trans: - qid = utils.get_qid(self.driver.control_driver, queue, project) - - update = tables.Claims.update().where(sa.and_( - tables.Claims.c.ttl > age, - tables.Claims.c.id == cid, - tables.Claims.c.id == qid)) - - update = update.values(ttl=metadata['ttl']) - - res = trans.execute(update) - if res.rowcount != 1: - raise errors.ClaimDoesNotExist(claim_id, queue, project) - - update = (tables.Messages.update(). - values(ttl=metadata['ttl'] + metadata['grace']). - where(sa.and_( - tables.Messages.c.ttl < metadata['ttl'], - tables.Messages.c.cid == cid))) - trans.execute(update) - - def delete(self, queue, claim_id, project=None): - if project is None: - project = '' - - cid = utils.cid_decode(claim_id) - if cid is None: - return - - with self.driver.trans() as trans: - try: - # NOTE(flaper87): This could probably use some - # joins and be just 1 query. - qid = utils.get_qid(self.driver.control_driver, queue, project) - except errors.QueueDoesNotExist: - return - - and_stmt = sa.and_(tables.Claims.c.id == cid, - tables.Claims.c.qid == qid) - dlt = tables.Claims.delete().where(and_stmt) - trans.execute(dlt) - - update = (tables.Messages.update().values(cid=None). - where(tables.Messages.c.cid == cid)) - - trans.execute(update) diff --git a/zaqar/storage/sqlalchemy/controllers.py b/zaqar/storage/sqlalchemy/controllers.py index 8a35d6b83..5893ab9cc 100644 --- a/zaqar/storage/sqlalchemy/controllers.py +++ b/zaqar/storage/sqlalchemy/controllers.py @@ -14,14 +14,10 @@ # the License. from zaqar.storage.sqlalchemy import catalogue -from zaqar.storage.sqlalchemy import claims -from zaqar.storage.sqlalchemy import messages from zaqar.storage.sqlalchemy import pools from zaqar.storage.sqlalchemy import queues QueueController = queues.QueueController -ClaimController = claims.ClaimController -MessageController = messages.MessageController CatalogueController = catalogue.CatalogueController PoolsController = pools.PoolsController diff --git a/zaqar/storage/sqlalchemy/driver.py b/zaqar/storage/sqlalchemy/driver.py index ee7f268e2..d56a7cdb2 100644 --- a/zaqar/storage/sqlalchemy/driver.py +++ b/zaqar/storage/sqlalchemy/driver.py @@ -57,8 +57,8 @@ class ControlDriver(storage.ControlDriverBase): sa.event.listen(engine, 'connect', self._sqlite_on_connect) - if (uri.startswith('mysql+pymysql://') - or uri.startswith('mysql://')): + if (uri.startswith('mysql://') or + uri.startswith('mysql+pymysql://')): sa.event.listen(engine, 'connect', self._mysql_on_connect) diff --git a/zaqar/storage/sqlalchemy/messages.py b/zaqar/storage/sqlalchemy/messages.py deleted file mode 100644 index 21fe38f6a..000000000 --- a/zaqar/storage/sqlalchemy/messages.py +++ /dev/null @@ -1,420 +0,0 @@ -# Copyright (c) 2014 Red Hat, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import calendar - -from oslo_utils import timeutils -import sqlalchemy as sa -from sqlalchemy.sql import func as sfunc - -from zaqar import storage -from zaqar.storage import errors -from zaqar.storage.sqlalchemy import tables -from zaqar.storage.sqlalchemy import utils - - -class MessageController(storage.Message): - - def _and_stmt_with_ttl(self, queue_name, project): - return [tables.Queues.c.name == queue_name, - tables.Queues.c.project == project, - tables.Messages.c.ttl > - sfunc.now() - tables.Messages.c.created] - - def _get(self, queue, message_id, project, count=False): - - if project is None: - project = '' - - mid = utils.msgid_decode(message_id) - - if mid is None: - raise errors.MessageDoesNotExist(message_id, queue, project) - - try: - j = sa.join(tables.Messages, tables.Queues, - tables.Messages.c.qid == tables.Queues.c.id) - - sel = sa.sql.select([tables.Messages.c.body, - tables.Messages.c.ttl, - tables.Messages.c.created, - tables.Messages.c.cid]) - - if count: - sel = sa.sql.select([sfunc.count(tables.Messages.c.id)]) - - sel = sel.select_from(j) - and_stmt = [tables.Messages.c.id == mid] - and_stmt.extend(self._and_stmt_with_ttl(queue, project)) - - sel = sel.where(sa.and_(*and_stmt)) - - return self.driver.get(sel) - except utils.NoResult: - raise errors.MessageDoesNotExist(message_id, queue, project) - - def _exists(self, queue, message_id, project): - try: - # NOTE(flaper87): Use count to avoid returning - # unnecessary data from the database. - self._get(queue, message_id, project, count=True) - return True - except errors.MessageDoesNotExist: - return False - - def _get_cid(self, mid): - """Return the decoded claim ID for the given message. - - :param mid: Decoded message ID - """ - - and_stmt = sa.and_(tables.Messages.c.id == mid) - sel = sa.sql.select([tables.Messages.c.cid], and_stmt) - - return self.driver.get(sel)[0] - - def get(self, queue, message_id, project): - body, ttl, created, cid = self._get(queue, message_id, project) - now = timeutils.utcnow_ts() - return { - 'id': message_id, - 'ttl': ttl, - 'age': now - calendar.timegm(created.timetuple()), - 'body': utils.json_decode(body), - 'claim_id': utils.cid_encode(cid) if cid else None, - } - - def bulk_get(self, queue, message_ids, project): - if project is None: - project = '' - - message_ids = [id for id in - map(utils.msgid_decode, message_ids) - if id is not None] - - statement = sa.sql.select([tables.Messages.c.id, - tables.Messages.c.body, - tables.Messages.c.ttl, - tables.Messages.c.created, - tables.Messages.c.cid]) - - and_stmt = [tables.Messages.c.id.in_(message_ids)] - and_stmt.extend(self._and_stmt_with_ttl(queue, project)) - - j = sa.join(tables.Messages, tables.Queues, - tables.Messages.c.qid == tables.Queues.c.id) - - statement = statement.select_from(j).where(sa.and_(*and_stmt)) - - now = timeutils.utcnow_ts() - records = self.driver.run(statement) - for id, body, ttl, created, cid in records: - yield { - 'id': utils.msgid_encode(int(id)), - 'ttl': ttl, - 'age': now - calendar.timegm(created.timetuple()), - 'body': utils.json_decode(body), - 'claim_id': utils.cid_encode(cid) if cid else None, - } - - def first(self, queue, project=None, sort=1): - if project is None: - project = '' - - qid = utils.get_qid(self.driver.control_driver, - queue, project) - - sel = sa.sql.select([tables.Messages.c.id, - tables.Messages.c.body, - tables.Messages.c.ttl, - tables.Messages.c.created], - sa.and_( - tables.Messages.c.ttl > - sfunc.now() - tables.Messages.c.created, - tables.Messages.c.qid == qid)) - if sort not in (1, -1): - raise ValueError(u'sort must be either 1 (ascending) ' - u'or -1 (descending)') - - order = sa.asc - if sort == -1: - order = sa.desc - - sel = sel.order_by(order(tables.Messages.c.id)) - - try: - id, body, ttl, created = self.driver.get(sel) - except utils.NoResult: - raise errors.QueueIsEmpty(queue, project) - - created_iso = timeutils.isotime(created) - return { - 'id': utils.msgid_encode(int(id)), - 'ttl': ttl, - 'created': created_iso, - 'age': int((timeutils.utcnow() - created).seconds), - 'body': body, - } - - def list(self, queue, project, marker=None, - limit=storage.DEFAULT_MESSAGES_PER_PAGE, - echo=False, client_uuid=None, include_claimed=False): - - if project is None: - project = '' - with self.driver.trans() as trans: - sel = sa.sql.select([tables.Messages.c.id, - tables.Messages.c.body, - tables.Messages.c.ttl, - tables.Messages.c.created, - tables.Messages.c.cid]) - - j = sa.join(tables.Messages, tables.Queues, - tables.Messages.c.qid == tables.Queues.c.id) - - sel = sel.select_from(j) - and_clause = self._and_stmt_with_ttl(queue, project) - - if not echo: - and_clause.append(tables.Messages.c.client != str(client_uuid)) - - if marker: - mark = utils.marker_decode(marker) - if mark: - and_clause.append(tables.Messages.c.id > mark) - else: - # NOTE(flaper87): Awful hack. - # If the marker is invalid, we don't want to - # return *any* record. Since rows PKs start - # from 0, it won't match anything and the query - # will still be fast. - and_clause.append(tables.Messages.c.id < -1) - - if not include_claimed: - and_clause.append(tables.Messages.c.cid == (None)) - - sel = sel.where(sa.and_(*and_clause)) - sel = sel.limit(limit) - - records = trans.execute(sel) - marker_id = {} - - def it(): - now = timeutils.utcnow_ts() - for id, body, ttl, created, cid in records: - marker_id['next'] = id - yield { - 'id': utils.msgid_encode(id), - 'ttl': ttl, - 'age': now - calendar.timegm(created.timetuple()), - 'body': utils.json_decode(body), - 'claim_id': utils.cid_encode(cid) if cid else None, - } - - yield it() - yield utils.marker_encode(marker_id['next']) - - def post(self, queue, messages, client_uuid, project): - if project is None: - project = '' - - with self.driver.trans() as trans: - qid = utils.get_qid(self.driver.control_driver, - queue, project) - - # Delete the expired messages - and_stmt = sa.and_(tables.Messages.c.ttl <= - sfunc.now() - tables.Messages.c.created, - tables.Messages.c.qid == qid) - statement = tables.Messages.delete().where(and_stmt) - - trans.execute(statement) - - # executemany() sets lastrowid to None, so no matter we manually - # generate the IDs or not, we still need to query for it. - - def it(): - for m in messages: - yield dict(qid=qid, - ttl=m['ttl'], - body=utils.json_encode(m['body']), - client=str(client_uuid)) - - result = trans.execute(tables.Messages.insert(), list(it())) - - statement = sa.sql.select([tables.Messages.c.id]) - statement = statement.limit(result.rowcount) - statement = statement.order_by(tables.Messages.c.id.desc()) - result = trans.execute(statement).fetchall() - - return [utils.msgid_encode(i[0]) for i in reversed(result)] - - def delete(self, queue, message_id, project, claim=None): - if project is None: - project = '' - - mid = utils.msgid_decode(message_id) - if mid is None: - return - - with self.driver.trans() as trans: - if not self._exists(queue, message_id, project): - return - - statement = tables.Messages.delete() - and_stmt = [tables.Messages.c.id == mid] - - exists = sa.sql.select([tables.Messages.c.id], sa.and_(*and_stmt)) - - if not trans.execute(exists).first(): - return - - cid = claim and utils.cid_decode(claim) or None - - if claim and cid is None: - raise errors.ClaimDoesNotExist(queue, project, claim) - - and_stmt.append(tables.Messages.c.cid == cid) - - statement = statement.where(sa.and_(*and_stmt)) - res = trans.execute(statement) - - if res.rowcount == 0: - # NOTE(kgriffs): Either the message is not claimed, - # or if it is, the specified claim does not exist. - cid = self._get_cid(mid) - if cid is None: - raise errors.MessageNotClaimed(mid) - - # NOTE(kgriffs): The message exists, but the claim - # must have expired or something, since it - # was not associated with the message. - raise errors.MessageNotClaimedBy(mid, claim) - - def bulk_delete(self, queue, message_ids, project): - if project is None: - project = '' - - message_ids = [id for id in - map(utils.msgid_decode, message_ids) if id] - - with self.driver.trans() as trans: - try: - qid = utils.get_qid(self.driver.control_driver, - queue, project) - except errors.QueueDoesNotExist: - return - - statement = tables.Messages.delete() - - and_stmt = [tables.Messages.c.id.in_(message_ids), - tables.Messages.c.qid == qid] - - trans.execute(statement.where(sa.and_(*and_stmt))) - - def pop(self, queue_name, limit, project=None): - if project is None: - project = '' - - with self.driver.trans() as trans: - sel = sa.sql.select([tables.Messages.c.id, - tables.Messages.c.body, - tables.Messages.c.ttl, - tables.Messages.c.created, - tables.Messages.c.cid]) - - j = sa.join(tables.Messages, tables.Queues, - tables.Messages.c.qid == tables.Queues.c.id) - - sel = sel.select_from(j) - and_clause = self._and_stmt_with_ttl(queue_name, project) - - and_clause.append(tables.Messages.c.cid == (None)) - - sel = sel.where(sa.and_(*and_clause)) - sel = sel.limit(limit) - - records = trans.execute(sel) - now = timeutils.utcnow_ts() - messages = [] - message_ids = [] - for id, body, ttl, created, cid in records: - messages.append({ - 'id': utils.msgid_encode(id), - 'ttl': ttl, - 'age': now - calendar.timegm(created.timetuple()), - 'body': utils.json_decode(body), - 'claim_id': utils.cid_encode(cid) if cid else None, - }) - message_ids.append(id) - - statement = tables.Messages.delete() - - qid = utils.get_qid(self.driver.control_driver, - queue_name, project) - - and_stmt = [tables.Messages.c.id.in_(message_ids), - tables.Messages.c.qid == qid] - - trans.execute(statement.where(sa.and_(*and_stmt))) - - return messages - - -class MessageQueueHandler(object): - def __init__(self, driver, control_driver): - self.driver = driver - - def stats(self, name, project): - if project is None: - project = '' - - qid = utils.get_qid(self.driver.control_driver, name, project) - sel = sa.sql.select([ - sa.sql.select([sa.func.count(tables.Messages.c.id)], - sa.and_( - tables.Messages.c.qid == qid, - tables.Messages.c.cid != (None), - tables.Messages.c.ttl > - sfunc.now() - tables.Messages.c.created)), - sa.sql.select([sa.func.count(tables.Messages.c.id)], - sa.and_( - tables.Messages.c.qid == qid, - tables.Messages.c.cid == (None), - tables.Messages.c.ttl > - sfunc.now() - tables.Messages.c.created)) - ]) - - claimed, free = self.driver.get(sel) - - total = free + claimed - - message_stats = { - 'claimed': claimed, - 'free': free, - 'total': total, - } - - try: - message_controller = self.driver.message_controller - oldest = message_controller.first(name, project, sort=1) - newest = message_controller.first(name, project, sort=-1) - except errors.QueueIsEmpty: - pass - else: - message_stats['oldest'] = utils.stat_message(oldest) - message_stats['newest'] = utils.stat_message(newest) - - return {'messages': message_stats} diff --git a/zaqar/storage/sqlalchemy/options.py b/zaqar/storage/sqlalchemy/options.py index e86eed6d0..55603d922 100644 --- a/zaqar/storage/sqlalchemy/options.py +++ b/zaqar/storage/sqlalchemy/options.py @@ -26,12 +26,9 @@ _COMMON_SQLALCHEMY_OPTIONS = ( ) MANAGEMENT_SQLALCHEMY_OPTIONS = _COMMON_SQLALCHEMY_OPTIONS -MESSAGE_SQLALCHEMY_OPTIONS = _COMMON_SQLALCHEMY_OPTIONS MANAGEMENT_SQLALCHEMY_GROUP = 'drivers:management_store:sqlalchemy' -MESSAGE_SQLALCHEMY_GROUP = 'drivers:message_store:sqlalchemy' def _config_options(): - return [(MANAGEMENT_SQLALCHEMY_GROUP, MANAGEMENT_SQLALCHEMY_OPTIONS), - (MESSAGE_SQLALCHEMY_GROUP, MESSAGE_SQLALCHEMY_OPTIONS)] + return [(MANAGEMENT_SQLALCHEMY_GROUP, MANAGEMENT_SQLALCHEMY_OPTIONS)] diff --git a/zaqar/storage/sqlalchemy/tables.py b/zaqar/storage/sqlalchemy/tables.py index 2f000dfde..84702f458 100644 --- a/zaqar/storage/sqlalchemy/tables.py +++ b/zaqar/storage/sqlalchemy/tables.py @@ -19,34 +19,6 @@ metadata = sa.MetaData() now = timeutils.utcnow - -Messages = sa.Table('Messages', metadata, - sa.Column('id', sa.INTEGER, primary_key=True), - sa.Column('qid', sa.INTEGER, - sa.ForeignKey("Queues.id", ondelete="CASCADE"), - nullable=False), - sa.Column('ttl', sa.INTEGER), - sa.Column('body', sa.LargeBinary), - sa.Column('client', sa.TEXT), - sa.Column('created', sa.TIMESTAMP, - default=now, onupdate=now), - sa.Column('cid', sa.INTEGER, - sa.ForeignKey("Claims.id", ondelete='SET NULL')), - ) - - -Claims = sa.Table('Claims', metadata, - sa.Column('id', sa.INTEGER, primary_key=True, - autoincrement=True), - sa.Column('qid', sa.INTEGER, - sa.ForeignKey("Queues.id", ondelete="CASCADE"), - nullable=False), - sa.Column('ttl', sa.INTEGER), - sa.Column('created', sa.TIMESTAMP, - default=now, onupdate=now), - ) - - Queues = sa.Table('Queues', metadata, sa.Column('id', sa.INTEGER, primary_key=True), sa.Column('project', sa.String(64)),