Calculate stats dynamically instead of using side counters

Side counters are error-prone, and there are cases where they can
get out of sync if we lose a connection to the DB at the wrong
moment. Also, they are difficult to update when performing GC
operations.

This patch mostly removes the use of counters to track stats and
does the calculations on the fly. These calculations should still be
quite fast, since we are still using a per-claim counter (which
is easier to maintain than the per-queue counters).

Change-Id: I5d63a3791afd8d749890c5996208568c974f71d1
Partially-Implements: blueprint redis-storage-driver
This commit is contained in:
kgriffs 2014-09-01 23:11:27 -05:00
parent 00e5157f0c
commit 0562485a09
5 changed files with 169 additions and 147 deletions

View File

@ -192,29 +192,6 @@ class RedisQueuesTest(base.QueueControllerTest):
super(RedisQueuesTest, self).tearDown()
self.connection.flushdb()
def test_inc_counter(self):
queue_name = 'inc-counter'
self.controller.create(queue_name)
self.controller._inc_counter(queue_name, None, 10)
scoped_q_name = utils.scope_queue_name(queue_name)
count = self.controller._get_queue_info(scoped_q_name, b'c', int)[0]
self.assertEqual(count, 10)
def test_inc_claimed(self):
self.addCleanup(self.controller.delete, 'test-queue',
project=self.project)
queue_name = 'inc-claimed'
self.controller.create(queue_name)
self.controller._inc_claimed(queue_name, None, 10)
scoped_q_name = utils.scope_queue_name(queue_name)
claimed = self.controller._get_queue_info(scoped_q_name,
b'cl', int)[0]
self.assertEqual(claimed, 10)
@testing.requires_redis
class RedisMessagesTest(base.MessageControllerTest):
@ -231,7 +208,7 @@ class RedisMessagesTest(base.MessageControllerTest):
super(RedisMessagesTest, self).tearDown()
self.connection.flushdb()
def test_get_count(self):
def test_count(self):
queue_name = 'get-count'
self.queue_ctrl.create(queue_name)
@ -244,10 +221,7 @@ class RedisMessagesTest(base.MessageControllerTest):
# Creating 10 messages
self.controller.post(queue_name, msgs, client_id)
messages_set_id = utils.scope_message_ids_set(queue_name, None,
'messages')
num_msg = self.controller._get_count(messages_set_id)
num_msg = self.controller._count(queue_name, None)
self.assertEqual(num_msg, 10)
def test_empty_queue_exception(self):

View File

