Rename tenant into project

Keystone will replace tenant with project during Havana. Some of this
work already started and is moving forward. We can go ahead and replace
it now before the tree gets bigger.

Change-Id: I1d0494112f6a65cc4ee5390eee782e24790ca5b7
This commit is contained in:
Flaper Fesp 2013-05-07 01:08:33 +02:00
parent ee59a0f396
commit ddb24ff5b1
13 changed files with 249 additions and 246 deletions

View File

@ -69,12 +69,12 @@ class QueueBase(ControllerBase):
__metaclass__ = abc.ABCMeta __metaclass__ = abc.ABCMeta
@abc.abstractmethod @abc.abstractmethod
def list(self, tenant=None, marker=None, def list(self, project=None, marker=None,
limit=10, detailed=False): limit=10, detailed=False):
""" """
Base method for listing queues. Base method for listing queues.
:param tenant: Tenant id :param project: Project id
:param marker: The last queue name :param marker: The last queue name
:param limit: (Default 10) Max number :param limit: (Default 10) Max number
:param detailed: Whether metadata is included :param detailed: Whether metadata is included
@ -85,12 +85,12 @@ class QueueBase(ControllerBase):
raise NotImplementedError raise NotImplementedError
@abc.abstractmethod @abc.abstractmethod
def get(self, name, tenant=None): def get(self, name, project=None):
""" """
Base method for queue retrieval. Base method for queue retrieval.
:param name: The queue name :param name: The queue name
:param tenant: Tenant id :param project: Project id
:returns: Dictionary containing queue metadata :returns: Dictionary containing queue metadata
:raises: DoesNotExist :raises: DoesNotExist
@ -98,14 +98,14 @@ class QueueBase(ControllerBase):
raise NotImplementedError raise NotImplementedError
@abc.abstractmethod @abc.abstractmethod
def upsert(self, name, metadata, tenant=None): def upsert(self, name, metadata, project=None):
""" """
This methods handles both creates and updates This methods handles both creates and updates
operations for queues. operations for queues.
:param name: The queue name :param name: The queue name
:param metadata: Arbitrary metadata :param metadata: Arbitrary metadata
:param tenant: Tenant id :param project: Project id
:returns: True if a queue was created and False :returns: True if a queue was created and False
if it was updated. if it was updated.
""" """
@ -113,34 +113,34 @@ class QueueBase(ControllerBase):
assert isinstance(metadata, dict), msg assert isinstance(metadata, dict), msg
@abc.abstractmethod @abc.abstractmethod
def delete(self, name, tenant=None): def delete(self, name, project=None):
""" """
Base method for queue deletion. Base method for queue deletion.
:param name: The queue name :param name: The queue name
:param tenant: Tenant id :param project: Project id
""" """
raise NotImplementedError raise NotImplementedError
@abc.abstractmethod @abc.abstractmethod
def stats(self, name, tenant=None): def stats(self, name, project=None):
""" """
Base method for queue stats. Base method for queue stats.
:param name: The queue name :param name: The queue name
:param tenant: Tenant id :param project: Project id
:returns: Dictionary with the :returns: Dictionary with the
queue stats queue stats
""" """
raise NotImplementedError raise NotImplementedError
@abc.abstractmethod @abc.abstractmethod
def actions(self, name, tenant=None, marker=None, limit=10): def actions(self, name, project=None, marker=None, limit=10):
""" """
Base method for queue actions. Base method for queue actions.
:param name: Queue name :param name: Queue name
:param tenant: Tenant id :param project: Project id
:param marker: Tail identifier :param marker: Tail identifier
:param limit: (Default 10) Max number :param limit: (Default 10) Max number
of messages to retrieve. of messages to retrieve.
@ -157,14 +157,14 @@ class MessageBase(ControllerBase):
__metaclass__ = abc.ABCMeta __metaclass__ = abc.ABCMeta
@abc.abstractmethod @abc.abstractmethod
def list(self, queue, tenant=None, marker=None, def list(self, queue, project=None, marker=None,
limit=10, echo=False, client_uuid=None): limit=10, echo=False, client_uuid=None):
""" """
Base message list method Base message list method
:param queue: Name of the queue to get the :param queue: Name of the queue to get the
message from. message from.
:param tenant: Tenant id :param project: Project id
:param marker: Tail identifier :param marker: Tail identifier
:param limit: (Default 10) specifies up to 100 :param limit: (Default 10) specifies up to 100
messages to return. messages to return.
@ -178,13 +178,13 @@ class MessageBase(ControllerBase):
""" """
raise NotImplementedError raise NotImplementedError
def get(self, queue, message_id, tenant=None): def get(self, queue, message_id, project=None):
""" """
Base message get method Base message get method
:param queue: Name of the queue to get the :param queue: Name of the queue to get the
message from. message from.
:param tenant: Tenant id :param project: Project id
:param message_id: Message ID :param message_id: Message ID
:returns: Dictionary containing message data :returns: Dictionary containing message data
@ -193,7 +193,7 @@ class MessageBase(ControllerBase):
raise NotImplementedError raise NotImplementedError
@abc.abstractmethod @abc.abstractmethod
def post(self, queue, messages, client_uuid, tenant=None): def post(self, queue, messages, client_uuid, project=None):
""" """
Base message post method Base message post method
@ -205,21 +205,21 @@ class MessageBase(ControllerBase):
:param messages: Messages to post to queue, :param messages: Messages to post to queue,
it can be a list of 1 or more elements. it can be a list of 1 or more elements.
:param client_uuid: Client's unique identifier. :param client_uuid: Client's unique identifier.
:param tenant: Tenant id :param project: Project id
:returns: List of message ids :returns: List of message ids
""" """
raise NotImplementedError raise NotImplementedError
@abc.abstractmethod @abc.abstractmethod
def delete(self, queue, message_id, tenant=None, claim=None): def delete(self, queue, message_id, project=None, claim=None):
""" """
Base message delete method Base message delete method
:param queue: Name of the queue to post :param queue: Name of the queue to post
message to. message to.
:param message_id: Message to be deleted :param message_id: Message to be deleted
:param tenant: Tenant id :param project: Project id
:param claim: Claim this message :param claim: Claim this message
belongs to. When specified, claim must belongs to. When specified, claim must
be valid and message_id must belong to be valid and message_id must belong to
@ -233,14 +233,14 @@ class ClaimBase(ControllerBase):
__metaclass__ = abc.ABCMeta __metaclass__ = abc.ABCMeta
@abc.abstractmethod @abc.abstractmethod
def get(self, queue, claim_id, tenant=None): def get(self, queue, claim_id, project=None):
""" """
Base claim get method Base claim get method
:param queue: Name of the queue this :param queue: Name of the queue this
claim belongs to. claim belongs to.
:param claim_id: The claim id :param claim_id: The claim id
:param tenant: Tenant id :param project: Project id
:returns: (Claim's metadata, claimed messages) :returns: (Claim's metadata, claimed messages)
:raises: DoesNotExist :raises: DoesNotExist
@ -248,7 +248,7 @@ class ClaimBase(ControllerBase):
raise NotImplementedError raise NotImplementedError
@abc.abstractmethod @abc.abstractmethod
def create(self, queue, metadata, tenant=None, limit=10): def create(self, queue, metadata, project=None, limit=10):
""" """
Base claim create method Base claim create method
@ -256,7 +256,7 @@ class ClaimBase(ControllerBase):
claim belongs to. claim belongs to.
:param metadata: Claim's parameters :param metadata: Claim's parameters
to be stored. to be stored.
:param tenant: Tenant id :param project: Project id
:param limit: (Default 10) Max number :param limit: (Default 10) Max number
of messages to claim. of messages to claim.
@ -265,7 +265,7 @@ class ClaimBase(ControllerBase):
raise NotImplementedError raise NotImplementedError
@abc.abstractmethod @abc.abstractmethod
def update(self, queue, claim_id, metadata, tenant=None): def update(self, queue, claim_id, metadata, project=None):
""" """
Base claim update method Base claim update method
@ -274,18 +274,18 @@ class ClaimBase(ControllerBase):
:param claim_id: Claim to be updated :param claim_id: Claim to be updated
:param metadata: Claim's parameters :param metadata: Claim's parameters
to be updated. to be updated.
:param tenant: Tenant id :param project: Project id
""" """
raise NotImplementedError raise NotImplementedError
@abc.abstractmethod @abc.abstractmethod
def delete(self, queue, claim_id, tenant=None): def delete(self, queue, claim_id, project=None):
""" """
Base claim delete method Base claim delete method
:param queue: Name of the queue this :param queue: Name of the queue this
claim belongs to. claim belongs to.
:param claim_id: Claim to be deleted :param claim_id: Claim to be deleted
:param tenant: Tenant id :param project: Project id
""" """
raise NotImplementedError raise NotImplementedError

