Merge "replace a value set with utils.TRUE_VALUES"

This commit is contained in:
Jenkins 2012-09-27 17:40:06 +00:00 committed by Gerrit Code Review
commit be8777965f

View File

@ -32,8 +32,8 @@ from eventlet.support.greenlets import GreenletExit
from swift.common.ring import Ring from swift.common.ring import Ring
from swift.common.utils import whataremyips, unlink_older_than, lock_path, \ from swift.common.utils import whataremyips, unlink_older_than, lock_path, \
compute_eta, get_logger, write_pickle, renamer, dump_recon_cache, \ compute_eta, get_logger, write_pickle, renamer, dump_recon_cache, \
rsync_ip, mkdirs rsync_ip, mkdirs, TRUE_VALUES
from swift.common.bufferedhttp import http_connect from swift.common.bufferedhttp import http_connect
from swift.common.daemon import Daemon from swift.common.daemon import Daemon
from swift.common.http import HTTP_OK, HTTP_INSUFFICIENT_STORAGE from swift.common.http import HTTP_OK, HTTP_INSUFFICIENT_STORAGE
@ -204,7 +204,7 @@ def get_hashes(partition_dir, recalculate=[], do_listdir=False,
if modified: if modified:
with lock_path(partition_dir): with lock_path(partition_dir):
if not os.path.exists(hashes_file) or \ if not os.path.exists(hashes_file) or \
os.path.getmtime(hashes_file) == mtime: os.path.getmtime(hashes_file) == mtime:
write_pickle( write_pickle(
hashes, hashes_file, partition_dir, PICKLE_PROTOCOL) hashes, hashes_file, partition_dir, PICKLE_PROTOCOL)
return hashed, hashes return hashed, hashes
@ -247,9 +247,9 @@ class ObjectReplicator(Daemon):
self.logger = get_logger(conf, log_route='object-replicator') self.logger = get_logger(conf, log_route='object-replicator')
self.devices_dir = conf.get('devices', '/srv/node') self.devices_dir = conf.get('devices', '/srv/node')
self.mount_check = conf.get('mount_check', 'true').lower() in \ self.mount_check = conf.get('mount_check', 'true').lower() in \
('true', 't', '1', 'on', 'yes', 'y') TRUE_VALUES
self.vm_test_mode = conf.get( self.vm_test_mode = conf.get(
'vm_test_mode', 'no').lower() in ('yes', 'true', 'on', '1') 'vm_test_mode', 'no').lower() in TRUE_VALUES
self.swift_dir = conf.get('swift_dir', '/etc/swift') self.swift_dir = conf.get('swift_dir', '/etc/swift')
self.port = int(conf.get('bind_port', 6000)) self.port = int(conf.get('bind_port', 6000))
self.concurrency = int(conf.get('concurrency', 1)) self.concurrency = int(conf.get('concurrency', 1))
@ -278,8 +278,9 @@ class ObjectReplicator(Daemon):
ret_val = None ret_val = None
try: try:
with Timeout(self.rsync_timeout): with Timeout(self.rsync_timeout):
proc = subprocess.Popen(args, stdout=subprocess.PIPE, proc = subprocess.Popen(args,
stderr=subprocess.STDOUT) stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
results = proc.stdout.read() results = proc.stdout.read()
ret_val = proc.wait() ret_val = proc.wait()
except Timeout: except Timeout:
@ -298,7 +299,7 @@ class ObjectReplicator(Daemon):
self.logger.error(result) self.logger.error(result)
if ret_val: if ret_val:
self.logger.error(_('Bad rsync return code: %(args)s -> %(ret)d'), self.logger.error(_('Bad rsync return code: %(args)s -> %(ret)d'),
{'args': str(args), 'ret': ret_val}) {'args': str(args), 'ret': ret_val})
elif results: elif results:
self.logger.info( self.logger.info(
_("Successful rsync of %(src)s at %(dst)s (%(time).03f)"), _("Successful rsync of %(src)s at %(dst)s (%(time).03f)"),
@ -384,13 +385,15 @@ class ObjectReplicator(Daemon):
success = self.rsync(node, job, suffixes) success = self.rsync(node, job, suffixes)
if success: if success:
with Timeout(self.http_timeout): with Timeout(self.http_timeout):
http_connect(node['ip'], node['port'], http_connect(
node['ip'], node['port'],
node['device'], job['partition'], 'REPLICATE', node['device'], job['partition'], 'REPLICATE',
'/' + '-'.join(suffixes), '/' + '-'.join(suffixes),
headers={'Content-Length': '0'}).getresponse().read() headers={'Content-Length': '0'}).\
getresponse().read()
responses.append(success) responses.append(success)
if not suffixes or (len(responses) == \ if not suffixes or (len(responses) ==
len(job['nodes']) and all(responses)): len(job['nodes']) and all(responses)):
self.logger.info(_("Removing partition: %s"), job['path']) self.logger.info(_("Removing partition: %s"), job['path'])
tpool.execute(shutil.rmtree, job['path'], ignore_errors=True) tpool.execute(shutil.rmtree, job['path'], ignore_errors=True)
except (Exception, Timeout): except (Exception, Timeout):
@ -409,49 +412,57 @@ class ObjectReplicator(Daemon):
self.logger.increment('partition.update.count.%s' % (job['device'],)) self.logger.increment('partition.update.count.%s' % (job['device'],))
begin = time.time() begin = time.time()
try: try:
hashed, local_hash = tpool_reraise(get_hashes, job['path'], hashed, local_hash = tpool_reraise(
do_listdir=(self.replication_count % 10) == 0, get_hashes, job['path'],
reclaim_age=self.reclaim_age) do_listdir=(self.replication_count % 10) == 0,
reclaim_age=self.reclaim_age)
self.suffix_hash += hashed self.suffix_hash += hashed
self.logger.update_stats('suffix.hashes', hashed) self.logger.update_stats('suffix.hashes', hashed)
attempts_left = len(job['nodes']) attempts_left = len(job['nodes'])
nodes = itertools.chain(job['nodes'], nodes = itertools.chain(
self.object_ring.get_more_nodes(int(job['partition']))) job['nodes'],
self.object_ring.get_more_nodes(int(job['partition'])))
while attempts_left > 0: while attempts_left > 0:
# If this throws StopIterator it will be caught way below # If this throws StopIterator it will be caught way below
node = next(nodes) node = next(nodes)
attempts_left -= 1 attempts_left -= 1
try: try:
with Timeout(self.http_timeout): with Timeout(self.http_timeout):
resp = http_connect(node['ip'], node['port'], resp = http_connect(
node['device'], job['partition'], 'REPLICATE', node['ip'], node['port'],
node['device'], job['partition'], 'REPLICATE',
'', headers={'Content-Length': '0'}).getresponse() '', headers={'Content-Length': '0'}).getresponse()
if resp.status == HTTP_INSUFFICIENT_STORAGE: if resp.status == HTTP_INSUFFICIENT_STORAGE:
self.logger.error(_('%(ip)s/%(device)s responded' self.logger.error(_('%(ip)s/%(device)s responded'
' as unmounted'), node) ' as unmounted'), node)
attempts_left += 1 attempts_left += 1
continue continue
if resp.status != HTTP_OK: if resp.status != HTTP_OK:
self.logger.error(_("Invalid response %(resp)s " self.logger.error(_("Invalid response %(resp)s "
"from %(ip)s"), "from %(ip)s"),
{'resp': resp.status, 'ip': node['ip']}) {'resp': resp.status,
'ip': node['ip']})
continue continue
remote_hash = pickle.loads(resp.read()) remote_hash = pickle.loads(resp.read())
del resp del resp
suffixes = [suffix for suffix in local_hash if suffixes = [suffix for suffix in local_hash if
local_hash[suffix] != remote_hash.get(suffix, -1)] local_hash[suffix] !=
remote_hash.get(suffix, -1)]
if not suffixes: if not suffixes:
continue continue
hashed, recalc_hash = tpool_reraise(get_hashes, hashed, recalc_hash = tpool_reraise(
get_hashes,
job['path'], recalculate=suffixes, job['path'], recalculate=suffixes,
reclaim_age=self.reclaim_age) reclaim_age=self.reclaim_age)
self.logger.update_stats('suffix.hashes', hashed) self.logger.update_stats('suffix.hashes', hashed)
local_hash = recalc_hash local_hash = recalc_hash
suffixes = [suffix for suffix in local_hash if suffixes = [suffix for suffix in local_hash if
local_hash[suffix] != remote_hash.get(suffix, -1)] local_hash[suffix] !=
remote_hash.get(suffix, -1)]
self.rsync(node, job, suffixes) self.rsync(node, job, suffixes)
with Timeout(self.http_timeout): with Timeout(self.http_timeout):
conn = http_connect(node['ip'], node['port'], conn = http_connect(
node['ip'], node['port'],
node['device'], job['partition'], 'REPLICATE', node['device'], job['partition'], 'REPLICATE',
'/' + '-'.join(suffixes), '/' + '-'.join(suffixes),
headers={'Content-Length': '0'}) headers={'Content-Length': '0'})
@ -460,7 +471,7 @@ class ObjectReplicator(Daemon):
self.logger.update_stats('suffix.syncs', len(suffixes)) self.logger.update_stats('suffix.syncs', len(suffixes))
except (Exception, Timeout): except (Exception, Timeout):
self.logger.exception(_("Error syncing with node: %s") % self.logger.exception(_("Error syncing with node: %s") %
node) node)
self.suffix_count += len(local_hash) self.suffix_count += len(local_hash)
except (Exception, Timeout): except (Exception, Timeout):
self.logger.exception(_("Error syncing partition")) self.logger.exception(_("Error syncing partition"))
@ -475,29 +486,34 @@ class ObjectReplicator(Daemon):
if self.replication_count: if self.replication_count:
elapsed = (time.time() - self.start) or 0.000001 elapsed = (time.time() - self.start) or 0.000001
rate = self.replication_count / elapsed rate = self.replication_count / elapsed
self.logger.info(_("%(replicated)d/%(total)d (%(percentage).2f%%)" self.logger.info(
" partitions replicated in %(time).2fs (%(rate).2f/sec, " _("%(replicated)d/%(total)d (%(percentage).2f%%)"
"%(remaining)s remaining)"), " partitions replicated in %(time).2fs (%(rate).2f/sec, "
"%(remaining)s remaining)"),
{'replicated': self.replication_count, 'total': self.job_count, {'replicated': self.replication_count, 'total': self.job_count,
'percentage': self.replication_count * 100.0 / self.job_count, 'percentage': self.replication_count * 100.0 / self.job_count,
'time': time.time() - self.start, 'rate': rate, 'time': time.time() - self.start, 'rate': rate,
'remaining': '%d%s' % compute_eta(self.start, 'remaining': '%d%s' % compute_eta(self.start,
self.replication_count, self.job_count)}) self.replication_count,
self.job_count)})
if self.suffix_count: if self.suffix_count:
self.logger.info(_("%(checked)d suffixes checked - " self.logger.info(
"%(hashed).2f%% hashed, %(synced).2f%% synced"), _("%(checked)d suffixes checked - "
"%(hashed).2f%% hashed, %(synced).2f%% synced"),
{'checked': self.suffix_count, {'checked': self.suffix_count,
'hashed': (self.suffix_hash * 100.0) / self.suffix_count, 'hashed': (self.suffix_hash * 100.0) / self.suffix_count,
'synced': (self.suffix_sync * 100.0) / self.suffix_count}) 'synced': (self.suffix_sync * 100.0) / self.suffix_count})
self.partition_times.sort() self.partition_times.sort()
self.logger.info(_("Partition times: max %(max).4fs, " self.logger.info(
"min %(min).4fs, med %(med).4fs"), _("Partition times: max %(max).4fs, "
"min %(min).4fs, med %(med).4fs"),
{'max': self.partition_times[-1], {'max': self.partition_times[-1],
'min': self.partition_times[0], 'min': self.partition_times[0],
'med': self.partition_times[ 'med': self.partition_times[
len(self.partition_times) // 2]}) len(self.partition_times) // 2]})
else: else:
self.logger.info(_("Nothing replicated for %s seconds."), self.logger.info(
_("Nothing replicated for %s seconds."),
(time.time() - self.start)) (time.time() - self.start))
def kill_coros(self): def kill_coros(self):
@ -538,7 +554,8 @@ class ObjectReplicator(Daemon):
jobs = [] jobs = []
ips = whataremyips() ips = whataremyips()
for local_dev in [dev for dev in self.object_ring.devs for local_dev in [dev for dev in self.object_ring.devs
if dev and dev['ip'] in ips and dev['port'] == self.port]: if dev and dev['ip'] in ips and
dev['port'] == self.port]:
dev_path = join(self.devices_dir, local_dev['device']) dev_path = join(self.devices_dir, local_dev['device'])
obj_path = join(dev_path, 'objects') obj_path = join(dev_path, 'objects')
tmp_path = join(dev_path, 'tmp') tmp_path = join(dev_path, 'tmp')
@ -563,11 +580,12 @@ class ObjectReplicator(Daemon):
self.object_ring.get_part_nodes(int(partition)) self.object_ring.get_part_nodes(int(partition))
nodes = [node for node in part_nodes nodes = [node for node in part_nodes
if node['id'] != local_dev['id']] if node['id'] != local_dev['id']]
jobs.append(dict(path=job_path, jobs.append(
device=local_dev['device'], dict(path=job_path,
nodes=nodes, device=local_dev['device'],
delete=len(nodes) > len(part_nodes) - 1, nodes=nodes,
partition=partition)) delete=len(nodes) > len(part_nodes) - 1,
partition=partition))
except ValueError, OSError: except ValueError, OSError:
continue continue
random.shuffle(jobs) random.shuffle(jobs)
@ -598,7 +616,7 @@ class ObjectReplicator(Daemon):
continue continue
if not self.check_ring(): if not self.check_ring():
self.logger.info(_("Ring change detected. Aborting " self.logger.info(_("Ring change detected. Aborting "
"current replication pass.")) "current replication pass."))
return return
if job['delete']: if job['delete']:
self.run_pool.spawn(self.update_deleted, job) self.run_pool.spawn(self.update_deleted, job)
@ -638,5 +656,5 @@ class ObjectReplicator(Daemon):
dump_recon_cache({'object_replication_time': total}, dump_recon_cache({'object_replication_time': total},
self.rcache, self.logger) self.rcache, self.logger)
self.logger.debug(_('Replication sleeping for %s seconds.'), self.logger.debug(_('Replication sleeping for %s seconds.'),
self.run_pause) self.run_pause)
sleep(self.run_pause) sleep(self.run_pause)