709 lines
29 KiB
Python
709 lines
29 KiB
Python
# Copyright (c) 2010-2012 OpenStack Foundation
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
|
# implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
from test import unit
|
|
import unittest
|
|
import mock
|
|
import os
|
|
import time
|
|
import string
|
|
from shutil import rmtree
|
|
from hashlib import md5
|
|
from tempfile import mkdtemp
|
|
from test.unit import FakeLogger, patch_policies, make_timestamp_iter, \
|
|
DEFAULT_TEST_EC_TYPE
|
|
from swift.obj import auditor
|
|
from swift.obj.diskfile import DiskFile, write_metadata, invalidate_hash, \
|
|
get_data_dir, DiskFileManager, ECDiskFileManager, AuditLocation
|
|
from swift.common.utils import mkdirs, normalize_timestamp, Timestamp
|
|
from swift.common.storage_policy import ECStoragePolicy, StoragePolicy, \
|
|
POLICIES
|
|
|
|
|
|
_mocked_policies = [
|
|
StoragePolicy(0, 'zero', False),
|
|
StoragePolicy(1, 'one', True),
|
|
ECStoragePolicy(2, 'two', ec_type=DEFAULT_TEST_EC_TYPE,
|
|
ec_ndata=2, ec_nparity=1, ec_segment_size=4096),
|
|
]
|
|
|
|
|
|
@patch_policies(_mocked_policies)
|
|
class TestAuditor(unittest.TestCase):
|
|
|
|
def setUp(self):
|
|
self.testdir = os.path.join(mkdtemp(), 'tmp_test_object_auditor')
|
|
self.devices = os.path.join(self.testdir, 'node')
|
|
self.rcache = os.path.join(self.testdir, 'object.recon')
|
|
self.logger = FakeLogger()
|
|
rmtree(self.testdir, ignore_errors=1)
|
|
mkdirs(os.path.join(self.devices, 'sda'))
|
|
os.mkdir(os.path.join(self.devices, 'sdb'))
|
|
|
|
# policy 0
|
|
self.objects = os.path.join(self.devices, 'sda',
|
|
get_data_dir(POLICIES[0]))
|
|
self.objects_2 = os.path.join(self.devices, 'sdb',
|
|
get_data_dir(POLICIES[0]))
|
|
os.mkdir(self.objects)
|
|
# policy 1
|
|
self.objects_p1 = os.path.join(self.devices, 'sda',
|
|
get_data_dir(POLICIES[1]))
|
|
self.objects_2_p1 = os.path.join(self.devices, 'sdb',
|
|
get_data_dir(POLICIES[1]))
|
|
os.mkdir(self.objects_p1)
|
|
# policy 2
|
|
self.objects_p2 = os.path.join(self.devices, 'sda',
|
|
get_data_dir(POLICIES[2]))
|
|
self.objects_2_p2 = os.path.join(self.devices, 'sdb',
|
|
get_data_dir(POLICIES[2]))
|
|
os.mkdir(self.objects_p2)
|
|
|
|
self.parts = {}
|
|
self.parts_p1 = {}
|
|
self.parts_p2 = {}
|
|
for part in ['0', '1', '2', '3']:
|
|
self.parts[part] = os.path.join(self.objects, part)
|
|
self.parts_p1[part] = os.path.join(self.objects_p1, part)
|
|
self.parts_p2[part] = os.path.join(self.objects_p2, part)
|
|
os.mkdir(os.path.join(self.objects, part))
|
|
os.mkdir(os.path.join(self.objects_p1, part))
|
|
os.mkdir(os.path.join(self.objects_p2, part))
|
|
|
|
self.conf = dict(
|
|
devices=self.devices,
|
|
mount_check='false',
|
|
object_size_stats='10,100,1024,10240')
|
|
self.df_mgr = DiskFileManager(self.conf, self.logger)
|
|
self.ec_df_mgr = ECDiskFileManager(self.conf, self.logger)
|
|
|
|
# diskfiles for policy 0, 1, 2
|
|
self.disk_file = self.df_mgr.get_diskfile('sda', '0', 'a', 'c', 'o',
|
|
policy=POLICIES[0])
|
|
self.disk_file_p1 = self.df_mgr.get_diskfile('sda', '0', 'a', 'c',
|
|
'o', policy=POLICIES[1])
|
|
self.disk_file_ec = self.ec_df_mgr.get_diskfile(
|
|
'sda', '0', 'a', 'c', 'o', policy=POLICIES[2], frag_index=1)
|
|
|
|
def tearDown(self):
|
|
rmtree(os.path.dirname(self.testdir), ignore_errors=1)
|
|
unit.xattr_data = {}
|
|
|
|
def test_worker_conf_parms(self):
|
|
def check_common_defaults():
|
|
self.assertEqual(auditor_worker.max_bytes_per_second, 10000000)
|
|
self.assertEqual(auditor_worker.log_time, 3600)
|
|
|
|
# test default values
|
|
conf = dict(
|
|
devices=self.devices,
|
|
mount_check='false',
|
|
object_size_stats='10,100,1024,10240')
|
|
auditor_worker = auditor.AuditorWorker(conf, self.logger,
|
|
self.rcache, self.devices)
|
|
check_common_defaults()
|
|
for policy in POLICIES:
|
|
mgr = auditor_worker.diskfile_router[policy]
|
|
self.assertEqual(mgr.disk_chunk_size, 65536)
|
|
self.assertEqual(auditor_worker.max_files_per_second, 20)
|
|
self.assertEqual(auditor_worker.zero_byte_only_at_fps, 0)
|
|
|
|
# test specified audit value overrides
|
|
conf.update({'disk_chunk_size': 4096})
|
|
auditor_worker = auditor.AuditorWorker(conf, self.logger,
|
|
self.rcache, self.devices,
|
|
zero_byte_only_at_fps=50)
|
|
check_common_defaults()
|
|
for policy in POLICIES:
|
|
mgr = auditor_worker.diskfile_router[policy]
|
|
self.assertEqual(mgr.disk_chunk_size, 4096)
|
|
self.assertEqual(auditor_worker.max_files_per_second, 50)
|
|
self.assertEqual(auditor_worker.zero_byte_only_at_fps, 50)
|
|
|
|
def test_object_audit_extra_data(self):
|
|
def run_tests(disk_file):
|
|
auditor_worker = auditor.AuditorWorker(self.conf, self.logger,
|
|
self.rcache, self.devices)
|
|
data = '0' * 1024
|
|
etag = md5()
|
|
with disk_file.create() as writer:
|
|
writer.write(data)
|
|
etag.update(data)
|
|
etag = etag.hexdigest()
|
|
timestamp = str(normalize_timestamp(time.time()))
|
|
metadata = {
|
|
'ETag': etag,
|
|
'X-Timestamp': timestamp,
|
|
'Content-Length': str(os.fstat(writer._fd).st_size),
|
|
}
|
|
writer.put(metadata)
|
|
writer.commit(Timestamp(timestamp))
|
|
pre_quarantines = auditor_worker.quarantines
|
|
|
|
auditor_worker.object_audit(
|
|
AuditLocation(disk_file._datadir, 'sda', '0',
|
|
policy=disk_file.policy))
|
|
self.assertEqual(auditor_worker.quarantines, pre_quarantines)
|
|
|
|
os.write(writer._fd, 'extra_data')
|
|
|
|
auditor_worker.object_audit(
|
|
AuditLocation(disk_file._datadir, 'sda', '0',
|
|
policy=disk_file.policy))
|
|
self.assertEqual(auditor_worker.quarantines,
|
|
pre_quarantines + 1)
|
|
run_tests(self.disk_file)
|
|
run_tests(self.disk_file_p1)
|
|
run_tests(self.disk_file_ec)
|
|
|
|
def test_object_audit_diff_data(self):
|
|
auditor_worker = auditor.AuditorWorker(self.conf, self.logger,
|
|
self.rcache, self.devices)
|
|
data = '0' * 1024
|
|
etag = md5()
|
|
timestamp = str(normalize_timestamp(time.time()))
|
|
with self.disk_file.create() as writer:
|
|
writer.write(data)
|
|
etag.update(data)
|
|
etag = etag.hexdigest()
|
|
metadata = {
|
|
'ETag': etag,
|
|
'X-Timestamp': timestamp,
|
|
'Content-Length': str(os.fstat(writer._fd).st_size),
|
|
}
|
|
writer.put(metadata)
|
|
writer.commit(Timestamp(timestamp))
|
|
pre_quarantines = auditor_worker.quarantines
|
|
|
|
# remake so it will have metadata
|
|
self.disk_file = self.df_mgr.get_diskfile('sda', '0', 'a', 'c', 'o',
|
|
policy=POLICIES.legacy)
|
|
|
|
auditor_worker.object_audit(
|
|
AuditLocation(self.disk_file._datadir, 'sda', '0',
|
|
policy=POLICIES.legacy))
|
|
self.assertEqual(auditor_worker.quarantines, pre_quarantines)
|
|
etag = md5()
|
|
etag.update('1' + '0' * 1023)
|
|
etag = etag.hexdigest()
|
|
metadata['ETag'] = etag
|
|
|
|
with self.disk_file.create() as writer:
|
|
writer.write(data)
|
|
writer.put(metadata)
|
|
writer.commit(Timestamp(timestamp))
|
|
|
|
auditor_worker.object_audit(
|
|
AuditLocation(self.disk_file._datadir, 'sda', '0',
|
|
policy=POLICIES.legacy))
|
|
self.assertEqual(auditor_worker.quarantines, pre_quarantines + 1)
|
|
|
|
def test_object_audit_no_meta(self):
|
|
timestamp = str(normalize_timestamp(time.time()))
|
|
path = os.path.join(self.disk_file._datadir, timestamp + '.data')
|
|
mkdirs(self.disk_file._datadir)
|
|
fp = open(path, 'w')
|
|
fp.write('0' * 1024)
|
|
fp.close()
|
|
invalidate_hash(os.path.dirname(self.disk_file._datadir))
|
|
auditor_worker = auditor.AuditorWorker(self.conf, self.logger,
|
|
self.rcache, self.devices)
|
|
pre_quarantines = auditor_worker.quarantines
|
|
auditor_worker.object_audit(
|
|
AuditLocation(self.disk_file._datadir, 'sda', '0',
|
|
policy=POLICIES.legacy))
|
|
self.assertEqual(auditor_worker.quarantines, pre_quarantines + 1)
|
|
|
|
def test_object_audit_will_not_swallow_errors_in_tests(self):
|
|
timestamp = str(normalize_timestamp(time.time()))
|
|
path = os.path.join(self.disk_file._datadir, timestamp + '.data')
|
|
mkdirs(self.disk_file._datadir)
|
|
with open(path, 'w') as f:
|
|
write_metadata(f, {'name': '/a/c/o'})
|
|
auditor_worker = auditor.AuditorWorker(self.conf, self.logger,
|
|
self.rcache, self.devices)
|
|
|
|
def blowup(*args):
|
|
raise NameError('tpyo')
|
|
with mock.patch.object(DiskFileManager,
|
|
'get_diskfile_from_audit_location', blowup):
|
|
self.assertRaises(NameError, auditor_worker.object_audit,
|
|
AuditLocation(os.path.dirname(path), 'sda', '0',
|
|
policy=POLICIES.legacy))
|
|
|
|
def test_failsafe_object_audit_will_swallow_errors_in_tests(self):
|
|
timestamp = str(normalize_timestamp(time.time()))
|
|
path = os.path.join(self.disk_file._datadir, timestamp + '.data')
|
|
mkdirs(self.disk_file._datadir)
|
|
with open(path, 'w') as f:
|
|
write_metadata(f, {'name': '/a/c/o'})
|
|
auditor_worker = auditor.AuditorWorker(self.conf, self.logger,
|
|
self.rcache, self.devices)
|
|
|
|
def blowup(*args):
|
|
raise NameError('tpyo')
|
|
with mock.patch('swift.obj.diskfile.DiskFileManager.diskfile_cls',
|
|
blowup):
|
|
auditor_worker.failsafe_object_audit(
|
|
AuditLocation(os.path.dirname(path), 'sda', '0',
|
|
policy=POLICIES.legacy))
|
|
self.assertEqual(auditor_worker.errors, 1)
|
|
|
|
def test_generic_exception_handling(self):
|
|
auditor_worker = auditor.AuditorWorker(self.conf, self.logger,
|
|
self.rcache, self.devices)
|
|
# pretend that we logged (and reset counters) just now
|
|
auditor_worker.last_logged = time.time()
|
|
timestamp = str(normalize_timestamp(time.time()))
|
|
pre_errors = auditor_worker.errors
|
|
data = '0' * 1024
|
|
etag = md5()
|
|
with self.disk_file.create() as writer:
|
|
writer.write(data)
|
|
etag.update(data)
|
|
etag = etag.hexdigest()
|
|
metadata = {
|
|
'ETag': etag,
|
|
'X-Timestamp': timestamp,
|
|
'Content-Length': str(os.fstat(writer._fd).st_size),
|
|
}
|
|
writer.put(metadata)
|
|
writer.commit(Timestamp(timestamp))
|
|
with mock.patch('swift.obj.diskfile.DiskFileManager.diskfile_cls',
|
|
lambda *_: 1 / 0):
|
|
auditor_worker.audit_all_objects()
|
|
self.assertEqual(auditor_worker.errors, pre_errors + 1)
|
|
|
|
def test_object_run_once_pass(self):
|
|
auditor_worker = auditor.AuditorWorker(self.conf, self.logger,
|
|
self.rcache, self.devices)
|
|
auditor_worker.log_time = 0
|
|
timestamp = str(normalize_timestamp(time.time()))
|
|
pre_quarantines = auditor_worker.quarantines
|
|
data = '0' * 1024
|
|
|
|
def write_file(df):
|
|
with df.create() as writer:
|
|
writer.write(data)
|
|
metadata = {
|
|
'ETag': md5(data).hexdigest(),
|
|
'X-Timestamp': timestamp,
|
|
'Content-Length': str(os.fstat(writer._fd).st_size),
|
|
}
|
|
writer.put(metadata)
|
|
writer.commit(Timestamp(timestamp))
|
|
|
|
# policy 0
|
|
write_file(self.disk_file)
|
|
# policy 1
|
|
write_file(self.disk_file_p1)
|
|
# policy 2
|
|
write_file(self.disk_file_ec)
|
|
|
|
auditor_worker.audit_all_objects()
|
|
self.assertEqual(auditor_worker.quarantines, pre_quarantines)
|
|
# 1 object per policy falls into 1024 bucket
|
|
self.assertEqual(auditor_worker.stats_buckets[1024], 3)
|
|
self.assertEqual(auditor_worker.stats_buckets[10240], 0)
|
|
|
|
# pick up some additional code coverage, large file
|
|
data = '0' * 1024 * 1024
|
|
for df in (self.disk_file, self.disk_file_ec):
|
|
with df.create() as writer:
|
|
writer.write(data)
|
|
metadata = {
|
|
'ETag': md5(data).hexdigest(),
|
|
'X-Timestamp': timestamp,
|
|
'Content-Length': str(os.fstat(writer._fd).st_size),
|
|
}
|
|
writer.put(metadata)
|
|
writer.commit(Timestamp(timestamp))
|
|
auditor_worker.audit_all_objects(device_dirs=['sda', 'sdb'])
|
|
self.assertEqual(auditor_worker.quarantines, pre_quarantines)
|
|
# still have the 1024 byte object left in policy-1 (plus the
|
|
# stats from the original 3)
|
|
self.assertEqual(auditor_worker.stats_buckets[1024], 4)
|
|
self.assertEqual(auditor_worker.stats_buckets[10240], 0)
|
|
# and then policy-0 disk_file was re-written as a larger object
|
|
self.assertEqual(auditor_worker.stats_buckets['OVER'], 2)
|
|
|
|
# pick up even more additional code coverage, misc paths
|
|
auditor_worker.log_time = -1
|
|
auditor_worker.stats_sizes = []
|
|
auditor_worker.audit_all_objects(device_dirs=['sda', 'sdb'])
|
|
self.assertEqual(auditor_worker.quarantines, pre_quarantines)
|
|
self.assertEqual(auditor_worker.stats_buckets[1024], 4)
|
|
self.assertEqual(auditor_worker.stats_buckets[10240], 0)
|
|
self.assertEqual(auditor_worker.stats_buckets['OVER'], 2)
|
|
|
|
def test_object_run_logging(self):
|
|
logger = FakeLogger()
|
|
auditor_worker = auditor.AuditorWorker(self.conf, logger,
|
|
self.rcache, self.devices)
|
|
auditor_worker.audit_all_objects(device_dirs=['sda'])
|
|
log_lines = logger.get_lines_for_level('info')
|
|
self.assertTrue(len(log_lines) > 0)
|
|
self.assertTrue(log_lines[0].index('ALL - parallel, sda'))
|
|
|
|
logger = FakeLogger()
|
|
auditor_worker = auditor.AuditorWorker(self.conf, logger,
|
|
self.rcache, self.devices,
|
|
zero_byte_only_at_fps=50)
|
|
auditor_worker.audit_all_objects(device_dirs=['sda'])
|
|
log_lines = logger.get_lines_for_level('info')
|
|
self.assertTrue(len(log_lines) > 0)
|
|
self.assertTrue(log_lines[0].index('ZBF - sda'))
|
|
|
|
def test_object_run_once_no_sda(self):
|
|
auditor_worker = auditor.AuditorWorker(self.conf, self.logger,
|
|
self.rcache, self.devices)
|
|
timestamp = str(normalize_timestamp(time.time()))
|
|
pre_quarantines = auditor_worker.quarantines
|
|
# pretend that we logged (and reset counters) just now
|
|
auditor_worker.last_logged = time.time()
|
|
data = '0' * 1024
|
|
etag = md5()
|
|
with self.disk_file.create() as writer:
|
|
writer.write(data)
|
|
etag.update(data)
|
|
etag = etag.hexdigest()
|
|
metadata = {
|
|
'ETag': etag,
|
|
'X-Timestamp': timestamp,
|
|
'Content-Length': str(os.fstat(writer._fd).st_size),
|
|
}
|
|
writer.put(metadata)
|
|
os.write(writer._fd, 'extra_data')
|
|
writer.commit(Timestamp(timestamp))
|
|
auditor_worker.audit_all_objects()
|
|
self.assertEqual(auditor_worker.quarantines, pre_quarantines + 1)
|
|
|
|
def test_object_run_once_multi_devices(self):
|
|
auditor_worker = auditor.AuditorWorker(self.conf, self.logger,
|
|
self.rcache, self.devices)
|
|
# pretend that we logged (and reset counters) just now
|
|
auditor_worker.last_logged = time.time()
|
|
timestamp = str(normalize_timestamp(time.time()))
|
|
pre_quarantines = auditor_worker.quarantines
|
|
data = '0' * 10
|
|
etag = md5()
|
|
with self.disk_file.create() as writer:
|
|
writer.write(data)
|
|
etag.update(data)
|
|
etag = etag.hexdigest()
|
|
metadata = {
|
|
'ETag': etag,
|
|
'X-Timestamp': timestamp,
|
|
'Content-Length': str(os.fstat(writer._fd).st_size),
|
|
}
|
|
writer.put(metadata)
|
|
writer.commit(Timestamp(timestamp))
|
|
auditor_worker.audit_all_objects()
|
|
self.disk_file = self.df_mgr.get_diskfile('sda', '0', 'a', 'c', 'ob',
|
|
policy=POLICIES.legacy)
|
|
data = '1' * 10
|
|
etag = md5()
|
|
with self.disk_file.create() as writer:
|
|
writer.write(data)
|
|
etag.update(data)
|
|
etag = etag.hexdigest()
|
|
metadata = {
|
|
'ETag': etag,
|
|
'X-Timestamp': timestamp,
|
|
'Content-Length': str(os.fstat(writer._fd).st_size),
|
|
}
|
|
writer.put(metadata)
|
|
writer.commit(Timestamp(timestamp))
|
|
os.write(writer._fd, 'extra_data')
|
|
auditor_worker.audit_all_objects()
|
|
self.assertEqual(auditor_worker.quarantines, pre_quarantines + 1)
|
|
|
|
def test_object_run_fast_track_non_zero(self):
|
|
self.auditor = auditor.ObjectAuditor(self.conf)
|
|
self.auditor.log_time = 0
|
|
data = '0' * 1024
|
|
etag = md5()
|
|
with self.disk_file.create() as writer:
|
|
writer.write(data)
|
|
etag.update(data)
|
|
etag = etag.hexdigest()
|
|
timestamp = str(normalize_timestamp(time.time()))
|
|
metadata = {
|
|
'ETag': etag,
|
|
'X-Timestamp': timestamp,
|
|
'Content-Length': str(os.fstat(writer._fd).st_size),
|
|
}
|
|
writer.put(metadata)
|
|
writer.commit(Timestamp(timestamp))
|
|
etag = md5()
|
|
etag.update('1' + '0' * 1023)
|
|
etag = etag.hexdigest()
|
|
metadata['ETag'] = etag
|
|
write_metadata(writer._fd, metadata)
|
|
|
|
quarantine_path = os.path.join(self.devices,
|
|
'sda', 'quarantined', 'objects')
|
|
kwargs = {'mode': 'once'}
|
|
kwargs['zero_byte_fps'] = 50
|
|
self.auditor.run_audit(**kwargs)
|
|
self.assertFalse(os.path.isdir(quarantine_path))
|
|
del(kwargs['zero_byte_fps'])
|
|
self.auditor.run_audit(**kwargs)
|
|
self.assertTrue(os.path.isdir(quarantine_path))
|
|
|
|
def setup_bad_zero_byte(self, timestamp=None):
|
|
if timestamp is None:
|
|
timestamp = Timestamp(time.time())
|
|
self.auditor = auditor.ObjectAuditor(self.conf)
|
|
self.auditor.log_time = 0
|
|
etag = md5()
|
|
with self.disk_file.create() as writer:
|
|
etag = etag.hexdigest()
|
|
metadata = {
|
|
'ETag': etag,
|
|
'X-Timestamp': timestamp.internal,
|
|
'Content-Length': 10,
|
|
}
|
|
writer.put(metadata)
|
|
writer.commit(Timestamp(timestamp))
|
|
etag = md5()
|
|
etag = etag.hexdigest()
|
|
metadata['ETag'] = etag
|
|
write_metadata(writer._fd, metadata)
|
|
|
|
def test_object_run_fast_track_all(self):
|
|
self.setup_bad_zero_byte()
|
|
kwargs = {'mode': 'once'}
|
|
self.auditor.run_audit(**kwargs)
|
|
quarantine_path = os.path.join(self.devices,
|
|
'sda', 'quarantined', 'objects')
|
|
self.assertTrue(os.path.isdir(quarantine_path))
|
|
|
|
def test_object_run_fast_track_zero(self):
|
|
self.setup_bad_zero_byte()
|
|
kwargs = {'mode': 'once'}
|
|
kwargs['zero_byte_fps'] = 50
|
|
self.auditor.run_audit(**kwargs)
|
|
quarantine_path = os.path.join(self.devices,
|
|
'sda', 'quarantined', 'objects')
|
|
self.assertTrue(os.path.isdir(quarantine_path))
|
|
|
|
def test_object_run_fast_track_zero_check_closed(self):
|
|
rat = [False]
|
|
|
|
class FakeFile(DiskFile):
|
|
|
|
def _quarantine(self, data_file, msg):
|
|
rat[0] = True
|
|
DiskFile._quarantine(self, data_file, msg)
|
|
|
|
self.setup_bad_zero_byte()
|
|
with mock.patch('swift.obj.diskfile.DiskFileManager.diskfile_cls',
|
|
FakeFile):
|
|
kwargs = {'mode': 'once'}
|
|
kwargs['zero_byte_fps'] = 50
|
|
self.auditor.run_audit(**kwargs)
|
|
quarantine_path = os.path.join(self.devices,
|
|
'sda', 'quarantined', 'objects')
|
|
self.assertTrue(os.path.isdir(quarantine_path))
|
|
self.assertTrue(rat[0])
|
|
|
|
@mock.patch.object(auditor.ObjectAuditor, 'run_audit')
|
|
@mock.patch('os.fork', return_value=0)
|
|
def test_with_inaccessible_object_location(self, mock_os_fork,
|
|
mock_run_audit):
|
|
# Need to ensure that any failures in run_audit do
|
|
# not prevent sys.exit() from running. Otherwise we get
|
|
# zombie processes.
|
|
e = OSError('permission denied')
|
|
mock_run_audit.side_effect = e
|
|
self.auditor = auditor.ObjectAuditor(self.conf)
|
|
self.assertRaises(SystemExit, self.auditor.fork_child, self)
|
|
|
|
def test_with_only_tombstone(self):
|
|
# sanity check that auditor doesn't touch solitary tombstones
|
|
ts_iter = make_timestamp_iter()
|
|
self.setup_bad_zero_byte(timestamp=next(ts_iter))
|
|
self.disk_file.delete(next(ts_iter))
|
|
files = os.listdir(self.disk_file._datadir)
|
|
self.assertEqual(1, len(files))
|
|
self.assertTrue(files[0].endswith('ts'))
|
|
kwargs = {'mode': 'once'}
|
|
self.auditor.run_audit(**kwargs)
|
|
files_after = os.listdir(self.disk_file._datadir)
|
|
self.assertEqual(files, files_after)
|
|
|
|
def test_with_tombstone_and_data(self):
|
|
# rsync replication could leave a tombstone and data file in object
|
|
# dir - verify they are both removed during audit
|
|
ts_iter = make_timestamp_iter()
|
|
ts_tomb = next(ts_iter)
|
|
ts_data = next(ts_iter)
|
|
self.setup_bad_zero_byte(timestamp=ts_data)
|
|
tomb_file_path = os.path.join(self.disk_file._datadir,
|
|
'%s.ts' % ts_tomb.internal)
|
|
with open(tomb_file_path, 'wb') as fd:
|
|
write_metadata(fd, {'X-Timestamp': ts_tomb.internal})
|
|
files = os.listdir(self.disk_file._datadir)
|
|
self.assertEqual(2, len(files))
|
|
self.assertTrue(os.path.basename(tomb_file_path) in files, files)
|
|
kwargs = {'mode': 'once'}
|
|
self.auditor.run_audit(**kwargs)
|
|
self.assertFalse(os.path.exists(self.disk_file._datadir))
|
|
|
|
def test_sleeper(self):
|
|
with mock.patch(
|
|
'time.sleep', mock.MagicMock()) as mock_sleep:
|
|
my_auditor = auditor.ObjectAuditor(self.conf)
|
|
my_auditor._sleep()
|
|
mock_sleep.assert_called_with(30)
|
|
|
|
my_conf = dict(interval=2)
|
|
my_conf.update(self.conf)
|
|
my_auditor = auditor.ObjectAuditor(my_conf)
|
|
my_auditor._sleep()
|
|
mock_sleep.assert_called_with(2)
|
|
|
|
my_auditor = auditor.ObjectAuditor(self.conf)
|
|
my_auditor.interval = 2
|
|
my_auditor._sleep()
|
|
mock_sleep.assert_called_with(2)
|
|
|
|
def test_run_parallel_audit(self):
|
|
|
|
class StopForever(Exception):
|
|
pass
|
|
|
|
class Bogus(Exception):
|
|
pass
|
|
|
|
loop_error = Bogus('exception')
|
|
|
|
class ObjectAuditorMock(object):
|
|
check_args = ()
|
|
check_kwargs = {}
|
|
check_device_dir = None
|
|
fork_called = 0
|
|
master = 0
|
|
wait_called = 0
|
|
|
|
def mock_run(self, *args, **kwargs):
|
|
self.check_args = args
|
|
self.check_kwargs = kwargs
|
|
if 'zero_byte_fps' in kwargs:
|
|
self.check_device_dir = kwargs.get('device_dirs')
|
|
|
|
def mock_sleep_stop(self):
|
|
raise StopForever('stop')
|
|
|
|
def mock_sleep_continue(self):
|
|
return
|
|
|
|
def mock_audit_loop_error(self, parent, zbo_fps,
|
|
override_devices=None, **kwargs):
|
|
raise loop_error
|
|
|
|
def mock_fork(self):
|
|
self.fork_called += 1
|
|
if self.master:
|
|
return self.fork_called
|
|
else:
|
|
return 0
|
|
|
|
def mock_wait(self):
|
|
self.wait_called += 1
|
|
return (self.wait_called, 0)
|
|
|
|
for i in string.ascii_letters[2:26]:
|
|
mkdirs(os.path.join(self.devices, 'sd%s' % i))
|
|
|
|
my_auditor = auditor.ObjectAuditor(dict(devices=self.devices,
|
|
mount_check='false',
|
|
zero_byte_files_per_second=89,
|
|
concurrency=1))
|
|
|
|
mocker = ObjectAuditorMock()
|
|
my_auditor.logger.exception = mock.MagicMock()
|
|
real_audit_loop = my_auditor.audit_loop
|
|
my_auditor.audit_loop = mocker.mock_audit_loop_error
|
|
my_auditor.run_audit = mocker.mock_run
|
|
was_fork = os.fork
|
|
was_wait = os.wait
|
|
os.fork = mocker.mock_fork
|
|
os.wait = mocker.mock_wait
|
|
try:
|
|
my_auditor._sleep = mocker.mock_sleep_stop
|
|
my_auditor.run_once(zero_byte_fps=50)
|
|
my_auditor.logger.exception.assert_called_once_with(
|
|
'ERROR auditing: %s', loop_error)
|
|
my_auditor.logger.exception.reset_mock()
|
|
self.assertRaises(StopForever, my_auditor.run_forever)
|
|
my_auditor.logger.exception.assert_called_once_with(
|
|
'ERROR auditing: %s', loop_error)
|
|
my_auditor.audit_loop = real_audit_loop
|
|
|
|
self.assertRaises(StopForever,
|
|
my_auditor.run_forever, zero_byte_fps=50)
|
|
self.assertEqual(mocker.check_kwargs['zero_byte_fps'], 50)
|
|
self.assertEqual(mocker.fork_called, 0)
|
|
|
|
self.assertRaises(SystemExit, my_auditor.run_once)
|
|
self.assertEqual(mocker.fork_called, 1)
|
|
self.assertEqual(mocker.check_kwargs['zero_byte_fps'], 89)
|
|
self.assertEqual(mocker.check_device_dir, [])
|
|
self.assertEqual(mocker.check_args, ())
|
|
|
|
device_list = ['sd%s' % i for i in string.ascii_letters[2:10]]
|
|
device_string = ','.join(device_list)
|
|
device_string_bogus = device_string + ',bogus'
|
|
|
|
mocker.fork_called = 0
|
|
self.assertRaises(SystemExit, my_auditor.run_once,
|
|
devices=device_string_bogus)
|
|
self.assertEqual(mocker.fork_called, 1)
|
|
self.assertEqual(mocker.check_kwargs['zero_byte_fps'], 89)
|
|
self.assertEqual(sorted(mocker.check_device_dir), device_list)
|
|
|
|
mocker.master = 1
|
|
|
|
mocker.fork_called = 0
|
|
self.assertRaises(StopForever, my_auditor.run_forever)
|
|
# Fork is called 2 times since the zbf process is forked just
|
|
# once before self._sleep() is called and StopForever is raised
|
|
# Also wait is called just once before StopForever is raised
|
|
self.assertEqual(mocker.fork_called, 2)
|
|
self.assertEqual(mocker.wait_called, 1)
|
|
|
|
my_auditor._sleep = mocker.mock_sleep_continue
|
|
|
|
my_auditor.concurrency = 2
|
|
mocker.fork_called = 0
|
|
mocker.wait_called = 0
|
|
my_auditor.run_once()
|
|
# Fork is called no. of devices + (no. of devices)/2 + 1 times
|
|
# since zbf process is forked (no.of devices)/2 + 1 times
|
|
no_devices = len(os.listdir(self.devices))
|
|
self.assertEqual(mocker.fork_called, no_devices + no_devices / 2
|
|
+ 1)
|
|
self.assertEqual(mocker.wait_called, no_devices + no_devices / 2
|
|
+ 1)
|
|
|
|
finally:
|
|
os.fork = was_fork
|
|
os.wait = was_wait
|
|
|
|
if __name__ == '__main__':
|
|
unittest.main()
|