From b09360d447447845898c9517742613ffecbe01c4 Mon Sep 17 00:00:00 2001 From: Kota Tsuyuzaki Date: Sun, 30 Oct 2016 22:24:18 -0700 Subject: [PATCH] Fix stats calculation in object-reconstructor This patch fixes the object-reconstructor to calculate device_count as the total number of local devices in all policies. Previously Swift counts it for each policy but reconstruction_device_count which means the number of devices actually swift needs to reconstruct is counted as sum of ones for all polices. With this patch, Swift will gather all local devices for all policies at first, and then, collect parts for each devices as well as current. To do so, we can see the statuses for remaining job/disks percentage via stats_line output. To enable this change, this patch also touchs the object replicator to get a DiskFileManager via the DiskFileRouter class so that DiskFileManager instances are policy specific. Currently the same replication policy DiskFileManager class is always used, but this change future proofs the replicator for possible other DiskFileManager implementations. The change also gives the ObjectReplicator a _df_router variable, making it consistent with the ObjectReconstructor, and allowing a common way for ssync.Sender to access DiskFileManager instances via it's daemon's _df_router instance. Also, remove the use of FakeReplicator from the ssync test suite. It was not necessary and risked masking divergence between ssync and the replicator and reconstructor daemon implementations. Co-Author: Alistair Coles Closes-Bug: #1488608 Change-Id: Ic7a4c932b59158d21a5fb4de9ed3ed57f249d068 --- swift/obj/reconstructor.py | 29 +++--- swift/obj/replicator.py | 9 +- swift/obj/ssync_sender.py | 2 +- test/unit/obj/common.py | 34 +++---- test/unit/obj/test_reconstructor.py | 2 +- test/unit/obj/test_replicator.py | 135 +++++++++++++++------------- test/unit/obj/test_ssync.py | 39 ++++---- test/unit/obj/test_ssync_sender.py | 40 ++++----- 8 files changed, 149 insertions(+), 141 deletions(-) diff --git a/swift/obj/reconstructor.py b/swift/obj/reconstructor.py index 3758906ef4..007652a7ab 100644 --- a/swift/obj/reconstructor.py +++ b/swift/obj/reconstructor.py @@ -800,12 +800,13 @@ class ObjectReconstructor(Daemon): override_devices = override_devices or [] override_partitions = override_partitions or [] ips = whataremyips(self.bind_ip) - for policy in POLICIES: - if policy.policy_type != EC_POLICY: - continue - self._diskfile_mgr = self._df_router[policy] + ec_policies = (policy for policy in POLICIES + if policy.policy_type == EC_POLICY) + + policy2devices = {} + + for policy in ec_policies: self.load_object_ring(policy) - data_dir = get_data_dir(policy) local_devices = list(six.moves.filter( lambda dev: dev and is_local_device( ips, self.port, @@ -813,21 +814,23 @@ class ObjectReconstructor(Daemon): policy.object_ring.devs)) if override_devices: - self.device_count = len(override_devices) - else: - self.device_count = len(local_devices) + local_devices = list(six.moves.filter( + lambda dev_info: dev_info['device'] in override_devices, + local_devices)) + policy2devices[policy] = local_devices + self.device_count += len(local_devices) + + for policy, local_devices in policy2devices.items(): + df_mgr = self._df_router[policy] for local_dev in local_devices: - if override_devices and (local_dev['device'] not in - override_devices): - continue self.reconstruction_device_count += 1 - dev_path = self._df_router[policy].get_dev_path( - local_dev['device']) + dev_path = df_mgr.get_dev_path(local_dev['device']) if not dev_path: self.logger.warning(_('%s is not mounted'), local_dev['device']) continue + data_dir = get_data_dir(policy) obj_path = join(dev_path, data_dir) tmp_path = join(dev_path, get_tmp_dir(int(policy))) unlink_older_than(tmp_path, time.time() - diff --git a/swift/obj/replicator.py b/swift/obj/replicator.py index 0aee76bfeb..8c72c3359c 100644 --- a/swift/obj/replicator.py +++ b/swift/obj/replicator.py @@ -38,7 +38,7 @@ from swift.common.bufferedhttp import http_connect from swift.common.daemon import Daemon from swift.common.http import HTTP_OK, HTTP_INSUFFICIENT_STORAGE from swift.obj import ssync_sender -from swift.obj.diskfile import DiskFileManager, get_data_dir, get_tmp_dir +from swift.obj.diskfile import get_data_dir, get_tmp_dir, DiskFileRouter from swift.common.storage_policy import POLICIES, REPL_POLICY DEFAULT_RSYNC_TIMEOUT = 900 @@ -121,7 +121,7 @@ class ObjectReplicator(Daemon): 'operation, please disable handoffs_first and ' 'handoff_delete before the next ' 'normal rebalance') - self._diskfile_mgr = DiskFileManager(conf, self.logger) + self._df_router = DiskFileRouter(conf, self.logger) def _zero_stats(self): """Zero out the stats.""" @@ -406,9 +406,10 @@ class ObjectReplicator(Daemon): target_devs_info = set() failure_devs_info = set() begin = time.time() + df_mgr = self._df_router[job['policy']] try: hashed, local_hash = tpool_reraise( - self._diskfile_mgr._get_hashes, job['path'], + df_mgr._get_hashes, job['path'], do_listdir=_do_listdir( int(job['partition']), self.replication_cycle), @@ -462,7 +463,7 @@ class ObjectReplicator(Daemon): self.stats['hashmatch'] += 1 continue hashed, recalc_hash = tpool_reraise( - self._diskfile_mgr._get_hashes, + df_mgr._get_hashes, job['path'], recalculate=suffixes, reclaim_age=self.reclaim_age) self.logger.update_stats('suffix.hashes', hashed) diff --git a/swift/obj/ssync_sender.py b/swift/obj/ssync_sender.py index 692dd45c5b..5bf7f153d9 100644 --- a/swift/obj/ssync_sender.py +++ b/swift/obj/ssync_sender.py @@ -80,7 +80,7 @@ class Sender(object): def __init__(self, daemon, node, job, suffixes, remote_check_objs=None): self.daemon = daemon - self.df_mgr = self.daemon._diskfile_mgr + self.df_mgr = self.daemon._df_router[job['policy']] self.node = node self.job = job self.suffixes = suffixes diff --git a/test/unit/obj/common.py b/test/unit/obj/common.py index 48d91f1003..3fc3dc2be7 100644 --- a/test/unit/obj/common.py +++ b/test/unit/obj/common.py @@ -19,28 +19,9 @@ import tempfile import unittest import time +from swift.common import utils from swift.common.storage_policy import POLICIES from swift.common.utils import Timestamp -from swift.obj import diskfile - -from test.unit import debug_logger - - -class FakeReplicator(object): - def __init__(self, testdir, policy=None): - self.logger = debug_logger('test-ssync-sender') - self.conn_timeout = 1 - self.node_timeout = 2 - self.http_timeout = 3 - self.network_chunk_size = 65536 - self.disk_chunk_size = 4096 - conf = { - 'devices': testdir, - 'mount_check': 'false', - } - policy = POLICIES.default if policy is None else policy - self._diskfile_router = diskfile.DiskFileRouter(conf, self.logger) - self._diskfile_mgr = self._diskfile_router[policy] def write_diskfile(df, timestamp, data='test data', frag_index=None, @@ -74,9 +55,18 @@ def write_diskfile(df, timestamp, data='test data', frag_index=None, class BaseTest(unittest.TestCase): def setUp(self): + self.device = 'dev' + self.partition = '9' + self.tmpdir = tempfile.mkdtemp() + # sender side setup + self.tx_testdir = os.path.join(self.tmpdir, 'tmp_test_ssync_sender') + utils.mkdirs(os.path.join(self.tx_testdir, self.device)) + self.daemon_conf = { + 'devices': self.tx_testdir, + 'mount_check': 'false', + } # daemon will be set in subclass setUp self.daemon = None - self.tmpdir = tempfile.mkdtemp() def tearDown(self): shutil.rmtree(self.tmpdir, ignore_errors=True) @@ -90,7 +80,7 @@ class BaseTest(unittest.TestCase): object_parts = account, container, obj timestamp = Timestamp(time.time()) if timestamp is None else timestamp if df_mgr is None: - df_mgr = self.daemon._diskfile_router[policy] + df_mgr = self.daemon._df_router[policy] df = df_mgr.get_diskfile( device, partition, *object_parts, policy=policy, frag_index=frag_index) diff --git a/test/unit/obj/test_reconstructor.py b/test/unit/obj/test_reconstructor.py index cec08ce506..353e7c1c7a 100644 --- a/test/unit/obj/test_reconstructor.py +++ b/test/unit/obj/test_reconstructor.py @@ -862,7 +862,7 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase): self.suffixes = suffixes self.daemon = daemon self.job = job - hash_gen = self.daemon._diskfile_mgr.yield_hashes( + hash_gen = self.daemon._df_router[job['policy']].yield_hashes( self.job['device'], self.job['partition'], self.job['policy'], self.suffixes, frag_index=self.job.get('frag_index')) diff --git a/test/unit/obj/test_replicator.py b/test/unit/obj/test_replicator.py index c863e45a93..2f6cef18c2 100644 --- a/test/unit/obj/test_replicator.py +++ b/test/unit/obj/test_replicator.py @@ -12,7 +12,7 @@ # implied. # See the License for the specific language governing permissions and # limitations under the License. - +import collections import unittest import os import mock @@ -26,10 +26,10 @@ from collections import defaultdict from errno import ENOENT, ENOTEMPTY, ENOTDIR from eventlet.green import subprocess -from eventlet import Timeout, tpool +from eventlet import Timeout from test.unit import (debug_logger, patch_policies, make_timestamp_iter, - mocked_http_conn) + mocked_http_conn, FakeLogger) from swift.common import utils from swift.common.utils import (hash_path, mkdirs, normalize_timestamp, storage_directory) @@ -1623,68 +1623,80 @@ class TestObjectReplicator(unittest.TestCase): object_replicator.http_connect = was_connector def test_run_once_recover_from_timeout(self): + # verify that replicator will pass over all policies' partitions even + # if a timeout occurs while replicating one partition to one node. + timeouts = [Timeout()] + + def fake_get_hashes(df_mgr, part_path, **kwargs): + self.get_hash_count += 1 + # Simulate a REPLICATE timeout by raising Timeout for second call + # to get_hashes (with recalculate suffixes) for a specific + # partition + if (timeouts and '/objects/' in part_path and + part_path.endswith('0') and 'recalculate' in kwargs): + raise timeouts.pop(0) + return 1, {'abc': 'def'} + + # map partition_path -> [nodes] + sync_paths = collections.defaultdict(list) + + def fake_sync(node, job, suffixes, *args, **kwargs): + sync_paths[job['path']].append(node) + return True, {} + conf = dict(swift_dir=self.testdir, devices=self.devices, - bind_ips=_ips()[0], + bind_ip=_ips()[0], # local dev has id=0 mount_check='false', timeout='300', stats_interval='1') - replicator = object_replicator.ObjectReplicator(conf) - was_connector = object_replicator.http_connect - was_get_hashes = object_replicator.DiskFileManager._get_hashes - was_execute = tpool.execute - self.get_hash_count = 0 - try: + with mock.patch('swift.obj.diskfile.DiskFileManager._get_hashes', + fake_get_hashes): + with mock.patch('swift.obj.replicator.http_connect', + mock_http_connect(200)): + with mock.patch('swift.obj.replicator.dump_recon_cache'): + replicator = object_replicator.ObjectReplicator( + conf, logger=FakeLogger()) - def fake_get_hashes(*args, **kwargs): - self.get_hash_count += 1 - if self.get_hash_count == 3: - # raise timeout on last call to get hashes - raise Timeout() - return 2, {'abc': 'def'} + self.get_hash_count = 0 + with mock.patch.object(replicator, 'sync', fake_sync): + replicator.run_once() - def fake_exc(tester, *args, **kwargs): - if 'Error syncing partition timeout' in args[0]: - tester.i_failed = True + log_lines = replicator.logger.get_lines_for_level('error') + self.assertIn("Error syncing with node:", log_lines[0]) + self.assertFalse(log_lines[1:]) + # setup creates 4 partitions; partition 1 does not map to local dev id + # 0 so will be handled by update_delete(); partitions 0, 2, 3 are + # handled by update() for each of two policies, so expect 6 paths to be + # sync'd + self.assertEqual(6, len(sync_paths)) + # partition 3 has 2 nodes in remote region, only first node is sync'd. + # partition 0 in policy 0 has fake_get_hashes timeout before first + # sync, so only second node is sync'd. + # other partitions are sync'd to 2 nodes in same region. + expected_node_count = { # map path_end -> expected sync node count + '/objects/0': 1, + '/objects/1': 2, + '/objects/2': 2, + '/objects/3': 1, + '/objects-1/0': 2, + '/objects-1/1': 2, + '/objects-1/2': 2, + '/objects-1/3': 1 + } + for path, nodes in sync_paths.items(): + path_end = path[path.index('/objects'):] + self.assertEqual(expected_node_count[path_end], len(nodes), + 'Expected %s but got %s for path %s' % + (expected_node_count[path_end], len(nodes), path)) + # partitions 0 and 2 attempt 3 calls each per policy to get_hashes = 12 + # partitions 3 attempts 2 calls per policy to get_hashes = 4 + # partitions 1 dosn't get_hashes because of update_deleted + self.assertEqual(16, self.get_hash_count) - self.i_failed = False - object_replicator.http_connect = mock_http_connect(200) - object_replicator.DiskFileManager._get_hashes = fake_get_hashes - replicator.logger.exception = \ - lambda *args, **kwargs: fake_exc(self, *args, **kwargs) - # Write some files into '1' and run replicate- they should be moved - # to the other partitions and then node should get deleted. - cur_part = '1' - df = self.df_mgr.get_diskfile('sda', cur_part, 'a', 'c', 'o', - policy=POLICIES.legacy) - mkdirs(df._datadir) - f = open(os.path.join(df._datadir, - normalize_timestamp(time.time()) + '.data'), - 'wb') - f.write('1234567890') - f.close() - ohash = hash_path('a', 'c', 'o') - data_dir = ohash[-3:] - whole_path_from = os.path.join(self.objects, cur_part, data_dir) - process_arg_checker = [] - ring = replicator.load_object_ring(POLICIES[0]) - nodes = [node for node in - ring.get_part_nodes(int(cur_part)) - if node['ip'] not in _ips()] - - for node in nodes: - rsync_mod = '%s::object/sda/objects/%s' % (node['ip'], - cur_part) - process_arg_checker.append( - (0, '', ['rsync', whole_path_from, rsync_mod])) - self.assertTrue(os.access(os.path.join(self.objects, - '1', data_dir, ohash), - os.F_OK)) - with _mock_process(process_arg_checker): - replicator.run_once() - self.assertFalse(process_errors) - self.assertFalse(self.i_failed) - finally: - object_replicator.http_connect = was_connector - object_replicator.DiskFileManager._get_hashes = was_get_hashes - tpool.execute = was_execute + # attempt to 16 times but succeeded only 15 times due to Timeout + suffix_hashes = sum( + count for (metric, count), _junk in + replicator.logger.log_dict['update_stats'] + if metric == 'suffix.hashes') + self.assertEqual(15, suffix_hashes) def test_run(self): with _mock_process([(0, '')] * 100): @@ -1737,7 +1749,8 @@ class TestObjectReplicator(unittest.TestCase): do_listdir_results = [False, False, True, False, True, False] mock_do_listdir.side_effect = do_listdir_results expected_tpool_calls = [ - mock.call(self.replicator._diskfile_mgr._get_hashes, job['path'], + mock.call(self.replicator._df_router[job['policy']]._get_hashes, + job['path'], do_listdir=do_listdir, reclaim_age=self.replicator.reclaim_age) for job, do_listdir in zip(jobs, do_listdir_results) diff --git a/test/unit/obj/test_ssync.py b/test/unit/obj/test_ssync.py index d933bc2f04..41a2a8872d 100644 --- a/test/unit/obj/test_ssync.py +++ b/test/unit/obj/test_ssync.py @@ -31,9 +31,10 @@ from swift.common.utils import Timestamp from swift.obj import ssync_sender, server from swift.obj.reconstructor import RebuildingECDiskFileStream, \ ObjectReconstructor +from swift.obj.replicator import ObjectReplicator from test.unit import patch_policies, debug_logger, encode_frag_archive_bodies -from test.unit.obj.common import BaseTest, FakeReplicator +from test.unit.obj.common import BaseTest class TestBaseSsync(BaseTest): @@ -46,13 +47,6 @@ class TestBaseSsync(BaseTest): """ def setUp(self): super(TestBaseSsync, self).setUp() - self.device = 'dev' - self.partition = '9' - # sender side setup - self.tx_testdir = os.path.join(self.tmpdir, 'tmp_test_ssync_sender') - utils.mkdirs(os.path.join(self.tx_testdir, self.device)) - self.daemon = FakeReplicator(self.tx_testdir) - # rx side setup self.rx_testdir = os.path.join(self.tmpdir, 'tmp_test_ssync_receiver') utils.mkdirs(os.path.join(self.rx_testdir, self.device)) @@ -142,7 +136,7 @@ class TestBaseSsync(BaseTest): return diskfiles def _open_tx_diskfile(self, obj_name, policy, frag_index=None): - df_mgr = self.daemon._diskfile_router[policy] + df_mgr = self.daemon._df_router[policy] df = df_mgr.get_diskfile( self.device, self.partition, account='a', container='c', obj=obj_name, policy=policy, frag_index=frag_index) @@ -310,6 +304,8 @@ class TestBaseSsyncEC(TestBaseSsync): def setUp(self): super(TestBaseSsyncEC, self).setUp() self.policy = POLICIES.default + self.daemon = ObjectReconstructor(self.daemon_conf, + debug_logger('test-ssync-sender')) def _get_object_data(self, path, frag_index=None, **kwargs): # return a frag archive for given object name and frag index. @@ -337,7 +333,7 @@ class TestSsyncEC(TestBaseSsyncEC): tx_objs = {} rx_objs = {} tx_tombstones = {} - tx_df_mgr = self.daemon._diskfile_router[policy] + tx_df_mgr = self.daemon._df_router[policy] rx_df_mgr = self.rx_controller._diskfile_router[policy] # o1 has primary and handoff fragment archives t1 = next(self.ts_iter) @@ -421,7 +417,7 @@ class TestSsyncEC(TestBaseSsyncEC): # create sender side diskfiles... tx_objs = {} rx_objs = {} - tx_df_mgr = self.daemon._diskfile_router[policy] + tx_df_mgr = self.daemon._df_router[policy] rx_df_mgr = self.rx_controller._diskfile_router[policy] expected_subreqs = defaultdict(list) @@ -531,7 +527,7 @@ class TestSsyncEC(TestBaseSsyncEC): tx_objs = {} tx_tombstones = {} rx_objs = {} - tx_df_mgr = self.daemon._diskfile_router[policy] + tx_df_mgr = self.daemon._df_router[policy] rx_df_mgr = self.rx_controller._diskfile_router[policy] # o1 only has primary t1 = next(self.ts_iter) @@ -631,7 +627,7 @@ class TestSsyncEC(TestBaseSsyncEC): def test_send_with_frag_index_none(self): policy = POLICIES.default - tx_df_mgr = self.daemon._diskfile_router[policy] + tx_df_mgr = self.daemon._df_router[policy] rx_df_mgr = self.rx_controller._diskfile_router[policy] # create an ec fragment on the remote node ts1 = next(self.ts_iter) @@ -692,7 +688,7 @@ class TestSsyncEC(TestBaseSsyncEC): # create non durable tx obj by not committing, then create a legacy # .durable file tx_objs = {} - tx_df_mgr = self.daemon._diskfile_router[policy] + tx_df_mgr = self.daemon._df_router[policy] rx_df_mgr = self.rx_controller._diskfile_router[policy] t1 = next(self.ts_iter) tx_objs['o1'] = self._create_ondisk_files( @@ -791,7 +787,7 @@ class TestSsyncECReconstructorSyncJob(TestBaseSsyncEC): # create sender side diskfiles... self.tx_objs = {} - tx_df_mgr = self.daemon._diskfile_router[self.policy] + tx_df_mgr = self.daemon._df_router[self.policy] t1 = next(self.ts_iter) self.tx_objs['o1'] = self._create_ondisk_files( tx_df_mgr, 'o1', self.policy, t1, (self.tx_node_index,)) @@ -1073,6 +1069,11 @@ class TestSsyncECReconstructorSyncJob(TestBaseSsyncEC): @patch_policies class TestSsyncReplication(TestBaseSsync): + def setUp(self): + super(TestSsyncReplication, self).setUp() + self.daemon = ObjectReplicator(self.daemon_conf, + debug_logger('test-ssync-sender')) + def test_sync(self): policy = POLICIES.default rx_node_index = 0 @@ -1082,7 +1083,7 @@ class TestSsyncReplication(TestBaseSsync): rx_objs = {} tx_tombstones = {} rx_tombstones = {} - tx_df_mgr = self.daemon._diskfile_router[policy] + tx_df_mgr = self.daemon._df_router[policy] rx_df_mgr = self.rx_controller._diskfile_router[policy] # o1 and o2 are on tx only t1 = next(self.ts_iter) @@ -1204,7 +1205,7 @@ class TestSsyncReplication(TestBaseSsync): rx_objs = {} tx_tombstones = {} rx_tombstones = {} - tx_df_mgr = self.daemon._diskfile_router[policy] + tx_df_mgr = self.daemon._df_router[policy] rx_df_mgr = self.rx_controller._diskfile_router[policy] expected_subreqs = defaultdict(list) @@ -1349,7 +1350,7 @@ class TestSsyncReplication(TestBaseSsync): rx_node_index = 0 # create diskfiles... - tx_df_mgr = self.daemon._diskfile_router[policy] + tx_df_mgr = self.daemon._df_router[policy] rx_df_mgr = self.rx_controller._diskfile_router[policy] # rx has data at t1 but no meta @@ -1434,7 +1435,7 @@ class TestSsyncReplication(TestBaseSsync): # create diskfiles... tx_objs = {} rx_objs = {} - tx_df_mgr = self.daemon._diskfile_router[policy] + tx_df_mgr = self.daemon._df_router[policy] rx_df_mgr = self.rx_controller._diskfile_router[policy] expected_subreqs = defaultdict(list) diff --git a/test/unit/obj/test_ssync_sender.py b/test/unit/obj/test_ssync_sender.py index c7968c68f0..ab9053bd13 100644 --- a/test/unit/obj/test_ssync_sender.py +++ b/test/unit/obj/test_ssync_sender.py @@ -24,9 +24,10 @@ from swift.common import exceptions, utils from swift.common.storage_policy import POLICIES from swift.common.utils import Timestamp from swift.obj import ssync_sender, diskfile, ssync_receiver +from swift.obj.replicator import ObjectReplicator -from test.unit import patch_policies, make_timestamp_iter -from test.unit.obj.common import FakeReplicator, BaseTest +from test.unit import patch_policies, make_timestamp_iter, debug_logger +from test.unit.obj.common import BaseTest class NullBufferedHTTPConnection(object): @@ -84,10 +85,10 @@ class TestSender(BaseTest): def setUp(self): super(TestSender, self).setUp() - self.testdir = os.path.join(self.tmpdir, 'tmp_test_ssync_sender') - utils.mkdirs(os.path.join(self.testdir, 'dev')) - self.daemon = FakeReplicator(self.testdir) - self.sender = ssync_sender.Sender(self.daemon, None, None, None) + self.daemon = ObjectReplicator(self.daemon_conf, + debug_logger('test-ssync-sender')) + job = {'policy': POLICIES.legacy} # sufficient for Sender.__init__ + self.sender = ssync_sender.Sender(self.daemon, None, job, None) def test_call_catches_MessageTimeout(self): @@ -146,8 +147,7 @@ class TestSender(BaseTest): '1.2.3.4:5678/sda1/9 EXCEPTION in ssync.Sender:')) def test_call_catches_exception_handling_exception(self): - job = node = None # Will cause inside exception handler to fail - self.sender = ssync_sender.Sender(self.daemon, node, job, None) + self.sender.node = None # Will cause inside exception handler to fail self.sender.suffixes = ['abc'] self.sender.connect = 'cause exception' success, candidates = self.sender() @@ -459,7 +459,7 @@ class TestSender(BaseTest): ':UPDATES: START\r\n' ':UPDATES: END\r\n' )) - self.sender.daemon._diskfile_mgr.yield_hashes = yield_hashes + self.sender.df_mgr.yield_hashes = yield_hashes self.sender.connect = mock.MagicMock() df = mock.MagicMock() df.content_length = 0 @@ -505,7 +505,7 @@ class TestSender(BaseTest): ':MISSING_CHECK: START\r\n' '9d41d8cd98f00b204e9800998ecf0abc d\r\n' ':MISSING_CHECK: END\r\n')) - self.sender.daemon._diskfile_mgr.yield_hashes = yield_hashes + self.sender.df_mgr.yield_hashes = yield_hashes self.sender.connect = mock.MagicMock() self.sender.updates = mock.MagicMock() self.sender.disconnect = mock.MagicMock() @@ -541,7 +541,7 @@ class TestSender(BaseTest): chunk_body=( ':MISSING_CHECK: START\r\n' ':MISSING_CHECK: END\r\n')) - self.sender.daemon._diskfile_mgr.yield_hashes = yield_hashes + self.sender.df_mgr.yield_hashes = yield_hashes self.sender.connect = mock.MagicMock() self.sender.updates = mock.MagicMock() self.sender.disconnect = mock.MagicMock() @@ -578,7 +578,7 @@ class TestSender(BaseTest): ':MISSING_CHECK: START\r\n' '9d41d8cd98f00b204e9800998ecf0abc d\r\n' ':MISSING_CHECK: END\r\n')) - self.sender.daemon._diskfile_mgr.yield_hashes = yield_hashes + self.sender.df_mgr.yield_hashes = yield_hashes self.sender.connect = mock.MagicMock() self.sender.updates = mock.MagicMock() self.sender.disconnect = mock.MagicMock() @@ -743,7 +743,7 @@ class TestSender(BaseTest): chunk_body=( ':MISSING_CHECK: START\r\n' ':MISSING_CHECK: END\r\n')) - self.sender.daemon._diskfile_mgr.yield_hashes = yield_hashes + self.sender.df_mgr.yield_hashes = yield_hashes self.sender.missing_check() self.assertEqual( ''.join(self.sender.connection.sent), @@ -791,7 +791,7 @@ class TestSender(BaseTest): chunk_body=( ':MISSING_CHECK: START\r\n' ':MISSING_CHECK: END\r\n')) - self.sender.daemon._diskfile_mgr.yield_hashes = yield_hashes + self.sender.df_mgr.yield_hashes = yield_hashes self.sender.missing_check() self.assertEqual( ''.join(self.sender.connection.sent), @@ -836,7 +836,7 @@ class TestSender(BaseTest): 'policy': POLICIES.legacy, } self.sender.suffixes = ['abc'] - self.sender.daemon._diskfile_mgr.yield_hashes = yield_hashes + self.sender.df_mgr.yield_hashes = yield_hashes self.sender.response = FakeResponse(chunk_body='\r\n') exc = None try: @@ -875,7 +875,7 @@ class TestSender(BaseTest): 'policy': POLICIES.legacy, } self.sender.suffixes = ['abc'] - self.sender.daemon._diskfile_mgr.yield_hashes = yield_hashes + self.sender.df_mgr.yield_hashes = yield_hashes self.sender.response = FakeResponse( chunk_body=':MISSING_CHECK: START\r\n') exc = None @@ -915,7 +915,7 @@ class TestSender(BaseTest): 'policy': POLICIES.legacy, } self.sender.suffixes = ['abc'] - self.sender.daemon._diskfile_mgr.yield_hashes = yield_hashes + self.sender.df_mgr.yield_hashes = yield_hashes self.sender.response = FakeResponse(chunk_body='OH HAI\r\n') exc = None try: @@ -959,7 +959,7 @@ class TestSender(BaseTest): ':MISSING_CHECK: START\r\n' '0123abc dm\r\n' ':MISSING_CHECK: END\r\n')) - self.sender.daemon._diskfile_mgr.yield_hashes = yield_hashes + self.sender.df_mgr.yield_hashes = yield_hashes self.sender.missing_check() self.assertEqual( ''.join(self.sender.connection.sent), @@ -1001,7 +1001,7 @@ class TestSender(BaseTest): ':MISSING_CHECK: START\r\n' '0123abc d extra response parts\r\n' ':MISSING_CHECK: END\r\n')) - self.sender.daemon._diskfile_mgr.yield_hashes = yield_hashes + self.sender.df_mgr.yield_hashes = yield_hashes self.sender.missing_check() self.assertEqual(self.sender.send_map, {'0123abc': {'data': True}}) @@ -1307,7 +1307,7 @@ class TestSender(BaseTest): self.assertEqual(path, '/a/c/o') self.assertTrue(isinstance(df, diskfile.DiskFile)) self.assertEqual(expected, df.get_metadata()) - self.assertEqual(os.path.join(self.testdir, 'dev/objects/9/', + self.assertEqual(os.path.join(self.tx_testdir, 'dev/objects/9/', object_hash[-3:], object_hash), df._datadir)