@ -32,6 +32,14 @@ CLAIM_MESSAGES_SUFFIX = 'messages'
RETRY_CLAIM_TIMEOUT = 10
# NOTE(kgriffs): Number of claims to read at a time when counting
# the total number of claimed messages for a queue.
#
# TODO(kgriffs): Tune this parameter and/or make it configurable. It
# takes ~0.8 ms to retrieve 100 items from a sorted set on a 2.7 GHz
# Intel Core i7 (not including network latency).
COUNTING_BATCH_SIZE = 100
class ClaimController(storage.Claim):
"""Implements claim resource operations using Redis.
@ -60,6 +68,7 @@ class ClaimController(storage.Claim):
ttl -> t
id -> id
expires -> e
num_messages -> n
"""
def __init__(self, *args, **kwargs):
super(ClaimController, self).__init__(*args, **kwargs)
@ -69,6 +78,14 @@ class ClaimController(storage.Claim):
use_bin_type=True).pack
self._unpacker = functools.partial(msgpack.unpackb, encoding='utf-8')
@decorators.lazy_property(write=False)
def _message_ctrl(self):
return self.driver.message_controller
@decorators.lazy_property(write=False)
def _queue_ctrl(self):
return self.driver.queue_controller
def _get_claim_info(self, claim_id, fields, transform=int):
"""Get one or more fields from the claim Info."""
@ -97,16 +114,66 @@ class ClaimController(storage.Claim):
return True
def _get_claimed_message_keys(self, claim_id):
return self._client.lrange(claim_id, 0, -1)
def _get_claimed_message_keys(self, claim_msgs_key):
return self._client.lrange(claim_msgs_key, 0, -1)
@decorators.lazy_property(write=False)
def _message_ctrl(self):
return self.driver.message_controller
def _count_messages(self, queue, project):
"""Count and return the total number of claimed messages."""
@decorators.lazy_property(write=False)
def _queue_ctrl(self):
return self.driver.queue_controller
# NOTE(kgriffs): Iterate through all claims, adding up the
# number of messages per claim. This is obviously slower
# than keeping a side counter, but is also less error-prone.
# Plus, it avoids having to do a lot of extra work during
# garbage collection passes. Also, considering that most
# workloads won't require a large number of claims, most of
# the time we can do this in a single pass, so it is still
# pretty fast.
claims_set_key = utils.scope_claims_set(queue, project,
QUEUE_CLAIMS_SUFFIX)
num_claimed = 0
offset = 0
while True:
claim_ids = self._client.zrange(claims_set_key, offset,
offset + COUNTING_BATCH_SIZE - 1)
if not claim_ids:
break
offset += len(claim_ids)
with self._client.pipeline() as pipe:
for cid in claim_ids:
pipe.hmget(cid, 'n')
claim_infos = pipe.execute()
for info in claim_infos:
# NOTE(kgriffs): In case the claim was deleted out
# from under us, sanity-check that we got a non-None
# info list.
if info:
num_claimed += int(info[0])
return num_claimed
def _del_message(self, queue, project, claim_id, message_id, pipe):
"""Called by MessageController when messages are being deleted.
This method removes the message from claim data structures.
"""
claim_msgs_key = utils.scope_claim_messages(claim_id,
CLAIM_MESSAGES_SUFFIX)
# NOTE(kgriffs): In practice, scanning will be quite fast,
# since the usual pattern is to delete messages from oldest
# to newest, and the list is sorted in that order. Also,
# the length of the list will usually be ~10 messages.
pipe.lrem(claim_msgs_key, 1, message_id)
# NOTE(kgriffs): Decrement the message counter used for stats
pipe.hincrby(claim_id, 'n', -1)
@utils.raises_conn_error
@utils.retries_on_connection_error
@ -210,6 +277,7 @@ class ClaimController(storage.Claim):
cursor = next(results)
msg_list = list(cursor)
num_messages = len(msg_list)
# NOTE(kgriffs): If there are no active messages to
# claim, simply return an empty list.
@ -274,6 +342,7 @@ class ClaimController(storage.Claim):
'id': claim_id,
't': claim_ttl,
'e': claim_expires,
'n': num_messages,
}
pipe.hmset(claim_id, claim_info)
@ -285,14 +354,8 @@ class ClaimController(storage.Claim):
# A sorted set is used to facilitate cleaning
# up the IDs of expired claims.
pipe.zadd(claims_set_key, claim_expires, claim_id)
# NOTE(kgriffs): Update a counter that facilitates
# the queue stats calculation.
self._queue_ctrl._inc_claimed(queue, project,
len(msg_list),
pipe=pipe)
pipe.execute()
return claim_id, basic_messages
except redis.exceptions.WatchError:
@ -405,10 +468,6 @@ class ClaimController(storage.Claim):
# have changed.
msg.to_redis(pipe)
self._queue_ctrl._inc_claimed(queue, project,
-1 * len(claimed_msgs),
pipe=pipe)
pipe.execute()

View File

