Remove messages and claims from sql code

Remove data modules from the sqlalchemy package since we don't support
them anymore.

Closes-bug: #1461962
Change-Id: I28abb510ea918ece65e5b28271b4d921221e6877
This commit is contained in:
Flavio Percoco 2015-06-04 15:50:13 +02:00
parent 27ca6e9118
commit a3a80c8e5c
9 changed files with 5 additions and 749 deletions

View File

@ -1,97 +1,5 @@
.. toctree:: .. toctree::
:glob:
:maxdepth: 1 :maxdepth: 1
zaqar.api.v1_1.request.rst *
zaqar.api.v1_1.response.rst
zaqar.api.v1.request.rst
zaqar.api.v1.response.rst
zaqar.bootstrap.rst
zaqar.cmd.gc.rst
zaqar.cmd.server.rst
zaqar.common.access.rst
zaqar.common.api.api.rst
zaqar.common.api.request.rst
zaqar.common.api.response.rst
zaqar.common.api.schemas.flavors.rst
zaqar.common.api.schemas.pools.rst
zaqar.common.cli.rst
zaqar.common.decorators.rst
zaqar.common.errors.rst
zaqar.common.pipeline.rst
zaqar.common.storage.select.rst
zaqar.common.transport.wsgi.helpers.rst
zaqar.common.utils.rst
zaqar.context.rst
zaqar.i18n.rst
zaqar.openstack.common.cache._backends.memory.rst
zaqar.openstack.common.cache.backends.rst
zaqar.openstack.common.cache.cache.rst
zaqar.openstack.common.config.generator.rst
zaqar.openstack.common.excutils.rst
zaqar.openstack.common.fileutils.rst
zaqar.openstack.common.gettextutils.rst
zaqar.openstack.common.importutils.rst
zaqar.openstack.common.local.rst
zaqar.openstack.common.lockutils.rst
zaqar.openstack.common.strutils.rst
zaqar.openstack.common.timeutils.rst
zaqar.storage.base.rst
zaqar.storage.errors.rst
zaqar.storage.mongodb.catalogue.rst
zaqar.storage.mongodb.claims.rst
zaqar.storage.mongodb.controllers.rst
zaqar.storage.mongodb.driver.rst
zaqar.storage.mongodb.flavors.rst
zaqar.storage.mongodb.messages.rst
zaqar.storage.mongodb.options.rst
zaqar.storage.mongodb.pools.rst
zaqar.storage.mongodb.queues.rst
zaqar.storage.mongodb.utils.rst
zaqar.storage.pipeline.rst
zaqar.storage.pooling.rst
zaqar.storage.redis.claims.rst
zaqar.storage.redis.controllers.rst
zaqar.storage.redis.driver.rst
zaqar.storage.redis.messages.rst
zaqar.storage.redis.models.rst
zaqar.storage.redis.options.rst
zaqar.storage.redis.queues.rst
zaqar.storage.redis.scripting.rst
zaqar.storage.redis.utils.rst
zaqar.storage.sqlalchemy.catalogue.rst
zaqar.storage.sqlalchemy.claims.rst
zaqar.storage.sqlalchemy.controllers.rst
zaqar.storage.sqlalchemy.driver.rst
zaqar.storage.sqlalchemy.messages.rst
zaqar.storage.sqlalchemy.options.rst
zaqar.storage.sqlalchemy.pools.rst
zaqar.storage.sqlalchemy.queues.rst
zaqar.storage.sqlalchemy.tables.rst
zaqar.storage.sqlalchemy.utils.rst
zaqar.storage.utils.rst
zaqar.transport.auth.rst
zaqar.transport.base.rst
zaqar.transport.utils.rst
zaqar.transport.validation.rst
zaqar.transport.wsgi.app.rst
zaqar.transport.wsgi.driver.rst
zaqar.transport.wsgi.errors.rst
zaqar.transport.wsgi.utils.rst
zaqar.transport.wsgi.v1_0.claims.rst
zaqar.transport.wsgi.v1_0.health.rst
zaqar.transport.wsgi.v1_0.homedoc.rst
zaqar.transport.wsgi.v1_0.messages.rst
zaqar.transport.wsgi.v1_0.metadata.rst
zaqar.transport.wsgi.v1_0.pools.rst
zaqar.transport.wsgi.v1_0.queues.rst
zaqar.transport.wsgi.v1_0.stats.rst
zaqar.transport.wsgi.v1_1.claims.rst
zaqar.transport.wsgi.v1_1.flavors.rst
zaqar.transport.wsgi.v1_1.health.rst
zaqar.transport.wsgi.v1_1.homedoc.rst
zaqar.transport.wsgi.v1_1.messages.rst
zaqar.transport.wsgi.v1_1.ping.rst
zaqar.transport.wsgi.v1_1.pools.rst
zaqar.transport.wsgi.v1_1.queues.rst
zaqar.transport.wsgi.v1_1.stats.rst
zaqar.version.rst