View File

@ -24,27 +24,27 @@ class NotPermitted(Exception):
class QueueDoesNotExist(DoesNotExist): class QueueDoesNotExist(DoesNotExist):
def __init__(self, name, tenant): def __init__(self, name, project):
msg = (_("Queue %(name)s does not exist for tenant %(tenant)s") % msg = (_("Queue %(name)s does not exist for project %(project)s") %
dict(name=name, tenant=tenant)) dict(name=name, project=project))
super(QueueDoesNotExist, self).__init__(msg) super(QueueDoesNotExist, self).__init__(msg)
class MessageDoesNotExist(DoesNotExist): class MessageDoesNotExist(DoesNotExist):
def __init__(self, mid, queue, tenant): def __init__(self, mid, queue, project):
msg = (_("Message %(mid)s does not exist in " msg = (_("Message %(mid)s does not exist in "
"queue %(queue)s of tenant %(tenant)s") % "queue %(queue)s of project %(project)s") %
dict(mid=mid, queue=queue, tenant=tenant)) dict(mid=mid, queue=queue, project=project))
super(MessageDoesNotExist, self).__init__(msg) super(MessageDoesNotExist, self).__init__(msg)
class ClaimDoesNotExist(DoesNotExist): class ClaimDoesNotExist(DoesNotExist):
def __init__(self, cid, queue, tenant): def __init__(self, cid, queue, project):
msg = (_("Claim %(cid)s does not exist in " msg = (_("Claim %(cid)s does not exist in "
"queue %(queue)s of tenant %(tenant)s") % "queue %(queue)s of project %(project)s") %
dict(cid=cid, queue=queue, tenant=tenant)) dict(cid=cid, queue=queue, project=project))
super(ClaimDoesNotExist, self).__init__(msg) super(ClaimDoesNotExist, self).__init__(msg)

View File

