Add "total", "oldest" and "newest" to queue stats

This patch introduces three new queue stats in order to expose
a better user experience via control panels. Also included in
the changes is a bug fix for calculating timestamp differences,
as well as a renaming of 'queue' to 'queue_name' for consistency
within the MongoDB driver.

Note that there are several remaining stats identified in the
associated blueprint. This may be addressed in a future
patch.

Implements: blueprint extra-queue-stats
Change-Id: I31e48a581ccdd014ec97674af0c0dd43c808c42c
This commit is contained in:
kgriffs 2013-07-15 18:44:46 -06:00 committed by Gerrit Code Review
parent 5459848e95
commit 901f3e0ca0
13 changed files with 319 additions and 57 deletions

View File

@ -75,6 +75,14 @@ class QueueDoesNotExist(DoesNotExist):
super(QueueDoesNotExist, self).__init__(msg)
class QueueIsEmpty(Exception):
def __init__(self, name, project):
msg = ('Queue %(name)s in project %(project)s is empty' %
dict(name=name, project=project))
super(QueueIsEmpty, self).__init__(msg)
class MessageDoesNotExist(DoesNotExist):
def __init__(self, mid, queue, project):

View File

@ -72,7 +72,7 @@ class ClaimController(storage.ClaimBase):
except ValueError:
raise exceptions.ClaimDoesNotExist()
age = now - utils.oid_utc(cid)
age = timeutils.delta_seconds(utils.oid_utc(cid), now)
def messages(msg_iter):
msg = next(msg_iter)
@ -92,7 +92,7 @@ class ClaimController(storage.ClaimBase):
project=project))
claim = next(msgs)
claim = {
'age': age.seconds,
'age': int(age),
'ttl': claim.pop('t'),
'id': str(claim['id']),
}

View File

