Add Storage Policy Support to the Auditor
This patch makes the object auditor policy-aware, so it'll audit objects in any storage policy. DocImpact Implements: blueprint storage-policies Change-Id: I94e3a7937d9814b9ecef6ca35371e245a43513d3
This commit is contained in:
parent
d5ca365965
commit
1a0e4d9197
@ -423,27 +423,40 @@ def object_audit_location_generator(devices, mount_check=True, logger=None,
|
||||
logger.debug(
|
||||
_('Skipping %s as it is not mounted'), device)
|
||||
continue
|
||||
datadir_path = os.path.join(devices, device, DATADIR_BASE)
|
||||
partitions = listdir(datadir_path)
|
||||
for partition in partitions:
|
||||
part_path = os.path.join(datadir_path, partition)
|
||||
# loop through object dirs for all policies
|
||||
for dir in [dir for dir in os.listdir(os.path.join(devices, device))
|
||||
if dir.startswith(DATADIR_BASE)]:
|
||||
datadir_path = os.path.join(devices, device, dir)
|
||||
# warn if the object dir doesn't match with a policy
|
||||
policy_idx = 0
|
||||
if '-' in dir:
|
||||
base, policy_idx = dir.split('-', 1)
|
||||
try:
|
||||
suffixes = listdir(part_path)
|
||||
except OSError as e:
|
||||
if e.errno != errno.ENOTDIR:
|
||||
raise
|
||||
continue
|
||||
for asuffix in suffixes:
|
||||
suff_path = os.path.join(part_path, asuffix)
|
||||
get_data_dir(policy_idx)
|
||||
except ValueError:
|
||||
if logger:
|
||||
logger.warn(_('Directory %s does not map to a '
|
||||
'valid policy') % dir)
|
||||
partitions = listdir(datadir_path)
|
||||
for partition in partitions:
|
||||
part_path = os.path.join(datadir_path, partition)
|
||||
try:
|
||||
hashes = listdir(suff_path)
|
||||
suffixes = listdir(part_path)
|
||||
except OSError as e:
|
||||
if e.errno != errno.ENOTDIR:
|
||||
raise
|
||||
continue
|
||||
for hsh in hashes:
|
||||
hsh_path = os.path.join(suff_path, hsh)
|
||||
yield AuditLocation(hsh_path, device, partition)
|
||||
for asuffix in suffixes:
|
||||
suff_path = os.path.join(part_path, asuffix)
|
||||
try:
|
||||
hashes = listdir(suff_path)
|
||||
except OSError as e:
|
||||
if e.errno != errno.ENOTDIR:
|
||||
raise
|
||||
continue
|
||||
for hsh in hashes:
|
||||
hsh_path = os.path.join(suff_path, hsh)
|
||||
yield AuditLocation(hsh_path, device, partition)
|
||||
|
||||
|
||||
class DiskFileManager(object):
|
||||
|
@ -22,14 +22,20 @@ import string
|
||||
from shutil import rmtree
|
||||
from hashlib import md5
|
||||
from tempfile import mkdtemp
|
||||
from test.unit import FakeLogger
|
||||
from test.unit import FakeLogger, patch_policies
|
||||
from swift.obj import auditor
|
||||
from swift.obj.diskfile import DiskFile, write_metadata, invalidate_hash, \
|
||||
DATADIR_BASE, DiskFileManager, AuditLocation
|
||||
get_data_dir, DiskFileManager, AuditLocation
|
||||
from swift.common.utils import hash_path, mkdirs, normalize_timestamp, \
|
||||
storage_directory
|
||||
from swift.common.storage_policy import StoragePolicy
|
||||
|
||||
|
||||
_mocked_policies = [StoragePolicy(0, 'zero', False),
|
||||
StoragePolicy(1, 'one', True)]
|
||||
|
||||
|
||||
@patch_policies(_mocked_policies)
|
||||
class TestAuditor(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
@ -39,54 +45,70 @@ class TestAuditor(unittest.TestCase):
|
||||
self.logger = FakeLogger()
|
||||
rmtree(self.testdir, ignore_errors=1)
|
||||
mkdirs(os.path.join(self.devices, 'sda'))
|
||||
self.objects = os.path.join(self.devices, 'sda', 'objects')
|
||||
|
||||
os.mkdir(os.path.join(self.devices, 'sdb'))
|
||||
self.objects_2 = os.path.join(self.devices, 'sdb', 'objects')
|
||||
|
||||
# policy 0
|
||||
self.objects = os.path.join(self.devices, 'sda', get_data_dir(0))
|
||||
self.objects_2 = os.path.join(self.devices, 'sdb', get_data_dir(0))
|
||||
os.mkdir(self.objects)
|
||||
self.parts = {}
|
||||
# policy 1
|
||||
self.objects_p1 = os.path.join(self.devices, 'sda', get_data_dir(1))
|
||||
self.objects_2_p1 = os.path.join(self.devices, 'sdb', get_data_dir(1))
|
||||
os.mkdir(self.objects_p1)
|
||||
|
||||
self.parts = self.parts_p1 = {}
|
||||
for part in ['0', '1', '2', '3']:
|
||||
self.parts[part] = os.path.join(self.objects, part)
|
||||
self.parts_p1[part] = os.path.join(self.objects_p1, part)
|
||||
os.mkdir(os.path.join(self.objects, part))
|
||||
os.mkdir(os.path.join(self.objects_p1, part))
|
||||
|
||||
self.conf = dict(
|
||||
devices=self.devices,
|
||||
mount_check='false',
|
||||
object_size_stats='10,100,1024,10240')
|
||||
self.df_mgr = DiskFileManager(self.conf, self.logger)
|
||||
self.disk_file = self.df_mgr.get_diskfile('sda', '0', 'a', 'c', 'o')
|
||||
|
||||
# diskfiles for policy 0, 1
|
||||
self.disk_file = self.df_mgr.get_diskfile('sda', '0', 'a', 'c', 'o', 0)
|
||||
self.disk_file_p1 = self.df_mgr.get_diskfile('sda', '0', 'a', 'c',
|
||||
'o', 1)
|
||||
|
||||
def tearDown(self):
|
||||
rmtree(os.path.dirname(self.testdir), ignore_errors=1)
|
||||
unit.xattr_data = {}
|
||||
|
||||
def test_object_audit_extra_data(self):
|
||||
auditor_worker = auditor.AuditorWorker(self.conf, self.logger,
|
||||
self.rcache, self.devices)
|
||||
data = '0' * 1024
|
||||
etag = md5()
|
||||
with self.disk_file.create() as writer:
|
||||
writer.write(data)
|
||||
etag.update(data)
|
||||
etag = etag.hexdigest()
|
||||
timestamp = str(normalize_timestamp(time.time()))
|
||||
metadata = {
|
||||
'ETag': etag,
|
||||
'X-Timestamp': timestamp,
|
||||
'Content-Length': str(os.fstat(writer._fd).st_size),
|
||||
}
|
||||
writer.put(metadata)
|
||||
pre_quarantines = auditor_worker.quarantines
|
||||
def run_tests(disk_file):
|
||||
auditor_worker = auditor.AuditorWorker(self.conf, self.logger,
|
||||
self.rcache, self.devices)
|
||||
data = '0' * 1024
|
||||
etag = md5()
|
||||
with disk_file.create() as writer:
|
||||
writer.write(data)
|
||||
etag.update(data)
|
||||
etag = etag.hexdigest()
|
||||
timestamp = str(normalize_timestamp(time.time()))
|
||||
metadata = {
|
||||
'ETag': etag,
|
||||
'X-Timestamp': timestamp,
|
||||
'Content-Length': str(os.fstat(writer._fd).st_size),
|
||||
}
|
||||
writer.put(metadata)
|
||||
pre_quarantines = auditor_worker.quarantines
|
||||
|
||||
auditor_worker.object_audit(
|
||||
AuditLocation(self.disk_file._datadir, 'sda', '0'))
|
||||
self.assertEquals(auditor_worker.quarantines, pre_quarantines)
|
||||
auditor_worker.object_audit(
|
||||
AuditLocation(disk_file._datadir, 'sda', '0'))
|
||||
self.assertEquals(auditor_worker.quarantines, pre_quarantines)
|
||||
|
||||
os.write(writer._fd, 'extra_data')
|
||||
auditor_worker.object_audit(
|
||||
AuditLocation(self.disk_file._datadir, 'sda', '0'))
|
||||
self.assertEquals(auditor_worker.quarantines, pre_quarantines + 1)
|
||||
os.write(writer._fd, 'extra_data')
|
||||
|
||||
auditor_worker.object_audit(
|
||||
AuditLocation(disk_file._datadir, 'sda', '0'))
|
||||
self.assertEquals(auditor_worker.quarantines,
|
||||
pre_quarantines + 1)
|
||||
run_tests(self.disk_file)
|
||||
run_tests(self.disk_file_p1)
|
||||
|
||||
def test_object_audit_diff_data(self):
|
||||
auditor_worker = auditor.AuditorWorker(self.conf, self.logger,
|
||||
@ -200,20 +222,29 @@ class TestAuditor(unittest.TestCase):
|
||||
timestamp = str(normalize_timestamp(time.time()))
|
||||
pre_quarantines = auditor_worker.quarantines
|
||||
data = '0' * 1024
|
||||
etag = md5()
|
||||
with self.disk_file.create() as writer:
|
||||
writer.write(data)
|
||||
etag.update(data)
|
||||
etag = etag.hexdigest()
|
||||
metadata = {
|
||||
'ETag': etag,
|
||||
'X-Timestamp': timestamp,
|
||||
'Content-Length': str(os.fstat(writer._fd).st_size),
|
||||
}
|
||||
writer.put(metadata)
|
||||
|
||||
def write_file(df):
|
||||
etag = md5()
|
||||
with df.create() as writer:
|
||||
writer.write(data)
|
||||
etag.update(data)
|
||||
etag = etag.hexdigest()
|
||||
metadata = {
|
||||
'ETag': etag,
|
||||
'X-Timestamp': timestamp,
|
||||
'Content-Length': str(os.fstat(writer._fd).st_size),
|
||||
}
|
||||
writer.put(metadata)
|
||||
|
||||
# policy 0
|
||||
write_file(self.disk_file)
|
||||
# policy 1
|
||||
write_file(self.disk_file_p1)
|
||||
|
||||
auditor_worker.audit_all_objects()
|
||||
self.assertEquals(auditor_worker.quarantines, pre_quarantines)
|
||||
self.assertEquals(auditor_worker.stats_buckets[1024], 1)
|
||||
# 1 object per policy falls into 1024 bucket
|
||||
self.assertEquals(auditor_worker.stats_buckets[1024], 2)
|
||||
self.assertEquals(auditor_worker.stats_buckets[10240], 0)
|
||||
|
||||
# pick up some additional code coverage, large file
|
||||
@ -231,8 +262,11 @@ class TestAuditor(unittest.TestCase):
|
||||
writer.put(metadata)
|
||||
auditor_worker.audit_all_objects(device_dirs=['sda', 'sdb'])
|
||||
self.assertEquals(auditor_worker.quarantines, pre_quarantines)
|
||||
self.assertEquals(auditor_worker.stats_buckets[1024], 1)
|
||||
# still have the 1024 byte object left in policy-1 (plus the
|
||||
# stats from the original 2)
|
||||
self.assertEquals(auditor_worker.stats_buckets[1024], 3)
|
||||
self.assertEquals(auditor_worker.stats_buckets[10240], 0)
|
||||
# and then policy-0 disk_file was re-written as a larger object
|
||||
self.assertEquals(auditor_worker.stats_buckets['OVER'], 1)
|
||||
|
||||
# pick up even more additional code coverage, misc paths
|
||||
@ -240,7 +274,7 @@ class TestAuditor(unittest.TestCase):
|
||||
auditor_worker.stats_sizes = []
|
||||
auditor_worker.audit_all_objects(device_dirs=['sda', 'sdb'])
|
||||
self.assertEquals(auditor_worker.quarantines, pre_quarantines)
|
||||
self.assertEquals(auditor_worker.stats_buckets[1024], 1)
|
||||
self.assertEquals(auditor_worker.stats_buckets[1024], 3)
|
||||
self.assertEquals(auditor_worker.stats_buckets[10240], 0)
|
||||
self.assertEquals(auditor_worker.stats_buckets['OVER'], 1)
|
||||
|
||||
@ -339,7 +373,7 @@ class TestAuditor(unittest.TestCase):
|
||||
name_hash = hash_path('a', 'c', 'o')
|
||||
dir_path = os.path.join(
|
||||
self.devices, 'sda',
|
||||
storage_directory(DATADIR_BASE, '0', name_hash))
|
||||
storage_directory(get_data_dir(0), '0', name_hash))
|
||||
ts_file_path = os.path.join(dir_path, '99999.ts')
|
||||
if not os.path.exists(dir_path):
|
||||
mkdirs(dir_path)
|
||||
|
@ -33,7 +33,8 @@ from contextlib import closing, nested
|
||||
from gzip import GzipFile
|
||||
|
||||
from eventlet import tpool
|
||||
from test.unit import FakeLogger, mock as unit_mock, temptree, patch_policies
|
||||
from test.unit import (FakeLogger, mock as unit_mock, temptree,
|
||||
patch_policies, debug_logger)
|
||||
|
||||
from swift.obj import diskfile
|
||||
from swift.common import utils
|
||||
@ -689,8 +690,19 @@ class TestObjectAuditLocationGenerator(unittest.TestCase):
|
||||
"4a943bc72c2e647c4675923d58cf4ca5"))
|
||||
os.makedirs(os.path.join(tmpdir, "sdq", "objects", "3071", "8eb",
|
||||
"fcd938702024c25fef6c32fef05298eb"))
|
||||
|
||||
os.makedirs(os.path.join(tmpdir, "sdp", "objects-1", "9970", "ca5",
|
||||
"4a943bc72c2e647c4675923d58cf4ca5"))
|
||||
os.makedirs(os.path.join(tmpdir, "sdq", "objects-2", "9971", "8eb",
|
||||
"fcd938702024c25fef6c32fef05298eb"))
|
||||
os.makedirs(os.path.join(tmpdir, "sdq", "objects-99", "9972",
|
||||
"8eb",
|
||||
"fcd938702024c25fef6c32fef05298eb"))
|
||||
# the bad
|
||||
os.makedirs(os.path.join(tmpdir, "sdq", "objects-", "1135",
|
||||
"6c3",
|
||||
"fcd938702024c25fef6c32fef05298eb"))
|
||||
os.makedirs(os.path.join(tmpdir, "sdq", "objects-fud", "foo"))
|
||||
|
||||
self._make_file(os.path.join(tmpdir, "sdp", "objects", "1519",
|
||||
"fed"))
|
||||
self._make_file(os.path.join(tmpdir, "sdq", "objects", "9876"))
|
||||
@ -707,14 +719,26 @@ class TestObjectAuditLocationGenerator(unittest.TestCase):
|
||||
os.makedirs(os.path.join(tmpdir, "sdw", "containers", "28", "51e",
|
||||
"4f9eee668b66c6f0250bfa3c7ab9e51e"))
|
||||
|
||||
logger = debug_logger()
|
||||
locations = [(loc.path, loc.device, loc.partition)
|
||||
for loc in diskfile.object_audit_location_generator(
|
||||
devices=tmpdir, mount_check=False)]
|
||||
devices=tmpdir, mount_check=False,
|
||||
logger=logger)]
|
||||
locations.sort()
|
||||
|
||||
self.assertEqual(
|
||||
locations,
|
||||
[(os.path.join(tmpdir, "sdp", "objects", "1519", "aca",
|
||||
# expect some warnings about those bad dirs
|
||||
warnings = logger.get_lines_for_level('warning')
|
||||
self.assertEqual(set(warnings), set([
|
||||
'Directory objects- does not map to a valid policy',
|
||||
'Directory objects-2 does not map to a valid policy',
|
||||
'Directory objects-99 does not map to a valid policy',
|
||||
'Directory objects-fud does not map to a valid policy']))
|
||||
|
||||
expected = \
|
||||
[(os.path.join(tmpdir, "sdp", "objects-1", "9970", "ca5",
|
||||
"4a943bc72c2e647c4675923d58cf4ca5"),
|
||||
"sdp", "9970"),
|
||||
(os.path.join(tmpdir, "sdp", "objects", "1519", "aca",
|
||||
"5c1fdc1ffb12e5eaf84edc30d8b67aca"),
|
||||
"sdp", "1519"),
|
||||
(os.path.join(tmpdir, "sdp", "objects", "1519", "aca",
|
||||
@ -726,9 +750,27 @@ class TestObjectAuditLocationGenerator(unittest.TestCase):
|
||||
(os.path.join(tmpdir, "sdp", "objects", "9720", "ca5",
|
||||
"4a943bc72c2e647c4675923d58cf4ca5"),
|
||||
"sdp", "9720"),
|
||||
(os.path.join(tmpdir, "sdq", "objects-", "1135", "6c3",
|
||||
"fcd938702024c25fef6c32fef05298eb"),
|
||||
"sdq", "1135"),
|
||||
(os.path.join(tmpdir, "sdq", "objects-2", "9971", "8eb",
|
||||
"fcd938702024c25fef6c32fef05298eb"),
|
||||
"sdq", "9971"),
|
||||
(os.path.join(tmpdir, "sdq", "objects-99", "9972", "8eb",
|
||||
"fcd938702024c25fef6c32fef05298eb"),
|
||||
"sdq", "9972"),
|
||||
(os.path.join(tmpdir, "sdq", "objects", "3071", "8eb",
|
||||
"fcd938702024c25fef6c32fef05298eb"),
|
||||
"sdq", "3071")])
|
||||
"sdq", "3071"),
|
||||
]
|
||||
self.assertEqual(locations, expected)
|
||||
|
||||
#now without a logger
|
||||
locations = [(loc.path, loc.device, loc.partition)
|
||||
for loc in diskfile.object_audit_location_generator(
|
||||
devices=tmpdir, mount_check=False)]
|
||||
locations.sort()
|
||||
self.assertEqual(locations, expected)
|
||||
|
||||
def test_skipping_unmounted_devices(self):
|
||||
def mock_ismount(path):
|
||||
|
Loading…
x
Reference in New Issue
Block a user