container-reconciler: support multiple processes
This follows the same pattern of configuration used in the object-expirer. When the container-recociler has a configuration value for processes it expects that many instances of the reconciler to be configured with a process value from [0, processes). Change-Id: Ie46bda37ca3f6e692ec31a4ddcd46f343fb1aeca
This commit is contained in:
parent
f29ea9d04a
commit
eb969fdeea
@ -60,6 +60,16 @@
|
|||||||
# Number of objects to process concurrently per process
|
# Number of objects to process concurrently per process
|
||||||
# concurrency = 1
|
# concurrency = 1
|
||||||
|
|
||||||
|
# processes is how many parts to divide the work into, one part per process
|
||||||
|
# that will be doing the work
|
||||||
|
# processes set 0 means that a single process will be doing all the work
|
||||||
|
# processes = 0
|
||||||
|
#
|
||||||
|
# process is which of the parts a particular process will work on
|
||||||
|
# process is "zero based", if you want to use 3 processes, you should run
|
||||||
|
# processes with process set to 0, 1, and 2
|
||||||
|
# process = 0
|
||||||
|
|
||||||
[pipeline:main]
|
[pipeline:main]
|
||||||
# Note that the reconciler's pipeline is intentionally very sparse -- it is
|
# Note that the reconciler's pipeline is intentionally very sparse -- it is
|
||||||
# only responsible for moving data from one policy to another and should not
|
# only responsible for moving data from one policy to another and should not
|
||||||
|
@ -31,7 +31,7 @@ from swift.common.request_helpers import MISPLACED_OBJECTS_ACCOUNT, \
|
|||||||
USE_REPLICATION_NETWORK_HEADER
|
USE_REPLICATION_NETWORK_HEADER
|
||||||
from swift.common.utils import get_logger, split_path, majority_size, \
|
from swift.common.utils import get_logger, split_path, majority_size, \
|
||||||
FileLikeIter, Timestamp, last_modified_date_to_timestamp, \
|
FileLikeIter, Timestamp, last_modified_date_to_timestamp, \
|
||||||
LRUCache, decode_timestamps
|
LRUCache, decode_timestamps, hash_path
|
||||||
from swift.common.storage_policy import POLICIES
|
from swift.common.storage_policy import POLICIES
|
||||||
|
|
||||||
MISPLACED_OBJECTS_CONTAINER_DIVISOR = 3600 # 1 hour
|
MISPLACED_OBJECTS_CONTAINER_DIVISOR = 3600 # 1 hour
|
||||||
@ -381,6 +381,17 @@ class ContainerReconciler(Daemon):
|
|||||||
self.concurrency = int(conf.get('concurrency', 1))
|
self.concurrency = int(conf.get('concurrency', 1))
|
||||||
if self.concurrency < 1:
|
if self.concurrency < 1:
|
||||||
raise ValueError("concurrency must be set to at least 1")
|
raise ValueError("concurrency must be set to at least 1")
|
||||||
|
self.processes = int(self.conf.get('processes', 0))
|
||||||
|
if self.processes < 0:
|
||||||
|
raise ValueError(
|
||||||
|
'processes must be an integer greater than or equal to 0')
|
||||||
|
self.process = int(self.conf.get('process', 0))
|
||||||
|
if self.process < 0:
|
||||||
|
raise ValueError(
|
||||||
|
'process must be an integer greater than or equal to 0')
|
||||||
|
if self.processes and self.process >= self.processes:
|
||||||
|
raise ValueError(
|
||||||
|
'process must be less than processes')
|
||||||
|
|
||||||
def stats_log(self, metric, msg, *args, **kwargs):
|
def stats_log(self, metric, msg, *args, **kwargs):
|
||||||
"""
|
"""
|
||||||
@ -773,6 +784,19 @@ class ContainerReconciler(Daemon):
|
|||||||
MISPLACED_OBJECTS_ACCOUNT, container,
|
MISPLACED_OBJECTS_ACCOUNT, container,
|
||||||
acceptable_statuses=(2, 404, 409, 412))
|
acceptable_statuses=(2, 404, 409, 412))
|
||||||
|
|
||||||
|
def should_process(self, queue_item):
|
||||||
|
"""
|
||||||
|
Check if a given entry should be handled by this process.
|
||||||
|
|
||||||
|
:param container: the queue container
|
||||||
|
:param queue_item: an entry from the queue
|
||||||
|
"""
|
||||||
|
if not self.processes:
|
||||||
|
return True
|
||||||
|
hexdigest = hash_path(
|
||||||
|
queue_item['account'], queue_item['container'], queue_item['obj'])
|
||||||
|
return int(hexdigest, 16) % self.processes == self.process
|
||||||
|
|
||||||
def process_queue_item(self, q_container, q_entry, queue_item):
|
def process_queue_item(self, q_container, q_entry, queue_item):
|
||||||
"""
|
"""
|
||||||
Process an entry and remove from queue on success.
|
Process an entry and remove from queue on success.
|
||||||
@ -806,8 +830,9 @@ class ContainerReconciler(Daemon):
|
|||||||
'invalid queue record: %r', raw_obj,
|
'invalid queue record: %r', raw_obj,
|
||||||
level=logging.ERROR, exc_info=True)
|
level=logging.ERROR, exc_info=True)
|
||||||
continue
|
continue
|
||||||
pool.spawn_n(self.process_queue_item,
|
if self.should_process(queue_item):
|
||||||
container, raw_obj['name'], queue_item)
|
pool.spawn_n(self.process_queue_item,
|
||||||
|
container, raw_obj['name'], queue_item)
|
||||||
self.log_stats()
|
self.log_stats()
|
||||||
pool.waitall()
|
pool.waitall()
|
||||||
|
|
||||||
|
@ -765,6 +765,34 @@ class TestReconciler(unittest.TestCase):
|
|||||||
self.assertRaises(ValueError, reconciler.ContainerReconciler,
|
self.assertRaises(ValueError, reconciler.ContainerReconciler,
|
||||||
conf, self.logger, self.swift)
|
conf, self.logger, self.swift)
|
||||||
|
|
||||||
|
def test_processes_config(self):
|
||||||
|
conf = {}
|
||||||
|
r = reconciler.ContainerReconciler(conf, self.logger, self.swift)
|
||||||
|
self.assertEqual(r.process, 0)
|
||||||
|
self.assertEqual(r.processes, 0)
|
||||||
|
|
||||||
|
conf = {'processes': '1'}
|
||||||
|
r = reconciler.ContainerReconciler(conf, self.logger, self.swift)
|
||||||
|
self.assertEqual(r.process, 0)
|
||||||
|
self.assertEqual(r.processes, 1)
|
||||||
|
|
||||||
|
conf = {'processes': 10, 'process': '9'}
|
||||||
|
r = reconciler.ContainerReconciler(conf, self.logger, self.swift)
|
||||||
|
self.assertEqual(r.process, 9)
|
||||||
|
self.assertEqual(r.processes, 10)
|
||||||
|
|
||||||
|
conf = {'processes': -1}
|
||||||
|
self.assertRaises(ValueError, reconciler.ContainerReconciler,
|
||||||
|
conf, self.logger, self.swift)
|
||||||
|
|
||||||
|
conf = {'process': -1}
|
||||||
|
self.assertRaises(ValueError, reconciler.ContainerReconciler,
|
||||||
|
conf, self.logger, self.swift)
|
||||||
|
|
||||||
|
conf = {'processes': 9, 'process': 9}
|
||||||
|
self.assertRaises(ValueError, reconciler.ContainerReconciler,
|
||||||
|
conf, self.logger, self.swift)
|
||||||
|
|
||||||
def _mock_listing(self, objects):
|
def _mock_listing(self, objects):
|
||||||
self.swift.parse(objects)
|
self.swift.parse(objects)
|
||||||
self.fake_swift = self.reconciler.swift.app
|
self.fake_swift = self.reconciler.swift.app
|
||||||
@ -853,6 +881,57 @@ class TestReconciler(unittest.TestCase):
|
|||||||
('.misplaced_objects', '3600', '1:/AUTH_bob/c/o1'),
|
('.misplaced_objects', '3600', '1:/AUTH_bob/c/o1'),
|
||||||
])
|
])
|
||||||
|
|
||||||
|
def test_multi_process_should_process(self):
|
||||||
|
def mkqi(a, c, o):
|
||||||
|
"make queue item"
|
||||||
|
return {
|
||||||
|
'account': a,
|
||||||
|
'container': c,
|
||||||
|
'obj': o,
|
||||||
|
}
|
||||||
|
queue = [
|
||||||
|
mkqi('a', 'c', 'o1'),
|
||||||
|
mkqi('a', 'c', 'o2'),
|
||||||
|
mkqi('a', 'c', 'o3'),
|
||||||
|
mkqi('a', 'c', 'o4'),
|
||||||
|
]
|
||||||
|
|
||||||
|
def map_should_process(process, processes):
|
||||||
|
self.reconciler.process = process
|
||||||
|
self.reconciler.processes = processes
|
||||||
|
with mock.patch('swift.common.utils.HASH_PATH_SUFFIX',
|
||||||
|
b'endcap'), \
|
||||||
|
mock.patch('swift.common.utils.HASH_PATH_PREFIX', b''):
|
||||||
|
return [self.reconciler.should_process(q_item)
|
||||||
|
for q_item in queue]
|
||||||
|
|
||||||
|
def check_process(process, processes, expected):
|
||||||
|
should_process = map_should_process(process, processes)
|
||||||
|
try:
|
||||||
|
self.assertEqual(should_process, expected)
|
||||||
|
except AssertionError as e:
|
||||||
|
self.fail('unexpected items processed for %s/%s\n%s' % (
|
||||||
|
process, processes, e))
|
||||||
|
|
||||||
|
check_process(0, 0, [True] * 4)
|
||||||
|
check_process(0, 1, [True] * 4)
|
||||||
|
check_process(0, 2, [False, True, False, False])
|
||||||
|
check_process(1, 2, [True, False, True, True])
|
||||||
|
|
||||||
|
check_process(0, 4, [False, True, False, False])
|
||||||
|
check_process(1, 4, [True, False, False, False])
|
||||||
|
check_process(2, 4, [False] * 4) # lazy
|
||||||
|
check_process(3, 4, [False, False, True, True])
|
||||||
|
|
||||||
|
queue = [mkqi('a%s' % i, 'c%s' % i, 'o%s' % i) for i in range(1000)]
|
||||||
|
items_handled = [0] * 1000
|
||||||
|
for process in range(100):
|
||||||
|
should_process = map_should_process(process, 100)
|
||||||
|
for i, handled in enumerate(should_process):
|
||||||
|
if handled:
|
||||||
|
items_handled[i] += 1
|
||||||
|
self.assertEqual([1] * 1000, items_handled)
|
||||||
|
|
||||||
def test_invalid_queue_name(self):
|
def test_invalid_queue_name(self):
|
||||||
self._mock_listing({
|
self._mock_listing({
|
||||||
(None, "/.misplaced_objects/3600/bogus"): 3618.84187,
|
(None, "/.misplaced_objects/3600/bogus"): 3618.84187,
|
||||||
|
Loading…
x
Reference in New Issue
Block a user