fix: Requests get slower when queues have a lot of messages

The primary culprit was an additional scan operation
in the MongoDB driver's _list(...) method, triggered by
a range query on the message's expiration date.

This patch removes that field from the index, so the filtering
is done using a single scan pass. It also removes 'e' from the
index used for counting messages, since in testing it was
slightly faster to leave it out (and should also slightly speed
up insertion time since it's one less index field that mongo
has to worry about.)

Also in this patch, the order of the project and
queue name fields was swapped in the index in order
to optimize for selectivity. In addition, some minor
edits were made for style and correctness.

Finally, this patch also removes thresholding from
the GC so that we don't have expired messages hanging
around for a long time that have to be filtered out.

Change-Id: Ie4bd125e966612f4a8022fd6af133314d05fe428
Closes-Bug: #1216950
This commit is contained in:
kgriffs 2013-08-28 20:30:08 -05:00
parent defd75e1a5
commit 72974b7be1
7 changed files with 160 additions and 171 deletions

View File

@ -144,8 +144,8 @@ class ClaimController(storage.ClaimBase):
# Get a list of active, not claimed nor expired
# messages that could be claimed.
msgs = msg_ctrl.active(queue, fields={'_id': 1}, project=project)
msgs = msgs.limit(limit)
msgs = msg_ctrl.active(queue, fields={'_id': 1}, project=project,
limit=limit)
messages = iter([])
ids = [msg['_id'] for msg in msgs]

View File

@ -40,6 +40,41 @@ CFG = config.namespace('limits:storage').from_options(
default_message_paging=10,
)
# Index field specifications for the messages collection, shared between
# index creation (ensure_index) and query hinting (hint).
#
# Field legend, as used by the queries in this module:
#   p    - project
#   q    - queue name
#   k    - message marker (sort/paging key)
#   c.id - claim ID
#   c.e  - claim expiration time

