Merge "sharder: quote() more Swift paths when logging"
This commit is contained in:
commit
7862ec7b8a
@ -21,6 +21,7 @@ from random import random
|
|||||||
|
|
||||||
import os
|
import os
|
||||||
import six
|
import six
|
||||||
|
from six.moves.urllib.parse import quote
|
||||||
from eventlet import Timeout
|
from eventlet import Timeout
|
||||||
|
|
||||||
from swift.common import internal_client
|
from swift.common import internal_client
|
||||||
@ -564,13 +565,13 @@ class ContainerSharder(ContainerReplicator):
|
|||||||
params=params)
|
params=params)
|
||||||
except internal_client.UnexpectedResponse as err:
|
except internal_client.UnexpectedResponse as err:
|
||||||
self.logger.warning("Failed to get shard ranges from %s: %s",
|
self.logger.warning("Failed to get shard ranges from %s: %s",
|
||||||
broker.root_path, err)
|
quote(broker.root_path), err)
|
||||||
return None
|
return None
|
||||||
record_type = resp.headers.get('x-backend-record-type')
|
record_type = resp.headers.get('x-backend-record-type')
|
||||||
if record_type != 'shard':
|
if record_type != 'shard':
|
||||||
err = 'unexpected record type %r' % record_type
|
err = 'unexpected record type %r' % record_type
|
||||||
self.logger.error("Failed to get shard ranges from %s: %s",
|
self.logger.error("Failed to get shard ranges from %s: %s",
|
||||||
broker.root_path, err)
|
quote(broker.root_path), err)
|
||||||
return None
|
return None
|
||||||
|
|
||||||
try:
|
try:
|
||||||
@ -582,7 +583,7 @@ class ContainerSharder(ContainerReplicator):
|
|||||||
except (ValueError, TypeError, KeyError) as err:
|
except (ValueError, TypeError, KeyError) as err:
|
||||||
self.logger.error(
|
self.logger.error(
|
||||||
"Failed to get shard ranges from %s: invalid data: %r",
|
"Failed to get shard ranges from %s: invalid data: %r",
|
||||||
broker.root_path, err)
|
quote(broker.root_path), err)
|
||||||
return None
|
return None
|
||||||
finally:
|
finally:
|
||||||
self.logger.txn_id = None
|
self.logger.txn_id = None
|
||||||
@ -645,7 +646,7 @@ class ContainerSharder(ContainerReplicator):
|
|||||||
if not node:
|
if not node:
|
||||||
raise DeviceUnavailable(
|
raise DeviceUnavailable(
|
||||||
'No mounted devices found suitable for creating shard broker '
|
'No mounted devices found suitable for creating shard broker '
|
||||||
'for %s in partition %s' % (shard_range.name, part))
|
'for %s in partition %s' % (quote(shard_range.name), part))
|
||||||
|
|
||||||
shard_broker = ContainerBroker.create_broker(
|
shard_broker = ContainerBroker.create_broker(
|
||||||
os.path.join(self.root, node['device']), part, shard_range.account,
|
os.path.join(self.root, node['device']), part, shard_range.account,
|
||||||
@ -690,8 +691,8 @@ class ContainerSharder(ContainerReplicator):
|
|||||||
|
|
||||||
if warnings:
|
if warnings:
|
||||||
self.logger.warning(
|
self.logger.warning(
|
||||||
'Audit failed for root %s (%s): %s' %
|
'Audit failed for root %s (%s): %s',
|
||||||
(broker.db_file, broker.path, ', '.join(warnings)))
|
broker.db_file, quote(broker.path), ', '.join(warnings))
|
||||||
self._increment_stat('audit_root', 'failure', statsd=True)
|
self._increment_stat('audit_root', 'failure', statsd=True)
|
||||||
return False
|
return False
|
||||||
|
|
||||||
@ -734,13 +735,13 @@ class ContainerSharder(ContainerReplicator):
|
|||||||
|
|
||||||
if warnings:
|
if warnings:
|
||||||
self.logger.warning(
|
self.logger.warning(
|
||||||
'Audit warnings for shard %s (%s): %s' %
|
'Audit warnings for shard %s (%s): %s',
|
||||||
(broker.db_file, broker.path, ', '.join(warnings)))
|
broker.db_file, quote(broker.path), ', '.join(warnings))
|
||||||
|
|
||||||
if errors:
|
if errors:
|
||||||
self.logger.warning(
|
self.logger.warning(
|
||||||
'Audit failed for shard %s (%s) - skipping: %s' %
|
'Audit failed for shard %s (%s) - skipping: %s',
|
||||||
(broker.db_file, broker.path, ', '.join(errors)))
|
broker.db_file, quote(broker.path), ', '.join(errors))
|
||||||
self._increment_stat('audit_shard', 'failure', statsd=True)
|
self._increment_stat('audit_shard', 'failure', statsd=True)
|
||||||
return False
|
return False
|
||||||
|
|
||||||
@ -755,7 +756,7 @@ class ContainerSharder(ContainerReplicator):
|
|||||||
broker.empty()):
|
broker.empty()):
|
||||||
broker.delete_db(Timestamp.now().internal)
|
broker.delete_db(Timestamp.now().internal)
|
||||||
self.logger.debug('Deleted shard container %s (%s)',
|
self.logger.debug('Deleted shard container %s (%s)',
|
||||||
broker.db_file, broker.path)
|
broker.db_file, quote(broker.path))
|
||||||
self._increment_stat('audit_shard', 'success', statsd=True)
|
self._increment_stat('audit_shard', 'success', statsd=True)
|
||||||
return True
|
return True
|
||||||
|
|
||||||
@ -890,7 +891,7 @@ class ContainerSharder(ContainerReplicator):
|
|||||||
if not success and responses.count(True) < quorum:
|
if not success and responses.count(True) < quorum:
|
||||||
self.logger.warning(
|
self.logger.warning(
|
||||||
'Failed to sufficiently replicate misplaced objects: %s in %s '
|
'Failed to sufficiently replicate misplaced objects: %s in %s '
|
||||||
'(not removing)', dest_shard_range, broker.path)
|
'(not removing)', dest_shard_range, quote(broker.path))
|
||||||
return False
|
return False
|
||||||
|
|
||||||
if broker.get_info()['id'] != info['id']:
|
if broker.get_info()['id'] != info['id']:
|
||||||
@ -910,7 +911,7 @@ class ContainerSharder(ContainerReplicator):
|
|||||||
if not success:
|
if not success:
|
||||||
self.logger.warning(
|
self.logger.warning(
|
||||||
'Refused to remove misplaced objects: %s in %s',
|
'Refused to remove misplaced objects: %s in %s',
|
||||||
dest_shard_range, broker.path)
|
dest_shard_range, quote(broker.path))
|
||||||
return success
|
return success
|
||||||
|
|
||||||
def _move_objects(self, src_broker, src_shard_range, policy_index,
|
def _move_objects(self, src_broker, src_shard_range, policy_index,
|
||||||
@ -953,7 +954,7 @@ class ContainerSharder(ContainerReplicator):
|
|||||||
if unplaced:
|
if unplaced:
|
||||||
self.logger.warning(
|
self.logger.warning(
|
||||||
'Failed to find destination for at least %s misplaced objects '
|
'Failed to find destination for at least %s misplaced objects '
|
||||||
'in %s' % (unplaced, src_broker.path))
|
'in %s', unplaced, quote(src_broker.path))
|
||||||
|
|
||||||
# TODO: consider executing the replication jobs concurrently
|
# TODO: consider executing the replication jobs concurrently
|
||||||
for dest_shard_range, dest_args in dest_brokers.items():
|
for dest_shard_range, dest_args in dest_brokers.items():
|
||||||
@ -1039,7 +1040,7 @@ class ContainerSharder(ContainerReplicator):
|
|||||||
their correct shard containers, False otherwise
|
their correct shard containers, False otherwise
|
||||||
"""
|
"""
|
||||||
self.logger.debug('Looking for misplaced objects in %s (%s)',
|
self.logger.debug('Looking for misplaced objects in %s (%s)',
|
||||||
broker.path, broker.db_file)
|
quote(broker.path), broker.db_file)
|
||||||
self._increment_stat('misplaced', 'attempted')
|
self._increment_stat('misplaced', 'attempted')
|
||||||
src_broker = src_broker or broker
|
src_broker = src_broker or broker
|
||||||
if src_bounds is None:
|
if src_bounds is None:
|
||||||
@ -1078,10 +1079,12 @@ class ContainerSharder(ContainerReplicator):
|
|||||||
own_shard_range = broker.get_own_shard_range()
|
own_shard_range = broker.get_own_shard_range()
|
||||||
shard_ranges = broker.get_shard_ranges()
|
shard_ranges = broker.get_shard_ranges()
|
||||||
if shard_ranges and shard_ranges[-1].upper >= own_shard_range.upper:
|
if shard_ranges and shard_ranges[-1].upper >= own_shard_range.upper:
|
||||||
self.logger.debug('Scan already completed for %s', broker.path)
|
self.logger.debug('Scan already completed for %s',
|
||||||
|
quote(broker.path))
|
||||||
return 0
|
return 0
|
||||||
|
|
||||||
self.logger.info('Starting scan for shard ranges on %s', broker.path)
|
self.logger.info('Starting scan for shard ranges on %s',
|
||||||
|
quote(broker.path))
|
||||||
self._increment_stat('scanned', 'attempted')
|
self._increment_stat('scanned', 'attempted')
|
||||||
|
|
||||||
start = time.time()
|
start = time.time()
|
||||||
@ -1138,7 +1141,7 @@ class ContainerSharder(ContainerReplicator):
|
|||||||
else:
|
else:
|
||||||
self.logger.error(
|
self.logger.error(
|
||||||
'PUT of new shard container %r failed for %s.',
|
'PUT of new shard container %r failed for %s.',
|
||||||
shard_range, broker.path)
|
shard_range, quote(broker.path))
|
||||||
self._increment_stat('created', 'failure', statsd=True)
|
self._increment_stat('created', 'failure', statsd=True)
|
||||||
# break, not continue, because elsewhere it is assumed that
|
# break, not continue, because elsewhere it is assumed that
|
||||||
# finding and cleaving shard ranges progresses linearly, so we
|
# finding and cleaving shard ranges progresses linearly, so we
|
||||||
@ -1159,8 +1162,9 @@ class ContainerSharder(ContainerReplicator):
|
|||||||
|
|
||||||
def _cleave_shard_range(self, broker, cleaving_context, shard_range):
|
def _cleave_shard_range(self, broker, cleaving_context, shard_range):
|
||||||
self.logger.info("Cleaving '%s' from row %s into %s for %r",
|
self.logger.info("Cleaving '%s' from row %s into %s for %r",
|
||||||
broker.path, cleaving_context.last_cleave_to_row,
|
quote(broker.path),
|
||||||
shard_range.name, shard_range)
|
cleaving_context.last_cleave_to_row,
|
||||||
|
quote(shard_range.name), shard_range)
|
||||||
self._increment_stat('cleaved', 'attempted')
|
self._increment_stat('cleaved', 'attempted')
|
||||||
start = time.time()
|
start = time.time()
|
||||||
policy_index = broker.storage_policy_index
|
policy_index = broker.storage_policy_index
|
||||||
@ -1194,7 +1198,7 @@ class ContainerSharder(ContainerReplicator):
|
|||||||
shard_broker.merge_items(objects)
|
shard_broker.merge_items(objects)
|
||||||
if objects is None:
|
if objects is None:
|
||||||
self.logger.info("Cleaving '%s': %r - zero objects found",
|
self.logger.info("Cleaving '%s': %r - zero objects found",
|
||||||
broker.path, shard_range)
|
quote(broker.path), shard_range)
|
||||||
if shard_broker.get_info()['put_timestamp'] == put_timestamp:
|
if shard_broker.get_info()['put_timestamp'] == put_timestamp:
|
||||||
# This was just created; don't need to replicate this
|
# This was just created; don't need to replicate this
|
||||||
# SR because there was nothing there. So cleanup and
|
# SR because there was nothing there. So cleanup and
|
||||||
@ -1226,7 +1230,7 @@ class ContainerSharder(ContainerReplicator):
|
|||||||
source_broker.get_syncs())
|
source_broker.get_syncs())
|
||||||
else:
|
else:
|
||||||
self.logger.debug("Cleaving '%s': %r - shard db already in sync",
|
self.logger.debug("Cleaving '%s': %r - shard db already in sync",
|
||||||
broker.path, shard_range)
|
quote(broker.path), shard_range)
|
||||||
|
|
||||||
replication_quorum = self.existing_shard_replication_quorum
|
replication_quorum = self.existing_shard_replication_quorum
|
||||||
if shard_range.includes(own_shard_range):
|
if shard_range.includes(own_shard_range):
|
||||||
@ -1253,7 +1257,7 @@ class ContainerSharder(ContainerReplicator):
|
|||||||
|
|
||||||
self.logger.info(
|
self.logger.info(
|
||||||
'Replicating new shard container %s for %s',
|
'Replicating new shard container %s for %s',
|
||||||
shard_broker.path, shard_broker.get_own_shard_range())
|
quote(shard_broker.path), shard_broker.get_own_shard_range())
|
||||||
|
|
||||||
success, responses = self._replicate_object(
|
success, responses = self._replicate_object(
|
||||||
shard_part, shard_broker.db_file, node_id)
|
shard_part, shard_broker.db_file, node_id)
|
||||||
@ -1266,7 +1270,7 @@ class ContainerSharder(ContainerReplicator):
|
|||||||
# until each shard range has been successfully cleaved
|
# until each shard range has been successfully cleaved
|
||||||
self.logger.warning(
|
self.logger.warning(
|
||||||
'Failed to sufficiently replicate cleaved shard %s for %s: '
|
'Failed to sufficiently replicate cleaved shard %s for %s: '
|
||||||
'%s successes, %s required.', shard_range, broker.path,
|
'%s successes, %s required.', shard_range, quote(broker.path),
|
||||||
replication_successes, replication_quorum)
|
replication_successes, replication_quorum)
|
||||||
self._increment_stat('cleaved', 'failure', statsd=True)
|
self._increment_stat('cleaved', 'failure', statsd=True)
|
||||||
return CLEAVE_FAILED
|
return CLEAVE_FAILED
|
||||||
@ -1284,7 +1288,7 @@ class ContainerSharder(ContainerReplicator):
|
|||||||
cleaving_context.store(broker)
|
cleaving_context.store(broker)
|
||||||
self.logger.info(
|
self.logger.info(
|
||||||
'Cleaved %s for shard range %s in %gs.',
|
'Cleaved %s for shard range %s in %gs.',
|
||||||
broker.path, shard_range, elapsed)
|
quote(broker.path), shard_range, elapsed)
|
||||||
self._increment_stat('cleaved', 'success', statsd=True)
|
self._increment_stat('cleaved', 'success', statsd=True)
|
||||||
return CLEAVE_SUCCESS
|
return CLEAVE_SUCCESS
|
||||||
|
|
||||||
@ -1292,8 +1296,8 @@ class ContainerSharder(ContainerReplicator):
|
|||||||
# Returns True if misplaced objects have been moved and the entire
|
# Returns True if misplaced objects have been moved and the entire
|
||||||
# container namespace has been successfully cleaved, False otherwise
|
# container namespace has been successfully cleaved, False otherwise
|
||||||
if broker.is_sharded():
|
if broker.is_sharded():
|
||||||
self.logger.debug('Passing over already sharded container %s/%s',
|
self.logger.debug('Passing over already sharded container %s',
|
||||||
broker.account, broker.container)
|
quote(broker.path))
|
||||||
return True
|
return True
|
||||||
|
|
||||||
cleaving_context = CleavingContext.load(broker)
|
cleaving_context = CleavingContext.load(broker)
|
||||||
@ -1303,7 +1307,7 @@ class ContainerSharder(ContainerReplicator):
|
|||||||
# the *retiring* db.
|
# the *retiring* db.
|
||||||
self.logger.debug(
|
self.logger.debug(
|
||||||
'Moving any misplaced objects from sharding container: %s',
|
'Moving any misplaced objects from sharding container: %s',
|
||||||
broker.path)
|
quote(broker.path))
|
||||||
bounds = self._make_default_misplaced_object_bounds(broker)
|
bounds = self._make_default_misplaced_object_bounds(broker)
|
||||||
cleaving_context.misplaced_done = self._move_misplaced_objects(
|
cleaving_context.misplaced_done = self._move_misplaced_objects(
|
||||||
broker, src_broker=broker.get_brokers()[0],
|
broker, src_broker=broker.get_brokers()[0],
|
||||||
@ -1312,7 +1316,7 @@ class ContainerSharder(ContainerReplicator):
|
|||||||
|
|
||||||
if cleaving_context.cleaving_done:
|
if cleaving_context.cleaving_done:
|
||||||
self.logger.debug('Cleaving already complete for container %s',
|
self.logger.debug('Cleaving already complete for container %s',
|
||||||
broker.path)
|
quote(broker.path))
|
||||||
return cleaving_context.misplaced_done
|
return cleaving_context.misplaced_done
|
||||||
|
|
||||||
ranges_todo = broker.get_shard_ranges(marker=cleaving_context.marker)
|
ranges_todo = broker.get_shard_ranges(marker=cleaving_context.marker)
|
||||||
@ -1323,12 +1327,12 @@ class ContainerSharder(ContainerReplicator):
|
|||||||
self.logger.debug('Continuing to cleave (%s done, %s todo): %s',
|
self.logger.debug('Continuing to cleave (%s done, %s todo): %s',
|
||||||
cleaving_context.ranges_done,
|
cleaving_context.ranges_done,
|
||||||
cleaving_context.ranges_todo,
|
cleaving_context.ranges_todo,
|
||||||
broker.path)
|
quote(broker.path))
|
||||||
else:
|
else:
|
||||||
cleaving_context.start()
|
cleaving_context.start()
|
||||||
cleaving_context.ranges_todo = len(ranges_todo)
|
cleaving_context.ranges_todo = len(ranges_todo)
|
||||||
self.logger.debug('Starting to cleave (%s todo): %s',
|
self.logger.debug('Starting to cleave (%s todo): %s',
|
||||||
cleaving_context.ranges_todo, broker.path)
|
cleaving_context.ranges_todo, quote(broker.path))
|
||||||
|
|
||||||
ranges_done = []
|
ranges_done = []
|
||||||
for shard_range in ranges_todo:
|
for shard_range in ranges_todo:
|
||||||
@ -1357,7 +1361,8 @@ class ContainerSharder(ContainerReplicator):
|
|||||||
# sure we *also* do that if we hit a failure right off the bat
|
# sure we *also* do that if we hit a failure right off the bat
|
||||||
cleaving_context.store(broker)
|
cleaving_context.store(broker)
|
||||||
self.logger.debug(
|
self.logger.debug(
|
||||||
'Cleaved %s shard ranges for %s', len(ranges_done), broker.path)
|
'Cleaved %s shard ranges for %s',
|
||||||
|
len(ranges_done), quote(broker.path))
|
||||||
return (cleaving_context.misplaced_done and
|
return (cleaving_context.misplaced_done and
|
||||||
cleaving_context.cleaving_done)
|
cleaving_context.cleaving_done)
|
||||||
|
|
||||||
@ -1386,11 +1391,11 @@ class ContainerSharder(ContainerReplicator):
|
|||||||
else:
|
else:
|
||||||
self.logger.warning(
|
self.logger.warning(
|
||||||
'Failed to remove retiring db file for %s',
|
'Failed to remove retiring db file for %s',
|
||||||
broker.path)
|
quote(broker.path))
|
||||||
else:
|
else:
|
||||||
self.logger.warning(
|
self.logger.warning(
|
||||||
'Repeat cleaving required for %r with context: %s'
|
'Repeat cleaving required for %r with context: %s',
|
||||||
% (broker.db_files[0], dict(cleaving_context)))
|
broker.db_files[0], dict(cleaving_context))
|
||||||
cleaving_context.reset()
|
cleaving_context.reset()
|
||||||
cleaving_context.store(broker)
|
cleaving_context.store(broker)
|
||||||
|
|
||||||
@ -1400,14 +1405,14 @@ class ContainerSharder(ContainerReplicator):
|
|||||||
candidates = find_sharding_candidates(
|
candidates = find_sharding_candidates(
|
||||||
broker, self.shard_container_threshold, shard_ranges)
|
broker, self.shard_container_threshold, shard_ranges)
|
||||||
if candidates:
|
if candidates:
|
||||||
self.logger.debug('Identified %s sharding candidates'
|
self.logger.debug('Identified %s sharding candidates',
|
||||||
% len(candidates))
|
len(candidates))
|
||||||
broker.merge_shard_ranges(candidates)
|
broker.merge_shard_ranges(candidates)
|
||||||
|
|
||||||
def _find_and_enable_shrinking_candidates(self, broker):
|
def _find_and_enable_shrinking_candidates(self, broker):
|
||||||
if not broker.is_sharded():
|
if not broker.is_sharded():
|
||||||
self.logger.warning('Cannot shrink a not yet sharded container %s',
|
self.logger.warning('Cannot shrink a not yet sharded container %s',
|
||||||
broker.path)
|
quote(broker.path))
|
||||||
return
|
return
|
||||||
|
|
||||||
merge_pairs = find_shrinking_candidates(
|
merge_pairs = find_shrinking_candidates(
|
||||||
@ -1456,7 +1461,7 @@ class ContainerSharder(ContainerReplicator):
|
|||||||
broker.get_info() # make sure account/container are populated
|
broker.get_info() # make sure account/container are populated
|
||||||
state = broker.get_db_state()
|
state = broker.get_db_state()
|
||||||
self.logger.debug('Starting processing %s state %s',
|
self.logger.debug('Starting processing %s state %s',
|
||||||
broker.path, state)
|
quote(broker.path), state)
|
||||||
|
|
||||||
if not self._audit_container(broker):
|
if not self._audit_container(broker):
|
||||||
return
|
return
|
||||||
@ -1492,8 +1497,8 @@ class ContainerSharder(ContainerReplicator):
|
|||||||
else:
|
else:
|
||||||
self.logger.debug(
|
self.logger.debug(
|
||||||
'Own shard range in state %r but no shard ranges '
|
'Own shard range in state %r but no shard ranges '
|
||||||
'and not leader; remaining unsharded: %s'
|
'and not leader; remaining unsharded: %s',
|
||||||
% (own_shard_range.state_text, broker.path))
|
own_shard_range.state_text, quote(broker.path))
|
||||||
|
|
||||||
if state == SHARDING:
|
if state == SHARDING:
|
||||||
if is_leader:
|
if is_leader:
|
||||||
@ -1512,13 +1517,14 @@ class ContainerSharder(ContainerReplicator):
|
|||||||
cleave_complete = self._cleave(broker)
|
cleave_complete = self._cleave(broker)
|
||||||
|
|
||||||
if cleave_complete:
|
if cleave_complete:
|
||||||
self.logger.info('Completed cleaving of %s', broker.path)
|
self.logger.info('Completed cleaving of %s',
|
||||||
|
quote(broker.path))
|
||||||
if self._complete_sharding(broker):
|
if self._complete_sharding(broker):
|
||||||
state = SHARDED
|
state = SHARDED
|
||||||
self._increment_stat('visited', 'completed', statsd=True)
|
self._increment_stat('visited', 'completed', statsd=True)
|
||||||
else:
|
else:
|
||||||
self.logger.debug('Remaining in sharding state %s',
|
self.logger.debug('Remaining in sharding state %s',
|
||||||
broker.path)
|
quote(broker.path))
|
||||||
|
|
||||||
if state == SHARDED and broker.is_root_container():
|
if state == SHARDED and broker.is_root_container():
|
||||||
if is_leader:
|
if is_leader:
|
||||||
@ -1539,9 +1545,8 @@ class ContainerSharder(ContainerReplicator):
|
|||||||
# simultaneously become deleted.
|
# simultaneously become deleted.
|
||||||
self._update_root_container(broker)
|
self._update_root_container(broker)
|
||||||
|
|
||||||
self.logger.debug('Finished processing %s/%s state %s',
|
self.logger.debug('Finished processing %s state %s',
|
||||||
broker.account, broker.container,
|
quote(broker.path), broker.get_db_state())
|
||||||
broker.get_db_state())
|
|
||||||
|
|
||||||
def _one_shard_cycle(self, devices_to_shard, partitions_to_shard):
|
def _one_shard_cycle(self, devices_to_shard, partitions_to_shard):
|
||||||
"""
|
"""
|
||||||
|
Loading…
Reference in New Issue
Block a user