Merge "Implement claim and message expiration logic for Redis"
This commit is contained in:
commit
00e5157f0c
@ -16,6 +16,7 @@ import collections
|
||||
import time
|
||||
import uuid
|
||||
|
||||
import mock
|
||||
import redis
|
||||
|
||||
from zaqar.openstack.common.cache import cache as oslo_cache
|
||||
@ -89,7 +90,21 @@ class RedisUtilsTest(testing.TestBase):
|
||||
self.assertEqual(utils.scope_message_ids_set(None, '123'), '123..')
|
||||
self.assertEqual(utils.scope_message_ids_set(None, None, 's'), '..s')
|
||||
|
||||
def test_descope_messages_set(self):
|
||||
key = utils.scope_message_ids_set('my-q')
|
||||
self.assertEqual(utils.descope_message_ids_set(key), ('my-q', None))
|
||||
|
||||
key = utils.scope_message_ids_set('my-q', '123')
|
||||
self.assertEqual(utils.descope_message_ids_set(key), ('my-q', '123'))
|
||||
|
||||
key = utils.scope_message_ids_set(None, '123')
|
||||
self.assertEqual(utils.descope_message_ids_set(key), (None, '123'))
|
||||
|
||||
key = utils.scope_message_ids_set()
|
||||
self.assertEqual(utils.descope_message_ids_set(key), (None, None))
|
||||
|
||||
def test_normalize_none_str(self):
|
||||
|
||||
self.assertEqual(utils.normalize_none_str('my-q'), 'my-q')
|
||||
self.assertEqual(utils.normalize_none_str(None), '')
|
||||
|
||||
@ -210,7 +225,7 @@ class RedisMessagesTest(base.MessageControllerTest):
|
||||
def setUp(self):
|
||||
super(RedisMessagesTest, self).setUp()
|
||||
self.connection = self.driver.connection
|
||||
self.q_controller = self.driver.queue_controller
|
||||
self.queue_ctrl = self.driver.queue_controller
|
||||
|
||||
def tearDown(self):
|
||||
super(RedisMessagesTest, self).tearDown()
|
||||
@ -218,7 +233,7 @@ class RedisMessagesTest(base.MessageControllerTest):
|
||||
|
||||
def test_get_count(self):
|
||||
queue_name = 'get-count'
|
||||
self.q_controller.create(queue_name)
|
||||
self.queue_ctrl.create(queue_name)
|
||||
|
||||
msgs = [{
|
||||
'ttl': 300,
|
||||
@ -237,11 +252,28 @@ class RedisMessagesTest(base.MessageControllerTest):
|
||||
|
||||
def test_empty_queue_exception(self):
|
||||
queue_name = 'empty-queue-test'
|
||||
self.q_controller.create(queue_name)
|
||||
self.queue_ctrl.create(queue_name)
|
||||
|
||||
self.assertRaises(storage.errors.QueueIsEmpty,
|
||||
self.controller.first, queue_name)
|
||||
|
||||
def test_gc(self):
|
||||
self.queue_ctrl.create(self.queue_name)
|
||||
self.controller.post(self.queue_name,
|
||||
[{'ttl': 0, 'body': {}}],
|
||||
client_uuid=str(uuid.uuid4()))
|
||||
|
||||
num_removed = self.controller.gc()
|
||||
self.assertEqual(num_removed, 1)
|
||||
|
||||
for _ in range(100):
|
||||
self.controller.post(self.queue_name,
|
||||
[{'ttl': 0, 'body': {}}],
|
||||
client_uuid=str(uuid.uuid4()))
|
||||
|
||||
num_removed = self.controller.gc()
|
||||
self.assertEqual(num_removed, 100)
|
||||
|
||||
|
||||
@testing.requires_redis
|
||||
class RedisClaimsTest(base.ClaimControllerTest):
|
||||
@ -252,7 +284,8 @@ class RedisClaimsTest(base.ClaimControllerTest):
|
||||
def setUp(self):
|
||||
super(RedisClaimsTest, self).setUp()
|
||||
self.connection = self.driver.connection
|
||||
self.q_controller = self.driver.queue_controller
|
||||
self.queue_ctrl = self.driver.queue_controller
|
||||
self.message_ctrl = self.driver.message_controller
|
||||
|
||||
def tearDown(self):
|
||||
super(RedisClaimsTest, self).tearDown()
|
||||
@ -261,7 +294,7 @@ class RedisClaimsTest(base.ClaimControllerTest):
|
||||
def test_claim_doesnt_exist(self):
|
||||
queue_name = 'no-such-claim'
|
||||
epoch = '000000000000000000000000'
|
||||
self.q_controller.create(queue_name)
|
||||
self.queue_ctrl.create(queue_name)
|
||||
self.assertRaises(storage.errors.ClaimDoesNotExist,
|
||||
self.controller.get, queue_name,
|
||||
epoch, project=None)
|
||||
@ -274,4 +307,38 @@ class RedisClaimsTest(base.ClaimControllerTest):
|
||||
time.sleep(2)
|
||||
self.assertRaises(storage.errors.ClaimDoesNotExist,
|
||||
self.controller.update, queue_name,
|
||||
claim_id, {}, project=None)
|
||||
claim_id, {}, project=None)
|
||||
|
||||
def test_gc(self):
|
||||
self.queue_ctrl.create(self.queue_name)
|
||||
|
||||
for _ in range(100):
|
||||
self.message_ctrl.post(self.queue_name,
|
||||
[{'ttl': 300, 'body': 'yo gabba'}],
|
||||
client_uuid=str(uuid.uuid4()))
|
||||
|
||||
now = timeutils.utcnow_ts()
|
||||
timeutils_utcnow = 'zaqar.openstack.common.timeutils.utcnow_ts'
|
||||
|
||||
# Test a single claim
|
||||
with mock.patch(timeutils_utcnow) as mock_utcnow:
|
||||
mock_utcnow.return_value = now - 1
|
||||
self.controller.create(self.queue_name, {'ttl': 1, 'grace': 60})
|
||||
|
||||
num_removed = self.controller._gc(self.queue_name, None)
|
||||
self.assertEqual(num_removed, 1)
|
||||
|
||||
# Test multiple claims
|
||||
with mock.patch(timeutils_utcnow) as mock_utcnow:
|
||||
mock_utcnow.return_value = now - 1
|
||||
|
||||
for _ in range(5):
|
||||
self.controller.create(self.queue_name,
|
||||
{'ttl': 1, 'grace': 60})
|
||||
|
||||
# NOTE(kgriffs): These ones should not be cleaned up
|
||||
self.controller.create(self.queue_name, {'ttl': 60, 'grace': 60})
|
||||
self.controller.create(self.queue_name, {'ttl': 60, 'grace': 60})
|
||||
|
||||
num_removed = self.controller._gc(self.queue_name, None)
|
||||
self.assertEqual(num_removed, 5)
|
||||
|
@ -166,6 +166,17 @@ class DataDriverBase(DriverBase):
|
||||
_handle_status('delete_queue', func)
|
||||
return op_status
|
||||
|
||||
def gc(self):
|
||||
"""Perform manual garbage collection of claims and messages.
|
||||
|
||||
This method can be overridden in order to provide a trigger
|
||||
that can be called by so-called "garbage collection" scripts
|
||||
that are required by some drivers.
|
||||
|
||||
By default, this method does nothing.
|
||||
"""
|
||||
pass
|
||||
|
||||
@abc.abstractproperty
|
||||
def queue_controller(self):
|
||||
"""Returns the driver's queue controller."""
|
||||
|
@ -89,6 +89,11 @@ class DataDriver(storage.DataDriverBase):
|
||||
|
||||
return KPI
|
||||
|
||||
def gc(self):
|
||||
for pool in self._pool_catalog._pools_ctrl.list():
|
||||
driver = self._pool_catalog.get_driver(pool['name'])
|
||||
driver.gc()
|
||||
|
||||
@decorators.lazy_property(write=False)
|
||||
def queue_controller(self):
|
||||
return QueueController(self._pool_catalog)
|
||||
|
@ -38,20 +38,20 @@ class ClaimController(storage.Claim):
|
||||
|
||||
Redis Data Structures:
|
||||
----------------------
|
||||
Claims list (Redis set) contains claim ids
|
||||
1. Claims list (Redis set) contains claim IDs
|
||||
|
||||
Key: <project-id_q-name>
|
||||
Key: <project_id>.<queue_name>.claims
|
||||
|
||||
Name Field
|
||||
-------------------------
|
||||
claim_ids m
|
||||
|
||||
Claimed Messages (Redis set) contains the list of
|
||||
2. Claimed Messages (Redis set) contains the list of
|
||||
message ids stored per claim
|
||||
|
||||
Key: <claim_id>_messages
|
||||
Key: <claim_id>.messages
|
||||
|
||||
Claim info(Redis Hash):
|
||||
3. Claim info (Redis hash):
|
||||
|
||||
Key: <claim_id>
|
||||
|
||||
@ -82,13 +82,17 @@ class ClaimController(storage.Claim):
|
||||
|
||||
# Return False if no such claim exists
|
||||
# TODO(prashanthr_): Discuss the feasibility of a bloom filter.
|
||||
if not client.sismember(claims_set_key, claim_id):
|
||||
if client.zscore(claims_set_key, claim_id) is None:
|
||||
return False
|
||||
|
||||
expires = self._get_claim_info(claim_id, b'e')[0]
|
||||
now = timeutils.utcnow_ts()
|
||||
|
||||
if now > expires:
|
||||
if expires <= now:
|
||||
# NOTE(kgriffs): Redis should automatically remove the
|
||||
# other records in the very near future. This one
|
||||
# has to be manually deleted, however.
|
||||
client.zrem(claims_set_key, claim_id)
|
||||
return False
|
||||
|
||||
return True
|
||||
@ -104,6 +108,23 @@ class ClaimController(storage.Claim):
|
||||
def _queue_ctrl(self):
|
||||
return self.driver.queue_controller
|
||||
|
||||
@utils.raises_conn_error
|
||||
@utils.retries_on_connection_error
|
||||
def _gc(self, queue, project):
|
||||
"""Garbage-collect expired claim data.
|
||||
|
||||
Not all claim data can be automatically expired. This method
|
||||
cleans up the remainder.
|
||||
|
||||
:returns: Number of claims removed
|
||||
"""
|
||||
|
||||
claims_set_key = utils.scope_claims_set(queue, project,
|
||||
QUEUE_CLAIMS_SUFFIX)
|
||||
now = timeutils.utcnow_ts()
|
||||
num_removed = self._client.zremrangebyscore(claims_set_key, 0, now)
|
||||
return num_removed
|
||||
|
||||
@utils.raises_conn_error
|
||||
@utils.retries_on_connection_error
|
||||
def get(self, queue, claim_id, project=None):
|
||||
@ -145,23 +166,18 @@ class ClaimController(storage.Claim):
|
||||
def create(self, queue, metadata, project=None,
|
||||
limit=storage.DEFAULT_MESSAGES_PER_CLAIM):
|
||||
|
||||
ttl = int(metadata.get('ttl', 60))
|
||||
claim_ttl = int(metadata.get('ttl', 60))
|
||||
grace = int(metadata.get('grace', 60))
|
||||
msg_ttl = ttl + grace
|
||||
msg_ttl = claim_ttl + grace
|
||||
|
||||
claim_id = utils.generate_uuid()
|
||||
claim_key = utils.scope_claim_messages(claim_id,
|
||||
CLAIM_MESSAGES_SUFFIX)
|
||||
claim_msgs_key = utils.scope_claim_messages(claim_id,
|
||||
CLAIM_MESSAGES_SUFFIX)
|
||||
|
||||
claims_set_key = utils.scope_claims_set(queue, project,
|
||||
QUEUE_CLAIMS_SUFFIX)
|
||||
|
||||
counter_key = self._queue_ctrl._claim_counter_key(queue, project)
|
||||
|
||||
with self._client.pipeline() as pipe:
|
||||
|
||||
start_ts = timeutils.utcnow_ts()
|
||||
|
||||
# NOTE(kgriffs): Retry the operation if another transaction
|
||||
# completes before this one, in which case it will have
|
||||
# claimed the same messages the current thread is trying
|
||||
@ -173,6 +189,8 @@ class ClaimController(storage.Claim):
|
||||
|
||||
# TODO(kgriffs): Would it be beneficial (or harmful) to
|
||||
# introducce a backoff sleep in between retries?
|
||||
|
||||
start_ts = timeutils.utcnow_ts()
|
||||
while (timeutils.utcnow_ts() - start_ts) < RETRY_CLAIM_TIMEOUT:
|
||||
|
||||
# NOTE(kgriffs): The algorithm for claiming messages:
|
||||
@ -203,19 +221,32 @@ class ClaimController(storage.Claim):
|
||||
try:
|
||||
# TODO(kgriffs): Is it faster/better to do this all
|
||||
# in a Lua script instead of using an app-layer
|
||||
# transaction?
|
||||
# transaction? Lua requires Redis 2.6 or better.
|
||||
|
||||
# NOTE(kgriffs): Abort the entire transaction if
|
||||
# another request beats us to the punch. We detect
|
||||
# this by putting a watch on the key that will have
|
||||
# one of its fields updated as the final step of
|
||||
# the transaction.
|
||||
pipe.watch(counter_key)
|
||||
#
|
||||
# No other request to list active messages can
|
||||
# proceed while this current transaction is in
|
||||
# progress; therefore, it is not possible for
|
||||
# a different process to get some active messages
|
||||
# while the pipeline commands have partway
|
||||
# completed. Either the other process will query
|
||||
# for active messages at the same moment as
|
||||
# the current proc and get the exact same set,
|
||||
# or its request will have to wait while the
|
||||
# current process performs the transaction in
|
||||
# its entirety.
|
||||
mids = [msg.id for msg in msg_list]
|
||||
pipe.watch(*mids)
|
||||
pipe.multi()
|
||||
|
||||
now = timeutils.utcnow_ts()
|
||||
|
||||
claim_expires = now + ttl
|
||||
claim_expires = now + claim_ttl
|
||||
msg_expires = claim_expires + grace
|
||||
|
||||
# Associate the claim with each message
|
||||
@ -227,7 +258,7 @@ class ClaimController(storage.Claim):
|
||||
msg.ttl = msg_ttl
|
||||
msg.expires = msg_expires
|
||||
|
||||
pipe.rpush(claim_key, msg.id)
|
||||
pipe.rpush(claim_msgs_key, msg.id)
|
||||
|
||||
# TODO(kgriffs): Rather than writing back the
|
||||
# entire message, only set the fields that
|
||||
@ -236,18 +267,24 @@ class ClaimController(storage.Claim):
|
||||
|
||||
basic_messages.append(msg.to_basic(now))
|
||||
|
||||
pipe.expire(claim_msgs_key, claim_ttl)
|
||||
|
||||
# Create the claim
|
||||
claim_info = {
|
||||
'id': claim_id,
|
||||
't': ttl,
|
||||
'e': claim_expires
|
||||
't': claim_ttl,
|
||||
'e': claim_expires,
|
||||
}
|
||||
|
||||
pipe.hmset(claim_id, claim_info)
|
||||
pipe.expire(claim_id, claim_ttl)
|
||||
|
||||
# NOTE(kgriffs): Add the claim ID to a set so that
|
||||
# existence checks can be performed quickly.
|
||||
pipe.sadd(claims_set_key, claim_id)
|
||||
#
|
||||
# A sorted set is used to facilitate cleaning
|
||||
# up the IDs of expired claims.
|
||||
pipe.zadd(claims_set_key, claim_expires, claim_id)
|
||||
|
||||
# NOTE(kgriffs): Update a counter that facilitates
|
||||
# the queue stats calculation.
|
||||
@ -278,10 +315,10 @@ class ClaimController(storage.Claim):
|
||||
msg_ttl = claim_ttl + grace
|
||||
msg_expires = claim_expires + grace
|
||||
|
||||
claim_messages = utils.scope_claim_messages(claim_id,
|
||||
claim_msgs_key = utils.scope_claim_messages(claim_id,
|
||||
CLAIM_MESSAGES_SUFFIX)
|
||||
|
||||
msg_keys = self._get_claimed_message_keys(claim_messages)
|
||||
msg_keys = self._get_claimed_message_keys(claim_msgs_key)
|
||||
|
||||
with self._client.pipeline() as pipe:
|
||||
for key in msg_keys:
|
||||
@ -308,11 +345,22 @@ class ClaimController(storage.Claim):
|
||||
# TODO(kgriffs): Rather than writing back the
|
||||
# entire message, only set the fields that
|
||||
# have changed.
|
||||
#
|
||||
# When this change is made, don't forget to
|
||||
# also call pipe.expire with the new TTL value.
|
||||
msg.to_redis(pipe)
|
||||
|
||||
# Update the claim id and claim expiration info
|
||||
# for all the messages.
|
||||
pipe.hmset(claim_id, claim_info)
|
||||
pipe.expire(claim_id, claim_ttl)
|
||||
|
||||
pipe.expire(claim_msgs_key, claim_ttl)
|
||||
|
||||
claims_set_key = utils.scope_claims_set(queue, project,
|
||||
QUEUE_CLAIMS_SUFFIX)
|
||||
|
||||
pipe.zadd(claims_set_key, claim_expires, claim_id)
|
||||
|
||||
pipe.execute()
|
||||
|
||||
@ -325,10 +373,10 @@ class ClaimController(storage.Claim):
|
||||
return
|
||||
|
||||
now = timeutils.utcnow_ts()
|
||||
claim_messages_key = utils.scope_claim_messages(claim_id,
|
||||
CLAIM_MESSAGES_SUFFIX)
|
||||
claim_msgs_key = utils.scope_claim_messages(claim_id,
|
||||
CLAIM_MESSAGES_SUFFIX)
|
||||
|
||||
msg_keys = self._get_claimed_message_keys(claim_messages_key)
|
||||
msg_keys = self._get_claimed_message_keys(claim_msgs_key)
|
||||
|
||||
with self._client.pipeline() as pipe:
|
||||
for msg_key in msg_keys:
|
||||
@ -342,9 +390,9 @@ class ClaimController(storage.Claim):
|
||||
QUEUE_CLAIMS_SUFFIX)
|
||||
|
||||
with self._client.pipeline() as pipe:
|
||||
pipe.srem(claims_set_key, claim_id)
|
||||
pipe.zrem(claims_set_key, claim_id)
|
||||
pipe.delete(claim_id)
|
||||
pipe.delete(claim_messages_key)
|
||||
pipe.delete(claim_msgs_key)
|
||||
|
||||
for msg in claimed_msgs:
|
||||
if msg:
|
||||
@ -365,4 +413,4 @@ class ClaimController(storage.Claim):
|
||||
|
||||
|
||||
def _msg_would_expire(message, now):
|
||||
return message.expires < now
|
||||
return message.expires <= now
|
||||
|
@ -65,6 +65,14 @@ class DataDriver(storage.DataDriverBase):
|
||||
# TODO(kgriffs): Add metrics re message volume
|
||||
return KPI
|
||||
|
||||
def gc(self):
|
||||
# TODO(kgriffs): Check time since last run, and if
|
||||
# it hasn't been very long, skip. This allows for
|
||||
# running the GC script on multiple boxes for HA,
|
||||
# without having them all attempting to GC at the
|
||||
# same moment.
|
||||
self.message_controller.gc()
|
||||
|
||||
@decorators.lazy_property(write=False)
|
||||
def connection(self):
|
||||
"""Redis client connection instance."""
|
||||
|
@ -29,6 +29,8 @@ Message = models.Message
|
||||
|
||||
|
||||
MESSAGE_IDS_SUFFIX = 'messages'
|
||||
MSGSET_INDEX_KEY = 'msgset_index'
|
||||
|
||||
# The rank counter is an atomic index to rank messages
|
||||
# in a FIFO manner.
|
||||
MESSAGE_RANK_COUNTER_SUFFIX = 'rank_counter'
|
||||
@ -37,6 +39,11 @@ MESSAGE_RANK_COUNTER_SUFFIX = 'rank_counter'
|
||||
# minimum allowed TTL for messages (60 seconds).
|
||||
RETRY_POST_TIMEOUT = 10
|
||||
|
||||
# TODO(kgriffs): Tune this and/or make it configurable. Don't want
|
||||
# it to be so large that it blocks other operations for more than
|
||||
# 1-2 milliseconds.
|
||||
GC_BATCH_SIZE = 100
|
||||
|
||||
|
||||
class MessageController(storage.Message):
|
||||
"""Implements message resource operations using Redis.
|
||||
@ -52,9 +59,17 @@ class MessageController(storage.Message):
|
||||
incremented atomically using the counter(MESSAGE_RANK_COUNTER_SUFFIX)
|
||||
also stored in the database for every queue.
|
||||
|
||||
Key: <project-id.q-name>
|
||||
Key: <project_id>.<queue_name>.messages
|
||||
|
||||
2. Messages(Redis Hash):
|
||||
2. Index of message ID lists (Redis sorted set)
|
||||
|
||||
This is a sorted set that facilitates discovery of all the
|
||||
message ID lists. This is necessary when performing
|
||||
garbage collection on the IDs contained within these lists.
|
||||
|
||||
Key: msgset_index
|
||||
|
||||
3. Messages(Redis Hash):
|
||||
|
||||
Scoped by the UUID of the message, the redis datastructure
|
||||
has the following information.
|
||||
@ -70,6 +85,10 @@ class MessageController(storage.Message):
|
||||
claim expiry time -> c.e
|
||||
client uuid -> u
|
||||
created time -> cr
|
||||
|
||||
4. Messages rank counter (Redis Hash):
|
||||
|
||||
Key: <project_id>.<queue_name>.rank_counter
|
||||
"""
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
@ -100,6 +119,18 @@ class MessageController(storage.Message):
|
||||
"""
|
||||
return self._client.zcard(msgset_key)
|
||||
|
||||
def _create_msgset(self, queue, project, pipe):
|
||||
msgset_key = utils.scope_message_ids_set(queue, project,
|
||||
MESSAGE_IDS_SUFFIX)
|
||||
|
||||
pipe.zadd(MSGSET_INDEX_KEY, 1, msgset_key)
|
||||
|
||||
def _delete_msgset(self, queue, project, pipe):
|
||||
msgset_key = utils.scope_message_ids_set(queue, project,
|
||||
MESSAGE_IDS_SUFFIX)
|
||||
|
||||
pipe.zrem(MSGSET_INDEX_KEY, msgset_key)
|
||||
|
||||
@utils.raises_conn_error
|
||||
@utils.retries_on_connection_error
|
||||
def _delete_queue_messages(self, queue, project, pipe):
|
||||
@ -161,8 +192,8 @@ class MessageController(storage.Message):
|
||||
msgset_key = utils.scope_message_ids_set(queue, project,
|
||||
MESSAGE_IDS_SUFFIX)
|
||||
|
||||
sorter = self._client.zrange if sort == 1 else self._client.zrevrange
|
||||
message_ids = sorter(msgset_key, 0, 0)
|
||||
zrange = self._client.zrange if sort == 1 else self._client.zrevrange
|
||||
message_ids = zrange(msgset_key, 0, 0)
|
||||
return message_ids[0] if message_ids else None
|
||||
|
||||
def _get(self, message_id):
|
||||
@ -238,6 +269,73 @@ class MessageController(storage.Message):
|
||||
yield _filter_messages(messages, pipe, filters, to_basic, marker)
|
||||
yield marker['next']
|
||||
|
||||
@utils.raises_conn_error
|
||||
@utils.retries_on_connection_error
|
||||
def gc(self):
|
||||
"""Garbage-collect expired message data.
|
||||
|
||||
Not all message data can be automatically expired. This method
|
||||
cleans up the remainder.
|
||||
|
||||
:returns: Number of messages removed
|
||||
"""
|
||||
client = self._client
|
||||
|
||||
num_removed = 0
|
||||
offset_msgsets = 0
|
||||
|
||||
while True:
|
||||
# NOTE(kgriffs): Iterate across all message sets; there will
|
||||
# be one set of message IDs per queue.
|
||||
msgset_keys = client.zrange(MSGSET_INDEX_KEY,
|
||||
offset_msgsets,
|
||||
offset_msgsets + GC_BATCH_SIZE - 1)
|
||||
if not msgset_keys:
|
||||
break
|
||||
|
||||
offset_msgsets += len(msgset_keys)
|
||||
|
||||
for msgset_key in msgset_keys:
|
||||
msgset_key = strutils.safe_decode(msgset_key)
|
||||
|
||||
# NOTE(kgriffs): Drive the claim controller GC from
|
||||
# here, because we already know the queue and project
|
||||
# scope.
|
||||
queue, project = utils.descope_message_ids_set(msgset_key)
|
||||
self._claim_ctrl._gc(queue, project)
|
||||
|
||||
offset_mids = 0
|
||||
|
||||
while True:
|
||||
# NOTE(kgriffs): Look up each message in the message set,
|
||||
# see if it has expired, and if so, remove it from msgset.
|
||||
mids = client.zrange(msgset_key, offset_mids,
|
||||
offset_mids + GC_BATCH_SIZE - 1)
|
||||
|
||||
if not mids:
|
||||
break
|
||||
|
||||
offset_mids += len(mids)
|
||||
|
||||
# NOTE(kgriffs): If redis expired the message, it will
|
||||
# not exist, so all we have to do is remove mid from
|
||||
# the msgset collection.
|
||||
with client.pipeline() as pipe:
|
||||
for mid in mids:
|
||||
pipe.exists(mid)
|
||||
|
||||
mid_exists_flags = pipe.execute()
|
||||
|
||||
with client.pipeline() as pipe:
|
||||
for mid, exists in zip(mids, mid_exists_flags):
|
||||
if not exists:
|
||||
pipe.zrem(msgset_key, mid)
|
||||
num_removed += 1
|
||||
|
||||
pipe.execute()
|
||||
|
||||
return num_removed
|
||||
|
||||
@utils.raises_conn_error
|
||||
@utils.retries_on_connection_error
|
||||
def list(self, queue, project=None, marker=None,
|
||||
@ -259,8 +357,12 @@ class MessageController(storage.Message):
|
||||
if not message_id:
|
||||
raise errors.QueueIsEmpty(queue, project)
|
||||
|
||||
raw_message = self._get(message_id)
|
||||
if raw_message is None:
|
||||
raise errors.QueueIsEmpty(queue, project)
|
||||
|
||||
now = timeutils.utcnow_ts()
|
||||
return self._get(message_id).to_basic(now, include_created=True)
|
||||
return raw_message.to_basic(now, include_created=True)
|
||||
|
||||
@utils.raises_conn_error
|
||||
@utils.retries_on_connection_error
|
||||
@ -407,8 +509,10 @@ class MessageController(storage.Message):
|
||||
msgset_key = utils.scope_message_ids_set(queue, project,
|
||||
MESSAGE_IDS_SUFFIX)
|
||||
with self._client.pipeline() as pipe:
|
||||
results = pipe.delete(message_id).zrem(msgset_key,
|
||||
message_id).execute()
|
||||
pipe.delete(message_id)
|
||||
pipe.zrem(msgset_key, message_id)
|
||||
|
||||
results = pipe.execute()
|
||||
|
||||
# NOTE(prashanthr_): results[0] is 1 when the delete is
|
||||
# successful. Hence we use that case to identify successful
|
||||
@ -427,8 +531,9 @@ class MessageController(storage.Message):
|
||||
MESSAGE_IDS_SUFFIX)
|
||||
|
||||
with self._client.pipeline() as pipe:
|
||||
for message_id in message_ids:
|
||||
pipe.delete(message_id).zrem(msgset_key, message_id)
|
||||
for mid in message_ids:
|
||||
pipe.delete(mid)
|
||||
pipe.zrem(msgset_key, mid)
|
||||
|
||||
results = pipe.execute()
|
||||
|
||||
|
@ -109,6 +109,7 @@ class Message(object):
|
||||
}
|
||||
|
||||
pipe.hmset(self.id, doc)
|
||||
pipe.expire(self.id, self.ttl)
|
||||
|
||||
def to_basic(self, now, include_created=False):
|
||||
basic_msg = {
|
||||
|
@ -37,21 +37,21 @@ class QueueController(storage.Queue):
|
||||
Queues are scoped by project, which is prefixed to the
|
||||
queue name.
|
||||
|
||||
Queues (Redis sorted set):
|
||||
Redis Data Structures:
|
||||
----------------------
|
||||
1. Queue Index (Redis sorted set):
|
||||
|
||||
Key: queues_set
|
||||
Set of all queues for the given project, ordered by name.
|
||||
|
||||
Id Value
|
||||
---------------------------------
|
||||
name -> <project-id_q-name>
|
||||
Key: <project_id>.queues_set
|
||||
|
||||
Id Value
|
||||
----------------------------------------
|
||||
name -> <project_id>.<queue_name>
|
||||
|
||||
The set helps faster existence checks, while the list helps
|
||||
paginated retrieval of queues.
|
||||
2. Queue Information (Redis hash):
|
||||
|
||||
Queue Information (Redis hash):
|
||||
|
||||
Key: <project-id_q-name>
|
||||
Key: <project_id>.<queue_name>
|
||||
|
||||
Name Field
|
||||
-------------------------------
|
||||
@ -87,7 +87,7 @@ class QueueController(storage.Queue):
|
||||
client = pipe if pipe is not None else self._client
|
||||
client.hincrby(queue_key, 'cl', amount)
|
||||
|
||||
# TODO(kgriffs): Reimplement in Lua; this is way too expensive!
|
||||
# TODO(kgriffs): Remove or optimize
|
||||
def _get_expired_message_count(self, name, project):
|
||||
"""Calculate the number of expired messages in the queue.
|
||||
|
||||
@ -171,6 +171,7 @@ class QueueController(storage.Queue):
|
||||
# Pipeline ensures atomic inserts.
|
||||
with self._client.pipeline() as pipe:
|
||||
pipe.zadd(qset_key, 1, queue_key).hmset(queue_key, queue)
|
||||
self._message_ctrl._create_msgset(name, project, pipe)
|
||||
|
||||
try:
|
||||
pipe.execute()
|
||||
@ -220,6 +221,7 @@ class QueueController(storage.Queue):
|
||||
with self._client.pipeline() as pipe:
|
||||
pipe.zrem(qset_key, queue_key)
|
||||
pipe.delete(queue_key)
|
||||
self._message_ctrl._delete_msgset(name, project, pipe)
|
||||
self._message_ctrl._delete_queue_messages(name, project, pipe)
|
||||
|
||||
pipe.execute()
|
||||
|
@ -86,6 +86,17 @@ def scope_message_ids_set(queue=None, project=None, message_suffix=''):
|
||||
normalize_none_str(queue) + '.' +
|
||||
message_suffix)
|
||||
|
||||
|
||||
def descope_message_ids_set(msgset_key):
|
||||
"""Descope messages set with '.'
|
||||
|
||||
:returns: (queue, project)
|
||||
"""
|
||||
|
||||
tokens = msgset_key.split('.')
|
||||
|
||||
return (tokens[1] or None, tokens[0] or None)
|
||||
|
||||
# NOTE(prashanthr_): Aliasing the scope_message_ids_set function
|
||||
# to be used in the pools and claims controller as similar
|
||||
# functionality is required to scope redis id's.
|
||||
|
@ -526,7 +526,7 @@ class MessageControllerTest(ControllerBaseTest):
|
||||
project=self.project,
|
||||
claim=cid)
|
||||
|
||||
@testing.is_slow(condition=lambda self: self.gc_interval != 0)
|
||||
@testing.is_slow(condition=lambda self: self.gc_interval > 1)
|
||||
def test_expired_messages(self):
|
||||
messages = [{'body': 3.14, 'ttl': 0}, {'body': 0.618, 'ttl': 600}]
|
||||
client_uuid = uuid.uuid4()
|
||||
@ -536,8 +536,13 @@ class MessageControllerTest(ControllerBaseTest):
|
||||
project=self.project,
|
||||
client_uuid=client_uuid)
|
||||
|
||||
# NOTE(kgriffs): Allow for automatic GC of claims, messages
|
||||
time.sleep(self.gc_interval)
|
||||
|
||||
# NOTE(kgriffs): Some drivers require a manual GC to be
|
||||
# triggered to clean up claims and messages.
|
||||
self.driver.gc()
|
||||
|
||||
with testing.expect(storage.errors.DoesNotExist):
|
||||
self.controller.get(self.queue_name, msgid_expired,
|
||||
project=self.project)
|
||||
|
Loading…
x
Reference in New Issue
Block a user