Merge "Move _(get|inc)_counter out of QueueController"
This commit is contained in:
commit
43e805bfb8
@ -399,6 +399,117 @@ class MessageController(storage.Message):
|
||||
{'$set': {'c': {'id': None, 'e': now}}},
|
||||
upsert=False, multi=True)
|
||||
|
||||
def _inc_counter(self, queue_name, project=None, amount=1, window=None):
|
||||
"""Increments the message counter and returns the new value.
|
||||
|
||||
:param queue_name: Name of the queue to which the counter is scoped
|
||||
:param project: Queue's project name
|
||||
:param amount: (Default 1) Amount by which to increment the counter
|
||||
:param window: (Default None) A time window, in seconds, that
|
||||
must have elapsed since the counter was last updated, in
|
||||
order to increment the counter.
|
||||
|
||||
:returns: Updated message counter value, or None if window
|
||||
was specified, and the counter has already been updated
|
||||
within the specified time period.
|
||||
|
||||
:raises: storage.errors.QueueDoesNotExist
|
||||
"""
|
||||
|
||||
# NOTE(flaper87): If this `if` is True, it means we're
|
||||
# using a mongodb in the control plane. To avoid breaking
|
||||
# environments doing so already, we'll keep using the counter
|
||||
# in the mongodb queue_controller rather than the one in the
|
||||
# message_controller. This should go away, eventually
|
||||
if hasattr(self._queue_ctrl, '_inc_counter'):
|
||||
return self._queue_ctrl._inc_counter(queue_name, project,
|
||||
amount, window)
|
||||
|
||||
now = timeutils.utcnow_ts()
|
||||
|
||||
update = {'$inc': {'c.v': amount}, '$set': {'c.t': now}}
|
||||
query = _get_scoped_query(queue_name, project)
|
||||
if window is not None:
|
||||
threshold = now - window
|
||||
query['c.t'] = {'$lt': threshold}
|
||||
|
||||
while True:
|
||||
try:
|
||||
collection = self._collection(queue_name, project).stats
|
||||
doc = collection.find_one_and_update(
|
||||
query, update,
|
||||
return_document=pymongo.ReturnDocument.AFTER,
|
||||
projection={'c.v': 1, '_id': 0})
|
||||
|
||||
break
|
||||
except pymongo.errors.AutoReconnect as ex:
|
||||
LOG.exception(ex)
|
||||
|
||||
if doc is None:
|
||||
if window is None:
|
||||
# NOTE(kgriffs): Since we did not filter by a time window,
|
||||
# the queue should have been found and updated. Perhaps
|
||||
# the queue has been deleted?
|
||||
message = _(u'Failed to increment the message '
|
||||
u'counter for queue %(name)s and '
|
||||
u'project %(project)s')
|
||||
message %= dict(name=queue_name, project=project)
|
||||
|
||||
LOG.warning(message)
|
||||
|
||||
raise errors.QueueDoesNotExist(queue_name, project)
|
||||
|
||||
# NOTE(kgriffs): Assume the queue existed, but the counter
|
||||
# was recently updated, causing the range query on 'c.t' to
|
||||
# exclude the record.
|
||||
return None
|
||||
|
||||
return doc['c']['v']
|
||||
|
||||
def _get_counter(self, queue_name, project=None):
|
||||
"""Retrieves the current message counter value for a given queue.
|
||||
|
||||
This helper is used to generate monotonic pagination
|
||||
markers that are saved as part of the message
|
||||
document.
|
||||
|
||||
Note 1: Markers are scoped per-queue and so are *not*
|
||||
globally unique or globally ordered.
|
||||
|
||||
Note 2: If two or more requests to this method are made
|
||||
in parallel, this method will return the same counter
|
||||
value. This is done intentionally so that the caller
|
||||
can detect a parallel message post, allowing it to
|
||||
mitigate race conditions between producer and
|
||||
observer clients.
|
||||
|
||||
:param queue_name: Name of the queue to which the counter is scoped
|
||||
:param project: Queue's project
|
||||
:returns: current message counter as an integer
|
||||
"""
|
||||
|
||||
# NOTE(flaper87): If this `if` is True, it means we're
|
||||
# using a mongodb in the control plane. To avoid breaking
|
||||
# environments doing so already, we'll keep using the counter
|
||||
# in the mongodb queue_controller rather than the one in the
|
||||
# message_controller. This should go away, eventually
|
||||
if hasattr(self._queue_ctrl, '_get_counter'):
|
||||
return self._queue_ctrl._get_counter(queue_name, project)
|
||||
|
||||
update = {'$inc': {'c.v': 0, 'c.t': 0}}
|
||||
query = _get_scoped_query(queue_name, project)
|
||||
|
||||
try:
|
||||
collection = self._collection(queue_name, project).stats
|
||||
doc = collection.find_one_and_update(
|
||||
query, update, upsert=True,
|
||||
return_document=pymongo.ReturnDocument.AFTER,
|
||||
projection={'c.v': 1, '_id': 0})
|
||||
|
||||
return doc['c']['v']
|
||||
except pymongo.errors.AutoReconnect as ex:
|
||||
LOG.exception(ex)
|
||||
|
||||
# ----------------------------------------------------------------------
|
||||
# Public interface
|
||||
# ----------------------------------------------------------------------
|
||||
@ -506,15 +617,18 @@ class MessageController(storage.Message):
|
||||
if not self._queue_ctrl.exists(queue_name, project):
|
||||
raise errors.QueueDoesNotExist(queue_name, project)
|
||||
|
||||
# NOTE(flaper87): Make sure the counter exists. This method
|
||||
# is an upsert.
|
||||
self._get_counter(queue_name, project)
|
||||
now = timeutils.utcnow_ts()
|
||||
now_dt = datetime.datetime.utcfromtimestamp(now)
|
||||
collection = self._collection(queue_name, project)
|
||||
|
||||
messages = list(messages)
|
||||
msgs_n = len(messages)
|
||||
next_marker = self._queue_ctrl._inc_counter(queue_name,
|
||||
project,
|
||||
amount=msgs_n) - msgs_n
|
||||
next_marker = self._inc_counter(queue_name,
|
||||
project,
|
||||
amount=msgs_n) - msgs_n
|
||||
|
||||
prepared_messages = [
|
||||
{
|
||||
@ -674,6 +788,9 @@ class FIFOMessageController(MessageController):
|
||||
if not self._queue_ctrl.exists(queue_name, project):
|
||||
raise errors.QueueDoesNotExist(queue_name, project)
|
||||
|
||||
# NOTE(flaper87): Make sure the counter exists. This method
|
||||
# is an upsert.
|
||||
self._get_counter(queue_name, project)
|
||||
now = timeutils.utcnow_ts()
|
||||
now_dt = datetime.datetime.utcfromtimestamp(now)
|
||||
collection = self._collection(queue_name, project)
|
||||
@ -686,7 +803,7 @@ class FIFOMessageController(MessageController):
|
||||
# where a client paging through the queue may get the messages
|
||||
# with the higher counter and skip the previous ones. This would
|
||||
# make our FIFO guarantee unsound.
|
||||
next_marker = self._queue_ctrl._get_counter(queue_name, project)
|
||||
next_marker = self._get_counter(queue_name, project)
|
||||
|
||||
# Unique transaction ID to facilitate atomic batch inserts
|
||||
transaction = objectid.ObjectId()
|
||||
@ -743,8 +860,7 @@ class FIFOMessageController(MessageController):
|
||||
# such that the competing marker's will start at a
|
||||
# unique number, 1 past the max of the messages just
|
||||
# inserted above.
|
||||
self._queue_ctrl._inc_counter(queue_name, project,
|
||||
amount=len(ids))
|
||||
self._inc_counter(queue_name, project, amount=len(ids))
|
||||
|
||||
# NOTE(kgriffs): Finalize the insert once we can say that
|
||||
# all the messages made it. This makes bulk inserts
|
||||
@ -811,7 +927,7 @@ class FIFOMessageController(MessageController):
|
||||
# Note that we increment one at a time until the logjam is
|
||||
# broken, since we don't know how many messages were posted
|
||||
# by the worker before it crashed.
|
||||
next_marker = self._queue_ctrl._inc_counter(
|
||||
next_marker = self._inc_counter(
|
||||
queue_name, project, window=COUNTER_STALL_WINDOW)
|
||||
|
||||
# Retry the entire batch with a new sequence of markers.
|
||||
@ -824,7 +940,7 @@ class FIFOMessageController(MessageController):
|
||||
if next_marker is None:
|
||||
# NOTE(kgriffs): Usually we will end up here, since
|
||||
# it should be rare that a counter becomes stalled.
|
||||
next_marker = self._queue_ctrl._get_counter(
|
||||
next_marker = self._get_counter(
|
||||
queue_name, project)
|
||||
else:
|
||||
msgtmpl = (u'Detected a stalled message counter for '
|
||||
|
@ -306,60 +306,65 @@ class MongodbMessageTests(MongodbSetupMixin, base.MessageControllerTest):
|
||||
queue_name = self.queue_name
|
||||
iterations = 10
|
||||
|
||||
seed_marker1 = self.queue_controller._get_counter(queue_name,
|
||||
self.project)
|
||||
self.assertEqual(seed_marker1, 1, 'First marker is 1')
|
||||
m = mock.MagicMock(controllers.QueueController)
|
||||
self.controller._queue_ctrl = m
|
||||
del self.controller._queue_ctrl._get_counter
|
||||
del self.controller._queue_ctrl._inc_counter
|
||||
|
||||
seed_marker1 = self.controller._get_counter(queue_name,
|
||||
self.project)
|
||||
self.assertEqual(seed_marker1, 0, 'First marker is 0')
|
||||
|
||||
for i in range(iterations):
|
||||
self.controller.post(queue_name, [{'ttl': 60}],
|
||||
'uuid', project=self.project)
|
||||
|
||||
marker1 = self.queue_controller._get_counter(queue_name,
|
||||
self.project)
|
||||
marker2 = self.queue_controller._get_counter(queue_name,
|
||||
self.project)
|
||||
marker3 = self.queue_controller._get_counter(queue_name,
|
||||
self.project)
|
||||
marker1 = self.controller._get_counter(queue_name,
|
||||
self.project)
|
||||
marker2 = self.controller._get_counter(queue_name,
|
||||
self.project)
|
||||
marker3 = self.controller._get_counter(queue_name,
|
||||
self.project)
|
||||
|
||||
self.assertEqual(marker1, marker2)
|
||||
self.assertEqual(marker2, marker3)
|
||||
self.assertEqual(marker1, i + 2)
|
||||
self.assertEqual(marker1, i + 1)
|
||||
|
||||
new_value = self.queue_controller._inc_counter(queue_name,
|
||||
self.project)
|
||||
new_value = self.controller._inc_counter(queue_name,
|
||||
self.project)
|
||||
self.assertIsNotNone(new_value)
|
||||
|
||||
value_before = self.queue_controller._get_counter(queue_name,
|
||||
project=self.project)
|
||||
new_value = self.queue_controller._inc_counter(queue_name,
|
||||
project=self.project)
|
||||
value_before = self.controller._get_counter(queue_name,
|
||||
project=self.project)
|
||||
new_value = self.controller._inc_counter(queue_name,
|
||||
project=self.project)
|
||||
self.assertIsNotNone(new_value)
|
||||
value_after = self.queue_controller._get_counter(queue_name,
|
||||
project=self.project)
|
||||
value_after = self.controller._get_counter(queue_name,
|
||||
project=self.project)
|
||||
self.assertEqual(value_after, value_before + 1)
|
||||
|
||||
value_before = value_after
|
||||
new_value = self.queue_controller._inc_counter(queue_name,
|
||||
project=self.project,
|
||||
amount=7)
|
||||
value_after = self.queue_controller._get_counter(queue_name,
|
||||
project=self.project)
|
||||
new_value = self.controller._inc_counter(queue_name,
|
||||
project=self.project,
|
||||
amount=7)
|
||||
value_after = self.controller._get_counter(queue_name,
|
||||
project=self.project)
|
||||
self.assertEqual(value_after, value_before + 7)
|
||||
self.assertEqual(value_after, new_value)
|
||||
|
||||
reference_value = value_after
|
||||
|
||||
unchanged = self.queue_controller._inc_counter(queue_name,
|
||||
project=self.project,
|
||||
window=10)
|
||||
unchanged = self.controller._inc_counter(queue_name,
|
||||
project=self.project,
|
||||
window=10)
|
||||
self.assertIsNone(unchanged)
|
||||
|
||||
timeutils.set_time_override()
|
||||
timeutils.advance_time_delta(datetime.timedelta(seconds=10))
|
||||
|
||||
changed = self.queue_controller._inc_counter(queue_name,
|
||||
project=self.project,
|
||||
window=5)
|
||||
changed = self.controller._inc_counter(queue_name,
|
||||
project=self.project,
|
||||
window=5)
|
||||
self.assertEqual(changed, reference_value + 1)
|
||||
|
||||
timeutils.clear_time_override()
|
||||
@ -410,17 +415,18 @@ class MongodbFIFOMessageTests(MongodbSetupMixin, base.MessageControllerTest):
|
||||
# what happens when we have parallel requests and the "winning"
|
||||
# requests hasn't gotten around to calling _inc_counter before the
|
||||
# "losing" request attempts to insert it's batch of messages.
|
||||
with mock.patch.object(mongodb.queues.QueueController,
|
||||
'_inc_counter', autospec=True) as method:
|
||||
with mock.patch.object(mongodb.messages.MessageController,
|
||||
'_inc_counter', autospec=True) as ic:
|
||||
|
||||
method.return_value = 2
|
||||
ic.return_value = 2
|
||||
messages = expected_messages[:1]
|
||||
created = list(self.controller.post(queue_name, messages,
|
||||
uuid, project=self.project))
|
||||
created = list(self.controller.post(queue_name,
|
||||
messages, uuid,
|
||||
project=self.project))
|
||||
self.assertEqual(len(created), 1)
|
||||
|
||||
# Force infinite retries
|
||||
method.return_value = None
|
||||
ic.return_value = None
|
||||
|
||||
with testing.expect(errors.MessageConflict):
|
||||
self.controller.post(queue_name, messages,
|
||||
|
Loading…
x
Reference in New Issue
Block a user