@ -122,7 +122,7 @@ class MessageController(storage.MessageBase):
def _get_queue_np(self):
return self._queue_controller._get_np()
def _next_marker(self, queue, project=None):
def _next_marker(self, queue_name, project=None):
"""Retrieves the next message marker for a given queue.
This helper is used to generate monotonic pagination
@ -141,16 +141,15 @@ class MessageController(storage.MessageBase):
mitigate race conditions between producer and
observer clients.
:param queue: queue name
:param queue_name: Determines the scope for the marker
:param project: Queue's project
:returns: next message marker as an integer
"""
document = self._col.find_one({'q': queue, 'p': project},
document = self._col.find_one({'q': queue_name, 'p': project},
sort=[('k', -1)],
fields={'k': 1, '_id': 0})
# NOTE(kgriffs): this approach is faster than using 'or'
return 1 if document is None else (document['k'] + 1)
def _backoff_sleep(self, attempt):
@ -240,7 +239,7 @@ class MessageController(storage.MessageBase):
self._col.remove({'q': queue, 'p': project}, w=0)
def _list(self, queue_name, marker=None, echo=False, client_uuid=None,
fields=None, include_claimed=False, project=None):
fields=None, include_claimed=False, project=None, sort=1):
"""Message document listing helper.
:param queue_name: Name of the queue to list
@ -248,13 +247,20 @@ class MessageController(storage.MessageBase):
:param marker: Message marker from which to start iterating
:param echo: Whether to return messages that match client_uuid
:param client_uuid: UUID for the client that originated this request
:param fields: fields to include in emmitted documents
:param fields: Fields to include in emmitted documents
:param include_claimed: Whether to include claimed messages,
not just active ones
:param sort: (Default 1) Sort order for the listing. Pass 1 for
ascending (oldest message first), or -1 for descending (newest
message first).
:returns: MongoDB "find" generator
:returns: MongoDB cursor
"""
if sort not in (1, -1):
raise ValueError('sort must be either 1 (ascending) '
'or -1 (descending)')
now = timeutils.utcnow()
query = {
@ -270,7 +276,7 @@ class MessageController(storage.MessageBase):
if fields and not isinstance(fields, (dict, list)):
raise TypeError('Fields must be an instance of list / dict')
if not echo and client_uuid:
if not echo and client_uuid is not None:
query['u'] = {'$ne': client_uuid}
if marker:
@ -283,12 +289,51 @@ class MessageController(storage.MessageBase):
# NOTE(flaper87): Suggest the index to use for this query
return self._col.find(query, fields=fields,
sort=[('k', 1)]).hint(self.active_fields)
sort=[('k', sort)]).hint(self.active_fields)
#-----------------------------------------------------------------------
# Interface
#-----------------------------------------------------------------------
def count(self, queue_name, project=None):
"""Return total number of (non-expired) messages in a queue.
This method is designed to very quickly count the number
of messages in a given queue. Expired messages are not
counted, of course. If the queue does not exist, the
count will always be 0.
"""
query = {
# Messages must belong to this queue
'q': queue_name,
'p': project,
# The messages can not be expired
'e': {'$gt': timeutils.utcnow()},
}
return self._col.find(query).count()
def first(self, queue_name, project=None, sort=1):
"""Get first message in the queue (including claimed).
:param queue_id: ObjectID of the queue to list
:param sort: (Default 1) Sort order for the listing. Pass 1 for
ascending (oldest message first), or -1 for descending (newest
message first).
:returns: First message in the queue, or None if the queue is
empty
"""
cursor = self._list(queue_name, project=project,
include_claimed=True, sort=sort).limit(1)
try:
message = next(cursor)
except StopIteration:
raise exceptions.QueueIsEmpty(queue_name, project)
return message
def active(self, queue_name, marker=None, echo=False,
client_uuid=None, fields=None, project=None):
@ -297,7 +342,6 @@ class MessageController(storage.MessageBase):
def claimed(self, queue_name, claim_id=None,
expires=None, limit=None, project=None):
query = {
'c.id': claim_id,
'c.e': {'$gt': expires or timeutils.utcnow()},
@ -362,7 +406,7 @@ class MessageController(storage.MessageBase):
for name, project in self._get_queue_np():
self._remove_expired(name, project)
def list(self, queue, project=None, marker=None, limit=10,
def list(self, queue_name, project=None, marker=None, limit=10,
echo=False, client_uuid=None, include_claimed=False):
if marker is not None:
@ -371,7 +415,7 @@ class MessageController(storage.MessageBase):
except ValueError:
raise exceptions.MalformedMarker()
messages = self._list(queue, marker, echo, client_uuid,
messages = self._list(queue_name, marker, echo, client_uuid,
include_claimed=include_claimed, project=project)
messages = messages.limit(limit)
@ -388,13 +432,13 @@ class MessageController(storage.MessageBase):
yield str(marker_id['next'])
@utils.raises_conn_error
def get(self, queue, message_id, project=None):
def get(self, queue_name, message_id, project=None):
mid = utils.to_oid(message_id)
now = timeutils.utcnow()
query = {
'_id': mid,
'q': queue,
'q': queue_name,
'p': project,
'e': {'$gt': now}
}
@ -402,18 +446,19 @@ class MessageController(storage.MessageBase):
message = list(self._col.find(query).limit(1).hint([('_id', 1)]))
if not message:
raise exceptions.MessageDoesNotExist(message_id, queue, project)
raise exceptions.MessageDoesNotExist(message_id, queue_name,
project)
return _basic_message(message[0], now)
@utils.raises_conn_error
def bulk_get(self, queue, message_ids, project=None):
def bulk_get(self, queue_name, message_ids, project=None):
message_ids = [utils.to_oid(id) for id in message_ids]
now = timeutils.utcnow()
# Base query, always check expire time
query = {
'q': queue,
'q': queue_name,
'p': project,
'_id': {'$in': message_ids},
'e': {'$gt': now},
@ -429,14 +474,14 @@ class MessageController(storage.MessageBase):
return utils.HookedCursor(messages, denormalizer)
@utils.raises_conn_error
def post(self, queue, messages, client_uuid, project=None):
def post(self, queue_name, messages, client_uuid, project=None):
now = timeutils.utcnow()
# NOTE(flaper87): We need to assert the queue exists
self._get_queue_id(queue, project)
self._get_queue_id(queue_name, project)
# Set the next basis marker for the first attempt.
next_marker = self._next_marker(queue, project)
next_marker = self._next_marker(queue_name, project)
# Results are aggregated across all attempts
# NOTE(kgriffs): lazy instantiation
@ -451,7 +496,7 @@ class MessageController(storage.MessageBase):
message_gen = (
{
't': message['ttl'],
'q': queue,
'q': queue_name,
'p': project,
'e': now + datetime.timedelta(seconds=message['ttl']),
'u': client_uuid,
@ -483,7 +528,7 @@ class MessageController(storage.MessageBase):
message = _('%(attempts)d attempt(s) required to post '
'%(num_messages)d messages to queue '
'%(queue_name)s and project %(project)s')
message %= dict(queue_name=queue, attempts=attempt + 1,
message %= dict(queue_name=queue_name, attempts=attempt+1,
num_messages=len(ids), project=project)
LOG.debug(message)
@ -501,7 +546,7 @@ class MessageController(storage.MessageBase):
# TODO(kgriffs): Add transaction ID to help match up loglines
if attempt == 0:
message = _('First attempt failed while adding messages '
'to queue %s for current request') % queue
'to queue %s for current request') % queue_name
LOG.debug(message)
@ -535,7 +580,7 @@ class MessageController(storage.MessageBase):
# Retry the remaining messages with a new sequence
# of markers.
prepared_messages = cached_messages[failed_index:]
next_marker = self._next_marker(queue, project)
next_marker = self._next_marker(queue_name, project)
for index, message in enumerate(prepared_messages):
message['k'] = next_marker + index
@ -553,21 +598,21 @@ class MessageController(storage.MessageBase):
message = _('Hit maximum number of attempts (%(max)s) for queue '
'%(id)s in project %(project)s')
message %= dict(max=options.CFG.max_attempts, id=queue,
message %= dict(max=options.CFG.max_attempts, id=queue_name,
project=project)
LOG.warning(message)
succeeded_ids = map(str, aggregated_results)
raise exceptions.MessageConflict(queue, project, succeeded_ids)
raise exceptions.MessageConflict(queue_name, project, succeeded_ids)
@utils.raises_conn_error
def delete(self, queue, message_id, project=None, claim=None):
def delete(self, queue_name, message_id, project=None, claim=None):
try:
mid = utils.to_oid(message_id)
query = {
'q': queue,
'q': queue_name,
'p': project,
'_id': mid
}
@ -594,11 +639,11 @@ class MessageController(storage.MessageBase):
pass
@utils.raises_conn_error
def bulk_delete(self, queue, message_ids, project=None):
def bulk_delete(self, queue_name, message_ids, project=None):
try:
message_ids = [utils.to_oid(id) for id in message_ids]
query = {
'q': queue,
'q': queue_name,
'p': project,
'_id': {'$in': message_ids},
}
@ -611,11 +656,11 @@ class MessageController(storage.MessageBase):
def _basic_message(msg, now):
oid = msg['_id']
age = now - utils.oid_utc(oid)
age = timeutils.delta_seconds(utils.oid_utc(oid), now)
return {
'id': str(oid),
'age': age.seconds,
'age': int(age),
'ttl': msg['t'],
'body': msg['b'],
}

View File

@ -24,6 +24,7 @@ Field Mappings:
import pymongo.errors
import marconi.openstack.common.log as logging
from marconi.openstack.common import timeutils
from marconi import storage
from marconi.storage import exceptions
from marconi.storage.mongodb import utils
@ -143,19 +144,33 @@ class QueueController(storage.QueueBase):
@utils.raises_conn_error
def stats(self, name, project=None):
self._get_id(name, project)
controller = self.driver.message_controller
active = controller.active(name, project=project)
claimed = controller.claimed(name, project=project)
if not self.exists(name, project=project):
raise exceptions.QueueDoesNotExist(name, project)
return {
'actions': 0,
'messages': {
'claimed': claimed.count(),
'free': active.count(),
}
controller = self.driver.message_controller
active = controller.active(name, project=project).count()
total = controller.count(name, project=project)
message_stats = {
'claimed': total - active,
'free': active,
'total': total,
}
if total != 0:
try:
oldest = controller.first(name, project=project, sort=1)
newest = controller.first(name, project=project, sort=-1)
except exceptions.QueueIsEmpty:
pass
else:
now = timeutils.utcnow()
message_stats['oldest'] = utils.stat_message(oldest, now)
message_stats['newest'] = utils.stat_message(newest, now)
return {'messages': message_stats}
@utils.raises_conn_error
def actions(self, name, project=None, marker=None, limit=10):
raise NotImplementedError

View File

@ -140,6 +140,19 @@ def oid_utc(oid):
raise TypeError('Expected ObjectId and got %s' % type(oid))
def stat_message(message, now):
"""Creates a stat document from the given message, relative to now."""
oid = message['_id']
created = oid_utc(oid)
age = timeutils.delta_seconds(created, now)
return {
'id': str(oid),
'age': int(age),
'created': timeutils.isotime(created),
}
def raises_conn_error(func):
"""Handles mongodb ConnectionFailure error
@ -156,6 +169,7 @@ def raises_conn_error(func):
msg = "ConnectionFailure caught"
LOG.error(msg)
raise storage_exceptions.ConnectionError(msg)
return wrapper

View File

@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from marconi.openstack.common import timeutils
from marconi.storage import base
from marconi.storage import exceptions
from marconi.storage.sqlite import utils
@ -84,6 +84,46 @@ class MessageController(base.MessageBase):
'body': content,
}
def first(self, queue, project, sort=1):
if project is None:
project = ''
with self.driver('deferred'):
sql = '''
select id, content, ttl, created,
julianday() * 86400.0 - created
from Messages
where ttl > julianday() * 86400.0 - created
and qid = ?
order by id %s
limit 1'''
if sort not in (1, -1):
raise ValueError('sort must be either 1 (ascending) '
'or -1 (descending)')
sql = sql % ('DESC' if sort == -1 else 'ASC')
args = [utils.get_qid(self.driver, queue, project)]
records = self.driver.run(sql, *args)
try:
id, content, ttl, created, age = next(records)
except StopIteration:
raise exceptions.QueueIsEmpty(queue, project)
created_unix = utils.julian_to_unix(created)
created_iso8601 = timeutils.iso8601_from_timestamp(created_unix)
return {
'id': utils.msgid_encode(id),
'ttl': ttl,
'created': created_iso8601,
'age': age,
'body': content,
}
def list(self, queue, project, marker=None, limit=10,
echo=False, client_uuid=None, include_claimed=False):

View File

@ -10,6 +10,7 @@
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
#
# See the License for the specific language governing permissions and
# limitations under the License.
@ -146,13 +147,27 @@ class QueueController(base.QueueBase):
and qid = ?)
''', qid, qid)
return {
'messages': {
'claimed': claimed,
'free': free,
},
'actions': 0,
total = free + claimed
message_stats = {
'claimed': claimed,
'free': free,
'total': total,
}
if total != 0:
message_controller = self.driver.message_controller
try:
oldest = message_controller.first(name, project, sort=1)
newest = message_controller.first(name, project, sort=-1)
except exceptions.QueueIsEmpty:
pass
else:
message_stats['oldest'] = utils.stat_message(oldest)
message_stats['newest'] = utils.stat_message(newest)
return {'messages': message_stats}
def actions(self, name, project, marker=None, limit=10):
raise NotImplementedError

View File

@ -16,6 +16,8 @@
from marconi.storage import exceptions
UNIX_EPOCH_AS_JULIAN_SEC = 2440587.5 * 86400.0
class NoResult(Exception):
pass
@ -77,3 +79,17 @@ def cid_decode(id):
except ValueError:
raise exceptions.MalformedID()
def julian_to_unix(julian_sec):
"""Converts Julian timestamp, in seconds, to a UNIX timestamp."""
return int(round(julian_sec - UNIX_EPOCH_AS_JULIAN_SEC))
def stat_message(message):
"""Creates a stat document based on a message."""
return {
'id': message['id'],
'age': message['age'],
'created': message['created'],
}

View File

@ -13,6 +13,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import time
from testtools import matchers
from marconi.openstack.common import timeutils
from marconi import storage
from marconi.storage import exceptions
from marconi.tests import util as testing
@ -47,6 +52,10 @@ class QueueControllerTest(ControllerBaseTest):
self.message_controller = self.driver.message_controller
self.claim_controller = self.driver.claim_controller
def tearDown(self):
timeutils.clear_time_override()
super(QueueControllerTest, self).tearDown()
def test_list(self):
num = 15
for queue in xrange(num):
@ -98,10 +107,45 @@ class QueueControllerTest(ControllerBaseTest):
# Test Queue Statistic
_insert_fixtures(self.message_controller, 'test',
project=self.project, client_uuid='my_uuid', num=12)
project=self.project, client_uuid='my_uuid',
num=6)
countof = self.controller.stats('test', project=self.project)
self.assertEqual(countof['messages']['free'], 12)
# NOTE(kgriffs): We can't get around doing this, because
# we don't know how the storage drive may be calculating
# message timestamps (and may not be monkey-patchable).
time.sleep(1)
_insert_fixtures(self.message_controller, 'test',
project=self.project, client_uuid='my_uuid',
num=6)
stats = self.controller.stats('test', project=self.project)
message_stats = stats['messages']
self.assertEqual(message_stats['free'], 12)
self.assertEqual(message_stats['claimed'], 0)
self.assertEqual(message_stats['total'], 12)
oldest = message_stats['oldest']
newest = message_stats['newest']
self.assertNotEqual(oldest, newest)
# NOTE(kgriffs): Ensure "now" is different enough
# for the next comparison to work.
timeutils.set_time_override()
timeutils.advance_time_seconds(10)
for message_stat in (oldest, newest):
created_iso = message_stat['created']
created = timeutils.parse_isotime(created_iso)
self.assertThat(timeutils.normalize_time(created),
matchers.LessThan(timeutils.utcnow()))
self.assertIn('id', message_stat)
self.assertThat(oldest['created'],
matchers.LessThan(newest['created']))
# Test Queue Deletion
self.controller.delete('test', project=self.project)
@ -116,6 +160,20 @@ class QueueControllerTest(ControllerBaseTest):
with testing.expect(storage.exceptions.DoesNotExist):
self.controller.set_metadata('test', '{}', project=self.project)
def test_stats_for_empty_queue(self):
created = self.controller.create('test', project=self.project)
self.assertTrue(created)
stats = self.controller.stats('test', project=self.project)
message_stats = stats['messages']
self.assertEqual(message_stats['free'], 0)
self.assertEqual(message_stats['claimed'], 0)
self.assertEqual(message_stats['total'], 0)
self.assertNotIn('newest', message_stats)
self.assertNotIn('oldest', message_stats)
class MessageControllerTest(ControllerBaseTest):
"""Message Controller base tests.
@ -381,6 +439,7 @@ class ClaimControllerTest(ControllerBaseTest):
project=self.project)
self.assertEqual(countof['messages']['claimed'], 15)
self.assertEqual(countof['messages']['free'], 5)
self.assertEqual(countof['messages']['total'], 20)
# Make sure get works
claim, messages2 = self.controller.get(self.queue_name, claim_id,
@ -509,5 +568,6 @@ def _insert_fixtures(controller, queue_name, project=None,
'body': {
'event': 'Event number %s' % n
}}
controller.post(queue_name, messages(),
project=project, client_uuid=client_uuid)

View File

@ -240,6 +240,20 @@ class MongodbMessageTests(base.MessageControllerTest):
message = self.driver.db.messages.find_one({'q': queue, 'p': project})
self.assertEquals(message['k'], messages_per_queue)
def test_empty_queue_exception(self):
queue_name = 'empty-queue-test'
self.queue_controller.create(queue_name)
self.assertRaises(storage.exceptions.QueueIsEmpty,
self.controller.first, queue_name)
def test_invalid_sort_option(self):
queue_name = 'empty-queue-test'
self.queue_controller.create(queue_name)
self.assertRaises(ValueError,
self.controller.first, queue_name, sort=0)
class MongodbClaimTests(base.ClaimControllerTest):
driver_class = mongodb.Driver

View File

@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from marconi import storage
from marconi.storage import sqlite
from marconi.storage.sqlite import controllers
from marconi.tests.storage import base
@ -27,6 +28,19 @@ class SQliteMessageTests(base.MessageControllerTest):
driver_class = sqlite.Driver
controller_class = controllers.MessageController
def test_empty_queue_exception(self):
queue_name = 'empty-queue-test'
self.queue_controller.create(queue_name, None)
self.assertRaises(storage.exceptions.QueueIsEmpty,
self.controller.first,
queue_name, None, sort=1)
def test_invalid_sort_option(self):
self.assertRaises(ValueError,
self.controller.first,
'foo', None, sort='dosomething()')
class SQliteClaimTests(base.ClaimControllerTest):
driver_class = sqlite.Driver

View File

@ -17,6 +17,7 @@ import json
import os
import falcon
from testtools import matchers
from marconi.common import config
from marconi.tests.transport.wsgi import base
@ -111,7 +112,7 @@ class MessagesBaseTest(base.TestBase):
self._test_post(sample_messages)
def test_post_to_mia_queue(self):
def test_post_to_missing_queue(self):
self._post_messages('/v1/queues/nonexistent/messages')
self.assertEquals(self.srmock.status, falcon.HTTP_404)
@ -223,11 +224,18 @@ class MessagesBaseTest(base.TestBase):
body = self.simulate_get(self.queue_path + '/stats', self.project_id)
self.assertEquals(self.srmock.status, falcon.HTTP_200)
countof = json.loads(body[0])
message_stats = json.loads(body[0])['messages']
self.assertEquals(self.srmock.headers_dict['Content-Location'],
self.queue_path + '/stats')
self.assertEquals(countof['messages']['free'], 10)
# NOTE(kgriffs): The other parts of the stats are tested
# in tests.storage.base and so are not repeated here.
expected_pattern = self.queue_path + '/messages/[^/]+$'
for message_stat_name in ('oldest', 'newest'):
self.assertThat(message_stats[message_stat_name]['href'],
matchers.MatchesRegex(expected_pattern))
# NOTE(kgriffs): Try to get messages for a missing queue
self.simulate_get('/v1/queues/nonexistent/messages', self.project_id,
headers=self.headers)
self.assertEquals(self.srmock.status, falcon.HTTP_204)

View File

@ -36,6 +36,19 @@ class Resource(object):
resp_dict = self.queue_ctrl.stats(queue_name,
project=project_id)
message_stats = resp_dict['messages']
if message_stats['total'] != 0:
base_path = req.path[:req.path.rindex('/')] + '/messages/'
newest = message_stats['newest']
newest['href'] = base_path + newest['id']
del newest['id']
oldest = message_stats['oldest']
oldest['href'] = base_path + oldest['id']
del oldest['id']
resp.content_location = req.path
resp.body = helpers.to_json(resp_dict)
resp.status = falcon.HTTP_200