diff --git a/tests/unit/queues/storage/test_impl_redis.py b/tests/unit/queues/storage/test_impl_redis.py index 5d74c4605..b9dfaceb5 100644 --- a/tests/unit/queues/storage/test_impl_redis.py +++ b/tests/unit/queues/storage/test_impl_redis.py @@ -192,29 +192,6 @@ class RedisQueuesTest(base.QueueControllerTest): super(RedisQueuesTest, self).tearDown() self.connection.flushdb() - def test_inc_counter(self): - queue_name = 'inc-counter' - self.controller.create(queue_name) - self.controller._inc_counter(queue_name, None, 10) - - scoped_q_name = utils.scope_queue_name(queue_name) - count = self.controller._get_queue_info(scoped_q_name, b'c', int)[0] - self.assertEqual(count, 10) - - def test_inc_claimed(self): - self.addCleanup(self.controller.delete, 'test-queue', - project=self.project) - - queue_name = 'inc-claimed' - - self.controller.create(queue_name) - self.controller._inc_claimed(queue_name, None, 10) - - scoped_q_name = utils.scope_queue_name(queue_name) - claimed = self.controller._get_queue_info(scoped_q_name, - b'cl', int)[0] - self.assertEqual(claimed, 10) - @testing.requires_redis class RedisMessagesTest(base.MessageControllerTest): @@ -231,7 +208,7 @@ class RedisMessagesTest(base.MessageControllerTest): super(RedisMessagesTest, self).tearDown() self.connection.flushdb() - def test_get_count(self): + def test_count(self): queue_name = 'get-count' self.queue_ctrl.create(queue_name) @@ -244,10 +221,7 @@ class RedisMessagesTest(base.MessageControllerTest): # Creating 10 messages self.controller.post(queue_name, msgs, client_id) - messages_set_id = utils.scope_message_ids_set(queue_name, None, - 'messages') - - num_msg = self.controller._get_count(messages_set_id) + num_msg = self.controller._count(queue_name, None) self.assertEqual(num_msg, 10) def test_empty_queue_exception(self): diff --git a/zaqar/queues/storage/redis/claims.py b/zaqar/queues/storage/redis/claims.py index 33d206fd7..d590bf0e1 100644 --- a/zaqar/queues/storage/redis/claims.py +++ b/zaqar/queues/storage/redis/claims.py @@ -32,6 +32,14 @@ CLAIM_MESSAGES_SUFFIX = 'messages' RETRY_CLAIM_TIMEOUT = 10 +# NOTE(kgriffs): Number of claims to read at a time when counting +# the total number of claimed messages for a queue. +# +# TODO(kgriffs): Tune this parameter and/or make it configurable. It +# takes ~0.8 ms to retrieve 100 items from a sorted set on a 2.7 GHz +# Intel Core i7 (not including network latency). +COUNTING_BATCH_SIZE = 100 + class ClaimController(storage.Claim): """Implements claim resource operations using Redis. @@ -60,6 +68,7 @@ class ClaimController(storage.Claim): ttl -> t id -> id expires -> e + num_messages -> n """ def __init__(self, *args, **kwargs): super(ClaimController, self).__init__(*args, **kwargs) @@ -69,6 +78,14 @@ class ClaimController(storage.Claim): use_bin_type=True).pack self._unpacker = functools.partial(msgpack.unpackb, encoding='utf-8') + @decorators.lazy_property(write=False) + def _message_ctrl(self): + return self.driver.message_controller + + @decorators.lazy_property(write=False) + def _queue_ctrl(self): + return self.driver.queue_controller + def _get_claim_info(self, claim_id, fields, transform=int): """Get one or more fields from the claim Info.""" @@ -97,16 +114,66 @@ class ClaimController(storage.Claim): return True - def _get_claimed_message_keys(self, claim_id): - return self._client.lrange(claim_id, 0, -1) + def _get_claimed_message_keys(self, claim_msgs_key): + return self._client.lrange(claim_msgs_key, 0, -1) - @decorators.lazy_property(write=False) - def _message_ctrl(self): - return self.driver.message_controller + def _count_messages(self, queue, project): + """Count and return the total number of claimed messages.""" - @decorators.lazy_property(write=False) - def _queue_ctrl(self): - return self.driver.queue_controller + # NOTE(kgriffs): Iterate through all claims, adding up the + # number of messages per claim. This is obviously slower + # than keeping a side counter, but is also less error-prone. + # Plus, it avoids having to do a lot of extra work during + # garbage collection passes. Also, considering that most + # workloads won't require a large number of claims, most of + # the time we can do this in a single pass, so it is still + # pretty fast. + + claims_set_key = utils.scope_claims_set(queue, project, + QUEUE_CLAIMS_SUFFIX) + num_claimed = 0 + offset = 0 + + while True: + claim_ids = self._client.zrange(claims_set_key, offset, + offset + COUNTING_BATCH_SIZE - 1) + if not claim_ids: + break + + offset += len(claim_ids) + + with self._client.pipeline() as pipe: + for cid in claim_ids: + pipe.hmget(cid, 'n') + + claim_infos = pipe.execute() + + for info in claim_infos: + # NOTE(kgriffs): In case the claim was deleted out + # from under us, sanity-check that we got a non-None + # info list. + if info: + num_claimed += int(info[0]) + + return num_claimed + + def _del_message(self, queue, project, claim_id, message_id, pipe): + """Called by MessageController when messages are being deleted. + + This method removes the message from claim data structures. + """ + + claim_msgs_key = utils.scope_claim_messages(claim_id, + CLAIM_MESSAGES_SUFFIX) + + # NOTE(kgriffs): In practice, scanning will be quite fast, + # since the usual pattern is to delete messages from oldest + # to newest, and the list is sorted in that order. Also, + # the length of the list will usually be ~10 messages. + pipe.lrem(claim_msgs_key, 1, message_id) + + # NOTE(kgriffs): Decrement the message counter used for stats + pipe.hincrby(claim_id, 'n', -1) @utils.raises_conn_error @utils.retries_on_connection_error @@ -210,6 +277,7 @@ class ClaimController(storage.Claim): cursor = next(results) msg_list = list(cursor) + num_messages = len(msg_list) # NOTE(kgriffs): If there are no active messages to # claim, simply return an empty list. @@ -274,6 +342,7 @@ class ClaimController(storage.Claim): 'id': claim_id, 't': claim_ttl, 'e': claim_expires, + 'n': num_messages, } pipe.hmset(claim_id, claim_info) @@ -285,14 +354,8 @@ class ClaimController(storage.Claim): # A sorted set is used to facilitate cleaning # up the IDs of expired claims. pipe.zadd(claims_set_key, claim_expires, claim_id) - - # NOTE(kgriffs): Update a counter that facilitates - # the queue stats calculation. - self._queue_ctrl._inc_claimed(queue, project, - len(msg_list), - pipe=pipe) - pipe.execute() + return claim_id, basic_messages except redis.exceptions.WatchError: @@ -405,10 +468,6 @@ class ClaimController(storage.Claim): # have changed. msg.to_redis(pipe) - self._queue_ctrl._inc_claimed(queue, project, - -1 * len(claimed_msgs), - pipe=pipe) - pipe.execute() diff --git a/zaqar/queues/storage/redis/messages.py b/zaqar/queues/storage/redis/messages.py index 7ac104455..8ef98cca4 100644 --- a/zaqar/queues/storage/redis/messages.py +++ b/zaqar/queues/storage/redis/messages.py @@ -111,12 +111,16 @@ class MessageController(storage.Message): include_claimed=False, limit=limit, to_basic=False) - def _get_count(self, msgset_key): - """Get num messages in a Queue. + def _count(self, queue, project): + """Return total number of messages in a queue. - Return the number of messages in a queue scoped by - queue and project. + Note: Some expired messages may be included in the count if + they haven't been GC'd yet. This is done for performance. """ + + msgset_key = utils.scope_message_ids_set(queue, project, + MESSAGE_IDS_SUFFIX) + return self._client.zcard(msgset_key) def _create_msgset(self, queue, project, pipe): @@ -175,13 +179,9 @@ class MessageController(storage.Message): if not utils.msg_claimed_filter(msg, now): return msg.id - def _exists(self, key): - """Check if message exists in the Queue. - - Helper function which checks if a particular message_id - exists in the sorted set of the queues message ids. - """ - return self._client.exists(key) + def _exists(self, message_id): + """Check if message exists in the Queue.""" + return self._client.exists(message_id) def _get_first_message_id(self, queue, project, sort): """Fetch head/tail of the Queue. @@ -201,18 +201,32 @@ class MessageController(storage.Message): return Message.from_redis(msg) if msg else None def _get_claim(self, message_id): + """Gets minimal claim doc for a message. + + :returns: {'id': cid, 'expires': ts} IFF the message is claimed, + and that claim has not expired. + """ + claim = self._client.hmget(message_id, 'c', 'c.e') if claim == [None, None]: # NOTE(kgriffs): message_id was not found return None - return { + info = { # NOTE(kgriffs): A "None" claim is serialized as an empty str 'id': strutils.safe_decode(claim[0]) or None, 'expires': int(claim[1]), } + # Is the message claimed? + now = timeutils.utcnow_ts() + if info['id'] and (now < info['expires']): + return info + + # Not claimed + return None + def _list(self, queue, project=None, marker=None, limit=storage.DEFAULT_MESSAGES_PER_PAGE, echo=False, client_uuid=None, @@ -456,11 +470,8 @@ class MessageController(storage.Message): keys.append(msg.id) pipe.incrby(counter_key, len(keys)) - self._queue_ctrl._inc_counter(queue, project, - len(prepared_messages), - pipe=pipe) - pipe.execute() + return keys except redis.exceptions.WatchError: @@ -474,6 +485,11 @@ class MessageController(storage.Message): if not self._queue_ctrl.exists(queue, project): raise errors.QueueDoesNotExist(queue, project) + # NOTE(kgriffs): The message does not exist, so + # it is essentially "already" deleted. + if not self._exists(message_id): + return + # TODO(kgriffs): Create decorator for validating claim and message # IDs, since those are not checked at the transport layer. This # decorator should be applied to all relevant methods. @@ -484,15 +500,9 @@ class MessageController(storage.Message): raise errors.ClaimDoesNotExist(queue, project, claim) msg_claim = self._get_claim(message_id) + is_claimed = (msg_claim is not None) - # NOTE(kgriffs): The message does not exist, so - # it is essentially "already deleted". - if msg_claim is None: - return - - now = timeutils.utcnow_ts() - is_claimed = msg_claim['id'] and (now < msg_claim['expires']) - + # Authorize the request based on having the correct claim ID if claim is None: if is_claimed: raise errors.MessageIsClaimed(message_id) @@ -512,13 +522,12 @@ class MessageController(storage.Message): pipe.delete(message_id) pipe.zrem(msgset_key, message_id) - results = pipe.execute() + if is_claimed: + self._claim_ctrl._del_message(queue, project, + msg_claim['id'], message_id, + pipe) - # NOTE(prashanthr_): results[0] is 1 when the delete is - # successful. Hence we use that case to identify successful - # deletes. - if results[0] == 1: - self._queue_ctrl._inc_counter(queue, project, -1) + pipe.execute() @utils.raises_conn_error @utils.retries_on_connection_error @@ -532,17 +541,18 @@ class MessageController(storage.Message): with self._client.pipeline() as pipe: for mid in message_ids: + if not self._exists(mid): + continue + pipe.delete(mid) pipe.zrem(msgset_key, mid) - results = pipe.execute() - - # NOTE(prashanthr_): None is returned for the cases where - # the message might not exist or has been deleted/expired. - # Hence we calculate the number of deletes as the - # total number of message ids - number of failed deletes. - amount = -1 * (len(results) - results.count(0)) / 2 - self._queue_ctrl._inc_counter(queue, project, int(amount)) + msg_claim = self._get_claim(mid) + if msg_claim is not None: + self._claim_ctrl._del_message(queue, project, + msg_claim['id'], mid, + pipe) + pipe.execute() @utils.raises_conn_error @utils.retries_on_connection_error diff --git a/zaqar/queues/storage/redis/queues.py b/zaqar/queues/storage/redis/queues.py index 3366910b3..e316e4635 100644 --- a/zaqar/queues/storage/redis/queues.py +++ b/zaqar/queues/storage/redis/queues.py @@ -22,7 +22,6 @@ from zaqar.openstack.common import log as logging from zaqar.openstack.common import timeutils from zaqar.queues import storage from zaqar.queues.storage import errors -from zaqar.queues.storage.redis import messages from zaqar.queues.storage.redis import utils LOG = logging.getLogger(__name__) @@ -55,8 +54,6 @@ class QueueController(storage.Queue): Name Field ------------------------------- - count -> c - num_msgs_claimed -> cl metadata -> m creation timestamp -> t """ @@ -72,49 +69,9 @@ class QueueController(storage.Queue): def _message_ctrl(self): return self.driver.message_controller - def _claim_counter_key(self, name, project): - return utils.scope_queue_name(name, project) - - def _inc_counter(self, name, project, amount=1, pipe=None): - queue_key = utils.scope_queue_name(name, project) - - client = pipe if pipe is not None else self._client - client.hincrby(queue_key, 'c', amount) - - def _inc_claimed(self, name, project, amount=1, pipe=None): - queue_key = utils.scope_queue_name(name, project) - - client = pipe if pipe is not None else self._client - client.hincrby(queue_key, 'cl', amount) - - # TODO(kgriffs): Remove or optimize - def _get_expired_message_count(self, name, project): - """Calculate the number of expired messages in the queue. - - Used to compute the stats on the queue. - Method has O(n) complexity as we iterate the entire list of - messages. - """ - - messages_set_key = utils.scope_message_ids_set(name, project, - MESSAGE_IDS_SUFFIX) - - with self._client.pipeline() as pipe: - for msg_key in self._client.zrange(messages_set_key, 0, -1): - pipe.hgetall(msg_key) - - raw_messages = pipe.execute() - - expired = 0 - now = timeutils.utcnow_ts() - - for msg in raw_messages: - if msg: - msg = messages.Message.from_redis(msg) - if utils.msg_expired_filter(msg, now): - expired += 1 - - return expired + @decorators.lazy_property(write=False) + def _claim_ctrl(self): + return self.driver.claim_controller def _get_queue_info(self, queue_key, fields, transform=str): """Get one or more fields from Queue Info.""" @@ -232,24 +189,27 @@ class QueueController(storage.Queue): if not self.exists(name, project=project): raise errors.QueueDoesNotExist(name, project) - queue_key = utils.scope_queue_name(name, project) + total = self._message_ctrl._count(name, project) - claimed, total = self._get_queue_info(queue_key, [b'cl', b'c'], int) - expired = self._get_expired_message_count(name, project) + if total: + claimed = self._claim_ctrl._count_messages(name, project) + else: + claimed = 0 message_stats = { 'claimed': claimed, - 'free': total - claimed - expired, - 'total': total + 'free': total - claimed, + 'total': total, } - try: - newest = self._message_ctrl.first(name, project, -1) - oldest = self._message_ctrl.first(name, project, 1) - except errors.QueueIsEmpty: - pass - else: - message_stats['newest'] = newest - message_stats['oldest'] = oldest + if total: + try: + newest = self._message_ctrl.first(name, project, -1) + oldest = self._message_ctrl.first(name, project, 1) + except errors.QueueIsEmpty: + pass + else: + message_stats['newest'] = newest + message_stats['oldest'] = oldest return {'messages': message_stats} diff --git a/zaqar/tests/queues/storage/base.py b/zaqar/tests/queues/storage/base.py index 6c516f58e..883cdc448 100644 --- a/zaqar/tests/queues/storage/base.py +++ b/zaqar/tests/queues/storage/base.py @@ -314,9 +314,9 @@ class QueueControllerTest(ControllerBaseTest): self.assertTrue(created) # Create 15 messages. - _insert_fixtures(self.message_controller, queue_name, - project=self.project, - client_uuid=client_uuid, num=15) + msg_keys = _insert_fixtures(self.message_controller, queue_name, + project=self.project, + client_uuid=client_uuid, num=15) stats = self.controller.stats(queue_name, self.project)['messages'] @@ -331,6 +331,25 @@ class QueueControllerTest(ControllerBaseTest): self.project)['messages'] self.assertEqual(stats['claimed'], 10) + # Delete one message and ensure stats are updated even + # thought the claim itself has not been deleted. + self.message_controller.delete(queue_name, msg_keys[0], + self.project, claim_id) + stats = self.controller.stats(queue_name, + self.project)['messages'] + self.assertEqual(stats['total'], 14) + self.assertEqual(stats['claimed'], 9) + self.assertEqual(stats['free'], 5) + + # Same thing but use bulk_delete interface + self.message_controller.bulk_delete(queue_name, msg_keys[1:3], + self.project) + stats = self.controller.stats(queue_name, + self.project)['messages'] + self.assertEqual(stats['total'], 12) + self.assertEqual(stats['claimed'], 7) + self.assertEqual(stats['free'], 5) + # Delete the claim self.claim_controller.delete(queue_name, claim_id, self.project)