diff --git a/etc/object-server.conf-sample b/etc/object-server.conf-sample index 6cbc79a098..2d060fbd9c 100644 --- a/etc/object-server.conf-sample +++ b/etc/object-server.conf-sample @@ -352,6 +352,15 @@ use = egg:swift#recon # honored as a synonym, but may be ignored in a future release. # handoffs_only = False # +# The default strategy for unmounted drives will stage rebuilt data on a +# handoff node until updated rings are deployed. Because fragments are rebuilt +# on offset handoffs based on fragment index and the proxy limits how deep it +# will search for EC frags we restrict how many nodes we'll try. Setting to 0 +# will disable rebuilds to handoffs and only rebuild fragments for unmounted +# devices to mounted primaries after a ring change. +# Setting to -1 means "no limit". +# rebuild_handoff_node_count = 2 +# # You can set scheduling priority of processes. Niceness values range from -20 # (most favorable to the process) to 19 (least favorable to the process). # nice_priority = diff --git a/swift/common/ring/ring.py b/swift/common/ring/ring.py index a28d97d2fa..ed888e28da 100644 --- a/swift/common/ring/ring.py +++ b/swift/common/ring/ring.py @@ -24,7 +24,7 @@ from time import time import os from io import BufferedReader from hashlib import md5 -from itertools import chain +from itertools import chain, count from tempfile import NamedTemporaryFile import sys @@ -237,35 +237,36 @@ class Ring(object): self._replica2part2dev_id = ring_data._replica2part2dev_id self._part_shift = ring_data._part_shift self._rebuild_tier_data() - - # Do this now, when we know the data has changed, rather than - # doing it on every call to get_more_nodes(). - # - # Since this is to speed up the finding of handoffs, we only - # consider devices with at least one partition assigned. This - # way, a region, zone, or server with no partitions assigned - # does not count toward our totals, thereby keeping the early - # bailouts in get_more_nodes() working. - dev_ids_with_parts = set() - for part2dev_id in self._replica2part2dev_id: - for dev_id in part2dev_id: - dev_ids_with_parts.add(dev_id) - - regions = set() - zones = set() - ips = set() - self._num_devs = 0 - for dev in self._devs: - if dev and dev['id'] in dev_ids_with_parts: - regions.add(dev['region']) - zones.add((dev['region'], dev['zone'])) - ips.add((dev['region'], dev['zone'], dev['ip'])) - self._num_devs += 1 - self._num_regions = len(regions) - self._num_zones = len(zones) - self._num_ips = len(ips) + self._update_bookkeeping() self._next_part_power = ring_data.next_part_power + def _update_bookkeeping(self): + # Do this now, when we know the data has changed, rather than + # doing it on every call to get_more_nodes(). + # + # Since this is to speed up the finding of handoffs, we only + # consider devices with at least one partition assigned. This + # way, a region, zone, or server with no partitions assigned + # does not count toward our totals, thereby keeping the early + # bailouts in get_more_nodes() working. 
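        # A rough sketch of that bailout (the real checks live in
        # get_more_nodes() below): once len(same_regions) == self._num_regions
        # there is no new region left to offer, so the region pass can stop
        # early; the zone, ip and device passes bail out the same way against
        # self._num_zones, self._num_ips and self._num_devs.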
+ dev_ids_with_parts = set() + for part2dev_id in self._replica2part2dev_id: + for dev_id in part2dev_id: + dev_ids_with_parts.add(dev_id) + regions = set() + zones = set() + ips = set() + self._num_devs = 0 + for dev in self._devs: + if dev and dev['id'] in dev_ids_with_parts: + regions.add(dev['region']) + zones.add((dev['region'], dev['zone'])) + ips.add((dev['region'], dev['zone'], dev['ip'])) + self._num_devs += 1 + self._num_regions = len(regions) + self._num_zones = len(zones) + self._num_ips = len(ips) + @property def next_part_power(self): return self._next_part_power @@ -407,8 +408,8 @@ class Ring(object): if time() > self._rtime: self._reload() primary_nodes = self._get_part_nodes(part) - used = set(d['id'] for d in primary_nodes) + index = count() same_regions = set(d['region'] for d in primary_nodes) same_zones = set((d['region'], d['zone']) for d in primary_nodes) same_ips = set( @@ -434,7 +435,7 @@ class Ring(object): dev = self._devs[dev_id] region = dev['region'] if dev_id not in used and region not in same_regions: - yield dev + yield dict(dev, handoff_index=next(index)) used.add(dev_id) same_regions.add(region) zone = dev['zone'] @@ -459,7 +460,7 @@ class Ring(object): dev = self._devs[dev_id] zone = (dev['region'], dev['zone']) if dev_id not in used and zone not in same_zones: - yield dev + yield dict(dev, handoff_index=next(index)) used.add(dev_id) same_zones.add(zone) ip = zone + (dev['ip'],) @@ -482,7 +483,7 @@ class Ring(object): dev = self._devs[dev_id] ip = (dev['region'], dev['zone'], dev['ip']) if dev_id not in used and ip not in same_ips: - yield dev + yield dict(dev, handoff_index=next(index)) used.add(dev_id) same_ips.add(ip) if len(same_ips) == self._num_ips: @@ -501,7 +502,8 @@ class Ring(object): if handoff_part < len(part2dev_id): dev_id = part2dev_id[handoff_part] if dev_id not in used: - yield self._devs[dev_id] + dev = self._devs[dev_id] + yield dict(dev, handoff_index=next(index)) used.add(dev_id) if len(used) == self._num_devs: hit_all_devs = True diff --git a/swift/obj/reconstructor.py b/swift/obj/reconstructor.py index 64497552e3..71ae06e5fc 100644 --- a/swift/obj/reconstructor.py +++ b/swift/obj/reconstructor.py @@ -19,7 +19,6 @@ import os from os.path import join import random import time -import itertools from collections import defaultdict import six import six.moves.cPickle as pickle @@ -51,18 +50,22 @@ from swift.common.exceptions import ConnectionTimeout, DiskFileError, \ SYNC, REVERT = ('sync_only', 'sync_revert') -def _get_partners(frag_index, part_nodes): +def _get_partners(node_index, part_nodes): """ - Returns the left and right partners of the node whose index is - equal to the given frag_index. + Returns the left, right and far partners of the node whose index is equal + to the given node_index. 
- :param frag_index: a fragment index + :param node_index: the primary index :param part_nodes: a list of primary nodes - :returns: [<node-to-left>, <node-to-right>] + :returns: [<node-to-left>, <node-to-right>, <node-opposite>] """ + num_nodes = len(part_nodes) return [ - part_nodes[(frag_index - 1) % len(part_nodes)], - part_nodes[(frag_index + 1) % len(part_nodes)], + part_nodes[(node_index - 1) % num_nodes], + part_nodes[(node_index + 1) % num_nodes], + part_nodes[( + node_index + (num_nodes // 2) + ) % num_nodes], ] @@ -203,6 +206,8 @@ class ObjectReconstructor(Daemon): elif default_handoffs_only: self.logger.warning('Ignored handoffs_first option in favor ' 'of handoffs_only.') + self.rebuild_handoff_node_count = int(conf.get( + 'rebuild_handoff_node_count', 2)) self._df_router = DiskFileRouter(conf, self.logger) self.all_local_devices = self.get_local_devices() @@ -667,6 +672,33 @@ class ObjectReconstructor(Daemon): _("Trying to sync suffixes with %s") % _full_path( node, job['partition'], '', job['policy'])) + def _iter_nodes_for_frag(self, policy, partition, node): + """ + Generate a priority list of nodes that can sync to the given node. + + The primary node is always the highest priority, after that we'll use + handoffs. + + To avoid conflicts placing frags we'll skip through the handoffs and + only yield back those that are offset equal to the given primary + node index. + + Nodes returned from this iterator will have 'backend_index' set. + """ + node['backend_index'] = policy.get_backend_index(node['index']) + yield node + count = 0 + for handoff_node in policy.object_ring.get_more_nodes(partition): + handoff_backend_index = policy.get_backend_index( + handoff_node['handoff_index']) + if handoff_backend_index == node['backend_index']: + if (self.rebuild_handoff_node_count >= 0 and + count >= self.rebuild_handoff_node_count): + break + handoff_node['backend_index'] = handoff_backend_index + yield handoff_node + count += 1 + def _get_suffixes_to_sync(self, job, node): """ For SYNC jobs we need to make a remote REPLICATE request to get @@ -677,48 +709,56 @@ class ObjectReconstructor(Daemon): :param: the job dict, with the keys defined in ``_get_part_jobs`` :param node: the remote node dict :returns: a (possibly empty) list of strings, the suffixes to be - synced with the remote node. + synced, and the remote node to sync with. 
""" # get hashes from the remote node remote_suffixes = None + attempts_remaining = 1 headers = self.headers.copy() headers['X-Backend-Storage-Policy-Index'] = int(job['policy']) - try: - with Timeout(self.http_timeout): - resp = http_connect( - node['replication_ip'], node['replication_port'], - node['device'], job['partition'], 'REPLICATE', - '', headers=headers).getresponse() - if resp.status == HTTP_INSUFFICIENT_STORAGE: - self.logger.error( - _('%s responded as unmounted'), - _full_path(node, job['partition'], '', - job['policy'])) - elif resp.status != HTTP_OK: - full_path = _full_path(node, job['partition'], '', - job['policy']) - self.logger.error( - _("Invalid response %(resp)s from %(full_path)s"), - {'resp': resp.status, 'full_path': full_path}) - else: - remote_suffixes = pickle.loads(resp.read()) - except (Exception, Timeout): - # all exceptions are logged here so that our caller can - # safely catch our exception and continue to the next node - # without logging - self.logger.exception('Unable to get remote suffix hashes ' - 'from %r' % _full_path( - node, job['partition'], '', - job['policy'])) - + possible_nodes = self._iter_nodes_for_frag( + job['policy'], job['partition'], node) + while remote_suffixes is None and attempts_remaining: + try: + node = next(possible_nodes) + except StopIteration: + break + attempts_remaining -= 1 + try: + with Timeout(self.http_timeout): + resp = http_connect( + node['replication_ip'], node['replication_port'], + node['device'], job['partition'], 'REPLICATE', + '', headers=headers).getresponse() + if resp.status == HTTP_INSUFFICIENT_STORAGE: + self.logger.error( + _('%s responded as unmounted'), + _full_path(node, job['partition'], '', + job['policy'])) + attempts_remaining += 1 + elif resp.status != HTTP_OK: + full_path = _full_path(node, job['partition'], '', + job['policy']) + self.logger.error( + _("Invalid response %(resp)s from %(full_path)s"), + {'resp': resp.status, 'full_path': full_path}) + else: + remote_suffixes = pickle.loads(resp.read()) + except (Exception, Timeout): + # all exceptions are logged here so that our caller can + # safely catch our exception and continue to the next node + # without logging + self.logger.exception('Unable to get remote suffix hashes ' + 'from %r' % _full_path( + node, job['partition'], '', + job['policy'])) if remote_suffixes is None: raise SuffixSyncError('Unable to get remote suffix hashes') suffixes = self.get_suffix_delta(job['hashes'], job['frag_index'], remote_suffixes, - job['policy'].get_backend_index( - node['index'])) + node['backend_index']) # now recalculate local hashes for suffixes that don't # match so we're comparing the latest local_suff = self._get_hashes(job['local_dev']['device'], @@ -728,11 +768,10 @@ class ObjectReconstructor(Daemon): suffixes = self.get_suffix_delta(local_suff, job['frag_index'], remote_suffixes, - job['policy'].get_backend_index( - node['index'])) + node['backend_index']) self.suffix_count += len(suffixes) - return suffixes + return suffixes, node def delete_reverted_objs(self, job, objects, frag_index): """ @@ -798,38 +837,15 @@ class ObjectReconstructor(Daemon): """ self.logger.increment( 'partition.update.count.%s' % (job['local_dev']['device'],)) - # after our left and right partners, if there's some sort of - # failure we'll continue onto the remaining primary nodes and - # make sure they're in sync - or potentially rebuild missing - # fragments we find - dest_nodes = itertools.chain( - job['sync_to'], - # I think we could order these based on our 
index to better - # protect against a broken chain - [ - n for n in - job['policy'].object_ring.get_part_nodes(job['partition']) - if n['id'] != job['local_dev']['id'] and - n['id'] not in (m['id'] for m in job['sync_to']) - ], - ) - syncd_with = 0 - for node in dest_nodes: - if syncd_with >= len(job['sync_to']): - # success! - break - + for node in job['sync_to']: try: - suffixes = self._get_suffixes_to_sync(job, node) + suffixes, node = self._get_suffixes_to_sync(job, node) except SuffixSyncError: continue if not suffixes: - syncd_with += 1 continue - node['backend_index'] = job['policy'].get_backend_index( - node['index']) # ssync any out-of-sync suffixes with the remote node success, _ = ssync_sender( self, node, job, suffixes)() @@ -838,8 +854,6 @@ class ObjectReconstructor(Daemon): # update stats for this attempt self.suffix_sync += len(suffixes) self.logger.update_stats('suffix.syncs', len(suffixes)) - if success: - syncd_with += 1 self.logger.timing_since('partition.update.timing', begin) def _revert(self, job, begin): @@ -951,6 +965,8 @@ class ObjectReconstructor(Daemon): try: suffixes = data_fi_to_suffixes.pop(frag_index) except KeyError: + # N.B. If this function ever returns an empty list of jobs + # the entire partition will be deleted. suffixes = [] sync_job = build_job( job_type=SYNC, diff --git a/test/probe/test_reconstructor_rebuild.py b/test/probe/test_reconstructor_rebuild.py index d64d85fa5b..5ebfb7381f 100644 --- a/test/probe/test_reconstructor_rebuild.py +++ b/test/probe/test_reconstructor_rebuild.py @@ -22,7 +22,6 @@ import unittest import uuid import shutil import random -from collections import defaultdict import os import time @@ -32,7 +31,6 @@ from test.probe.common import ECProbeTest from swift.common import direct_client from swift.common.storage_policy import EC_POLICY from swift.common.manager import Manager -from swift.obj.reconstructor import _get_partners from swiftclient import client, ClientException @@ -300,46 +298,46 @@ class TestReconstructorRebuild(ECProbeTest): self._test_rebuild_scenario(failed, non_durable, 3) def test_rebuild_partner_down(self): - # find a primary server that only has one of it's devices in the - # primary node list - group_nodes_by_config = defaultdict(list) - for n in self.onodes: - group_nodes_by_config[self.config_number(n)].append(n) - for config_number, node_list in group_nodes_by_config.items(): - if len(node_list) == 1: - break - else: - self.fail('ring balancing did not use all available nodes') - primary_node = node_list[0] + # we have to pick a lower index because we have few handoffs + nodes = self.onodes[:2] + random.shuffle(nodes) # left or right is fine + primary_node, partner_node = nodes - # pick one it's partners to fail randomly - partner_node = random.choice(_get_partners( - primary_node['index'], self.onodes)) + # capture fragment etag from partner + failed_partner_meta, failed_partner_etag = self.direct_get( + partner_node, self.opart) - # 507 the partner device + # and 507 the failed partner device device_path = self.device_dir('object', partner_node) self.kill_drive(device_path) - # select another primary sync_to node to fail - failed_primary = [n for n in self.onodes if n['id'] not in - (primary_node['id'], partner_node['id'])][0] - # ... capture it's fragment etag - failed_primary_meta, failed_primary_etag = self.direct_get( - failed_primary, self.opart) - # ... 
and delete it - part_dir = self.storage_dir('object', failed_primary, part=self.opart) - shutil.rmtree(part_dir, True) - # reconstruct from the primary, while one of it's partners is 507'd self.reconstructor.once(number=self.config_number(primary_node)) - # the other failed primary will get it's fragment rebuilt instead - failed_primary_meta_new, failed_primary_etag_new = self.direct_get( - failed_primary, self.opart) - del failed_primary_meta['Date'] - del failed_primary_meta_new['Date'] - self.assertEqual(failed_primary_etag, failed_primary_etag_new) - self.assertEqual(failed_primary_meta, failed_primary_meta_new) + # a handoff will pickup the rebuild + hnodes = list(self.object_ring.get_more_nodes(self.opart)) + for node in hnodes: + try: + found_meta, found_etag = self.direct_get( + node, self.opart) + except DirectClientException as e: + if e.http_status != 404: + raise + else: + break + else: + self.fail('Unable to fetch rebuilt frag from handoffs %r ' + 'given primary nodes %r with %s unmounted ' + 'trying to rebuild from %s' % ( + [h['device'] for h in hnodes], + [n['device'] for n in self.onodes], + partner_node['device'], + primary_node['device'], + )) + self.assertEqual(failed_partner_etag, found_etag) + del failed_partner_meta['Date'] + del found_meta['Date'] + self.assertEqual(failed_partner_meta, found_meta) # just to be nice self.revive_drive(device_path) diff --git a/test/unit/__init__.py b/test/unit/__init__.py index de29af08be..7109e3c210 100644 --- a/test/unit/__init__.py +++ b/test/unit/__init__.py @@ -274,6 +274,7 @@ class FakeRing(Ring): return [dict(node, index=i) for i, node in enumerate(list(self._devs))] def get_more_nodes(self, part): + index_counter = itertools.count() for x in range(self.replicas, (self.replicas + self.max_more_nodes)): yield {'ip': '10.0.0.%s' % x, 'replication_ip': '10.0.0.%s' % x, @@ -282,7 +283,8 @@ class FakeRing(Ring): 'device': 'sda', 'zone': x % 3, 'region': x % 2, - 'id': x} + 'id': x, + 'handoff_index': next(index_counter)} def write_fake_ring(path, *devs): @@ -346,6 +348,9 @@ class FabricatedRing(Ring): self._part_shift = 32 - part_power self._reload() + def has_changed(self): + return False + def _reload(self, *args, **kwargs): self._rtime = time.time() * 2 if hasattr(self, '_replica2part2dev_id'): @@ -370,6 +375,7 @@ class FabricatedRing(Ring): for p in range(2 ** self.part_power): for r in range(self.replicas): self._replica2part2dev_id[r][p] = next(dev_ids) + self._update_bookkeeping() class FakeMemcache(object): diff --git a/test/unit/common/ring/test_ring.py b/test/unit/common/ring/test_ring.py index 17335f582b..222376c0c6 100644 --- a/test/unit/common/ring/test_ring.py +++ b/test/unit/common/ring/test_ring.py @@ -568,6 +568,10 @@ class TestRing(TestRingBase): self.assertEqual(len(devs), len(exp_handoffs)) dev_ids = [d['id'] for d in devs] self.assertEqual(dev_ids, exp_handoffs) + # We mark handoffs so code consuming extra nodes can reason about how + # far they've gone + for i, d in enumerate(devs): + self.assertEqual(d['handoff_index'], i) # The first 6 replicas plus the 3 primary nodes should cover all 9 # zones in this test diff --git a/test/unit/obj/test_reconstructor.py b/test/unit/obj/test_reconstructor.py index 6e48f92213..335983c15d 100644 --- a/test/unit/obj/test_reconstructor.py +++ b/test/unit/obj/test_reconstructor.py @@ -40,7 +40,7 @@ from swift.obj import diskfile, reconstructor as object_reconstructor from swift.common import ring from swift.common.storage_policy import (StoragePolicy, ECStoragePolicy, POLICIES, 
EC_POLICY) -from swift.obj.reconstructor import REVERT +from swift.obj.reconstructor import SYNC, REVERT from test.unit import (patch_policies, debug_logger, mocked_http_conn, FabricatedRing, make_timestamp_iter, @@ -143,7 +143,7 @@ def get_header_frag_index(self, body): @patch_policies([StoragePolicy(0, name='zero', is_default=True), ECStoragePolicy(1, name='one', ec_type=DEFAULT_TEST_EC_TYPE, - ec_ndata=2, ec_nparity=1)]) + ec_ndata=3, ec_nparity=2)]) class TestGlobalSetupObjectReconstructor(unittest.TestCase): # Tests for reconstructor using real objects in test partition directories. legacy_durable = False @@ -151,9 +151,8 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase): def setUp(self): skip_if_no_xattrs() self.testdir = tempfile.mkdtemp() - _create_test_rings(self.testdir) - POLICIES[0].object_ring = ring.Ring(self.testdir, ring_name='object') - POLICIES[1].object_ring = ring.Ring(self.testdir, ring_name='object-1') + POLICIES[0].object_ring = FabricatedRing(3) + POLICIES[1].object_ring = FabricatedRing(5) utils.HASH_PATH_SUFFIX = b'endcap' utils.HASH_PATH_PREFIX = b'' self.devices = os.path.join(self.testdir, 'node') @@ -176,7 +175,8 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase): self.conf = dict( swift_dir=self.testdir, devices=self.devices, mount_check='false', - timeout='300', stats_interval='1') + timeout='300', stats_interval='1', + bind_ip='10.0.0.1', bind_port=6200) self.logger = debug_logger('test-reconstructor') self.reconstructor = object_reconstructor.ObjectReconstructor( self.conf, logger=self.logger) @@ -189,13 +189,16 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase): # part 0: 3C1/hash/xxx#1#d.data <-- job: sync_only - partners (FI 1) # 061/hash/xxx#1#d.data <-- included in earlier job (FI 1) # /xxx#2#d.data <-- job: sync_revert to index 2 + # part_nodes: ['sda0', 'sda1', 'sda2', 'sda3', 'sda4'] - # part 1: 3C1/hash/xxx#0#d.data <-- job: sync_only - partners (FI 0) + # part 1: 3C1/hash/xxx#0#d.data <-- job: sync_revert to index 0 # /xxx#1#d.data <-- job: sync_revert to index 1 # 061/hash/xxx#1#d.data <-- included in earlier job (FI 1) + # part_nodes: ['sda5', 'sda6', 'sda7', 'sda0', 'sda1'] # part 2: 3C1/hash/xxx#2#d.data <-- job: sync_revert to index 2 # 061/hash/xxx#0#d.data <-- job: sync_revert to index 0 + # part_nodes: ['sda2', 'sda3', 'sda4', 'sda5', 'sda6'] def _create_frag_archives(policy, obj_path, local_id, obj_set): # we'll create 2 sets of objects in different suffix dirs @@ -251,7 +254,7 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase): for obj_num in range(0, 3): _create_df(obj_num, part_num) - ips = utils.whataremyips() + ips = utils.whataremyips(self.reconstructor.bind_ip) for policy in [p for p in POLICIES if p.policy_type == EC_POLICY]: self.ec_policy = policy self.ec_obj_ring = self.reconstructor.load_object_ring( @@ -312,13 +315,14 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase): 'sync_to': [{ 'index': 2, 'replication_port': 6200, - 'zone': 2, - 'ip': '127.0.0.2', + 'zone': 1, + 'ip': '10.0.0.2', 'region': 1, 'port': 6200, - 'replication_ip': '127.0.0.2', - 'device': 'sda1', + 'replication_ip': '10.0.0.2', + 'device': 'sda2', 'id': 2, + 'weight': 1.0, }], 'job_type': object_reconstructor.REVERT, 'suffixes': ['061'], @@ -328,11 +332,13 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase): 'local_dev': { 'replication_port': 6200, 'zone': 1, - 'ip': '127.0.0.1', + 'ip': '10.0.0.1', 'region': 1, 'id': 1, - 'replication_ip': '127.0.0.1', - 'device': 'sda1', 'port': 6200, 
+ 'replication_ip': '10.0.0.1', + 'device': 'sda1', + 'port': 6200, + 'weight': 1.0, }, 'hashes': { '061': { @@ -349,22 +355,36 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase): 'sync_to': [{ 'index': 0, 'replication_port': 6200, - 'zone': 0, - 'ip': '127.0.0.0', + 'zone': 1, + 'ip': '10.0.0.0', 'region': 1, 'port': 6200, - 'replication_ip': '127.0.0.0', - 'device': 'sda1', 'id': 0, + 'replication_ip': '10.0.0.0', + 'device': 'sda0', + 'id': 0, + 'weight': 1.0, }, { 'index': 2, 'replication_port': 6200, - 'zone': 2, - 'ip': '127.0.0.2', + 'zone': 1, + 'ip': '10.0.0.2', 'region': 1, 'port': 6200, - 'replication_ip': '127.0.0.2', - 'device': 'sda1', + 'replication_ip': '10.0.0.2', + 'device': 'sda2', 'id': 2, + 'weight': 1.0, + }, { + 'index': 3, + 'replication_port': 6200, + 'zone': 1, + 'ip': '10.0.0.3', + 'region': 1, + 'port': 6200, + 'replication_ip': '10.0.0.3', + 'device': 'sda3', + 'id': 3, + 'weight': 1.0, }], 'job_type': object_reconstructor.SYNC, 'sync_diskfile_builder': self.reconstructor.reconstruct_fa, @@ -375,12 +395,13 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase): 'local_dev': { 'replication_port': 6200, 'zone': 1, - 'ip': '127.0.0.1', + 'ip': '10.0.0.1', 'region': 1, 'id': 1, - 'replication_ip': '127.0.0.1', + 'replication_ip': '10.0.0.1', 'device': 'sda1', 'port': 6200, + 'weight': 1.0, }, 'hashes': { @@ -402,13 +423,14 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase): 'sync_to': [{ 'index': 1, 'replication_port': 6200, - 'zone': 2, - 'ip': '127.0.0.2', + 'zone': 1, + 'ip': '10.0.0.2', 'region': 1, 'port': 6200, - 'replication_ip': '127.0.0.2', - 'device': 'sda1', - 'id': 2, + 'replication_ip': '10.0.0.2', + 'device': 'sda6', + 'id': 6, + 'weight': 1.0, }], 'job_type': object_reconstructor.REVERT, 'suffixes': ['061', '3c1'], @@ -418,12 +440,13 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase): 'local_dev': { 'replication_port': 6200, 'zone': 1, - 'ip': '127.0.0.1', + 'ip': '10.0.0.1', 'region': 1, 'id': 1, - 'replication_ip': '127.0.0.1', + 'replication_ip': '10.0.0.1', 'device': 'sda1', 'port': 6200, + 'weight': 1.0, }, 'hashes': { @@ -439,27 +462,18 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase): }, }, { 'sync_to': [{ - 'index': 2, + 'index': 0, 'replication_port': 6200, - 'zone': 4, - 'ip': '127.0.0.3', + 'zone': 1, + 'ip': '10.0.0.1', 'region': 1, 'port': 6200, - 'replication_ip': '127.0.0.3', - 'device': 'sda1', 'id': 3, - }, { - 'index': 1, - 'replication_port': 6200, - 'zone': 2, - 'ip': '127.0.0.2', - 'region': 1, - 'port': 6200, - 'replication_ip': '127.0.0.2', - 'device': 'sda1', - 'id': 2, + 'replication_ip': '10.0.0.1', + 'device': 'sda5', + 'id': 5, + 'weight': 1.0, }], - 'job_type': object_reconstructor.SYNC, - 'sync_diskfile_builder': self.reconstructor.reconstruct_fa, + 'job_type': object_reconstructor.REVERT, 'suffixes': ['3c1'], 'partition': 1, 'frag_index': 0, @@ -467,12 +481,13 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase): 'local_dev': { 'replication_port': 6200, 'zone': 1, - 'ip': '127.0.0.1', + 'ip': '10.0.0.1', 'region': 1, 'id': 1, - 'replication_ip': '127.0.0.1', + 'replication_ip': '10.0.0.1', 'device': 'sda1', 'port': 6200, + 'weight': 1.0, }, 'hashes': { '061': { @@ -485,6 +500,70 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase): 1: '0e6e8d48d801dc89fd31904ae3b31229', }, }, + }, { + 'sync_to': [{ + 'index': 3, + 'replication_port': 6200, + 'zone': 1, + 'ip': '10.0.0.0', + 'region': 1, + 'port': 6200, + 'replication_ip': '10.0.0.0', + 
'device': 'sda0', + 'id': 0, + 'weight': 1.0, + }, { + 'index': 0, + 'replication_port': 6200, + 'zone': 1, + 'ip': '10.0.0.1', + 'region': 1, + 'port': 6200, + 'replication_ip': '10.0.0.1', + 'device': 'sda5', + 'id': 5, + 'weight': 1.0, + }, { + 'index': 1, + 'replication_port': 6200, + 'zone': 1, + 'ip': '10.0.0.2', + 'region': 1, + 'port': 6200, + 'replication_ip': '10.0.0.2', + 'device': 'sda6', + 'id': 6, + 'weight': 1.0, + }], + 'job_type': object_reconstructor.SYNC, + 'sync_diskfile_builder': self.reconstructor.reconstruct_fa, + 'suffixes': [], + 'partition': 1, + 'frag_index': 4, + 'device': 'sda1', + 'local_dev': { + 'replication_port': 6200, + 'zone': 1, + 'ip': '10.0.0.1', + 'region': 1, + 'id': 1, + 'replication_ip': '10.0.0.1', + 'device': 'sda1', + 'port': 6200, + 'weight': 1.0, + }, + 'hashes': { + '061': { + None: '85b02a5283704292a511078a5c483da5', + 1: '0e6e8d48d801dc89fd31904ae3b31229', + }, + '3c1': { + 0: '0e6e8d48d801dc89fd31904ae3b31229', + None: '85b02a5283704292a511078a5c483da5', + 1: '0e6e8d48d801dc89fd31904ae3b31229', + }, + }, + }] ) # part num 2 @@ -493,12 +572,14 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase): 'sync_to': [{ 'index': 0, 'replication_port': 6200, - 'zone': 2, - 'ip': '127.0.0.2', + 'zone': 1, + 'ip': '10.0.0.2', 'region': 1, 'port': 6200, - 'replication_ip': '127.0.0.2', - 'device': 'sda1', 'id': 2, + 'replication_ip': '10.0.0.2', + 'device': 'sda2', + 'id': 2, + 'weight': 1.0, }], 'job_type': object_reconstructor.REVERT, 'suffixes': ['061'], @@ -508,12 +589,13 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase): 'local_dev': { 'replication_port': 6200, 'zone': 1, - 'ip': '127.0.0.1', + 'ip': '10.0.0.1', 'region': 1, 'id': 1, - 'replication_ip': '127.0.0.1', + 'replication_ip': '10.0.0.1', 'device': 'sda1', 'port': 6200, + 'weight': 1.0, }, 'hashes': { '061': { @@ -529,13 +611,14 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase): 'sync_to': [{ 'index': 2, 'replication_port': 6200, - 'zone': 0, - 'ip': '127.0.0.0', + 'zone': 1, + 'ip': '10.0.0.0', 'region': 1, 'port': 6200, - 'replication_ip': '127.0.0.0', - 'device': 'sda1', - 'id': 0, + 'replication_ip': '10.0.0.0', + 'device': 'sda4', + 'id': 4, + 'weight': 1.0, }], 'job_type': object_reconstructor.REVERT, 'suffixes': ['3c1'], @@ -545,12 +628,13 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase): 'local_dev': { 'replication_port': 6200, 'zone': 1, - 'ip': '127.0.0.1', + 'ip': '10.0.0.1', 'region': 1, 'id': 1, - 'replication_ip': '127.0.0.1', + 'replication_ip': '10.0.0.1', 'device': 'sda1', - 'port': 6200 + 'port': 6200, + 'weight': 1.0, }, 'hashes': { '061': { @@ -572,6 +656,7 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase): self.fail('Unknown part number %r' % part_num) expected_by_part_frag_index = dict( ((j['partition'], j['frag_index']), j) for j in expected_jobs) + unexpected_jobs = [] for job in jobs: job_key = (job['partition'], job['frag_index']) if job_key in expected_by_part_frag_index: @@ -585,15 +670,17 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase): self.assertEqual(value, expected_value) except AssertionError as e: extra_info = \ - '\n\n... for %r in part num %s job %r' % ( - k, part_num, job_key) + '\n\n... 
for %r in part num %s frag %s' % ( + k, part_num, job_key[1]) raise AssertionError(str(e) + extra_info) else: - self.fail( - 'Unexpected job %r for part num %s - ' - 'expected jobs where %r' % ( - job_key, part_num, - expected_by_part_frag_index.keys())) + unexpected_jobs.append(job) + if unexpected_jobs: + self.fail( + 'Unexpected jobs for frags %r in part num %s - ' + 'expected jobs for frags %r' % ( + [j['frag_index'] for j in unexpected_jobs], part_num, + [k[1] for k in expected_by_part_frag_index])) for expected_job in expected_jobs: if expected_job in jobs: jobs.remove(expected_job) @@ -601,22 +688,30 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase): check_jobs(part_num) def _run_once(self, http_count, extra_devices, override_devices=None): - ring_devs = list(self.policy.object_ring.devs) + id_counter = itertools.count( + max(d['id'] for d in self.policy.object_ring.devs) + 1) for device, parts in extra_devices.items(): device_path = os.path.join(self.devices, device) os.mkdir(device_path) for part in range(parts): - os.makedirs(os.path.join(device_path, 'objects-1', str(part))) - # we update the ring to make is_local happy - devs = [dict(d) for d in ring_devs] - for d in devs: - d['device'] = device - self.policy.object_ring.devs.extend(devs) + hash_path = os.path.join( + device_path, 'objects-1', str(part), 'abc', 'hash') + os.makedirs(hash_path) + tombstone_file = utils.Timestamp(time.time()).internal + '.ts' + with open(os.path.join(hash_path, tombstone_file), 'w'): + pass + # use sda1 as a base to make is_local happy + new_device = dict(self.policy.object_ring.devs[1]) + new_device['device'] = device + new_device['id'] = next(id_counter) + self.policy.object_ring.devs.append(new_device) self.reconstructor.stats_interval = 0 self.process_job = lambda j: sleep(0) - with mocked_http_conn(*[200] * http_count, body=pickle.dumps({})): - with mock_ssync_sender(): + with mock_ssync_sender(), \ + mocked_http_conn(*[200] * http_count, + body=pickle.dumps({})) as request_log: self.reconstructor.run_once(devices=override_devices) + return request_log def test_run_once(self): # sda1: 3 is done in setup @@ -625,7 +720,7 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase): 'sdc1': 1, 'sdd1': 0, } - self._run_once(18, extra_devices) + self._run_once(32, extra_devices) stats_lines = set() for line in self.logger.get_lines_for_level('info'): if 'reconstructed in' not in line: @@ -651,7 +746,7 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase): 'sdc1': 1, 'sdd1': 0, } - self._run_once(2, extra_devices, 'sdc1') + self._run_once(3, extra_devices, 'sdc1') stats_lines = set() for line in self.logger.get_lines_for_level('info'): if 'reconstructed in' not in line: @@ -822,38 +917,87 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase): self.assertIn('You should disable handoffs_only', msgs[-1]) def test_get_partners(self): - # we're going to perform an exhaustive test of every possible - # combination of partitions and nodes in our custom test ring + expected = ( + # node_index, part_nodes => partners + (0, [0, 1, 2, 3], [3, 1, 2]), + (0, [2, 3, 1, 0], [0, 3, 1]), + (0, [0, 1, 2, 3, 4], [4, 1, 2]), + (0, [0, 1, 2, 3, 4, 5], [5, 1, 3]), + (1, [0, 1, 2, 3, 4, 5], [0, 2, 4]), + (2, [0, 1, 2, 3, 4, 5], [1, 3, 5]), + (3, [0, 1, 2, 3, 4, 5], [2, 4, 0]), + (4, [0, 1, 2, 3, 4, 5], [3, 5, 1]), + (5, [0, 1, 2, 3, 4, 5], [4, 0, 2]), + (5, [1, 4, 0, 2, 3, 5], [3, 1, 0]), + ) + failures = [] + for frag_index, part_nodes, partners in expected: + sync_to = 
object_reconstructor._get_partners( + frag_index, part_nodes) + if partners != sync_to: + failures.append('Given nodes %r for index %s we expected ' + '%r but got %r' % ( + part_nodes, frag_index, partners, sync_to)) + if failures: + failures.insert(0, 'Some test scenarios failed:') + self.fail('\n'.join(failures)) - # format: [dev_id in question, 'part_num', - # [part_nodes for the given part], left id, right id...] - expected_partners = sorted([ - (0, '0', [0, 1, 2], 2, 1), (0, '2', [2, 3, 0], 3, 2), - (1, '0', [0, 1, 2], 0, 2), (1, '1', [1, 2, 3], 3, 2), - (2, '0', [0, 1, 2], 1, 0), (2, '1', [1, 2, 3], 1, 3), - (2, '2', [2, 3, 0], 0, 3), (3, '1', [1, 2, 3], 2, 1), - (3, '2', [2, 3, 0], 2, 0), (0, '0', [0, 1, 2], 2, 1), - (0, '2', [2, 3, 0], 3, 2), (1, '0', [0, 1, 2], 0, 2), - (1, '1', [1, 2, 3], 3, 2), (2, '0', [0, 1, 2], 1, 0), - (2, '1', [1, 2, 3], 1, 3), (2, '2', [2, 3, 0], 0, 3), - (3, '1', [1, 2, 3], 2, 1), (3, '2', [2, 3, 0], 2, 0), - ]) + def test_iter_nodes_for_frag(self): + # no limit + self.reconstructor.rebuild_handoff_node_count = -1 + policy = ECStoragePolicy(1, name='test', ec_type=DEFAULT_TEST_EC_TYPE, + ec_ndata=4, ec_nparity=3) + policy.object_ring = FabricatedRing(replicas=7, devices=28) + primaries = policy.object_ring.get_part_nodes(0) - got_partners = [] - for pol in POLICIES: - obj_ring = pol.object_ring - for part_num in self.part_nums: - part_nodes = obj_ring.get_part_nodes(int(part_num)) - primary_ids = [n['id'] for n in part_nodes] - for node in part_nodes: - partners = object_reconstructor._get_partners( - node['index'], part_nodes) - left = partners[0]['id'] - right = partners[1]['id'] - got_partners.append(( - node['id'], part_num, primary_ids, left, right)) + node = primaries[0] + nodes_for_frag = list(self.reconstructor._iter_nodes_for_frag( + policy, 0, node)) + expected = [0, 0, 7, 14] + self.assertEqual(expected, [n.get('index', n.get('handoff_index')) + for n in nodes_for_frag]) + for node in nodes_for_frag: + self.assertEqual(0, node['backend_index']) - self.assertEqual(expected_partners, sorted(got_partners)) + node = primaries[3] + nodes_for_frag = list(self.reconstructor._iter_nodes_for_frag( + policy, 0, node)) + expected = [3, 3, 10, 17] + self.assertEqual(expected, [n.get('index', n.get('handoff_index')) + for n in nodes_for_frag]) + for node in nodes_for_frag: + self.assertEqual(3, node['backend_index']) + + node = primaries[-1] + nodes_for_frag = list(self.reconstructor._iter_nodes_for_frag( + policy, 0, node)) + expected = [6, 6, 13, 20] + self.assertEqual(expected, [n.get('index', n.get('handoff_index')) + for n in nodes_for_frag]) + for node in nodes_for_frag: + self.assertEqual(6, node['backend_index']) + + # default limit is 2 + self.reconstructor.rebuild_handoff_node_count = 2 + node = primaries[0] + nodes_for_frag = list(self.reconstructor._iter_nodes_for_frag( + policy, 0, node)) + expected = [0, 0, 7] + self.assertEqual(expected, [n.get('index', n.get('handoff_index')) + for n in nodes_for_frag]) + for node in nodes_for_frag: + self.assertEqual(0, node['backend_index']) + + # zero means only primaries + self.reconstructor.rebuild_handoff_node_count = 0 + node = primaries[0] + nodes_for_frag = list(self.reconstructor._iter_nodes_for_frag( + policy, 0, node)) + expected = [0] + self.assertEqual(expected, [n.get('index', n.get('handoff_index')) + for n in nodes_for_frag]) + for node in nodes_for_frag: + self.assertEqual(0, node['backend_index']) def test_collect_parts(self): self.reconstructor._reset_stats() @@ -880,6 +1024,8 @@ class 
TestGlobalSetupObjectReconstructor(unittest.TestCase): self.assertEqual(str(log_kwargs['exc_info'][1]), 'Ow!') def test_removes_zbf(self): + # suppress unmount warning + os.mkdir(os.path.join(self.devices, 'sda5')) # After running xfs_repair, a partition directory could become a # zero-byte file. If this happens, the reconstructor should clean it # up, log something, and move on to the next partition. @@ -927,6 +1073,9 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase): if e.errno != 2: raise + # suppress unmount warning + os.mkdir(os.path.join(self.devices, 'sda5')) + # since our collect_parts job is a generator, that yields directly # into build_jobs and then spawns it's safe to do the remove_files # without making reconstructor startup slow @@ -996,40 +1145,46 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase): # run reconstructor with delete function mocked out to check calls ssync_calls = [] - delete_func =\ - 'swift.obj.reconstructor.ObjectReconstructor.delete_reverted_objs' with mock.patch('swift.obj.reconstructor.ssync_sender', - self._make_fake_ssync(ssync_calls)): - with mocked_http_conn(*[200] * 12, body=pickle.dumps({})): - with mock.patch(delete_func) as mock_delete: - self.reconstructor.reconstruct() - expected_calls = [] - for context in ssync_calls: - if context['job']['job_type'] == REVERT: - for dirpath, files in visit_obj_dirs(context): - # sanity check - expect some files to be in dir, - # may not be for the reverted frag index - self.assertTrue(files) - n_files += len(files) - expected_calls.append(mock.call(context['job'], - context['available_map'], - context['node']['index'])) - mock_delete.assert_has_calls(expected_calls, any_order=True) + self._make_fake_ssync(ssync_calls)), \ + mocked_http_conn(*[200] * 17, body=pickle.dumps({})), \ + mock.patch.object( + self.reconstructor, 'delete_reverted_objs') as mock_delete: + self.reconstructor.reconstruct() + expected_calls = [] + for context in ssync_calls: + if context['job']['job_type'] == REVERT: + for dirpath, files in visit_obj_dirs(context): + # sanity check - expect some files to be in dir, + # may not be for the reverted frag index + self.assertTrue(files) + n_files += len(files) + expected_calls.append(mock.call(context['job'], + context['available_map'], + context['node']['index'])) + mock_delete.assert_has_calls(expected_calls, any_order=True) + # N.B. in this next test sequence we actually delete files after + # revert, so the on-disk hashes can change. In partition 1, if the + # revert jobs (for frag_index 0 or 1) run before the sync job + # (frag_index 4), all suffixes will get removed and the sync job won't + # have anything to ship to the remote (meaning there's no post-sync + # REPLICATE call). To keep the number of mocked_http_conn responses + # predictable, we force a stable job order by mocking random's shuffle. 
ssync_calls = [] with mock.patch('swift.obj.reconstructor.ssync_sender', - self._make_fake_ssync(ssync_calls)): - with mocked_http_conn(*[200] * 12, body=pickle.dumps({})): - self.reconstructor.reconstruct() - for context in ssync_calls: - if context['job']['job_type'] == REVERT: - data_file_tail = ('#%s.data' - % context['node']['index']) - for dirpath, files in visit_obj_dirs(context): - n_files_after += len(files) - for filename in files: - self.assertFalse( - filename.endswith(data_file_tail)) + self._make_fake_ssync(ssync_calls)), \ + mocked_http_conn(*[200] * 17, body=pickle.dumps({})), \ + mock.patch('swift.obj.reconstructor.random.shuffle'): + self.reconstructor.reconstruct() + for context in ssync_calls: + if context['job']['job_type'] == REVERT: + data_file_tail = ('#%s.data' + % context['node']['index']) + for dirpath, files in visit_obj_dirs(context): + n_files_after += len(files) + for filename in files: + self.assertFalse(filename.endswith(data_file_tail)) # sanity check that some files should were deleted self.assertGreater(n_files, n_files_after) @@ -1037,6 +1192,8 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase): def test_no_delete_failed_revert(self): # test will only process revert jobs self.reconstructor.handoffs_only = True + # suppress unmount warning + os.mkdir(os.path.join(self.devices, 'sda5')) captured_ssync = [] # fail all jobs on part 2 on sda1 @@ -1092,7 +1249,7 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase): job = context['job'] expected_suffix_calls.append( (job['sync_to'][0]['replication_ip'], '/%s/%s/%s' % ( - job['device'], job['partition'], + job['sync_to'][0]['device'], job['partition'], '-'.join(sorted(job['suffixes'])))) ) self.assertEqual(set(expected_suffix_calls), @@ -1145,16 +1302,16 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase): self._make_fake_ssync(ssync_calls)): self.reconstructor.reconstruct(override_partitions=[2]) expected_repliate_calls = set([ - ('127.0.0.0', '/sda1/2/3c1'), - ('127.0.0.2', '/sda1/2/061'), + (u'10.0.0.0', '/sda4/2/3c1'), + (u'10.0.0.2', '/sda2/2/061'), ]) found_calls = set((r['ip'], r['path']) for r in request_log.requests) self.assertEqual(expected_repliate_calls, found_calls) expected_ssync_calls = sorted([ - ('127.0.0.0', REVERT, 2, ['3c1']), - ('127.0.0.2', REVERT, 2, ['061']), + (u'10.0.0.0', REVERT, 2, [u'3c1']), + (u'10.0.0.2', REVERT, 2, [u'061']), ]) self.assertEqual(expected_ssync_calls, sorted(( c['node']['ip'], @@ -1179,48 +1336,50 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase): self.assertFalse(os.access(part_path, os.F_OK)) def test_process_job_all_success(self): + rehash_per_job_type = {SYNC: 2, REVERT: 1} self.reconstructor._reset_stats() with mock_ssync_sender(): - with mocked_http_conn(*[200] * 12, body=pickle.dumps({})): - found_jobs = [] - for part_info in self.reconstructor.collect_parts(): - jobs = self.reconstructor.build_reconstruction_jobs( - part_info) - found_jobs.extend(jobs) - for job in jobs: - self.logger._clear() - node_count = len(job['sync_to']) + found_jobs = [] + for part_info in self.reconstructor.collect_parts(): + jobs = self.reconstructor.build_reconstruction_jobs( + part_info) + found_jobs.extend(jobs) + for job in jobs: + self.logger._clear() + node_count = len(job['sync_to']) + rehash_count = node_count * rehash_per_job_type[ + job['job_type']] + with mocked_http_conn(*[200] * rehash_count, + body=pickle.dumps({})): self.reconstructor.process_job(job) - if job['job_type'] == object_reconstructor.REVERT: - 
self.assertEqual(0, count_stats( - self.logger, 'update_stats', 'suffix.hashes')) - else: - self.assertStatCount('update_stats', - 'suffix.hashes', - node_count) - self.assertEqual(node_count, count_stats( - self.logger, 'update_stats', 'suffix.hashes')) - self.assertEqual(node_count, count_stats( - self.logger, 'update_stats', 'suffix.syncs')) - self.assertNotIn('error', self.logger.all_log_lines()) + if job['job_type'] == object_reconstructor.REVERT: + self.assertStatCount('update_stats', + 'suffix.hashes', 0) + else: + self.assertStatCount('update_stats', + 'suffix.hashes', node_count) + self.assertStatCount('update_stats', + 'suffix.syncs', node_count) + self.assertNotIn('error', self.logger.all_log_lines()) self.assertEqual( - dict(collections.Counter( - (job['device'], job['partition'], job['frag_index']) - for job in found_jobs)), - {('sda1', 0, 1): 1, - ('sda1', 0, 2): 1, - ('sda1', 1, 0): 1, - ('sda1', 1, 1): 1, - ('sda1', 2, 0): 1, - ('sda1', 2, 2): 1}) - self.assertEqual(self.reconstructor.suffix_sync, 8) - self.assertEqual(self.reconstructor.suffix_count, 8) - self.assertEqual(self.reconstructor.reconstruction_count, 6) + dict(collections.Counter((job['device'], job['partition'], + job['frag_index'], job['job_type']) + for job in found_jobs)), + {('sda1', 0, 1, SYNC): 1, + ('sda1', 0, 2, REVERT): 1, + ('sda1', 1, 0, REVERT): 1, + ('sda1', 1, 1, REVERT): 1, + ('sda1', 1, 4, SYNC): 1, + ('sda1', 2, 0, REVERT): 1, + ('sda1', 2, 2, REVERT): 1}) + self.assertEqual(self.reconstructor.suffix_sync, 12) + self.assertEqual(self.reconstructor.suffix_count, 12) + self.assertEqual(self.reconstructor.reconstruction_count, 7) def test_process_job_all_insufficient_storage(self): self.reconstructor._reset_stats() with mock_ssync_sender(): - with mocked_http_conn(*[507] * 8): + with mocked_http_conn(*[507] * 15): found_jobs = [] for part_info in self.reconstructor.collect_parts(): jobs = self.reconstructor.build_reconstruction_jobs( @@ -1236,23 +1395,24 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase): self.assertEqual(0, count_stats( self.logger, 'update_stats', 'suffix.syncs')) self.assertEqual( - dict(collections.Counter( - (job['device'], job['partition'], job['frag_index']) - for job in found_jobs)), - {('sda1', 0, 1): 1, - ('sda1', 0, 2): 1, - ('sda1', 1, 0): 1, - ('sda1', 1, 1): 1, - ('sda1', 2, 0): 1, - ('sda1', 2, 2): 1}) + dict(collections.Counter((job['device'], job['partition'], + job['frag_index'], job['job_type']) + for job in found_jobs)), + {('sda1', 0, 1, SYNC): 1, + ('sda1', 0, 2, REVERT): 1, + ('sda1', 1, 0, REVERT): 1, + ('sda1', 1, 1, REVERT): 1, + ('sda1', 1, 4, SYNC): 1, + ('sda1', 2, 0, REVERT): 1, + ('sda1', 2, 2, REVERT): 1}) self.assertEqual(self.reconstructor.suffix_sync, 0) self.assertEqual(self.reconstructor.suffix_count, 0) - self.assertEqual(self.reconstructor.reconstruction_count, 6) + self.assertEqual(self.reconstructor.reconstruction_count, 7) def test_process_job_all_client_error(self): self.reconstructor._reset_stats() with mock_ssync_sender(): - with mocked_http_conn(*[400] * 8): + with mocked_http_conn(*[400] * 11): found_jobs = [] for part_info in self.reconstructor.collect_parts(): jobs = self.reconstructor.build_reconstruction_jobs( @@ -1275,15 +1435,16 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase): ('sda1', 0, 2): 1, ('sda1', 1, 0): 1, ('sda1', 1, 1): 1, + ('sda1', 1, 4): 1, ('sda1', 2, 0): 1, ('sda1', 2, 2): 1}) self.assertEqual(self.reconstructor.suffix_sync, 0) self.assertEqual(self.reconstructor.suffix_count, 0) - 
self.assertEqual(self.reconstructor.reconstruction_count, 6) + self.assertEqual(self.reconstructor.reconstruction_count, 7) def test_process_job_all_timeout(self): self.reconstructor._reset_stats() - with mock_ssync_sender(), mocked_http_conn(*[Timeout()] * 8): + with mock_ssync_sender(), mocked_http_conn(*[Timeout()] * 11): found_jobs = [] for part_info in self.reconstructor.collect_parts(): jobs = self.reconstructor.build_reconstruction_jobs( @@ -1306,11 +1467,12 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase): ('sda1', 0, 2): 1, ('sda1', 1, 0): 1, ('sda1', 1, 1): 1, + ('sda1', 1, 4): 1, ('sda1', 2, 0): 1, ('sda1', 2, 2): 1}) self.assertEqual(self.reconstructor.suffix_sync, 0) self.assertEqual(self.reconstructor.suffix_count, 0) - self.assertEqual(self.reconstructor.reconstruction_count, 6) + self.assertEqual(self.reconstructor.reconstruction_count, 7) def test_reconstructor_skipped_partpower_increase(self): self.reconstructor._reset_stats() @@ -3133,7 +3295,7 @@ class TestObjectReconstructor(BaseTestObjectReconstructor): self.assertEqual(job['job_type'], object_reconstructor.SYNC) self.assertEqual(job['frag_index'], 0) self.assertEqual(job['suffixes'], []) - self.assertEqual(len(job['sync_to']), 2) + self.assertEqual(len(job['sync_to']), 3) self.assertEqual(job['partition'], 0) self.assertEqual(job['path'], part_path) self.assertEqual(job['hashes'], {}) @@ -3165,7 +3327,7 @@ class TestObjectReconstructor(BaseTestObjectReconstructor): self.assertEqual(job['job_type'], object_reconstructor.SYNC) self.assertEqual(job['frag_index'], 0) self.assertEqual(job['suffixes'], []) - self.assertEqual(len(job['sync_to']), 2) + self.assertEqual(len(job['sync_to']), 3) self.assertEqual(job['partition'], 0) self.assertEqual(job['path'], part_path) self.assertEqual(job['hashes'], {}) @@ -3210,7 +3372,9 @@ class TestObjectReconstructor(BaseTestObjectReconstructor): self.assertEqual(job['suffixes'], stub_hashes.keys()) self.assertEqual(set([n['index'] for n in job['sync_to']]), set([(frag_index + 1) % ring.replicas, - (frag_index - 1) % ring.replicas])) + (frag_index - 1) % ring.replicas, + (frag_index + int(0.5 * ring.replicas)), + ])) self.assertEqual(job['partition'], partition) self.assertEqual(job['path'], part_path) self.assertEqual(job['hashes'], stub_hashes) @@ -3320,10 +3484,12 @@ class TestObjectReconstructor(BaseTestObjectReconstructor): job = sync_jobs[0] self.assertEqual(job['frag_index'], frag_index) self.assertEqual(sorted(job['suffixes']), sorted(['123', 'abc'])) - self.assertEqual(len(job['sync_to']), 2) + self.assertEqual(len(job['sync_to']), 3) self.assertEqual(set([n['index'] for n in job['sync_to']]), set([(frag_index + 1) % ring.replicas, - (frag_index - 1) % ring.replicas])) + (frag_index - 1) % ring.replicas, + (frag_index + int(0.5 * ring.replicas)), + ])) self.assertEqual(1, len(revert_jobs)) job = revert_jobs[0] self.assertEqual(job['frag_index'], other_frag_index) @@ -3431,10 +3597,13 @@ class TestObjectReconstructor(BaseTestObjectReconstructor): self.reconstructor.headers['X-Backend-Storage-Policy-Index'], int(job['policy'])) with mocked_http_conn(200, body=pickle.dumps({})) as request_log: - self.reconstructor._get_suffixes_to_sync(job, node) + suffixes, new_node = self.reconstructor._get_suffixes_to_sync( + job, node) self.assertEqual([int(job['policy'])], [ r['headers']['X-Backend-Storage-Policy-Index'] for r in request_log.requests]) + self.assertEqual(suffixes, []) + self.assertEqual(new_node, node) def test_get_suffixes_in_sync(self): part_path = 
os.path.join(self.devices, self.local_dev['device'], @@ -3464,10 +3633,12 @@ class TestObjectReconstructor(BaseTestObjectReconstructor): with mock.patch('swift.obj.diskfile.ECDiskFileManager._get_hashes', return_value=(None, local_hashes)), \ mocked_http_conn(200, body=remote_response) as request_log: - suffixes = self.reconstructor._get_suffixes_to_sync(job, node) + suffixes, new_node = self.reconstructor._get_suffixes_to_sync( + job, node) self.assertEqual([node['replication_ip']], [r['ip'] for r in request_log.requests]) self.assertEqual(suffixes, []) + self.assertEqual(new_node, node) def test_get_suffix_delta(self): # different @@ -3513,7 +3684,7 @@ class TestObjectReconstructor(BaseTestObjectReconstructor): frag_index = self.policy.get_backend_index(local_dev['index']) sync_to = object_reconstructor._get_partners( local_dev['index'], part_nodes) - # setup left and right hashes + # setup left, right and far hashes stub_hashes = { '123': {frag_index: 'hash', None: 'hash'}, 'abc': {frag_index: 'hash', None: 'hash'}, @@ -3528,6 +3699,11 @@ class TestObjectReconstructor(BaseTestObjectReconstructor): '123': {right_frag_index: 'hash', None: 'hash'}, 'abc': {right_frag_index: 'hash', None: 'hash'}, } + far_index = self.policy.get_backend_index(sync_to[2]['index']) + far_hashes = { + '123': {far_index: 'hash', None: 'hash'}, + 'abc': {far_index: 'hash', None: 'hash'}, + } partition = 0 part_path = os.path.join(self.devices, self.local_dev['device'], diskfile.get_data_dir(self.policy), @@ -3545,7 +3721,7 @@ class TestObjectReconstructor(BaseTestObjectReconstructor): } responses = [(200, pickle.dumps(hashes)) for hashes in ( - left_hashes, right_hashes)] + left_hashes, right_hashes, far_hashes)] codes, body_iter = zip(*responses) ssync_calls = [] @@ -3556,13 +3732,13 @@ class TestObjectReconstructor(BaseTestObjectReconstructor): mocked_http_conn(*codes, body_iter=body_iter) as request_log: self.reconstructor.process_job(job) - expected_suffix_calls = set([ + expected_suffix_calls = [ (sync_to[0]['ip'], '/%s/0' % sync_to[0]['device']), (sync_to[1]['ip'], '/%s/0' % sync_to[1]['device']), - ]) + (sync_to[2]['ip'], '/%s/0' % sync_to[2]['device']), + ] self.assertEqual(expected_suffix_calls, - set((r['ip'], r['path']) - for r in request_log.requests)) + [(r['ip'], r['path']) for r in request_log.requests]) self.assertFalse(ssync_calls) @@ -3580,6 +3756,7 @@ class TestObjectReconstructor(BaseTestObjectReconstructor): } left_hashes = {} right_hashes = {} + far_hashes = {} partition = 0 part_path = os.path.join(self.devices, self.local_dev['device'], @@ -3597,8 +3774,9 @@ class TestObjectReconstructor(BaseTestObjectReconstructor): 'local_dev': self.local_dev, } - responses = [(200, pickle.dumps(hashes)) for hashes in ( - left_hashes, left_hashes, right_hashes, right_hashes)] + responses = [] + for hashes in (left_hashes, right_hashes, far_hashes): + responses.extend([(200, pickle.dumps(hashes))] * 2) codes, body_iter = zip(*responses) ssync_calls = [] @@ -3608,19 +3786,21 @@ class TestObjectReconstructor(BaseTestObjectReconstructor): mocked_http_conn(*codes, body_iter=body_iter) as request_log: self.reconstructor.process_job(job) - expected_suffix_calls = set([ + expected_suffix_calls = [ (sync_to[0]['ip'], '/%s/0' % sync_to[0]['device']), (sync_to[0]['ip'], '/%s/0/123-abc' % sync_to[0]['device']), (sync_to[1]['ip'], '/%s/0' % sync_to[1]['device']), (sync_to[1]['ip'], '/%s/0/123-abc' % sync_to[1]['device']), - ]) + (sync_to[2]['ip'], '/%s/0' % sync_to[2]['device']), + (sync_to[2]['ip'], '/%s/0/123-abc' 
% sync_to[2]['device']), + ] self.assertEqual(expected_suffix_calls, - set((r['ip'], r['path']) - for r in request_log.requests)) + [(r['ip'], r['path']) for r in request_log.requests]) expected_ssync_calls = sorted([ (sync_to[0]['ip'], 0, set(['123', 'abc'])), (sync_to[1]['ip'], 0, set(['123', 'abc'])), + (sync_to[2]['ip'], 0, set(['123', 'abc'])), ]) self.assertEqual(expected_ssync_calls, sorted(( c['node']['ip'], @@ -3656,6 +3836,11 @@ class TestObjectReconstructor(BaseTestObjectReconstructor): '123': {right_frag_index: 'hash', None: 'hash'}, 'abc': {right_frag_index: 'hashX', None: 'hash'}, } + far_index = self.policy.get_backend_index(sync_to[2]['index']) + far_hashes = { + '123': {far_index: 'hash', None: 'hash'}, + 'abc': {far_index: 'hash', None: 'hash'}, + } job = { 'job_type': object_reconstructor.SYNC, @@ -3674,6 +3859,7 @@ class TestObjectReconstructor(BaseTestObjectReconstructor): (200, pickle.dumps(left_hashes)), (200, pickle.dumps(right_hashes)), (200, pickle.dumps(right_hashes)), + (200, pickle.dumps(far_hashes)), ] codes, body_iter = zip(*responses) @@ -3739,6 +3925,12 @@ class TestObjectReconstructor(BaseTestObjectReconstructor): 'abc': {right_frag_index: 'hash', None: 'different-because-durable'}, } + # far side is in sync + far_index = self.policy.get_backend_index(sync_to[2]['index']) + far_hashes = { + '123': {far_index: 'hash', None: 'hash'}, + 'abc': {far_index: 'hash', None: 'hash'}, + } part_path = os.path.join(self.devices, self.local_dev['device'], diskfile.get_data_dir(self.policy), @@ -3756,7 +3948,7 @@ class TestObjectReconstructor(BaseTestObjectReconstructor): } responses = [(200, pickle.dumps(hashes)) for hashes in ( - left_hashes, right_hashes, right_hashes)] + left_hashes, right_hashes, right_hashes, far_hashes)] codes, body_iter = zip(*responses) ssync_calls = [] @@ -3770,6 +3962,7 @@ class TestObjectReconstructor(BaseTestObjectReconstructor): (sync_to[0]['ip'], '/%s/0' % sync_to[0]['device']), (sync_to[1]['ip'], '/%s/0' % sync_to[1]['device']), (sync_to[1]['ip'], '/%s/0/abc' % sync_to[1]['device']), + (sync_to[2]['ip'], '/%s/0' % sync_to[2]['device']), ]) self.assertEqual(expected_suffix_calls, set((r['ip'], r['path']) @@ -3805,6 +3998,10 @@ class TestObjectReconstructor(BaseTestObjectReconstructor): right_hashes = { '123': {right_frag_index: 'hash', None: 'hash'}, } + far_index = self.policy.get_backend_index(sync_to[2]['index']) + far_hashes = { + 'abc': {far_index: 'hashX', None: 'hash'}, + } part_path = os.path.join(self.devices, self.local_dev['device'], diskfile.get_data_dir(self.policy), str(partition)) @@ -3820,8 +4017,9 @@ class TestObjectReconstructor(BaseTestObjectReconstructor): 'local_dev': self.local_dev, } - responses = [(200, pickle.dumps(hashes)) for hashes in ( - left_hashes, left_hashes, right_hashes, right_hashes)] + responses = [] + for hashes in (left_hashes, right_hashes, far_hashes): + responses.extend([(200, pickle.dumps(hashes))] * 2) codes, body_iter = zip(*responses) ssync_calls = [] @@ -3837,6 +4035,8 @@ class TestObjectReconstructor(BaseTestObjectReconstructor): (sync_to[0]['ip'], '/%s/0/123' % sync_to[0]['device']), (sync_to[1]['ip'], '/%s/0' % sync_to[1]['device']), (sync_to[1]['ip'], '/%s/0/abc' % sync_to[1]['device']), + (sync_to[2]['ip'], '/%s/0' % sync_to[2]['device']), + (sync_to[2]['ip'], '/%s/0/123-abc' % sync_to[2]['device']), ]) self.assertEqual(expected_suffix_calls, set((r['ip'], r['path']) @@ -3844,10 +4044,12 @@ class TestObjectReconstructor(BaseTestObjectReconstructor): self.assertEqual( 
dict(collections.Counter( - (c['node']['index'], tuple(c['suffixes'])) + (c['node']['index'], tuple(sorted(c['suffixes']))) for c in ssync_calls)), - {(sync_to[0]['index'], ('123', )): 1, - (sync_to[1]['index'], ('abc', )): 1}) + {(sync_to[0]['index'], ('123',)): 1, + (sync_to[1]['index'], ('abc',)): 1, + (sync_to[2]['index'], ('123', 'abc')): 1, + }) def test_process_job_primary_down(self): partition = 0 @@ -3859,7 +4061,7 @@ class TestObjectReconstructor(BaseTestObjectReconstructor): } part_nodes = self.policy.object_ring.get_part_nodes(partition) - sync_to = part_nodes[:2] + sync_to = part_nodes[:3] part_path = os.path.join(self.devices, self.local_dev['device'], diskfile.get_data_dir(self.policy), @@ -3948,9 +4150,9 @@ class TestObjectReconstructor(BaseTestObjectReconstructor): expected_suffix_calls = set(( node['replication_ip'], '/%s/0' % node['device'] - ) for node in part_nodes) + ) for node in sync_to) - possible_errors = [404, 507, Timeout(), Exception('kaboom!')] + possible_errors = [404, Timeout(), Exception('kaboom!')] codes = [random.choice(possible_errors) for r in expected_suffix_calls] @@ -3967,6 +4169,86 @@ class TestObjectReconstructor(BaseTestObjectReconstructor): self.assertFalse(ssync_calls) + def test_process_job_sync_partner_unmounted(self): + partition = 0 + part_nodes = self.policy.object_ring.get_part_nodes(partition) + frag_index = [n['id'] for n in part_nodes].index(self.local_dev['id']) + sync_to = object_reconstructor._get_partners(frag_index, part_nodes) + self.assertEqual(3, len(sync_to)) + stub_hashes = { + '123': {frag_index: 'hash', None: 'hash'}, + 'abc': {frag_index: 'hash', None: 'hash'}, + } + # left partner out of sync + left_frag_index = self.policy.get_backend_index(sync_to[0]['index']) + left_hashes = { + '123': {left_frag_index: 'not-in-sync-hash', None: 'hash'}, + 'abc': {left_frag_index: 'hash', None: 'hash'}, + } + # we don't need right partner hashes + # far partner in sync + far_index = self.policy.get_backend_index(sync_to[2]['index']) + far_hashes = { + '123': {far_index: 'hash', None: 'hash'}, + 'abc': {far_index: 'hash', None: 'hash'}, + } + part_path = os.path.join(self.devices, self.local_dev['device'], + diskfile.get_data_dir(self.policy), + str(partition)) + job = { + 'job_type': object_reconstructor.SYNC, + 'frag_index': frag_index, + 'suffixes': stub_hashes.keys(), + 'sync_to': sync_to, + 'partition': partition, + 'path': part_path, + 'hashes': stub_hashes, + 'policy': self.policy, + 'device': self.local_dev['device'], + 'local_dev': self.local_dev, + } + + responses = [ + (200, pickle.dumps(left_hashes)), # hashes left partner + (200, pickle.dumps(left_hashes)), # hashes post-sync + (507, ''), # unmounted right partner + (200, pickle.dumps({})), # hashes handoff + (200, ''), # hashes post-sync + (200, pickle.dumps(far_hashes)), # hashes far partner + ] + codes, body_iter = zip(*responses) + + ssync_calls = [] + with mock_ssync_sender(ssync_calls), \ + mock.patch('swift.obj.diskfile.ECDiskFileManager._get_hashes', + return_value=(None, stub_hashes)), \ + mocked_http_conn(*codes, body_iter=body_iter) as request_log: + self.reconstructor.process_job(job) + # increment frag_index since we're rebuilding to our right + frag_index = (frag_index + 1) % self.policy.ec_n_unique_fragments + handoffs = self.policy.object_ring.get_more_nodes(partition) + for i, handoff in enumerate(handoffs): + if i == frag_index: + break + else: + self.fail('Unable to find handoff?!') + expected = collections.Counter([ + (200, sync_to[0]['ip']), + (200, 
sync_to[0]['ip']), + (507, sync_to[1]['ip']), + (200, handoff['ip']), + (200, handoff['ip']), + (200, sync_to[2]['ip']), + ]) + self.assertEqual(expected, collections.Counter( + [(c, r['ip']) for c, r in zip(codes, request_log.requests)])) + expected = collections.Counter([ + sync_to[0]['ip'], + handoff['ip'], + ]) + self.assertEqual(expected, collections.Counter( + [c['node']['ip'] for c in ssync_calls])) + def test_process_job_handoff(self): frag_index = random.randint( 0, self.policy.ec_n_unique_fragments - 1) @@ -5092,6 +5374,50 @@ class TestObjectReconstructorECDuplicationFactor(TestObjectReconstructor): for index in range(28): self._test_reconstruct_with_duplicate_frags_no_errors(index) + def test_iter_nodes_for_frag(self): + self.reconstructor.rebuild_handoff_node_count = -1 + policy = ECStoragePolicy(1, name='test', ec_type=DEFAULT_TEST_EC_TYPE, + ec_ndata=4, ec_nparity=3, + ec_duplication_factor=2) + policy.object_ring = FabricatedRing(replicas=14, devices=42) + primaries = policy.object_ring.get_part_nodes(0) + + node = primaries[0] + nodes_for_frag = list(self.reconstructor._iter_nodes_for_frag( + policy, 0, node)) + expected = [0, 0, 7, 14, 21] + self.assertEqual(expected, [n.get('index', n.get('handoff_index')) + for n in nodes_for_frag]) + for node in nodes_for_frag: + self.assertEqual(0, node['backend_index']) + + node = primaries[3] + nodes_for_frag = list(self.reconstructor._iter_nodes_for_frag( + policy, 0, node)) + expected = [3, 3, 10, 17, 24] + self.assertEqual(expected, [n.get('index', n.get('handoff_index')) + for n in nodes_for_frag]) + for node in nodes_for_frag: + self.assertEqual(3, node['backend_index']) + + node = primaries[7] + nodes_for_frag = list(self.reconstructor._iter_nodes_for_frag( + policy, 0, node)) + expected = [7, 0, 7, 14, 21] + self.assertEqual(expected, [n.get('index', n.get('handoff_index')) + for n in nodes_for_frag]) + for node in nodes_for_frag: + self.assertEqual(0, node['backend_index']) + + node = primaries[-1] + nodes_for_frag = list(self.reconstructor._iter_nodes_for_frag( + policy, 0, node)) + expected = [13, 6, 13, 20, 27] + self.assertEqual(expected, [n.get('index', n.get('handoff_index')) + for n in nodes_for_frag]) + for node in nodes_for_frag: + self.assertEqual(6, node['backend_index']) + if __name__ == '__main__': unittest.main()
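
For reference, the selection rule exercised by test_iter_nodes_for_frag can be sketched outside of Swift: Ring.get_more_nodes() now tags each handoff with a monotonically increasing handoff_index, a node's backend index is its (primary or handoff) index modulo the number of unique fragments, and _iter_nodes_for_frag only offers handoffs whose backend index matches the primary being rebuilt, up to rebuild_handoff_node_count of them (a negative count means no limit). The snippet below is a standalone illustration of that arithmetic, not Swift code; the helper name and the plain-integer node model are invented here, and the 7-fragment / 21-handoff numbers simply mirror the FabricatedRing(replicas=7, devices=28) expectations in the test.

# Standalone sketch of the handoff selection rule (illustration only; the
# function name and integer "nodes" are made up for this example).
def iter_nodes_for_frag(primary_index, num_unique_frags, num_handoffs, limit):
    """Yield ('primary'|'handoff', index) in the order they would be tried."""
    backend_index = primary_index % num_unique_frags  # like get_backend_index()
    yield 'primary', primary_index
    offered = 0
    for handoff_index in range(num_handoffs):
        # only handoffs that map to the same backend index may take this frag
        if handoff_index % num_unique_frags != backend_index:
            continue
        if limit >= 0 and offered >= limit:
            break
        yield 'handoff', handoff_index
        offered += 1

# ec_ndata=4, ec_nparity=3 => 7 unique fragments; 28 devices - 7 primaries
print(list(iter_nodes_for_frag(0, 7, 21, limit=-1)))  # primary 0, handoffs 0, 7, 14
print(list(iter_nodes_for_frag(0, 7, 21, limit=2)))   # primary 0, handoffs 0, 7
print(list(iter_nodes_for_frag(0, 7, 21, limit=0)))   # primary only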