View File

@ -1,7 +0,0 @@
The :mod:`zaqar.storage.sqlalchemy.claims` module
==================================================
.. automodule:: zaqar.storage.sqlalchemy.claims
:members:
:undoc-members:
:show-inheritance:

View File

@ -1,7 +0,0 @@
The :mod:`zaqar.storage.sqlalchemy.messages` module
====================================================
.. automodule:: zaqar.storage.sqlalchemy.messages
:members:
:undoc-members:
:show-inheritance:

View File

@ -1,183 +0,0 @@
# Copyright (c) 2014 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_utils import timeutils
import sqlalchemy as sa
from sqlalchemy.sql import func as sfunc
from zaqar import storage
from zaqar.storage import errors
from zaqar.storage.sqlalchemy import tables
from zaqar.storage.sqlalchemy import utils
class ClaimController(storage.Claim):
def __get(self, cid, trans):
# NOTE(flaper87): This probably needs to
# join on `Claim` to check the claim ttl.
sel = sa.sql.select([tables.Messages.c.id,
tables.Messages.c.body,
tables.Messages.c.ttl,
tables.Messages.c.created],
sa.and_(
tables.Messages.c.ttl >
utils.get_age(tables.Messages.c.created),
# tables.Messages.c.ttl >
# utils.get_age(tables.Claims.c.created),
tables.Messages.c.cid == cid))
records = trans.execute(sel)
for id, body, ttl, created in records:
yield {
'id': utils.msgid_encode(int(id)),
'ttl': ttl,
'age': (timeutils.utcnow() - created).seconds,
'body': utils.json_decode(body),
}
def get(self, queue, claim_id, project=None):
if project is None:
project = ''
cid = utils.cid_decode(claim_id)
if cid is None:
raise errors.ClaimDoesNotExist(claim_id, queue, project)
with self.driver.trans() as trans:
sel = sa.sql.select([tables.Claims.c.id,
tables.Claims.c.ttl,
tables.Claims.c.created],
sa.and_(tables.Claims.c.ttl >
utils.get_age(tables.Claims.c.created),
tables.Claims.c.id == cid,
tables.Queues.c.project == project,
tables.Queues.c.name == queue),
from_obj=[tables.Queues.join(tables.Claims)])
res = trans.execute(sel).fetchone()
if res is None:
raise errors.ClaimDoesNotExist(claim_id, queue, project)
cid, ttl, created = res
return (
{'id': claim_id,
'ttl': ttl,
'age': (timeutils.utcnow() - created).seconds},
list(self.__get(cid, trans))
)
def create(self, queue, metadata, project=None,
limit=storage.DEFAULT_MESSAGES_PER_CLAIM):
if project is None:
project = ''
with self.driver.trans() as trans:
try:
qid = utils.get_qid(self.driver.control_driver, queue, project)
except errors.QueueDoesNotExist:
return None, iter([])
# Clean up all expired claims in this queue
dlt = tables.Claims.delete().where(sa.and_(
tables.Claims.c.ttl <=
utils.get_age(tables.Claims.c.created),
tables.Claims.c.qid == qid))
trans.execute(dlt)
ins = tables.Claims.insert().values(qid=qid, ttl=metadata['ttl'])
res = trans.execute(ins)
cid = res.lastrowid
and_stmt = sa.and_(tables.Messages.c.cid == (None),
tables.Messages.c.ttl >
sfunc.now() - tables.Messages.c.created,
tables.Messages.c.qid == qid)
sel = sa.sql.select([tables.Messages.c.id], and_stmt).limit(limit)
records = [t[0] for t in trans.execute(sel)]
and_stmt = sa.and_(tables.Messages.c.id.in_(records))
update = tables.Messages.update().values(cid=cid).where(and_stmt)
trans.execute(update)
# NOTE(flaper87): I bet there's a better way
# to do this.
messages_ttl = metadata['ttl'] + metadata['grace']
update = (tables.Messages.update().values(ttl=messages_ttl).
where(sa.and_(
tables.Messages.c.ttl < messages_ttl,
tables.Messages.c.cid == cid)))
trans.execute(update)
return utils.cid_encode(int(cid)), list(self.__get(cid, trans))
def update(self, queue, claim_id, metadata, project=None):
if project is None:
project = ''
cid = utils.cid_decode(claim_id)
if cid is None:
raise errors.ClaimDoesNotExist(claim_id, queue, project)
age = utils.get_age(tables.Claims.c.created)
with self.driver.trans() as trans:
qid = utils.get_qid(self.driver.control_driver, queue, project)
update = tables.Claims.update().where(sa.and_(
tables.Claims.c.ttl > age,
tables.Claims.c.id == cid,
tables.Claims.c.id == qid))
update = update.values(ttl=metadata['ttl'])
res = trans.execute(update)
if res.rowcount != 1:
raise errors.ClaimDoesNotExist(claim_id, queue, project)
update = (tables.Messages.update().
values(ttl=metadata['ttl'] + metadata['grace']).
where(sa.and_(
tables.Messages.c.ttl < metadata['ttl'],
tables.Messages.c.cid == cid)))
trans.execute(update)
def delete(self, queue, claim_id, project=None):
if project is None:
project = ''
cid = utils.cid_decode(claim_id)
if cid is None:
return
with self.driver.trans() as trans:
try:
# NOTE(flaper87): This could probably use some
# joins and be just 1 query.
qid = utils.get_qid(self.driver.control_driver, queue, project)
except errors.QueueDoesNotExist:
return
and_stmt = sa.and_(tables.Claims.c.id == cid,
tables.Claims.c.qid == qid)
dlt = tables.Claims.delete().where(and_stmt)
trans.execute(dlt)
update = (tables.Messages.update().values(cid=None).
where(tables.Messages.c.cid == cid))
trans.execute(update)

