From 217198b83b69095724c8cb8aae6746fbbfbe8556 Mon Sep 17 00:00:00 2001 From: gholt Date: Fri, 21 Jan 2011 12:43:50 -0800 Subject: [PATCH 1/3] container-updater: temporary account update suppression on errors --- etc/container-server.conf-sample | 2 ++ swift/container/updater.py | 44 +++++++++++++++++++++++++---- test/unit/container/test_updater.py | 1 + 3 files changed, 41 insertions(+), 6 deletions(-) diff --git a/etc/container-server.conf-sample b/etc/container-server.conf-sample index fb250708fe..96c1f7b3cb 100644 --- a/etc/container-server.conf-sample +++ b/etc/container-server.conf-sample @@ -38,6 +38,8 @@ use = egg:swift#container # conn_timeout = 0.5 # slowdown will sleep that amount between containers # slowdown = 0.01 +# Seconds to suppress updating an account that has generated an error +# account_suppression_time = 60 [container-auditor] # log_name = container-auditor diff --git a/swift/container/updater.py b/swift/container/updater.py index d6b1beb2b1..c5d927f2b9 100644 --- a/swift/container/updater.py +++ b/swift/container/updater.py @@ -19,6 +19,7 @@ import signal import sys import time from random import random, shuffle +from tempfile import mkstemp from eventlet import spawn, patcher, Timeout @@ -51,6 +52,10 @@ class ContainerUpdater(Daemon): self.no_changes = 0 self.successes = 0 self.failures = 0 + self.account_suppressions = {} + self.account_suppression_time = \ + float(conf.get('account_suppression_time', 60)) + self.new_account_suppressions = None def get_account_ring(self): """Get the account ring. 
Load it if it hasn't been yet.""" @@ -88,21 +93,41 @@ class ContainerUpdater(Daemon): while True: self.logger.info(_('Begin container update sweep')) begin = time.time() - pids = [] + now = time.time() + expired_suppressions = \ + [a for a, u in self.account_suppressions.iteritems() if u < now] + for account in expired_suppressions: + del self.account_suppressions[account] + pid2filename = {} # read from account ring to ensure it's fresh self.get_account_ring().get_nodes('') for path in self.get_paths(): - while len(pids) >= self.concurrency: - pids.remove(os.wait()[0]) + while len(pid2filename) >= self.concurrency: + pid = os.wait()[0] + try: + with open(pid2filename[pid], 'r') as tmpfile: + for line in tmpfile: + account, until = line.split() + until = float(until) + self.account_suppressions[account] = until + except: + self.logger.exception(_('ERROR with pid2filename ' + '%(pid)s %(filename)s: ') % {'pid': pid, + 'filename': pid2filename[pid]}) + os.unlink(pid2filename[pid]) + del pid2filename[pid] + fd, tmpfilename = mkstemp() + os.close(fd) pid = os.fork() if pid: - pids.append(pid) + pid2filename[pid] = tmpfilename else: signal.signal(signal.SIGTERM, signal.SIG_DFL) patcher.monkey_patch(all=False, socket=True) self.no_changes = 0 self.successes = 0 self.failures = 0 + self.new_account_suppressions = open(tmpfilename, 'w') forkbegin = time.time() self.container_sweep(path) elapsed = time.time() - forkbegin @@ -114,8 +139,8 @@ class ContainerUpdater(Daemon): 'success': self.successes, 'fail': self.failures, 'no_change': self.no_changes}) sys.exit() - while pids: - pids.remove(os.wait()[0]) + while pid2filename: + del pid2filename[os.wait()[0]] elapsed = time.time() - begin self.logger.info(_('Container update sweep completed: %.02fs'), elapsed) @@ -165,6 +190,8 @@ class ContainerUpdater(Daemon): # definitely doesn't have up to date statistics. 
if float(info['put_timestamp']) <= 0: return + if self.account_suppressions.get(info['account'], 0) > time.time(): + return if info['put_timestamp'] > info['reported_put_timestamp'] or \ info['delete_timestamp'] > info['reported_delete_timestamp'] \ or info['object_count'] != info['reported_object_count'] or \ @@ -195,6 +222,11 @@ class ContainerUpdater(Daemon): self.logger.debug( _('Update report failed for %(container)s %(dbfile)s'), {'container': container, 'dbfile': dbfile}) + self.account_suppressions[info['account']] = until = \ + time.time() + self.account_suppression_time + if self.new_account_suppressions: + print >>self.new_account_suppressions, \ + info['account'], until else: self.no_changes += 1 diff --git a/test/unit/container/test_updater.py b/test/unit/container/test_updater.py index b7dbe6dd6d..9ee265a566 100644 --- a/test/unit/container/test_updater.py +++ b/test/unit/container/test_updater.py @@ -78,6 +78,7 @@ class TestContainerUpdater(unittest.TestCase): 'interval': '1', 'concurrency': '1', 'node_timeout': '15', + 'account_suppression_time': 0 }) cu.run_once() containers_dir = os.path.join(self.sda1, container_server.DATADIR) From fe1befe91e5d178e0cf8303857edb43175fd2f3b Mon Sep 17 00:00:00 2001 From: gholt Date: Sat, 22 Jan 2011 10:01:43 -0800 Subject: [PATCH 2/3] Doc update --- doc/source/deployment_guide.rst | 32 +++++++++++++++++++------------- 1 file changed, 19 insertions(+), 13 deletions(-) diff --git a/doc/source/deployment_guide.rst b/doc/source/deployment_guide.rst index 40854b0a1f..ab84ae1550 100644 --- a/doc/source/deployment_guide.rst +++ b/doc/source/deployment_guide.rst @@ -299,19 +299,25 @@ reclaim_age 604800 Time elapsed in seconds before a [container-updater] -================== ================= ======================================= -Option Default Description ------------------- ----------------- --------------------------------------- -log_name container-updater Label used when logging -log_facility LOG_LOCAL0 Syslog log 
facility -log_level INFO Logging level -interval 300 Minimum time for a pass to take -concurrency 4 Number of updater workers to spawn -node_timeout 3 Request timeout to external services -conn_timeout 0.5 Connection timeout to external services -slowdown 0.01 Time in seconds to wait between - containers -================== ================= ======================================= +======================== ================= ================================== +Option Default Description +------------------------ ----------------- ---------------------------------- +log_name container-updater Label used when logging +log_facility LOG_LOCAL0 Syslog log facility +log_level INFO Logging level +interval 300 Minimum time for a pass to take +concurrency 4 Number of updater workers to spawn +node_timeout 3 Request timeout to external + services +conn_timeout 0.5 Connection timeout to external + services +slowdown 0.01 Time in seconds to wait between + containers +account_suppression_time 60 Seconds to suppress updating an + account that has generated an + error (timeout, not yet found, + etc.) 
+======================== ================= ================================== [container-auditor] From 9b8a8b1791d900db79a529383d9a4544c4a05dfd Mon Sep 17 00:00:00 2001 From: gholt Date: Tue, 25 Jan 2011 15:21:49 -0800 Subject: [PATCH 3/3] Update to load suppressions from both os.wait points --- swift/container/updater.py | 33 +++++++++++++++++++++------------ 1 file changed, 21 insertions(+), 12 deletions(-) diff --git a/swift/container/updater.py b/swift/container/updater.py index c5d927f2b9..928a53be92 100644 --- a/swift/container/updater.py +++ b/swift/container/updater.py @@ -85,6 +85,19 @@ class ContainerUpdater(Daemon): shuffle(paths) return paths + def _load_suppressions(self, filename): + try: + with open(filename, 'r') as tmpfile: + for line in tmpfile: + account, until = line.split() + until = float(until) + self.account_suppressions[account] = until + except: + self.logger.exception( + _('ERROR with loading suppressions from %s: ') % filename) + finally: + os.unlink(filename) + def run_forever(self): # pragma: no cover """ Run the updator continuously. 
@@ -105,17 +118,9 @@ class ContainerUpdater(Daemon): while len(pid2filename) >= self.concurrency: pid = os.wait()[0] try: - with open(pid2filename[pid], 'r') as tmpfile: - for line in tmpfile: - account, until = line.split() - until = float(until) - self.account_suppressions[account] = until - except: - self.logger.exception(_('ERROR with pid2filename ' - '%(pid)s %(filename)s: ') % {'pid': pid, - 'filename': pid2filename[pid]}) - os.unlink(pid2filename[pid]) - del pid2filename[pid] + self._load_suppressions(pid2filename[pid]) + finally: + del pid2filename[pid] fd, tmpfilename = mkstemp() os.close(fd) pid = os.fork() @@ -140,7 +145,11 @@ class ContainerUpdater(Daemon): 'no_change': self.no_changes}) sys.exit() while pid2filename: - del pid2filename[os.wait()[0]] + pid = os.wait()[0] + try: + self._load_suppressions(pid2filename[pid]) + finally: + del pid2filename[pid] elapsed = time.time() - begin self.logger.info(_('Container update sweep completed: %.02fs'), elapsed)