Merge "Enable Object Replicator's failure count in recon"
This commit is contained in:
commit
2d41ff7b45
@@ -549,12 +549,16 @@ Request URI Description
/recon/sockstat returns consumable info from /proc/net/sockstat|6
/recon/devices returns list of devices and devices dir i.e. /srv/node
/recon/async returns count of async pending
/recon/replication returns object replication times (for backward compatibility)
/recon/replication returns object replication info (for backward compatibility)
/recon/replication/<type> returns replication info for given type (account, container, object)
/recon/auditor/<type> returns auditor stats on last reported scan for given type (account, container, object)
/recon/updater/<type> returns last updater sweep times for given type (container, object)
========================= ========================================================================================

Note that 'object_replication_last' and 'object_replication_time' in object
replication info are considered to be transitional and will be removed in
subsequent releases. Use 'replication_last' and 'replication_time' instead.

This information can also be queried via the swift-recon command line utility::

    fhines@ubuntu:~$ swift-recon -h
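As an aside (not part of the commit), a minimal sketch of polling this recon endpoint directly while preferring the new keys over the transitional ones; the host, port, and use of the standard library HTTP client are assumptions for illustration::

    import json
    import urllib2  # Python 2, matching the era of this change

    # Assumed object server address; recon answers on the object server port.
    url = 'http://127.0.0.1:6010/recon/replication/object'
    data = json.loads(urllib2.urlopen(url).read())

    # Prefer the new keys, fall back to the transitional object_* keys.
    last = data.get('replication_last', data.get('object_replication_last', 0))
    elapsed = data.get('replication_time', data.get('object_replication_time', 0))
    stats = data.get('replication_stats') or {}
    print('last pass: %s, replication_time: %s min, failures: %s'
          % (last, elapsed, stats.get('failure', 0)))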
swift/cli/recon.py | 71 | Executable file → Normal file
@@ -460,12 +460,14 @@ class SwiftRecon(object):
recon.scout, hosts):
if status == 200:
stats['replication_time'].append(
response.get('replication_time'))
repl_stats = response['replication_stats']
response.get('replication_time',
response.get('object_replication_time', 0)))
repl_stats = response.get('replication_stats')
if repl_stats:
for stat_key in ['attempted', 'failure', 'success']:
stats[stat_key].append(repl_stats.get(stat_key))
last = response.get('replication_last', 0)
last = response.get('replication_last',
response.get('object_replication_last', 0))
if last < least_recent_time:
least_recent_time = last
least_recent_url = url
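For context (not from the patch), the fallback pattern added above can be exercised against a hand-built response dictionary; the values below are illustrative only::

    # Mimic an older object server that still reports only the transitional keys.
    response = {'object_replication_time': 0.26,
                'object_replication_last': 1357969645.25}
    replication_time = response.get('replication_time',
                                    response.get('object_replication_time', 0))
    replication_last = response.get('replication_last',
                                    response.get('object_replication_last', 0))
    print(replication_time, replication_last)  # 0.26 1357969645.25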
@@ -506,62 +508,6 @@ class SwiftRecon(object):
elapsed, elapsed_unit, host))
print("=" * 79)

def object_replication_check(self, hosts):
"""
Obtain and print replication statistics from object servers

:param hosts: set of hosts to check. in the format of:
set([('127.0.0.1', 6020), ('127.0.0.2', 6030)])
"""
stats = {}
recon = Scout("replication", self.verbose, self.suppress_errors,
self.timeout)
print("[%s] Checking on replication" % self._ptime())
least_recent_time = 9999999999
least_recent_url = None
most_recent_time = 0
most_recent_url = None
for url, response, status, ts_start, ts_end in self.pool.imap(
recon.scout, hosts):
if status == 200:
stats[url] = response['object_replication_time']
last = response.get('object_replication_last', 0)
if last < least_recent_time:
least_recent_time = last
least_recent_url = url
if last > most_recent_time:
most_recent_time = last
most_recent_url = url
times = [x for x in stats.values() if x is not None]
if len(stats) > 0 and len(times) > 0:
computed = self._gen_stats(times, 'replication_time')
if computed['reported'] > 0:
self._print_stats(computed)
else:
print("[replication_time] - No hosts returned valid data.")
else:
print("[replication_time] - No hosts returned valid data.")
if least_recent_url is not None:
host = urlparse(least_recent_url).netloc
if not least_recent_time:
print('Oldest completion was NEVER by %s.' % host)
else:
elapsed = time.time() - least_recent_time
elapsed, elapsed_unit = seconds2timeunit(elapsed)
print('Oldest completion was %s (%d %s ago) by %s.' % (
time.strftime('%Y-%m-%d %H:%M:%S',
time.gmtime(least_recent_time)),
elapsed, elapsed_unit, host))
if most_recent_url is not None:
host = urlparse(most_recent_url).netloc
elapsed = time.time() - most_recent_time
elapsed, elapsed_unit = seconds2timeunit(elapsed)
print('Most recent completion was %s (%d %s ago) by %s.' % (
time.strftime('%Y-%m-%d %H:%M:%S',
time.gmtime(most_recent_time)),
elapsed, elapsed_unit, host))
print("=" * 79)

def updater_check(self, hosts):
"""
Obtain and print updater statistics
@@ -1072,7 +1018,7 @@ class SwiftRecon(object):
if options.all:
if self.server_type == 'object':
self.async_check(hosts)
self.object_replication_check(hosts)
self.replication_check(hosts)
self.object_auditor_check(hosts)
self.updater_check(hosts)
self.expirer_check(hosts)
@@ -1102,10 +1048,7 @@ class SwiftRecon(object):
if options.unmounted:
self.umount_check(hosts)
if options.replication:
if self.server_type == 'object':
self.object_replication_check(hosts)
else:
self.replication_check(hosts)
self.replication_check(hosts)
if options.auditor:
if self.server_type == 'object':
self.object_auditor_check(hosts)
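For illustration only (mirroring the unit tests near the end of this diff, not a documented API), the consolidated check can also be driven from Python; the host/port pair below is a placeholder::

    from swift.cli import recon

    cli = recon.SwiftRecon()
    # After this change, object servers are handled by replication_check too.
    cli.replication_check([('127.0.0.1', 6010)])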
@@ -187,7 +187,8 @@ class Replicator(Daemon):
self.stats = {'attempted': 0, 'success': 0, 'failure': 0, 'ts_repl': 0,
'no_change': 0, 'hashmatch': 0, 'rsync': 0, 'diff': 0,
'remove': 0, 'empty': 0, 'remote_merge': 0,
'start': time.time(), 'diff_capped': 0}
'start': time.time(), 'diff_capped': 0,
'failure_nodes': {}}

def _report_stats(self):
"""Report the current stats to the logs."""
@@ -212,6 +213,13 @@ class Replicator(Daemon):
('no_change', 'hashmatch', 'rsync', 'diff', 'ts_repl',
'empty', 'diff_capped')]))

def _add_failure_stats(self, failure_devs_info):
for node, dev in failure_devs_info:
self.stats['failure'] += 1
failure_devs = self.stats['failure_nodes'].setdefault(node, {})
failure_devs.setdefault(dev, 0)
failure_devs[dev] += 1

def _rsync_file(self, db_file, remote_file, whole_file=True,
different_region=False):
"""
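As a side note (not from the patch itself), the bookkeeping above builds a nested mapping of replication IP to device to failure count; a minimal standalone sketch of the same aggregation, using a stats dict shaped like the one initialized above::

    # Standalone version of the aggregation done by _add_failure_stats.
    stats = {'failure': 0, 'failure_nodes': {}}

    def add_failure_stats(failure_devs_info):
        for node, dev in failure_devs_info:
            stats['failure'] += 1
            failure_devs = stats['failure_nodes'].setdefault(node, {})
            failure_devs.setdefault(dev, 0)
            failure_devs[dev] += 1

    add_failure_stats([('192.168.0.1', 'sda'), ('192.168.0.1', 'sda'),
                       ('192.168.0.2', 'sdb')])
    print(stats)
    # {'failure': 3, 'failure_nodes': {'192.168.0.1': {'sda': 2},
    #                                  '192.168.0.2': {'sdb': 1}}}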
@@ -479,7 +487,10 @@ class Replicator(Daemon):
quarantine_db(broker.db_file, broker.db_type)
else:
self.logger.exception(_('ERROR reading db %s'), object_file)
self.stats['failure'] += 1
nodes = self.ring.get_part_nodes(int(partition))
self._add_failure_stats([(failure_dev['replication_ip'],
failure_dev['device'])
for failure_dev in nodes])
self.logger.increment('failures')
return
# The db is considered deleted if the delete_timestamp value is greater
@@ -494,6 +505,7 @@ class Replicator(Daemon):
self.logger.timing_since('timing', start_time)
return
responses = []
failure_devs_info = set()
nodes = self.ring.get_part_nodes(int(partition))
local_dev = None
for node in nodes:
@@ -532,7 +544,8 @@ class Replicator(Daemon):
self.logger.exception(_('ERROR syncing %(file)s with node'
' %(node)s'),
{'file': object_file, 'node': node})
self.stats['success' if success else 'failure'] += 1
if not success:
failure_devs_info.add((node['replication_ip'], node['device']))
self.logger.increment('successes' if success else 'failures')
responses.append(success)
try:
@@ -543,7 +556,17 @@ class Replicator(Daemon):
if not shouldbehere and all(responses):
# If the db shouldn't be on this node and has been successfully
# synced to all of its peers, it can be removed.
self.delete_db(broker)
if not self.delete_db(broker):
failure_devs_info.update(
[(failure_dev['replication_ip'], failure_dev['device'])
for failure_dev in repl_nodes])

target_devs_info = set([(target_dev['replication_ip'],
target_dev['device'])
for target_dev in repl_nodes])
self.stats['success'] += len(target_devs_info - failure_devs_info)
self._add_failure_stats(failure_devs_info)

self.logger.timing_since('timing', start_time)

def delete_db(self, broker):
@@ -558,9 +581,11 @@ class Replicator(Daemon):
if err.errno not in (errno.ENOENT, errno.ENOTEMPTY):
self.logger.exception(
_('ERROR while trying to clean up %s') % suf_dir)
return False
self.stats['remove'] += 1
device_name = self.extract_device(object_file)
self.logger.increment('removes.' + device_name)
return True

def extract_device(self, object_file):
"""
@@ -592,6 +617,10 @@ class Replicator(Daemon):
node['replication_port']):
if self.mount_check and not ismount(
os.path.join(self.root, node['device'])):
self._add_failure_stats(
[(failure_dev['replication_ip'],
failure_dev['device'])
for failure_dev in self.ring.devs if failure_dev])
self.logger.warn(
_('Skipping %(device)s as it is not mounted') % node)
continue
@@ -134,19 +134,19 @@ class ReconMiddleware(object):

def get_replication_info(self, recon_type):
"""get replication info"""
replication_list = ['replication_time',
'replication_stats',
'replication_last']
if recon_type == 'account':
return self._from_recon_cache(['replication_time',
'replication_stats',
'replication_last'],
return self._from_recon_cache(replication_list,
self.account_recon_cache)
elif recon_type == 'container':
return self._from_recon_cache(['replication_time',
'replication_stats',
'replication_last'],
return self._from_recon_cache(replication_list,
self.container_recon_cache)
elif recon_type == 'object':
return self._from_recon_cache(['object_replication_time',
'object_replication_last'],
replication_list += ['object_replication_time',
'object_replication_last']
return self._from_recon_cache(replication_list,
self.object_recon_cache)
else:
return None
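A hedged sketch (not part of the patch) of what the object branch now selects from the recon cache; the cached values are made up, and the dict construction only mimics what _from_recon_cache is expected to do::

    # Illustrative cache contents for the object server.
    cache = {'replication_time': 0.26,
             'replication_stats': {'failure': 0, 'success': 358},
             'replication_last': 1357969645.25,
             'object_replication_time': 0.26,
             'object_replication_last': 1357969645.25}
    replication_list = ['replication_time', 'replication_stats',
                        'replication_last', 'object_replication_time',
                        'object_replication_last']
    print(dict((key, cache.get(key)) for key in replication_list))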
@@ -105,6 +105,30 @@ class ObjectReplicator(Daemon):
' normal rebalance')
self._diskfile_mgr = DiskFileManager(conf, self.logger)

def _zero_stats(self):
"""Zero out the stats."""
self.stats = {'attempted': 0, 'success': 0, 'failure': 0,
'hashmatch': 0, 'rsync': 0, 'remove': 0,
'start': time.time(), 'failure_nodes': {}}

def _add_failure_stats(self, failure_devs_info):
for node, dev in failure_devs_info:
self.stats['failure'] += 1
failure_devs = self.stats['failure_nodes'].setdefault(node, {})
failure_devs.setdefault(dev, 0)
failure_devs[dev] += 1

def _get_my_replication_ips(self):
my_replication_ips = set()
ips = whataremyips()
for policy in POLICIES:
self.load_object_ring(policy)
for local_dev in [dev for dev in policy.object_ring.devs
if dev and dev['replication_ip'] in ips and
dev['replication_port'] == self.port]:
my_replication_ips.add(local_dev['replication_ip'])
return list(my_replication_ips)

# Just exists for doc anchor point
def sync(self, node, job, suffixes, *args, **kwargs):
"""
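A standalone sketch (with made-up ring device entries) of the filtering performed by _get_my_replication_ips above, without touching a real ring or the network::

    # Fabricated ring devices, shaped like the entries the real code inspects.
    devs = [{'replication_ip': '10.0.0.1', 'replication_port': 6000,
             'device': 'sda'},
            {'replication_ip': '10.0.0.2', 'replication_port': 6000,
             'device': 'sdb'},
            None]  # rings can contain holes, hence the "if dev" guard
    ips = ['10.0.0.1']  # pretend these are this host's addresses
    port = 6000

    my_replication_ips = set(dev['replication_ip'] for dev in devs
                             if dev and dev['replication_ip'] in ips and
                             dev['replication_port'] == port)
    print(sorted(my_replication_ips))  # ['10.0.0.1']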
@@ -247,6 +271,7 @@ class ObjectReplicator(Daemon):
self.replication_count += 1
self.logger.increment('partition.delete.count.%s' % (job['device'],))
self.headers['X-Backend-Storage-Policy-Index'] = int(job['policy'])
failure_devs_info = set()
begin = time.time()
try:
responses = []
@@ -255,6 +280,7 @@ class ObjectReplicator(Daemon):
delete_objs = None
if suffixes:
for node in job['nodes']:
self.stats['rsync'] += 1
kwargs = {}
if node['region'] in synced_remote_regions and \
self.conf.get('sync_method', 'rsync') == 'ssync':
@@ -275,6 +301,9 @@ class ObjectReplicator(Daemon):
if node['region'] != job['region']:
synced_remote_regions[node['region']] = \
candidates.keys()
else:
failure_devs_info.add((node['replication_ip'],
node['device']))
responses.append(success)
for region, cand_objs in synced_remote_regions.items():
if delete_objs is None:
@@ -290,11 +319,23 @@ class ObjectReplicator(Daemon):
delete_handoff = len(responses) == len(job['nodes']) and \
all(responses)
if delete_handoff:
self.stats['remove'] += 1
if (self.conf.get('sync_method', 'rsync') == 'ssync' and
delete_objs is not None):
self.logger.info(_("Removing %s objects"),
len(delete_objs))
self.delete_handoff_objs(job, delete_objs)
_junk, error_paths = self.delete_handoff_objs(
job, delete_objs)
# if replication was done for a hand-off device and the
# cleanup failed, the remote devices which were targets of
# the replication from the hand-off device will be marked,
# because a failed cleanup means the replicator needs to
# replicate again with the same info.
if error_paths:
failure_devs_info.update(
[(failure_dev['replication_ip'],
failure_dev['device'])
for failure_dev in job['nodes']])
else:
self.delete_partition(job['path'])
elif not suffixes:
@@ -302,14 +343,21 @@ class ObjectReplicator(Daemon):
except (Exception, Timeout):
self.logger.exception(_("Error syncing handoff partition"))
finally:
target_devs_info = set([(target_dev['replication_ip'],
target_dev['device'])
for target_dev in job['nodes']])
self.stats['success'] += len(target_devs_info - failure_devs_info)
self._add_failure_stats(failure_devs_info)
self.partition_times.append(time.time() - begin)
self.logger.timing_since('partition.delete.timing', begin)

def delete_partition(self, path):
self.logger.info(_("Removing partition: %s"), path)
tpool.execute(shutil.rmtree, path, ignore_errors=True)
tpool.execute(shutil.rmtree, path)

def delete_handoff_objs(self, job, delete_objs):
success_paths = []
error_paths = []
for object_hash in delete_objs:
object_path = storage_directory(job['obj_path'], job['partition'],
object_hash)
@@ -317,11 +365,14 @@ class ObjectReplicator(Daemon):
suffix_dir = dirname(object_path)
try:
os.rmdir(suffix_dir)
success_paths.append(object_path)
except OSError as e:
if e.errno not in (errno.ENOENT, errno.ENOTEMPTY):
error_paths.append(object_path)
self.logger.exception(
"Unexpected error trying to cleanup suffix dir:%r",
suffix_dir)
return success_paths, error_paths

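A hypothetical example (not from the patch) of acting on the (success_paths, error_paths) pair returned above; the paths are invented::

    success_paths, error_paths = ['objects/1234/abc/deadbeef'], []
    if error_paths:
        # Mirror update_deleted(): keep the hand-off partition so the
        # replicator retries it with the same info on the next pass.
        print('cleanup failed for %d path(s), keeping hand-off partition'
              % len(error_paths))
    else:
        print('removed %d hand-off object(s), partition can be deleted'
              % len(success_paths))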
def update(self, job):
"""
@@ -332,6 +383,8 @@ class ObjectReplicator(Daemon):
self.replication_count += 1
self.logger.increment('partition.update.count.%s' % (job['device'],))
self.headers['X-Backend-Storage-Policy-Index'] = int(job['policy'])
target_devs_info = set()
failure_devs_info = set()
begin = time.time()
try:
hashed, local_hash = tpool_reraise(
@@ -350,6 +403,7 @@ class ObjectReplicator(Daemon):
while attempts_left > 0:
# If this throws StopIteration it will be caught way below
node = next(nodes)
target_devs_info.add((node['replication_ip'], node['device']))
attempts_left -= 1
# if we have already synced to this remote region,
# don't sync again on this replication pass
@@ -365,12 +419,16 @@ class ObjectReplicator(Daemon):
self.logger.error(_('%(ip)s/%(device)s responded'
' as unmounted'), node)
attempts_left += 1
failure_devs_info.add((node['replication_ip'],
node['device']))
continue
if resp.status != HTTP_OK:
self.logger.error(_("Invalid response %(resp)s "
"from %(ip)s"),
{'resp': resp.status,
'ip': node['replication_ip']})
failure_devs_info.add((node['replication_ip'],
node['device']))
continue
remote_hash = pickle.loads(resp.read())
del resp
@@ -378,6 +436,7 @@ class ObjectReplicator(Daemon):
local_hash[suffix] !=
remote_hash.get(suffix, -1)]
if not suffixes:
self.stats['hashmatch'] += 1
continue
hashed, recalc_hash = tpool_reraise(
self._diskfile_mgr._get_hashes,
@@ -388,6 +447,7 @@ class ObjectReplicator(Daemon):
suffixes = [suffix for suffix in local_hash if
local_hash[suffix] !=
remote_hash.get(suffix, -1)]
self.stats['rsync'] += 1
success, _junk = self.sync(node, job, suffixes)
with Timeout(self.http_timeout):
conn = http_connect(
@@ -396,18 +456,26 @@ class ObjectReplicator(Daemon):
'/' + '-'.join(suffixes),
headers=self.headers)
conn.getresponse().read()
if not success:
failure_devs_info.add((node['replication_ip'],
node['device']))
# add only the remote region when replication succeeded
if success and node['region'] != job['region']:
synced_remote_regions.add(node['region'])
self.suffix_sync += len(suffixes)
self.logger.update_stats('suffix.syncs', len(suffixes))
except (Exception, Timeout):
failure_devs_info.add((node['replication_ip'],
node['device']))
self.logger.exception(_("Error syncing with node: %s") %
node)
self.suffix_count += len(local_hash)
except (Exception, Timeout):
failure_devs_info.update(target_devs_info)
self.logger.exception(_("Error syncing partition"))
finally:
self.stats['success'] += len(target_devs_info - failure_devs_info)
self._add_failure_stats(failure_devs_info)
self.partition_times.append(time.time() - begin)
self.logger.timing_since('partition.update.timing', begin)

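An aside (not from the patch) showing the set arithmetic used above to credit success only to nodes that never recorded a failure; the device tuples are examples::

    # Three target devices were attempted, one of them failed.
    target_devs_info = set([('192.168.0.1', 'sda'), ('192.168.0.2', 'sdb'),
                            ('192.168.0.3', 'sdc')])
    failure_devs_info = set([('192.168.0.2', 'sdb')])
    print(len(target_devs_info - failure_devs_info))  # 2 successes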
@@ -485,6 +553,9 @@ class ObjectReplicator(Daemon):
using replication style storage policy
"""
jobs = []
self.all_devs_info.update(
[(dev['replication_ip'], dev['device'])
for dev in policy.object_ring.devs if dev])
data_dir = get_data_dir(policy)
for local_dev in [dev for dev in policy.object_ring.devs
if (dev
@@ -498,6 +569,11 @@ class ObjectReplicator(Daemon):
obj_path = join(dev_path, data_dir)
tmp_path = join(dev_path, get_tmp_dir(policy))
if self.mount_check and not ismount(dev_path):
self._add_failure_stats(
[(failure_dev['replication_ip'],
failure_dev['device'])
for failure_dev in policy.object_ring.devs
if failure_dev])
self.logger.warn(_('%s is not mounted'), local_dev['device'])
continue
unlink_older_than(tmp_path, time.time() - self.reclaim_age)
@@ -512,6 +588,7 @@ class ObjectReplicator(Daemon):
and partition not in override_partitions):
continue

part_nodes = None
try:
job_path = join(obj_path, partition)
part_nodes = policy.object_ring.get_part_nodes(
@@ -528,6 +605,17 @@ class ObjectReplicator(Daemon):
partition=partition,
region=local_dev['region']))
except ValueError:
if part_nodes:
self._add_failure_stats(
[(failure_dev['replication_ip'],
failure_dev['device'])
for failure_dev in nodes])
else:
self._add_failure_stats(
[(failure_dev['replication_ip'],
failure_dev['device'])
for failure_dev in policy.object_ring.devs
if failure_dev])
continue
return jobs

@@ -573,19 +661,31 @@ class ObjectReplicator(Daemon):
self.replication_count = 0
self.last_replication_count = -1
self.partition_times = []
self.my_replication_ips = self._get_my_replication_ips()
self.all_devs_info = set()

stats = eventlet.spawn(self.heartbeat)
lockup_detector = eventlet.spawn(self.detect_lockups)
eventlet.sleep()  # Give spawns a cycle

current_nodes = None
try:
self.run_pool = GreenPool(size=self.concurrency)
jobs = self.collect_jobs(override_devices=override_devices,
override_partitions=override_partitions,
override_policies=override_policies)
for job in jobs:
current_nodes = job['nodes']
if override_devices and job['device'] not in override_devices:
continue
if override_partitions and \
job['partition'] not in override_partitions:
continue
dev_path = join(self.devices_dir, job['device'])
if self.mount_check and not ismount(dev_path):
self._add_failure_stats([(failure_dev['replication_ip'],
failure_dev['device'])
for failure_dev in job['nodes']])
self.logger.warn(_('%s is not mounted'), job['device'])
continue
if not self.check_ring(job['policy'].object_ring):
@@ -607,18 +707,26 @@ class ObjectReplicator(Daemon):
self.run_pool.spawn(self.update_deleted, job)
else:
self.run_pool.spawn(self.update, job)
current_nodes = None
with Timeout(self.lockup_timeout):
self.run_pool.waitall()
except (Exception, Timeout):
if current_nodes:
self._add_failure_stats([(failure_dev['replication_ip'],
failure_dev['device'])
for failure_dev in current_nodes])
else:
self._add_failure_stats(self.all_devs_info)
self.logger.exception(_("Exception in top-level replication loop"))
self.kill_coros()
finally:
stats.kill()
lockup_detector.kill()
self.stats_line()
self.stats['attempted'] = self.replication_count

def run_once(self, *args, **kwargs):
start = time.time()
self._zero_stats()
self.logger.info(_("Running object replicator in script mode."))

override_devices = list_from_csv(kwargs.get('devices'))
@@ -635,27 +743,35 @@ class ObjectReplicator(Daemon):
override_devices=override_devices,
override_partitions=override_partitions,
override_policies=override_policies)
total = (time.time() - start) / 60
total = (time.time() - self.stats['start']) / 60
self.logger.info(
_("Object replication complete (once). (%.02f minutes)"), total)
if not (override_partitions or override_devices):
dump_recon_cache({'object_replication_time': total,
'object_replication_last': time.time()},
replication_last = time.time()
dump_recon_cache({'replication_stats': self.stats,
'replication_time': total,
'replication_last': replication_last,
'object_replication_time': total,
'object_replication_last': replication_last},
self.rcache, self.logger)

def run_forever(self, *args, **kwargs):
self.logger.info(_("Starting object replicator in daemon mode."))
# Run the replicator continually
while True:
start = time.time()
self._zero_stats()
self.logger.info(_("Starting object replication pass."))
# Run the replicator
self.replicate()
total = (time.time() - start) / 60
total = (time.time() - self.stats['start']) / 60
self.logger.info(
_("Object replication complete. (%.02f minutes)"), total)
dump_recon_cache({'object_replication_time': total,
'object_replication_last': time.time()},
replication_last = time.time()
dump_recon_cache({'replication_stats': self.stats,
'replication_time': total,
'replication_last': replication_last,
'object_replication_time': total,
'object_replication_last': replication_last},
self.rcache, self.logger)
self.logger.debug('Replication sleeping for %s seconds.',
self.interval)
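To round this out (not part of the diff), a hedged sketch of reading the dumped recon cache file directly; the path matches the object recon cache location referenced in the tests below and may differ on a given deployment::

    import json

    with open('/var/cache/swift/object.recon') as fp:
        recon = json.load(fp)

    stats = recon.get('replication_stats') or {}
    print('last pass finished: %s' % recon.get('replication_last'))
    print('total failures: %s' % stats.get('failure'))
    # failure_nodes maps replication IP -> device -> failure count
    for ip, devs in stats.get('failure_nodes', {}).items():
        for dev, count in devs.items():
            print('%s/%s failed %d time(s)' % (ip, dev, count))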
@@ -578,44 +578,6 @@ class TestReconCommands(unittest.TestCase):
cli.disk_usage([('127.0.0.1', 6010)], 5, 0)
mock_print.assert_has_calls(expected_calls)

@mock.patch('__builtin__.print')
@mock.patch('time.time')
def test_object_replication_check(self, mock_now, mock_print):
now = 1430000000.0

def dummy_request(*args, **kwargs):
return [
('http://127.0.0.1:6010/recon/replication/object',
{"object_replication_time": 61,
"object_replication_last": now},
200,
0,
0),
('http://127.0.0.1:6020/recon/replication/object',
{"object_replication_time": 23,
"object_replication_last": now},
200,
0,
0),
]

cli = recon.SwiftRecon()
cli.pool.imap = dummy_request

default_calls = [
mock.call('[replication_time] low: 23, high: 61, avg: 42.0, ' +
'total: 84, Failed: 0.0%, no_result: 0, reported: 2'),
mock.call('Oldest completion was 2015-04-25 22:13:20 ' +
'(42 seconds ago) by 127.0.0.1:6010.'),
mock.call('Most recent completion was 2015-04-25 22:13:20 ' +
'(42 seconds ago) by 127.0.0.1:6010.'),
]

mock_now.return_value = now + 42
cli.object_replication_check([('127.0.0.1', 6010),
('127.0.0.1', 6020)])
mock_print.assert_has_calls(default_calls)

@mock.patch('__builtin__.print')
@mock.patch('time.time')
def test_replication_check(self, mock_now, mock_print):
@@ -506,6 +506,9 @@ class TestReconSuccess(TestCase):
"attempted": 1, "diff": 0,
"diff_capped": 0, "empty": 0,
"failure": 0, "hashmatch": 0,
"failure_nodes": {
"192.168.0.1": 0,
"192.168.0.2": 0},
"no_change": 2, "remote_merge": 0,
"remove": 0, "rsync": 0,
"start": 1333044050.855202,
@@ -523,6 +526,9 @@ class TestReconSuccess(TestCase):
"attempted": 1, "diff": 0,
"diff_capped": 0, "empty": 0,
"failure": 0, "hashmatch": 0,
"failure_nodes": {
"192.168.0.1": 0,
"192.168.0.2": 0},
"no_change": 2, "remote_merge": 0,
"remove": 0, "rsync": 0,
"start": 1333044050.855202,
@@ -537,6 +543,9 @@ class TestReconSuccess(TestCase):
"attempted": 179, "diff": 0,
"diff_capped": 0, "empty": 0,
"failure": 0, "hashmatch": 0,
"failure_nodes": {
"192.168.0.1": 0,
"192.168.0.2": 0},
"no_change": 358, "remote_merge": 0,
"remove": 0, "rsync": 0,
"start": 5.5, "success": 358,
@@ -555,6 +564,9 @@ class TestReconSuccess(TestCase):
"attempted": 179, "diff": 0,
"diff_capped": 0, "empty": 0,
"failure": 0, "hashmatch": 0,
"failure_nodes": {
"192.168.0.1": 0,
"192.168.0.2": 0},
"no_change": 358, "remote_merge": 0,
"remove": 0, "rsync": 0,
"start": 5.5, "success": 358,
@@ -562,17 +574,40 @@ class TestReconSuccess(TestCase):
"replication_last": 1357969645.25})

def test_get_replication_object(self):
from_cache_response = {"object_replication_time": 200.0,
"object_replication_last": 1357962809.15}
from_cache_response = {
"replication_time": 0.2615511417388916,
"replication_stats": {
"attempted": 179,
"failure": 0, "hashmatch": 0,
"failure_nodes": {
"192.168.0.1": 0,
"192.168.0.2": 0},
"remove": 0, "rsync": 0,
"start": 1333044050.855202, "success": 358},
"replication_last": 1357969645.25,
"object_replication_time": 0.2615511417388916,
"object_replication_last": 1357969645.25}
self.fakecache.fakeout_calls = []
self.fakecache.fakeout = from_cache_response
rv = self.app.get_replication_info('object')
self.assertEquals(self.fakecache.fakeout_calls,
[((['object_replication_time',
[((['replication_time', 'replication_stats',
'replication_last', 'object_replication_time',
'object_replication_last'],
'/var/cache/swift/object.recon'), {})])
self.assertEquals(rv, {'object_replication_time': 200.0,
'object_replication_last': 1357962809.15})
self.assertEquals(rv, {
"replication_time": 0.2615511417388916,
"replication_stats": {
"attempted": 179,
"failure": 0, "hashmatch": 0,
"failure_nodes": {
"192.168.0.1": 0,
"192.168.0.2": 0},
"remove": 0, "rsync": 0,
"start": 1333044050.855202, "success": 358},
"replication_last": 1357969645.25,
"object_replication_time": 0.2615511417388916,
"object_replication_last": 1357969645.25})

def test_get_updater_info_container(self):
from_cache_response = {"container_updater_sweep": 18.476239919662476}
@@ -222,6 +222,8 @@ class TestObjectReplicator(unittest.TestCase):
def _create_replicator(self):
self.replicator = object_replicator.ObjectReplicator(self.conf)
self.replicator.logger = self.logger
self.replicator._zero_stats()
self.replicator.all_devs_info = set()
self.df_mgr = diskfile.DiskFileManager(self.conf, self.logger)

def test_run_once(self):
@@ -788,6 +790,7 @@ class TestObjectReplicator(unittest.TestCase):
self.conf['sync_method'] = 'ssync'
self.replicator = object_replicator.ObjectReplicator(self.conf)
self.replicator.logger = debug_logger()
self.replicator._zero_stats()

with mock.patch('swift.obj.replicator.http_connect',
mock_http_connect(200)):