@ -38,7 +38,7 @@ class QueueController(storage.QueueBase):
Queues: Queues:
Name Field Name Field
---------------- ----------------
tenant -> t project -> p
metadata -> m metadata -> m
name -> n name -> n
@ -49,15 +49,15 @@ class QueueController(storage.QueueBase):
self._col = self.driver.db["queues"] self._col = self.driver.db["queues"]
# NOTE(flaper87): This creates a unique compound index for # NOTE(flaper87): This creates a unique compound index for
# tenant and name. Using tenant as the first field of the # project and name. Using project as the first field of the
# index allows for querying by tenant and tenant+name. # index allows for querying by project and project+name.
# This is also useful for retrieving the queues list for # This is also useful for retrieving the queues list for
# as specific tenant, for example. Order Matters! # as specific project, for example. Order Matters!
self._col.ensure_index([("t", 1), ("n", 1)], unique=True) self._col.ensure_index([("p", 1), ("n", 1)], unique=True)
def list(self, tenant=None, marker=None, def list(self, project=None, marker=None,
limit=10, detailed=False): limit=10, detailed=False):
query = {"t": tenant} query = {"p": project}
if marker: if marker:
query["n"] = {"$gt": marker} query["n"] = {"$gt": marker}
@ -80,29 +80,29 @@ class QueueController(storage.QueueBase):
yield normalizer(cursor) yield normalizer(cursor)
yield marker_name["next"] yield marker_name["next"]
def _get(self, name, tenant=None, fields={"m": 1, "_id": 0}): def _get(self, name, project=None, fields={"m": 1, "_id": 0}):
queue = self._col.find_one({"t": tenant, "n": name}, fields=fields) queue = self._col.find_one({"p": project, "n": name}, fields=fields)
if queue is None: if queue is None:
raise exceptions.QueueDoesNotExist(name, tenant) raise exceptions.QueueDoesNotExist(name, project)
return queue return queue
def get_id(self, name, tenant=None): def get_id(self, name, project=None):
""" """
Just like `get` method but returns the queue's id Just like `get` method but returns the queue's id
:returns: Queue's `ObjectId` :returns: Queue's `ObjectId`
""" """
queue = self._get(name, tenant, fields=["_id"]) queue = self._get(name, project, fields=["_id"])
return queue.get("_id") return queue.get("_id")
def get(self, name, tenant=None): def get(self, name, project=None):
queue = self._get(name, tenant) queue = self._get(name, project)
return queue.get("m", {}) return queue.get("m", {})
def upsert(self, name, metadata, tenant=None): def upsert(self, name, metadata, project=None):
super(QueueController, self).upsert(name, metadata, tenant) super(QueueController, self).upsert(name, metadata, project)
rst = self._col.update({"t": tenant, "n": name}, rst = self._col.update({"p": project, "n": name},
{"$set": {"m": metadata}}, {"$set": {"m": metadata}},
multi=False, multi=False,
upsert=True, upsert=True,
@ -110,12 +110,12 @@ class QueueController(storage.QueueBase):
return not rst["updatedExisting"] return not rst["updatedExisting"]
def delete(self, name, tenant=None): def delete(self, name, project=None):
self.driver.message_controller.purge_queue(name, tenant) self.driver.message_controller.purge_queue(name, project)
self._col.remove({"t": tenant, "n": name}) self._col.remove({"p": project, "n": name})
def stats(self, name, tenant=None): def stats(self, name, project=None):
qid = self.get_id(name, tenant) qid = self.get_id(name, project)
msg_ctrl = self.driver.message_controller msg_ctrl = self.driver.message_controller
active = msg_ctrl.active(qid) active = msg_ctrl.active(qid)
claimed = msg_ctrl.claimed(qid) claimed = msg_ctrl.claimed(qid)
@ -128,7 +128,7 @@ class QueueController(storage.QueueBase):
} }
} }
def actions(self, name, tenant=None, marker=None, limit=10): def actions(self, name, project=None, marker=None, limit=10):
raise NotImplementedError raise NotImplementedError
@ -176,9 +176,9 @@ class MessageController(storage.MessageBase):
("c.e", 1), ("c.e", 1),
("_id", -1)], background=True) ("_id", -1)], background=True)
def _get_queue_id(self, queue, tenant): def _get_queue_id(self, queue, project):
queue_controller = self.driver.queue_controller queue_controller = self.driver.queue_controller
return queue_controller.get_id(queue, tenant) return queue_controller.get_id(queue, project)
def all(self): def all(self):
return self._col.find() return self._col.find()
@ -247,11 +247,11 @@ class MessageController(storage.MessageBase):
{"$set": {"c": {"id": None, "e": 0}}}, {"$set": {"c": {"id": None, "e": 0}}},
upsert=False, multi=True) upsert=False, multi=True)
def list(self, queue, tenant=None, marker=None, def list(self, queue, project=None, marker=None,
limit=10, echo=False, client_uuid=None): limit=10, echo=False, client_uuid=None):
try: try:
qid = self._get_queue_id(queue, tenant) qid = self._get_queue_id(queue, project)
messages = self.active(qid, marker, echo, client_uuid) messages = self.active(qid, marker, echo, client_uuid)
except ValueError: except ValueError:
return return
@ -276,18 +276,18 @@ class MessageController(storage.MessageBase):
yield utils.HookedCursor(messages, denormalizer) yield utils.HookedCursor(messages, denormalizer)
yield str(marker_id['next']) yield str(marker_id['next'])
def get(self, queue, message_id, tenant=None): def get(self, queue, message_id, project=None):
# Base query, always check expire time # Base query, always check expire time
try: try:
mid = utils.to_oid(message_id) mid = utils.to_oid(message_id)
except ValueError: except ValueError:
raise exceptions.MessageDoesNotExist(message_id, queue, tenant) raise exceptions.MessageDoesNotExist(message_id, queue, project)
now = timeutils.utcnow() now = timeutils.utcnow()
query = { query = {
"q": self._get_queue_id(queue, tenant), "q": self._get_queue_id(queue, project),
"e": {"$gt": now}, "e": {"$gt": now},
"_id": mid "_id": mid
} }
@ -295,7 +295,7 @@ class MessageController(storage.MessageBase):
message = self._col.find_one(query) message = self._col.find_one(query)
if message is None: if message is None:
raise exceptions.MessageDoesNotExist(message_id, queue, tenant) raise exceptions.MessageDoesNotExist(message_id, queue, project)
oid = message["_id"] oid = message["_id"]
age = now - utils.oid_utc(oid) age = now - utils.oid_utc(oid)
@ -307,8 +307,8 @@ class MessageController(storage.MessageBase):
"body": message["b"], "body": message["b"],
} }
def post(self, queue, messages, client_uuid, tenant=None): def post(self, queue, messages, client_uuid, project=None):
qid = self._get_queue_id(queue, tenant) qid = self._get_queue_id(queue, project)
now = timeutils.utcnow() now = timeutils.utcnow()
@ -329,7 +329,7 @@ class MessageController(storage.MessageBase):
ids = self._col.insert(denormalizer(messages)) ids = self._col.insert(denormalizer(messages))
return map(str, ids) return map(str, ids)
def delete(self, queue, message_id, tenant=None, claim=None): def delete(self, queue, message_id, project=None, claim=None):
try: try:
try: try:
mid = utils.to_oid(message_id) mid = utils.to_oid(message_id)
@ -337,7 +337,7 @@ class MessageController(storage.MessageBase):
return return
query = { query = {
"q": self._get_queue_id(queue, tenant), "q": self._get_queue_id(queue, project),
"_id": mid "_id": mid
} }
@ -365,9 +365,9 @@ class MessageController(storage.MessageBase):
except exceptions.QueueDoesNotExist: except exceptions.QueueDoesNotExist:
pass pass
def purge_queue(self, queue, tenant=None): def purge_queue(self, queue, project=None):
try: try:
qid = self._get_queue_id(queue, tenant) qid = self._get_queue_id(queue, project)
self._col.remove({"q": qid}, w=0) self._col.remove({"q": qid}, w=0)
except exceptions.QueueDoesNotExist: except exceptions.QueueDoesNotExist:
pass pass
@ -393,15 +393,15 @@ class ClaimController(storage.ClaimBase):
the claim id and it's expiration timestamp. the claim id and it's expiration timestamp.
""" """
def _get_queue_id(self, queue, tenant): def _get_queue_id(self, queue, project):
queue_controller = self.driver.queue_controller queue_controller = self.driver.queue_controller
return queue_controller.get_id(queue, tenant) return queue_controller.get_id(queue, project)
def get(self, queue, claim_id, tenant=None): def get(self, queue, claim_id, project=None):
msg_ctrl = self.driver.message_controller msg_ctrl = self.driver.message_controller
# Check whether the queue exists or not # Check whether the queue exists or not
qid = self._get_queue_id(queue, tenant) qid = self._get_queue_id(queue, project)
# Base query, always check expire time # Base query, always check expire time
now = timeutils.utcnow() now = timeutils.utcnow()
@ -435,11 +435,11 @@ class ClaimController(storage.ClaimBase):
"id": str(claim["id"]), "id": str(claim["id"]),
} }
except StopIteration: except StopIteration:
raise exceptions.ClaimDoesNotExist(cid, queue, tenant) raise exceptions.ClaimDoesNotExist(cid, queue, project)
return (claim, messages) return (claim, messages)
def create(self, queue, metadata, tenant=None, limit=10): def create(self, queue, metadata, project=None, limit=10):
""" """
This implementation was done in a best-effort fashion. This implementation was done in a best-effort fashion.
In order to create a claim we need to get a list In order to create a claim we need to get a list
@ -460,7 +460,7 @@ class ClaimController(storage.ClaimBase):
# We don't need the qid here but # We don't need the qid here but
# we need to verify it exists. # we need to verify it exists.
qid = self._get_queue_id(queue, tenant) qid = self._get_queue_id(queue, project)
ttl = int(metadata.get("ttl", 60)) ttl = int(metadata.get("ttl", 60))
oid = objectid.ObjectId() oid = objectid.ObjectId()
@ -513,14 +513,14 @@ class ClaimController(storage.ClaimBase):
upsert=False, multi=True) upsert=False, multi=True)
if updated != 0: if updated != 0:
claim, messages = self.get(queue, oid, tenant=tenant) claim, messages = self.get(queue, oid, project=project)
return (str(oid), messages) return (str(oid), messages)
def update(self, queue, claim_id, metadata, tenant=None): def update(self, queue, claim_id, metadata, project=None):
try: try:
cid = utils.to_oid(claim_id) cid = utils.to_oid(claim_id)
except ValueError: except ValueError:
raise exceptions.ClaimDoesNotExist(claim_id, queue, tenant) raise exceptions.ClaimDoesNotExist(claim_id, queue, project)
now = timeutils.utcnow() now = timeutils.utcnow()
ttl = int(metadata.get("ttl", 60)) ttl = int(metadata.get("ttl", 60))
@ -532,14 +532,14 @@ class ClaimController(storage.ClaimBase):
msg = _("New ttl will make the claim expires") msg = _("New ttl will make the claim expires")
raise ValueError(msg) raise ValueError(msg)
qid = self._get_queue_id(queue, tenant) qid = self._get_queue_id(queue, project)
msg_ctrl = self.driver.message_controller msg_ctrl = self.driver.message_controller
claimed = msg_ctrl.claimed(qid, cid, expires=now, limit=1) claimed = msg_ctrl.claimed(qid, cid, expires=now, limit=1)
try: try:
claimed.next() claimed.next()
except StopIteration: except StopIteration:
raise exceptions.ClaimDoesNotExist(claim_id, queue, tenant) raise exceptions.ClaimDoesNotExist(claim_id, queue, project)
meta = { meta = {
"id": cid, "id": cid,
@ -561,6 +561,6 @@ class ClaimController(storage.ClaimBase):
{"$set": {"e": expires, "t": ttl}}, {"$set": {"e": expires, "t": ttl}},
upsert=False, multi=True) upsert=False, multi=True)
def delete(self, queue, claim_id, tenant=None): def delete(self, queue, claim_id, project=None):
msg_ctrl = self.driver.message_controller msg_ctrl = self.driver.message_controller
msg_ctrl.unclaim(claim_id) msg_ctrl.unclaim(claim_id)

View File