View File

@ -14,14 +14,10 @@
# the License. # the License.
from zaqar.storage.sqlalchemy import catalogue from zaqar.storage.sqlalchemy import catalogue
from zaqar.storage.sqlalchemy import claims
from zaqar.storage.sqlalchemy import messages
from zaqar.storage.sqlalchemy import pools from zaqar.storage.sqlalchemy import pools
from zaqar.storage.sqlalchemy import queues from zaqar.storage.sqlalchemy import queues
QueueController = queues.QueueController QueueController = queues.QueueController
ClaimController = claims.ClaimController
MessageController = messages.MessageController
CatalogueController = catalogue.CatalogueController CatalogueController = catalogue.CatalogueController
PoolsController = pools.PoolsController PoolsController = pools.PoolsController

View File

@ -57,8 +57,8 @@ class ControlDriver(storage.ControlDriverBase):
sa.event.listen(engine, 'connect', sa.event.listen(engine, 'connect',
self._sqlite_on_connect) self._sqlite_on_connect)
if (uri.startswith('mysql+pymysql://') if (uri.startswith('mysql://') or
or uri.startswith('mysql://')): uri.startswith('mysql+pymysql://')):
sa.event.listen(engine, 'connect', sa.event.listen(engine, 'connect',
self._mysql_on_connect) self._mysql_on_connect)

View File

