From 901f3e0ca05dc659e7b34c5408536d4822c272bf Mon Sep 17 00:00:00 2001 From: kgriffs Date: Mon, 15 Jul 2013 18:44:46 -0600 Subject: [PATCH] Add "total", "oldest" and "newest" to queue stats This patch introduces three new queue stats in order to expose a better user experience via control panels. Also included in the changes is a bug fix for calculating timestamp differences, as well as a renaming of 'queue' to 'queue_name' for consistency within the MongoDB driver. Note that there are several remaining stats identified in the associated blueprint. This may be addressed in a future patch. Implements: blueprint extra-queue-stats Change-Id: I31e48a581ccdd014ec97674af0c0dd43c808c42c --- marconi/storage/exceptions.py | 8 ++ marconi/storage/mongodb/claims.py | 4 +- marconi/storage/mongodb/messages.py | 109 +++++++++++++----- marconi/storage/mongodb/queues.py | 35 ++++-- marconi/storage/mongodb/utils.py | 14 +++ marconi/storage/sqlite/messages.py | 42 ++++++- marconi/storage/sqlite/queues.py | 27 ++++- marconi/storage/sqlite/utils.py | 16 +++ marconi/tests/storage/base.py | 66 ++++++++++- marconi/tests/storage/test_impl_mongodb.py | 14 +++ marconi/tests/storage/test_impl_sqlite.py | 14 +++ marconi/tests/transport/wsgi/test_messages.py | 14 ++- marconi/transport/wsgi/stats.py | 13 +++ 13 files changed, 319 insertions(+), 57 deletions(-) diff --git a/marconi/storage/exceptions.py b/marconi/storage/exceptions.py index fba649ddd..79bed9841 100644 --- a/marconi/storage/exceptions.py +++ b/marconi/storage/exceptions.py @@ -75,6 +75,14 @@ class QueueDoesNotExist(DoesNotExist): super(QueueDoesNotExist, self).__init__(msg) +class QueueIsEmpty(Exception): + + def __init__(self, name, project): + msg = ('Queue %(name)s in project %(project)s is empty' % + dict(name=name, project=project)) + super(QueueIsEmpty, self).__init__(msg) + + class MessageDoesNotExist(DoesNotExist): def __init__(self, mid, queue, project): diff --git a/marconi/storage/mongodb/claims.py b/marconi/storage/mongodb/claims.py index de5b4b34a..d2788fc23 100644 --- a/marconi/storage/mongodb/claims.py +++ b/marconi/storage/mongodb/claims.py @@ -72,7 +72,7 @@ class ClaimController(storage.ClaimBase): except ValueError: raise exceptions.ClaimDoesNotExist() - age = now - utils.oid_utc(cid) + age = timeutils.delta_seconds(utils.oid_utc(cid), now) def messages(msg_iter): msg = next(msg_iter) @@ -92,7 +92,7 @@ class ClaimController(storage.ClaimBase): project=project)) claim = next(msgs) claim = { - 'age': age.seconds, + 'age': int(age), 'ttl': claim.pop('t'), 'id': str(claim['id']), } diff --git a/marconi/storage/mongodb/messages.py b/marconi/storage/mongodb/messages.py index f20fa286c..a24e344ec 100644 --- a/marconi/storage/mongodb/messages.py +++ b/marconi/storage/mongodb/messages.py @@ -122,7 +122,7 @@ class MessageController(storage.MessageBase): def _get_queue_np(self): return self._queue_controller._get_np() - def _next_marker(self, queue, project=None): + def _next_marker(self, queue_name, project=None): """Retrieves the next message marker for a given queue. This helper is used to generate monotonic pagination @@ -141,16 +141,15 @@ class MessageController(storage.MessageBase): mitigate race conditions between producer and observer clients. - :param queue: queue name + :param queue_name: Determines the scope for the marker :param project: Queue's project :returns: next message marker as an integer """ - document = self._col.find_one({'q': queue, 'p': project}, + document = self._col.find_one({'q': queue_name, 'p': project}, sort=[('k', -1)], fields={'k': 1, '_id': 0}) - # NOTE(kgriffs): this approach is faster than using 'or' return 1 if document is None else (document['k'] + 1) def _backoff_sleep(self, attempt): @@ -240,7 +239,7 @@ class MessageController(storage.MessageBase): self._col.remove({'q': queue, 'p': project}, w=0) def _list(self, queue_name, marker=None, echo=False, client_uuid=None, - fields=None, include_claimed=False, project=None): + fields=None, include_claimed=False, project=None, sort=1): """Message document listing helper. :param queue_name: Name of the queue to list @@ -248,13 +247,20 @@ class MessageController(storage.MessageBase): :param marker: Message marker from which to start iterating :param echo: Whether to return messages that match client_uuid :param client_uuid: UUID for the client that originated this request - :param fields: fields to include in emmitted documents + :param fields: Fields to include in emmitted documents :param include_claimed: Whether to include claimed messages, not just active ones + :param sort: (Default 1) Sort order for the listing. Pass 1 for + ascending (oldest message first), or -1 for descending (newest + message first). - :returns: MongoDB "find" generator + :returns: MongoDB cursor """ + if sort not in (1, -1): + raise ValueError('sort must be either 1 (ascending) ' + 'or -1 (descending)') + now = timeutils.utcnow() query = { @@ -270,7 +276,7 @@ class MessageController(storage.MessageBase): if fields and not isinstance(fields, (dict, list)): raise TypeError('Fields must be an instance of list / dict') - if not echo and client_uuid: + if not echo and client_uuid is not None: query['u'] = {'$ne': client_uuid} if marker: @@ -283,12 +289,51 @@ class MessageController(storage.MessageBase): # NOTE(flaper87): Suggest the index to use for this query return self._col.find(query, fields=fields, - sort=[('k', 1)]).hint(self.active_fields) + sort=[('k', sort)]).hint(self.active_fields) #----------------------------------------------------------------------- # Interface #----------------------------------------------------------------------- + def count(self, queue_name, project=None): + """Return total number of (non-expired) messages in a queue. + + This method is designed to very quickly count the number + of messages in a given queue. Expired messages are not + counted, of course. If the queue does not exist, the + count will always be 0. + """ + query = { + # Messages must belong to this queue + 'q': queue_name, + 'p': project, + # The messages can not be expired + 'e': {'$gt': timeutils.utcnow()}, + } + + return self._col.find(query).count() + + def first(self, queue_name, project=None, sort=1): + """Get first message in the queue (including claimed). + + :param queue_id: ObjectID of the queue to list + :param sort: (Default 1) Sort order for the listing. Pass 1 for + ascending (oldest message first), or -1 for descending (newest + message first). + + :returns: First message in the queue, or None if the queue is + empty + + """ + cursor = self._list(queue_name, project=project, + include_claimed=True, sort=sort).limit(1) + try: + message = next(cursor) + except StopIteration: + raise exceptions.QueueIsEmpty(queue_name, project) + + return message + def active(self, queue_name, marker=None, echo=False, client_uuid=None, fields=None, project=None): @@ -297,7 +342,6 @@ class MessageController(storage.MessageBase): def claimed(self, queue_name, claim_id=None, expires=None, limit=None, project=None): - query = { 'c.id': claim_id, 'c.e': {'$gt': expires or timeutils.utcnow()}, @@ -362,7 +406,7 @@ class MessageController(storage.MessageBase): for name, project in self._get_queue_np(): self._remove_expired(name, project) - def list(self, queue, project=None, marker=None, limit=10, + def list(self, queue_name, project=None, marker=None, limit=10, echo=False, client_uuid=None, include_claimed=False): if marker is not None: @@ -371,7 +415,7 @@ class MessageController(storage.MessageBase): except ValueError: raise exceptions.MalformedMarker() - messages = self._list(queue, marker, echo, client_uuid, + messages = self._list(queue_name, marker, echo, client_uuid, include_claimed=include_claimed, project=project) messages = messages.limit(limit) @@ -388,13 +432,13 @@ class MessageController(storage.MessageBase): yield str(marker_id['next']) @utils.raises_conn_error - def get(self, queue, message_id, project=None): + def get(self, queue_name, message_id, project=None): mid = utils.to_oid(message_id) now = timeutils.utcnow() query = { '_id': mid, - 'q': queue, + 'q': queue_name, 'p': project, 'e': {'$gt': now} } @@ -402,18 +446,19 @@ class MessageController(storage.MessageBase): message = list(self._col.find(query).limit(1).hint([('_id', 1)])) if not message: - raise exceptions.MessageDoesNotExist(message_id, queue, project) + raise exceptions.MessageDoesNotExist(message_id, queue_name, + project) return _basic_message(message[0], now) @utils.raises_conn_error - def bulk_get(self, queue, message_ids, project=None): + def bulk_get(self, queue_name, message_ids, project=None): message_ids = [utils.to_oid(id) for id in message_ids] now = timeutils.utcnow() # Base query, always check expire time query = { - 'q': queue, + 'q': queue_name, 'p': project, '_id': {'$in': message_ids}, 'e': {'$gt': now}, @@ -429,14 +474,14 @@ class MessageController(storage.MessageBase): return utils.HookedCursor(messages, denormalizer) @utils.raises_conn_error - def post(self, queue, messages, client_uuid, project=None): + def post(self, queue_name, messages, client_uuid, project=None): now = timeutils.utcnow() # NOTE(flaper87): We need to assert the queue exists - self._get_queue_id(queue, project) + self._get_queue_id(queue_name, project) # Set the next basis marker for the first attempt. - next_marker = self._next_marker(queue, project) + next_marker = self._next_marker(queue_name, project) # Results are aggregated across all attempts # NOTE(kgriffs): lazy instantiation @@ -451,7 +496,7 @@ class MessageController(storage.MessageBase): message_gen = ( { 't': message['ttl'], - 'q': queue, + 'q': queue_name, 'p': project, 'e': now + datetime.timedelta(seconds=message['ttl']), 'u': client_uuid, @@ -483,7 +528,7 @@ class MessageController(storage.MessageBase): message = _('%(attempts)d attempt(s) required to post ' '%(num_messages)d messages to queue ' '%(queue_name)s and project %(project)s') - message %= dict(queue_name=queue, attempts=attempt + 1, + message %= dict(queue_name=queue_name, attempts=attempt+1, num_messages=len(ids), project=project) LOG.debug(message) @@ -501,7 +546,7 @@ class MessageController(storage.MessageBase): # TODO(kgriffs): Add transaction ID to help match up loglines if attempt == 0: message = _('First attempt failed while adding messages ' - 'to queue %s for current request') % queue + 'to queue %s for current request') % queue_name LOG.debug(message) @@ -535,7 +580,7 @@ class MessageController(storage.MessageBase): # Retry the remaining messages with a new sequence # of markers. prepared_messages = cached_messages[failed_index:] - next_marker = self._next_marker(queue, project) + next_marker = self._next_marker(queue_name, project) for index, message in enumerate(prepared_messages): message['k'] = next_marker + index @@ -553,21 +598,21 @@ class MessageController(storage.MessageBase): message = _('Hit maximum number of attempts (%(max)s) for queue ' '%(id)s in project %(project)s') - message %= dict(max=options.CFG.max_attempts, id=queue, + message %= dict(max=options.CFG.max_attempts, id=queue_name, project=project) LOG.warning(message) succeeded_ids = map(str, aggregated_results) - raise exceptions.MessageConflict(queue, project, succeeded_ids) + raise exceptions.MessageConflict(queue_name, project, succeeded_ids) @utils.raises_conn_error - def delete(self, queue, message_id, project=None, claim=None): + def delete(self, queue_name, message_id, project=None, claim=None): try: mid = utils.to_oid(message_id) query = { - 'q': queue, + 'q': queue_name, 'p': project, '_id': mid } @@ -594,11 +639,11 @@ class MessageController(storage.MessageBase): pass @utils.raises_conn_error - def bulk_delete(self, queue, message_ids, project=None): + def bulk_delete(self, queue_name, message_ids, project=None): try: message_ids = [utils.to_oid(id) for id in message_ids] query = { - 'q': queue, + 'q': queue_name, 'p': project, '_id': {'$in': message_ids}, } @@ -611,11 +656,11 @@ class MessageController(storage.MessageBase): def _basic_message(msg, now): oid = msg['_id'] - age = now - utils.oid_utc(oid) + age = timeutils.delta_seconds(utils.oid_utc(oid), now) return { 'id': str(oid), - 'age': age.seconds, + 'age': int(age), 'ttl': msg['t'], 'body': msg['b'], } diff --git a/marconi/storage/mongodb/queues.py b/marconi/storage/mongodb/queues.py index c7da326e4..0dd699caa 100644 --- a/marconi/storage/mongodb/queues.py +++ b/marconi/storage/mongodb/queues.py @@ -24,6 +24,7 @@ Field Mappings: import pymongo.errors import marconi.openstack.common.log as logging +from marconi.openstack.common import timeutils from marconi import storage from marconi.storage import exceptions from marconi.storage.mongodb import utils @@ -143,19 +144,33 @@ class QueueController(storage.QueueBase): @utils.raises_conn_error def stats(self, name, project=None): - self._get_id(name, project) - controller = self.driver.message_controller - active = controller.active(name, project=project) - claimed = controller.claimed(name, project=project) + if not self.exists(name, project=project): + raise exceptions.QueueDoesNotExist(name, project) - return { - 'actions': 0, - 'messages': { - 'claimed': claimed.count(), - 'free': active.count(), - } + controller = self.driver.message_controller + + active = controller.active(name, project=project).count() + total = controller.count(name, project=project) + + message_stats = { + 'claimed': total - active, + 'free': active, + 'total': total, } + if total != 0: + try: + oldest = controller.first(name, project=project, sort=1) + newest = controller.first(name, project=project, sort=-1) + except exceptions.QueueIsEmpty: + pass + else: + now = timeutils.utcnow() + message_stats['oldest'] = utils.stat_message(oldest, now) + message_stats['newest'] = utils.stat_message(newest, now) + + return {'messages': message_stats} + @utils.raises_conn_error def actions(self, name, project=None, marker=None, limit=10): raise NotImplementedError diff --git a/marconi/storage/mongodb/utils.py b/marconi/storage/mongodb/utils.py index c4f858057..3137aef7d 100644 --- a/marconi/storage/mongodb/utils.py +++ b/marconi/storage/mongodb/utils.py @@ -140,6 +140,19 @@ def oid_utc(oid): raise TypeError('Expected ObjectId and got %s' % type(oid)) +def stat_message(message, now): + """Creates a stat document from the given message, relative to now.""" + oid = message['_id'] + created = oid_utc(oid) + age = timeutils.delta_seconds(created, now) + + return { + 'id': str(oid), + 'age': int(age), + 'created': timeutils.isotime(created), + } + + def raises_conn_error(func): """Handles mongodb ConnectionFailure error @@ -156,6 +169,7 @@ def raises_conn_error(func): msg = "ConnectionFailure caught" LOG.error(msg) raise storage_exceptions.ConnectionError(msg) + return wrapper diff --git a/marconi/storage/sqlite/messages.py b/marconi/storage/sqlite/messages.py index 7facf2e01..9eca0b08c 100644 --- a/marconi/storage/sqlite/messages.py +++ b/marconi/storage/sqlite/messages.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. - +from marconi.openstack.common import timeutils from marconi.storage import base from marconi.storage import exceptions from marconi.storage.sqlite import utils @@ -84,6 +84,46 @@ class MessageController(base.MessageBase): 'body': content, } + def first(self, queue, project, sort=1): + if project is None: + project = '' + + with self.driver('deferred'): + sql = ''' + select id, content, ttl, created, + julianday() * 86400.0 - created + from Messages + where ttl > julianday() * 86400.0 - created + and qid = ? + order by id %s + limit 1''' + + if sort not in (1, -1): + raise ValueError('sort must be either 1 (ascending) ' + 'or -1 (descending)') + + sql = sql % ('DESC' if sort == -1 else 'ASC') + + args = [utils.get_qid(self.driver, queue, project)] + + records = self.driver.run(sql, *args) + + try: + id, content, ttl, created, age = next(records) + except StopIteration: + raise exceptions.QueueIsEmpty(queue, project) + + created_unix = utils.julian_to_unix(created) + created_iso8601 = timeutils.iso8601_from_timestamp(created_unix) + + return { + 'id': utils.msgid_encode(id), + 'ttl': ttl, + 'created': created_iso8601, + 'age': age, + 'body': content, + } + def list(self, queue, project, marker=None, limit=10, echo=False, client_uuid=None, include_claimed=False): diff --git a/marconi/storage/sqlite/queues.py b/marconi/storage/sqlite/queues.py index e9fad69e9..812f6e274 100644 --- a/marconi/storage/sqlite/queues.py +++ b/marconi/storage/sqlite/queues.py @@ -10,6 +10,7 @@ # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or # implied. +# # See the License for the specific language governing permissions and # limitations under the License. @@ -146,13 +147,27 @@ class QueueController(base.QueueBase): and qid = ?) ''', qid, qid) - return { - 'messages': { - 'claimed': claimed, - 'free': free, - }, - 'actions': 0, + total = free + claimed + + message_stats = { + 'claimed': claimed, + 'free': free, + 'total': total, } + if total != 0: + message_controller = self.driver.message_controller + + try: + oldest = message_controller.first(name, project, sort=1) + newest = message_controller.first(name, project, sort=-1) + except exceptions.QueueIsEmpty: + pass + else: + message_stats['oldest'] = utils.stat_message(oldest) + message_stats['newest'] = utils.stat_message(newest) + + return {'messages': message_stats} + def actions(self, name, project, marker=None, limit=10): raise NotImplementedError diff --git a/marconi/storage/sqlite/utils.py b/marconi/storage/sqlite/utils.py index 789a7e256..9957f8328 100644 --- a/marconi/storage/sqlite/utils.py +++ b/marconi/storage/sqlite/utils.py @@ -16,6 +16,8 @@ from marconi.storage import exceptions +UNIX_EPOCH_AS_JULIAN_SEC = 2440587.5 * 86400.0 + class NoResult(Exception): pass @@ -77,3 +79,17 @@ def cid_decode(id): except ValueError: raise exceptions.MalformedID() + + +def julian_to_unix(julian_sec): + """Converts Julian timestamp, in seconds, to a UNIX timestamp.""" + return int(round(julian_sec - UNIX_EPOCH_AS_JULIAN_SEC)) + + +def stat_message(message): + """Creates a stat document based on a message.""" + return { + 'id': message['id'], + 'age': message['age'], + 'created': message['created'], + } diff --git a/marconi/tests/storage/base.py b/marconi/tests/storage/base.py index ae6b3db34..34bb8593a 100644 --- a/marconi/tests/storage/base.py +++ b/marconi/tests/storage/base.py @@ -13,6 +13,11 @@ # See the License for the specific language governing permissions and # limitations under the License. +import time + +from testtools import matchers + +from marconi.openstack.common import timeutils from marconi import storage from marconi.storage import exceptions from marconi.tests import util as testing @@ -47,6 +52,10 @@ class QueueControllerTest(ControllerBaseTest): self.message_controller = self.driver.message_controller self.claim_controller = self.driver.claim_controller + def tearDown(self): + timeutils.clear_time_override() + super(QueueControllerTest, self).tearDown() + def test_list(self): num = 15 for queue in xrange(num): @@ -98,10 +107,45 @@ class QueueControllerTest(ControllerBaseTest): # Test Queue Statistic _insert_fixtures(self.message_controller, 'test', - project=self.project, client_uuid='my_uuid', num=12) + project=self.project, client_uuid='my_uuid', + num=6) - countof = self.controller.stats('test', project=self.project) - self.assertEqual(countof['messages']['free'], 12) + # NOTE(kgriffs): We can't get around doing this, because + # we don't know how the storage drive may be calculating + # message timestamps (and may not be monkey-patchable). + time.sleep(1) + + _insert_fixtures(self.message_controller, 'test', + project=self.project, client_uuid='my_uuid', + num=6) + + stats = self.controller.stats('test', project=self.project) + message_stats = stats['messages'] + + self.assertEqual(message_stats['free'], 12) + self.assertEqual(message_stats['claimed'], 0) + self.assertEqual(message_stats['total'], 12) + + oldest = message_stats['oldest'] + newest = message_stats['newest'] + + self.assertNotEqual(oldest, newest) + + # NOTE(kgriffs): Ensure "now" is different enough + # for the next comparison to work. + timeutils.set_time_override() + timeutils.advance_time_seconds(10) + + for message_stat in (oldest, newest): + created_iso = message_stat['created'] + created = timeutils.parse_isotime(created_iso) + self.assertThat(timeutils.normalize_time(created), + matchers.LessThan(timeutils.utcnow())) + + self.assertIn('id', message_stat) + + self.assertThat(oldest['created'], + matchers.LessThan(newest['created'])) # Test Queue Deletion self.controller.delete('test', project=self.project) @@ -116,6 +160,20 @@ class QueueControllerTest(ControllerBaseTest): with testing.expect(storage.exceptions.DoesNotExist): self.controller.set_metadata('test', '{}', project=self.project) + def test_stats_for_empty_queue(self): + created = self.controller.create('test', project=self.project) + self.assertTrue(created) + + stats = self.controller.stats('test', project=self.project) + message_stats = stats['messages'] + + self.assertEqual(message_stats['free'], 0) + self.assertEqual(message_stats['claimed'], 0) + self.assertEqual(message_stats['total'], 0) + + self.assertNotIn('newest', message_stats) + self.assertNotIn('oldest', message_stats) + class MessageControllerTest(ControllerBaseTest): """Message Controller base tests. @@ -381,6 +439,7 @@ class ClaimControllerTest(ControllerBaseTest): project=self.project) self.assertEqual(countof['messages']['claimed'], 15) self.assertEqual(countof['messages']['free'], 5) + self.assertEqual(countof['messages']['total'], 20) # Make sure get works claim, messages2 = self.controller.get(self.queue_name, claim_id, @@ -509,5 +568,6 @@ def _insert_fixtures(controller, queue_name, project=None, 'body': { 'event': 'Event number %s' % n }} + controller.post(queue_name, messages(), project=project, client_uuid=client_uuid) diff --git a/marconi/tests/storage/test_impl_mongodb.py b/marconi/tests/storage/test_impl_mongodb.py index 5d3b1c420..639b2dab8 100644 --- a/marconi/tests/storage/test_impl_mongodb.py +++ b/marconi/tests/storage/test_impl_mongodb.py @@ -240,6 +240,20 @@ class MongodbMessageTests(base.MessageControllerTest): message = self.driver.db.messages.find_one({'q': queue, 'p': project}) self.assertEquals(message['k'], messages_per_queue) + def test_empty_queue_exception(self): + queue_name = 'empty-queue-test' + self.queue_controller.create(queue_name) + + self.assertRaises(storage.exceptions.QueueIsEmpty, + self.controller.first, queue_name) + + def test_invalid_sort_option(self): + queue_name = 'empty-queue-test' + self.queue_controller.create(queue_name) + + self.assertRaises(ValueError, + self.controller.first, queue_name, sort=0) + class MongodbClaimTests(base.ClaimControllerTest): driver_class = mongodb.Driver diff --git a/marconi/tests/storage/test_impl_sqlite.py b/marconi/tests/storage/test_impl_sqlite.py index 108b8fc71..ec6115933 100644 --- a/marconi/tests/storage/test_impl_sqlite.py +++ b/marconi/tests/storage/test_impl_sqlite.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +from marconi import storage from marconi.storage import sqlite from marconi.storage.sqlite import controllers from marconi.tests.storage import base @@ -27,6 +28,19 @@ class SQliteMessageTests(base.MessageControllerTest): driver_class = sqlite.Driver controller_class = controllers.MessageController + def test_empty_queue_exception(self): + queue_name = 'empty-queue-test' + self.queue_controller.create(queue_name, None) + + self.assertRaises(storage.exceptions.QueueIsEmpty, + self.controller.first, + queue_name, None, sort=1) + + def test_invalid_sort_option(self): + self.assertRaises(ValueError, + self.controller.first, + 'foo', None, sort='dosomething()') + class SQliteClaimTests(base.ClaimControllerTest): driver_class = sqlite.Driver diff --git a/marconi/tests/transport/wsgi/test_messages.py b/marconi/tests/transport/wsgi/test_messages.py index 9ae4e45f0..480f90e66 100644 --- a/marconi/tests/transport/wsgi/test_messages.py +++ b/marconi/tests/transport/wsgi/test_messages.py @@ -17,6 +17,7 @@ import json import os import falcon +from testtools import matchers from marconi.common import config from marconi.tests.transport.wsgi import base @@ -111,7 +112,7 @@ class MessagesBaseTest(base.TestBase): self._test_post(sample_messages) - def test_post_to_mia_queue(self): + def test_post_to_missing_queue(self): self._post_messages('/v1/queues/nonexistent/messages') self.assertEquals(self.srmock.status, falcon.HTTP_404) @@ -223,11 +224,18 @@ class MessagesBaseTest(base.TestBase): body = self.simulate_get(self.queue_path + '/stats', self.project_id) self.assertEquals(self.srmock.status, falcon.HTTP_200) - countof = json.loads(body[0]) + message_stats = json.loads(body[0])['messages'] self.assertEquals(self.srmock.headers_dict['Content-Location'], self.queue_path + '/stats') - self.assertEquals(countof['messages']['free'], 10) + # NOTE(kgriffs): The other parts of the stats are tested + # in tests.storage.base and so are not repeated here. + expected_pattern = self.queue_path + '/messages/[^/]+$' + for message_stat_name in ('oldest', 'newest'): + self.assertThat(message_stats[message_stat_name]['href'], + matchers.MatchesRegex(expected_pattern)) + + # NOTE(kgriffs): Try to get messages for a missing queue self.simulate_get('/v1/queues/nonexistent/messages', self.project_id, headers=self.headers) self.assertEquals(self.srmock.status, falcon.HTTP_204) diff --git a/marconi/transport/wsgi/stats.py b/marconi/transport/wsgi/stats.py index cb46c6694..63c18c4e3 100644 --- a/marconi/transport/wsgi/stats.py +++ b/marconi/transport/wsgi/stats.py @@ -36,6 +36,19 @@ class Resource(object): resp_dict = self.queue_ctrl.stats(queue_name, project=project_id) + message_stats = resp_dict['messages'] + + if message_stats['total'] != 0: + base_path = req.path[:req.path.rindex('/')] + '/messages/' + + newest = message_stats['newest'] + newest['href'] = base_path + newest['id'] + del newest['id'] + + oldest = message_stats['oldest'] + oldest['href'] = base_path + oldest['id'] + del oldest['id'] + resp.content_location = req.path resp.body = helpers.to_json(resp_dict) resp.status = falcon.HTTP_200