@ -26,23 +26,23 @@ class Queue(base.QueueBase):
if not exists if not exists
Queues ( Queues (
id INTEGER, id INTEGER,
tenant TEXT, project TEXT,
name TEXT, name TEXT,
metadata DOCUMENT, metadata DOCUMENT,
PRIMARY KEY(id), PRIMARY KEY(id),
UNIQUE(tenant, name) UNIQUE(project, name)
) )
''') ''')
def list(self, tenant, marker=None, def list(self, project, marker=None,
limit=10, detailed=False): limit=10, detailed=False):
sql = ((''' sql = (('''
select name from Queues''' if not detailed select name from Queues''' if not detailed
else ''' else '''
select name, metadata from Queues''') + select name, metadata from Queues''') +
''' '''
where tenant = ?''') where project = ?''')
args = [tenant] args = [project]
if marker: if marker:
sql += ''' sql += '''
@ -67,37 +67,37 @@ class Queue(base.QueueBase):
yield it() yield it()
yield marker_name['next'] yield marker_name['next']
def get(self, name, tenant): def get(self, name, project):
try: try:
return self.driver.get(''' return self.driver.get('''
select metadata from Queues select metadata from Queues
where tenant = ? and name = ?''', tenant, name)[0] where project = ? and name = ?''', project, name)[0]
except _NoResult: except _NoResult:
raise exceptions.QueueDoesNotExist(name, tenant) raise exceptions.QueueDoesNotExist(name, project)
def upsert(self, name, metadata, tenant): def upsert(self, name, metadata, project):
with self.driver('immediate'): with self.driver('immediate'):
previous_record = self.driver.run(''' previous_record = self.driver.run('''
select id from Queues select id from Queues
where tenant = ? and name = ? where project = ? and name = ?
''', tenant, name).fetchone() ''', project, name).fetchone()
self.driver.run(''' self.driver.run('''
replace into Queues replace into Queues
values (null, ?, ?, ?) values (null, ?, ?, ?)
''', tenant, name, self.driver.pack(metadata)) ''', project, name, self.driver.pack(metadata))
return previous_record is None return previous_record is None
def delete(self, name, tenant): def delete(self, name, project):
self.driver.run(''' self.driver.run('''
delete from Queues delete from Queues
where tenant = ? and name = ?''', tenant, name) where project = ? and name = ?''', project, name)
def stats(self, name, tenant): def stats(self, name, project):
with self.driver('deferred'): with self.driver('deferred'):
qid = _get_qid(self.driver, name, tenant) qid = _get_qid(self.driver, name, project)
claimed, free = self.driver.get(''' claimed, free = self.driver.get('''
select * from select * from
(select count(msgid) (select count(msgid)
@ -121,7 +121,7 @@ class Queue(base.QueueBase):
'actions': 0, 'actions': 0,
} }
def actions(self, name, tenant, marker=None, limit=10): def actions(self, name, project, marker=None, limit=10):
raise NotImplementedError raise NotImplementedError
@ -143,15 +143,15 @@ class Message(base.MessageBase):
) )
''') ''')
def get(self, queue, message_id, tenant): def get(self, queue, message_id, project):
try: try:
content, ttl, age = self.driver.get(''' content, ttl, age = self.driver.get('''
select content, ttl, julianday() * 86400.0 - created select content, ttl, julianday() * 86400.0 - created
from Queues as Q join Messages as M from Queues as Q join Messages as M
on qid = Q.id on qid = Q.id
where ttl > julianday() * 86400.0 - created where ttl > julianday() * 86400.0 - created
and M.id = ? and tenant = ? and name = ? and M.id = ? and project = ? and name = ?
''', _msgid_decode(message_id), tenant, queue) ''', _msgid_decode(message_id), project, queue)
return { return {
'id': message_id, 'id': message_id,
@ -161,9 +161,9 @@ class Message(base.MessageBase):
} }
except (_NoResult, _BadID): except (_NoResult, _BadID):
raise exceptions.MessageDoesNotExist(message_id, queue, tenant) raise exceptions.MessageDoesNotExist(message_id, queue, project)
def list(self, queue, tenant, marker=None, def list(self, queue, project, marker=None,
limit=10, echo=False, client_uuid=None): limit=10, echo=False, client_uuid=None):
with self.driver('deferred'): with self.driver('deferred'):
try: try:
@ -172,7 +172,7 @@ class Message(base.MessageBase):
from Messages from Messages
where ttl > julianday() * 86400.0 - created where ttl > julianday() * 86400.0 - created
and qid = ?''' and qid = ?'''
args = [_get_qid(self.driver, queue, tenant)] args = [_get_qid(self.driver, queue, project)]
if not echo: if not echo:
sql += ''' sql += '''
@ -207,9 +207,9 @@ class Message(base.MessageBase):
except _BadID: except _BadID:
return return
def post(self, queue, messages, client_uuid, tenant): def post(self, queue, messages, client_uuid, project):
with self.driver('immediate'): with self.driver('immediate'):
qid = _get_qid(self.driver, queue, tenant) qid = _get_qid(self.driver, queue, project)
# cleanup all expired messages in this queue # cleanup all expired messages in this queue
@ -237,7 +237,7 @@ class Message(base.MessageBase):
return map(_msgid_encode, range(unused, my['newid'])) return map(_msgid_encode, range(unused, my['newid']))
def delete(self, queue, message_id, tenant, claim=None): def delete(self, queue, message_id, project, claim=None):
try: try:
id = _msgid_decode(message_id) id = _msgid_decode(message_id)
@ -246,8 +246,8 @@ class Message(base.MessageBase):
delete from Messages delete from Messages
where id = ? where id = ?
and qid = (select id from Queues and qid = (select id from Queues
where tenant = ? and name = ?) where project = ? and name = ?)
''', id, tenant, queue) ''', id, project, queue)
return return
with self.driver('immediate'): with self.driver('immediate'):
@ -256,8 +256,8 @@ class Message(base.MessageBase):
from Queues as Q join Messages as M from Queues as Q join Messages as M
on qid = Q.id on qid = Q.id
where ttl > julianday() * 86400.0 - created where ttl > julianday() * 86400.0 - created
and M.id = ? and tenant = ? and name = ? and M.id = ? and project = ? and name = ?
''', id, tenant, queue) ''', id, project, queue)
if not message_exists: if not message_exists:
return return
@ -312,7 +312,7 @@ class Claim(base.ClaimBase):
) )
''') ''')
def get(self, queue, claim_id, tenant): def get(self, queue, claim_id, project):
with self.driver('deferred'): with self.driver('deferred'):
try: try:
id, ttl, age = self.driver.get(''' id, ttl, age = self.driver.get('''
@ -320,8 +320,8 @@ class Claim(base.ClaimBase):
from Queues as Q join Claims as C from Queues as Q join Claims as C
on Q.id = C.qid on Q.id = C.qid
where C.ttl > julianday() * 86400.0 - C.created where C.ttl > julianday() * 86400.0 - C.created
and C.id = ? and tenant = ? and name = ? and C.id = ? and project = ? and name = ?
''', _cid_decode(claim_id), tenant, queue) ''', _cid_decode(claim_id), project, queue)
return ( return (
{ {
@ -333,11 +333,11 @@ class Claim(base.ClaimBase):
) )
except (_NoResult, _BadID): except (_NoResult, _BadID):
raise exceptions.ClaimDoesNotExist(claim_id, queue, tenant) raise exceptions.ClaimDoesNotExist(claim_id, queue, project)
def create(self, queue, metadata, tenant, limit=10): def create(self, queue, metadata, project, limit=10):
with self.driver('immediate'): with self.driver('immediate'):
qid = _get_qid(self.driver, queue, tenant) qid = _get_qid(self.driver, queue, project)
# cleanup all expired claims in this queue # cleanup all expired claims in this queue
@ -383,7 +383,7 @@ class Claim(base.ClaimBase):
'body': content, 'body': content,
} }
def update(self, queue, claim_id, metadata, tenant): def update(self, queue, claim_id, metadata, project):
try: try:
id = _cid_decode(claim_id) id = _cid_decode(claim_id)
@ -397,16 +397,18 @@ class Claim(base.ClaimBase):
where ttl > julianday() * 86400.0 - created where ttl > julianday() * 86400.0 - created
and id = ? and id = ?
and qid = (select id from Queues and qid = (select id from Queues
where tenant = ? and name = ?) where project = ? and name = ?)
''', metadata['ttl'], id, tenant, queue) ''', metadata['ttl'], id, project, queue)
if not self.driver.affected: if not self.driver.affected:
raise exceptions.ClaimDoesNotExist(claim_id, queue, tenant) raise exceptions.ClaimDoesNotExist(claim_id,
queue,
project)
self.__update_claimed(id, metadata['ttl']) self.__update_claimed(id, metadata['ttl'])
except _BadID: except _BadID:
raise exceptions.ClaimDoesNotExist(claim_id, queue, tenant) raise exceptions.ClaimDoesNotExist(claim_id, queue, project)
def __update_claimed(self, cid, ttl): def __update_claimed(self, cid, ttl):
# Precondition: cid is not expired # Precondition: cid is not expired
@ -419,14 +421,14 @@ class Claim(base.ClaimBase):
where cid = ?) where cid = ?)
''', ttl, ttl, cid) ''', ttl, ttl, cid)
def delete(self, queue, claim_id, tenant): def delete(self, queue, claim_id, project):
try: try:
self.driver.run(''' self.driver.run('''
delete from Claims delete from Claims
where id = ? where id = ?
and qid = (select id from Queues and qid = (select id from Queues
where tenant = ? and name = ?) where project = ? and name = ?)
''', _cid_decode(claim_id), tenant, queue) ''', _cid_decode(claim_id), project, queue)
except _BadID: except _BadID:
pass pass
@ -440,14 +442,14 @@ class _BadID(Exception):
pass pass
def _get_qid(driver, queue, tenant): def _get_qid(driver, queue, project):
try: try:
return driver.get(''' return driver.get('''
select id from Queues select id from Queues
where tenant = ? and name = ?''', tenant, queue)[0] where project = ? and name = ?''', project, queue)[0]
except _NoResult: except _NoResult:
raise exceptions.QueueDoesNotExist(queue, tenant) raise exceptions.QueueDoesNotExist(queue, project)
# The utilities below make the database IDs opaque to the users # The utilities below make the database IDs opaque to the users

View File

