ssync sender: add context to missing_check error log
Change-Id: I1280b8fc7b06eacb9f94c75db2e93aeea38ac5a2
This commit is contained in:
parent
a758d7acba
commit
88eb360f5b
@ -197,6 +197,12 @@ class Sender(object):
|
||||
set(send_map.keys()))
|
||||
can_delete_obj = dict((hash_, available_map[hash_])
|
||||
for hash_ in in_sync_hashes)
|
||||
self.daemon.logger.debug(
|
||||
'ssync completed ok: dev: %s, part: %s, policy: %d, '
|
||||
'num suffixes: %s, available: %d, sent: %d, deletable: %d',
|
||||
self.job['device'], self.job['partition'],
|
||||
self.job['policy'].idx, len(self.suffixes),
|
||||
len(available_map), len(send_map), len(can_delete_obj))
|
||||
return True, can_delete_obj
|
||||
except (exceptions.MessageTimeout,
|
||||
exceptions.ReplicationException) as err:
|
||||
@ -299,16 +305,20 @@ class Sender(object):
|
||||
objhash_timestamps[0] in
|
||||
self.remote_check_objs, hash_gen)
|
||||
nlines = 0
|
||||
nbytes = 0
|
||||
for object_hash, timestamps in hash_gen:
|
||||
available_map[object_hash] = timestamps
|
||||
with exceptions.MessageTimeout(
|
||||
self.daemon.node_timeout,
|
||||
'missing_check send line'):
|
||||
'missing_check send line: %d lines (%d bytes) sent'
|
||||
% (nlines, nbytes)):
|
||||
msg = b'%s\r\n' % encode_missing(object_hash, **timestamps)
|
||||
connection.send(b'%x\r\n%s\r\n' % (len(msg), msg))
|
||||
msg = b'%x\r\n%s\r\n' % (len(msg), msg)
|
||||
connection.send(msg)
|
||||
if nlines % 5 == 0:
|
||||
sleep() # Gives a chance for other greenthreads to run
|
||||
nlines += 1
|
||||
nbytes += len(msg)
|
||||
with exceptions.MessageTimeout(
|
||||
self.daemon.node_timeout, 'missing_check end'):
|
||||
msg = b':MISSING_CHECK: END\r\n'
|
||||
|
@ -57,7 +57,7 @@ class TestBaseSsync(BaseTest):
|
||||
'mount_check': 'false',
|
||||
'replication_concurrency_per_device': '0',
|
||||
'log_requests': 'false'}
|
||||
self.rx_logger = debug_logger()
|
||||
self.rx_logger = debug_logger(name='test-ssync-receiver')
|
||||
self.rx_controller = server.ObjectController(conf, self.rx_logger)
|
||||
self.ts_iter = make_timestamp_iter()
|
||||
self.rx_ip = '127.0.0.1'
|
||||
|
@ -102,7 +102,9 @@ class TestSender(BaseTest):
|
||||
self.daemon_logger = debug_logger('test-ssync-sender')
|
||||
self.daemon = ObjectReplicator(self.daemon_conf,
|
||||
self.daemon_logger)
|
||||
job = {'policy': POLICIES.legacy} # sufficient for Sender.__init__
|
||||
job = {'policy': POLICIES.legacy,
|
||||
'device': 'test-dev',
|
||||
'partition': '99'} # sufficient for Sender.__init__
|
||||
self.sender = ssync_sender.Sender(self.daemon, None, job, None)
|
||||
|
||||
def test_call_catches_MessageTimeout(self):
|
||||
@ -788,13 +790,36 @@ class TestSender(BaseTest):
|
||||
self.assertEqual(response.readline(), b'')
|
||||
self.assertEqual(response.readline(), b'')
|
||||
|
||||
def test_missing_check_timeout(self):
|
||||
def test_missing_check_timeout_start(self):
|
||||
connection = FakeConnection()
|
||||
connection.send = lambda d: eventlet.sleep(1)
|
||||
response = FakeResponse()
|
||||
self.sender.daemon.node_timeout = 0.01
|
||||
self.assertRaises(exceptions.MessageTimeout, self.sender.missing_check,
|
||||
connection, response)
|
||||
with mock.patch.object(connection, 'send',
|
||||
side_effect=lambda *args: eventlet.sleep(1)):
|
||||
with self.assertRaises(exceptions.MessageTimeout) as cm:
|
||||
self.sender.missing_check(connection, response)
|
||||
self.assertIn('0.01 seconds: missing_check start', str(cm.exception))
|
||||
|
||||
def test_missing_check_timeout_send_line(self):
|
||||
def yield_hashes(device, partition, policy, suffixes=None, **kwargs):
|
||||
yield (
|
||||
'9d41d8cd98f00b204e9800998ecf0abc',
|
||||
{'ts_data': Timestamp(1380144470.00000)})
|
||||
yield (
|
||||
'9d41d8cd98f00b204e9800998ecf0def',
|
||||
{'ts_data': Timestamp(1380144471.00000)})
|
||||
connection = FakeConnection()
|
||||
response = FakeResponse()
|
||||
self.sender.daemon.node_timeout = 0.01
|
||||
self.sender.df_mgr.yield_hashes = yield_hashes
|
||||
sleeps = [0, 0, 1]
|
||||
with mock.patch.object(
|
||||
connection, 'send',
|
||||
side_effect=lambda *args: eventlet.sleep(sleeps.pop(0))):
|
||||
with self.assertRaises(exceptions.MessageTimeout) as cm:
|
||||
self.sender.missing_check(connection, response)
|
||||
self.assertIn('0.01 seconds: missing_check send line: '
|
||||
'1 lines (57 bytes) sent', str(cm.exception))
|
||||
|
||||
def test_missing_check_has_empty_suffixes(self):
|
||||
def yield_hashes(device, partition, policy, suffixes=None, **kwargs):
|
||||
|
Loading…
x
Reference in New Issue
Block a user