Merge "Add concurrency to reconciler"
This commit is contained in:
commit
f739cadf97
@ -57,6 +57,8 @@
|
||||
# Work only with ionice_class.
|
||||
# ionice_class =
|
||||
# ionice_priority =
|
||||
# Number of objects to process concurrently per process
|
||||
# concurrency = 1
|
||||
|
||||
[pipeline:main]
|
||||
# Note that the reconciler's pipeline is intentionally very sparse -- it is
|
||||
|
@ -357,7 +357,7 @@ class ContainerReconciler(Daemon):
|
||||
Move objects that are in the wrong storage policy.
|
||||
"""
|
||||
|
||||
def __init__(self, conf):
|
||||
def __init__(self, conf, logger=None, swift=None):
|
||||
self.conf = conf
|
||||
# This option defines how long an un-processable misplaced object
|
||||
# marker will be retried before it is abandoned. It is not coupled
|
||||
@ -366,9 +366,10 @@ class ContainerReconciler(Daemon):
|
||||
self.interval = float(conf.get('interval', 30))
|
||||
conf_path = conf.get('__file__') or \
|
||||
'/etc/swift/container-reconciler.conf'
|
||||
self.logger = get_logger(conf, log_route='container-reconciler')
|
||||
self.logger = logger or get_logger(
|
||||
conf, log_route='container-reconciler')
|
||||
request_tries = int(conf.get('request_tries') or 3)
|
||||
self.swift = InternalClient(
|
||||
self.swift = swift or InternalClient(
|
||||
conf_path,
|
||||
'Swift Container Reconciler',
|
||||
request_tries,
|
||||
@ -377,6 +378,9 @@ class ContainerReconciler(Daemon):
|
||||
self.stats = defaultdict(int)
|
||||
self.last_stat_time = time.time()
|
||||
self.ring_check_interval = float(conf.get('ring_check_interval', 15))
|
||||
self.concurrency = int(conf.get('concurrency', 1))
|
||||
if self.concurrency < 1:
|
||||
raise ValueError("concurrency must be set to at least 1")
|
||||
|
||||
def stats_log(self, metric, msg, *args, **kwargs):
|
||||
"""
|
||||
@ -716,9 +720,9 @@ class ContainerReconciler(Daemon):
|
||||
# hit most recent container first instead of waiting on the updaters
|
||||
current_container = get_reconciler_container_name(time.time())
|
||||
yield current_container
|
||||
container_gen = self.swift.iter_containers(MISPLACED_OBJECTS_ACCOUNT)
|
||||
self.logger.debug('looking for containers in %s',
|
||||
MISPLACED_OBJECTS_ACCOUNT)
|
||||
container_gen = self.swift.iter_containers(MISPLACED_OBJECTS_ACCOUNT)
|
||||
while True:
|
||||
one_page = None
|
||||
try:
|
||||
@ -769,29 +773,41 @@ class ContainerReconciler(Daemon):
|
||||
MISPLACED_OBJECTS_ACCOUNT, container,
|
||||
acceptable_statuses=(2, 404, 409, 412))
|
||||
|
||||
def process_queue_entry(self, container, raw_obj):
|
||||
"""
|
||||
Process an entry and remove from queue on success.
|
||||
|
||||
:param container: the queue container
|
||||
:param raw_obj: the raw_obj listing from the container
|
||||
"""
|
||||
try:
|
||||
obj_info = parse_raw_obj(raw_obj)
|
||||
except Exception:
|
||||
self.stats_log('invalid_record',
|
||||
'invalid queue record: %r', raw_obj,
|
||||
level=logging.ERROR, exc_info=True)
|
||||
return
|
||||
finished = self.reconcile_object(obj_info)
|
||||
if finished:
|
||||
self.pop_queue(container, raw_obj['name'],
|
||||
obj_info['q_ts'],
|
||||
obj_info['q_record'])
|
||||
|
||||
def reconcile(self):
|
||||
"""
|
||||
Main entry point for processing misplaced objects.
|
||||
Main entry point for concurrent processing of misplaced objects.
|
||||
|
||||
Iterate over all queue entries and delegate to reconcile_object.
|
||||
Iterate over all queue entries and delegate processing to spawned
|
||||
workers in the pool.
|
||||
"""
|
||||
self.logger.debug('pulling items from the queue')
|
||||
pool = GreenPool(self.concurrency)
|
||||
for container in self._iter_containers():
|
||||
self.logger.debug('checking container %s', container)
|
||||
for raw_obj in self._iter_objects(container):
|
||||
try:
|
||||
obj_info = parse_raw_obj(raw_obj)
|
||||
except Exception:
|
||||
self.stats_log('invalid_record',
|
||||
'invalid queue record: %r', raw_obj,
|
||||
level=logging.ERROR, exc_info=True)
|
||||
continue
|
||||
finished = self.reconcile_object(obj_info)
|
||||
if finished:
|
||||
self.pop_queue(container, raw_obj['name'],
|
||||
obj_info['q_ts'],
|
||||
obj_info['q_record'])
|
||||
pool.spawn_n(self.process_queue_entry, container, raw_obj)
|
||||
self.log_stats()
|
||||
self.logger.debug('finished container %s', container)
|
||||
pool.waitall()
|
||||
|
||||
def run_once(self, *args, **kwargs):
|
||||
"""
|
||||
|
@ -23,6 +23,7 @@ import os
|
||||
import errno
|
||||
import itertools
|
||||
import random
|
||||
import eventlet
|
||||
|
||||
from collections import defaultdict
|
||||
from datetime import datetime
|
||||
@ -95,7 +96,7 @@ class FakeStoragePolicySwift(object):
|
||||
|
||||
|
||||
class FakeInternalClient(reconciler.InternalClient):
|
||||
def __init__(self, listings):
|
||||
def __init__(self, listings=None):
|
||||
self.app = FakeStoragePolicySwift()
|
||||
self.user_agent = 'fake-internal-client'
|
||||
self.request_tries = 1
|
||||
@ -103,6 +104,7 @@ class FakeInternalClient(reconciler.InternalClient):
|
||||
self.parse(listings)
|
||||
|
||||
def parse(self, listings):
|
||||
listings = listings or {}
|
||||
self.accounts = defaultdict(lambda: defaultdict(list))
|
||||
for item, timestamp in listings.items():
|
||||
# XXX this interface is stupid
|
||||
@ -735,15 +737,36 @@ class TestReconciler(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.logger = debug_logger()
|
||||
conf = {}
|
||||
with mock.patch('swift.container.reconciler.InternalClient'):
|
||||
self.reconciler = reconciler.ContainerReconciler(conf)
|
||||
self.reconciler.logger = self.logger
|
||||
self.swift = FakeInternalClient()
|
||||
self.reconciler = reconciler.ContainerReconciler(
|
||||
conf, logger=self.logger, swift=self.swift)
|
||||
self.start_interval = int(time.time() // 3600 * 3600)
|
||||
self.current_container_path = '/v1/.misplaced_objects/%d' % (
|
||||
self.start_interval) + listing_qs('')
|
||||
|
||||
def test_concurrency_config(self):
|
||||
conf = {}
|
||||
r = reconciler.ContainerReconciler(conf, self.logger, self.swift)
|
||||
self.assertEqual(r.concurrency, 1)
|
||||
|
||||
conf = {'concurrency': '10'}
|
||||
r = reconciler.ContainerReconciler(conf, self.logger, self.swift)
|
||||
self.assertEqual(r.concurrency, 10)
|
||||
|
||||
conf = {'concurrency': 48}
|
||||
r = reconciler.ContainerReconciler(conf, self.logger, self.swift)
|
||||
self.assertEqual(r.concurrency, 48)
|
||||
|
||||
conf = {'concurrency': 0}
|
||||
self.assertRaises(ValueError, reconciler.ContainerReconciler,
|
||||
conf, self.logger, self.swift)
|
||||
|
||||
conf = {'concurrency': '-1'}
|
||||
self.assertRaises(ValueError, reconciler.ContainerReconciler,
|
||||
conf, self.logger, self.swift)
|
||||
|
||||
def _mock_listing(self, objects):
|
||||
self.reconciler.swift = FakeInternalClient(objects)
|
||||
self.swift.parse(objects)
|
||||
self.fake_swift = self.reconciler.swift.app
|
||||
|
||||
def _mock_oldest_spi(self, container_oldest_spi_map):
|
||||
@ -776,6 +799,59 @@ class TestReconciler(unittest.TestCase):
|
||||
return [c[1][1:4] for c in
|
||||
mocks['direct_delete_container_entry'].mock_calls]
|
||||
|
||||
def test_no_concurrency(self):
|
||||
self._mock_listing({
|
||||
(None, "/.misplaced_objects/3600/1:/AUTH_bob/c/o1"): 3618.84187,
|
||||
(None, "/.misplaced_objects/3600/1:/AUTH_bob/c/o2"): 3724.23456,
|
||||
(1, "/AUTH_bob/c/o1"): 3618.84187,
|
||||
(1, "/AUTH_bob/c/o2"): 3724.23456,
|
||||
})
|
||||
|
||||
order_recieved = []
|
||||
|
||||
def fake_reconcile_object(account, container, obj, q_policy_index,
|
||||
q_ts, q_op, path, **kwargs):
|
||||
order_recieved.append(obj)
|
||||
return True
|
||||
|
||||
self.reconciler._reconcile_object = fake_reconcile_object
|
||||
self.assertEqual(self.reconciler.concurrency, 1) # sanity
|
||||
deleted_container_entries = self._run_once()
|
||||
self.assertEqual(order_recieved, ['o1', 'o2'])
|
||||
# process in order recieved
|
||||
self.assertEqual(deleted_container_entries, [
|
||||
('.misplaced_objects', '3600', '1:/AUTH_bob/c/o1'),
|
||||
('.misplaced_objects', '3600', '1:/AUTH_bob/c/o2'),
|
||||
])
|
||||
|
||||
def test_concurrency(self):
|
||||
self._mock_listing({
|
||||
(None, "/.misplaced_objects/3600/1:/AUTH_bob/c/o1"): 3618.84187,
|
||||
(None, "/.misplaced_objects/3600/1:/AUTH_bob/c/o2"): 3724.23456,
|
||||
(1, "/AUTH_bob/c/o1"): 3618.84187,
|
||||
(1, "/AUTH_bob/c/o2"): 3724.23456,
|
||||
})
|
||||
|
||||
order_recieved = []
|
||||
|
||||
def fake_reconcile_object(account, container, obj, q_policy_index,
|
||||
q_ts, q_op, path, **kwargs):
|
||||
order_recieved.append(obj)
|
||||
# o1 takes longer than o2 for some reason
|
||||
while 'o2' not in order_recieved:
|
||||
eventlet.sleep(0.001)
|
||||
return True
|
||||
|
||||
self.reconciler._reconcile_object = fake_reconcile_object
|
||||
self.reconciler.concurrency = 2
|
||||
deleted_container_entries = self._run_once()
|
||||
self.assertEqual(order_recieved, ['o1', 'o2'])
|
||||
# ... and so we finish o2 first
|
||||
self.assertEqual(deleted_container_entries, [
|
||||
('.misplaced_objects', '3600', '1:/AUTH_bob/c/o2'),
|
||||
('.misplaced_objects', '3600', '1:/AUTH_bob/c/o1'),
|
||||
])
|
||||
|
||||
def test_invalid_queue_name(self):
|
||||
self._mock_listing({
|
||||
(None, "/.misplaced_objects/3600/bogus"): 3618.84187,
|
||||
|
Loading…
Reference in New Issue
Block a user