@ -20,7 +20,7 @@ from marconi.tests import util as testing
class ControllerBaseTest(testing.TestBase): class ControllerBaseTest(testing.TestBase):
tenant = "tenant" project = "project"
driver_class = None driver_class = None
controller_class = None controller_class = None
controller_base_class = None controller_base_class = None
@ -53,9 +53,9 @@ class QueueControllerTest(ControllerBaseTest):
def test_list(self): def test_list(self):
num = 15 num = 15
for queue in xrange(num): for queue in xrange(num):
self.controller.upsert(queue, {}, tenant=self.tenant) self.controller.upsert(queue, {}, project=self.project)
interaction = self.controller.list(tenant=self.tenant, interaction = self.controller.list(project=self.project,
detailed=True) detailed=True)
queues = list(interaction.next()) queues = list(interaction.next())
@ -64,7 +64,7 @@ class QueueControllerTest(ControllerBaseTest):
'metadata' in queue, queues)), True) 'metadata' in queue, queues)), True)
self.assertEquals(len(queues), 10) self.assertEquals(len(queues), 10)
interaction = self.controller.list(tenant=self.tenant, interaction = self.controller.list(project=self.project,
marker=interaction.next()) marker=interaction.next())
queues = list(interaction.next()) queues = list(interaction.next())
@ -75,37 +75,37 @@ class QueueControllerTest(ControllerBaseTest):
def test_queue_lifecycle(self): def test_queue_lifecycle(self):
# Test Queue Creation # Test Queue Creation
created = self.controller.upsert("test", tenant=self.tenant, created = self.controller.upsert("test", project=self.project,
metadata=dict(topic="test_queue")) metadata=dict(topic="test_queue"))
self.assertTrue(created) self.assertTrue(created)
# Test Queue retrieval # Test Queue retrieval
queue = self.controller.get("test", tenant=self.tenant) queue = self.controller.get("test", project=self.project)
self.assertIsNotNone(queue) self.assertIsNotNone(queue)
# Test Queue Update # Test Queue Update
created = self.controller.upsert("test", tenant=self.tenant, created = self.controller.upsert("test", project=self.project,
metadata=dict(meta="test_meta")) metadata=dict(meta="test_meta"))
self.assertFalse(created) self.assertFalse(created)
queue = self.controller.get("test", tenant=self.tenant) queue = self.controller.get("test", project=self.project)
self.assertEqual(queue["meta"], "test_meta") self.assertEqual(queue["meta"], "test_meta")
# Test Queue Statistic # Test Queue Statistic
_insert_fixtures(self.message_controller, "test", _insert_fixtures(self.message_controller, "test",
tenant=self.tenant, client_uuid="my_uuid", num=12) project=self.project, client_uuid="my_uuid", num=12)
countof = self.controller.stats("test", tenant=self.tenant) countof = self.controller.stats("test", project=self.project)
self.assertEqual(countof['messages']['free'], 12) self.assertEqual(countof['messages']['free'], 12)
# Test Queue Deletion # Test Queue Deletion
self.controller.delete("test", tenant=self.tenant) self.controller.delete("test", project=self.project)
# Test DoesNotExist Exception # Test DoesNotExist Exception
self.assertRaises(storage.exceptions.DoesNotExist, self.assertRaises(storage.exceptions.DoesNotExist,
self.controller.get, "test", self.controller.get, "test",
tenant=self.tenant) project=self.project)
class MessageControllerTest(ControllerBaseTest): class MessageControllerTest(ControllerBaseTest):
@ -126,10 +126,10 @@ class MessageControllerTest(ControllerBaseTest):
self.queue_controller = self.driver.queue_controller self.queue_controller = self.driver.queue_controller
self.claim_controller = self.driver.claim_controller self.claim_controller = self.driver.claim_controller
self.queue_controller.upsert(self.queue_name, {}, self.queue_controller.upsert(self.queue_name, {},
tenant=self.tenant) project=self.project)
def tearDown(self): def tearDown(self):
self.queue_controller.delete(self.queue_name, tenant=self.tenant) self.queue_controller.delete(self.queue_name, project=self.project)
super(MessageControllerTest, self).tearDown() super(MessageControllerTest, self).tearDown()
def test_message_lifecycle(self): def test_message_lifecycle(self):
@ -147,25 +147,25 @@ class MessageControllerTest(ControllerBaseTest):
# Test Message Creation # Test Message Creation
created = list(self.controller.post(queue_name, messages, created = list(self.controller.post(queue_name, messages,
tenant=self.tenant, project=self.project,
client_uuid="unused")) client_uuid="unused"))
self.assertEqual(len(created), 1) self.assertEqual(len(created), 1)
# Test Message Get # Test Message Get
self.controller.get(queue_name, created[0], tenant=self.tenant) self.controller.get(queue_name, created[0], project=self.project)
# Test Message Deletion # Test Message Deletion
self.controller.delete(queue_name, created[0], tenant=self.tenant) self.controller.delete(queue_name, created[0], project=self.project)
# Test DoesNotExist # Test DoesNotExist
self.assertRaises(storage.exceptions.DoesNotExist, self.assertRaises(storage.exceptions.DoesNotExist,
self.controller.get, self.controller.get,
queue_name, message_id=created[0], queue_name, message_id=created[0],
tenant=self.tenant) project=self.project)
def test_get_multi(self): def test_get_multi(self):
_insert_fixtures(self.controller, self.queue_name, _insert_fixtures(self.controller, self.queue_name,
tenant=self.tenant, client_uuid="my_uuid", num=15) project=self.project, client_uuid="my_uuid", num=15)
def load_messages(expected, *args, **kwargs): def load_messages(expected, *args, **kwargs):
interaction = self.controller.list(*args, **kwargs) interaction = self.controller.list(*args, **kwargs)
@ -174,75 +174,76 @@ class MessageControllerTest(ControllerBaseTest):
return interaction return interaction
# Test all messages, echo False and uuid # Test all messages, echo False and uuid
load_messages(0, self.queue_name, tenant=self.tenant, load_messages(0, self.queue_name, project=self.project,
client_uuid="my_uuid") client_uuid="my_uuid")
# Test all messages and limit # Test all messages and limit
load_messages(15, self.queue_name, tenant=self.tenant, limit=20, load_messages(15, self.queue_name, project=self.project, limit=20,
echo=True) echo=True)
# Test all messages, echo True, and uuid # Test all messages, echo True, and uuid
interaction = load_messages(10, self.queue_name, echo=True, interaction = load_messages(10, self.queue_name, echo=True,
tenant=self.tenant, client_uuid="my_uuid") project=self.project,
client_uuid="my_uuid")
# Test all messages, echo True, uuid and marker # Test all messages, echo True, uuid and marker
load_messages(5, self.queue_name, echo=True, tenant=self.tenant, load_messages(5, self.queue_name, echo=True, project=self.project,
marker=interaction.next(), client_uuid="my_uuid") marker=interaction.next(), client_uuid="my_uuid")
def test_claim_effects(self): def test_claim_effects(self):
_insert_fixtures(self.controller, self.queue_name, _insert_fixtures(self.controller, self.queue_name,
tenant=self.tenant, client_uuid="my_uuid", num=12) project=self.project, client_uuid="my_uuid", num=12)
meta = {"ttl": 70} meta = {"ttl": 70}
another_cid, _ = self.claim_controller.create(self.queue_name, meta, another_cid, _ = self.claim_controller.create(self.queue_name, meta,
tenant=self.tenant) project=self.project)
cid, msgs = self.claim_controller.create(self.queue_name, meta, cid, msgs = self.claim_controller.create(self.queue_name, meta,
tenant=self.tenant) project=self.project)
[msg1, msg2] = msgs [msg1, msg2] = msgs
# A wrong claim does not ensure the message deletion # A wrong claim does not ensure the message deletion
with testing.expected(storage.exceptions.NotPermitted): with testing.expected(storage.exceptions.NotPermitted):
self.controller.delete(self.queue_name, msg1["id"], self.controller.delete(self.queue_name, msg1["id"],
tenant=self.tenant, project=self.project,
claim=another_cid) claim=another_cid)
# Make sure a message can be deleted with a claim # Make sure a message can be deleted with a claim
self.controller.delete(self.queue_name, msg1["id"], self.controller.delete(self.queue_name, msg1["id"],
tenant=self.tenant, project=self.project,
claim=cid) claim=cid)
with testing.expected(storage.exceptions.DoesNotExist): with testing.expected(storage.exceptions.DoesNotExist):
self.controller.get(self.queue_name, msg1["id"], self.controller.get(self.queue_name, msg1["id"],
tenant=self.tenant) project=self.project)
# Make sure such a deletion is idempotent # Make sure such a deletion is idempotent
self.controller.delete(self.queue_name, msg1["id"], self.controller.delete(self.queue_name, msg1["id"],
tenant=self.tenant, project=self.project,
claim=cid) claim=cid)
# A non-existing claim does not ensure the message deletion # A non-existing claim does not ensure the message deletion
self.claim_controller.delete(self.queue_name, cid, self.claim_controller.delete(self.queue_name, cid,
tenant=self.tenant) project=self.project)
with testing.expected(storage.exceptions.NotPermitted): with testing.expected(storage.exceptions.NotPermitted):
self.controller.delete(self.queue_name, msg2["id"], self.controller.delete(self.queue_name, msg2["id"],
tenant=self.tenant, project=self.project,
claim=cid) claim=cid)
def test_expired_message(self): def test_expired_message(self):
messages = [{'body': 3.14, 'ttl': 0}] messages = [{'body': 3.14, 'ttl': 0}]
[msgid] = self.controller.post(self.queue_name, messages, [msgid] = self.controller.post(self.queue_name, messages,
tenant=self.tenant, project=self.project,
client_uuid='my_uuid') client_uuid='my_uuid')
with testing.expected(storage.exceptions.DoesNotExist): with testing.expected(storage.exceptions.DoesNotExist):
self.controller.get(self.queue_name, msgid, self.controller.get(self.queue_name, msgid,
tenant=self.tenant) project=self.project)
countof = self.queue_controller.stats(self.queue_name, countof = self.queue_controller.stats(self.queue_name,
tenant=self.tenant) project=self.project)
self.assertEquals(countof['messages']['free'], 0) self.assertEquals(countof['messages']['free'], 0)
def test_illformed_id(self): def test_illformed_id(self):
@ -263,12 +264,12 @@ class MessageControllerTest(ControllerBaseTest):
self.queue_controller.upsert('unused', {}, '480924') self.queue_controller.upsert('unused', {}, '480924')
[msgid] = self.controller.post('unused', [msgid] = self.controller.post('unused',
[{'body': {}, 'ttl': 10}], [{'body': {}, 'ttl': 10}],
tenant='480924', project='480924',
client_uuid='unused') client_uuid='unused')
with testing.expected(exceptions.NotPermitted): with testing.expected(exceptions.NotPermitted):
self.controller.delete('unused', msgid, self.controller.delete('unused', msgid,
tenant='480924', project='480924',
claim='illformed') claim='illformed')
@ -290,21 +291,21 @@ class ClaimControllerTest(ControllerBaseTest):
self.queue_controller = self.driver.queue_controller self.queue_controller = self.driver.queue_controller
self.message_controller = self.driver.message_controller self.message_controller = self.driver.message_controller
self.queue_controller.upsert(self.queue_name, {}, self.queue_controller.upsert(self.queue_name, {},
tenant=self.tenant) project=self.project)
def tearDown(self): def tearDown(self):
self.queue_controller.delete(self.queue_name, tenant=self.tenant) self.queue_controller.delete(self.queue_name, project=self.project)
super(ClaimControllerTest, self).tearDown() super(ClaimControllerTest, self).tearDown()
def test_claim_lifecycle(self): def test_claim_lifecycle(self):
_insert_fixtures(self.message_controller, self.queue_name, _insert_fixtures(self.message_controller, self.queue_name,
tenant=self.tenant, client_uuid="my_uuid", num=20) project=self.project, client_uuid="my_uuid", num=20)
meta = {"ttl": 70} meta = {"ttl": 70}
# Make sure create works # Make sure create works
claim_id, messages = self.controller.create(self.queue_name, meta, claim_id, messages = self.controller.create(self.queue_name, meta,
tenant=self.tenant, project=self.project,
limit=15) limit=15)
messages = list(messages) messages = list(messages)
@ -312,13 +313,13 @@ class ClaimControllerTest(ControllerBaseTest):
# Ensure Queue stats # Ensure Queue stats
countof = self.queue_controller.stats(self.queue_name, countof = self.queue_controller.stats(self.queue_name,
tenant=self.tenant) project=self.project)
self.assertEqual(countof['messages']['claimed'], 15) self.assertEqual(countof['messages']['claimed'], 15)
self.assertEqual(countof['messages']['free'], 5) self.assertEqual(countof['messages']['free'], 5)
# Make sure get works # Make sure get works
claim, messages2 = self.controller.get(self.queue_name, claim_id, claim, messages2 = self.controller.get(self.queue_name, claim_id,
tenant=self.tenant) project=self.project)
messages2 = list(messages2) messages2 = list(messages2)
self.assertEquals(len(messages2), 15) self.assertEquals(len(messages2), 15)
@ -328,11 +329,11 @@ class ClaimControllerTest(ControllerBaseTest):
new_meta = {"ttl": 100} new_meta = {"ttl": 100}
self.controller.update(self.queue_name, claim_id, self.controller.update(self.queue_name, claim_id,
new_meta, tenant=self.tenant) new_meta, project=self.project)
# Make sure update works # Make sure update works
claim, messages2 = self.controller.get(self.queue_name, claim_id, claim, messages2 = self.controller.get(self.queue_name, claim_id,
tenant=self.tenant) project=self.project)
messages2 = list(messages2) messages2 = list(messages2)
self.assertEquals(len(messages2), 15) self.assertEquals(len(messages2), 15)
@ -344,25 +345,25 @@ class ClaimControllerTest(ControllerBaseTest):
# Make sure delete works # Make sure delete works
self.controller.delete(self.queue_name, claim_id, self.controller.delete(self.queue_name, claim_id,
tenant=self.tenant) project=self.project)
self.assertRaises(storage.exceptions.ClaimDoesNotExist, self.assertRaises(storage.exceptions.ClaimDoesNotExist,
self.controller.get, self.queue_name, self.controller.get, self.queue_name,
claim_id, tenant=self.tenant) claim_id, project=self.project)
def test_expired_claim(self): def test_expired_claim(self):
meta = {"ttl": 0} meta = {"ttl": 0}
claim_id, messages = self.controller.create(self.queue_name, meta, claim_id, messages = self.controller.create(self.queue_name, meta,
tenant=self.tenant) project=self.project)
with testing.expected(storage.exceptions.DoesNotExist): with testing.expected(storage.exceptions.DoesNotExist):
self.controller.get(self.queue_name, claim_id, self.controller.get(self.queue_name, claim_id,
tenant=self.tenant) project=self.project)
with testing.expected(storage.exceptions.DoesNotExist): with testing.expected(storage.exceptions.DoesNotExist):
self.controller.update(self.queue_name, claim_id, self.controller.update(self.queue_name, claim_id,
meta, tenant=self.tenant) meta, project=self.project)
def test_illformed_id(self): def test_illformed_id(self):
# any ill-formed IDs should be regarded as non-existing ones. # any ill-formed IDs should be regarded as non-existing ones.
@ -375,7 +376,7 @@ class ClaimControllerTest(ControllerBaseTest):
{'ttl': 40}, '480924') {'ttl': 40}, '480924')
def _insert_fixtures(controller, queue_name, tenant=None, def _insert_fixtures(controller, queue_name, project=None,
client_uuid=None, num=4): client_uuid=None, num=4):
def messages(): def messages():
@ -386,4 +387,4 @@ def _insert_fixtures(controller, queue_name, tenant=None,
"event": "Event number %s" % n "event": "Event number %s" % n
}} }}
controller.post(queue_name, messages(), controller.post(queue_name, messages(),
tenant=tenant, client_uuid=client_uuid) project=project, client_uuid=client_uuid)

