Merge "Throttle update_auditor_status calls"
This commit is contained in:
commit
20c143e7a3
@ -88,6 +88,7 @@ TMP_BASE = 'tmp'
|
|||||||
get_data_dir = partial(get_policy_string, DATADIR_BASE)
|
get_data_dir = partial(get_policy_string, DATADIR_BASE)
|
||||||
get_async_dir = partial(get_policy_string, ASYNCDIR_BASE)
|
get_async_dir = partial(get_policy_string, ASYNCDIR_BASE)
|
||||||
get_tmp_dir = partial(get_policy_string, TMP_BASE)
|
get_tmp_dir = partial(get_policy_string, TMP_BASE)
|
||||||
|
MIN_TIME_UPDATE_AUDITOR_STATUS = 60
|
||||||
|
|
||||||
|
|
||||||
def _get_filename(fd):
|
def _get_filename(fd):
|
||||||
@ -445,6 +446,16 @@ def update_auditor_status(datadir_path, logger, partitions, auditor_type):
|
|||||||
status = status.encode('utf8')
|
status = status.encode('utf8')
|
||||||
auditor_status = os.path.join(
|
auditor_status = os.path.join(
|
||||||
datadir_path, "auditor_status_%s.json" % auditor_type)
|
datadir_path, "auditor_status_%s.json" % auditor_type)
|
||||||
|
try:
|
||||||
|
mtime = os.stat(auditor_status).st_mtime
|
||||||
|
except OSError:
|
||||||
|
mtime = 0
|
||||||
|
recently_updated = (mtime + MIN_TIME_UPDATE_AUDITOR_STATUS) > time.time()
|
||||||
|
if recently_updated and len(partitions) > 0:
|
||||||
|
if logger:
|
||||||
|
logger.debug(
|
||||||
|
'Skipping the update of recently changed %s' % auditor_status)
|
||||||
|
return
|
||||||
try:
|
try:
|
||||||
with open(auditor_status, "wb") as statusfile:
|
with open(auditor_status, "wb") as statusfile:
|
||||||
statusfile.write(status)
|
statusfile.write(status)
|
||||||
|
@ -38,7 +38,7 @@ from contextlib import closing, contextmanager
|
|||||||
from gzip import GzipFile
|
from gzip import GzipFile
|
||||||
|
|
||||||
from eventlet import hubs, timeout, tpool
|
from eventlet import hubs, timeout, tpool
|
||||||
from swift.obj.diskfile import MD5_OF_EMPTY_STRING
|
from swift.obj.diskfile import MD5_OF_EMPTY_STRING, update_auditor_status
|
||||||
from test.unit import (FakeLogger, mock as unit_mock, temptree,
|
from test.unit import (FakeLogger, mock as unit_mock, temptree,
|
||||||
patch_policies, debug_logger, EMPTY_ETAG,
|
patch_policies, debug_logger, EMPTY_ETAG,
|
||||||
make_timestamp_iter, DEFAULT_TEST_EC_TYPE,
|
make_timestamp_iter, DEFAULT_TEST_EC_TYPE,
|
||||||
@ -491,10 +491,13 @@ class TestObjectAuditLocationGenerator(unittest.TestCase):
|
|||||||
os.makedirs(os.path.join(tmpdir, "sdf", "objects", "1", "a", "b"))
|
os.makedirs(os.path.join(tmpdir, "sdf", "objects", "1", "a", "b"))
|
||||||
os.makedirs(os.path.join(tmpdir, "sdf", "objects", "2", "a", "b"))
|
os.makedirs(os.path.join(tmpdir, "sdf", "objects", "2", "a", "b"))
|
||||||
|
|
||||||
# Auditor starts, there are two partitions to check
|
# Pretend that some time passed between each partition
|
||||||
gen = diskfile.object_audit_location_generator(tmpdir, False)
|
with mock.patch('os.stat') as mock_stat:
|
||||||
gen.next()
|
mock_stat.return_value.st_mtime = time() - 60
|
||||||
gen.next()
|
# Auditor starts, there are two partitions to check
|
||||||
|
gen = diskfile.object_audit_location_generator(tmpdir, False)
|
||||||
|
gen.next()
|
||||||
|
gen.next()
|
||||||
|
|
||||||
# Auditor stopped for some reason without raising StopIterator in
|
# Auditor stopped for some reason without raising StopIterator in
|
||||||
# the generator and restarts There is now only one remaining
|
# the generator and restarts There is now only one remaining
|
||||||
@ -520,6 +523,43 @@ class TestObjectAuditLocationGenerator(unittest.TestCase):
|
|||||||
gen.next()
|
gen.next()
|
||||||
gen.next()
|
gen.next()
|
||||||
|
|
||||||
|
def test_update_auditor_status_throttle(self):
|
||||||
|
# If there are a lot of nearly empty partitions, the
|
||||||
|
# update_auditor_status will write the status file many times a second,
|
||||||
|
# creating some unexpected high write load. This test ensures that the
|
||||||
|
# status file is only written once a minute.
|
||||||
|
with temptree([]) as tmpdir:
|
||||||
|
os.makedirs(os.path.join(tmpdir, "sdf", "objects", "1", "a", "b"))
|
||||||
|
with mock.patch('__builtin__.open') as mock_open:
|
||||||
|
# File does not exist yet - write expected
|
||||||
|
update_auditor_status(tmpdir, None, ['42'], "ALL")
|
||||||
|
self.assertEqual(1, mock_open.call_count)
|
||||||
|
|
||||||
|
mock_open.reset_mock()
|
||||||
|
|
||||||
|
# File exists, updated just now - no write expected
|
||||||
|
with mock.patch('os.stat') as mock_stat:
|
||||||
|
mock_stat.return_value.st_mtime = time()
|
||||||
|
update_auditor_status(tmpdir, None, ['42'], "ALL")
|
||||||
|
self.assertEqual(0, mock_open.call_count)
|
||||||
|
|
||||||
|
mock_open.reset_mock()
|
||||||
|
|
||||||
|
# File exists, updated just now, but empty partition list. This
|
||||||
|
# is a finalizing call, write expected
|
||||||
|
with mock.patch('os.stat') as mock_stat:
|
||||||
|
mock_stat.return_value.st_mtime = time()
|
||||||
|
update_auditor_status(tmpdir, None, [], "ALL")
|
||||||
|
self.assertEqual(1, mock_open.call_count)
|
||||||
|
|
||||||
|
mock_open.reset_mock()
|
||||||
|
|
||||||
|
# File updated more than 60 seconds ago - write expected
|
||||||
|
with mock.patch('os.stat') as mock_stat:
|
||||||
|
mock_stat.return_value.st_mtime = time() - 61
|
||||||
|
update_auditor_status(tmpdir, None, ['42'], "ALL")
|
||||||
|
self.assertEqual(1, mock_open.call_count)
|
||||||
|
|
||||||
|
|
||||||
class TestDiskFileRouter(unittest.TestCase):
|
class TestDiskFileRouter(unittest.TestCase):
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user