Fix stats calculation in object-reconstructor

This patch fixes the object-reconstructor to calculate device_count
as the total number of local devices across all policies. Previously,
device_count was recomputed for each policy in turn, while
reconstruction_device_count (the number of devices Swift actually
needs to reconstruct) accumulated across all policies, so the two
stats did not agree.

With this patch, Swift first gathers the local devices for all
policies, and then collects parts for each device as before. As a
result, the remaining jobs/disks percentages reported in the
stats_line output are now meaningful.
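
As a rough illustration (a minimal sketch with fabricated policy and
device names, not the daemon's actual code), the corrected accounting
totals device_count across every policy up front and then advances
reconstruction_device_count one device at a time:

    # Sketch only: policy and device names below are made up.
    policy2devices = {
        'ec-policy-0': ['sda', 'sdb'],  # local devices for this policy
        'ec-policy-1': ['sda', 'sdb'],
    }
    # device_count is now the total across ALL policies (here: 4)
    device_count = sum(len(devs) for devs in policy2devices.values())

    reconstruction_device_count = 0
    for policy, local_devices in policy2devices.items():
        for dev in local_devices:
            reconstruction_device_count += 1
            # ... reconstruct partitions on this device ...
            print('%d/%d (%.2f%%) devices processed'
                  % (reconstruction_device_count, device_count,
                     reconstruction_device_count * 100.0 / device_count))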

To enable this change, this patch also touches the object-replicator
so that it obtains DiskFileManager instances via the DiskFileRouter
class, making them policy specific. Currently the replication-policy
DiskFileManager class is always used, but this change future-proofs
the replicator for other possible DiskFileManager implementations.
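
For reference, a minimal sketch of the router lookup (the conf values
here are placeholders, and a plain stdlib logger stands in for Swift's
usual log adapter):

    import logging

    from swift.common.storage_policy import POLICIES
    from swift.obj.diskfile import DiskFileRouter

    conf = {'devices': '/srv/node', 'mount_check': 'false'}
    router = DiskFileRouter(conf, logging.getLogger(__name__))
    # Indexing by policy returns the manager registered for that
    # policy's type: DiskFileManager for replication policies,
    # ECDiskFileManager for EC policies.
    df_mgr = router[POLICIES.default]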

The change also gives the ObjectReplicator a _df_router variable,
making it consistent with the ObjectReconstructor and giving
ssync.Sender a common way to access DiskFileManager instances via its
daemon's _df_router.
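
In other words, either daemon can now drive a Sender. A sketch along
the lines of the updated tests (the conf values are placeholders, and
debug_logger is the helper from Swift's test suite, so the test tree
must be importable):

    from swift.common.storage_policy import POLICIES
    from swift.obj.replicator import ObjectReplicator
    from swift.obj.ssync_sender import Sender
    from test.unit import debug_logger

    conf = {'devices': '/srv/node', 'mount_check': 'false'}
    daemon = ObjectReplicator(conf, logger=debug_logger())
    # a minimal job dict is enough for Sender to pick its manager
    job = {'policy': POLICIES.legacy}
    sender = Sender(daemon, None, job, None)
    assert sender.df_mgr is daemon._df_router[job['policy']]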

Also, remove the use of FakeReplicator from the ssync test suite. It
was not necessary and risked masking divergence between ssync and the
replicator and reconstructor daemon implementations.

Co-Authored-By: Alistair Coles <alistair.coles@hpe.com>

Closes-Bug: #1488608
Change-Id: Ic7a4c932b59158d21a5fb4de9ed3ed57f249d068
Author: Kota Tsuyuzaki
Date:   2016-10-30 22:24:18 -07:00
Parent: a32b411f22
Commit: b09360d447

8 changed files with 149 additions and 141 deletions

File: swift/obj/reconstructor.py

@@ -800,12 +800,13 @@ class ObjectReconstructor(Daemon):
         override_devices = override_devices or []
         override_partitions = override_partitions or []
         ips = whataremyips(self.bind_ip)
-        for policy in POLICIES:
-            if policy.policy_type != EC_POLICY:
-                continue
-            self._diskfile_mgr = self._df_router[policy]
+        ec_policies = (policy for policy in POLICIES
+                       if policy.policy_type == EC_POLICY)
+
+        policy2devices = {}
+
+        for policy in ec_policies:
             self.load_object_ring(policy)
-            data_dir = get_data_dir(policy)
             local_devices = list(six.moves.filter(
                 lambda dev: dev and is_local_device(
                     ips, self.port,
@@ -813,21 +814,23 @@ class ObjectReconstructor(Daemon):
                 policy.object_ring.devs))
 
             if override_devices:
-                self.device_count = len(override_devices)
-            else:
-                self.device_count = len(local_devices)
+                local_devices = list(six.moves.filter(
+                    lambda dev_info: dev_info['device'] in override_devices,
+                    local_devices))
+            policy2devices[policy] = local_devices
+            self.device_count += len(local_devices)
+
+        for policy, local_devices in policy2devices.items():
+            df_mgr = self._df_router[policy]
 
             for local_dev in local_devices:
-                if override_devices and (local_dev['device'] not in
-                                         override_devices):
-                    continue
                 self.reconstruction_device_count += 1
-                dev_path = self._df_router[policy].get_dev_path(
-                    local_dev['device'])
+                dev_path = df_mgr.get_dev_path(local_dev['device'])
                 if not dev_path:
                     self.logger.warning(_('%s is not mounted'),
                                         local_dev['device'])
                     continue
+                data_dir = get_data_dir(policy)
                 obj_path = join(dev_path, data_dir)
                 tmp_path = join(dev_path, get_tmp_dir(int(policy)))
                 unlink_older_than(tmp_path, time.time() -

File: swift/obj/replicator.py

@@ -38,7 +38,7 @@ from swift.common.bufferedhttp import http_connect
 from swift.common.daemon import Daemon
 from swift.common.http import HTTP_OK, HTTP_INSUFFICIENT_STORAGE
 from swift.obj import ssync_sender
-from swift.obj.diskfile import DiskFileManager, get_data_dir, get_tmp_dir
+from swift.obj.diskfile import get_data_dir, get_tmp_dir, DiskFileRouter
 from swift.common.storage_policy import POLICIES, REPL_POLICY
 
 DEFAULT_RSYNC_TIMEOUT = 900
@@ -121,7 +121,7 @@ class ObjectReplicator(Daemon):
                 'operation, please disable handoffs_first and '
                 'handoff_delete before the next '
                 'normal rebalance')
-        self._diskfile_mgr = DiskFileManager(conf, self.logger)
+        self._df_router = DiskFileRouter(conf, self.logger)
 
     def _zero_stats(self):
         """Zero out the stats."""
@@ -406,9 +406,10 @@ class ObjectReplicator(Daemon):
         target_devs_info = set()
         failure_devs_info = set()
         begin = time.time()
+        df_mgr = self._df_router[job['policy']]
         try:
             hashed, local_hash = tpool_reraise(
-                self._diskfile_mgr._get_hashes, job['path'],
+                df_mgr._get_hashes, job['path'],
                 do_listdir=_do_listdir(
                     int(job['partition']),
                     self.replication_cycle),
@@ -462,7 +463,7 @@ class ObjectReplicator(Daemon):
                     self.stats['hashmatch'] += 1
                     continue
                 hashed, recalc_hash = tpool_reraise(
-                    self._diskfile_mgr._get_hashes,
+                    df_mgr._get_hashes,
                     job['path'], recalculate=suffixes,
                     reclaim_age=self.reclaim_age)
                 self.logger.update_stats('suffix.hashes', hashed)

File: swift/obj/ssync_sender.py

@@ -80,7 +80,7 @@ class Sender(object):
 
     def __init__(self, daemon, node, job, suffixes, remote_check_objs=None):
         self.daemon = daemon
-        self.df_mgr = self.daemon._diskfile_mgr
+        self.df_mgr = self.daemon._df_router[job['policy']]
         self.node = node
         self.job = job
         self.suffixes = suffixes

File: test/unit/obj/common.py

@@ -19,28 +19,9 @@ import tempfile
 import unittest
 import time
 
+from swift.common import utils
 from swift.common.storage_policy import POLICIES
 from swift.common.utils import Timestamp
-from swift.obj import diskfile
-
-from test.unit import debug_logger
-
-
-class FakeReplicator(object):
-    def __init__(self, testdir, policy=None):
-        self.logger = debug_logger('test-ssync-sender')
-        self.conn_timeout = 1
-        self.node_timeout = 2
-        self.http_timeout = 3
-        self.network_chunk_size = 65536
-        self.disk_chunk_size = 4096
-        conf = {
-            'devices': testdir,
-            'mount_check': 'false',
-        }
-        policy = POLICIES.default if policy is None else policy
-        self._diskfile_router = diskfile.DiskFileRouter(conf, self.logger)
-        self._diskfile_mgr = self._diskfile_router[policy]
 
 
 def write_diskfile(df, timestamp, data='test data', frag_index=None,
@@ -74,9 +55,18 @@ def write_diskfile(df, timestamp, data='test data', frag_index=None,
 
 class BaseTest(unittest.TestCase):
     def setUp(self):
+        self.device = 'dev'
+        self.partition = '9'
+        self.tmpdir = tempfile.mkdtemp()
+        # sender side setup
+        self.tx_testdir = os.path.join(self.tmpdir, 'tmp_test_ssync_sender')
+        utils.mkdirs(os.path.join(self.tx_testdir, self.device))
+        self.daemon_conf = {
+            'devices': self.tx_testdir,
+            'mount_check': 'false',
+        }
         # daemon will be set in subclass setUp
         self.daemon = None
-        self.tmpdir = tempfile.mkdtemp()
 
     def tearDown(self):
         shutil.rmtree(self.tmpdir, ignore_errors=True)
@@ -90,7 +80,7 @@ class BaseTest(unittest.TestCase):
         object_parts = account, container, obj
         timestamp = Timestamp(time.time()) if timestamp is None else timestamp
         if df_mgr is None:
-            df_mgr = self.daemon._diskfile_router[policy]
+            df_mgr = self.daemon._df_router[policy]
         df = df_mgr.get_diskfile(
             device, partition, *object_parts, policy=policy,
             frag_index=frag_index)

File: test/unit/obj/test_reconstructor.py

@@ -862,7 +862,7 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
            self.suffixes = suffixes
            self.daemon = daemon
            self.job = job
-           hash_gen = self.daemon._diskfile_mgr.yield_hashes(
+           hash_gen = self.daemon._df_router[job['policy']].yield_hashes(
                self.job['device'], self.job['partition'],
                self.job['policy'], self.suffixes,
                frag_index=self.job.get('frag_index'))

File: test/unit/obj/test_replicator.py

@@ -12,7 +12,7 @@
 # implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
+import collections
 import unittest
 import os
 import mock
@@ -26,10 +26,10 @@ from collections import defaultdict
 from errno import ENOENT, ENOTEMPTY, ENOTDIR
 
 from eventlet.green import subprocess
-from eventlet import Timeout, tpool
+from eventlet import Timeout
 
 from test.unit import (debug_logger, patch_policies, make_timestamp_iter,
-                       mocked_http_conn)
+                       mocked_http_conn, FakeLogger)
 from swift.common import utils
 from swift.common.utils import (hash_path, mkdirs, normalize_timestamp,
                                 storage_directory)
@@ -1623,68 +1623,80 @@ class TestObjectReplicator(unittest.TestCase):
             object_replicator.http_connect = was_connector
 
     def test_run_once_recover_from_timeout(self):
-        conf = dict(swift_dir=self.testdir, devices=self.devices,
-                    bind_ips=_ips()[0],
-                    mount_check='false', timeout='300', stats_interval='1')
-        replicator = object_replicator.ObjectReplicator(conf)
-        was_connector = object_replicator.http_connect
-        was_get_hashes = object_replicator.DiskFileManager._get_hashes
-        was_execute = tpool.execute
-        self.get_hash_count = 0
-        try:
+        # verify that replicator will pass over all policies' partitions
+        # even if a timeout occurs while replicating one partition to one
+        # node.
+        timeouts = [Timeout()]
 
-            def fake_get_hashes(*args, **kwargs):
-                self.get_hash_count += 1
-                if self.get_hash_count == 3:
-                    # raise timeout on last call to get hashes
-                    raise Timeout()
-                return 2, {'abc': 'def'}
+        def fake_get_hashes(df_mgr, part_path, **kwargs):
+            self.get_hash_count += 1
+            # simulate a REPLICATE timeout by raising Timeout for second
+            # call to get_hashes (with recalculate suffixes) for a
+            # specific partition
+            if (timeouts and '/objects/' in part_path and
+                    part_path.endswith('0') and 'recalculate' in kwargs):
+                raise timeouts.pop(0)
+            return 1, {'abc': 'def'}
 
-            def fake_exc(tester, *args, **kwargs):
-                if 'Error syncing partition timeout' in args[0]:
-                    tester.i_failed = True
+        # map partition_path -> [nodes]
+        sync_paths = collections.defaultdict(list)
 
-            self.i_failed = False
-            object_replicator.http_connect = mock_http_connect(200)
-            object_replicator.DiskFileManager._get_hashes = fake_get_hashes
-            replicator.logger.exception = \
-                lambda *args, **kwargs: fake_exc(self, *args, **kwargs)
-            # Write some files into '1' and run replicate- they should be moved
-            # to the other partitions and then node should get deleted.
-            cur_part = '1'
-            df = self.df_mgr.get_diskfile('sda', cur_part, 'a', 'c', 'o',
-                                          policy=POLICIES.legacy)
-            mkdirs(df._datadir)
-            f = open(os.path.join(df._datadir,
-                                  normalize_timestamp(time.time()) + '.data'),
-                     'wb')
-            f.write('1234567890')
-            f.close()
-            ohash = hash_path('a', 'c', 'o')
-            data_dir = ohash[-3:]
-            whole_path_from = os.path.join(self.objects, cur_part, data_dir)
-            process_arg_checker = []
-            ring = replicator.load_object_ring(POLICIES[0])
-            nodes = [node for node in
-                     ring.get_part_nodes(int(cur_part))
-                     if node['ip'] not in _ips()]
-            for node in nodes:
-                rsync_mod = '%s::object/sda/objects/%s' % (node['ip'],
-                                                           cur_part)
-                process_arg_checker.append(
-                    (0, '', ['rsync', whole_path_from, rsync_mod]))
-            self.assertTrue(os.access(os.path.join(self.objects,
-                                                   '1', data_dir, ohash),
-                                      os.F_OK))
-            with _mock_process(process_arg_checker):
-                replicator.run_once()
-            self.assertFalse(process_errors)
-            self.assertFalse(self.i_failed)
-        finally:
-            object_replicator.http_connect = was_connector
-            object_replicator.DiskFileManager._get_hashes = was_get_hashes
-            tpool.execute = was_execute
+        def fake_sync(node, job, suffixes, *args, **kwargs):
+            sync_paths[job['path']].append(node)
+            return True, {}
+
+        conf = dict(swift_dir=self.testdir, devices=self.devices,
+                    bind_ip=_ips()[0],  # local dev has id=0
+                    mount_check='false', timeout='300', stats_interval='1')
+        with mock.patch('swift.obj.diskfile.DiskFileManager._get_hashes',
+                        fake_get_hashes):
+            with mock.patch('swift.obj.replicator.http_connect',
+                            mock_http_connect(200)):
+                with mock.patch('swift.obj.replicator.dump_recon_cache'):
+                    replicator = object_replicator.ObjectReplicator(
+                        conf, logger=FakeLogger())
+                    self.get_hash_count = 0
+                    with mock.patch.object(replicator, 'sync', fake_sync):
+                        replicator.run_once()
+
+        log_lines = replicator.logger.get_lines_for_level('error')
+        self.assertIn("Error syncing with node:", log_lines[0])
+        self.assertFalse(log_lines[1:])
+        # setup creates 4 partitions; partition 1 does not map to local dev
+        # id 0 so will be handled by update_deleted(); partitions 0, 2, 3
+        # are handled by update() for each of two policies, so expect 6
+        # paths to be sync'd
+        self.assertEqual(6, len(sync_paths))
+        # partition 3 has 2 nodes in remote region, only first node is
+        # sync'd. partition 0 in policy 0 has fake_get_hashes timeout
+        # before first sync, so only second node is sync'd. other
+        # partitions are sync'd to 2 nodes in same region.
+        expected_node_count = {  # map path_end -> expected sync node count
+            '/objects/0': 1,
+            '/objects/1': 2,
+            '/objects/2': 2,
+            '/objects/3': 1,
+            '/objects-1/0': 2,
+            '/objects-1/1': 2,
+            '/objects-1/2': 2,
+            '/objects-1/3': 1
+        }
+        for path, nodes in sync_paths.items():
+            path_end = path[path.index('/objects'):]
+            self.assertEqual(expected_node_count[path_end], len(nodes),
+                             'Expected %s but got %s for path %s' %
+                             (expected_node_count[path_end], len(nodes),
+                              path))
+        # partitions 0 and 2 attempt 3 calls each per policy to get_hashes
+        # = 12; partition 3 attempts 2 calls per policy to get_hashes = 4;
+        # partition 1 doesn't call get_hashes because of update_deleted
+        self.assertEqual(16, self.get_hash_count)
+        # attempted 16 times but succeeded only 15 times due to the Timeout
+        suffix_hashes = sum(
+            count for (metric, count), _junk in
+            replicator.logger.log_dict['update_stats']
+            if metric == 'suffix.hashes')
+        self.assertEqual(15, suffix_hashes)
 
     def test_run(self):
         with _mock_process([(0, '')] * 100):
@@ -1737,7 +1749,8 @@ class TestObjectReplicator(unittest.TestCase):
         do_listdir_results = [False, False, True, False, True, False]
         mock_do_listdir.side_effect = do_listdir_results
         expected_tpool_calls = [
-            mock.call(self.replicator._diskfile_mgr._get_hashes, job['path'],
+            mock.call(self.replicator._df_router[job['policy']]._get_hashes,
+                      job['path'],
                       do_listdir=do_listdir,
                       reclaim_age=self.replicator.reclaim_age)
             for job, do_listdir in zip(jobs, do_listdir_results)

File: test/unit/obj/test_ssync.py

@@ -31,9 +31,10 @@ from swift.common.utils import Timestamp
 from swift.obj import ssync_sender, server
 from swift.obj.reconstructor import RebuildingECDiskFileStream, \
     ObjectReconstructor
+from swift.obj.replicator import ObjectReplicator
 
 from test.unit import patch_policies, debug_logger, encode_frag_archive_bodies
-from test.unit.obj.common import BaseTest, FakeReplicator
+from test.unit.obj.common import BaseTest
 
 
 class TestBaseSsync(BaseTest):
@@ -46,13 +47,6 @@ class TestBaseSsync(BaseTest):
     """
     def setUp(self):
         super(TestBaseSsync, self).setUp()
-        self.device = 'dev'
-        self.partition = '9'
-        # sender side setup
-        self.tx_testdir = os.path.join(self.tmpdir, 'tmp_test_ssync_sender')
-        utils.mkdirs(os.path.join(self.tx_testdir, self.device))
-        self.daemon = FakeReplicator(self.tx_testdir)
-
         # rx side setup
         self.rx_testdir = os.path.join(self.tmpdir, 'tmp_test_ssync_receiver')
         utils.mkdirs(os.path.join(self.rx_testdir, self.device))
@@ -142,7 +136,7 @@ class TestBaseSsync(BaseTest):
         return diskfiles
 
     def _open_tx_diskfile(self, obj_name, policy, frag_index=None):
-        df_mgr = self.daemon._diskfile_router[policy]
+        df_mgr = self.daemon._df_router[policy]
         df = df_mgr.get_diskfile(
             self.device, self.partition, account='a', container='c',
             obj=obj_name, policy=policy, frag_index=frag_index)
@@ -310,6 +304,8 @@ class TestBaseSsyncEC(TestBaseSsync):
     def setUp(self):
         super(TestBaseSsyncEC, self).setUp()
         self.policy = POLICIES.default
+        self.daemon = ObjectReconstructor(self.daemon_conf,
+                                          debug_logger('test-ssync-sender'))
 
     def _get_object_data(self, path, frag_index=None, **kwargs):
         # return a frag archive for given object name and frag index.
@@ -337,7 +333,7 @@ class TestSsyncEC(TestBaseSsyncEC):
         tx_objs = {}
         rx_objs = {}
         tx_tombstones = {}
-        tx_df_mgr = self.daemon._diskfile_router[policy]
+        tx_df_mgr = self.daemon._df_router[policy]
         rx_df_mgr = self.rx_controller._diskfile_router[policy]
         # o1 has primary and handoff fragment archives
         t1 = next(self.ts_iter)
@@ -421,7 +417,7 @@ class TestSsyncEC(TestBaseSsyncEC):
         # create sender side diskfiles...
         tx_objs = {}
         rx_objs = {}
-        tx_df_mgr = self.daemon._diskfile_router[policy]
+        tx_df_mgr = self.daemon._df_router[policy]
         rx_df_mgr = self.rx_controller._diskfile_router[policy]
 
         expected_subreqs = defaultdict(list)
@@ -531,7 +527,7 @@ class TestSsyncEC(TestBaseSsyncEC):
         tx_objs = {}
         tx_tombstones = {}
         rx_objs = {}
-        tx_df_mgr = self.daemon._diskfile_router[policy]
+        tx_df_mgr = self.daemon._df_router[policy]
         rx_df_mgr = self.rx_controller._diskfile_router[policy]
         # o1 only has primary
         t1 = next(self.ts_iter)
@@ -631,7 +627,7 @@ class TestSsyncEC(TestBaseSsyncEC):
 
     def test_send_with_frag_index_none(self):
         policy = POLICIES.default
-        tx_df_mgr = self.daemon._diskfile_router[policy]
+        tx_df_mgr = self.daemon._df_router[policy]
         rx_df_mgr = self.rx_controller._diskfile_router[policy]
         # create an ec fragment on the remote node
         ts1 = next(self.ts_iter)
@@ -692,7 +688,7 @@ class TestSsyncEC(TestBaseSsyncEC):
         # create non durable tx obj by not committing, then create a legacy
         # .durable file
         tx_objs = {}
-        tx_df_mgr = self.daemon._diskfile_router[policy]
+        tx_df_mgr = self.daemon._df_router[policy]
         rx_df_mgr = self.rx_controller._diskfile_router[policy]
         t1 = next(self.ts_iter)
         tx_objs['o1'] = self._create_ondisk_files(
@@ -791,7 +787,7 @@ class TestSsyncECReconstructorSyncJob(TestBaseSsyncEC):
 
         # create sender side diskfiles...
         self.tx_objs = {}
-        tx_df_mgr = self.daemon._diskfile_router[self.policy]
+        tx_df_mgr = self.daemon._df_router[self.policy]
         t1 = next(self.ts_iter)
         self.tx_objs['o1'] = self._create_ondisk_files(
             tx_df_mgr, 'o1', self.policy, t1, (self.tx_node_index,))
@@ -1073,6 +1069,11 @@ class TestSsyncECReconstructorSyncJob(TestBaseSsyncEC):
 
 @patch_policies
 class TestSsyncReplication(TestBaseSsync):
+    def setUp(self):
+        super(TestSsyncReplication, self).setUp()
+        self.daemon = ObjectReplicator(self.daemon_conf,
+                                       debug_logger('test-ssync-sender'))
+
     def test_sync(self):
         policy = POLICIES.default
         rx_node_index = 0
@@ -1082,7 +1083,7 @@ class TestSsyncReplication(TestBaseSsync):
         rx_objs = {}
         tx_tombstones = {}
         rx_tombstones = {}
-        tx_df_mgr = self.daemon._diskfile_router[policy]
+        tx_df_mgr = self.daemon._df_router[policy]
         rx_df_mgr = self.rx_controller._diskfile_router[policy]
         # o1 and o2 are on tx only
         t1 = next(self.ts_iter)
@@ -1204,7 +1205,7 @@ class TestSsyncReplication(TestBaseSsync):
         rx_objs = {}
         tx_tombstones = {}
         rx_tombstones = {}
-        tx_df_mgr = self.daemon._diskfile_router[policy]
+        tx_df_mgr = self.daemon._df_router[policy]
         rx_df_mgr = self.rx_controller._diskfile_router[policy]
 
         expected_subreqs = defaultdict(list)
@@ -1349,7 +1350,7 @@ class TestSsyncReplication(TestBaseSsync):
         rx_node_index = 0
 
         # create diskfiles...
-        tx_df_mgr = self.daemon._diskfile_router[policy]
+        tx_df_mgr = self.daemon._df_router[policy]
         rx_df_mgr = self.rx_controller._diskfile_router[policy]
 
         # rx has data at t1 but no meta
@@ -1434,7 +1435,7 @@ class TestSsyncReplication(TestBaseSsync):
         # create diskfiles...
         tx_objs = {}
         rx_objs = {}
-        tx_df_mgr = self.daemon._diskfile_router[policy]
+        tx_df_mgr = self.daemon._df_router[policy]
         rx_df_mgr = self.rx_controller._diskfile_router[policy]
 
         expected_subreqs = defaultdict(list)

File: test/unit/obj/test_ssync_sender.py

@@ -24,9 +24,10 @@ from swift.common import exceptions, utils
 from swift.common.storage_policy import POLICIES
 from swift.common.utils import Timestamp
 from swift.obj import ssync_sender, diskfile, ssync_receiver
+from swift.obj.replicator import ObjectReplicator
 
-from test.unit import patch_policies, make_timestamp_iter
-from test.unit.obj.common import FakeReplicator, BaseTest
+from test.unit import patch_policies, make_timestamp_iter, debug_logger
+from test.unit.obj.common import BaseTest
 
 
 class NullBufferedHTTPConnection(object):
@@ -84,10 +85,10 @@ class TestSender(BaseTest):
 
     def setUp(self):
         super(TestSender, self).setUp()
-        self.testdir = os.path.join(self.tmpdir, 'tmp_test_ssync_sender')
-        utils.mkdirs(os.path.join(self.testdir, 'dev'))
-        self.daemon = FakeReplicator(self.testdir)
-        self.sender = ssync_sender.Sender(self.daemon, None, None, None)
+        self.daemon = ObjectReplicator(self.daemon_conf,
+                                       debug_logger('test-ssync-sender'))
+        job = {'policy': POLICIES.legacy}  # sufficient for Sender.__init__
+        self.sender = ssync_sender.Sender(self.daemon, None, job, None)
 
     def test_call_catches_MessageTimeout(self):
@@ -146,8 +147,7 @@ class TestSender(BaseTest):
                 '1.2.3.4:5678/sda1/9 EXCEPTION in ssync.Sender:'))
 
     def test_call_catches_exception_handling_exception(self):
-        job = node = None  # Will cause inside exception handler to fail
-        self.sender = ssync_sender.Sender(self.daemon, node, job, None)
+        self.sender.node = None  # Will cause inside exception handler to fail
         self.sender.suffixes = ['abc']
         self.sender.connect = 'cause exception'
         success, candidates = self.sender()
@@ -459,7 +459,7 @@ class TestSender(BaseTest):
             ':UPDATES: START\r\n'
             ':UPDATES: END\r\n'
         ))
-        self.sender.daemon._diskfile_mgr.yield_hashes = yield_hashes
+        self.sender.df_mgr.yield_hashes = yield_hashes
         self.sender.connect = mock.MagicMock()
         df = mock.MagicMock()
         df.content_length = 0
@@ -505,7 +505,7 @@ class TestSender(BaseTest):
            ':MISSING_CHECK: START\r\n'
            '9d41d8cd98f00b204e9800998ecf0abc d\r\n'
            ':MISSING_CHECK: END\r\n'))
-        self.sender.daemon._diskfile_mgr.yield_hashes = yield_hashes
+        self.sender.df_mgr.yield_hashes = yield_hashes
         self.sender.connect = mock.MagicMock()
         self.sender.updates = mock.MagicMock()
         self.sender.disconnect = mock.MagicMock()
@@ -541,7 +541,7 @@ class TestSender(BaseTest):
            chunk_body=(
                ':MISSING_CHECK: START\r\n'
                ':MISSING_CHECK: END\r\n'))
-        self.sender.daemon._diskfile_mgr.yield_hashes = yield_hashes
+        self.sender.df_mgr.yield_hashes = yield_hashes
         self.sender.connect = mock.MagicMock()
         self.sender.updates = mock.MagicMock()
         self.sender.disconnect = mock.MagicMock()
@@ -578,7 +578,7 @@ class TestSender(BaseTest):
            ':MISSING_CHECK: START\r\n'
            '9d41d8cd98f00b204e9800998ecf0abc d\r\n'
            ':MISSING_CHECK: END\r\n'))
-        self.sender.daemon._diskfile_mgr.yield_hashes = yield_hashes
+        self.sender.df_mgr.yield_hashes = yield_hashes
         self.sender.connect = mock.MagicMock()
         self.sender.updates = mock.MagicMock()
         self.sender.disconnect = mock.MagicMock()
@@ -743,7 +743,7 @@ class TestSender(BaseTest):
            chunk_body=(
                ':MISSING_CHECK: START\r\n'
                ':MISSING_CHECK: END\r\n'))
-        self.sender.daemon._diskfile_mgr.yield_hashes = yield_hashes
+        self.sender.df_mgr.yield_hashes = yield_hashes
         self.sender.missing_check()
         self.assertEqual(
             ''.join(self.sender.connection.sent),
@@ -791,7 +791,7 @@ class TestSender(BaseTest):
            chunk_body=(
                ':MISSING_CHECK: START\r\n'
                ':MISSING_CHECK: END\r\n'))
-        self.sender.daemon._diskfile_mgr.yield_hashes = yield_hashes
+        self.sender.df_mgr.yield_hashes = yield_hashes
         self.sender.missing_check()
         self.assertEqual(
             ''.join(self.sender.connection.sent),
@@ -836,7 +836,7 @@ class TestSender(BaseTest):
             'policy': POLICIES.legacy,
         }
         self.sender.suffixes = ['abc']
-        self.sender.daemon._diskfile_mgr.yield_hashes = yield_hashes
+        self.sender.df_mgr.yield_hashes = yield_hashes
         self.sender.response = FakeResponse(chunk_body='\r\n')
         exc = None
         try:
@@ -875,7 +875,7 @@ class TestSender(BaseTest):
             'policy': POLICIES.legacy,
         }
         self.sender.suffixes = ['abc']
-        self.sender.daemon._diskfile_mgr.yield_hashes = yield_hashes
+        self.sender.df_mgr.yield_hashes = yield_hashes
         self.sender.response = FakeResponse(
             chunk_body=':MISSING_CHECK: START\r\n')
         exc = None
@@ -915,7 +915,7 @@ class TestSender(BaseTest):
             'policy': POLICIES.legacy,
         }
         self.sender.suffixes = ['abc']
-        self.sender.daemon._diskfile_mgr.yield_hashes = yield_hashes
+        self.sender.df_mgr.yield_hashes = yield_hashes
        self.sender.response = FakeResponse(chunk_body='OH HAI\r\n')
        exc = None
        try:
@@ -959,7 +959,7 @@ class TestSender(BaseTest):
            ':MISSING_CHECK: START\r\n'
            '0123abc dm\r\n'
            ':MISSING_CHECK: END\r\n'))
-        self.sender.daemon._diskfile_mgr.yield_hashes = yield_hashes
+        self.sender.df_mgr.yield_hashes = yield_hashes
         self.sender.missing_check()
         self.assertEqual(
             ''.join(self.sender.connection.sent),
@@ -1001,7 +1001,7 @@ class TestSender(BaseTest):
            ':MISSING_CHECK: START\r\n'
            '0123abc d extra response parts\r\n'
            ':MISSING_CHECK: END\r\n'))
-        self.sender.daemon._diskfile_mgr.yield_hashes = yield_hashes
+        self.sender.df_mgr.yield_hashes = yield_hashes
         self.sender.missing_check()
         self.assertEqual(self.sender.send_map,
                          {'0123abc': {'data': True}})
@@ -1307,7 +1307,7 @@ class TestSender(BaseTest):
         self.assertEqual(path, '/a/c/o')
         self.assertTrue(isinstance(df, diskfile.DiskFile))
         self.assertEqual(expected, df.get_metadata())
-        self.assertEqual(os.path.join(self.testdir, 'dev/objects/9/',
+        self.assertEqual(os.path.join(self.tx_testdir, 'dev/objects/9/',
                                       object_hash[-3:], object_hash),
                          df._datadir)