View File

@ -35,24 +35,24 @@ class Driver(storage.DriverBase):
class QueueController(storage.QueueBase): class QueueController(storage.QueueBase):
def list(self, tenant=None): def list(self, project=None):
super(QueueController, self).list(tenant) super(QueueController, self).list(project)
def get(self, name, tenant=None): def get(self, name, project=None):
super(QueueController, self).get(name, tenant=tenant) super(QueueController, self).get(name, project=project)
def upsert(self, name, metadata, tenant=None): def upsert(self, name, metadata, project=None):
super(QueueController, self).upsert(name, tenant=tenant, super(QueueController, self).upsert(name, project=project,
metadata=metadata) metadata=metadata)
def delete(self, name, tenant=None): def delete(self, name, project=None):
super(QueueController, self).delete(name, tenant=tenant) super(QueueController, self).delete(name, project=project)
def stats(self, name, tenant=None): def stats(self, name, project=None):
super(QueueController, self).stats(name, tenant=tenant) super(QueueController, self).stats(name, project=project)
def actions(self, name, tenant=None, marker=None, limit=10): def actions(self, name, project=None, marker=None, limit=10):
super(QueueController, self).actions(name, tenant=tenant, super(QueueController, self).actions(name, project=project,
marker=marker, limit=limit) marker=marker, limit=limit)

