diff --git a/marconi/queues/storage/sqlalchemy/__init__.py b/marconi/queues/storage/sqlalchemy/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/marconi/queues/storage/sqlalchemy/tables.py b/marconi/queues/storage/sqlalchemy/tables.py new file mode 100644 index 000000000..784ee7634 --- /dev/null +++ b/marconi/queues/storage/sqlalchemy/tables.py @@ -0,0 +1,108 @@ +# Copyright (c) 2013 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# +# See the License for the specific language governing permissions and +# limitations under the License. + +import sqlalchemy as sa + +metadata = sa.MetaData() + + +''' +create table +if not exists +Messages ( + id INTEGER, + qid INTEGER, + ttl INTEGER, + body DOCUMENT, + client TEXT, + created DATETIME, + PRIMARY KEY(id), + FOREIGN KEY(qid) references Queues(id) on delete cascade +) +''' +Messages = sa.Table('Messages', metadata, + sa.Column('id', sa.INTEGER, primary_key=True), + sa.Column('qid', sa.INTEGER, + sa.ForeignKey("Queues.id", ondelete="CASCADE"), + nullable=False), + sa.Column('ttl', sa.INTEGER), + sa.Column('body', sa.LargeBinary), + sa.Column('client', sa.TEXT), + sa.Column('created', sa.DATETIME), + ) + + +''' +create table +if not exists +Claims ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + qid INTEGER, + ttl INTEGER, + created DATETIME, + FOREIGN KEY(qid) references Queues(id) on delete cascade +) +''' +Claims = sa.Table('Claims', metadata, + sa.Column('id', sa.INTEGER, primary_key=True, + autoincrement=True), + sa.Column('qid', sa.INTEGER, + sa.ForeignKey("Queues.id", ondelete="CASCADE"), + nullable=False), + sa.Column('ttl', sa.INTEGER), + sa.Column('created', sa.DATETIME), + ) + + +''' +create table +if not exists +Queues ( + id INTEGER, + project TEXT, + name TEXT, + metadata DOCUMENT, + PRIMARY KEY(id), + UNIQUE(project, name) +) +''' +Queues = sa.Table('Queues', metadata, + sa.Column('id', sa.INTEGER, primary_key=True), + sa.Column('project', sa.String), + sa.Column('name', sa.String), + sa.Column('metadata', sa.LargeBinary), + sa.UniqueConstraint('project', 'name'), + ) + + +''' +create table +if not exists +Locked ( + cid INTEGER, + msgid INTEGER, + FOREIGN KEY(cid) references Claims(id) on delete cascade, + FOREIGN KEY(msgid) references Messages(id) on delete cascade +) +''' +Locked = sa.Table('Locked', metadata, + sa.Column('cid', sa.INTEGER, + sa.ForeignKey("Claims.id", ondelete="CASCADE"), + nullable=False), + sa.Column('msgid', sa.INTEGER, + sa.ForeignKey("Messages.id", ondelete="CASCADE"), + nullable=False), + ) diff --git a/requirements.txt b/requirements.txt index 47111f6d3..2be458da9 100644 --- a/requirements.txt +++ b/requirements.txt @@ -13,3 +13,4 @@ WebOb>=1.2.3,<1.3 stevedore>=0.10 six>=1.4.1 oslo.config>=1.2.0 +sqlalchemy>=0.7.8,<=0.7.99 diff --git a/tests/unit/queues/storage/test_impl_sqlalchemy.py b/tests/unit/queues/storage/test_impl_sqlalchemy.py new file mode 100644 index 000000000..b835b58b8 --- /dev/null +++ b/tests/unit/queues/storage/test_impl_sqlalchemy.py @@ -0,0 +1,52 @@ +# Copyright (c) 2013 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# +# See the License for the specific language governing permissions and +# limitations under the License. + +import datetime + +import sqlalchemy as sa + +from marconi.queues.storage.sqlalchemy import tables +from marconi import tests as testing + + +class SqlalchemyTableTests(testing.TestBase): + + def setUp(self): + super(SqlalchemyTableTests, self).setUp() + self.engine = sa.create_engine('sqlite:///:memory:') + tables.metadata.create_all(self.engine, checkfirst=True) + + def test_table_queries(self): + self.engine.execute(tables.Queues.insert(), id=1, project='test', + name='marconi', metadata='aaaa') + self.engine.execute(tables.Messages.insert(), id=1, qid=1, ttl=10, + body='bbbb', client='a', + created=datetime.datetime.now()) + self.engine.execute(tables.Claims.insert(), id=1, qid=1, ttl=10, + created=datetime.datetime.now()) + + rs = self.engine.execute(tables.Claims.select()) + row = rs.fetchone() + + self.assertEqual(row.id, 1) + self.assertEqual(row.qid, 1) + self.assertEqual(row.ttl, 10) + + self.engine.execute(tables.Claims.delete(tables.Claims.c.id == 1)) + rs = self.engine.execute(tables.Claims.select()) + row = rs.fetchone() + + self.assertIsNone(row)