From fb5a45f0c4cea01d9ca88bbcd30e94ca19151a5f Mon Sep 17 00:00:00 2001 From: Zhihao Yuan Date: Thu, 21 Mar 2013 09:26:10 -0400 Subject: [PATCH] Message support in SQlite. * The JSON serialization is switched to msgpack; * list() on messages and stats() on queues fail loudly. Implements: blueprint message-pagination Change-Id: Iadb1ef348a53e0de28c6b45782ef686a3f1bf1e2 --- marconi/storage/sqlite/controllers.py | 266 +++++++++++++++++++------- marconi/storage/sqlite/driver.py | 67 ++++++- marconi/tests/test_sqlite.py | 110 +++++++++-- tools/pip-requires | 1 + 4 files changed, 344 insertions(+), 100 deletions(-) diff --git a/marconi/storage/sqlite/controllers.py b/marconi/storage/sqlite/controllers.py index 11a2baf97..d2b159de8 100644 --- a/marconi/storage/sqlite/controllers.py +++ b/marconi/storage/sqlite/controllers.py @@ -14,8 +14,6 @@ # limitations under the License. -import json - from marconi.storage import base from marconi.storage import exceptions @@ -23,63 +21,67 @@ from marconi.storage import exceptions class Queue(base.QueueBase): def __init__(self, driver): self.driver = driver - self.driver._run('''create table if not exists Queues ( - id INTEGER, - tenant TEXT, - name TEXT, - metadata TEXT, - PRIMARY KEY(id), - UNIQUE(tenant, name) - )''') - self.driver._run('''create unique index if not exists Paths on Queues ( - tenant, name - )''') + self.driver.run(''' + create table + if not exists + Queues ( + id INTEGER, + tenant TEXT, + name TEXT, + metadata DOCUMENT, + PRIMARY KEY(id), + UNIQUE(tenant, name) + ) + ''') def list(self, tenant): - records = self.driver._run('''select name, metadata from Queues where - tenant = ?''', tenant) + records = self.driver.run(''' + select name, metadata from Queues + where tenant = ?''', tenant) for k, v in records: yield {'name': k, 'metadata': v} def get(self, name, tenant): - sql = '''select metadata from Queues where - tenant = ? and name = ?''' - try: - return json.loads(self.driver._get(sql, tenant, name)[0]) - except TypeError: - msg = (_("Queue %(name)s does not exist for tenant %(tenant)s") - % dict(name=name, tenant=tenant)) + return self.driver.get(''' + select metadata from Queues + where tenant = ? and name = ?''', tenant, name)[0] - raise exceptions.DoesNotExist(msg) + except _NoResult: + _queue_doesnotexist(name, tenant) def upsert(self, name, metadata, tenant): - with self.driver: - sql_select = '''select metadata from Queues where - tenant = ? and name = ?''' - previous_record = self.driver._get(sql_select, tenant, name) + with self.driver('immediate'): + previous_record = self.driver.run(''' + select id from Queues + where tenant = ? and name = ? + ''', tenant, name).fetchone() - sql_replace = '''replace into Queues - values (null, ?, ?, ?)''' - doc = json.dumps(metadata, ensure_ascii=False) - self.driver._run(sql_replace, tenant, name, doc) + self.driver.run(''' + replace into Queues + values (null, ?, ?, ?) + ''', tenant, name, self.driver.pack(metadata)) return previous_record is None def delete(self, name, tenant): - self.driver._run('''delete from Queues where - tenant = ? and name = ?''', - tenant, name) + self.driver.run(''' + delete from Queues + where tenant = ? and name = ?''', tenant, name) def stats(self, name, tenant): - sql = '''select count(id) - from Messages where - qid = (select id from Queues where - tenant = ? and name = ?)''' + qid, messages = self.driver.get(''' + select Q.id, count(M.id) + from Queues as Q join Messages as M + on qid = Q.id + where tenant = ? and name = ?''', tenant, name) + + if qid is None: + _queue_doesnotexist(name, tenant) return { - 'messages': self.driver._get(sql, tenant, name)[0], + 'messages': messages, 'actions': 0, } @@ -90,55 +92,181 @@ class Queue(base.QueueBase): class Message(base.MessageBase): def __init__(self, driver): self.driver = driver - self.driver._run(''' - create table if not exists Messages ( + self.driver.run(''' + create table + if not exists + Messages ( id INTEGER, qid INTEGER, ttl INTEGER, - content TEXT, - created DATETIME, + content DOCUMENT, + client TEXT, + created DATETIME, -- seconds since the Julian day PRIMARY KEY(id), FOREIGN KEY(qid) references Queues(id) on delete cascade ) ''') - def get(self, queue, tenant=None, message_id=None, - marker=None, echo=False, client_uuid=None): - pass + def get(self, queue, message_id, tenant): + try: + content, ttl, age = self.driver.get(''' + select content, ttl, julianday() * 86400.0 - created + from Queues as Q join Messages as M + on qid = Q.id + where ttl > julianday() * 86400.0 - created + and M.id = ? and tenant = ? and name = ? + ''', _msgid_decode(message_id), tenant, queue) - def post(self, queue, messages, tenant): - with self.driver: + return { + 'id': message_id, + 'ttl': ttl, + 'age': int(age), + 'body': content, + } + + except (_NoResult, _BadID): + _msg_doesnotexist(message_id) + + def list(self, queue, tenant, marker=None, + limit=10, echo=False, client_uuid=None): + with self.driver('deferred'): try: - qid, = self.driver._get('''select id from Queues where - tenant = ? and name = ?''', - tenant, queue) - except TypeError: - msg = (_("Queue %(name)s does not exist for tenant %(tenant)s") - % dict(name=queue, tenant=tenant)) + sql = ''' + select id, content, ttl, julianday() * 86400.0 - created + from Messages + where ttl > julianday() * 86400.0 - created + and qid = ?''' + args = [_get_qid(self.driver, queue, tenant)] - raise exceptions.DoesNotExist(msg) + if not echo: + sql += ''' + and client != ?''' + args += [client_uuid] + + if marker: + sql += ''' + and id > ?''' + args += [_marker_decode(marker)] + + sql += ''' + limit ?''' + args += [limit] + + iter = self.driver.run(sql, *args) + + for id, content, ttl, age in iter: + yield { + 'id': _msgid_encode(id), + 'ttl': ttl, + 'age': int(age), + 'marker': _marker_encode(id), + 'body': content, + } + + except _BadID: + return + + def post(self, queue, messages, tenant, client_uuid): + with self.driver('immediate'): + qid = _get_qid(self.driver, queue, tenant) + + # cleanup all expired messages in this queue + + self.driver.run(''' + delete from Messages + where ttl <= julianday() * 86400.0 - created + and qid = ?''', qid) # executemany() sets lastrowid to None, so no matter we manually # generate the IDs or not, we still need to query for it. - try: - unused, = self.driver._get('''select id + 1 from Messages - where id = (select max(id) - from Messages)''') - except TypeError: - unused, = 1001, + + unused = self.driver.get(''' + select max(id) + 1 from Messages''')[0] or 1001 def it(newid): for m in messages: yield (newid, qid, m['ttl'], - json.dumps(m, ensure_ascii=False)) - + self.driver.pack(m['body']), client_uuid) newid += 1 - self.driver._run_multiple('''insert into Messages values - (?, ?, ?, ?, datetime())''', - it(unused)) + self.driver.run_multiple(''' + insert into Messages + values (?, ?, ?, ?, ?, julianday() * 86400.0)''', it(unused)) - return [str(x) for x in range(unused, unused + len(messages))] + return map(_msgid_encode, range(unused, unused + len(messages))) - def delete(self, queue, message_id, tenant=None, claim=None): - pass + def delete(self, queue, message_id, tenant, claim=None): + try: + self.driver.run(''' + delete from Messages + where id = ? + and qid = (select id from Queues + where tenant = ? and name = ?) + ''', _msgid_decode(message_id), tenant, queue) + + except _BadID: + pass + + +class _NoResult(Exception): + pass + + +class _BadID(Exception): + pass + + +def _queue_doesnotexist(name, tenant): + msg = (_("Queue %(name)s does not exist for tenant %(tenant)s") + % dict(name=name, tenant=tenant)) + + raise exceptions.DoesNotExist(msg) + + +def _msg_doesnotexist(id): + msg = (_("Message %(id)s does not exist") + % dict(id=id)) + + raise exceptions.DoesNotExist(msg) + + +def _get_qid(driver, queue, tenant): + try: + return driver.get(''' + select id from Queues + where tenant = ? and name = ?''', tenant, queue)[0] + + except _NoResult: + _queue_doesnotexist(queue, tenant) + + +# The utilities below make the database IDs opaque to the users +# of Marconi API. The only purpose is to advise the users NOT to +# make assumptions on the implementation of and/or relationship +# between the message IDs, the markers, and claim IDs. +# +# The magic numbers are arbitrarily picked; the numbers themselves +# come with no special functionalities. + +def _msgid_encode(id): + return hex(id ^ 0x5c693a53)[2:] + + +def _msgid_decode(id): + try: + return int(id, 16) ^ 0x5c693a53 + + except ValueError: + raise _BadID + + +def _marker_encode(id): + return oct(id ^ 0x3c96a355)[1:] + + +def _marker_decode(id): + try: + return int(id, 8) ^ 0x3c96a355 + + except ValueError: + raise _BadID diff --git a/marconi/storage/sqlite/driver.py b/marconi/storage/sqlite/driver.py index 160f1bcdb..77898d599 100644 --- a/marconi/storage/sqlite/driver.py +++ b/marconi/storage/sqlite/driver.py @@ -14,8 +14,11 @@ # limitations under the License. +import contextlib import sqlite3 +import msgpack + from marconi.common import config from marconi import storage from marconi.storage.sqlite import controllers @@ -26,26 +29,70 @@ cfg = config.namespace('drivers:storage:sqlite').from_options( class Driver(storage.DriverBase): + def __init__(self): self.__path = cfg.database - self.__conn = sqlite3.connect(self.__path) + self.__conn = sqlite3.connect(self.__path, + detect_types=sqlite3.PARSE_DECLTYPES) self.__db = self.__conn.cursor() - self._run('''PRAGMA foreign_keys = ON''') + self.run('''PRAGMA foreign_keys = ON''') - def _run(self, sql, *args): + @staticmethod + def pack(o): + """ + Convert a Python variable to a SQlite variable + with the customized type `DOCUMENT`. + + :param o: a Python str, unicode, int, long, float, bool, None + or a dict or list of %o + """ + return buffer(msgpack.dumps(o)) + + sqlite3.register_converter('DOCUMENT', lambda s: + msgpack.loads(s, encoding='utf-8')) + + def run(self, sql, *args): + """ + Perform a SQL query. + + :param sql: a query string with the '?' placeholders + :param args: the arguments to substitute the placeholders + """ return self.__db.execute(sql, args) - def _run_multiple(self, sql, it): + def run_multiple(self, sql, it): + """ + Iteratively perform multiple SQL queries. + + :param sql: a query string with the '?' placeholders + :param it: an iterator which yields a sequence of arguments to + substitute the placeholders + """ self.__db.executemany(sql, it) - def _get(self, sql, *args): - return self._run(sql, *args).fetchone() + def get(self, sql, *args): + """ + Get one entry from the query result. - def __enter__(self): - self._run('begin immediate') + :param sql: a query string with the '?' placeholders + :param args: the arguments to substitute the placeholders + :raises: _NoResult if the result set is empty + """ + try: + return self.run(sql, *args).next() - def __exit__(self, exc_type, exc_value, traceback): - self.__conn.commit() + except StopIteration: + raise controllers._NoResult + + @contextlib.contextmanager + def __call__(self, isolation): + self.run('begin ' + isolation) + try: + yield + self.__conn.commit() + except Exception: + self.__conn.rollback() + raise @property def queue_controller(self): diff --git a/marconi/tests/test_sqlite.py b/marconi/tests/test_sqlite.py index f455eda30..d47424ec4 100644 --- a/marconi/tests/test_sqlite.py +++ b/marconi/tests/test_sqlite.py @@ -23,13 +23,16 @@ from marconi.tests import util as testing #TODO(zyuan): let tests/storage/base.py handle these class TestSqlite(testing.TestBase): - def test_some_messages(self): - storage = sqlite.Driver() - q = storage.queue_controller - q.upsert('fizbit', {'_message_ttl': 40}, '480924') - m = storage.message_controller + def setUp(self): + super(TestSqlite, self).setUp() - d = [ + storage = sqlite.Driver() + self.queue_ctrl = storage.queue_controller + self.queue_ctrl.upsert('fizbit', {'_message_ttl': 40}, '480924') + self.msg_ctrl = storage.message_controller + + def test_some_messages(self): + doc = [ { 'body': { 'event': 'BackupStarted', @@ -37,21 +40,86 @@ class TestSqlite(testing.TestBase): }, 'ttl': 30, }, - { - 'body': { - 'event': 'BackupProgress', - 'currentBytes': '0', - 'totalBytes': '99614720', - }, - 'ttl': 10 - }, ] - n = q.stats('fizbit', '480924')['messages'] - l1 = m.post('fizbit', d, '480924') - l2 = m.post('fizbit', d, '480924') - self.assertEquals([int(v) + 2 for v in l1], map(int, l2)) - self.assertEquals(q.stats('fizbit', '480924')['messages'] - n, 4) - q.delete('fizbit', '480924') + for _ in range(10): + self.msg_ctrl.post('fizbit', doc, '480924', + client_uuid='30387f00') + msgid = self.msg_ctrl.post('fizbit', doc, '480924', + client_uuid='79ed56f8')[0] + + self.assertEquals( + self.queue_ctrl.stats('fizbit', '480924')['messages'], 11) + + msgs = list(self.msg_ctrl.list('fizbit', '480924', + client_uuid='30387f00')) + + self.assertEquals(len(msgs), 1) + + #TODO(zyuan): move this to tests/storage/test_impl_sqlite.py + msgs = list(self.msg_ctrl.list('fizbit', '480924', + marker='illformed')) + + self.assertEquals(len(msgs), 0) + + cnt = 0 + marker = None + while True: + nomsg = True + for msg in self.msg_ctrl.list('fizbit', '480924', + limit=3, marker=marker, + client_uuid='79ed56f8'): + nomsg = False + if nomsg: + break + marker = msg['marker'] + cnt += 1 + + self.assertEquals(cnt, 4) + + self.assertIn( + 'body', self.msg_ctrl.get('fizbit', msgid, '480924')) + + self.msg_ctrl.delete('fizbit', msgid, '480924') + with testtools.ExpectedException(exceptions.DoesNotExist): - m.post('fizbit', d, '480924') + self.msg_ctrl.get('fizbit', msgid, '480924') + + def test_expired_messages(self): + doc = [ + {'body': {}, 'ttl': 0}, + ] + + msgid = self.msg_ctrl.post('fizbit', doc, '480924', + client_uuid='unused')[0] + + with testtools.ExpectedException(exceptions.DoesNotExist): + self.msg_ctrl.get('fizbit', msgid, '480924') + + def test_nonexsitent(self): + with testtools.ExpectedException(exceptions.DoesNotExist): + self.msg_ctrl.post('nonexistent', [], '480924', + client_uuid='30387f00') + + with testtools.ExpectedException(exceptions.DoesNotExist): + for _ in self.msg_ctrl.list('nonexistent', '480924'): + pass + + with testtools.ExpectedException(exceptions.DoesNotExist): + self.queue_ctrl.stats('nonexistent', '480924') + + #TODO(zyuan): move this to tests/storage/test_impl_sqlite.py + def test_illformed_id(self): + + # SQlite-specific tests. Since all IDs exposed in APIs are opaque, + # any ill-formed IDs should be regarded as non-existing ones. + + with testtools.ExpectedException(exceptions.DoesNotExist): + self.msg_ctrl.get('nonexistent', 'illformed', '480924') + + self.msg_ctrl.delete('nonexistent', 'illformed', '480924') + + def tearDown(self): + self.queue_ctrl.delete('fizbit', '480924') + + super(TestSqlite, self).tearDown() diff --git a/tools/pip-requires b/tools/pip-requires index 9d62af712..86be08f05 100644 --- a/tools/pip-requires +++ b/tools/pip-requires @@ -1,5 +1,6 @@ cliff falcon +msgpack-python oslo.config>=1.1.0 PasteDeploy pymongo