View File

@ -61,7 +61,7 @@ class MongodbQueueTests(base.QueueControllerTest):
def test_indexes(self): def test_indexes(self):
col = self.controller._col col = self.controller._col
indexes = col.index_information() indexes = col.index_information()
self.assertIn("t_1_n_1", indexes) self.assertIn("p_1_n_1", indexes)
def test_messages_purged(self): def test_messages_purged(self):
queue_name = "test" queue_name = "test"
@ -119,18 +119,18 @@ class MongodbClaimTests(base.ClaimControllerTest):
epoch = '000000000000000000000000' epoch = '000000000000000000000000'
self.assertRaises(storage.exceptions.ClaimDoesNotExist, self.assertRaises(storage.exceptions.ClaimDoesNotExist,
self.controller.get, self.queue_name, self.controller.get, self.queue_name,
epoch, tenant=self.tenant) epoch, project=self.project)
claim_id, messages = self.controller.create(self.queue_name, claim_id, messages = self.controller.create(self.queue_name,
{"ttl": 1}, {"ttl": 1},
tenant=self.tenant) project=self.project)
# Lets let it expire # Lets let it expire
time.sleep(1) time.sleep(1)
self.assertRaises(storage.exceptions.ClaimDoesNotExist, self.assertRaises(storage.exceptions.ClaimDoesNotExist,
self.controller.update, self.queue_name, self.controller.update, self.queue_name,
claim_id, {}, tenant=self.tenant) claim_id, {}, project=self.project)
self.assertRaises(storage.exceptions.ClaimDoesNotExist, self.assertRaises(storage.exceptions.ClaimDoesNotExist,
self.controller.update, self.queue_name, self.controller.update, self.queue_name,
claim_id, {}, tenant=self.tenant) claim_id, {}, project=self.project)

View File

@ -35,22 +35,22 @@ class QueueController(storage.QueueBase):
def __init__(self, driver): def __init__(self, driver):
pass pass
def list(self, tenant=None): def list(self, project=None):
raise NotImplementedError() raise NotImplementedError()
def get(self, name, tenant=None): def get(self, name, project=None):
raise NotImplementedError() raise NotImplementedError()
def upsert(self, name, metadata, tenant=None): def upsert(self, name, metadata, project=None):
raise NotImplementedError() raise NotImplementedError()
def delete(self, name, tenant=None): def delete(self, name, project=None):
raise NotImplementedError() raise NotImplementedError()
def stats(self, name, tenant=None): def stats(self, name, project=None):
raise NotImplementedError() raise NotImplementedError()
def actions(self, name, tenant=None, marker=None, limit=10): def actions(self, name, project=None, marker=None, limit=10):
raise NotImplementedError() raise NotImplementedError()
@ -58,16 +58,16 @@ class MessageController(storage.MessageBase):
def __init__(self, driver): def __init__(self, driver):
pass pass
def get(self, queue, tenant=None, message_id=None, def get(self, queue, project=None, message_id=None,
marker=None, echo=False, client_uuid=None): marker=None, echo=False, client_uuid=None):
raise NotImplementedError() raise NotImplementedError()
def list(self, queue, tenant=None, marker=None, def list(self, queue, project=None, marker=None,
limit=10, echo=False, client_uuid=None): limit=10, echo=False, client_uuid=None):
raise NotImplementedError() raise NotImplementedError()
def post(self, queue, messages, tenant=None): def post(self, queue, messages, project=None):
raise NotImplementedError() raise NotImplementedError()
def delete(self, queue, message_id, tenant=None, claim=None): def delete(self, queue, message_id, project=None, claim=None):
raise NotImplementedError() raise NotImplementedError()

View File

@ -31,7 +31,7 @@ class CollectionResource(object):
def __init__(self, claim_controller): def __init__(self, claim_controller):
self.claim_ctrl = claim_controller self.claim_ctrl = claim_controller
def on_post(self, req, resp, tenant_id, queue_name): def on_post(self, req, resp, project_id, queue_name):
if req.content_length is None or req.content_length == 0: if req.content_length is None or req.content_length == 0:
raise falcon.HTTPBadRequest(_('Bad request'), raise falcon.HTTPBadRequest(_('Bad request'),
_('Missing claim metadata.')) _('Missing claim metadata.'))
@ -46,7 +46,7 @@ class CollectionResource(object):
cid, msgs = self.claim_ctrl.create( cid, msgs = self.claim_ctrl.create(
queue_name, queue_name,
metadata=metadata, metadata=metadata,
tenant=tenant_id, project=project_id,
**kwargs) **kwargs)
resp_msgs = list(msgs) resp_msgs = list(msgs)
@ -83,12 +83,12 @@ class ItemResource(object):
def __init__(self, claim_controller): def __init__(self, claim_controller):
self.claim_ctrl = claim_controller self.claim_ctrl = claim_controller
def on_get(self, req, resp, tenant_id, queue_name, claim_id): def on_get(self, req, resp, project_id, queue_name, claim_id):
try: try:
meta, msgs = self.claim_ctrl.get( meta, msgs = self.claim_ctrl.get(
queue_name, queue_name,
claim_id=claim_id, claim_id=claim_id,
tenant=tenant_id) project=project_id)
meta['messages'] = list(msgs) meta['messages'] = list(msgs)
for msg in meta['messages']: for msg in meta['messages']:
@ -112,7 +112,7 @@ class ItemResource(object):
msg = _('Please try again in a few seconds.') msg = _('Please try again in a few seconds.')
raise falcon.HTTPServiceUnavailable(title, msg, 30) raise falcon.HTTPServiceUnavailable(title, msg, 30)
def on_patch(self, req, resp, tenant_id, queue_name, claim_id): def on_patch(self, req, resp, project_id, queue_name, claim_id):
if req.content_length is None or req.content_length == 0: if req.content_length is None or req.content_length == 0:
raise falcon.HTTPBadRequest(_('Bad request'), raise falcon.HTTPBadRequest(_('Bad request'),
_('Missing claim metadata.')) _('Missing claim metadata.'))
@ -122,7 +122,7 @@ class ItemResource(object):
self.claim_ctrl.update(queue_name, self.claim_ctrl.update(queue_name,
claim_id=claim_id, claim_id=claim_id,
metadata=metadata, metadata=metadata,
tenant=tenant_id) project=project_id)
resp.status = falcon.HTTP_204 resp.status = falcon.HTTP_204
@ -139,11 +139,11 @@ class ItemResource(object):
msg = _('Please try again in a few seconds.') msg = _('Please try again in a few seconds.')
raise falcon.HTTPServiceUnavailable(title, msg, 30) raise falcon.HTTPServiceUnavailable(title, msg, 30)
def on_delete(self, req, resp, tenant_id, queue_name, claim_id): def on_delete(self, req, resp, project_id, queue_name, claim_id):
try: try:
self.claim_ctrl.delete(queue_name, self.claim_ctrl.delete(queue_name,
claim_id=claim_id, claim_id=claim_id,
tenant=tenant_id) project=project_id)
resp.status = falcon.HTTP_204 resp.status = falcon.HTTP_204

View File

