Add an sqlalchemy storage to Marconi

Implements blueprint sql-storage-driver

Change-Id: I66452e1936646ca11366c574b6185b5e0badfdef
This commit is contained in:
Yeela Kaplan 2014-01-14 12:31:51 +02:00
parent 40ef90551a
commit 6f35b343fe
9 changed files with 777 additions and 68 deletions

View File

@ -0,0 +1,177 @@
# Copyright (c) 2014 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sqlalchemy as sa
from sqlalchemy.sql import func as sfunc
from marconi.openstack.common import timeutils
from marconi.queues import storage
from marconi.queues.storage import errors
from marconi.queues.storage.sqlalchemy import tables
from marconi.queues.storage.sqlalchemy import utils
class ClaimController(storage.Claim):
def __get(self, cid):
# NOTE(flaper87): This probably needs to
# join on `Claim` to check the claim ttl.
sel = sa.sql.select([tables.Messages.c.id,
tables.Messages.c.body,
tables.Messages.c.ttl,
tables.Messages.c.created],
sa.and_(
tables.Messages.c.ttl >
utils.get_age(tables.Messages.c.created),
#tables.Messages.c.ttl >
#utils.get_age(tables.Claims.c.created),
tables.Messages.c.cid == cid
))
records = self.driver.run(sel)
for id, body, ttl, created in records:
yield {
'id': utils.msgid_encode(id),
'ttl': ttl,
'age': (timeutils.utcnow() - created).seconds,
'body': body,
}
def get(self, queue, claim_id, project=None):
if project is None:
project = ''
cid = utils.cid_decode(claim_id)
if cid is None:
raise errors.ClaimDoesNotExist(claim_id, queue, project)
with self.driver.trans() as trans:
sel = sa.sql.select([tables.Claims.c.id,
tables.Claims.c.ttl,
tables.Claims.c.created],
sa.and_(tables.Claims.c.ttl >
utils.get_age(tables.Claims.c.created),
tables.Claims.c.id == cid,
tables.Queues.c.project == project,
tables.Queues.c.name == queue),
from_obj=[tables.Queues.join(tables.Claims)])
res = trans.execute(sel).fetchone()
if res is None:
raise errors.ClaimDoesNotExist(claim_id, queue, project)
cid, ttl, created = res
return (
{'id': claim_id,
'ttl': ttl,
'age': (timeutils.utcnow() - created).seconds},
self.__get(cid)
)
def create(self, queue, metadata, project=None,
limit=storage.DEFAULT_MESSAGES_PER_CLAIM):
if project is None:
project = ''
with self.driver.trans() as trans:
try:
qid = utils.get_qid(self.driver, queue, project)
except errors.QueueDoesNotExist:
return None, iter([])
# Clean up all expired claims in this queue
dlt = tables.Claims.delete().where(sa.and_(
tables.Claims.c.ttl <=
utils.get_age(tables.Claims.c.created),
tables.Claims.c.qid == qid))
trans.execute(dlt)
ins = tables.Claims.insert().values(qid=qid, ttl=metadata['ttl'])
res = trans.execute(ins)
cid = res.lastrowid
and_stmt = sa.and_(tables.Messages.c.cid == (None),
tables.Messages.c.ttl >
sfunc.now() - tables.Messages.c.created,
tables.Messages.c.qid == qid)
sel = sa.sql.select([tables.Messages.c.id], and_stmt).limit(limit)
records = [t[0] for t in trans.execute(sel)]
and_stmt = sa.and_(tables.Messages.c.id.in_(records))
update = tables.Messages.update().values(cid=cid).where(and_stmt)
trans.execute(update)
# NOTE(flaper87): I bet there's a better way
# to do this.
messages_ttl = metadata['ttl'] + metadata['grace']
update = (tables.Messages.update().values(ttl=messages_ttl).
where(sa.and_(
tables.Messages.c.ttl < messages_ttl,
tables.Messages.c.cid == cid)))
trans.execute(update)
return (utils.cid_encode(cid), self.__get(cid))
def update(self, queue, claim_id, metadata, project=None):
if project is None:
project = ''
cid = utils.cid_decode(claim_id)
if cid is None:
raise errors.ClaimDoesNotExist(claim_id, queue, project)
age = utils.get_age(tables.Claims.c.created)
with self.driver.trans() as trans:
qid = utils.get_qid(self.driver, queue, project)
update = tables.Claims.update().where(sa.and_(
tables.Claims.c.ttl > age,
tables.Claims.c.id == cid,
tables.Claims.c.id == qid)).\
values(ttl=metadata['ttl'])
res = trans.execute(update)
if res.rowcount != 1:
raise errors.ClaimDoesNotExist(claim_id, queue, project)
update = (tables.Messages.update().
values(ttl=metadata['ttl']).
where(sa.and_(
tables.Messages.c.ttl < metadata['ttl'],
tables.Messages.c.cid == cid)))
trans.execute(update)
def delete(self, queue, claim_id, project=None):
if project is None:
project = ''
cid = utils.cid_decode(claim_id)
if cid is None:
return
with self.driver.trans() as trans:
qid = utils.get_qid(self.driver, queue, project)
and_stmt = sa.and_(tables.Claims.c.id == cid,
tables.Claims.c.qid == qid)
dlt = tables.Claims.delete().where(and_stmt)
trans.execute(dlt)
update = (tables.Messages.update().values(cid=None).
where(tables.Messages.c.cid == cid))
trans.execute(update)