@ -1,420 +0,0 @@
# Copyright (c) 2014 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import calendar
from oslo_utils import timeutils
import sqlalchemy as sa
from sqlalchemy.sql import func as sfunc
from zaqar import storage
from zaqar.storage import errors
from zaqar.storage.sqlalchemy import tables
from zaqar.storage.sqlalchemy import utils
class MessageController(storage.Message):
def _and_stmt_with_ttl(self, queue_name, project):
return [tables.Queues.c.name == queue_name,
tables.Queues.c.project == project,
tables.Messages.c.ttl >
sfunc.now() - tables.Messages.c.created]
def _get(self, queue, message_id, project, count=False):
if project is None:
project = ''
mid = utils.msgid_decode(message_id)
if mid is None:
raise errors.MessageDoesNotExist(message_id, queue, project)
try:
j = sa.join(tables.Messages, tables.Queues,
tables.Messages.c.qid == tables.Queues.c.id)
sel = sa.sql.select([tables.Messages.c.body,
tables.Messages.c.ttl,
tables.Messages.c.created,
tables.Messages.c.cid])
if count:
sel = sa.sql.select([sfunc.count(tables.Messages.c.id)])
sel = sel.select_from(j)
and_stmt = [tables.Messages.c.id == mid]
and_stmt.extend(self._and_stmt_with_ttl(queue, project))
sel = sel.where(sa.and_(*and_stmt))
return self.driver.get(sel)
except utils.NoResult:
raise errors.MessageDoesNotExist(message_id, queue, project)
def _exists(self, queue, message_id, project):
try:
# NOTE(flaper87): Use count to avoid returning
# unnecessary data from the database.
self._get(queue, message_id, project, count=True)
return True
except errors.MessageDoesNotExist:
return False
def _get_cid(self, mid):
"""Return the decoded claim ID for the given message.
:param mid: Decoded message ID
"""
and_stmt = sa.and_(tables.Messages.c.id == mid)
sel = sa.sql.select([tables.Messages.c.cid], and_stmt)
return self.driver.get(sel)[0]
def get(self, queue, message_id, project):
body, ttl, created, cid = self._get(queue, message_id, project)
now = timeutils.utcnow_ts()
return {
'id': message_id,
'ttl': ttl,
'age': now - calendar.timegm(created.timetuple()),
'body': utils.json_decode(body),
'claim_id': utils.cid_encode(cid) if cid else None,
}
def bulk_get(self, queue, message_ids, project):
if project is None:
project = ''
message_ids = [id for id in
map(utils.msgid_decode, message_ids)
if id is not None]
statement = sa.sql.select([tables.Messages.c.id,
tables.Messages.c.body,
tables.Messages.c.ttl,
tables.Messages.c.created,
tables.Messages.c.cid])
and_stmt = [tables.Messages.c.id.in_(message_ids)]
and_stmt.extend(self._and_stmt_with_ttl(queue, project))
j = sa.join(tables.Messages, tables.Queues,
tables.Messages.c.qid == tables.Queues.c.id)
statement = statement.select_from(j).where(sa.and_(*and_stmt))
now = timeutils.utcnow_ts()
records = self.driver.run(statement)
for id, body, ttl, created, cid in records:
yield {
'id': utils.msgid_encode(int(id)),
'ttl': ttl,
'age': now - calendar.timegm(created.timetuple()),
'body': utils.json_decode(body),
'claim_id': utils.cid_encode(cid) if cid else None,
}
def first(self, queue, project=None, sort=1):
if project is None:
project = ''
qid = utils.get_qid(self.driver.control_driver,
queue, project)
sel = sa.sql.select([tables.Messages.c.id,
tables.Messages.c.body,
tables.Messages.c.ttl,
tables.Messages.c.created],
sa.and_(
tables.Messages.c.ttl >
sfunc.now() - tables.Messages.c.created,
tables.Messages.c.qid == qid))
if sort not in (1, -1):
raise ValueError(u'sort must be either 1 (ascending) '
u'or -1 (descending)')
order = sa.asc
if sort == -1:
order = sa.desc
sel = sel.order_by(order(tables.Messages.c.id))
try:
id, body, ttl, created = self.driver.get(sel)
except utils.NoResult:
raise errors.QueueIsEmpty(queue, project)
created_iso = timeutils.isotime(created)
return {
'id': utils.msgid_encode(int(id)),
'ttl': ttl,
'created': created_iso,
'age': int((timeutils.utcnow() - created).seconds),
'body': body,
}
def list(self, queue, project, marker=None,
limit=storage.DEFAULT_MESSAGES_PER_PAGE,
echo=False, client_uuid=None, include_claimed=False):
if project is None:
project = ''
with self.driver.trans() as trans:
sel = sa.sql.select([tables.Messages.c.id,
tables.Messages.c.body,
tables.Messages.c.ttl,
tables.Messages.c.created,
tables.Messages.c.cid])
j = sa.join(tables.Messages, tables.Queues,
tables.Messages.c.qid == tables.Queues.c.id)
sel = sel.select_from(j)
and_clause = self._and_stmt_with_ttl(queue, project)
if not echo:
and_clause.append(tables.Messages.c.client != str(client_uuid))
if marker:
mark = utils.marker_decode(marker)
if mark:
and_clause.append(tables.Messages.c.id > mark)
else:
# NOTE(flaper87): Awful hack.
# If the marker is invalid, we don't want to
# return *any* record. Since rows PKs start
# from 0, it won't match anything and the query
# will still be fast.
and_clause.append(tables.Messages.c.id < -1)
if not include_claimed:
and_clause.append(tables.Messages.c.cid == (None))
sel = sel.where(sa.and_(*and_clause))
sel = sel.limit(limit)
records = trans.execute(sel)
marker_id = {}
def it():
now = timeutils.utcnow_ts()
for id, body, ttl, created, cid in records:
marker_id['next'] = id
yield {
'id': utils.msgid_encode(id),
'ttl': ttl,
'age': now - calendar.timegm(created.timetuple()),
'body': utils.json_decode(body),
'claim_id': utils.cid_encode(cid) if cid else None,
}
yield it()
yield utils.marker_encode(marker_id['next'])
def post(self, queue, messages, client_uuid, project):
if project is None:
project = ''
with self.driver.trans() as trans:
qid = utils.get_qid(self.driver.control_driver,
queue, project)
# Delete the expired messages
and_stmt = sa.and_(tables.Messages.c.ttl <=
sfunc.now() - tables.Messages.c.created,
tables.Messages.c.qid == qid)
statement = tables.Messages.delete().where(and_stmt)
trans.execute(statement)
# executemany() sets lastrowid to None, so no matter we manually
# generate the IDs or not, we still need to query for it.
def it():
for m in messages:
yield dict(qid=qid,
ttl=m['ttl'],
body=utils.json_encode(m['body']),
client=str(client_uuid))
result = trans.execute(tables.Messages.insert(), list(it()))
statement = sa.sql.select([tables.Messages.c.id])
statement = statement.limit(result.rowcount)
statement = statement.order_by(tables.Messages.c.id.desc())
result = trans.execute(statement).fetchall()
return [utils.msgid_encode(i[0]) for i in reversed(result)]
def delete(self, queue, message_id, project, claim=None):
if project is None:
project = ''
mid = utils.msgid_decode(message_id)
if mid is None:
return
with self.driver.trans() as trans:
if not self._exists(queue, message_id, project):
return
statement = tables.Messages.delete()
and_stmt = [tables.Messages.c.id == mid]
exists = sa.sql.select([tables.Messages.c.id], sa.and_(*and_stmt))
if not trans.execute(exists).first():
return
cid = claim and utils.cid_decode(claim) or None
if claim and cid is None:
raise errors.ClaimDoesNotExist(queue, project, claim)
and_stmt.append(tables.Messages.c.cid == cid)
statement = statement.where(sa.and_(*and_stmt))
res = trans.execute(statement)
if res.rowcount == 0:
# NOTE(kgriffs): Either the message is not claimed,
# or if it is, the specified claim does not exist.
cid = self._get_cid(mid)
if cid is None:
raise errors.MessageNotClaimed(mid)
# NOTE(kgriffs): The message exists, but the claim
# must have expired or something, since it
# was not associated with the message.
raise errors.MessageNotClaimedBy(mid, claim)
def bulk_delete(self, queue, message_ids, project):
if project is None:
project = ''
message_ids = [id for id in
map(utils.msgid_decode, message_ids) if id]
with self.driver.trans() as trans:
try:
qid = utils.get_qid(self.driver.control_driver,
queue, project)
except errors.QueueDoesNotExist:
return
statement = tables.Messages.delete()
and_stmt = [tables.Messages.c.id.in_(message_ids),
tables.Messages.c.qid == qid]
trans.execute(statement.where(sa.and_(*and_stmt)))
def pop(self, queue_name, limit, project=None):
if project is None:
project = ''
with self.driver.trans() as trans:
sel = sa.sql.select([tables.Messages.c.id,
tables.Messages.c.body,
tables.Messages.c.ttl,
tables.Messages.c.created,
tables.Messages.c.cid])
j = sa.join(tables.Messages, tables.Queues,
tables.Messages.c.qid == tables.Queues.c.id)
sel = sel.select_from(j)
and_clause = self._and_stmt_with_ttl(queue_name, project)
and_clause.append(tables.Messages.c.cid == (None))
sel = sel.where(sa.and_(*and_clause))
sel = sel.limit(limit)
records = trans.execute(sel)
now = timeutils.utcnow_ts()
messages = []
message_ids = []
for id, body, ttl, created, cid in records:
messages.append({
'id': utils.msgid_encode(id),
'ttl': ttl,
'age': now - calendar.timegm(created.timetuple()),
'body': utils.json_decode(body),
'claim_id': utils.cid_encode(cid) if cid else None,
})
message_ids.append(id)
statement = tables.Messages.delete()
qid = utils.get_qid(self.driver.control_driver,
queue_name, project)
and_stmt = [tables.Messages.c.id.in_(message_ids),
tables.Messages.c.qid == qid]
trans.execute(statement.where(sa.and_(*and_stmt)))
return messages
class MessageQueueHandler(object):
def __init__(self, driver, control_driver):
self.driver = driver
def stats(self, name, project):
if project is None:
project = ''
qid = utils.get_qid(self.driver.control_driver, name, project)
sel = sa.sql.select([
sa.sql.select([sa.func.count(tables.Messages.c.id)],
sa.and_(
tables.Messages.c.qid == qid,
tables.Messages.c.cid != (None),
tables.Messages.c.ttl >
sfunc.now() - tables.Messages.c.created)),
sa.sql.select([sa.func.count(tables.Messages.c.id)],
sa.and_(
tables.Messages.c.qid == qid,
tables.Messages.c.cid == (None),
tables.Messages.c.ttl >
sfunc.now() - tables.Messages.c.created))
])
claimed, free = self.driver.get(sel)
total = free + claimed
message_stats = {
'claimed': claimed,
'free': free,
'total': total,
}
try:
message_controller = self.driver.message_controller
oldest = message_controller.first(name, project, sort=1)
newest = message_controller.first(name, project, sort=-1)
except errors.QueueIsEmpty:
pass
else:
message_stats['oldest'] = utils.stat_message(oldest)
message_stats['newest'] = utils.stat_message(newest)
return {'messages': message_stats}

