object-updater: defer ratelimited updates

Previously, objects updates that could not be sent immediately due to
per-container/bucket ratelimiting [1] would be skipped and re-tried
during the next updater cycle. There could potentially be a period of
time at the end of a cycle when the updater slept, having completed a
sweep of the on-disk async pending files, despite having skipped
updates during the cycle. Skipped updates would then be read from disk
again during the next cycle.

With this change the updater will defer skipped updates to an
in-memory queue (up to a configurable maximum number) until the sweep
of async pending files has completed, and then trickle out deferred
updates until the cycle's interval expires. This increases the useful
work done in the current cycle and reduces the amount of repeated disk
IO during the next cycle.

The deferrals queue is bounded in size and will evict least recently
read updates in order to accept more recently read updates. This
reduces the probablility that a deferred update has been made obsolete
by newer on-disk async pending files while waiting in the deferrals
queue.

The deferrals queue is implemented as a collection of per-bucket
queues so that updates can be drained from the queues in the order
that buckets cease to be ratelimited.

[1] Related-Change: Idef25cd6026b02c1b5c10a9816c8c6cbe505e7ed

Co-Authored-By: Clay Gerrard <clay.gerrard@gmail.com>
Co-Authored-By: Matthew Oliver <matt@oliver.net.au>
Change-Id: I95e58df9f15c5f9d552b8f4c4989a474f52262f4
This commit is contained in:
Alistair Coles 2022-01-10 11:52:49 +00:00
parent de88862981
commit 51da2543ca
4 changed files with 881 additions and 61 deletions

View File

@ -483,6 +483,12 @@ use = egg:swift#recon
# be an integer greater than 0. # be an integer greater than 0.
# per_container_ratelimit_buckets = 1000 # per_container_ratelimit_buckets = 1000
# #
# Updates that cannot be sent due to per-container rate-limiting may be
# deferred and re-tried at the end of the updater cycle. This option constrains
# the size of the in-memory data structure used to store deferred updates.
# Must be an integer value greater than or equal to 0.
# max_deferred_updates = 10000
#
# slowdown will sleep that amount between objects. Deprecated; use # slowdown will sleep that amount between objects. Deprecated; use
# objects_per_second instead. # objects_per_second instead.
# slowdown = 0.01 # slowdown = 0.01

View File

