diff --git a/marconi/storage/base.py b/marconi/storage/base.py index 81063ac84..258c189f4 100644 --- a/marconi/storage/base.py +++ b/marconi/storage/base.py @@ -69,12 +69,12 @@ class QueueBase(ControllerBase): __metaclass__ = abc.ABCMeta @abc.abstractmethod - def list(self, tenant=None, marker=None, + def list(self, project=None, marker=None, limit=10, detailed=False): """ Base method for listing queues. - :param tenant: Tenant id + :param project: Project id :param marker: The last queue name :param limit: (Default 10) Max number :param detailed: Whether metadata is included @@ -85,12 +85,12 @@ class QueueBase(ControllerBase): raise NotImplementedError @abc.abstractmethod - def get(self, name, tenant=None): + def get(self, name, project=None): """ Base method for queue retrieval. :param name: The queue name - :param tenant: Tenant id + :param project: Project id :returns: Dictionary containing queue metadata :raises: DoesNotExist @@ -98,14 +98,14 @@ class QueueBase(ControllerBase): raise NotImplementedError @abc.abstractmethod - def upsert(self, name, metadata, tenant=None): + def upsert(self, name, metadata, project=None): """ This methods handles both creates and updates operations for queues. :param name: The queue name :param metadata: Arbitrary metadata - :param tenant: Tenant id + :param project: Project id :returns: True if a queue was created and False if it was updated. """ @@ -113,34 +113,34 @@ class QueueBase(ControllerBase): assert isinstance(metadata, dict), msg @abc.abstractmethod - def delete(self, name, tenant=None): + def delete(self, name, project=None): """ Base method for queue deletion. :param name: The queue name - :param tenant: Tenant id + :param project: Project id """ raise NotImplementedError @abc.abstractmethod - def stats(self, name, tenant=None): + def stats(self, name, project=None): """ Base method for queue stats. :param name: The queue name - :param tenant: Tenant id + :param project: Project id :returns: Dictionary with the queue stats """ raise NotImplementedError @abc.abstractmethod - def actions(self, name, tenant=None, marker=None, limit=10): + def actions(self, name, project=None, marker=None, limit=10): """ Base method for queue actions. :param name: Queue name - :param tenant: Tenant id + :param project: Project id :param marker: Tail identifier :param limit: (Default 10) Max number of messages to retrieve. @@ -157,14 +157,14 @@ class MessageBase(ControllerBase): __metaclass__ = abc.ABCMeta @abc.abstractmethod - def list(self, queue, tenant=None, marker=None, + def list(self, queue, project=None, marker=None, limit=10, echo=False, client_uuid=None): """ Base message list method :param queue: Name of the queue to get the message from. - :param tenant: Tenant id + :param project: Project id :param marker: Tail identifier :param limit: (Default 10) specifies up to 100 messages to return. @@ -178,13 +178,13 @@ class MessageBase(ControllerBase): """ raise NotImplementedError - def get(self, queue, message_id, tenant=None): + def get(self, queue, message_id, project=None): """ Base message get method :param queue: Name of the queue to get the message from. - :param tenant: Tenant id + :param project: Project id :param message_id: Message ID :returns: Dictionary containing message data @@ -193,7 +193,7 @@ class MessageBase(ControllerBase): raise NotImplementedError @abc.abstractmethod - def post(self, queue, messages, client_uuid, tenant=None): + def post(self, queue, messages, client_uuid, project=None): """ Base message post method @@ -205,21 +205,21 @@ class MessageBase(ControllerBase): :param messages: Messages to post to queue, it can be a list of 1 or more elements. :param client_uuid: Client's unique identifier. - :param tenant: Tenant id + :param project: Project id :returns: List of message ids """ raise NotImplementedError @abc.abstractmethod - def delete(self, queue, message_id, tenant=None, claim=None): + def delete(self, queue, message_id, project=None, claim=None): """ Base message delete method :param queue: Name of the queue to post message to. :param message_id: Message to be deleted - :param tenant: Tenant id + :param project: Project id :param claim: Claim this message belongs to. When specified, claim must be valid and message_id must belong to @@ -233,14 +233,14 @@ class ClaimBase(ControllerBase): __metaclass__ = abc.ABCMeta @abc.abstractmethod - def get(self, queue, claim_id, tenant=None): + def get(self, queue, claim_id, project=None): """ Base claim get method :param queue: Name of the queue this claim belongs to. :param claim_id: The claim id - :param tenant: Tenant id + :param project: Project id :returns: (Claim's metadata, claimed messages) :raises: DoesNotExist @@ -248,7 +248,7 @@ class ClaimBase(ControllerBase): raise NotImplementedError @abc.abstractmethod - def create(self, queue, metadata, tenant=None, limit=10): + def create(self, queue, metadata, project=None, limit=10): """ Base claim create method @@ -256,7 +256,7 @@ class ClaimBase(ControllerBase): claim belongs to. :param metadata: Claim's parameters to be stored. - :param tenant: Tenant id + :param project: Project id :param limit: (Default 10) Max number of messages to claim. @@ -265,7 +265,7 @@ class ClaimBase(ControllerBase): raise NotImplementedError @abc.abstractmethod - def update(self, queue, claim_id, metadata, tenant=None): + def update(self, queue, claim_id, metadata, project=None): """ Base claim update method @@ -274,18 +274,18 @@ class ClaimBase(ControllerBase): :param claim_id: Claim to be updated :param metadata: Claim's parameters to be updated. - :param tenant: Tenant id + :param project: Project id """ raise NotImplementedError @abc.abstractmethod - def delete(self, queue, claim_id, tenant=None): + def delete(self, queue, claim_id, project=None): """ Base claim delete method :param queue: Name of the queue this claim belongs to. :param claim_id: Claim to be deleted - :param tenant: Tenant id + :param project: Project id """ raise NotImplementedError diff --git a/marconi/storage/exceptions.py b/marconi/storage/exceptions.py index 89ccaf6a8..2b247e284 100644 --- a/marconi/storage/exceptions.py +++ b/marconi/storage/exceptions.py @@ -24,27 +24,27 @@ class NotPermitted(Exception): class QueueDoesNotExist(DoesNotExist): - def __init__(self, name, tenant): - msg = (_("Queue %(name)s does not exist for tenant %(tenant)s") % - dict(name=name, tenant=tenant)) + def __init__(self, name, project): + msg = (_("Queue %(name)s does not exist for project %(project)s") % + dict(name=name, project=project)) super(QueueDoesNotExist, self).__init__(msg) class MessageDoesNotExist(DoesNotExist): - def __init__(self, mid, queue, tenant): + def __init__(self, mid, queue, project): msg = (_("Message %(mid)s does not exist in " - "queue %(queue)s of tenant %(tenant)s") % - dict(mid=mid, queue=queue, tenant=tenant)) + "queue %(queue)s of project %(project)s") % + dict(mid=mid, queue=queue, project=project)) super(MessageDoesNotExist, self).__init__(msg) class ClaimDoesNotExist(DoesNotExist): - def __init__(self, cid, queue, tenant): + def __init__(self, cid, queue, project): msg = (_("Claim %(cid)s does not exist in " - "queue %(queue)s of tenant %(tenant)s") % - dict(cid=cid, queue=queue, tenant=tenant)) + "queue %(queue)s of project %(project)s") % + dict(cid=cid, queue=queue, project=project)) super(ClaimDoesNotExist, self).__init__(msg) diff --git a/marconi/storage/mongodb/controllers.py b/marconi/storage/mongodb/controllers.py index b5aab1c33..d0d7d8ea2 100644 --- a/marconi/storage/mongodb/controllers.py +++ b/marconi/storage/mongodb/controllers.py @@ -38,7 +38,7 @@ class QueueController(storage.QueueBase): Queues: Name Field ---------------- - tenant -> t + project -> p metadata -> m name -> n @@ -49,15 +49,15 @@ class QueueController(storage.QueueBase): self._col = self.driver.db["queues"] # NOTE(flaper87): This creates a unique compound index for - # tenant and name. Using tenant as the first field of the - # index allows for querying by tenant and tenant+name. + # project and name. Using project as the first field of the + # index allows for querying by project and project+name. # This is also useful for retrieving the queues list for - # as specific tenant, for example. Order Matters! - self._col.ensure_index([("t", 1), ("n", 1)], unique=True) + # as specific project, for example. Order Matters! + self._col.ensure_index([("p", 1), ("n", 1)], unique=True) - def list(self, tenant=None, marker=None, + def list(self, project=None, marker=None, limit=10, detailed=False): - query = {"t": tenant} + query = {"p": project} if marker: query["n"] = {"$gt": marker} @@ -80,29 +80,29 @@ class QueueController(storage.QueueBase): yield normalizer(cursor) yield marker_name["next"] - def _get(self, name, tenant=None, fields={"m": 1, "_id": 0}): - queue = self._col.find_one({"t": tenant, "n": name}, fields=fields) + def _get(self, name, project=None, fields={"m": 1, "_id": 0}): + queue = self._col.find_one({"p": project, "n": name}, fields=fields) if queue is None: - raise exceptions.QueueDoesNotExist(name, tenant) + raise exceptions.QueueDoesNotExist(name, project) return queue - def get_id(self, name, tenant=None): + def get_id(self, name, project=None): """ Just like `get` method but returns the queue's id :returns: Queue's `ObjectId` """ - queue = self._get(name, tenant, fields=["_id"]) + queue = self._get(name, project, fields=["_id"]) return queue.get("_id") - def get(self, name, tenant=None): - queue = self._get(name, tenant) + def get(self, name, project=None): + queue = self._get(name, project) return queue.get("m", {}) - def upsert(self, name, metadata, tenant=None): - super(QueueController, self).upsert(name, metadata, tenant) + def upsert(self, name, metadata, project=None): + super(QueueController, self).upsert(name, metadata, project) - rst = self._col.update({"t": tenant, "n": name}, + rst = self._col.update({"p": project, "n": name}, {"$set": {"m": metadata}}, multi=False, upsert=True, @@ -110,12 +110,12 @@ class QueueController(storage.QueueBase): return not rst["updatedExisting"] - def delete(self, name, tenant=None): - self.driver.message_controller.purge_queue(name, tenant) - self._col.remove({"t": tenant, "n": name}) + def delete(self, name, project=None): + self.driver.message_controller.purge_queue(name, project) + self._col.remove({"p": project, "n": name}) - def stats(self, name, tenant=None): - qid = self.get_id(name, tenant) + def stats(self, name, project=None): + qid = self.get_id(name, project) msg_ctrl = self.driver.message_controller active = msg_ctrl.active(qid) claimed = msg_ctrl.claimed(qid) @@ -128,7 +128,7 @@ class QueueController(storage.QueueBase): } } - def actions(self, name, tenant=None, marker=None, limit=10): + def actions(self, name, project=None, marker=None, limit=10): raise NotImplementedError @@ -176,9 +176,9 @@ class MessageController(storage.MessageBase): ("c.e", 1), ("_id", -1)], background=True) - def _get_queue_id(self, queue, tenant): + def _get_queue_id(self, queue, project): queue_controller = self.driver.queue_controller - return queue_controller.get_id(queue, tenant) + return queue_controller.get_id(queue, project) def all(self): return self._col.find() @@ -247,11 +247,11 @@ class MessageController(storage.MessageBase): {"$set": {"c": {"id": None, "e": 0}}}, upsert=False, multi=True) - def list(self, queue, tenant=None, marker=None, + def list(self, queue, project=None, marker=None, limit=10, echo=False, client_uuid=None): try: - qid = self._get_queue_id(queue, tenant) + qid = self._get_queue_id(queue, project) messages = self.active(qid, marker, echo, client_uuid) except ValueError: return @@ -276,18 +276,18 @@ class MessageController(storage.MessageBase): yield utils.HookedCursor(messages, denormalizer) yield str(marker_id['next']) - def get(self, queue, message_id, tenant=None): + def get(self, queue, message_id, project=None): # Base query, always check expire time try: mid = utils.to_oid(message_id) except ValueError: - raise exceptions.MessageDoesNotExist(message_id, queue, tenant) + raise exceptions.MessageDoesNotExist(message_id, queue, project) now = timeutils.utcnow() query = { - "q": self._get_queue_id(queue, tenant), + "q": self._get_queue_id(queue, project), "e": {"$gt": now}, "_id": mid } @@ -295,7 +295,7 @@ class MessageController(storage.MessageBase): message = self._col.find_one(query) if message is None: - raise exceptions.MessageDoesNotExist(message_id, queue, tenant) + raise exceptions.MessageDoesNotExist(message_id, queue, project) oid = message["_id"] age = now - utils.oid_utc(oid) @@ -307,8 +307,8 @@ class MessageController(storage.MessageBase): "body": message["b"], } - def post(self, queue, messages, client_uuid, tenant=None): - qid = self._get_queue_id(queue, tenant) + def post(self, queue, messages, client_uuid, project=None): + qid = self._get_queue_id(queue, project) now = timeutils.utcnow() @@ -329,7 +329,7 @@ class MessageController(storage.MessageBase): ids = self._col.insert(denormalizer(messages)) return map(str, ids) - def delete(self, queue, message_id, tenant=None, claim=None): + def delete(self, queue, message_id, project=None, claim=None): try: try: mid = utils.to_oid(message_id) @@ -337,7 +337,7 @@ class MessageController(storage.MessageBase): return query = { - "q": self._get_queue_id(queue, tenant), + "q": self._get_queue_id(queue, project), "_id": mid } @@ -365,9 +365,9 @@ class MessageController(storage.MessageBase): except exceptions.QueueDoesNotExist: pass - def purge_queue(self, queue, tenant=None): + def purge_queue(self, queue, project=None): try: - qid = self._get_queue_id(queue, tenant) + qid = self._get_queue_id(queue, project) self._col.remove({"q": qid}, w=0) except exceptions.QueueDoesNotExist: pass @@ -393,15 +393,15 @@ class ClaimController(storage.ClaimBase): the claim id and it's expiration timestamp. """ - def _get_queue_id(self, queue, tenant): + def _get_queue_id(self, queue, project): queue_controller = self.driver.queue_controller - return queue_controller.get_id(queue, tenant) + return queue_controller.get_id(queue, project) - def get(self, queue, claim_id, tenant=None): + def get(self, queue, claim_id, project=None): msg_ctrl = self.driver.message_controller # Check whether the queue exists or not - qid = self._get_queue_id(queue, tenant) + qid = self._get_queue_id(queue, project) # Base query, always check expire time now = timeutils.utcnow() @@ -435,11 +435,11 @@ class ClaimController(storage.ClaimBase): "id": str(claim["id"]), } except StopIteration: - raise exceptions.ClaimDoesNotExist(cid, queue, tenant) + raise exceptions.ClaimDoesNotExist(cid, queue, project) return (claim, messages) - def create(self, queue, metadata, tenant=None, limit=10): + def create(self, queue, metadata, project=None, limit=10): """ This implementation was done in a best-effort fashion. In order to create a claim we need to get a list @@ -460,7 +460,7 @@ class ClaimController(storage.ClaimBase): # We don't need the qid here but # we need to verify it exists. - qid = self._get_queue_id(queue, tenant) + qid = self._get_queue_id(queue, project) ttl = int(metadata.get("ttl", 60)) oid = objectid.ObjectId() @@ -513,14 +513,14 @@ class ClaimController(storage.ClaimBase): upsert=False, multi=True) if updated != 0: - claim, messages = self.get(queue, oid, tenant=tenant) + claim, messages = self.get(queue, oid, project=project) return (str(oid), messages) - def update(self, queue, claim_id, metadata, tenant=None): + def update(self, queue, claim_id, metadata, project=None): try: cid = utils.to_oid(claim_id) except ValueError: - raise exceptions.ClaimDoesNotExist(claim_id, queue, tenant) + raise exceptions.ClaimDoesNotExist(claim_id, queue, project) now = timeutils.utcnow() ttl = int(metadata.get("ttl", 60)) @@ -532,14 +532,14 @@ class ClaimController(storage.ClaimBase): msg = _("New ttl will make the claim expires") raise ValueError(msg) - qid = self._get_queue_id(queue, tenant) + qid = self._get_queue_id(queue, project) msg_ctrl = self.driver.message_controller claimed = msg_ctrl.claimed(qid, cid, expires=now, limit=1) try: claimed.next() except StopIteration: - raise exceptions.ClaimDoesNotExist(claim_id, queue, tenant) + raise exceptions.ClaimDoesNotExist(claim_id, queue, project) meta = { "id": cid, @@ -561,6 +561,6 @@ class ClaimController(storage.ClaimBase): {"$set": {"e": expires, "t": ttl}}, upsert=False, multi=True) - def delete(self, queue, claim_id, tenant=None): + def delete(self, queue, claim_id, project=None): msg_ctrl = self.driver.message_controller msg_ctrl.unclaim(claim_id) diff --git a/marconi/storage/sqlite/controllers.py b/marconi/storage/sqlite/controllers.py index aa82c8099..465a114ef 100644 --- a/marconi/storage/sqlite/controllers.py +++ b/marconi/storage/sqlite/controllers.py @@ -26,23 +26,23 @@ class Queue(base.QueueBase): if not exists Queues ( id INTEGER, - tenant TEXT, + project TEXT, name TEXT, metadata DOCUMENT, PRIMARY KEY(id), - UNIQUE(tenant, name) + UNIQUE(project, name) ) ''') - def list(self, tenant, marker=None, + def list(self, project, marker=None, limit=10, detailed=False): sql = ((''' select name from Queues''' if not detailed else ''' select name, metadata from Queues''') + ''' - where tenant = ?''') - args = [tenant] + where project = ?''') + args = [project] if marker: sql += ''' @@ -67,37 +67,37 @@ class Queue(base.QueueBase): yield it() yield marker_name['next'] - def get(self, name, tenant): + def get(self, name, project): try: return self.driver.get(''' select metadata from Queues - where tenant = ? and name = ?''', tenant, name)[0] + where project = ? and name = ?''', project, name)[0] except _NoResult: - raise exceptions.QueueDoesNotExist(name, tenant) + raise exceptions.QueueDoesNotExist(name, project) - def upsert(self, name, metadata, tenant): + def upsert(self, name, metadata, project): with self.driver('immediate'): previous_record = self.driver.run(''' select id from Queues - where tenant = ? and name = ? - ''', tenant, name).fetchone() + where project = ? and name = ? + ''', project, name).fetchone() self.driver.run(''' replace into Queues values (null, ?, ?, ?) - ''', tenant, name, self.driver.pack(metadata)) + ''', project, name, self.driver.pack(metadata)) return previous_record is None - def delete(self, name, tenant): + def delete(self, name, project): self.driver.run(''' delete from Queues - where tenant = ? and name = ?''', tenant, name) + where project = ? and name = ?''', project, name) - def stats(self, name, tenant): + def stats(self, name, project): with self.driver('deferred'): - qid = _get_qid(self.driver, name, tenant) + qid = _get_qid(self.driver, name, project) claimed, free = self.driver.get(''' select * from (select count(msgid) @@ -121,7 +121,7 @@ class Queue(base.QueueBase): 'actions': 0, } - def actions(self, name, tenant, marker=None, limit=10): + def actions(self, name, project, marker=None, limit=10): raise NotImplementedError @@ -143,15 +143,15 @@ class Message(base.MessageBase): ) ''') - def get(self, queue, message_id, tenant): + def get(self, queue, message_id, project): try: content, ttl, age = self.driver.get(''' select content, ttl, julianday() * 86400.0 - created from Queues as Q join Messages as M on qid = Q.id where ttl > julianday() * 86400.0 - created - and M.id = ? and tenant = ? and name = ? - ''', _msgid_decode(message_id), tenant, queue) + and M.id = ? and project = ? and name = ? + ''', _msgid_decode(message_id), project, queue) return { 'id': message_id, @@ -161,9 +161,9 @@ class Message(base.MessageBase): } except (_NoResult, _BadID): - raise exceptions.MessageDoesNotExist(message_id, queue, tenant) + raise exceptions.MessageDoesNotExist(message_id, queue, project) - def list(self, queue, tenant, marker=None, + def list(self, queue, project, marker=None, limit=10, echo=False, client_uuid=None): with self.driver('deferred'): try: @@ -172,7 +172,7 @@ class Message(base.MessageBase): from Messages where ttl > julianday() * 86400.0 - created and qid = ?''' - args = [_get_qid(self.driver, queue, tenant)] + args = [_get_qid(self.driver, queue, project)] if not echo: sql += ''' @@ -207,9 +207,9 @@ class Message(base.MessageBase): except _BadID: return - def post(self, queue, messages, client_uuid, tenant): + def post(self, queue, messages, client_uuid, project): with self.driver('immediate'): - qid = _get_qid(self.driver, queue, tenant) + qid = _get_qid(self.driver, queue, project) # cleanup all expired messages in this queue @@ -237,7 +237,7 @@ class Message(base.MessageBase): return map(_msgid_encode, range(unused, my['newid'])) - def delete(self, queue, message_id, tenant, claim=None): + def delete(self, queue, message_id, project, claim=None): try: id = _msgid_decode(message_id) @@ -246,8 +246,8 @@ class Message(base.MessageBase): delete from Messages where id = ? and qid = (select id from Queues - where tenant = ? and name = ?) - ''', id, tenant, queue) + where project = ? and name = ?) + ''', id, project, queue) return with self.driver('immediate'): @@ -256,8 +256,8 @@ class Message(base.MessageBase): from Queues as Q join Messages as M on qid = Q.id where ttl > julianday() * 86400.0 - created - and M.id = ? and tenant = ? and name = ? - ''', id, tenant, queue) + and M.id = ? and project = ? and name = ? + ''', id, project, queue) if not message_exists: return @@ -312,7 +312,7 @@ class Claim(base.ClaimBase): ) ''') - def get(self, queue, claim_id, tenant): + def get(self, queue, claim_id, project): with self.driver('deferred'): try: id, ttl, age = self.driver.get(''' @@ -320,8 +320,8 @@ class Claim(base.ClaimBase): from Queues as Q join Claims as C on Q.id = C.qid where C.ttl > julianday() * 86400.0 - C.created - and C.id = ? and tenant = ? and name = ? - ''', _cid_decode(claim_id), tenant, queue) + and C.id = ? and project = ? and name = ? + ''', _cid_decode(claim_id), project, queue) return ( { @@ -333,11 +333,11 @@ class Claim(base.ClaimBase): ) except (_NoResult, _BadID): - raise exceptions.ClaimDoesNotExist(claim_id, queue, tenant) + raise exceptions.ClaimDoesNotExist(claim_id, queue, project) - def create(self, queue, metadata, tenant, limit=10): + def create(self, queue, metadata, project, limit=10): with self.driver('immediate'): - qid = _get_qid(self.driver, queue, tenant) + qid = _get_qid(self.driver, queue, project) # cleanup all expired claims in this queue @@ -383,7 +383,7 @@ class Claim(base.ClaimBase): 'body': content, } - def update(self, queue, claim_id, metadata, tenant): + def update(self, queue, claim_id, metadata, project): try: id = _cid_decode(claim_id) @@ -397,16 +397,18 @@ class Claim(base.ClaimBase): where ttl > julianday() * 86400.0 - created and id = ? and qid = (select id from Queues - where tenant = ? and name = ?) - ''', metadata['ttl'], id, tenant, queue) + where project = ? and name = ?) + ''', metadata['ttl'], id, project, queue) if not self.driver.affected: - raise exceptions.ClaimDoesNotExist(claim_id, queue, tenant) + raise exceptions.ClaimDoesNotExist(claim_id, + queue, + project) self.__update_claimed(id, metadata['ttl']) except _BadID: - raise exceptions.ClaimDoesNotExist(claim_id, queue, tenant) + raise exceptions.ClaimDoesNotExist(claim_id, queue, project) def __update_claimed(self, cid, ttl): # Precondition: cid is not expired @@ -419,14 +421,14 @@ class Claim(base.ClaimBase): where cid = ?) ''', ttl, ttl, cid) - def delete(self, queue, claim_id, tenant): + def delete(self, queue, claim_id, project): try: self.driver.run(''' delete from Claims where id = ? and qid = (select id from Queues - where tenant = ? and name = ?) - ''', _cid_decode(claim_id), tenant, queue) + where project = ? and name = ?) + ''', _cid_decode(claim_id), project, queue) except _BadID: pass @@ -440,14 +442,14 @@ class _BadID(Exception): pass -def _get_qid(driver, queue, tenant): +def _get_qid(driver, queue, project): try: return driver.get(''' select id from Queues - where tenant = ? and name = ?''', tenant, queue)[0] + where project = ? and name = ?''', project, queue)[0] except _NoResult: - raise exceptions.QueueDoesNotExist(queue, tenant) + raise exceptions.QueueDoesNotExist(queue, project) # The utilities below make the database IDs opaque to the users diff --git a/marconi/tests/storage/base.py b/marconi/tests/storage/base.py index 4962d08e5..4dd52909d 100644 --- a/marconi/tests/storage/base.py +++ b/marconi/tests/storage/base.py @@ -20,7 +20,7 @@ from marconi.tests import util as testing class ControllerBaseTest(testing.TestBase): - tenant = "tenant" + project = "project" driver_class = None controller_class = None controller_base_class = None @@ -53,9 +53,9 @@ class QueueControllerTest(ControllerBaseTest): def test_list(self): num = 15 for queue in xrange(num): - self.controller.upsert(queue, {}, tenant=self.tenant) + self.controller.upsert(queue, {}, project=self.project) - interaction = self.controller.list(tenant=self.tenant, + interaction = self.controller.list(project=self.project, detailed=True) queues = list(interaction.next()) @@ -64,7 +64,7 @@ class QueueControllerTest(ControllerBaseTest): 'metadata' in queue, queues)), True) self.assertEquals(len(queues), 10) - interaction = self.controller.list(tenant=self.tenant, + interaction = self.controller.list(project=self.project, marker=interaction.next()) queues = list(interaction.next()) @@ -75,37 +75,37 @@ class QueueControllerTest(ControllerBaseTest): def test_queue_lifecycle(self): # Test Queue Creation - created = self.controller.upsert("test", tenant=self.tenant, + created = self.controller.upsert("test", project=self.project, metadata=dict(topic="test_queue")) self.assertTrue(created) # Test Queue retrieval - queue = self.controller.get("test", tenant=self.tenant) + queue = self.controller.get("test", project=self.project) self.assertIsNotNone(queue) # Test Queue Update - created = self.controller.upsert("test", tenant=self.tenant, + created = self.controller.upsert("test", project=self.project, metadata=dict(meta="test_meta")) self.assertFalse(created) - queue = self.controller.get("test", tenant=self.tenant) + queue = self.controller.get("test", project=self.project) self.assertEqual(queue["meta"], "test_meta") # Test Queue Statistic _insert_fixtures(self.message_controller, "test", - tenant=self.tenant, client_uuid="my_uuid", num=12) + project=self.project, client_uuid="my_uuid", num=12) - countof = self.controller.stats("test", tenant=self.tenant) + countof = self.controller.stats("test", project=self.project) self.assertEqual(countof['messages']['free'], 12) # Test Queue Deletion - self.controller.delete("test", tenant=self.tenant) + self.controller.delete("test", project=self.project) # Test DoesNotExist Exception self.assertRaises(storage.exceptions.DoesNotExist, self.controller.get, "test", - tenant=self.tenant) + project=self.project) class MessageControllerTest(ControllerBaseTest): @@ -126,10 +126,10 @@ class MessageControllerTest(ControllerBaseTest): self.queue_controller = self.driver.queue_controller self.claim_controller = self.driver.claim_controller self.queue_controller.upsert(self.queue_name, {}, - tenant=self.tenant) + project=self.project) def tearDown(self): - self.queue_controller.delete(self.queue_name, tenant=self.tenant) + self.queue_controller.delete(self.queue_name, project=self.project) super(MessageControllerTest, self).tearDown() def test_message_lifecycle(self): @@ -147,25 +147,25 @@ class MessageControllerTest(ControllerBaseTest): # Test Message Creation created = list(self.controller.post(queue_name, messages, - tenant=self.tenant, + project=self.project, client_uuid="unused")) self.assertEqual(len(created), 1) # Test Message Get - self.controller.get(queue_name, created[0], tenant=self.tenant) + self.controller.get(queue_name, created[0], project=self.project) # Test Message Deletion - self.controller.delete(queue_name, created[0], tenant=self.tenant) + self.controller.delete(queue_name, created[0], project=self.project) # Test DoesNotExist self.assertRaises(storage.exceptions.DoesNotExist, self.controller.get, queue_name, message_id=created[0], - tenant=self.tenant) + project=self.project) def test_get_multi(self): _insert_fixtures(self.controller, self.queue_name, - tenant=self.tenant, client_uuid="my_uuid", num=15) + project=self.project, client_uuid="my_uuid", num=15) def load_messages(expected, *args, **kwargs): interaction = self.controller.list(*args, **kwargs) @@ -174,75 +174,76 @@ class MessageControllerTest(ControllerBaseTest): return interaction # Test all messages, echo False and uuid - load_messages(0, self.queue_name, tenant=self.tenant, + load_messages(0, self.queue_name, project=self.project, client_uuid="my_uuid") # Test all messages and limit - load_messages(15, self.queue_name, tenant=self.tenant, limit=20, + load_messages(15, self.queue_name, project=self.project, limit=20, echo=True) # Test all messages, echo True, and uuid interaction = load_messages(10, self.queue_name, echo=True, - tenant=self.tenant, client_uuid="my_uuid") + project=self.project, + client_uuid="my_uuid") # Test all messages, echo True, uuid and marker - load_messages(5, self.queue_name, echo=True, tenant=self.tenant, + load_messages(5, self.queue_name, echo=True, project=self.project, marker=interaction.next(), client_uuid="my_uuid") def test_claim_effects(self): _insert_fixtures(self.controller, self.queue_name, - tenant=self.tenant, client_uuid="my_uuid", num=12) + project=self.project, client_uuid="my_uuid", num=12) meta = {"ttl": 70} another_cid, _ = self.claim_controller.create(self.queue_name, meta, - tenant=self.tenant) + project=self.project) cid, msgs = self.claim_controller.create(self.queue_name, meta, - tenant=self.tenant) + project=self.project) [msg1, msg2] = msgs # A wrong claim does not ensure the message deletion with testing.expected(storage.exceptions.NotPermitted): self.controller.delete(self.queue_name, msg1["id"], - tenant=self.tenant, + project=self.project, claim=another_cid) # Make sure a message can be deleted with a claim self.controller.delete(self.queue_name, msg1["id"], - tenant=self.tenant, + project=self.project, claim=cid) with testing.expected(storage.exceptions.DoesNotExist): self.controller.get(self.queue_name, msg1["id"], - tenant=self.tenant) + project=self.project) # Make sure such a deletion is idempotent self.controller.delete(self.queue_name, msg1["id"], - tenant=self.tenant, + project=self.project, claim=cid) # A non-existing claim does not ensure the message deletion self.claim_controller.delete(self.queue_name, cid, - tenant=self.tenant) + project=self.project) with testing.expected(storage.exceptions.NotPermitted): self.controller.delete(self.queue_name, msg2["id"], - tenant=self.tenant, + project=self.project, claim=cid) def test_expired_message(self): messages = [{'body': 3.14, 'ttl': 0}] [msgid] = self.controller.post(self.queue_name, messages, - tenant=self.tenant, + project=self.project, client_uuid='my_uuid') with testing.expected(storage.exceptions.DoesNotExist): self.controller.get(self.queue_name, msgid, - tenant=self.tenant) + project=self.project) countof = self.queue_controller.stats(self.queue_name, - tenant=self.tenant) + project=self.project) self.assertEquals(countof['messages']['free'], 0) def test_illformed_id(self): @@ -263,12 +264,12 @@ class MessageControllerTest(ControllerBaseTest): self.queue_controller.upsert('unused', {}, '480924') [msgid] = self.controller.post('unused', [{'body': {}, 'ttl': 10}], - tenant='480924', + project='480924', client_uuid='unused') with testing.expected(exceptions.NotPermitted): self.controller.delete('unused', msgid, - tenant='480924', + project='480924', claim='illformed') @@ -290,21 +291,21 @@ class ClaimControllerTest(ControllerBaseTest): self.queue_controller = self.driver.queue_controller self.message_controller = self.driver.message_controller self.queue_controller.upsert(self.queue_name, {}, - tenant=self.tenant) + project=self.project) def tearDown(self): - self.queue_controller.delete(self.queue_name, tenant=self.tenant) + self.queue_controller.delete(self.queue_name, project=self.project) super(ClaimControllerTest, self).tearDown() def test_claim_lifecycle(self): _insert_fixtures(self.message_controller, self.queue_name, - tenant=self.tenant, client_uuid="my_uuid", num=20) + project=self.project, client_uuid="my_uuid", num=20) meta = {"ttl": 70} # Make sure create works claim_id, messages = self.controller.create(self.queue_name, meta, - tenant=self.tenant, + project=self.project, limit=15) messages = list(messages) @@ -312,13 +313,13 @@ class ClaimControllerTest(ControllerBaseTest): # Ensure Queue stats countof = self.queue_controller.stats(self.queue_name, - tenant=self.tenant) + project=self.project) self.assertEqual(countof['messages']['claimed'], 15) self.assertEqual(countof['messages']['free'], 5) # Make sure get works claim, messages2 = self.controller.get(self.queue_name, claim_id, - tenant=self.tenant) + project=self.project) messages2 = list(messages2) self.assertEquals(len(messages2), 15) @@ -328,11 +329,11 @@ class ClaimControllerTest(ControllerBaseTest): new_meta = {"ttl": 100} self.controller.update(self.queue_name, claim_id, - new_meta, tenant=self.tenant) + new_meta, project=self.project) # Make sure update works claim, messages2 = self.controller.get(self.queue_name, claim_id, - tenant=self.tenant) + project=self.project) messages2 = list(messages2) self.assertEquals(len(messages2), 15) @@ -344,25 +345,25 @@ class ClaimControllerTest(ControllerBaseTest): # Make sure delete works self.controller.delete(self.queue_name, claim_id, - tenant=self.tenant) + project=self.project) self.assertRaises(storage.exceptions.ClaimDoesNotExist, self.controller.get, self.queue_name, - claim_id, tenant=self.tenant) + claim_id, project=self.project) def test_expired_claim(self): meta = {"ttl": 0} claim_id, messages = self.controller.create(self.queue_name, meta, - tenant=self.tenant) + project=self.project) with testing.expected(storage.exceptions.DoesNotExist): self.controller.get(self.queue_name, claim_id, - tenant=self.tenant) + project=self.project) with testing.expected(storage.exceptions.DoesNotExist): self.controller.update(self.queue_name, claim_id, - meta, tenant=self.tenant) + meta, project=self.project) def test_illformed_id(self): # any ill-formed IDs should be regarded as non-existing ones. @@ -375,7 +376,7 @@ class ClaimControllerTest(ControllerBaseTest): {'ttl': 40}, '480924') -def _insert_fixtures(controller, queue_name, tenant=None, +def _insert_fixtures(controller, queue_name, project=None, client_uuid=None, num=4): def messages(): @@ -386,4 +387,4 @@ def _insert_fixtures(controller, queue_name, tenant=None, "event": "Event number %s" % n }} controller.post(queue_name, messages(), - tenant=tenant, client_uuid=client_uuid) + project=project, client_uuid=client_uuid) diff --git a/marconi/tests/storage/test_base.py b/marconi/tests/storage/test_base.py index 9429d7848..61f7af686 100644 --- a/marconi/tests/storage/test_base.py +++ b/marconi/tests/storage/test_base.py @@ -35,24 +35,24 @@ class Driver(storage.DriverBase): class QueueController(storage.QueueBase): - def list(self, tenant=None): - super(QueueController, self).list(tenant) + def list(self, project=None): + super(QueueController, self).list(project) - def get(self, name, tenant=None): - super(QueueController, self).get(name, tenant=tenant) + def get(self, name, project=None): + super(QueueController, self).get(name, project=project) - def upsert(self, name, metadata, tenant=None): - super(QueueController, self).upsert(name, tenant=tenant, + def upsert(self, name, metadata, project=None): + super(QueueController, self).upsert(name, project=project, metadata=metadata) - def delete(self, name, tenant=None): - super(QueueController, self).delete(name, tenant=tenant) + def delete(self, name, project=None): + super(QueueController, self).delete(name, project=project) - def stats(self, name, tenant=None): - super(QueueController, self).stats(name, tenant=tenant) + def stats(self, name, project=None): + super(QueueController, self).stats(name, project=project) - def actions(self, name, tenant=None, marker=None, limit=10): - super(QueueController, self).actions(name, tenant=tenant, + def actions(self, name, project=None, marker=None, limit=10): + super(QueueController, self).actions(name, project=project, marker=marker, limit=limit) diff --git a/marconi/tests/storage/test_impl_mongodb.py b/marconi/tests/storage/test_impl_mongodb.py index 007cd5590..b8dec942c 100644 --- a/marconi/tests/storage/test_impl_mongodb.py +++ b/marconi/tests/storage/test_impl_mongodb.py @@ -61,7 +61,7 @@ class MongodbQueueTests(base.QueueControllerTest): def test_indexes(self): col = self.controller._col indexes = col.index_information() - self.assertIn("t_1_n_1", indexes) + self.assertIn("p_1_n_1", indexes) def test_messages_purged(self): queue_name = "test" @@ -119,18 +119,18 @@ class MongodbClaimTests(base.ClaimControllerTest): epoch = '000000000000000000000000' self.assertRaises(storage.exceptions.ClaimDoesNotExist, self.controller.get, self.queue_name, - epoch, tenant=self.tenant) + epoch, project=self.project) claim_id, messages = self.controller.create(self.queue_name, {"ttl": 1}, - tenant=self.tenant) + project=self.project) # Lets let it expire time.sleep(1) self.assertRaises(storage.exceptions.ClaimDoesNotExist, self.controller.update, self.queue_name, - claim_id, {}, tenant=self.tenant) + claim_id, {}, project=self.project) self.assertRaises(storage.exceptions.ClaimDoesNotExist, self.controller.update, self.queue_name, - claim_id, {}, tenant=self.tenant) + claim_id, {}, project=self.project) diff --git a/marconi/tests/util/faulty_storage.py b/marconi/tests/util/faulty_storage.py index 14bd9b020..049db7bf5 100644 --- a/marconi/tests/util/faulty_storage.py +++ b/marconi/tests/util/faulty_storage.py @@ -35,22 +35,22 @@ class QueueController(storage.QueueBase): def __init__(self, driver): pass - def list(self, tenant=None): + def list(self, project=None): raise NotImplementedError() - def get(self, name, tenant=None): + def get(self, name, project=None): raise NotImplementedError() - def upsert(self, name, metadata, tenant=None): + def upsert(self, name, metadata, project=None): raise NotImplementedError() - def delete(self, name, tenant=None): + def delete(self, name, project=None): raise NotImplementedError() - def stats(self, name, tenant=None): + def stats(self, name, project=None): raise NotImplementedError() - def actions(self, name, tenant=None, marker=None, limit=10): + def actions(self, name, project=None, marker=None, limit=10): raise NotImplementedError() @@ -58,16 +58,16 @@ class MessageController(storage.MessageBase): def __init__(self, driver): pass - def get(self, queue, tenant=None, message_id=None, + def get(self, queue, project=None, message_id=None, marker=None, echo=False, client_uuid=None): raise NotImplementedError() - def list(self, queue, tenant=None, marker=None, + def list(self, queue, project=None, marker=None, limit=10, echo=False, client_uuid=None): raise NotImplementedError() - def post(self, queue, messages, tenant=None): + def post(self, queue, messages, project=None): raise NotImplementedError() - def delete(self, queue, message_id, tenant=None, claim=None): + def delete(self, queue, message_id, project=None, claim=None): raise NotImplementedError() diff --git a/marconi/transport/wsgi/claims.py b/marconi/transport/wsgi/claims.py index 3274ad4f8..30fc3ee2e 100644 --- a/marconi/transport/wsgi/claims.py +++ b/marconi/transport/wsgi/claims.py @@ -31,7 +31,7 @@ class CollectionResource(object): def __init__(self, claim_controller): self.claim_ctrl = claim_controller - def on_post(self, req, resp, tenant_id, queue_name): + def on_post(self, req, resp, project_id, queue_name): if req.content_length is None or req.content_length == 0: raise falcon.HTTPBadRequest(_('Bad request'), _('Missing claim metadata.')) @@ -46,7 +46,7 @@ class CollectionResource(object): cid, msgs = self.claim_ctrl.create( queue_name, metadata=metadata, - tenant=tenant_id, + project=project_id, **kwargs) resp_msgs = list(msgs) @@ -83,12 +83,12 @@ class ItemResource(object): def __init__(self, claim_controller): self.claim_ctrl = claim_controller - def on_get(self, req, resp, tenant_id, queue_name, claim_id): + def on_get(self, req, resp, project_id, queue_name, claim_id): try: meta, msgs = self.claim_ctrl.get( queue_name, claim_id=claim_id, - tenant=tenant_id) + project=project_id) meta['messages'] = list(msgs) for msg in meta['messages']: @@ -112,7 +112,7 @@ class ItemResource(object): msg = _('Please try again in a few seconds.') raise falcon.HTTPServiceUnavailable(title, msg, 30) - def on_patch(self, req, resp, tenant_id, queue_name, claim_id): + def on_patch(self, req, resp, project_id, queue_name, claim_id): if req.content_length is None or req.content_length == 0: raise falcon.HTTPBadRequest(_('Bad request'), _('Missing claim metadata.')) @@ -122,7 +122,7 @@ class ItemResource(object): self.claim_ctrl.update(queue_name, claim_id=claim_id, metadata=metadata, - tenant=tenant_id) + project=project_id) resp.status = falcon.HTTP_204 @@ -139,11 +139,11 @@ class ItemResource(object): msg = _('Please try again in a few seconds.') raise falcon.HTTPServiceUnavailable(title, msg, 30) - def on_delete(self, req, resp, tenant_id, queue_name, claim_id): + def on_delete(self, req, resp, project_id, queue_name, claim_id): try: self.claim_ctrl.delete(queue_name, claim_id=claim_id, - tenant=tenant_id) + project=project_id) resp.status = falcon.HTTP_204 diff --git a/marconi/transport/wsgi/driver.py b/marconi/transport/wsgi/driver.py index 46741d8b3..2b607bda6 100644 --- a/marconi/transport/wsgi/driver.py +++ b/marconi/transport/wsgi/driver.py @@ -39,17 +39,17 @@ class Driver(transport.DriverBase): claim_item = transport.wsgi.claims.ItemResource(claim_controller) self.app = api = falcon.API() - api.add_route('/v1/{tenant_id}/queues', queue_collection) - api.add_route('/v1/{tenant_id}/queues/{queue_name}', queue_item) - api.add_route('/v1/{tenant_id}/queues/{queue_name}' + api.add_route('/v1/{project_id}/queues', queue_collection) + api.add_route('/v1/{project_id}/queues/{queue_name}', queue_item) + api.add_route('/v1/{project_id}/queues/{queue_name}' '/stats', stats_endpoint) - api.add_route('/v1/{tenant_id}/queues/{queue_name}' + api.add_route('/v1/{project_id}/queues/{queue_name}' '/messages', msg_collection) - api.add_route('/v1/{tenant_id}/queues/{queue_name}' + api.add_route('/v1/{project_id}/queues/{queue_name}' '/messages/{message_id}', msg_item) - api.add_route('/v1/{tenant_id}/queues/{queue_name}' + api.add_route('/v1/{project_id}/queues/{queue_name}' '/claims', claim_collection) - api.add_route('/v1/{tenant_id}/queues/{queue_name}' + api.add_route('/v1/{project_id}/queues/{queue_name}' '/claims/{claim_id}', claim_item) def listen(self): diff --git a/marconi/transport/wsgi/messages.py b/marconi/transport/wsgi/messages.py index d2ecc10f6..d7f356c96 100644 --- a/marconi/transport/wsgi/messages.py +++ b/marconi/transport/wsgi/messages.py @@ -31,7 +31,7 @@ class CollectionResource(object): def __init__(self, message_controller): self.msg_ctrl = message_controller - def on_post(self, req, resp, tenant_id, queue_name): + def on_post(self, req, resp, project_id, queue_name): uuid = req.get_header('Client-ID', required=True) if req.content_length is None or req.content_length == 0: @@ -54,7 +54,7 @@ class CollectionResource(object): ls = filtered(helpers.read_json(req.stream)) ns = self.msg_ctrl.post(queue_name, messages=ls, - tenant=tenant_id, + project=project_id, client_uuid=uuid) resp.location = req.path + '/' + ','.join( @@ -74,7 +74,7 @@ class CollectionResource(object): msg = _('Please try again in a few seconds.') raise falcon.HTTPServiceUnavailable(title, msg, 30) - def on_get(self, req, resp, tenant_id, queue_name): + def on_get(self, req, resp, project_id, queue_name): uuid = req.get_header('Client-ID', required=True) #TODO(zyuan): where do we define the limits? @@ -86,7 +86,7 @@ class CollectionResource(object): try: interaction = self.msg_ctrl.list(queue_name, - tenant=tenant_id, + project=project_id, client_uuid=uuid, **kwargs) resp_dict = { @@ -129,11 +129,11 @@ class ItemResource(object): def __init__(self, message_controller): self.msg_ctrl = message_controller - def on_get(self, req, resp, tenant_id, queue_name, message_id): + def on_get(self, req, resp, project_id, queue_name, message_id): try: msg = self.msg_ctrl.get(queue_name, message_id=message_id, - tenant=tenant_id) + project=project_id) msg['href'] = req.path del msg['id'] @@ -151,11 +151,11 @@ class ItemResource(object): msg = _('Please try again in a few seconds.') raise falcon.HTTPServiceUnavailable(title, msg, 30) - def on_delete(self, req, resp, tenant_id, queue_name, message_id): + def on_delete(self, req, resp, project_id, queue_name, message_id): try: self.msg_ctrl.delete(queue_name, message_id=message_id, - tenant=tenant_id, + project=project_id, claim=req.get_param('claim_id')) resp.status = falcon.HTTP_204 diff --git a/marconi/transport/wsgi/queues.py b/marconi/transport/wsgi/queues.py index 8032653f1..9ba569144 100644 --- a/marconi/transport/wsgi/queues.py +++ b/marconi/transport/wsgi/queues.py @@ -32,7 +32,7 @@ class ItemResource(object): def __init__(self, queue_controller): self.queue_ctrl = queue_controller - def on_put(self, req, resp, tenant_id, queue_name): + def on_put(self, req, resp, project_id, queue_name): if req.content_length > transport.MAX_QUEUE_METADATA_SIZE: raise falcon.HTTPBadRequest(_('Bad request'), _('Queue metadata size is too large.')) @@ -45,7 +45,7 @@ class ItemResource(object): metadata = _filtered(helpers.read_json(req.stream)) created = self.queue_ctrl.upsert(queue_name, metadata=metadata, - tenant=tenant_id) + project=project_id) except helpers.MalformedJSON: raise falcon.HTTPBadRequest(_('Bad request'), _('Malformed queue metadata.')) @@ -59,10 +59,10 @@ class ItemResource(object): resp.status = falcon.HTTP_201 if created else falcon.HTTP_204 resp.location = req.path - def on_get(self, req, resp, tenant_id, queue_name): + def on_get(self, req, resp, project_id, queue_name): try: doc = self.queue_ctrl.get(queue_name, - tenant=tenant_id) + project=project_id) resp.content_location = req.relative_uri resp.body = helpers.to_json(doc) @@ -76,10 +76,10 @@ class ItemResource(object): msg = _('Please try again in a few seconds.') raise falcon.HTTPServiceUnavailable(title, msg, 30) - def on_delete(self, req, resp, tenant_id, queue_name): + def on_delete(self, req, resp, project_id, queue_name): try: self.queue_ctrl.delete(queue_name, - tenant=tenant_id) + project=project_id) except Exception as ex: LOG.exception(ex) @@ -97,7 +97,7 @@ class CollectionResource(object): def __init__(self, queue_controller): self.queue_ctrl = queue_controller - def on_get(self, req, resp, tenant_id): + def on_get(self, req, resp, project_id): #TODO(zyuan): where do we define the limits? kwargs = helpers.purge({ 'marker': req.get_param('marker'), @@ -106,7 +106,7 @@ class CollectionResource(object): }) try: - interaction = self.queue_ctrl.list(tenant=tenant_id, **kwargs) + interaction = self.queue_ctrl.list(project=project_id, **kwargs) resp_dict = { 'queues': list(interaction.next()) diff --git a/marconi/transport/wsgi/stats.py b/marconi/transport/wsgi/stats.py index 19b65e816..e480b2835 100644 --- a/marconi/transport/wsgi/stats.py +++ b/marconi/transport/wsgi/stats.py @@ -30,10 +30,10 @@ class Resource(object): def __init__(self, queue_controller): self.queue_ctrl = queue_controller - def on_get(self, req, resp, tenant_id, queue_name): + def on_get(self, req, resp, project_id, queue_name): try: resp_dict = self.queue_ctrl.stats(queue_name, - tenant=tenant_id) + project=project_id) resp.content_location = req.path resp.body = helpers.to_json(resp_dict)