View File

@ -26,12 +26,9 @@ _COMMON_SQLALCHEMY_OPTIONS = (
) )
MANAGEMENT_SQLALCHEMY_OPTIONS = _COMMON_SQLALCHEMY_OPTIONS MANAGEMENT_SQLALCHEMY_OPTIONS = _COMMON_SQLALCHEMY_OPTIONS
MESSAGE_SQLALCHEMY_OPTIONS = _COMMON_SQLALCHEMY_OPTIONS
MANAGEMENT_SQLALCHEMY_GROUP = 'drivers:management_store:sqlalchemy' MANAGEMENT_SQLALCHEMY_GROUP = 'drivers:management_store:sqlalchemy'
MESSAGE_SQLALCHEMY_GROUP = 'drivers:message_store:sqlalchemy'
def _config_options(): def _config_options():
return [(MANAGEMENT_SQLALCHEMY_GROUP, MANAGEMENT_SQLALCHEMY_OPTIONS), return [(MANAGEMENT_SQLALCHEMY_GROUP, MANAGEMENT_SQLALCHEMY_OPTIONS)]
(MESSAGE_SQLALCHEMY_GROUP, MESSAGE_SQLALCHEMY_OPTIONS)]

View File

@ -19,34 +19,6 @@ metadata = sa.MetaData()
now = timeutils.utcnow now = timeutils.utcnow
Messages = sa.Table('Messages', metadata,
sa.Column('id', sa.INTEGER, primary_key=True),
sa.Column('qid', sa.INTEGER,
sa.ForeignKey("Queues.id", ondelete="CASCADE"),
nullable=False),
sa.Column('ttl', sa.INTEGER),
sa.Column('body', sa.LargeBinary),
sa.Column('client', sa.TEXT),
sa.Column('created', sa.TIMESTAMP,
default=now, onupdate=now),
sa.Column('cid', sa.INTEGER,
sa.ForeignKey("Claims.id", ondelete='SET NULL')),
)
Claims = sa.Table('Claims', metadata,
sa.Column('id', sa.INTEGER, primary_key=True,
autoincrement=True),
sa.Column('qid', sa.INTEGER,
sa.ForeignKey("Queues.id", ondelete="CASCADE"),
nullable=False),
sa.Column('ttl', sa.INTEGER),
sa.Column('created', sa.TIMESTAMP,
default=now, onupdate=now),
)
Queues = sa.Table('Queues', metadata, Queues = sa.Table('Queues', metadata,
sa.Column('id', sa.INTEGER, primary_key=True), sa.Column('id', sa.INTEGER, primary_key=True),
sa.Column('project', sa.String(64)), sa.Column('project', sa.String(64)),