View File

@ -1,3 +1,4 @@
# Copyright (c) 2014 Red Hat, Inc.
# Copyright (c) 2014 Rackspace Hosting Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
@ -10,12 +11,19 @@
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
#
# See the License for the specific language governing permissions and
# limitations under the License.
from marconi.queues.storage.sqlalchemy import catalogue
from marconi.queues.storage.sqlalchemy import claims
from marconi.queues.storage.sqlalchemy import messages
from marconi.queues.storage.sqlalchemy import queues
from marconi.queues.storage.sqlalchemy import shards
QueueController = queues.QueueController
ClaimController = claims.ClaimController
MessageController = messages.MessageController
CatalogueController = catalogue.CatalogueController
ShardsController = shards.ShardsController

View File

@ -14,15 +14,16 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import sqlalchemy as sa
import contextlib
from oslo.config import cfg
import sqlalchemy as sa
from marconi.common import decorators
from marconi.queues import storage
from marconi.queues.storage.sqlalchemy import controllers
from marconi.queues.storage.sqlalchemy import tables
from marconi.queues.storage.sqlalchemy import utils
_SQLALCHEMY_OPTIONS = [
cfg.StrOpt('uri', default='sqlite:///:memory:',
@ -57,17 +58,46 @@ class DataDriver(storage.DataDriverBase):
def close_connection(self):
self.connection.close()
@contextlib.contextmanager
def trans(self):
with self.engine.begin() as connection:
yield connection
def run(self, statement):
"""Performs a SQL query.
:param sql: a query string with the '?' placeholders
:param args: the arguments to substitute the placeholders
"""
return self.connection.execute(statement)
def get(self, statement):
"""Runs sql and returns the first entry in the results.
:raises: utils.NoResult if the result set is empty
"""
res = self.run(statement)
r = res.fetchone()
if r is None:
raise utils.NoResult()
else:
res.close()
return r
@decorators.lazy_property(write=False)
def queue_controller(self):
raise NotImplementedError()
return controllers.QueueController(self)
@decorators.lazy_property(write=False)
def message_controller(self):
raise NotImplementedError()
return controllers.MessageController(self)
@decorators.lazy_property(write=False)
def claim_controller(self):
raise NotImplementedError()
return controllers.ClaimController(self)
def is_alive(self):
return True
class ControlDriver(storage.ControlDriverBase):

View File

@ -0,0 +1,270 @@
# Copyright (c) 2014 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import calendar
import json
import sqlalchemy as sa
from sqlalchemy.sql import func as sfunc
from marconi.openstack.common import timeutils
from marconi.queues import storage
from marconi.queues.storage import errors
from marconi.queues.storage.sqlalchemy import tables
from marconi.queues.storage.sqlalchemy import utils
class MessageController(storage.Message):
def get(self, queue, message_id, project, count=False):
if project is None:
project = ''
mid = utils.msgid_decode(message_id)
if mid is None:
raise errors.MessageDoesNotExist(message_id, queue, project)
try:
j = sa.join(tables.Messages, tables.Queues,
tables.Messages.c.qid == tables.Queues.c.id)
sel = sa.sql.select([tables.Messages.c.body,
tables.Messages.c.ttl,
tables.Messages.c.created])
if count:
sel = sa.sql.select([sfunc.count(tables.Messages.c.id)])
sel = sel.select_from(j)
sel = sel.where(sa.and_(tables.Messages.c.id == mid,
tables.Queues.c.project == project,
tables.Queues.c.name == queue,
tables.Messages.c.ttl >
sfunc.now() - tables.Messages.c.created))
return self.driver.get(sel)[0]
except utils.NoResult:
raise errors.MessageDoesNotExist(message_id, queue, project)
def bulk_get(self, queue, message_ids, project):
if project is None:
project = ''
message_ids = [id for id in
map(utils.msgid_decode, message_ids)
if id is not None]
statement = sa.sql.select([tables.Messages.c.id,
tables.Messages.c.body,
tables.Messages.c.ttl,
tables.Messages.c.created])
and_stmt = [tables.Messages.c.id.in_(message_ids),
tables.Queues.c.name == queue,
tables.Queues.c.project == project,
tables.Messages.c.ttl >
sfunc.now() - tables.Messages.c.created]
j = sa.join(tables.Messages, tables.Queues,
tables.Messages.c.qid == tables.Queues.c.id)
statement = statement.select_from(j).where(sa.and_(*and_stmt))
now = timeutils.utcnow_ts()
records = self.driver.run(statement)
for id, body, ttl, created in records:
yield {
'id': utils.msgid_encode(id),
'ttl': ttl,
'age': now - calendar.timegm(created.timetuple()),
'body': json.loads(body),
}
def first(self, queue, project=None, sort=1):
if project is None:
project = ''
qid = utils.get_qid(self.driver, queue, project)
sel = sa.sql.select([tables.Messages.c.id,
tables.Messages.c.body,
tables.Messages.c.ttl,
tables.Messages.c.created],
sa.and_(
tables.Messages.c.ttl >
sfunc.now() - tables.Messages.c.created,
tables.Messages.c.qid == qid))
if sort not in (1, -1):
raise ValueError(u'sort must be either 1 (ascending) '
u'or -1 (descending)')
order = sa.asc
if sort == -1:
order = sa.desc
sel = sel.order_by(order(tables.Messages.c.id))
try:
id, body, ttl, created = self.driver.get(sel)
except utils.NoResult:
raise errors.QueueIsEmpty(queue, project)
created_iso = timeutils.isotime(created)
return {
'id': utils.msgid_encode(id),
'ttl': ttl,
'created': created_iso,
'age': int((timeutils.utcnow() - created).seconds),
'body': body,
}
def list(self, queue, project, marker=None,
limit=storage.DEFAULT_MESSAGES_PER_PAGE,
echo=False, client_uuid=None, include_claimed=False):
if project is None:
project = ''
with self.driver.trans() as trans:
sel = sa.sql.select([tables.Messages.c.id,
tables.Messages.c.body,
tables.Messages.c.ttl,
tables.Messages.c.created])
j = sa.join(tables.Messages, tables.Queues,
tables.Messages.c.qid == tables.Queues.c.id)
sel = sel.select_from(j)
and_clause = [tables.Queues.c.name == queue,
tables.Queues.c.project == project]
if not echo:
and_clause.append(tables.Messages.c.client != str(client_uuid))
if marker:
mark = utils.marker_decode(marker)
if mark:
and_clause.append(tables.Messages.c.id > mark)
if not include_claimed:
and_clause.append(tables.Messages.c.cid == (None))
sel = sel.where(sa.and_(*and_clause))
sel = sel.limit(limit)
records = trans.execute(sel)
marker_id = {}
def it():
now = timeutils.utcnow_ts()
for id, body, ttl, created in records:
marker_id['next'] = id
yield {
'id': utils.msgid_encode(id),
'ttl': ttl,
'age': now - calendar.timegm(created.timetuple()),
'body': json.loads(body),
}
yield it()
yield utils.marker_encode(marker_id['next'])
def post(self, queue, messages, client_uuid, project):
if project is None:
project = ''
with self.driver.trans() as trans:
qid = utils.get_qid(self.driver, queue, project)
# cleanup all expired messages in this queue
#self.driver.run('''
# delete from Messages
# where ttl <= julianday() * 86400.0 - created
# and qid = ?''', qid)
# executemany() sets lastrowid to None, so no matter we manually
# generate the IDs or not, we still need to query for it.
def it():
for m in messages:
yield dict(qid=qid,
ttl=m['ttl'],
body=json.dumps(m['body']),
client=str(client_uuid))
result = trans.execute(tables.Messages.insert(), list(it()))
statement = sa.sql.select([tables.Messages.c.id])
statement = statement.limit(result.rowcount)
statement = statement.order_by(tables.Messages.c.id.desc())
result = trans.execute(statement).fetchall()
return map(utils.msgid_encode, [i[0] for i in reversed(result)])
def delete(self, queue, message_id, project, claim=None):
if project is None:
project = ''
mid = utils.msgid_decode(message_id)
if mid is None:
return
with self.driver.trans() as trans:
try:
self.get(queue, message_id, project, count=True)
except errors.MessageDoesNotExist:
return
statement = tables.Messages.delete()
and_stmt = [tables.Messages.c.id == mid]
exists = sa.sql.select([tables.Messages.c.id], sa.and_(*and_stmt))
if not trans.execute(exists).first():
return
cid = claim and utils.cid_decode(claim) or None
if claim and cid is None:
return
and_stmt.append(tables.Messages.c.cid == cid)
statement = statement.where(sa.and_(*and_stmt))
res = trans.execute(statement)
if res.rowcount == 0:
raise errors.MessageIsClaimed(mid)
def bulk_delete(self, queue, message_ids, project):
if project is None:
project = ''
message_ids = ','.join(
["'%s'" % id for id in
map(utils.msgid_decode, message_ids) if id]
)
with self.driver.trans() as trans:
qid = utils.get_qid(self.driver, queue, project)
statement = tables.Messages.delete()
and_stmt = [tables.Messages.c.id.in_(message_ids),
tables.Messages.c.qid == qid]
trans.execute(statement.where(sa.and_(*and_stmt)))

View File

@ -0,0 +1,169 @@
# Copyright (c) 2014 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
#
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import sqlalchemy as sa
from sqlalchemy.sql import func as sfunc
from marconi.queues import storage
from marconi.queues.storage import errors
from marconi.queues.storage.sqlalchemy import tables
from marconi.queues.storage.sqlalchemy import utils
class QueueController(storage.Queue):
def list(self, project, marker=None,
limit=storage.DEFAULT_QUEUES_PER_PAGE, detailed=False):
if project is None:
project = ''
fields = [tables.Queues.c.name]
if detailed:
fields.append(tables.Queues.c.metadata)
if marker:
sel = sa.sql.select(fields, sa.and_(
tables.Queues.c.project == project,
tables.Queues.c.name > marker))
else:
sel = sa.sql.select(fields, tables.Queues.c.project == project)
sel = sel.order_by(sa.asc(tables.Queues.c.name)).limit(limit)
records = self.driver.run(sel)
marker_name = {}
def it():
for rec in records:
marker_name['next'] = rec[0]
yield ({'name': rec[0]} if not detailed
else
{'name': rec[0], 'metadata': json.loads(rec[1])})
yield it()
yield marker_name['next']
def get_metadata(self, name, project):
if project is None:
project = ''
try:
sel = sa.sql.select([tables.Queues.c.metadata], sa.and_(
tables.Queues.c.project == project,
tables.Queues.c.name == name
))
return json.loads(self.driver.get(sel)[0])
except utils.NoResult:
raise errors.QueueDoesNotExist(name, project)
def create(self, name, project):
if project is None:
project = ''
try:
ins = tables.Queues.insert().values(project=project, name=name,
metadata=json.dumps({}))
res = self.driver.run(ins)
except sa.exc.IntegrityError:
return False
return res.rowcount == 1
def exists(self, name, project):
if project is None:
project = ''
sel = sa.sql.select([tables.Queues.c.id], sa.and_(
tables.Queues.c.project == project,
tables.Queues.c.name == name
))
res = self.driver.run(sel)
r = res.fetchone()
res.close()
return r is not None
def set_metadata(self, name, metadata, project):
if project is None:
project = ''
update = tables.Queues.update().\
where(sa.and_(
tables.Queues.c.project == project,
tables.Queues.c.name == name)).\
values(metadata=json.dumps(metadata))
res = self.driver.run(update)
try:
if res.rowcount != 1:
raise errors.QueueDoesNotExist(name, project)
finally:
res.close()
def delete(self, name, project):
if project is None:
project = ''
dlt = tables.Queues.delete().where(sa.and_(
tables.Queues.c.project == project,
tables.Queues.c.name == name))
self.driver.run(dlt)
def stats(self, name, project):
if project is None:
project = ''
qid = utils.get_qid(self.driver, name, project)
sel = sa.sql.select([
sa.sql.select([sa.func.count(tables.Messages.c.id)],
sa.and_(
tables.Messages.c.qid == qid,
tables.Messages.c.cid != (None),
tables.Messages.c.ttl >
sfunc.now() - tables.Messages.c.created,
)),
sa.sql.select([sa.func.count(tables.Messages.c.id)],
sa.and_(
tables.Messages.c.qid == qid,
tables.Messages.c.cid == (None),
tables.Messages.c.ttl >
sfunc.now() - tables.Messages.c.created,
))
])
claimed, free = self.driver.get(sel)
total = free + claimed
message_stats = {
'claimed': claimed,
'free': free,
'total': total,
}
try:
message_controller = self.driver.message_controller
oldest = message_controller.first(name, project, sort=1)
newest = message_controller.first(name, project, sort=-1)
except errors.QueueIsEmpty:
pass
else:
message_stats['oldest'] = utils.stat_message(oldest)
message_stats['newest'] = utils.stat_message(newest)
return {'messages': message_stats}

View File

@ -16,23 +16,13 @@
import sqlalchemy as sa
from marconi.openstack.common import timeutils
metadata = sa.MetaData()
now = timeutils.utcnow
'''
create table
if not exists
Messages (
id INTEGER,
qid INTEGER,
ttl INTEGER,
body DOCUMENT,
client TEXT,
created DATETIME,
PRIMARY KEY(id),
FOREIGN KEY(qid) references Queues(id) on delete cascade
)
'''
Messages = sa.Table('Messages', metadata,
sa.Column('id', sa.INTEGER, primary_key=True),
sa.Column('qid', sa.INTEGER,
@ -41,21 +31,13 @@ Messages = sa.Table('Messages', metadata,
sa.Column('ttl', sa.INTEGER),
sa.Column('body', sa.LargeBinary),
sa.Column('client', sa.TEXT),
sa.Column('created', sa.DATETIME),
sa.Column('created', sa.TIMESTAMP,
default=now, onupdate=now),
sa.Column('cid', sa.INTEGER,
sa.ForeignKey("Claims.id", ondelete='SET NULL')),
)
'''
create table
if not exists
Claims (
id INTEGER PRIMARY KEY AUTOINCREMENT,
qid INTEGER,
ttl INTEGER,
created DATETIME,
FOREIGN KEY(qid) references Queues(id) on delete cascade
)
'''
Claims = sa.Table('Claims', metadata,
sa.Column('id', sa.INTEGER, primary_key=True,
autoincrement=True),
@ -63,22 +45,11 @@ Claims = sa.Table('Claims', metadata,
sa.ForeignKey("Queues.id", ondelete="CASCADE"),
nullable=False),
sa.Column('ttl', sa.INTEGER),
sa.Column('created', sa.DATETIME),
sa.Column('created', sa.TIMESTAMP,
default=now, onupdate=now),
)
'''
create table
if not exists
Queues (
id INTEGER,
project TEXT,
name TEXT,
metadata DOCUMENT,
PRIMARY KEY(id),
UNIQUE(project, name)
)
'''
Queues = sa.Table('Queues', metadata,
sa.Column('id', sa.INTEGER, primary_key=True),
sa.Column('project', sa.String),
@ -88,31 +59,13 @@ Queues = sa.Table('Queues', metadata,
)
'''
create table
if not exists
Locked (
cid INTEGER,
msgid INTEGER,
FOREIGN KEY(cid) references Claims(id) on delete cascade,
FOREIGN KEY(msgid) references Messages(id) on delete cascade
)
'''
Locked = sa.Table('Locked', metadata,
sa.Column('cid', sa.INTEGER,
sa.ForeignKey("Claims.id", ondelete="CASCADE"),
nullable=False),
sa.Column('msgid', sa.INTEGER,
sa.ForeignKey("Messages.id", ondelete="CASCADE"),
nullable=False),
)
Shards = sa.Table('Shards', metadata,
sa.Column('name', sa.String, primary_key=True),
sa.Column('uri', sa.String, nullable=False),
sa.Column('weight', sa.INTEGER, nullable=False),
sa.Column('options', sa.BINARY))
Catalogue = sa.Table('Catalogue', metadata,
sa.Column('shard', sa.String,
sa.ForeignKey('Shards.name',

View File

@ -1,3 +1,4 @@
# Copyright (c) 2014 Red Hat, Inc.
# Copyright (c) 2014 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
@ -15,13 +16,17 @@
# limitations under the License.
import functools
import sqlalchemy as sa
from sqlalchemy import exc
from sqlalchemy.sql import func as sfunc
import marconi.openstack.common.log as logging
from marconi.openstack.common import log as logging
from marconi.queues.storage import errors
from marconi.queues.storage.sqlalchemy import tables
LOG = logging.getLogger(__name__)
UNIX_EPOCH_AS_JULIAN_SEC = 2440587.5 * 86400.0
def raises_conn_error(func):
@ -41,3 +46,79 @@ def raises_conn_error(func):
raise errors.ConnectionError()
return wrapper
class NoResult(Exception):
pass
def get_qid(driver, queue, project):
sel = sa.sql.select([tables.Queues.c.id], sa.and_(
tables.Queues.c.project == project,
tables.Queues.c.name == queue))
try:
return driver.get(sel)[0]
except NoResult:
raise errors.QueueDoesNotExist(queue, project)
def get_age(created):
return sfunc.now() - created
# The utilities below make the database IDs opaque to the users
# of Marconi API. The only purpose is to advise the users NOT to
# make assumptions on the implementation of and/or relationship
# between the message IDs, the markers, and claim IDs.
#
# The magic numbers are arbitrarily picked; the numbers themselves
# come with no special functionalities.
def msgid_encode(id):
return hex(id ^ 0x5c693a53)[2:]
def msgid_decode(id):
try:
return int(id, 16) ^ 0x5c693a53
except ValueError:
return None
def marker_encode(id):
return oct(id ^ 0x3c96a355)[1:]
def marker_decode(id):
try:
return int(id, 8) ^ 0x3c96a355
except ValueError:
return None
def cid_encode(id):
return hex(id ^ 0x63c9a59c)[2:]
def cid_decode(id):
try:
return int(id, 16) ^ 0x63c9a59c
except ValueError:
return None
def julian_to_unix(julian_sec):
"""Converts Julian timestamp, in seconds, to a UNIX timestamp."""
return int(round(julian_sec - UNIX_EPOCH_AS_JULIAN_SEC))
def stat_message(message):
"""Creates a stat document based on a message."""
return {
'id': message['id'],
'age': message['age'],
'created': message['created'],
}

View File

@ -81,6 +81,7 @@ class QueueControllerTest(ControllerBaseTest):
def setUp(self):
super(QueueControllerTest, self).setUp()
self.queue_controller = self.driver.queue_controller
self.message_controller = self.driver.message_controller
self.claim_controller = self.driver.claim_controller
@ -395,10 +396,6 @@ class MessageControllerTest(ControllerBaseTest):
messages = [{'body': 3.14, 'ttl': 0}]
client_uuid = uuid.uuid4()
[msgid] = self.controller.post(self.queue_name, messages,
project=self.project,
client_uuid=client_uuid)
[msgid] = self.controller.post(self.queue_name, messages,
project=self.project,
client_uuid=client_uuid)

View File

@ -18,6 +18,7 @@ import datetime
import sqlalchemy as sa
from marconi.queues.storage import errors
from marconi.queues.storage import sqlalchemy
from marconi.queues.storage.sqlalchemy import controllers
from marconi.queues.storage.sqlalchemy import tables
@ -55,6 +56,29 @@ class SqlalchemyTableTests(testing.TestBase):
self.assertIsNone(row)
class SqlalchemyQueueTests(base.QueueControllerTest):
driver_class = sqlalchemy.DataDriver
controller_class = controllers.QueueController
class SqlalchemyMessageTests(base.MessageControllerTest):
driver_class = sqlalchemy.DataDriver
controller_class = controllers.MessageController
def test_empty_queue_exception(self):
queue_name = 'empty-queue-test'
self.queue_controller.create(queue_name, None)
self.assertRaises(errors.QueueIsEmpty,
self.controller.first,
queue_name, None, sort=1)
class SqlalchemyClaimTests(base.ClaimControllerTest):
driver_class = sqlalchemy.DataDriver
controller_class = controllers.ClaimController
class SqlalchemyShardsTest(base.ShardsControllerTest):
driver_class = sqlalchemy.ControlDriver
controller_class = controllers.ShardsController