@ -39,17 +39,17 @@ class Driver(transport.DriverBase):
claim_item = transport.wsgi.claims.ItemResource(claim_controller) claim_item = transport.wsgi.claims.ItemResource(claim_controller)
self.app = api = falcon.API() self.app = api = falcon.API()
api.add_route('/v1/{tenant_id}/queues', queue_collection) api.add_route('/v1/{project_id}/queues', queue_collection)
api.add_route('/v1/{tenant_id}/queues/{queue_name}', queue_item) api.add_route('/v1/{project_id}/queues/{queue_name}', queue_item)
api.add_route('/v1/{tenant_id}/queues/{queue_name}' api.add_route('/v1/{project_id}/queues/{queue_name}'
'/stats', stats_endpoint) '/stats', stats_endpoint)
api.add_route('/v1/{tenant_id}/queues/{queue_name}' api.add_route('/v1/{project_id}/queues/{queue_name}'
'/messages', msg_collection) '/messages', msg_collection)
api.add_route('/v1/{tenant_id}/queues/{queue_name}' api.add_route('/v1/{project_id}/queues/{queue_name}'
'/messages/{message_id}', msg_item) '/messages/{message_id}', msg_item)
api.add_route('/v1/{tenant_id}/queues/{queue_name}' api.add_route('/v1/{project_id}/queues/{queue_name}'
'/claims', claim_collection) '/claims', claim_collection)
api.add_route('/v1/{tenant_id}/queues/{queue_name}' api.add_route('/v1/{project_id}/queues/{queue_name}'
'/claims/{claim_id}', claim_item) '/claims/{claim_id}', claim_item)
def listen(self): def listen(self):

View File

@ -31,7 +31,7 @@ class CollectionResource(object):
def __init__(self, message_controller): def __init__(self, message_controller):
self.msg_ctrl = message_controller self.msg_ctrl = message_controller
def on_post(self, req, resp, tenant_id, queue_name): def on_post(self, req, resp, project_id, queue_name):
uuid = req.get_header('Client-ID', required=True) uuid = req.get_header('Client-ID', required=True)
if req.content_length is None or req.content_length == 0: if req.content_length is None or req.content_length == 0:
@ -54,7 +54,7 @@ class CollectionResource(object):
ls = filtered(helpers.read_json(req.stream)) ls = filtered(helpers.read_json(req.stream))
ns = self.msg_ctrl.post(queue_name, ns = self.msg_ctrl.post(queue_name,
messages=ls, messages=ls,
tenant=tenant_id, project=project_id,
client_uuid=uuid) client_uuid=uuid)
resp.location = req.path + '/' + ','.join( resp.location = req.path + '/' + ','.join(
@ -74,7 +74,7 @@ class CollectionResource(object):
msg = _('Please try again in a few seconds.') msg = _('Please try again in a few seconds.')
raise falcon.HTTPServiceUnavailable(title, msg, 30) raise falcon.HTTPServiceUnavailable(title, msg, 30)
def on_get(self, req, resp, tenant_id, queue_name): def on_get(self, req, resp, project_id, queue_name):
uuid = req.get_header('Client-ID', required=True) uuid = req.get_header('Client-ID', required=True)
#TODO(zyuan): where do we define the limits? #TODO(zyuan): where do we define the limits?
@ -86,7 +86,7 @@ class CollectionResource(object):
try: try:
interaction = self.msg_ctrl.list(queue_name, interaction = self.msg_ctrl.list(queue_name,
tenant=tenant_id, project=project_id,
client_uuid=uuid, client_uuid=uuid,
**kwargs) **kwargs)
resp_dict = { resp_dict = {
@ -129,11 +129,11 @@ class ItemResource(object):
def __init__(self, message_controller): def __init__(self, message_controller):
self.msg_ctrl = message_controller self.msg_ctrl = message_controller
def on_get(self, req, resp, tenant_id, queue_name, message_id): def on_get(self, req, resp, project_id, queue_name, message_id):
try: try:
msg = self.msg_ctrl.get(queue_name, msg = self.msg_ctrl.get(queue_name,
message_id=message_id, message_id=message_id,
tenant=tenant_id) project=project_id)
msg['href'] = req.path msg['href'] = req.path
del msg['id'] del msg['id']
@ -151,11 +151,11 @@ class ItemResource(object):
msg = _('Please try again in a few seconds.') msg = _('Please try again in a few seconds.')
raise falcon.HTTPServiceUnavailable(title, msg, 30) raise falcon.HTTPServiceUnavailable(title, msg, 30)
def on_delete(self, req, resp, tenant_id, queue_name, message_id): def on_delete(self, req, resp, project_id, queue_name, message_id):
try: try:
self.msg_ctrl.delete(queue_name, self.msg_ctrl.delete(queue_name,
message_id=message_id, message_id=message_id,
tenant=tenant_id, project=project_id,
claim=req.get_param('claim_id')) claim=req.get_param('claim_id'))
resp.status = falcon.HTTP_204 resp.status = falcon.HTTP_204

View File

@ -32,7 +32,7 @@ class ItemResource(object):
def __init__(self, queue_controller): def __init__(self, queue_controller):
self.queue_ctrl = queue_controller self.queue_ctrl = queue_controller
def on_put(self, req, resp, tenant_id, queue_name): def on_put(self, req, resp, project_id, queue_name):
if req.content_length > transport.MAX_QUEUE_METADATA_SIZE: if req.content_length > transport.MAX_QUEUE_METADATA_SIZE:
raise falcon.HTTPBadRequest(_('Bad request'), raise falcon.HTTPBadRequest(_('Bad request'),
_('Queue metadata size is too large.')) _('Queue metadata size is too large.'))
@ -45,7 +45,7 @@ class ItemResource(object):
metadata = _filtered(helpers.read_json(req.stream)) metadata = _filtered(helpers.read_json(req.stream))
created = self.queue_ctrl.upsert(queue_name, created = self.queue_ctrl.upsert(queue_name,
metadata=metadata, metadata=metadata,
tenant=tenant_id) project=project_id)
except helpers.MalformedJSON: except helpers.MalformedJSON:
raise falcon.HTTPBadRequest(_('Bad request'), raise falcon.HTTPBadRequest(_('Bad request'),
_('Malformed queue metadata.')) _('Malformed queue metadata.'))
@ -59,10 +59,10 @@ class ItemResource(object):
resp.status = falcon.HTTP_201 if created else falcon.HTTP_204 resp.status = falcon.HTTP_201 if created else falcon.HTTP_204
resp.location = req.path resp.location = req.path
def on_get(self, req, resp, tenant_id, queue_name): def on_get(self, req, resp, project_id, queue_name):
try: try:
doc = self.queue_ctrl.get(queue_name, doc = self.queue_ctrl.get(queue_name,
tenant=tenant_id) project=project_id)
resp.content_location = req.relative_uri resp.content_location = req.relative_uri
resp.body = helpers.to_json(doc) resp.body = helpers.to_json(doc)
@ -76,10 +76,10 @@ class ItemResource(object):
msg = _('Please try again in a few seconds.') msg = _('Please try again in a few seconds.')
raise falcon.HTTPServiceUnavailable(title, msg, 30) raise falcon.HTTPServiceUnavailable(title, msg, 30)
def on_delete(self, req, resp, tenant_id, queue_name): def on_delete(self, req, resp, project_id, queue_name):
try: try:
self.queue_ctrl.delete(queue_name, self.queue_ctrl.delete(queue_name,
tenant=tenant_id) project=project_id)
except Exception as ex: except Exception as ex:
LOG.exception(ex) LOG.exception(ex)
@ -97,7 +97,7 @@ class CollectionResource(object):
def __init__(self, queue_controller): def __init__(self, queue_controller):
self.queue_ctrl = queue_controller self.queue_ctrl = queue_controller
def on_get(self, req, resp, tenant_id): def on_get(self, req, resp, project_id):
#TODO(zyuan): where do we define the limits? #TODO(zyuan): where do we define the limits?
kwargs = helpers.purge({ kwargs = helpers.purge({
'marker': req.get_param('marker'), 'marker': req.get_param('marker'),
@ -106,7 +106,7 @@ class CollectionResource(object):
}) })
try: try:
interaction = self.queue_ctrl.list(tenant=tenant_id, **kwargs) interaction = self.queue_ctrl.list(project=project_id, **kwargs)
resp_dict = { resp_dict = {
'queues': list(interaction.next()) 'queues': list(interaction.next())

View File

@ -30,10 +30,10 @@ class Resource(object):
def __init__(self, queue_controller): def __init__(self, queue_controller):
self.queue_ctrl = queue_controller self.queue_ctrl = queue_controller
def on_get(self, req, resp, tenant_id, queue_name): def on_get(self, req, resp, project_id, queue_name):
try: try:
resp_dict = self.queue_ctrl.stats(queue_name, resp_dict = self.queue_ctrl.stats(queue_name,
tenant=tenant_id) project=project_id)
resp.content_location = req.path resp.content_location = req.path
resp.body = helpers.to_json(resp_dict) resp.body = helpers.to_json(resp_dict)