Merge "Track unlinks of async_pendings."

Jenkins 2012-10-23 22:19:01 +00:00 committed by Gerrit Code Review
commit 655892423e
4 changed files with 79 additions and 38 deletions

View File

@@ -518,7 +518,7 @@ Metric Name Description
`account-replicator.no_changes` Count of accounts found to be in sync.
`account-replicator.hashmatches` Count of accounts found to be in sync via hash
comparison (`broker.merge_syncs` was called).
`account-replicator.rsyncs` Count of completely missing accounts where were sent
`account-replicator.rsyncs` Count of completely missing accounts which were sent
via rsync.
`account-replicator.remote_merges` Count of syncs handled by sending entire database
via rsync.
@@ -628,7 +628,7 @@ Metric Name Description
synchronization via deletion.
`container-sync.puts` Count of container database rows sync'ed by PUTing.
`container-sync.puts.timing` Timing data for each container database row
sychronization via PUTing.
synchronization via PUTing.
=============================== ====================================================
Metrics for `container-updater`:
@@ -693,7 +693,7 @@ Metric Name Description
`object-replicator.partition.update.timing` Timing data for partitions replicated which also
belong on this node. This metric is not tracked
per-device.
`object-replicator.suffix.hashes` Count of suffix directories whose has (of filenames)
`object-replicator.suffix.hashes` Count of suffix directories whose hash (of filenames)
was recalculated.
`object-replicator.suffix.syncs` Count of suffix directories replicated with rsync.
=================================================== ====================================================
@@ -756,7 +756,12 @@ Metric Name Description
`object-updater.quarantines` Count of async_pending container updates which were
corrupted and moved to quarantine.
`object-updater.successes` Count of successful container updates.
`object-updater.failures` Count of failed continer updates.
`object-updater.failures` Count of failed container updates.
`object-updater.unlinks` Count of async_pending files unlinked. An
async_pending file is unlinked either when it is
successfully processed or when the updater sees
that there is a newer async_pending file for the
same object.
============================ ====================================================
Metrics for `proxy-server` (in the table, `<type>` is the proxy-server
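
The new `object-updater.unlinks` entry is reported the same way as the existing `successes` and `failures` counters: every `self.logger.increment(...)` call in the daemon ends up as a StatsD counter named `object-updater.<metric>`. As a rough illustration of what crosses the wire (a hand-rolled sketch, not Swift's actual StatsD client; host, port, and prefix are assumptions):

    import socket

    def send_statsd_counter(metric, value=1, host='127.0.0.1', port=8125,
                            prefix='object-updater'):
        # StatsD counters are plain UDP datagrams: "<name>:<value>|c".
        payload = '%s.%s:%d|c' % (prefix, metric, value)
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            sock.sendto(payload.encode('utf-8'), (host, port))
        finally:
            sock.close()

    # What the updater effectively reports each time it removes an async_pending:
    send_statsd_counter('unlinks')
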
@@ -869,7 +874,7 @@ Object Auditor
--------------
On system failures, the XFS file system can sometimes truncate files it's
trying to write and produce zero byte files. The object-auditor will catch
trying to write and produce zero-byte files. The object-auditor will catch
these problems but in the case of a system crash it would be advisable to run
an extra, less rate limited sweep to check for these specific files. You can
run this command as follows:
@@ -927,7 +932,7 @@ Swift Oldies are processes that have just been around for a long
time. There's nothing necessarily wrong with this, but it might
indicate a hung process if you regularly upgrade and reload/restart
services. You might have so many servers that you don't notice when a
reload/restart fails, `swift-oldies` can help with this.
reload/restart fails; `swift-oldies` can help with this.
For example, if you upgraded and reloaded/restarted everything 2 days
ago, and you've already cleaned up any orphans with `swift-orphans`,

View File

@@ -41,7 +41,7 @@ class ObjectUpdater(Daemon):
self.logger = get_logger(conf, log_route='object-updater')
self.devices = conf.get('devices', '/srv/node')
self.mount_check = conf.get('mount_check', 'true').lower() in \
('true', 't', '1', 'on', 'yes', 'y')
('true', 't', '1', 'on', 'yes', 'y')
self.swift_dir = conf.get('swift_dir', '/etc/swift')
self.interval = int(conf.get('interval', 300))
self.container_ring = None
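
The `mount_check` line above is Swift's usual pattern for boolean options: the string from the config file is lower-cased and checked against a small set of truthy values. A standalone sketch of the same idiom (the helper name here is made up for illustration):

    TRUE_VALUES = ('true', 't', '1', 'on', 'yes', 'y')

    def config_true_value(value):
        # Interpret a config string the way the updater does for mount_check.
        return str(value).lower() in TRUE_VALUES

    assert config_true_value('Yes') is True
    assert config_true_value('0') is False
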
@@ -90,9 +90,10 @@ class ObjectUpdater(Daemon):
forkbegin = time.time()
self.object_sweep(os.path.join(self.devices, device))
elapsed = time.time() - forkbegin
self.logger.info(_('Object update sweep of %(device)s'
' completed: %(elapsed).02fs, %(success)s successes'
', %(fail)s failures'),
self.logger.info(
_('Object update sweep of %(device)s'
' completed: %(elapsed).02fs, %(success)s successes'
', %(fail)s failures'),
{'device': device, 'elapsed': elapsed,
'success': self.successes, 'fail': self.failures})
sys.exit()
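
For context, this hunk sits inside the forking sweep: each device gets its own child process, the parent reaps children with `os.wait()`, and each child logs its own elapsed time and success/failure totals before exiting. A condensed, self-contained sketch of that pattern (the `sweep` function is a stand-in for `object_sweep`):

    import os
    import time

    def sweep(device_path):
        # Stand-in for ObjectUpdater.object_sweep(device_path).
        time.sleep(0.1)

    def parallel_sweep(devices, concurrency=2):
        pids = []
        for device in devices:
            while len(pids) >= concurrency:
                pids.remove(os.wait()[0])       # reap one finished child
            pid = os.fork()
            if pid:
                pids.append(pid)                # parent: remember the child
            else:
                begin = time.time()
                sweep(device)
                print('sweep of %s completed: %.02fs'
                      % (device, time.time() - begin))
                os._exit(0)                     # child must not return to the loop
        while pids:
            pids.remove(os.wait()[0])

    parallel_sweep(['sda1', 'sdb1', 'sdc1'])
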
@@ -100,7 +101,7 @@ class ObjectUpdater(Daemon):
pids.remove(os.wait()[0])
elapsed = time.time() - begin
self.logger.info(_('Object update sweep completed: %.02fs'),
elapsed)
elapsed)
dump_recon_cache({'object_updater_sweep': elapsed},
self.rcache, self.logger)
if elapsed < self.interval:
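
The surrounding `run_forever()` loop only sleeps for whatever is left of the configured interval after a sweep, so slow sweeps do not push the schedule back indefinitely. A minimal sketch of that timing pattern (the initial random jitter is an assumption about how such daemons usually stagger themselves):

    import random
    import time

    def run_forever(do_sweep, interval=300):
        time.sleep(random.random() * interval)    # stagger daemons on start-up
        while True:
            begin = time.time()
            do_sweep()
            elapsed = time.time() - begin
            if elapsed < interval:
                time.sleep(interval - elapsed)    # sleep only the remainder
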
@@ -121,8 +122,9 @@ class ObjectUpdater(Daemon):
continue
self.object_sweep(os.path.join(self.devices, device))
elapsed = time.time() - begin
self.logger.info(_('Object update single threaded sweep completed: '
'%(elapsed).02fs, %(success)s successes, %(fail)s failures'),
self.logger.info(
_('Object update single threaded sweep completed: '
'%(elapsed).02fs, %(success)s successes, %(fail)s failures'),
{'elapsed': elapsed, 'success': self.successes,
'fail': self.failures})
dump_recon_cache({'object_updater_sweep': elapsed},
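
Both sweep modes also record the elapsed time in a local recon cache so that recon-style tooling can read it later. The sketch below shows one way a small JSON cache file can be merged and rewritten atomically; it is an approximation, not the actual `dump_recon_cache()` implementation, and the file name is made up:

    import json
    import os
    import tempfile

    def dump_cache(new_values, cache_file):
        data = {}
        if os.path.exists(cache_file):
            with open(cache_file) as f:
                data = json.load(f)
        data.update(new_values)                      # merge in the new entries
        fd, tmp_path = tempfile.mkstemp(dir=os.path.dirname(cache_file) or '.')
        with os.fdopen(fd, 'w') as f:
            json.dump(data, f)
        os.rename(tmp_path, cache_file)              # atomic replace on POSIX

    dump_cache({'object_updater_sweep': 12.34}, 'object.recon')
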
@@ -156,6 +158,7 @@ class ObjectUpdater(Daemon):
% (update_path))
continue
if obj_hash == last_obj_hash:
self.logger.increment("unlinks")
os.unlink(update_path)
else:
self.process_object_update(update_path, device)
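
This is the first of the two new `unlinks` increments: async_pending file names are `<object hash>-<timestamp>`, and `object_sweep()` walks each suffix directory in reverse-sorted order, so when a name repeats the hash of the previous entry it must be an older update for the same object and can be dropped. A self-contained sketch of that dedup logic (the `process` callable and `logger` are stand-ins for the real daemon plumbing):

    import os

    def sweep_async_dir(async_dir, process, logger):
        last_obj_hash = None
        # Reverse sort puts the newest '<hash>-<timestamp>' name first per hash.
        for update_name in sorted(os.listdir(async_dir), reverse=True):
            update_path = os.path.join(async_dir, update_name)
            try:
                obj_hash, timestamp = update_name.rsplit('-', 1)
            except ValueError:
                continue                       # not an async_pending file name
            if obj_hash == last_obj_hash:
                logger.increment('unlinks')    # superseded by a newer update
                os.unlink(update_path)
            else:
                process(update_path)           # stand-in for process_object_update
            last_obj_hash = obj_hash
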
@@ -180,12 +183,13 @@ class ObjectUpdater(Daemon):
self.logger.exception(
_('ERROR Pickle problem, quarantining %s'), update_path)
self.logger.increment('quarantines')
renamer(update_path, os.path.join(device,
'quarantined', 'objects', os.path.basename(update_path)))
renamer(update_path, os.path.join(
device, 'quarantined', 'objects',
os.path.basename(update_path)))
return
successes = update.get('successes', [])
part, nodes = self.get_container_ring().get_nodes(
update['account'], update['container'])
update['account'], update['container'])
obj = '/%s/%s/%s' % \
(update['account'], update['container'], update['obj'])
success = True
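
Earlier in `process_object_update()`, an async_pending that cannot be unpickled is not deleted; it is counted as a quarantine and moved aside so the bytes survive for inspection. A simplified sketch of that pattern (the real code uses swift's `renamer()` helper, so treat the directory handling here as illustrative):

    import os
    import pickle

    def read_update_or_quarantine(update_path, device_path, logger):
        try:
            with open(update_path, 'rb') as f:
                return pickle.load(f)
        except Exception:
            logger.increment('quarantines')
            quarantine_dir = os.path.join(device_path, 'quarantined', 'objects')
            os.makedirs(quarantine_dir, exist_ok=True)
            # Keep the corrupt file, just move it out of the sweep's way.
            os.rename(update_path,
                      os.path.join(quarantine_dir,
                                   os.path.basename(update_path)))
            return None
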
@@ -193,7 +197,7 @@ class ObjectUpdater(Daemon):
for node in nodes:
if node['id'] not in successes:
status = self.object_update(node, part, update['op'], obj,
update['headers'])
update['headers'])
if not is_success(status) and status != HTTP_NOT_FOUND:
success = False
else:
@@ -203,13 +207,14 @@ class ObjectUpdater(Daemon):
self.successes += 1
self.logger.increment('successes')
self.logger.debug(_('Update sent for %(obj)s %(path)s'),
{'obj': obj, 'path': update_path})
{'obj': obj, 'path': update_path})
self.logger.increment("unlinks")
os.unlink(update_path)
else:
self.failures += 1
self.logger.increment('failures')
self.logger.debug(_('Update failed for %(obj)s %(path)s'),
{'obj': obj, 'path': update_path})
{'obj': obj, 'path': update_path})
if new_successes:
update['successes'] = successes
write_pickle(update, update_path, os.path.join(device, 'tmp'))
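
The rest of `process_object_update()` is the second place the new counter fires: the update is sent to every container replica not already listed in the pickle's `successes`, and only when all replicas have been satisfied is the async_pending unlinked (bumping `unlinks` alongside `successes`); otherwise the partial progress is pickled back so the next sweep skips the nodes that already succeeded. In miniature, with node discovery and HTTP left out (the `nodes` list and `send_to_node` callable are hypothetical):

    import os
    import pickle

    def apply_update(update, update_path, nodes, send_to_node, logger):
        successes = update.get('successes', [])
        all_ok = True
        for node in nodes:
            if node['id'] in successes:
                continue                        # this replica was updated last time
            if send_to_node(node):
                successes.append(node['id'])
            else:
                all_ok = False
        if all_ok:
            logger.increment('successes')
            logger.increment('unlinks')         # fully applied: drop the file
            os.unlink(update_path)
        else:
            logger.increment('failures')
            update['successes'] = successes     # remember partial progress
            with open(update_path, 'wb') as f:
                pickle.dump(update, f)
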
@@ -227,12 +232,12 @@ class ObjectUpdater(Daemon):
try:
with ConnectionTimeout(self.conn_timeout):
conn = http_connect(node['ip'], node['port'], node['device'],
part, op, obj, headers)
part, op, obj, headers)
with Timeout(self.node_timeout):
resp = conn.getresponse()
resp.read()
return resp.status
except (Exception, Timeout):
self.logger.exception(_('ERROR with remote server '
'%(ip)s:%(port)s/%(device)s'), node)
'%(ip)s:%(port)s/%(device)s'), node)
return HTTP_INTERNAL_SERVER_ERROR
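
`object_update()` itself is a single small HTTP request per container replica, with separate connect and response timeouts, and any exception is collapsed into a 500 so the caller simply counts it as a failure. A rough standard-library equivalent (sketch only; the real code uses eventlet timeouts and swift's `http_connect()`, and the URL layout here is simplified):

    import http.client
    import socket

    def container_update(node, op, path, headers,
                         conn_timeout=0.5, node_timeout=3):
        try:
            conn = http.client.HTTPConnection(node['ip'], node['port'],
                                              timeout=conn_timeout)
            conn.connect()
            conn.sock.settimeout(node_timeout)   # response may take longer
            conn.request(op, '/%s%s' % (node['device'], path), headers=headers)
            resp = conn.getresponse()
            resp.read()
            return resp.status
        except (Exception, socket.timeout):
            return 500                           # treated as a failed update
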

View File

@@ -141,6 +141,17 @@ class FakeLogger(object):
update_stats = _store_in('update_stats')
set_statsd_prefix = _store_in('set_statsd_prefix')
def get_increments(self):
return [call[0][0] for call in self.log_dict['increment']]
def get_increment_counts(self):
counts = {}
for metric in self.get_increments():
if metric not in counts:
counts[metric] = 0
counts[metric] += 1
return counts
def setFormatter(self, obj):
self.formatter = obj
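
The two helpers added to `FakeLogger` let tests assert on aggregated counter calls instead of scraping log lines. A hypothetical usage (assuming `FakeLogger` from `test/unit/__init__.py` is importable, as in the test below):

    from test.unit import FakeLogger

    logger = FakeLogger()
    logger.increment('unlinks')
    logger.increment('unlinks')
    logger.increment('successes')
    # get_increments() keeps order and duplicates; get_increment_counts() tallies.
    assert logger.get_increments() == ['unlinks', 'unlinks', 'successes']
    assert logger.get_increment_counts() == {'unlinks': 2, 'successes': 1}
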

View File

@@ -30,6 +30,7 @@ from swift.common.ring import RingData
from swift.common import utils
from swift.common.utils import hash_path, normalize_timestamp, mkdirs, \
write_pickle
from test.unit import FakeLogger
class TestObjectUpdater(unittest.TestCase):
@@ -37,14 +38,15 @@ class TestObjectUpdater(unittest.TestCase):
def setUp(self):
utils.HASH_PATH_SUFFIX = 'endcap'
self.testdir = os.path.join(os.path.dirname(__file__),
'object_updater')
'object_updater')
rmtree(self.testdir, ignore_errors=1)
os.mkdir(self.testdir)
pickle.dump(RingData([[0, 1, 0, 1], [1, 0, 1, 0]],
[{'id': 0, 'ip': '127.0.0.1', 'port': 1, 'device': 'sda1',
'zone': 0},
{'id': 1, 'ip': '127.0.0.1', 'port': 1, 'device': 'sda1',
'zone': 2}], 30),
pickle.dump(
RingData([[0, 1, 0, 1], [1, 0, 1, 0]],
[{'id': 0, 'ip': '127.0.0.1', 'port': 1, 'device': 'sda1',
'zone': 0},
{'id': 1, 'ip': '127.0.0.1', 'port': 1, 'device': 'sda1',
'zone': 2}], 30),
GzipFile(os.path.join(self.testdir, 'container.ring.gz'), 'wb'))
self.devices_dir = os.path.join(self.testdir, 'devices')
os.mkdir(self.devices_dir)
@@ -62,8 +64,7 @@ class TestObjectUpdater(unittest.TestCase):
'swift_dir': self.testdir,
'interval': '1',
'concurrency': '2',
'node_timeout': '5',
})
'node_timeout': '5'})
self.assert_(hasattr(cu, 'logger'))
self.assert_(cu.logger is not None)
self.assertEquals(cu.devices, self.devices_dir)
@@ -87,7 +88,7 @@ class TestObjectUpdater(unittest.TestCase):
ohash = hash_path('account', 'container', o)
for t in timestamps:
o_path = os.path.join(prefix_dir, ohash + '-' +
normalize_timestamp(t))
normalize_timestamp(t))
if t == timestamps[0]:
expected.add(o_path)
write_pickle({}, o_path)
@@ -105,8 +106,7 @@ class TestObjectUpdater(unittest.TestCase):
'swift_dir': self.testdir,
'interval': '1',
'concurrency': '1',
'node_timeout': '5',
})
'node_timeout': '5'})
cu.object_sweep(self.sda1)
self.assert_(not os.path.exists(prefix_dir))
self.assertEqual(expected, seen)
@@ -118,8 +118,7 @@ class TestObjectUpdater(unittest.TestCase):
'swift_dir': self.testdir,
'interval': '1',
'concurrency': '1',
'node_timeout': '15',
})
'node_timeout': '15'})
cu.run_once()
async_dir = os.path.join(self.sda1, object_server.ASYNCDIR)
os.mkdir(async_dir)
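
The test drops its fixtures straight into the on-disk layout the updater sweeps: `<device>/async_pending/<last three hex chars of the object hash>/<object hash>-<timestamp>`. A sketch of composing such a path (the directory name, the md5-plus-suffix hashing, and the timestamp formatting are simplified stand-ins for `object_server.ASYNCDIR`, `hash_path()`, and `normalize_timestamp()`):

    import os
    from hashlib import md5

    def async_pending_path(device_path, account, container, obj, timestamp,
                           hash_path_suffix=b'endcap'):
        name = '/%s/%s/%s' % (account, container, obj)
        ohash = md5(name.encode('utf-8') + hash_path_suffix).hexdigest()
        return os.path.join(device_path, 'async_pending', ohash[-3:],
                            '%s-%016.5f' % (ohash, timestamp))

    print(async_pending_path('/srv/node/sda1', 'a', 'c', 'o', 1350000000.0))
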
@@ -135,13 +134,24 @@ class TestObjectUpdater(unittest.TestCase):
ohash = hash_path('a', 'c', 'o')
odir = os.path.join(async_dir, ohash[-3:])
mkdirs(odir)
op_path = os.path.join(odir,
older_op_path = os.path.join(
odir,
'%s-%s' % (ohash, normalize_timestamp(time() - 1)))
op_path = os.path.join(
odir,
'%s-%s' % (ohash, normalize_timestamp(time())))
pickle.dump({'op': 'PUT', 'account': 'a', 'container': 'c', 'obj': 'o',
'headers': {'X-Container-Timestamp': normalize_timestamp(0)}},
open(op_path, 'wb'))
for path in (op_path, older_op_path):
with open(path, 'wb') as async_pending:
pickle.dump({'op': 'PUT', 'account': 'a', 'container': 'c',
'obj': 'o', 'headers': {
'X-Container-Timestamp': normalize_timestamp(0)}},
async_pending)
cu.logger = FakeLogger()
cu.run_once()
self.assert_(not os.path.exists(older_op_path))
self.assert_(os.path.exists(op_path))
self.assertEqual(cu.logger.get_increment_counts(),
{'failures': 1, 'unlinks': 1})
bindsock = listen(('127.0.0.1', 0))
@@ -182,21 +192,31 @@ class TestObjectUpdater(unittest.TestCase):
except BaseException, err:
return err
return None
event = spawn(accept, [201, 500])
for dev in cu.get_container_ring().devs:
if dev is not None:
dev['port'] = bindsock.getsockname()[1]
cu.logger = FakeLogger()
cu.run_once()
err = event.wait()
if err:
raise err
self.assert_(os.path.exists(op_path))
self.assertEqual(cu.logger.get_increment_counts(),
{'failures': 1})
event = spawn(accept, [201])
cu.logger = FakeLogger()
cu.run_once()
err = event.wait()
if err:
raise err
self.assert_(not os.path.exists(op_path))
self.assertEqual(cu.logger.get_increment_counts(),
{'unlinks': 1, 'successes': 1})
if __name__ == '__main__':
unittest.main()