fixing the run_forever and unit tests
This commit is contained in:
parent
a86a569cae
commit
7db8c42a1a
@ -25,6 +25,8 @@ from swift.common.utils import get_logger, renamer, audit_location_generator, \
|
|||||||
from swift.common.exceptions import AuditException
|
from swift.common.exceptions import AuditException
|
||||||
from swift.common.daemon import Daemon
|
from swift.common.daemon import Daemon
|
||||||
|
|
||||||
|
SLEEP_BETWEEN_AUDITS = 30
|
||||||
|
|
||||||
|
|
||||||
class AuditorWorker(object):
|
class AuditorWorker(object):
|
||||||
"""Walk through file system to audit object"""
|
"""Walk through file system to audit object"""
|
||||||
@ -167,22 +169,27 @@ class ObjectAuditor(Daemon):
|
|||||||
self.conf_zero_byte_fps = int(conf.get(
|
self.conf_zero_byte_fps = int(conf.get(
|
||||||
'zero_byte_files_per_second', 50))
|
'zero_byte_files_per_second', 50))
|
||||||
|
|
||||||
|
def _sleep(self):
|
||||||
|
time.sleep(SLEEP_BETWEEN_AUDITS)
|
||||||
|
|
||||||
def run_forever(self, *args, **kwargs):
|
def run_forever(self, *args, **kwargs):
|
||||||
"""Run the object audit until stopped."""
|
"""Run the object audit until stopped."""
|
||||||
zero_byte_only_at_fps = kwargs.get('zero_byte_fps', 0) or \
|
zero_byte_only_at_fps = kwargs.get('zero_byte_fps', 0)
|
||||||
self.conf_zero_byte_fps
|
|
||||||
zero_byte_pid = 1
|
zero_byte_pid = 1
|
||||||
if zero_byte_only_at_fps:
|
if not zero_byte_only_at_fps:
|
||||||
zero_byte_pid = os.fork()
|
zero_byte_pid = os.fork()
|
||||||
if zero_byte_pid == 0:
|
if zero_byte_pid == 0:
|
||||||
|
# child process runs the 'all'
|
||||||
|
while True:
|
||||||
|
self.run_once(mode='forever')
|
||||||
|
self._sleep()
|
||||||
|
else:
|
||||||
|
# no fork or forked parent path
|
||||||
while True:
|
while True:
|
||||||
self.run_once(mode='forever',
|
self.run_once(mode='forever',
|
||||||
zero_byte_fps=zero_byte_only_at_fps)
|
zero_byte_fps=zero_byte_only_at_fps or
|
||||||
time.sleep(30)
|
self.conf_zero_byte_fps)
|
||||||
else:
|
self._sleep()
|
||||||
while not zero_byte_only_at_fps:
|
|
||||||
self.run_once(mode='forever')
|
|
||||||
time.sleep(30)
|
|
||||||
|
|
||||||
def run_once(self, *args, **kwargs):
|
def run_once(self, *args, **kwargs):
|
||||||
"""Run the object audit once."""
|
"""Run the object audit once."""
|
||||||
|
@ -24,8 +24,9 @@ from hashlib import md5
|
|||||||
from tempfile import mkdtemp
|
from tempfile import mkdtemp
|
||||||
from swift.obj import auditor
|
from swift.obj import auditor
|
||||||
from swift.obj import server as object_server
|
from swift.obj import server as object_server
|
||||||
from swift.obj.server import DiskFile, write_metadata
|
from swift.obj.server import DiskFile, write_metadata, DATADIR
|
||||||
from swift.common.utils import hash_path, mkdirs, normalize_timestamp, renamer
|
from swift.common.utils import hash_path, mkdirs, normalize_timestamp, \
|
||||||
|
renamer, storage_directory
|
||||||
from swift.obj.replicator import invalidate_hash
|
from swift.obj.replicator import invalidate_hash
|
||||||
from swift.common.exceptions import AuditException
|
from swift.common.exceptions import AuditException
|
||||||
|
|
||||||
@ -263,10 +264,23 @@ class TestAuditor(unittest.TestCase):
|
|||||||
self.auditor.run_once()
|
self.auditor.run_once()
|
||||||
self.assertTrue(os.path.isdir(quarantine_path))
|
self.assertTrue(os.path.isdir(quarantine_path))
|
||||||
|
|
||||||
def setup_bad_zero_byte(self):
|
def setup_bad_zero_byte(self, with_ts=False):
|
||||||
self.auditor = auditor.ObjectAuditor(self.conf)
|
self.auditor = auditor.ObjectAuditor(self.conf)
|
||||||
self.auditor.log_time = 0
|
self.auditor.log_time = 0
|
||||||
cur_part = '0'
|
cur_part = '0'
|
||||||
|
ts_file_path = ''
|
||||||
|
if with_ts:
|
||||||
|
|
||||||
|
name_hash = hash_path('a', 'c', 'o')
|
||||||
|
dir_path = os.path.join(self.devices, 'sda',
|
||||||
|
storage_directory(DATADIR, cur_part, name_hash))
|
||||||
|
ts_file_path = os.path.join(dir_path, '99999.ts')
|
||||||
|
if not os.path.exists(dir_path):
|
||||||
|
mkdirs(dir_path)
|
||||||
|
fp = open(ts_file_path, 'w')
|
||||||
|
fp.close()
|
||||||
|
|
||||||
|
|
||||||
disk_file = DiskFile(self.devices, 'sda', cur_part, 'a', 'c', 'o')
|
disk_file = DiskFile(self.devices, 'sda', cur_part, 'a', 'c', 'o')
|
||||||
etag = md5()
|
etag = md5()
|
||||||
with disk_file.mkstemp() as (fd, tmppath):
|
with disk_file.mkstemp() as (fd, tmppath):
|
||||||
@ -281,6 +295,9 @@ class TestAuditor(unittest.TestCase):
|
|||||||
etag = etag.hexdigest()
|
etag = etag.hexdigest()
|
||||||
metadata['ETag'] = etag
|
metadata['ETag'] = etag
|
||||||
write_metadata(fd, metadata)
|
write_metadata(fd, metadata)
|
||||||
|
if disk_file.data_file:
|
||||||
|
return disk_file.data_file
|
||||||
|
return ts_file_path
|
||||||
|
|
||||||
def test_object_run_fast_track_all(self):
|
def test_object_run_fast_track_all(self):
|
||||||
self.setup_bad_zero_byte()
|
self.setup_bad_zero_byte()
|
||||||
@ -296,5 +313,68 @@ class TestAuditor(unittest.TestCase):
|
|||||||
'sda', 'quarantined', 'objects')
|
'sda', 'quarantined', 'objects')
|
||||||
self.assertTrue(os.path.isdir(quarantine_path))
|
self.assertTrue(os.path.isdir(quarantine_path))
|
||||||
|
|
||||||
|
def test_with_tombstone(self):
|
||||||
|
ts_file_path = self.setup_bad_zero_byte(with_ts=True)
|
||||||
|
self.auditor.run_once()
|
||||||
|
quarantine_path = os.path.join(self.devices,
|
||||||
|
'sda', 'quarantined', 'objects')
|
||||||
|
self.assertTrue(ts_file_path.endswith('ts'))
|
||||||
|
self.assertTrue(os.path.exists(ts_file_path))
|
||||||
|
|
||||||
|
def test_sleeper(self):
|
||||||
|
auditor.SLEEP_BETWEEN_AUDITS = 0.01
|
||||||
|
my_auditor = auditor.ObjectAuditor(self.conf)
|
||||||
|
start = time.time()
|
||||||
|
my_auditor._sleep()
|
||||||
|
self.assertEquals(round(time.time() - start, 2), 0.01)
|
||||||
|
|
||||||
|
def test_run_forever(self):
|
||||||
|
|
||||||
|
class StopForever(Exception):
|
||||||
|
pass
|
||||||
|
|
||||||
|
class ObjectAuditorMock(object):
|
||||||
|
check_args = ()
|
||||||
|
check_kwargs = {}
|
||||||
|
fork_called = 0
|
||||||
|
fork_res = 0
|
||||||
|
|
||||||
|
def mock_run(self, *args, **kwargs):
|
||||||
|
self.check_args = args
|
||||||
|
self.check_kwargs = kwargs
|
||||||
|
|
||||||
|
def mock_sleep(self):
|
||||||
|
raise StopForever('stop')
|
||||||
|
|
||||||
|
def mock_fork(self):
|
||||||
|
self.fork_called += 1
|
||||||
|
return self.fork_res
|
||||||
|
|
||||||
|
my_auditor = auditor.ObjectAuditor(dict(devices=self.devices,
|
||||||
|
mount_check='false',
|
||||||
|
zero_byte_files_per_second=89))
|
||||||
|
mocker = ObjectAuditorMock()
|
||||||
|
my_auditor.run_once = mocker.mock_run
|
||||||
|
my_auditor._sleep = mocker.mock_sleep
|
||||||
|
was_fork = os.fork
|
||||||
|
try:
|
||||||
|
os.fork = mocker.mock_fork
|
||||||
|
self.assertRaises(StopForever,
|
||||||
|
my_auditor.run_forever, zero_byte_fps=50)
|
||||||
|
self.assertEquals(mocker.check_kwargs['zero_byte_fps'], 50)
|
||||||
|
self.assertEquals(mocker.fork_called, 0)
|
||||||
|
|
||||||
|
self.assertRaises(StopForever, my_auditor.run_forever)
|
||||||
|
self.assertEquals(mocker.fork_called, 1)
|
||||||
|
self.assertEquals(mocker.check_args, ())
|
||||||
|
|
||||||
|
mocker.fork_res = 1
|
||||||
|
self.assertRaises(StopForever, my_auditor.run_forever)
|
||||||
|
self.assertEquals(mocker.fork_called, 2)
|
||||||
|
self.assertEquals(mocker.check_kwargs['zero_byte_fps'], 89)
|
||||||
|
|
||||||
|
finally:
|
||||||
|
os.fork = was_fork
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
unittest.main()
|
unittest.main()
|
||||||
|
Loading…
x
Reference in New Issue
Block a user