@ -111,12 +111,16 @@ class MessageController(storage.Message):
include_claimed=False,
limit=limit, to_basic=False)
def _get_count(self, msgset_key):
"""Get num messages in a Queue.
def _count(self, queue, project):
"""Return total number of messages in a queue.
Return the number of messages in a queue scoped by
queue and project.
Note: Some expired messages may be included in the count if
they haven't been GC'd yet. This is done for performance.
"""
msgset_key = utils.scope_message_ids_set(queue, project,
MESSAGE_IDS_SUFFIX)
return self._client.zcard(msgset_key)
def _create_msgset(self, queue, project, pipe):
@ -175,13 +179,9 @@ class MessageController(storage.Message):
if not utils.msg_claimed_filter(msg, now):
return msg.id
def _exists(self, key):
"""Check if message exists in the Queue.
Helper function which checks if a particular message_id
exists in the sorted set of the queues message ids.
"""
return self._client.exists(key)
def _exists(self, message_id):
"""Check if message exists in the Queue."""
return self._client.exists(message_id)
def _get_first_message_id(self, queue, project, sort):
"""Fetch head/tail of the Queue.
@ -201,18 +201,32 @@ class MessageController(storage.Message):
return Message.from_redis(msg) if msg else None
def _get_claim(self, message_id):
"""Gets minimal claim doc for a message.
:returns: {'id': cid, 'expires': ts} IFF the message is claimed,
and that claim has not expired.
"""
claim = self._client.hmget(message_id, 'c', 'c.e')
if claim == [None, None]:
# NOTE(kgriffs): message_id was not found
return None
return {
info = {
# NOTE(kgriffs): A "None" claim is serialized as an empty str
'id': strutils.safe_decode(claim[0]) or None,
'expires': int(claim[1]),
}
# Is the message claimed?
now = timeutils.utcnow_ts()
if info['id'] and (now < info['expires']):
return info
# Not claimed
return None
def _list(self, queue, project=None, marker=None,
limit=storage.DEFAULT_MESSAGES_PER_PAGE,
echo=False, client_uuid=None,
@ -456,11 +470,8 @@ class MessageController(storage.Message):
keys.append(msg.id)
pipe.incrby(counter_key, len(keys))
self._queue_ctrl._inc_counter(queue, project,
len(prepared_messages),
pipe=pipe)
pipe.execute()
return keys
except redis.exceptions.WatchError:
@ -474,6 +485,11 @@ class MessageController(storage.Message):
if not self._queue_ctrl.exists(queue, project):
raise errors.QueueDoesNotExist(queue, project)
# NOTE(kgriffs): The message does not exist, so
# it is essentially "already" deleted.
if not self._exists(message_id):
return
# TODO(kgriffs): Create decorator for validating claim and message
# IDs, since those are not checked at the transport layer. This
# decorator should be applied to all relevant methods.
@ -484,15 +500,9 @@ class MessageController(storage.Message):
raise errors.ClaimDoesNotExist(queue, project, claim)
msg_claim = self._get_claim(message_id)
is_claimed = (msg_claim is not None)
# NOTE(kgriffs): The message does not exist, so
# it is essentially "already deleted".
if msg_claim is None:
return
now = timeutils.utcnow_ts()
is_claimed = msg_claim['id'] and (now < msg_claim['expires'])
# Authorize the request based on having the correct claim ID
if claim is None:
if is_claimed:
raise errors.MessageIsClaimed(message_id)
@ -512,13 +522,12 @@ class MessageController(storage.Message):
pipe.delete(message_id)
pipe.zrem(msgset_key, message_id)
results = pipe.execute()
if is_claimed:
self._claim_ctrl._del_message(queue, project,
msg_claim['id'], message_id,
pipe)
# NOTE(prashanthr_): results[0] is 1 when the delete is
# successful. Hence we use that case to identify successful
# deletes.
if results[0] == 1:
self._queue_ctrl._inc_counter(queue, project, -1)
pipe.execute()
@utils.raises_conn_error
@utils.retries_on_connection_error
@ -532,17 +541,18 @@ class MessageController(storage.Message):
with self._client.pipeline() as pipe:
for mid in message_ids:
if not self._exists(mid):
continue
pipe.delete(mid)
pipe.zrem(msgset_key, mid)
results = pipe.execute()
# NOTE(prashanthr_): None is returned for the cases where
# the message might not exist or has been deleted/expired.
# Hence we calculate the number of deletes as the
# total number of message ids - number of failed deletes.
amount = -1 * (len(results) - results.count(0)) / 2
self._queue_ctrl._inc_counter(queue, project, int(amount))
msg_claim = self._get_claim(mid)
if msg_claim is not None:
self._claim_ctrl._del_message(queue, project,
msg_claim['id'], mid,
pipe)
pipe.execute()
@utils.raises_conn_error
@utils.retries_on_connection_error

View File

@ -22,7 +22,6 @@ from zaqar.openstack.common import log as logging
from zaqar.openstack.common import timeutils
from zaqar.queues import storage
from zaqar.queues.storage import errors
from zaqar.queues.storage.redis import messages
from zaqar.queues.storage.redis import utils
LOG = logging.getLogger(__name__)
@ -55,8 +54,6 @@ class QueueController(storage.Queue):
Name Field
-------------------------------
count -> c
num_msgs_claimed -> cl
metadata -> m
creation timestamp -> t
"""
@ -72,49 +69,9 @@ class QueueController(storage.Queue):
def _message_ctrl(self):
return self.driver.message_controller
def _claim_counter_key(self, name, project):
return utils.scope_queue_name(name, project)
def _inc_counter(self, name, project, amount=1, pipe=None):
queue_key = utils.scope_queue_name(name, project)
client = pipe if pipe is not None else self._client
client.hincrby(queue_key, 'c', amount)
def _inc_claimed(self, name, project, amount=1, pipe=None):
queue_key = utils.scope_queue_name(name, project)
client = pipe if pipe is not None else self._client
client.hincrby(queue_key, 'cl', amount)
# TODO(kgriffs): Remove or optimize
def _get_expired_message_count(self, name, project):
"""Calculate the number of expired messages in the queue.
Used to compute the stats on the queue.
Method has O(n) complexity as we iterate the entire list of
messages.
"""
messages_set_key = utils.scope_message_ids_set(name, project,
MESSAGE_IDS_SUFFIX)
with self._client.pipeline() as pipe:
for msg_key in self._client.zrange(messages_set_key, 0, -1):
pipe.hgetall(msg_key)
raw_messages = pipe.execute()
expired = 0
now = timeutils.utcnow_ts()
for msg in raw_messages:
if msg:
msg = messages.Message.from_redis(msg)
if utils.msg_expired_filter(msg, now):
expired += 1
return expired
@decorators.lazy_property(write=False)
def _claim_ctrl(self):
return self.driver.claim_controller
def _get_queue_info(self, queue_key, fields, transform=str):
"""Get one or more fields from Queue Info."""
@ -232,24 +189,27 @@ class QueueController(storage.Queue):
if not self.exists(name, project=project):
raise errors.QueueDoesNotExist(name, project)
queue_key = utils.scope_queue_name(name, project)
total = self._message_ctrl._count(name, project)
claimed, total = self._get_queue_info(queue_key, [b'cl', b'c'], int)
expired = self._get_expired_message_count(name, project)
if total:
claimed = self._claim_ctrl._count_messages(name, project)
else:
claimed = 0
message_stats = {
'claimed': claimed,
'free': total - claimed - expired,
'total': total
'free': total - claimed,
'total': total,
}
try:
newest = self._message_ctrl.first(name, project, -1)
oldest = self._message_ctrl.first(name, project, 1)
except errors.QueueIsEmpty:
pass
else:
message_stats['newest'] = newest
message_stats['oldest'] = oldest
if total:
try:
newest = self._message_ctrl.first(name, project, -1)
oldest = self._message_ctrl.first(name, project, 1)
except errors.QueueIsEmpty:
pass
else:
message_stats['newest'] = newest
message_stats['oldest'] = oldest
return {'messages': message_stats}

View File

@ -314,9 +314,9 @@ class QueueControllerTest(ControllerBaseTest):
self.assertTrue(created)
# Create 15 messages.
_insert_fixtures(self.message_controller, queue_name,
project=self.project,
client_uuid=client_uuid, num=15)
msg_keys = _insert_fixtures(self.message_controller, queue_name,
project=self.project,
client_uuid=client_uuid, num=15)
stats = self.controller.stats(queue_name,
self.project)['messages']
@ -331,6 +331,25 @@ class QueueControllerTest(ControllerBaseTest):
self.project)['messages']
self.assertEqual(stats['claimed'], 10)
# Delete one message and ensure stats are updated even
# thought the claim itself has not been deleted.
self.message_controller.delete(queue_name, msg_keys[0],
self.project, claim_id)
stats = self.controller.stats(queue_name,
self.project)['messages']
self.assertEqual(stats['total'], 14)
self.assertEqual(stats['claimed'], 9)
self.assertEqual(stats['free'], 5)
# Same thing but use bulk_delete interface
self.message_controller.bulk_delete(queue_name, msg_keys[1:3],
self.project)
stats = self.controller.stats(queue_name,
self.project)['messages']
self.assertEqual(stats['total'], 12)
self.assertEqual(stats['claimed'], 7)
self.assertEqual(stats['free'], 5)
# Delete the claim
self.claim_controller.delete(queue_name, claim_id,
self.project)