# For hinting queries that look messages up by their ID
ID_INDEX_FIELDS = [('_id', 1)]
# NOTE(kgriffs): This index is for listing messages, usually
# filtering out claimed ones.
ACTIVE_INDEX_FIELDS = [
('p', 1), # Project will be unique, so put it first
('q', 1), # May not be unique, since user names it
('k', 1), # Used for sorting and paging, must come before range queries
('c.e', 1), # Used for filtering out claimed messages
]
# For counting messages in a queue; created as the 'counting' index and
# hinted by count(). Unlike the active index, it omits the marker ('k').
COUNTING_INDEX_FIELDS = [
('p', 1), # Project will be unique, so put it first
('q', 1), # May not be unique, since user names it
('c.e', 1), # Used for filtering out claimed messages
]
# Index used for claims; created as the 'claimed' index.
CLAIMED_INDEX_FIELDS = [
('p', 1),
('q', 1),
('c.id', 1),
('k', 1),
('c.e', 1),
]
# Index used for _next_marker() and also to ensure uniqueness.
# Created as the unique 'queue_marker' index; note that the marker is
# indexed in descending order here.
MARKER_INDEX_FIELDS = [
('p', 1),
('q', 1),
('k', -1)
]
class MessageController(storage.MessageBase):
"""Implements message resource operations using MongoDB.
@ -68,42 +103,27 @@ class MessageController(storage.MessageBase):
# doing anything.
self._col = self._db['messages']
# NOTE(flaper87): This index is used mostly in the
# active method but some parts of it are used in
# other places.
# * q: Mostly everywhere. It must stay at the
# beginning of the index.
# * k: Marker and FIFO key (Used mainly for sorting)
# * e: Together with q is used for getting a
# specific message. (see `get`)
self.active_fields = [
('q', 1),
('p', 1),
('k', 1),
('e', 1),
('c.e', 1),
]
self._ensure_indexes()
self._col.ensure_index(self.active_fields,
#-----------------------------------------------------------------------
# Helpers
#-----------------------------------------------------------------------
def _ensure_indexes(self):
"""Ensures that all indexes are created."""
self._col.ensure_index(ACTIVE_INDEX_FIELDS,
name='active',
background=True)
# Index used for claims
self.claimed_fields = [
('q', 1),
('p', 1),
('c.id', 1),
('k', 1),
('c.e', 1),
]
self._col.ensure_index(self.claimed_fields,
self._col.ensure_index(CLAIMED_INDEX_FIELDS,
name='claimed',
background=True)
# Index used for _next_marker() and also to ensure
# uniqueness.
#
self._col.ensure_index(COUNTING_INDEX_FIELDS,
name='counting',
background=True)
# NOTE(kgriffs): This index must be unique so that
# inserting a message with the same marker to the
# same queue will fail; this is used to detect a
@ -111,15 +131,11 @@ class MessageController(storage.MessageBase):
# to miss a message when there is more than one
# producer posting messages to the same queue, in
# parallel.
self._col.ensure_index([('q', 1), ('p', 1), ('k', -1)],
self._col.ensure_index(MARKER_INDEX_FIELDS,
name='queue_marker',
unique=True,
background=True)
#-----------------------------------------------------------------------
# Helpers
#-----------------------------------------------------------------------
def _next_marker(self, queue_name, project=None):
"""Retrieves the next message marker for a given queue.
@ -144,7 +160,7 @@ class MessageController(storage.MessageBase):
:returns: next message marker as an integer
"""
document = self._col.find_one({'q': queue_name, 'p': project},
document = self._col.find_one({'p': project, 'q': queue_name},
sort=[('k', -1)],
fields={'k': 1, '_id': 0})
@ -165,20 +181,6 @@ class MessageController(storage.MessageBase):
time.sleep(seconds)
def _count_expired(self, queue_name, project=None):
"""Counts the number of expired messages in a queue.
:param queue_name: Name of the queue to stat
"""
query = {
'p': project,
'q': queue_name,
'e': {'$lte': timeutils.utcnow()},
}
return self._col.find(query).count()
def _remove_expired(self, queue_name, project):
"""Removes all expired messages except for the most recent
in each queue.
@ -187,42 +189,31 @@ class MessageController(storage.MessageBase):
must always leave at least one message in the queue for
calculating the next marker.
Note that expired messages are only removed if their count
exceeds options.CFG.gc_threshold.
:param queue_name: name for the queue from which to remove
expired messages
:param project: Project to which queue_name belongs
"""
expired_msgs = self._count_expired(queue_name, project)
if options.CFG.gc_threshold <= expired_msgs:
# Get the message with the highest marker, and leave
# it in the queue
# Get the message with the highest marker, and leave
# it in the queue
head = self._col.find_one({'p': project, 'q': queue_name},
sort=[('k', -1)], fields={'k': 1})
# NOTE(flaper87): Keep the counter in a separate record and
# lets remove all messages.
head = self._col.find_one({'q': queue_name, 'p': project},
sort=[('k', -1)], fields={'_id': 1})
if head is None:
# Assume queue was just deleted via a parallel request
LOG.debug(_(u'Queue %s is empty or missing.') % queue_name)
return
if head is None:
# Assume queue was just deleted via a parallel request
LOG.warning(_(u'Queue %s is empty or missing.') % queue_name)
return
query = {
'p': project,
'q': queue_name,
'k': {'$ne': head['k']},
'e': {'$lte': timeutils.utcnow()},
}
# NOTE(flaper87): Can we use k instead of
# _id here? The active index will cover
# the previous query and the remove one.
query = {
'p': project,
'q': queue_name,
'e': {'$lte': timeutils.utcnow()},
'_id': {'$ne': head['_id']}
}
self._col.remove(query, w=0)
self._col.remove(query, w=0)
def _purge_queue(self, queue, project=None):
def _purge_queue(self, queue_name, project=None):
"""Removes all messages from the queue.
Warning: Only use this when deleting the queue; otherwise
@ -231,28 +222,39 @@ class MessageController(storage.MessageBase):
If the queue does not exist, this method fails silently.
:param queue: name of the queue to purge
:param project: name of the project to which the queue belongs
:param queue_name: name of the queue to purge
:param project: ID of the project to which the queue belongs
"""
self._col.remove({'q': queue, 'p': project}, w=0)
self._col.remove({'p': project, 'q': queue_name}, w=0)
def _list(self, queue_name, marker=None, echo=False, client_uuid=None,
fields=None, include_claimed=False, project=None, sort=1):
def _list(self, queue_name, project=None, marker=None,
echo=False, client_uuid=None, fields=None,
include_claimed=False, sort=1, limit=None):
"""Message document listing helper.
:param queue_name: Name of the queue to list
:param project: Project `queue_name` belongs to.
:param marker: Message marker from which to start iterating
:param echo: Whether to return messages that match client_uuid
:param client_uuid: UUID for the client that originated this request
:param fields: Fields to include in emmitted documents as a dict
:param include_claimed: Whether to include claimed messages,
not just active ones
:param project: (Default None) Project `queue_name` belongs to. If
not specified, queries the "global" namespace/project.
:param marker: (Default None) Message marker from which to start
iterating. If not specified, starts with the first message
available in the queue.
:param echo: (Default False) Whether to return messages that match
client_uuid
:param client_uuid: (Default None) UUID for the client that
originated this request
:param fields: (Default None) Fields to include in emitted
documents
:param include_claimed: (Default False) Whether to include
claimed messages, not just active ones
:param sort: (Default 1) Sort order for the listing. Pass 1 for
ascending (oldest message first), or -1 for descending (newest
message first).
:param limit: (Default None) The maximum number of messages
to list. The results may include fewer messages than the
requested `limit` if not enough are available. If limit is
not specified
:returns: MongoDB cursor
:returns: Generator yielding up to `limit` messages.
"""
if sort not in (1, -1):
@ -274,7 +276,7 @@ class MessageController(storage.MessageBase):
if not echo:
query['u'] = {'$ne': client_uuid}
if marker:
if marker is not None:
query['k'] = {'$gt': marker}
if not include_claimed:
@ -282,31 +284,42 @@ class MessageController(storage.MessageBase):
# any claim, or are part of an expired claim.
query['c.e'] = {'$lte': now}
# Construct the request
cursor = self._col.find(query, fields=fields,
sort=[('k', sort)], limit=limit)
# NOTE(flaper87): Suggest the index to use for this query
return self._col.find(query, fields=fields,
sort=[('k', sort)]).hint(self.active_fields)
return cursor.hint(ACTIVE_INDEX_FIELDS)
#-----------------------------------------------------------------------
# Interface
#-----------------------------------------------------------------------
def count(self, queue_name, project=None):
"""Return total number of (non-expired) messages in a queue.
def count(self, queue_name, project=None, include_claimed=False):
"""Return total number of messages in a queue.
This method is designed to very quickly count the number
of messages in a given queue. Expired messages are not
counted, of course. If the queue does not exist, the
count will always be 0.
Note: Some expired messages may be included in the count if
they haven't been GC'd yet. This is done for performance.
"""
query = {
# Messages must belong to this queue
'q': queue_name,
'p': project,
'q': queue_name,
# The messages can not be expired
'e': {'$gt': timeutils.utcnow()},
}
return self._col.find(query).count()
if not include_claimed:
# Exclude messages that are claimed
query['c.e'] = {'$lte': timeutils.utcnow()}
return self._col.find(query).hint(COUNTING_INDEX_FIELDS).count()
def first(self, queue_name, project=None, sort=1):
"""Get first message in the queue (including claimed).
@ -321,7 +334,8 @@ class MessageController(storage.MessageBase):
"""
cursor = self._list(queue_name, project=project,
include_claimed=True, sort=sort).limit(1)
include_claimed=True, sort=sort,
limit=1)
try:
message = next(cursor)
except StopIteration:
@ -330,18 +344,25 @@ class MessageController(storage.MessageBase):
return message
def active(self, queue_name, marker=None, echo=False,
client_uuid=None, fields=None, project=None):
client_uuid=None, fields=None, project=None,
limit=None):
return self._list(queue_name, marker, echo, client_uuid,
fields, include_claimed=False, project=project)
return self._list(queue_name, project=project, marker=marker,
echo=echo, client_uuid=client_uuid,
fields=fields, include_claimed=False,
limit=limit)
def claimed(self, queue_name, claim_id,
expires=None, limit=None, project=None):
if claim_id is None:
claim_id = {'$ne': None}
query = {
'p': project,
'q': queue_name,
'c.id': claim_id,
'c.e': {'$gt': expires or timeutils.utcnow()},
'q': queue_name,
'p': project,
}
# NOTE(kgriffs): Claimed messages must be queried from
@ -351,7 +372,7 @@ class MessageController(storage.MessageBase):
msgs = self._col.find(query, sort=[('k', 1)],
read_preference=preference)
if limit:
if limit is not None:
msgs = msgs.limit(limit)
now = timeutils.utcnow()
@ -375,7 +396,7 @@ class MessageController(storage.MessageBase):
# NOTE(cpp-cabrera): unclaim by setting the claim ID to None
# and the claim expiration time to now
now = timeutils.utcnow()
self._col.update({'q': queue_name, 'p': project, 'c.id': cid},
self._col.update({'p': project, 'q': queue_name, 'c.id': cid},
{'$set': {'c': {'id': None, 'e': now}}},
upsert=False, multi=True)
@ -392,11 +413,8 @@ class MessageController(storage.MessageBase):
it must leave at least one message in each queue, and it
is impractical to send a huge list of _id's to filter out
in a single call. That being said, this is somewhat mitigated
by the gc_threshold configuration option, which reduces the
frequency at which the DB is locked for non-busy queues. Also,
since .remove is run on each queue seperately, this reduces
the duration that any given lock is held, avoiding blocking
regular writes.
by the fact that remove() is run on each queue separately,
thereby reducing the duration that any given lock is held.
"""
# TODO(kgriffs): Optimize first by batching the .removes, second
@ -419,10 +437,10 @@ class MessageController(storage.MessageBase):
except ValueError:
yield iter([])
messages = self._list(queue_name, marker, echo, client_uuid,
include_claimed=include_claimed, project=project)
messages = self._list(queue_name, project=project, marker=marker,
client_uuid=client_uuid, echo=echo,
include_claimed=include_claimed, limit=limit)
messages = messages.limit(limit)
marker_id = {}
now = timeutils.utcnow()
@ -450,12 +468,12 @@ class MessageController(storage.MessageBase):
query = {
'_id': mid,
'q': queue_name,
'p': project,
'q': queue_name,
'e': {'$gt': now}
}
message = list(self._col.find(query).limit(1).hint([('_id', 1)]))
message = list(self._col.find(query).limit(1).hint(ID_INDEX_FIELDS))
if not message:
raise exceptions.MessageDoesNotExist(message_id, queue_name,
@ -473,15 +491,15 @@ class MessageController(storage.MessageBase):
# Base query, always check expire time
query = {
'q': queue_name,
'p': project,
'_id': {'$in': message_ids},
'p': project,
'q': queue_name,
'e': {'$gt': now},
}
# NOTE(flaper87): Should this query
# be sorted?
messages = self._col.find(query).hint([('_id', 1)])
messages = self._col.find(query).hint(ID_INDEX_FIELDS)
def denormalizer(msg):
return _basic_message(msg, now)
@ -617,9 +635,9 @@ class MessageController(storage.MessageBase):
return
query = {
'q': queue_name,
'_id': mid,
'p': project,
'_id': mid
'q': queue_name,
}
# NOTE(cpp-cabrera): return early - the user gave us an
@ -653,9 +671,9 @@ class MessageController(storage.MessageBase):
def bulk_delete(self, queue_name, message_ids, project=None):
message_ids = [mid for mid in map(utils.to_oid, message_ids) if mid]
query = {
'q': queue_name,
'p': project,
'_id': {'$in': message_ids},
'p': project,
'q': queue_name,
}
self._col.remove(query, w=0)

View File

@ -42,16 +42,6 @@ OPTIONS = {
# Frequency of message garbage collections, in seconds
'gc_interval': 5 * 60,
# Threshold of number of expired messages to reach in a given
# queue, before performing the GC. Useful for reducing frequent
# locks on the DB for non-busy queues, or for worker queues
# which process jobs quickly enough to keep the number of in-
# flight messages low.
#
# Note: The higher this number, the larger the memory-mapped DB
# files will be.
'gc_threshold': 1000,
}
CFG = config.namespace('drivers:storage:mongodb').from_options(**OPTIONS)

View File

@ -149,8 +149,8 @@ class QueueController(storage.QueueBase):
controller = self.driver.message_controller
active = controller.active(name, project=project).count()
total = controller.count(name, project=project)
active = controller.count(name, project=project, include_claimed=False)
total = controller.count(name, project=project, include_claimed=True)
message_stats = {
'claimed': total - active,

View File

@ -12,4 +12,3 @@ port = 8888
[drivers:storage:mongodb]
uri = mongodb://127.0.0.1:27017
database = marconi_test
gc_threshold = 100

View File

@ -345,6 +345,10 @@ class MessageControllerTest(ControllerBaseTest):
def test_expired_message(self):
messages = [{'body': 3.14, 'ttl': 0}]
[msgid] = self.controller.post(self.queue_name, messages,
project=self.project,
client_uuid='my_uuid')
[msgid] = self.controller.post(self.queue_name, messages,
project=self.project,
client_uuid='my_uuid')
@ -355,6 +359,7 @@ class MessageControllerTest(ControllerBaseTest):
countof = self.queue_controller.stats(self.queue_name,
project=self.project)
self.assertEquals(countof['messages']['free'], 0)
def test_bad_id(self):

View File

@ -14,7 +14,6 @@
# limitations under the License.
import os
import random
import time
import mock
@ -165,6 +164,7 @@ class MongodbMessageTests(base.MessageControllerTest):
self.assertIn('active', indexes)
self.assertIn('claimed', indexes)
self.assertIn('queue_marker', indexes)
self.assertIn('counting', indexes)
def test_next_marker(self):
queue_name = 'marker_test'
@ -188,10 +188,7 @@ class MongodbMessageTests(base.MessageControllerTest):
def test_remove_expired(self):
num_projects = 10
num_queues = 10
total_queues = num_projects * num_queues
gc_threshold = mongodb_options.CFG.gc_threshold
messages_per_queue = gc_threshold
nogc_messages_per_queue = gc_threshold - 1
messages_per_queue = 100
projects = ['gc-test-project-{0}'.format(i)
for i in range(num_projects)]
@ -206,41 +203,21 @@ class MongodbMessageTests(base.MessageControllerTest):
self.queue_controller.create(queue, project)
self.controller.post(queue, messages, client_uuid, project)
# Add one that should not be gc'd due to being under threshold
self.queue_controller.create('nogc-test', 'nogc-test-project')
nogc_messages = [{'ttl': 0, 'body': str(i)}
for i in range(nogc_messages_per_queue)]
self.controller.post('nogc-test', nogc_messages,
client_uuid, 'nogc-test-project')
total_expired = sum(
self._count_expired(queue, project)
for queue in queue_names
for project in projects)
self.assertEquals(total_expired, total_queues * messages_per_queue)
self.controller.remove_expired()
# Make sure the messages in this queue were not gc'd since
# the count was under the threshold.
self.assertEquals(
self._count_expired('nogc-test', 'nogc-test-project'),
len(nogc_messages))
for project in projects:
for queue in queue_names:
query = {'q': queue, 'p': project}
total_expired = sum(
self._count_expired(queue, project)
for queue in queue_names
for project in projects)
cursor = self.driver.db.messages.find(query)
count = cursor.count()
# Expect that the most recent message for each queue
# will not be removed.
self.assertEquals(total_expired, total_queues)
# Expect that the most recent message for each queue
# will not be removed.
self.assertEquals(count, 1)
# Sanity-check that the most recent message is the
# one remaining in the queue.
queue = random.choice(queue_names)
message = self.driver.db.messages.find_one({'q': queue, 'p': project})
self.assertEquals(message['k'], messages_per_queue)
message = next(cursor)
self.assertEquals(message['k'], messages_per_queue)
def test_empty_queue_exception(self):
queue_name = 'empty-queue-test'