@ -12,6 +12,7 @@
# implied. # implied.
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
from six.moves import queue
import six.moves.cPickle as pickle import six.moves.cPickle as pickle
import errno import errno
@ -21,6 +22,7 @@ import sys
import time import time
import uuid import uuid
from random import random, shuffle from random import random, shuffle
from collections import deque
from eventlet import spawn, Timeout from eventlet import spawn, Timeout
@ -31,7 +33,7 @@ from swift.common.ring import Ring
from swift.common.utils import get_logger, renamer, write_pickle, \ from swift.common.utils import get_logger, renamer, write_pickle, \
dump_recon_cache, config_true_value, RateLimitedIterator, split_path, \ dump_recon_cache, config_true_value, RateLimitedIterator, split_path, \
eventlet_monkey_patch, get_redirect_data, ContextPool, hash_path, \ eventlet_monkey_patch, get_redirect_data, ContextPool, hash_path, \
non_negative_float, config_positive_int_value non_negative_float, config_positive_int_value, non_negative_int
from swift.common.daemon import Daemon from swift.common.daemon import Daemon
from swift.common.header_key_dict import HeaderKeyDict from swift.common.header_key_dict import HeaderKeyDict
from swift.common.storage_policy import split_policy_string, PolicyError from swift.common.storage_policy import split_policy_string, PolicyError
@ -41,33 +43,98 @@ from swift.common.http import is_success, HTTP_INTERNAL_SERVER_ERROR, \
HTTP_MOVED_PERMANENTLY HTTP_MOVED_PERMANENTLY
class RateLimiterBucket(object):
def __init__(self, update_delta):
self.update_delta = update_delta
self.last_time = 0
self.deque = deque()
@property
def wait_until(self):
return self.last_time + self.update_delta
def __len__(self):
return len(self.deque)
def __bool__(self):
return bool(self.deque)
__nonzero__ = __bool__ # py2
def __lt__(self, other):
# used to sort buckets by readiness
if isinstance(other, RateLimiterBucket):
return self.wait_until < other.wait_until
return self.wait_until < other
class BucketizedUpdateSkippingLimiter(object): class BucketizedUpdateSkippingLimiter(object):
""" """
Wrap an iterator to filter elements that show up too often. Wrap an iterator to rate-limit updates on a per-bucket basis, where updates
are mapped to buckets by hashing their destination path. If an update is
rate-limited then it is placed on a deferral queue and may be sent later if
the wrapped iterator is exhausted before the ``drain_until`` time is
reached.
The deferral queue has constrained size and once the queue is full updates
are evicted using a first-in-first-out policy. This policy is used because
updates on the queue may have been made obsolete by newer updates written
to disk, and this is more likely for updates that have been on the queue
longest.
The iterator increments stats as follows:
* The `deferrals` stat is incremented for each update that is
rate-limited. Note that a individual update is rate-limited at most
once.
* The `skips` stat is incremented for each rate-limited update that is
not eventually yielded. This includes updates that are evicted from the
deferral queue and all updates that remain in the deferral queue when
``drain_until`` time is reached and the iterator terminates.
* The `drains` stat is incremented for each rate-limited update that is
eventually yielded.
Consequently, when this iterator terminates, the sum of `skips` and
`drains` is equal to the number of `deferrals`.
:param update_iterable: an async_pending update iterable :param update_iterable: an async_pending update iterable
:param logger: a logger instance
:param stats: a SweepStats instance
:param num_buckets: number of buckets to divide container hashes into, the :param num_buckets: number of buckets to divide container hashes into, the
more buckets total the less containers to a bucket more buckets total the less containers to a bucket
(once a busy container slows down a bucket the whole (once a busy container slows down a bucket the whole
bucket starts skipping) bucket starts deferring)
:param max_elements_per_group_per_second: tunable, when skipping kicks in :param max_elements_per_group_per_second: tunable, when deferring kicks in
:param skip_f: function to call with update_ctx when skipping it :param max_deferred_elements: maximum number of deferred elements before
skipping starts. Each bucket may defer updates, but once the total
number of deferred updates summed across all buckets reaches this
value then all buckets will skip subsequent updates.
:param drain_until: time at which any remaining deferred elements must be
skipped and the iterator stops. Once the wrapped iterator has been
exhausted, this iterator will drain deferred elements from its buckets
until either all buckets have drained or this time is reached.
""" """
def __init__(self, update_iterable, num_buckets, def __init__(self, update_iterable, logger, stats, num_buckets=1000,
max_elements_per_group_per_second, max_elements_per_group_per_second=50,
skip_f=lambda update_ctx: None): max_deferred_elements=0,
drain_until=0):
self.iterator = iter(update_iterable) self.iterator = iter(update_iterable)
self.logger = logger
self.stats = stats
# if we want a smaller "blast radius" we could make this number bigger # if we want a smaller "blast radius" we could make this number bigger
self.num_buckets = max(num_buckets, 1) self.num_buckets = max(num_buckets, 1)
# an array might be more efficient; but this is pretty cheap
self.next_update = [0.0 for _ in range(self.num_buckets)]
try: try:
self.bucket_update_delta = 1.0 / max_elements_per_group_per_second self.bucket_update_delta = 1.0 / max_elements_per_group_per_second
except ZeroDivisionError: except ZeroDivisionError:
self.bucket_update_delta = -1 self.bucket_update_delta = -1
self.skip_f = skip_f self.max_deferred_elements = max_deferred_elements
self.deferred_buckets = deque()
self.drain_until = drain_until
self.salt = str(uuid.uuid4()) self.salt = str(uuid.uuid4())
self.buckets = [RateLimiterBucket(self.bucket_update_delta)
for _ in range(self.num_buckets)]
self.buckets_ordered_by_readiness = None
def __iter__(self): def __iter__(self):
return self return self
@ -76,15 +143,77 @@ class BucketizedUpdateSkippingLimiter(object):
acct, cont = split_update_path(update) acct, cont = split_update_path(update)
return int(hash_path(acct, cont, self.salt), 16) % self.num_buckets return int(hash_path(acct, cont, self.salt), 16) % self.num_buckets
def _get_time(self):
return time.time()
def next(self): def next(self):
# first iterate over the wrapped iterator...
for update_ctx in self.iterator: for update_ctx in self.iterator:
bucket_key = self._bucket_key(update_ctx['update']) bucket = self.buckets[self._bucket_key(update_ctx['update'])]
now = time.time() now = self._get_time()
if self.next_update[bucket_key] > now: if now >= bucket.wait_until:
self.skip_f(update_ctx) # no need to ratelimit, just return next update
continue bucket.last_time = now
self.next_update[bucket_key] = now + self.bucket_update_delta
return update_ctx return update_ctx
self.stats.deferrals += 1
self.logger.increment("deferrals")
if self.max_deferred_elements > 0:
if len(self.deferred_buckets) >= self.max_deferred_elements:
# create space to defer this update by popping the least
# recent deferral from the least recently deferred bucket;
# updates read from disk recently are preferred over those
# read from disk less recently.
oldest_deferred_bucket = self.deferred_buckets.popleft()
oldest_deferred_bucket.deque.popleft()
self.stats.skips += 1
self.logger.increment("skips")
# append the update to the bucket's queue and append the bucket
# to the queue of deferred buckets
# note: buckets may have multiple entries in deferred_buckets,
# one for each deferred update in that particular bucket
bucket.deque.append(update_ctx)
self.deferred_buckets.append(bucket)
else:
self.stats.skips += 1
self.logger.increment("skips")
if self.buckets_ordered_by_readiness is None:
# initialise a queue of those buckets with deferred elements;
# buckets are queued in the chronological order in which they are
# ready to serve an element
self.buckets_ordered_by_readiness = queue.PriorityQueue()
for bucket in self.buckets:
if bucket:
self.buckets_ordered_by_readiness.put(bucket)
# now drain the buckets...
undrained_elements = []
while not self.buckets_ordered_by_readiness.empty():
now = self._get_time()
bucket = self.buckets_ordered_by_readiness.get_nowait()
if now < self.drain_until:
# wait for next element to be ready
time.sleep(max(0, bucket.wait_until - now))
# drain the most recently deferred element
item = bucket.deque.pop()
if bucket:
# bucket has more deferred elements, re-insert in queue in
# correct chronological position
bucket.last_time = self._get_time()
self.buckets_ordered_by_readiness.put(bucket)
self.stats.drains += 1
self.logger.increment("drains")
return item
else:
# time to stop iterating: gather all un-drained elements
undrained_elements.extend(bucket.deque)
if undrained_elements:
# report final batch of skipped elements
self.stats.skips += len(undrained_elements)
self.logger.update_stats("skips", len(undrained_elements))
raise StopIteration() raise StopIteration()
__next__ = next __next__ = next
@ -93,9 +222,18 @@ class BucketizedUpdateSkippingLimiter(object):
class SweepStats(object): class SweepStats(object):
""" """
Stats bucket for an update sweep Stats bucket for an update sweep
A measure of the rate at which updates are being rate-limited is:
deferrals / (deferrals + successes + failures - drains)
A measure of the rate at which updates are not being sent during a sweep
is:
skips / (skips + successes + failures)
""" """
def __init__(self, errors=0, failures=0, quarantines=0, successes=0, def __init__(self, errors=0, failures=0, quarantines=0, successes=0,
unlinks=0, redirects=0, skips=0): unlinks=0, redirects=0, skips=0, deferrals=0, drains=0):
self.errors = errors self.errors = errors
self.failures = failures self.failures = failures
self.quarantines = quarantines self.quarantines = quarantines
@ -103,10 +241,13 @@ class SweepStats(object):
self.unlinks = unlinks self.unlinks = unlinks
self.redirects = redirects self.redirects = redirects
self.skips = skips self.skips = skips
self.deferrals = deferrals
self.drains = drains
def copy(self): def copy(self):
return type(self)(self.errors, self.failures, self.quarantines, return type(self)(self.errors, self.failures, self.quarantines,
self.successes, self.unlinks) self.successes, self.unlinks, self.redirects,
self.skips, self.deferrals, self.drains)
def since(self, other): def since(self, other):
return type(self)(self.errors - other.errors, return type(self)(self.errors - other.errors,
@ -115,7 +256,9 @@ class SweepStats(object):
self.successes - other.successes, self.successes - other.successes,
self.unlinks - other.unlinks, self.unlinks - other.unlinks,
self.redirects - other.redirects, self.redirects - other.redirects,
self.skips - other.skips) self.skips - other.skips,
self.deferrals - other.deferrals,
self.drains - other.drains)
def reset(self): def reset(self):
self.errors = 0 self.errors = 0
@ -125,6 +268,8 @@ class SweepStats(object):
self.unlinks = 0 self.unlinks = 0
self.redirects = 0 self.redirects = 0
self.skips = 0 self.skips = 0
self.deferrals = 0
self.drains = 0
def __str__(self): def __str__(self):
keys = ( keys = (
@ -135,6 +280,8 @@ class SweepStats(object):
(self.errors, 'errors'), (self.errors, 'errors'),
(self.redirects, 'redirects'), (self.redirects, 'redirects'),
(self.skips, 'skips'), (self.skips, 'skips'),
(self.deferrals, 'deferrals'),
(self.drains, 'drains'),
) )
return ', '.join('%d %s' % pair for pair in keys) return ', '.join('%d %s' % pair for pair in keys)
@ -191,6 +338,9 @@ class ObjectUpdater(Daemon):
DEFAULT_RECON_CACHE_PATH) DEFAULT_RECON_CACHE_PATH)
self.rcache = os.path.join(self.recon_cache_path, RECON_OBJECT_FILE) self.rcache = os.path.join(self.recon_cache_path, RECON_OBJECT_FILE)
self.stats = SweepStats() self.stats = SweepStats()
self.max_deferred_updates = non_negative_int(
conf.get('max_deferred_updates', 10000))
self.begin = time.time()
def _listdir(self, path): def _listdir(self, path):
try: try:
@ -214,7 +364,7 @@ class ObjectUpdater(Daemon):
time.sleep(random() * self.interval) time.sleep(random() * self.interval)
while True: while True:
self.logger.info('Begin object update sweep') self.logger.info('Begin object update sweep')
begin = time.time() self.begin = time.time()
pids = [] pids = []
# read from container ring to ensure it's fresh # read from container ring to ensure it's fresh
self.get_container_ring().get_nodes('') self.get_container_ring().get_nodes('')
@ -248,7 +398,7 @@ class ObjectUpdater(Daemon):
sys.exit() sys.exit()
while pids: while pids:
pids.remove(os.wait()[0]) pids.remove(os.wait()[0])
elapsed = time.time() - begin elapsed = time.time() - self.begin
self.logger.info('Object update sweep completed: %.02fs', self.logger.info('Object update sweep completed: %.02fs',
elapsed) elapsed)
dump_recon_cache({'object_updater_sweep': elapsed}, dump_recon_cache({'object_updater_sweep': elapsed},
@ -259,7 +409,7 @@ class ObjectUpdater(Daemon):
def run_once(self, *args, **kwargs): def run_once(self, *args, **kwargs):
"""Run the updater once.""" """Run the updater once."""
self.logger.info('Begin object update single threaded sweep') self.logger.info('Begin object update single threaded sweep')
begin = time.time() self.begin = time.time()
self.stats.reset() self.stats.reset()
for device in self._listdir(self.devices): for device in self._listdir(self.devices):
try: try:
@ -271,7 +421,7 @@ class ObjectUpdater(Daemon):
self.logger.warning('Skipping: %s', err) self.logger.warning('Skipping: %s', err)
continue continue
self.object_sweep(dev_path) self.object_sweep(dev_path)
elapsed = time.time() - begin elapsed = time.time() - self.begin
self.logger.info( self.logger.info(
('Object update single-threaded sweep completed: ' ('Object update single-threaded sweep completed: '
'%(elapsed).02fs, %(stats)s'), '%(elapsed).02fs, %(stats)s'),
@ -404,18 +554,15 @@ class ObjectUpdater(Daemon):
self.logger.info("Object update sweep starting on %s (pid: %d)", self.logger.info("Object update sweep starting on %s (pid: %d)",
device, my_pid) device, my_pid)
def skip_counting_f(update_ctx):
# in the future we could defer update_ctx
self.stats.skips += 1
self.logger.increment("skips")
ap_iter = RateLimitedIterator( ap_iter = RateLimitedIterator(
self._iter_async_pendings(device), self._iter_async_pendings(device),
elements_per_second=self.max_objects_per_second) elements_per_second=self.max_objects_per_second)
ap_iter = BucketizedUpdateSkippingLimiter( ap_iter = BucketizedUpdateSkippingLimiter(
ap_iter, self.per_container_ratelimit_buckets, ap_iter, self.logger, self.stats,
self.per_container_ratelimit_buckets,
self.max_objects_per_container_per_second, self.max_objects_per_container_per_second,
skip_f=skip_counting_f) max_deferred_elements=self.max_deferred_updates,
drain_until=self.begin + self.interval)
with ContextPool(self.concurrency) as pool: with ContextPool(self.concurrency) as pool:
for update_ctx in ap_iter: for update_ctx in ap_iter:
pool.spawn(self.process_object_update, **update_ctx) pool.spawn(self.process_object_update, **update_ctx)
@ -440,8 +587,10 @@ class ObjectUpdater(Daemon):
'%(successes)d successes, %(failures)d failures, ' '%(successes)d successes, %(failures)d failures, '
'%(quarantines)d quarantines, ' '%(quarantines)d quarantines, '
'%(unlinks)d unlinks, %(errors)d errors, ' '%(unlinks)d unlinks, %(errors)d errors, '
'%(redirects)d redirects ' '%(redirects)d redirects, '
'%(skips)d skips ' '%(skips)d skips, '
'%(deferrals)d deferrals, '
'%(drains)d drains '
'(pid: %(pid)d)'), '(pid: %(pid)d)'),
{'device': device, {'device': device,
'elapsed': time.time() - start_time, 'elapsed': time.time() - start_time,
@ -452,7 +601,10 @@ class ObjectUpdater(Daemon):
'unlinks': sweep_totals.unlinks, 'unlinks': sweep_totals.unlinks,
'errors': sweep_totals.errors, 'errors': sweep_totals.errors,
'redirects': sweep_totals.redirects, 'redirects': sweep_totals.redirects,
'skips': sweep_totals.skips}) 'skips': sweep_totals.skips,
'deferrals': sweep_totals.deferrals,
'drains': sweep_totals.drains
})
def process_object_update(self, update_path, device, policy, update, def process_object_update(self, update_path, device, policy, update,
**kwargs): **kwargs):

