Clean up some replication error messages

Lines like `Invalid response 500 from ::1` aren't terribly useful in an
all-in-one, while lines like

   Error syncing with node: {'device': 'd5', 'id': 3, 'ip': '::1',
   'meta': '', 'port': 6200, 'region': 1, 'replication_ip': '::1',
   'replication_port': 6200, 'weight': 8000.0, 'zone': 1, 'index': 0}:
   Timeout (60s)

are needlessly verbose.

While we're at it, introduce a node_to_string() helper, and use it in a
bunch of places.

Change-Id: I62b12f69e9ac44ce27ffaed320c0a3563673a018
This commit is contained in:
Tim Burke 2020-12-28 21:58:30 -08:00
parent 4ed2b89cb7
commit a7af17394b
11 changed files with 119 additions and 82 deletions

View File

@ -34,7 +34,7 @@ from swift.common.request_helpers import USE_REPLICATION_NETWORK_HEADER
from swift.common.ring import Ring
from swift.common.ring.utils import is_local_device
from swift.common.utils import get_logger, whataremyips, config_true_value, \
Timestamp, md5
Timestamp, md5, node_to_string
from swift.common.daemon import Daemon
from swift.common.storage_policy import POLICIES, PolicyError
@ -379,15 +379,14 @@ class AccountReaper(Daemon):
except ClientException as err:
if self.logger.getEffectiveLevel() <= DEBUG:
self.logger.exception(
'Exception with %(ip)s:%(port)s/%(device)s', node)
'Exception with %s', node_to_string(node))
self.stats_return_codes[err.http_status // 100] = \
self.stats_return_codes.get(err.http_status // 100, 0) + 1
self.logger.increment(
'return_codes.%d' % (err.http_status // 100,))
except (Timeout, socket.error):
self.logger.error(
'Timeout Exception with %(ip)s:%(port)s/%(device)s',
node)
'Timeout Exception with %s', node_to_string(node))
if not objects:
break
try:
@ -429,7 +428,7 @@ class AccountReaper(Daemon):
except ClientException as err:
if self.logger.getEffectiveLevel() <= DEBUG:
self.logger.exception(
'Exception with %(ip)s:%(port)s/%(device)s', node)
'Exception with %s', node_to_string(node))
failures += 1
self.logger.increment('containers_failures')
self.stats_return_codes[err.http_status // 100] = \
@ -438,8 +437,7 @@ class AccountReaper(Daemon):
'return_codes.%d' % (err.http_status // 100,))
except (Timeout, socket.error):
self.logger.error(
'Timeout Exception with %(ip)s:%(port)s/%(device)s',
node)
'Timeout Exception with %s', node_to_string(node))
failures += 1
self.logger.increment('containers_failures')
if successes > failures:
@ -506,7 +504,7 @@ class AccountReaper(Daemon):
except ClientException as err:
if self.logger.getEffectiveLevel() <= DEBUG:
self.logger.exception(
'Exception with %(ip)s:%(port)s/%(device)s', node)
'Exception with %s', node_to_string(node))
failures += 1
self.logger.increment('objects_failures')
self.stats_return_codes[err.http_status // 100] = \
@ -517,8 +515,7 @@ class AccountReaper(Daemon):
failures += 1
self.logger.increment('objects_failures')
self.logger.error(
'Timeout Exception with %(ip)s:%(port)s/%(device)s',
node)
'Timeout Exception with %s', node_to_string(node))
if successes > failures:
self.stats_objects_deleted += 1
self.logger.increment('objects_deleted')

View File

@ -29,7 +29,8 @@ from swift.common import direct_client
from swift.common.internal_client import SimpleClient
from swift.common.ring import Ring
from swift.common.exceptions import ClientException
from swift.common.utils import compute_eta, get_time_units, config_true_value
from swift.common.utils import compute_eta, get_time_units, \
config_true_value, node_to_string
from swift.common.storage_policy import POLICIES
@ -89,7 +90,7 @@ def container_dispersion_report(coropool, connpool, account, container_ring,
def direct(container, part, nodes):
found_count = 0
for node in nodes:
error_log = get_error_log('%(ip)s:%(port)s/%(device)s' % node)
error_log = get_error_log(node_to_string(node))
try:
attempts, _junk = direct_client.retry(
direct_client.direct_head_container, node, part, account,
@ -201,7 +202,7 @@ def object_dispersion_report(coropool, connpool, account, object_ring,
def direct(obj, part, nodes):
found_count = 0
for node in nodes:
error_log = get_error_log('%(ip)s:%(port)s/%(device)s' % node)
error_log = get_error_log(node_to_string(node))
try:
attempts, _junk = direct_client.retry(
direct_client.direct_head_object, node, part, account,

View File

@ -2783,6 +2783,19 @@ def parse_socket_string(socket_string, default_port):
return (host, port)
def node_to_string(node_dict, replication=False):
if replication:
ip = node_dict['replication_ip']
port = node_dict['replication_port']
else:
ip = node_dict['ip']
port = node_dict['port']
if ':' in ip:
# IPv6
ip = '[%s]' % ip
return '{}:{}/{}'.format(ip, port, node_dict['device'])
def storage_directory(datadir, partition, name_hash):
"""
Get the storage directory

View File

@ -32,7 +32,7 @@ from swift.common.exceptions import ConnectionTimeout, LockTimeout
from swift.common.ring import Ring
from swift.common.utils import get_logger, config_true_value, \
dump_recon_cache, majority_size, Timestamp, EventletRateLimiter, \
eventlet_monkey_patch
eventlet_monkey_patch, node_to_string
from swift.common.daemon import Daemon
from swift.common.http import is_success, HTTP_INTERNAL_SERVER_ERROR
from swift.common.recon import RECON_CONTAINER_FILE, DEFAULT_RECON_CACHE_PATH
@ -340,9 +340,8 @@ class ContainerUpdater(Daemon):
node['device'], part, 'PUT', container, headers=headers)
except (Exception, Timeout):
self.logger.exception(
'ERROR account update failed with '
'%(replication_ip)s:%(replication_port)s/%(device)s '
'(will retry later): ', node)
'ERROR account update failed with %s (will retry later):',
node_to_string(node, replication=True))
return HTTP_INTERNAL_SERVER_ERROR
with Timeout(self.node_timeout):
try:
@ -352,9 +351,8 @@ class ContainerUpdater(Daemon):
except (Exception, Timeout):
if self.logger.getEffectiveLevel() <= logging.DEBUG:
self.logger.exception(
'Exception with '
'%(replication_ip)s:%(replication_port)s/%(device)s',
node)
'Exception with %s',
node_to_string(node, replication=True))
return HTTP_INTERNAL_SERVER_ERROR
finally:
conn.close()

View File

@ -30,7 +30,7 @@ from eventlet.support.greenlets import GreenletExit
from swift.common.utils import (
whataremyips, unlink_older_than, compute_eta, get_logger,
dump_recon_cache, mkdirs, config_true_value,
GreenAsyncPile, Timestamp, remove_file,
GreenAsyncPile, Timestamp, remove_file, node_to_string,
load_recon_cache, parse_override_options, distribute_evenly,
PrefixLoggerAdapter, remove_directory, config_request_node_count_value,
non_negative_int)
@ -85,15 +85,11 @@ def _full_path(node, part, relative_path, policy):
"""
if not isinstance(relative_path, six.text_type):
relative_path = relative_path.decode('utf8')
return '%(replication_ip)s:%(replication_port)s' \
'/%(device)s/%(part)s%(path)s ' \
'policy#%(policy)d' % {
'replication_ip': node['replication_ip'],
'replication_port': node['replication_port'],
'device': node['device'],
'part': part, 'path': relative_path,
'policy': policy,
}
return '%(node)s/%(part)s%(path)s policy#%(policy)d' % {
'node': node_to_string(node, replication=True),
'part': part, 'path': relative_path,
'policy': policy,
}
class ResponseBucket(object):

View File

@ -35,7 +35,7 @@ from swift.common.utils import whataremyips, unlink_older_than, \
rsync_module_interpolation, mkdirs, config_true_value, \
config_auto_int_value, storage_directory, \
load_recon_cache, PrefixLoggerAdapter, parse_override_options, \
distribute_evenly, listdir
distribute_evenly, listdir, node_to_string
from swift.common.bufferedhttp import http_connect
from swift.common.daemon import Daemon
from swift.common.http import HTTP_OK, HTTP_INSUFFICIENT_STORAGE
@ -628,7 +628,7 @@ class ObjectReplicator(Daemon):
if e.errno not in (errno.ENOENT, errno.ENOTEMPTY):
error_paths.append(object_path)
self.logger.exception(
"Unexpected error trying to cleanup suffix dir:%r",
"Unexpected error trying to cleanup suffix dir %r",
suffix_dir)
return success_paths, error_paths
@ -666,6 +666,7 @@ class ObjectReplicator(Daemon):
while attempts_left > 0:
# If this throws StopIteration it will be caught way below
node = next(nodes)
node_str = node_to_string(node, replication=True)
target_devs_info.add((node['replication_ip'], node['device']))
attempts_left -= 1
# if we have already synced to this remote region,
@ -679,18 +680,16 @@ class ObjectReplicator(Daemon):
node['device'], job['partition'], 'REPLICATE',
'', headers=headers).getresponse()
if resp.status == HTTP_INSUFFICIENT_STORAGE:
self.logger.error(
'%(replication_ip)s/%(device)s '
'responded as unmounted', node)
self.logger.error('%s responded as unmounted',
node_str)
attempts_left += 1
failure_devs_info.add((node['replication_ip'],
node['device']))
continue
if resp.status != HTTP_OK:
self.logger.error("Invalid response %(resp)s "
"from %(ip)s",
{'resp': resp.status,
'ip': node['replication_ip']})
self.logger.error(
"Invalid response %(resp)s from %(remote)s",
{'resp': resp.status, 'remote': node_str})
failure_devs_info.add((node['replication_ip'],
node['device']))
continue
@ -728,7 +727,7 @@ class ObjectReplicator(Daemon):
failure_devs_info.add((node['replication_ip'],
node['device']))
self.logger.exception("Error syncing with node: %s",
node)
node_str)
stats.suffix_count += len(local_hash)
except StopIteration:
self.logger.error('Ran out of handoffs while replicating '

View File

@ -20,7 +20,7 @@ from six.moves import urllib
from swift.common import bufferedhttp
from swift.common import exceptions
from swift.common import http
from swift.common.utils import config_true_value
from swift.common import utils
def encode_missing(object_hash, ts_data, ts_meta=None, ts_ctype=None,
@ -208,20 +208,18 @@ class Sender(object):
return True, can_delete_obj
except (exceptions.MessageTimeout,
exceptions.ReplicationException) as err:
self.daemon.logger.error(
'%s:%s/%s/%s %s', self.node.get('replication_ip'),
self.node.get('replication_port'), self.node.get('device'),
self.job.get('partition'), err)
node_str = utils.node_to_string(self.node, replication=True)
self.daemon.logger.error('%s/%s %s', node_str,
self.job['partition'], err)
except Exception:
# We don't want any exceptions to escape our code and possibly
# mess up the original replicator code that called us since it
# was originally written to shell out to rsync which would do
# no such thing.
node_str = utils.node_to_string(self.node, replication=True)
self.daemon.logger.exception(
'%s:%s/%s/%s EXCEPTION in ssync.Sender',
self.node.get('replication_ip'),
self.node.get('replication_port'),
self.node.get('device'), self.job.get('partition'))
'%s/%s EXCEPTION in ssync.Sender',
node_str, self.job['partition'])
finally:
self.disconnect(connection)
except Exception:
@ -268,7 +266,7 @@ class Sender(object):
raise exceptions.ReplicationException(
'Expected status %s; got %s (%s)' %
(http.HTTP_OK, response.status, err_msg))
if self.include_non_durable and not config_true_value(
if self.include_non_durable and not utils.config_true_value(
response.getheader('x-backend-accept-no-commit', False)):
# fall back to legacy behaviour if receiver does not understand
# X-Backend-Commit

View File

@ -34,7 +34,7 @@ from swift.common.utils import get_logger, renamer, write_pickle, \
dump_recon_cache, config_true_value, RateLimitedIterator, split_path, \
eventlet_monkey_patch, get_redirect_data, ContextPool, hash_path, \
non_negative_float, config_positive_int_value, non_negative_int, \
EventletRateLimiter
EventletRateLimiter, node_to_string
from swift.common.daemon import Daemon
from swift.common.header_key_dict import HeaderKeyDict
from swift.common.storage_policy import split_policy_string, PolicyError
@ -735,15 +735,13 @@ class ObjectUpdater(Daemon):
if not success:
self.logger.debug(
'Error code %(status)d is returned from remote '
'server %(ip)s: %(port)s / %(device)s',
{'status': resp.status, 'ip': node['replication_ip'],
'port': node['replication_port'],
'device': node['device']})
'server %(node)s',
{'status': resp.status,
'node': node_to_string(node, replication=True)})
return success, node['id'], redirect
except Exception:
self.logger.exception(
'ERROR with remote server '
'%(replication_ip)s:%(replication_port)s/%(device)s', node)
self.logger.exception('ERROR with remote server %s',
node_to_string(node, replication=True))
except Timeout as exc:
action = 'connecting to'
if not isinstance(exc, ConnectionTimeout):
@ -752,9 +750,8 @@ class ObjectUpdater(Daemon):
status = 499
action = 'waiting on'
self.logger.info(
'Timeout %(action)s remote server '
'%(replication_ip)s:%(replication_port)s/%(device)s: %(exc)s',
dict(node, exc=exc, action=action))
'Timeout %s remote server %s: %s',
action, node_to_string(node, replication=True), exc)
finally:
elapsed = time.time() - start
self.logger.timing('updater.timing.status.%s' % status,

View File

@ -34,7 +34,7 @@ from swift.common.ring import Ring
from swift.common.utils import Watchdog, get_logger, \
get_remote_client, split_path, config_true_value, generate_trans_id, \
affinity_key_function, affinity_locality_predicate, list_from_csv, \
parse_prefixed_conf, config_auto_int_value, \
parse_prefixed_conf, config_auto_int_value, node_to_string, \
config_request_node_count_value, config_percent_value
from swift.common.registry import register_swift_info
from swift.common.constraints import check_utf8, valid_api_version
@ -647,7 +647,7 @@ class Application(object):
self.node_timings[node['ip']] = (timing, now + self.timing_expiry)
def _error_limit_node_key(self, node):
return "{ip}:{port}/{device}".format(**node)
return node_to_string(node)
def error_limited(self, node):
"""
@ -669,7 +669,7 @@ class Application(object):
limited = error_stats['errors'] > self.error_suppression_limit
if limited:
self.logger.debug(
'Node error limited %(ip)s:%(port)s (%(device)s)', node)
'Node error limited: %s', node_to_string(node))
return limited
def error_limit(self, node, msg):
@ -686,9 +686,8 @@ class Application(object):
error_stats = self._error_limiting.setdefault(node_key, {})
error_stats['errors'] = self.error_suppression_limit + 1
error_stats['last_error'] = time()
self.logger.error('%(msg)s %(ip)s:%(port)s/%(device)s', {
'msg': msg, 'ip': node['ip'],
'port': node['port'], 'device': node['device']})
self.logger.error('%(msg)s %(node)s',
{'msg': msg, 'node': node_to_string(node)})
def _incr_node_errors(self, node):
node_key = self._error_limit_node_key(node)
@ -706,9 +705,8 @@ class Application(object):
self._incr_node_errors(node)
if isinstance(msg, bytes):
msg = msg.decode('utf-8')
self.logger.error('%(msg)s %(ip)s:%(port)s/%(device)s', {
'msg': msg, 'ip': node['ip'],
'port': node['port'], 'device': node['device']})
self.logger.error('%(msg)s %(node)s',
{'msg': msg, 'node': node_to_string(node)})
def iter_nodes(self, ring, partition, logger, node_iter=None, policy=None):
return NodeIter(self, ring, partition, logger, node_iter=node_iter,
@ -732,10 +730,9 @@ class Application(object):
log = self.logger.exception
if isinstance(additional_info, bytes):
additional_info = additional_info.decode('utf-8')
log('ERROR with %(type)s server %(ip)s:%(port)s/%(device)s'
log('ERROR with %(type)s server %(node)s'
' re: %(info)s',
{'type': typ, 'ip': node['ip'],
'port': node['port'], 'device': node['device'],
{'type': typ, 'node': node_to_string(node),
'info': additional_info},
**kwargs)

View File

@ -2208,6 +2208,41 @@ class TestUtils(unittest.TestCase):
self.assertEqual(utils.storage_directory('objects', '1', 'ABCDEF'),
'objects/1/DEF/ABCDEF')
def test_node_to_string(self):
dev = {
'id': 3,
'region': 1,
'zone': 1,
'ip': '127.0.0.1',
'port': 6200,
'replication_ip': '127.0.1.1',
'replication_port': 6400,
'device': 'sdb',
'meta': '',
'weight': 8000.0,
'index': 0,
}
self.assertEqual(utils.node_to_string(dev), '127.0.0.1:6200/sdb')
self.assertEqual(utils.node_to_string(dev, replication=True),
'127.0.1.1:6400/sdb')
dev = {
'id': 3,
'region': 1,
'zone': 1,
'ip': "fe80::0204:61ff:fe9d:f156",
'port': 6200,
'replication_ip': "fe80::0204:61ff:ff9d:1234",
'replication_port': 6400,
'device': 'sdb',
'meta': '',
'weight': 8000.0,
'index': 0,
}
self.assertEqual(utils.node_to_string(dev),
'[fe80::0204:61ff:fe9d:f156]:6200/sdb')
self.assertEqual(utils.node_to_string(dev, replication=True),
'[fe80::0204:61ff:ff9d:1234]:6400/sdb')
def test_is_valid_ip(self):
self.assertTrue(is_valid_ip("127.0.0.1"))
self.assertTrue(is_valid_ip("10.0.0.1"))

View File

@ -1742,7 +1742,7 @@ class TestObjectReplicator(unittest.TestCase):
raise_exception_rmdir(OSError, ENOTDIR)):
self.replicator.replicate()
self.assertEqual(mock_logger.get_lines_for_level('error'), [
'Unexpected error trying to cleanup suffix dir:%r: ' %
'Unexpected error trying to cleanup suffix dir %r: ' %
os.path.dirname(df._datadir),
])
self.assertFalse(os.access(whole_path_from, os.F_OK))
@ -1928,7 +1928,6 @@ class TestObjectReplicator(unittest.TestCase):
# Check incorrect http_connect with status 507 and
# count of attempts and call args
resp.status = 507
error = '%(replication_ip)s/%(device)s responded as unmounted'
expected_listdir_calls = [
mock.call(int(job['partition']),
self.replicator.replication_cycle)
@ -1948,13 +1947,16 @@ class TestObjectReplicator(unittest.TestCase):
self.replicator.update(job)
error_lines = self.logger.get_lines_for_level('error')
expected = []
error = '%s responded as unmounted'
# ... first the primaries
for node in job['nodes']:
expected.append(error % node)
node_str = utils.node_to_string(node, replication=True)
expected.append(error % node_str)
# ... then it will get handoffs
for node in job['policy'].object_ring.get_more_nodes(
int(job['partition'])):
expected.append(error % node)
node_str = utils.node_to_string(node, replication=True)
expected.append(error % node_str)
# ... and finally we get an error about running out of nodes
expected.append('Ran out of handoffs while replicating '
'partition %s of policy %d' %
@ -1978,13 +1980,16 @@ class TestObjectReplicator(unittest.TestCase):
mock_do_listdir.return_value = False
# Check incorrect http_connect with status 400 != HTTP_OK
resp.status = 400
error = 'Invalid response %(resp)s from %(ip)s'
error = 'Invalid response %(resp)s from %(node)s'
for job in jobs:
set_default(self)
self.replicator.update(job)
# ... only the primaries
expected = [error % {'resp': 400, 'ip': node['replication_ip']}
for node in job['nodes']]
expected = [
error % {
"resp": 400,
"node": utils.node_to_string(node, replication=True)}
for node in job['nodes']]
self.assertEqual(expected,
self.logger.get_lines_for_level('error'))
self.assertEqual(len(self.replicator.partition_times), 1)
@ -1994,12 +1999,13 @@ class TestObjectReplicator(unittest.TestCase):
# incorrect pickle.loads(resp.read())
resp.status = 200
resp.read.return_value = b'garbage'
expect = 'Error syncing with node: %r: '
expect = 'Error syncing with node: %s: '
for job in jobs:
set_default(self)
self.replicator.update(job)
# ... only the primaries
expected = [expect % node for node in job['nodes']]
expected = [expect % utils.node_to_string(node, replication=True)
for node in job['nodes']]
error_lines = self.logger.get_lines_for_level('error')
self.assertEqual(expected, error_lines)
self.assertEqual(len(self.replicator.partition_times), 1)