View File

@ -141,6 +141,9 @@ class FakeLogger(logging.Logger, CaptureLog):
counts[metric] += 1 counts[metric] += 1
return counts return counts
def get_update_stats(self):
return [call[0] for call in self.log_dict['update_stats']]
def setFormatter(self, obj): def setFormatter(self, obj):
self.formatter = obj self.formatter = obj

View File

@ -12,6 +12,8 @@
# implied. # implied.
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
from queue import PriorityQueue
import eventlet import eventlet
import six.moves.cPickle as pickle import six.moves.cPickle as pickle
import mock import mock
@ -127,6 +129,7 @@ class TestObjectUpdater(unittest.TestCase):
self.assertEqual(daemon.max_objects_per_second, 50.0) self.assertEqual(daemon.max_objects_per_second, 50.0)
self.assertEqual(daemon.max_objects_per_container_per_second, 0.0) self.assertEqual(daemon.max_objects_per_container_per_second, 0.0)
self.assertEqual(daemon.per_container_ratelimit_buckets, 1000) self.assertEqual(daemon.per_container_ratelimit_buckets, 1000)
self.assertEqual(daemon.max_deferred_updates, 10000)
# non-defaults # non-defaults
conf = { conf = {
@ -139,6 +142,7 @@ class TestObjectUpdater(unittest.TestCase):
'objects_per_second': '10.5', 'objects_per_second': '10.5',
'max_objects_per_container_per_second': '1.2', 'max_objects_per_container_per_second': '1.2',
'per_container_ratelimit_buckets': '100', 'per_container_ratelimit_buckets': '100',
'max_deferred_updates': '0',
} }
daemon = object_updater.ObjectUpdater(conf, logger=self.logger) daemon = object_updater.ObjectUpdater(conf, logger=self.logger)
self.assertEqual(daemon.devices, '/some/where/else') self.assertEqual(daemon.devices, '/some/where/else')
@ -150,6 +154,7 @@ class TestObjectUpdater(unittest.TestCase):
self.assertEqual(daemon.max_objects_per_second, 10.5) self.assertEqual(daemon.max_objects_per_second, 10.5)
self.assertEqual(daemon.max_objects_per_container_per_second, 1.2) self.assertEqual(daemon.max_objects_per_container_per_second, 1.2)
self.assertEqual(daemon.per_container_ratelimit_buckets, 100) self.assertEqual(daemon.per_container_ratelimit_buckets, 100)
self.assertEqual(daemon.max_deferred_updates, 0)
# check deprecated option # check deprecated option
daemon = object_updater.ObjectUpdater({'slowdown': '0.04'}, daemon = object_updater.ObjectUpdater({'slowdown': '0.04'},
@ -171,6 +176,9 @@ class TestObjectUpdater(unittest.TestCase):
check_bad({'per_container_ratelimit_buckets': '0'}) check_bad({'per_container_ratelimit_buckets': '0'})
check_bad({'per_container_ratelimit_buckets': '-1'}) check_bad({'per_container_ratelimit_buckets': '-1'})
check_bad({'per_container_ratelimit_buckets': 'auto'}) check_bad({'per_container_ratelimit_buckets': 'auto'})
check_bad({'max_deferred_updates': '-1'})
check_bad({'max_deferred_updates': '1.1'})
check_bad({'max_deferred_updates': 'auto'})
@mock.patch('os.listdir') @mock.patch('os.listdir')
def test_listdir_with_exception(self, mock_listdir): def test_listdir_with_exception(self, mock_listdir):
@ -1267,6 +1275,8 @@ class TestObjectUpdater(unittest.TestCase):
'mount_check': 'false', 'mount_check': 'false',
'swift_dir': self.testdir, 'swift_dir': self.testdir,
'max_objects_per_container_per_second': 1, 'max_objects_per_container_per_second': 1,
'max_deferred_updates': 0, # do not re-iterate
'concurrency': 1
} }
daemon = object_updater.ObjectUpdater(conf, logger=self.logger) daemon = object_updater.ObjectUpdater(conf, logger=self.logger)
self.async_dir = os.path.join(self.sda1, get_async_dir(POLICIES[0])) self.async_dir = os.path.join(self.sda1, get_async_dir(POLICIES[0]))
@ -1299,6 +1309,14 @@ class TestObjectUpdater(unittest.TestCase):
for req in fake_conn.requests), for req in fake_conn.requests),
{'/sda1/%s/a/c1' % c1_part: 3, {'/sda1/%s/a/c1' % c1_part: 3,
'/sda1/%s/.shards_a/c2_shard' % c2_part: 3}) '/sda1/%s/.shards_a/c2_shard' % c2_part: 3})
info_lines = self.logger.get_lines_for_level('info')
self.assertTrue(info_lines)
self.assertIn('2 successes, 0 failures, 0 quarantines, 2 unlinks, '
'0 errors, 0 redirects, 9 skips, 9 deferrals, 0 drains',
info_lines[-1])
self.assertEqual({'skips': 9, 'successes': 2, 'unlinks': 2,
'deferrals': 9},
self.logger.get_increment_counts())
@mock.patch('swift.obj.updater.dump_recon_cache') @mock.patch('swift.obj.updater.dump_recon_cache')
def test_per_container_rate_limit_unlimited(self, mock_recon): def test_per_container_rate_limit_unlimited(self, mock_recon):
@ -1330,14 +1348,24 @@ class TestObjectUpdater(unittest.TestCase):
self.assertEqual(expected_total, daemon.stats.successes) self.assertEqual(expected_total, daemon.stats.successes)
self.assertEqual(0, daemon.stats.skips) self.assertEqual(0, daemon.stats.skips)
self.assertEqual([], self._find_async_pending_files()) self.assertEqual([], self._find_async_pending_files())
info_lines = self.logger.get_lines_for_level('info')
self.assertTrue(info_lines)
self.assertIn('11 successes, 0 failures, 0 quarantines, 11 unlinks, '
'0 errors, 0 redirects, 0 skips, 0 deferrals, 0 drains',
info_lines[-1])
self.assertEqual({'successes': 11, 'unlinks': 11},
self.logger.get_increment_counts())
@mock.patch('swift.obj.updater.dump_recon_cache') @mock.patch('swift.obj.updater.dump_recon_cache')
def test_per_container_rate_limit_slow_responses(self, mock_recon): def test_per_container_rate_limit_some_limited(self, mock_recon):
# simulate delays between buckets being fed so that only some updates
# are skipped
conf = { conf = {
'devices': self.devices_dir, 'devices': self.devices_dir,
'mount_check': 'false', 'mount_check': 'false',
'swift_dir': self.testdir, 'swift_dir': self.testdir,
'max_objects_per_container_per_second': 10, 'max_objects_per_container_per_second': 10,
'max_deferred_updates': 0, # do not re-iterate
} }
daemon = object_updater.ObjectUpdater(conf, logger=self.logger) daemon = object_updater.ObjectUpdater(conf, logger=self.logger)
self.async_dir = os.path.join(self.sda1, get_async_dir(POLICIES[0])) self.async_dir = os.path.join(self.sda1, get_async_dir(POLICIES[0]))
@ -1347,26 +1375,423 @@ class TestObjectUpdater(unittest.TestCase):
for i in range(num_c1_files): for i in range(num_c1_files):
obj_name = 'o%02d' % i obj_name = 'o%02d' % i
self._make_async_pending_pickle('a', 'c1', obj_name) self._make_async_pending_pickle('a', 'c1', obj_name)
c1_part, _ = daemon.get_container_ring().get_nodes('a', 'c1')
expected_total = num_c1_files expected_total = num_c1_files
self.assertEqual(expected_total, self.assertEqual(expected_total,
len(self._find_async_pending_files())) len(self._find_async_pending_files()))
latencies = [.11, 0, .11, 0] # first one always succeeds, second is skipped because it is only 0.05s
# behind the first, second succeeds because it is 0.11 behind the
# first, fourth is skipped
latencies = [0, 0.05, .051, 0]
expected_success = 2 expected_success = 2
fake_status_codes = [200] * 3 * expected_success fake_status_codes = [200] * 3 * expected_success
def fake_spawn(pool, *args, **kwargs): contexts_fed_in = []
# make each update delay the iter being called again
eventlet.sleep(latencies.pop(0))
return args[0](*args[1:], **kwargs)
with mocked_http_conn(*fake_status_codes): def ratelimit_if(value):
with mock.patch('swift.obj.updater.ContextPool.spawn', fake_spawn): contexts_fed_in.append(value)
# make each update delay before the iter being called again
eventlet.sleep(latencies.pop(0))
return False # returning False overrides normal ratelimiting
orig_rate_limited_iterator = utils.RateLimitedIterator
def fake_rate_limited_iterator(*args, **kwargs):
# insert our own rate limiting function
kwargs['ratelimit_if'] = ratelimit_if
return orig_rate_limited_iterator(*args, **kwargs)
with mocked_http_conn(*fake_status_codes) as fake_conn, \
mock.patch('swift.obj.updater.RateLimitedIterator',
fake_rate_limited_iterator):
daemon.run_once() daemon.run_once()
self.assertEqual(expected_success, daemon.stats.successes) self.assertEqual(expected_success, daemon.stats.successes)
expected_skipped = expected_total - expected_success expected_skipped = expected_total - expected_success
self.assertEqual(expected_skipped, daemon.stats.skips) self.assertEqual(expected_skipped, daemon.stats.skips)
self.assertEqual(expected_skipped, self.assertEqual(expected_skipped,
len(self._find_async_pending_files())) len(self._find_async_pending_files()))
paths_fed_in = ['/sda1/%(part)s/%(account)s/%(container)s/%(obj)s'
% dict(ctx['update'], part=c1_part)
for ctx in contexts_fed_in]
expected_update_paths = paths_fed_in[:1] * 3 + paths_fed_in[2:3] * 3
actual_update_paths = [req['path'] for req in fake_conn.requests]
self.assertEqual(expected_update_paths, actual_update_paths)
info_lines = self.logger.get_lines_for_level('info')
self.assertTrue(info_lines)
self.assertIn('2 successes, 0 failures, 0 quarantines, 2 unlinks, '
'0 errors, 0 redirects, 2 skips, 2 deferrals, 0 drains',
info_lines[-1])
self.assertEqual({'skips': 2, 'successes': 2, 'unlinks': 2,
'deferrals': 2},
self.logger.get_increment_counts())
@mock.patch('swift.obj.updater.dump_recon_cache')
def test_per_container_rate_limit_defer_2_skip_1(self, mock_recon):
# limit length of deferral queue so that some defer and some skip
conf = {
'devices': self.devices_dir,
'mount_check': 'false',
'swift_dir': self.testdir,
'max_objects_per_container_per_second': 10,
# only one bucket needed for test
'per_container_ratelimit_buckets': 1,
'max_deferred_updates': 1,
}
daemon = object_updater.ObjectUpdater(conf, logger=self.logger)
self.async_dir = os.path.join(self.sda1, get_async_dir(POLICIES[0]))
os.mkdir(self.async_dir)
# all updates for same container
num_c1_files = 4
for i in range(num_c1_files):
obj_name = 'o%02d' % i
self._make_async_pending_pickle('a', 'c1', obj_name)
c1_part, _ = daemon.get_container_ring().get_nodes('a', 'c1')
expected_total = num_c1_files
self.assertEqual(expected_total,
len(self._find_async_pending_files()))
# first succeeds, second is deferred, third succeeds, fourth is
# deferred and bumps second out of deferral queue, fourth is re-tried
latencies = [0, 0.05, .051, 0, 0, .11]
expected_success = 3
contexts_fed_in = []
captured_queues = []
captured_skips_stats = []
def ratelimit_if(value):
contexts_fed_in.append(value)
return False # returning False overrides normal ratelimiting
orig_rate_limited_iterator = utils.RateLimitedIterator
def fake_rate_limited_iterator(*args, **kwargs):
# insert our own rate limiting function
kwargs['ratelimit_if'] = ratelimit_if
return orig_rate_limited_iterator(*args, **kwargs)
now = [time()]
def fake_get_time(bucket_iter):
captured_skips_stats.append(
daemon.logger.get_increment_counts().get('skips', 0))
captured_queues.append(list(bucket_iter.buckets[0].deque))
# make each update delay before the iter being called again
now[0] += latencies.pop(0)
return now[0]
captured_updates = []
def fake_object_update(node, part, op, obj, *args, **kwargs):
captured_updates.append((node, part, op, obj))
return True, node['id'], False
with mock.patch(
'swift.obj.updater.BucketizedUpdateSkippingLimiter._get_time',
fake_get_time), \
mock.patch.object(daemon, 'object_update',
fake_object_update), \
mock.patch('swift.obj.updater.RateLimitedIterator',
fake_rate_limited_iterator):
daemon.run_once()
self.assertEqual(expected_success, daemon.stats.successes)
expected_skipped = expected_total - expected_success
self.assertEqual(expected_skipped, daemon.stats.skips)
self.assertEqual(expected_skipped,
len(self._find_async_pending_files()))
orig_iteration = contexts_fed_in[:num_c1_files]
# we first capture every async fed in one by one
objs_fed_in = [ctx['update']['obj'] for ctx in orig_iteration]
self.assertEqual(num_c1_files, len(set(objs_fed_in)))
# keep track of this order for context
aorder = {ctx['update']['obj']: 'a%02d' % i
for i, ctx in enumerate(orig_iteration)}
expected_drops = (1,)
expected_updates_sent = []
for i, obj in enumerate(objs_fed_in):
if i in expected_drops:
continue
# triple replica, request to 3 nodes each obj!
expected_updates_sent.extend([obj] * 3)
actual_updates_sent = [
utils.split_path(update[3], minsegs=3)[-1]
for update in captured_updates
]
self.assertEqual([aorder[o] for o in expected_updates_sent],
[aorder[o] for o in actual_updates_sent])
self.assertEqual([0, 0, 0, 0, 1], captured_skips_stats)
expected_deferrals = [
[],
[],
[objs_fed_in[1]],
[objs_fed_in[1]],
[objs_fed_in[3]],
]
self.assertEqual(
expected_deferrals,
[[ctx['update']['obj'] for ctx in q] for q in captured_queues])
info_lines = self.logger.get_lines_for_level('info')
self.assertTrue(info_lines)
self.assertIn('3 successes, 0 failures, 0 quarantines, 3 unlinks, '
'0 errors, 0 redirects, 1 skips, 2 deferrals, 1 drains',
info_lines[-1])
self.assertEqual(
{'skips': 1, 'successes': 3, 'unlinks': 3, 'deferrals': 2,
'drains': 1}, self.logger.get_increment_counts())
@mock.patch('swift.obj.updater.dump_recon_cache')
def test_per_container_rate_limit_defer_3_skip_1(self, mock_recon):
# limit length of deferral queue so that some defer and some skip
conf = {
'devices': self.devices_dir,
'mount_check': 'false',
'swift_dir': self.testdir,
'max_objects_per_container_per_second': 10,
# only one bucket needed for test
'per_container_ratelimit_buckets': 1,
'max_deferred_updates': 2,
}
daemon = object_updater.ObjectUpdater(conf, logger=self.logger)
self.async_dir = os.path.join(self.sda1, get_async_dir(POLICIES[0]))
os.mkdir(self.async_dir)
# all updates for same container
num_c1_files = 5
for i in range(num_c1_files):
obj_name = 'o%02d' % i
self._make_async_pending_pickle('a', 'c1', obj_name)
c1_part, _ = daemon.get_container_ring().get_nodes('a', 'c1')
expected_total = num_c1_files
self.assertEqual(expected_total,
len(self._find_async_pending_files()))
# indexes 0, 2 succeed; 1, 3, 4 deferred but 1 is bumped from deferral
# queue by 4; 4, 3 are then drained
latencies = [0, 0.05, .051, 0, 0, 0, .11, .01]
expected_success = 4
contexts_fed_in = []
captured_queues = []
captured_skips_stats = []
def ratelimit_if(value):
contexts_fed_in.append(value)
return False # returning False overrides normal ratelimiting
orig_rate_limited_iterator = utils.RateLimitedIterator
def fake_rate_limited_iterator(*args, **kwargs):
# insert our own rate limiting function
kwargs['ratelimit_if'] = ratelimit_if
return orig_rate_limited_iterator(*args, **kwargs)
now = [time()]
def fake_get_time(bucket_iter):
captured_skips_stats.append(
daemon.logger.get_increment_counts().get('skips', 0))
captured_queues.append(list(bucket_iter.buckets[0].deque))
# make each update delay before the iter being called again
now[0] += latencies.pop(0)
return now[0]
captured_updates = []
def fake_object_update(node, part, op, obj, *args, **kwargs):
captured_updates.append((node, part, op, obj))
return True, node['id'], False
with mock.patch(
'swift.obj.updater.BucketizedUpdateSkippingLimiter._get_time',
fake_get_time), \
mock.patch.object(daemon, 'object_update',
fake_object_update), \
mock.patch('swift.obj.updater.RateLimitedIterator',
fake_rate_limited_iterator), \
mock.patch('swift.obj.updater.time.sleep') as mock_sleep:
daemon.run_once()
self.assertEqual(expected_success, daemon.stats.successes)
expected_skipped = expected_total - expected_success
self.assertEqual(expected_skipped, daemon.stats.skips)
self.assertEqual(expected_skipped,
len(self._find_async_pending_files()))
orig_iteration = contexts_fed_in[:num_c1_files]
# we first capture every async fed in one by one
objs_fed_in = [ctx['update']['obj'] for ctx in orig_iteration]
self.assertEqual(num_c1_files, len(set(objs_fed_in)))
# keep track of this order for context
aorder = {ctx['update']['obj']: 'a%02d' % i
for i, ctx in enumerate(orig_iteration)}
expected_updates_sent = []
for index_sent in (0, 2, 4, 3):
expected_updates_sent.extend(
[contexts_fed_in[index_sent]['update']['obj']] * 3)
actual_updates_sent = [
utils.split_path(update[3], minsegs=3)[-1]
for update in captured_updates
]
self.assertEqual([aorder[o] for o in expected_updates_sent],
[aorder[o] for o in actual_updates_sent])
self.assertEqual([0, 0, 0, 0, 0, 1, 1, 1], captured_skips_stats)
expected_deferrals = [
[],
[],
[objs_fed_in[1]],
[objs_fed_in[1]],
[objs_fed_in[1], objs_fed_in[3]],
[objs_fed_in[3], objs_fed_in[4]],
[objs_fed_in[3]], # note: rightmost element is drained
[objs_fed_in[3]],
]
self.assertEqual(
expected_deferrals,
[[ctx['update']['obj'] for ctx in q] for q in captured_queues])
actual_sleeps = [call[0][0] for call in mock_sleep.call_args_list]
self.assertEqual(2, len(actual_sleeps))
self.assertAlmostEqual(0.1, actual_sleeps[0], 3)
self.assertAlmostEqual(0.09, actual_sleeps[1], 3)
info_lines = self.logger.get_lines_for_level('info')
self.assertTrue(info_lines)
self.assertIn('4 successes, 0 failures, 0 quarantines, 4 unlinks, '
'0 errors, 0 redirects, 1 skips, 3 deferrals, 2 drains',
info_lines[-1])
self.assertEqual(
{'skips': 1, 'successes': 4, 'unlinks': 4, 'deferrals': 3,
'drains': 2}, self.logger.get_increment_counts())
@mock.patch('swift.obj.updater.dump_recon_cache')
def test_per_container_rate_limit_unsent_deferrals(self, mock_recon):
# make some updates defer until interval is reached and cycle
# terminates
conf = {
'devices': self.devices_dir,
'mount_check': 'false',
'swift_dir': self.testdir,
'max_objects_per_container_per_second': 10,
# only one bucket needed for test
'per_container_ratelimit_buckets': 1,
'max_deferred_updates': 5,
'interval': 0.4,
}
daemon = object_updater.ObjectUpdater(conf, logger=self.logger)
self.async_dir = os.path.join(self.sda1, get_async_dir(POLICIES[0]))
os.mkdir(self.async_dir)
# all updates for same container
num_c1_files = 7
for i in range(num_c1_files):
obj_name = 'o%02d' % i
self._make_async_pending_pickle('a', 'c1', obj_name)
c1_part, _ = daemon.get_container_ring().get_nodes('a', 'c1')
expected_total = num_c1_files
self.assertEqual(expected_total,
len(self._find_async_pending_files()))
# first pass: 0, 2 and 5 succeed, 1, 3, 4, 6 deferred
# last 2 deferred items sent before interval elapses
latencies = [0, .05, 0.051, 0, 0, .11, 0, 0,
0.1, 0, 0.1, 0] # total 0.42
expected_success = 5
contexts_fed_in = []
captured_queues = []
captured_skips_stats = []
def ratelimit_if(value):
contexts_fed_in.append(value)
return False # returning False overrides normal ratelimiting
orig_rate_limited_iterator = utils.RateLimitedIterator
def fake_rate_limited_iterator(*args, **kwargs):
# insert our own rate limiting function
kwargs['ratelimit_if'] = ratelimit_if
return orig_rate_limited_iterator(*args, **kwargs)
start = time()
now = [start]
def fake_get_time(bucket_iter):
if not captured_skips_stats:
daemon.begin = now[0]
captured_skips_stats.append(
daemon.logger.get_increment_counts().get('skips', 0))
captured_queues.append(list(bucket_iter.buckets[0].deque))
# insert delay each time iter is called
now[0] += latencies.pop(0)
return now[0]
captured_updates = []
def fake_object_update(node, part, op, obj, *args, **kwargs):
captured_updates.append((node, part, op, obj))
return True, node['id'], False
with mock.patch(
'swift.obj.updater.BucketizedUpdateSkippingLimiter._get_time',
fake_get_time), \
mock.patch.object(daemon, 'object_update',
fake_object_update), \
mock.patch('swift.obj.updater.RateLimitedIterator',
fake_rate_limited_iterator), \
mock.patch('swift.obj.updater.time.sleep') as mock_sleep:
daemon.run_once()
self.assertEqual(expected_success, daemon.stats.successes)
expected_skipped = expected_total - expected_success
self.assertEqual(expected_skipped, daemon.stats.skips)
self.assertEqual(expected_skipped,
len(self._find_async_pending_files()))
expected_updates_sent = []
for index_sent in (0, 2, 5, 6, 4):
expected_updates_sent.extend(
[contexts_fed_in[index_sent]['update']['obj']] * 3)
actual_updates_sent = [
utils.split_path(update[3], minsegs=3)[-1]
for update in captured_updates
]
self.assertEqual(expected_updates_sent, actual_updates_sent)
# skips (un-drained deferrals) not reported until end of cycle
self.assertEqual([0] * 12, captured_skips_stats)
objs_fed_in = [ctx['update']['obj'] for ctx in contexts_fed_in]
expected_deferrals = [
# queue content before app_iter feeds next update_ctx
[],
[],
[objs_fed_in[1]],
[objs_fed_in[1]],
[objs_fed_in[1], objs_fed_in[3]],
[objs_fed_in[1], objs_fed_in[3], objs_fed_in[4]],
[objs_fed_in[1], objs_fed_in[3], objs_fed_in[4]],
# queue content before each update_ctx is drained from queue...
# note: rightmost element is drained
[objs_fed_in[1], objs_fed_in[3], objs_fed_in[4], objs_fed_in[6]],
[objs_fed_in[1], objs_fed_in[3], objs_fed_in[4]],
[objs_fed_in[1], objs_fed_in[3], objs_fed_in[4]],
[objs_fed_in[1], objs_fed_in[3]],
[objs_fed_in[1], objs_fed_in[3]],
]
self.assertEqual(
expected_deferrals,
[[ctx['update']['obj'] for ctx in q] for q in captured_queues])
actual_sleeps = [call[0][0] for call in mock_sleep.call_args_list]
self.assertEqual(2, len(actual_sleeps))
self.assertAlmostEqual(0.1, actual_sleeps[0], 3)
self.assertAlmostEqual(0.1, actual_sleeps[1], 3)
info_lines = self.logger.get_lines_for_level('info')
self.assertTrue(info_lines)
self.assertIn('5 successes, 0 failures, 0 quarantines, 5 unlinks, '
'0 errors, 0 redirects, 2 skips, 4 deferrals, 2 drains',
info_lines[-1])
self.assertEqual(
{'successes': 5, 'unlinks': 5, 'deferrals': 4, 'drains': 2},
self.logger.get_increment_counts())
self.assertEqual([('skips', 2)], self.logger.get_update_stats())
class TestObjectUpdaterFunctions(unittest.TestCase): class TestObjectUpdaterFunctions(unittest.TestCase):
@ -1393,20 +1818,28 @@ class TestObjectUpdaterFunctions(unittest.TestCase):
class TestBucketizedUpdateSkippingLimiter(unittest.TestCase): class TestBucketizedUpdateSkippingLimiter(unittest.TestCase):
def setUp(self):
self.logger = debug_logger()
self.stats = object_updater.SweepStats()
def test_init(self): def test_init(self):
it = object_updater.BucketizedUpdateSkippingLimiter([3, 1], 1000, 10) it = object_updater.BucketizedUpdateSkippingLimiter(
[3, 1], self.logger, self.stats, 1000, 10)
self.assertEqual(1000, it.num_buckets) self.assertEqual(1000, it.num_buckets)
self.assertEqual(0.1, it.bucket_update_delta) self.assertEqual(0.1, it.bucket_update_delta)
self.assertEqual([3, 1], [x for x in it.iterator]) self.assertEqual([3, 1], [x for x in it.iterator])
# rate of 0 implies unlimited # rate of 0 implies unlimited
it = object_updater.BucketizedUpdateSkippingLimiter(iter([3, 1]), 9, 0) it = object_updater.BucketizedUpdateSkippingLimiter(
iter([3, 1]), self.logger, self.stats, 9, 0)
self.assertEqual(9, it.num_buckets) self.assertEqual(9, it.num_buckets)
self.assertEqual(-1, it.bucket_update_delta) self.assertEqual(-1, it.bucket_update_delta)
self.assertEqual([3, 1], [x for x in it.iterator]) self.assertEqual([3, 1], [x for x in it.iterator])
# num_buckets is collared at 1 # num_buckets is collared at 1
it = object_updater.BucketizedUpdateSkippingLimiter(iter([3, 1]), 0, 1) it = object_updater.BucketizedUpdateSkippingLimiter(
iter([3, 1]), self.logger, self.stats, 0, 1)
self.assertEqual(1, it.num_buckets) self.assertEqual(1, it.num_buckets)
self.assertEqual(1, it.bucket_update_delta) self.assertEqual(1, it.bucket_update_delta)
self.assertEqual([3, 1], [x for x in it.iterator]) self.assertEqual([3, 1], [x for x in it.iterator])
@ -1417,8 +1850,11 @@ class TestBucketizedUpdateSkippingLimiter(unittest.TestCase):
{'update': {'account': '%d' % i, 'container': '%s' % i}} {'update': {'account': '%d' % i, 'container': '%s' % i}}
for i in range(20)] for i in range(20)]
it = object_updater.BucketizedUpdateSkippingLimiter( it = object_updater.BucketizedUpdateSkippingLimiter(
iter(update_ctxs), 9, 0) iter(update_ctxs), self.logger, self.stats, 9, 0)
self.assertEqual(update_ctxs, [x for x in it]) self.assertEqual(update_ctxs, [x for x in it])
self.assertEqual(0, self.stats.skips)
self.assertEqual(0, self.stats.drains)
self.assertEqual(0, self.stats.deferrals)
def test_iteration_ratelimited(self): def test_iteration_ratelimited(self):
# verify iteration at limited rate - single bucket # verify iteration at limited rate - single bucket
@ -1426,23 +1862,246 @@ class TestBucketizedUpdateSkippingLimiter(unittest.TestCase):
{'update': {'account': '%d' % i, 'container': '%s' % i}} {'update': {'account': '%d' % i, 'container': '%s' % i}}
for i in range(2)] for i in range(2)]
it = object_updater.BucketizedUpdateSkippingLimiter( it = object_updater.BucketizedUpdateSkippingLimiter(
iter(update_ctxs), 1, 0.1) iter(update_ctxs), self.logger, self.stats, 1, 0.1)
# second update is skipped
self.assertEqual(update_ctxs[:1], [x for x in it]) self.assertEqual(update_ctxs[:1], [x for x in it])
self.assertEqual(1, self.stats.skips)
self.assertEqual(0, self.stats.drains)
self.assertEqual(1, self.stats.deferrals)
def test_iteration_ratelimited_with_callback(self): def test_deferral_single_bucket(self):
# verify iteration at limited rate - single bucket # verify deferral - single bucket
skipped = [] now = time()
def on_skip(update_ctx):
skipped.append(update_ctx)
update_ctxs = [ update_ctxs = [
{'update': {'account': '%d' % i, 'container': '%s' % i}} {'update': {'account': '%d' % i, 'container': '%s' % i}}
for i in range(2)] for i in range(4)]
# enough capacity for all deferrals
with mock.patch('swift.obj.updater.time.time',
side_effect=[now, now, now, now, now, now]):
with mock.patch('swift.obj.updater.time.sleep') as mock_sleep:
it = object_updater.BucketizedUpdateSkippingLimiter( it = object_updater.BucketizedUpdateSkippingLimiter(
iter(update_ctxs), 1, 0.1, skip_f=on_skip) iter(update_ctxs[:3]), self.logger, self.stats, 1, 10,
self.assertEqual(update_ctxs[:1], [x for x in it]) max_deferred_elements=2,
self.assertEqual(update_ctxs[1:], skipped) drain_until=now + 10)
actual = [x for x in it]
self.assertEqual([update_ctxs[0],
update_ctxs[2], # deferrals...
update_ctxs[1]],
actual)
self.assertEqual(2, mock_sleep.call_count)
self.assertEqual(0, self.stats.skips)
self.assertEqual(2, self.stats.drains)
self.assertEqual(2, self.stats.deferrals)
self.stats.reset()
# only space for one deferral
with mock.patch('swift.obj.updater.time.time',
side_effect=[now, now, now, now, now]):
with mock.patch('swift.obj.updater.time.sleep') as mock_sleep:
it = object_updater.BucketizedUpdateSkippingLimiter(
iter(update_ctxs[:3]), self.logger, self.stats, 1, 10,
max_deferred_elements=1,
drain_until=now + 10)
actual = [x for x in it]
self.assertEqual([update_ctxs[0],
update_ctxs[2]], # deferrals...
actual)
self.assertEqual(1, mock_sleep.call_count)
self.assertEqual(1, self.stats.skips)
self.assertEqual(1, self.stats.drains)
self.assertEqual(2, self.stats.deferrals)
self.stats.reset()
# only time for one deferral
with mock.patch('swift.obj.updater.time.time',
side_effect=[now, now, now, now, now + 20, now + 20]):
with mock.patch('swift.obj.updater.time.sleep') as mock_sleep:
it = object_updater.BucketizedUpdateSkippingLimiter(
iter(update_ctxs[:3]), self.logger, self.stats, 1, 10,
max_deferred_elements=2,
drain_until=now + 10)
actual = [x for x in it]
self.assertEqual([update_ctxs[0],
update_ctxs[2]], # deferrals...
actual)
self.assertEqual(1, mock_sleep.call_count)
self.assertEqual(1, self.stats.skips)
self.assertEqual(1, self.stats.drains)
self.assertEqual(2, self.stats.deferrals)
self.stats.reset()
# only space for two deferrals, only time for one deferral
with mock.patch('swift.obj.updater.time.time',
side_effect=[now, now, now, now, now,
now + 20, now + 20]):
with mock.patch('swift.obj.updater.time.sleep') as mock_sleep:
it = object_updater.BucketizedUpdateSkippingLimiter(
iter(update_ctxs), self.logger, self.stats, 1, 10,
max_deferred_elements=2,
drain_until=now + 10)
actual = [x for x in it]
self.assertEqual([update_ctxs[0],
update_ctxs[3]], # deferrals...
actual)
self.assertEqual(1, mock_sleep.call_count)
self.assertEqual(2, self.stats.skips)
self.assertEqual(1, self.stats.drains)
self.assertEqual(3, self.stats.deferrals)
self.stats.reset()
def test_deferral_multiple_buckets(self):
# verify deferral - multiple buckets
update_ctxs_1 = [
{'update': {'account': 'a', 'container': 'c1', 'obj': '%3d' % i}}
for i in range(3)]
update_ctxs_2 = [
{'update': {'account': 'a', 'container': 'c2', 'obj': '%3d' % i}}
for i in range(3)]
time_iter = itertools.count(time(), 0.001)
# deferrals stick in both buckets
with mock.patch('swift.obj.updater.time.time',
side_effect=[next(time_iter) for _ in range(12)]):
with mock.patch('swift.obj.updater.time.sleep') as mock_sleep:
it = object_updater.BucketizedUpdateSkippingLimiter(
iter(update_ctxs_1 + update_ctxs_2),
self.logger, self.stats, 4, 10,
max_deferred_elements=4,
drain_until=next(time_iter))
it.salt = '' # make container->bucket hashing predictable
actual = [x for x in it]
self.assertEqual([update_ctxs_1[0],
update_ctxs_2[0],
update_ctxs_1[2], # deferrals...
update_ctxs_2[2],
update_ctxs_1[1],
update_ctxs_2[1],
],
actual)
self.assertEqual(4, mock_sleep.call_count)
self.assertEqual(0, self.stats.skips)
self.assertEqual(4, self.stats.drains)
self.assertEqual(4, self.stats.deferrals)
self.stats.reset()
# oldest deferral bumped from one bucket due to max_deferrals == 3
with mock.patch('swift.obj.updater.time.time',
side_effect=[next(time_iter) for _ in range(10)]):
with mock.patch('swift.obj.updater.time.sleep') as mock_sleep:
it = object_updater.BucketizedUpdateSkippingLimiter(
iter(update_ctxs_1 + update_ctxs_2),
self.logger, self.stats, 4, 10,
max_deferred_elements=3,
drain_until=next(time_iter))
it.salt = '' # make container->bucket hashing predictable
actual = [x for x in it]
self.assertEqual([update_ctxs_1[0],
update_ctxs_2[0],
update_ctxs_1[2], # deferrals...
update_ctxs_2[2],
update_ctxs_2[1],
],
actual)
self.assertEqual(3, mock_sleep.call_count)
self.assertEqual(1, self.stats.skips)
self.assertEqual(3, self.stats.drains)
self.assertEqual(4, self.stats.deferrals)
self.stats.reset()
# older deferrals bumped from one bucket due to max_deferrals == 2
with mock.patch('swift.obj.updater.time.time',
side_effect=[next(time_iter) for _ in range(10)]):
with mock.patch('swift.obj.updater.time.sleep') as mock_sleep:
it = object_updater.BucketizedUpdateSkippingLimiter(
iter(update_ctxs_1 + update_ctxs_2),
self.logger, self.stats, 4, 10,
max_deferred_elements=2,
drain_until=next(time_iter))
it.salt = '' # make container->bucket hashing predictable
actual = [x for x in it]
self.assertEqual([update_ctxs_1[0],
update_ctxs_2[0],
update_ctxs_2[2], # deferrals...
update_ctxs_2[1],
],
actual)
self.assertEqual(2, mock_sleep.call_count)
self.assertEqual(2, self.stats.skips)
self.assertEqual(2, self.stats.drains)
self.assertEqual(4, self.stats.deferrals)
self.stats.reset()
class TestRateLimiterBucket(unittest.TestCase):
def test_wait_until(self):
b1 = object_updater.RateLimiterBucket(10)
self.assertEqual(10, b1.wait_until)
b1.last_time = b1.wait_until
self.assertEqual(20, b1.wait_until)
b1.last_time = 12345.678
self.assertEqual(12355.678, b1.wait_until)
def test_len(self):
b1 = object_updater.RateLimiterBucket(10)
b1.deque.append(1)
b1.deque.append(2)
self.assertEqual(2, len(b1))
b1.deque.pop()
self.assertEqual(1, len(b1))
def test_bool(self):
b1 = object_updater.RateLimiterBucket(10)
self.assertFalse(b1)
b1.deque.append(1)
self.assertTrue(b1)
b1.deque.pop()
self.assertFalse(b1)
def test_bucket_ordering(self):
time_iter = itertools.count(time(), step=0.001)
b1 = object_updater.RateLimiterBucket(10)
b2 = object_updater.RateLimiterBucket(10)
b2.last_time = next(time_iter)
buckets = PriorityQueue()
buckets.put(b1)
buckets.put(b2)
self.assertEqual([b1, b2], [buckets.get_nowait() for _ in range(2)])
b1.last_time = next(time_iter)
buckets.put(b1)
buckets.put(b2)
self.assertEqual([b2, b1], [buckets.get_nowait() for _ in range(2)])
class TestSweepStats(unittest.TestCase):
def test_copy(self):
num_props = len(vars(object_updater.SweepStats()))
stats = object_updater.SweepStats(*range(1, num_props + 1))
stats2 = stats.copy()
self.assertEqual(vars(stats), vars(stats2))
def test_since(self):
stats = object_updater.SweepStats(1, 2, 3, 4, 5, 6, 7, 8, 9)
stats2 = object_updater.SweepStats(4, 6, 8, 10, 12, 14, 16, 18, 20)
expected = object_updater.SweepStats(3, 4, 5, 6, 7, 8, 9, 10, 11)
self.assertEqual(vars(expected), vars(stats2.since(stats)))
def test_reset(self):
num_props = len(vars(object_updater.SweepStats()))
stats = object_updater.SweepStats(*range(1, num_props + 1))
stats.reset()
expected = object_updater.SweepStats()
self.assertEqual(vars(expected), vars(stats))
def test_str(self):
num_props = len(vars(object_updater.SweepStats()))
stats = object_updater.SweepStats(*range(1, num_props + 1))
self.assertEqual(
'4 successes, 2 failures, 3 quarantines, 5 unlinks, 1 errors, '
'6 redirects, 7 skips, 8 deferrals, 9 drains', str(stats))
if __name__ == '__main__': if __name__ == '__main__':