Make dark data watcher ignore the newly updated objects
When objects are freshly uploaded, they may take a little time to appear in container listings, producing false positives. Because we needed to test this, we also reworked/added the tests and fixed some issues, including adding an EC fragment (thanks to Alistair's code). Closes-Bug: 1925782 Change-Id: Ieafa72a496328f7a487ca7062da6253994a5a07d Co-Authored-By: Alistair Coles <alistairncoles@gmail.com>
This commit is contained in:
parent
74be74395c
commit
95e0316451
@ -528,11 +528,19 @@ use = egg:swift#recon
|
|||||||
#
|
#
|
||||||
# watchers =
|
# watchers =
|
||||||
|
|
||||||
# Watcher-specific parameters can he added after "object-auditor:watcher:"
|
# Watcher-specific parameters can be added in a section with a name
|
||||||
# like the following (note that entry points are qualified by package#):
|
# [object-auditor:watcher:some_package#some_watcher]. The following
|
||||||
|
# example uses the built-in reference watcher.
|
||||||
#
|
#
|
||||||
# [object-auditor:watcher:swift#dark_data]
|
# [object-auditor:watcher:swift#dark_data]
|
||||||
|
#
|
||||||
|
# Action type can be 'log' (default), 'delete', or 'quarantine'.
|
||||||
# action=log
|
# action=log
|
||||||
|
#
|
||||||
|
# The watcher ignores the objects younger than certain minimum age.
|
||||||
|
# This prevents spurious actions upon fresh objects while container
|
||||||
|
# listings eventually settle.
|
||||||
|
# grace_age=604800
|
||||||
|
|
||||||
[object-expirer]
|
[object-expirer]
|
||||||
# If this true, this expirer will execute tasks from legacy expirer task queue,
|
# If this true, this expirer will execute tasks from legacy expirer task queue,
|
||||||
|
@ -46,13 +46,14 @@ cluster has nodes separated by function.
|
|||||||
import os
|
import os
|
||||||
import random
|
import random
|
||||||
import shutil
|
import shutil
|
||||||
|
import time
|
||||||
|
|
||||||
from eventlet import Timeout
|
from eventlet import Timeout
|
||||||
|
|
||||||
from swift.common.direct_client import direct_get_container
|
from swift.common.direct_client import direct_get_container
|
||||||
from swift.common.exceptions import ClientException, QuarantineRequest
|
from swift.common.exceptions import ClientException, QuarantineRequest
|
||||||
from swift.common.ring import Ring
|
from swift.common.ring import Ring
|
||||||
from swift.common.utils import split_path
|
from swift.common.utils import split_path, Timestamp
|
||||||
|
|
||||||
|
|
||||||
class ContainerError(Exception):
|
class ContainerError(Exception):
|
||||||
@ -68,10 +69,12 @@ class DarkDataWatcher(object):
|
|||||||
self.container_ring = Ring(swift_dir, ring_name='container')
|
self.container_ring = Ring(swift_dir, ring_name='container')
|
||||||
self.dark_data_policy = conf.get('action')
|
self.dark_data_policy = conf.get('action')
|
||||||
if self.dark_data_policy not in ['log', 'delete', 'quarantine']:
|
if self.dark_data_policy not in ['log', 'delete', 'quarantine']:
|
||||||
|
if self.dark_data_policy is not None:
|
||||||
self.logger.warning(
|
self.logger.warning(
|
||||||
"Dark data action %r unknown, defaults to action = 'log'" %
|
"Dark data action %r unknown, defaults to action = 'log'" %
|
||||||
(self.dark_data_policy,))
|
(self.dark_data_policy,))
|
||||||
self.dark_data_policy = 'log'
|
self.dark_data_policy = 'log'
|
||||||
|
self.grace_age = int(conf.get('grace_age', 604800))
|
||||||
|
|
||||||
def start(self, audit_type, **other_kwargs):
|
def start(self, audit_type, **other_kwargs):
|
||||||
self.is_zbf = audit_type == 'ZBF'
|
self.is_zbf = audit_type == 'ZBF'
|
||||||
@ -98,6 +101,13 @@ class DarkDataWatcher(object):
|
|||||||
if self.is_zbf:
|
if self.is_zbf:
|
||||||
return
|
return
|
||||||
|
|
||||||
|
put_tstr = object_metadata['X-Timestamp']
|
||||||
|
if float(Timestamp(put_tstr)) + self.grace_age >= time.time():
|
||||||
|
# We can add "tot_new" if lumping these with the good objects
|
||||||
|
# ever bothers anyone.
|
||||||
|
self.tot_okay += 1
|
||||||
|
return
|
||||||
|
|
||||||
obj_path = object_metadata['name']
|
obj_path = object_metadata['name']
|
||||||
try:
|
try:
|
||||||
obj_info = get_info_1(self.container_ring, obj_path, self.logger)
|
obj_info = get_info_1(self.container_ring, obj_path, self.logger)
|
||||||
|
@ -60,9 +60,11 @@ class TestDarkDataDeletion(ReplProbeTest):
|
|||||||
config['object-auditor'].update(
|
config['object-auditor'].update(
|
||||||
{'watchers': 'swift#dark_data'})
|
{'watchers': 'swift#dark_data'})
|
||||||
# Note that this setdefault business may mean the watcher doesn't
|
# Note that this setdefault business may mean the watcher doesn't
|
||||||
# pick up DEFAULT values, but that (probably?) won't matter
|
# pick up DEFAULT values, but that (probably?) won't matter.
|
||||||
|
# We set grace_age to 0 so that tests don't have to deal with time.
|
||||||
config.setdefault(CONF_SECTION, {}).update(
|
config.setdefault(CONF_SECTION, {}).update(
|
||||||
{'action': self.action})
|
{'action': self.action,
|
||||||
|
'grace_age': "0"})
|
||||||
|
|
||||||
parser = ConfigParser()
|
parser = ConfigParser()
|
||||||
for section in ('object-auditor', CONF_SECTION):
|
for section in ('object-auditor', CONF_SECTION):
|
||||||
|
@ -145,7 +145,7 @@ class TestAuditorBase(unittest.TestCase):
|
|||||||
self.disk_file_p1 = self.df_mgr.get_diskfile('sda', '0', 'a', 'c2',
|
self.disk_file_p1 = self.df_mgr.get_diskfile('sda', '0', 'a', 'c2',
|
||||||
'o', policy=POLICIES[1])
|
'o', policy=POLICIES[1])
|
||||||
self.disk_file_ec = self.ec_df_mgr.get_diskfile(
|
self.disk_file_ec = self.ec_df_mgr.get_diskfile(
|
||||||
'sda', '0', 'a', 'c', 'o', policy=POLICIES[2], frag_index=1)
|
'sda', '0', 'a', 'c_ec', 'o', policy=POLICIES[2], frag_index=1)
|
||||||
|
|
||||||
def tearDown(self):
|
def tearDown(self):
|
||||||
rmtree(os.path.dirname(self.testdir), ignore_errors=1)
|
rmtree(os.path.dirname(self.testdir), ignore_errors=1)
|
||||||
@ -1574,33 +1574,53 @@ class TestAuditWatchers(TestAuditorBase):
|
|||||||
|
|
||||||
timestamp = Timestamp(time.time())
|
timestamp = Timestamp(time.time())
|
||||||
|
|
||||||
|
disk_file = self.df_mgr.get_diskfile(
|
||||||
|
'sda', '0', 'a', 'c', 'o0', policy=POLICIES.legacy)
|
||||||
data = b'0' * 1024
|
data = b'0' * 1024
|
||||||
etag = md5()
|
etag = md5()
|
||||||
with self.disk_file.create() as writer:
|
with disk_file.create() as writer:
|
||||||
writer.write(data)
|
writer.write(data)
|
||||||
etag.update(data)
|
etag.update(data)
|
||||||
etag = etag.hexdigest()
|
|
||||||
metadata = {
|
metadata = {
|
||||||
'ETag': etag,
|
'ETag': etag.hexdigest(),
|
||||||
'X-Timestamp': timestamp.internal,
|
'X-Timestamp': timestamp.internal,
|
||||||
'Content-Length': str(len(data)),
|
'Content-Length': str(len(data)),
|
||||||
'X-Object-Meta-Flavor': 'banana',
|
'X-Object-Meta-Flavor': 'banana',
|
||||||
}
|
}
|
||||||
writer.put(metadata)
|
writer.put(metadata)
|
||||||
|
# The commit does nothing; we keep it for code copy-paste with EC.
|
||||||
|
writer.commit(timestamp)
|
||||||
|
|
||||||
|
disk_file = self.df_mgr.get_diskfile(
|
||||||
|
'sda', '0', 'a', 'c', 'o1', policy=POLICIES.legacy)
|
||||||
data = b'1' * 2048
|
data = b'1' * 2048
|
||||||
etag = md5()
|
etag = md5()
|
||||||
with self.disk_file_p1.create() as writer:
|
with disk_file.create() as writer:
|
||||||
writer.write(data)
|
writer.write(data)
|
||||||
etag.update(data)
|
etag.update(data)
|
||||||
etag = etag.hexdigest()
|
|
||||||
metadata = {
|
metadata = {
|
||||||
'ETag': etag,
|
'ETag': etag.hexdigest(),
|
||||||
'X-Timestamp': timestamp.internal,
|
'X-Timestamp': timestamp.internal,
|
||||||
'Content-Length': str(len(data)),
|
'Content-Length': str(len(data)),
|
||||||
'X-Object-Meta-Flavor': 'orange',
|
'X-Object-Meta-Flavor': 'orange',
|
||||||
}
|
}
|
||||||
writer.put(metadata)
|
writer.put(metadata)
|
||||||
|
writer.commit(timestamp)
|
||||||
|
|
||||||
|
frag_0 = self.disk_file_ec.policy.pyeclib_driver.encode(
|
||||||
|
b'x' * self.disk_file_ec.policy.ec_segment_size)[0]
|
||||||
|
etag = md5()
|
||||||
|
with self.disk_file_ec.create() as writer:
|
||||||
|
writer.write(frag_0)
|
||||||
|
etag.update(frag_0)
|
||||||
|
metadata = {
|
||||||
|
'ETag': etag.hexdigest(),
|
||||||
|
'X-Timestamp': timestamp.internal,
|
||||||
|
'Content-Length': str(len(frag_0)),
|
||||||
|
'X-Object-Meta-Flavor': 'peach',
|
||||||
|
}
|
||||||
|
writer.put(metadata)
|
||||||
|
writer.commit(timestamp)
|
||||||
|
|
||||||
def test_watchers(self):
|
def test_watchers(self):
|
||||||
|
|
||||||
@ -1650,7 +1670,7 @@ class TestAuditWatchers(TestAuditorBase):
|
|||||||
|
|
||||||
my_auditor.run_audit(mode='once', zero_byte_fps=float("inf"))
|
my_auditor.run_audit(mode='once', zero_byte_fps=float("inf"))
|
||||||
|
|
||||||
self.assertEqual(len(calls), 5)
|
self.assertEqual(len(calls), 6)
|
||||||
|
|
||||||
self.assertEqual(calls[0], ["__init__", conf, mock.ANY])
|
self.assertEqual(calls[0], ["__init__", conf, mock.ANY])
|
||||||
self.assertIsInstance(calls[0][2], PrefixLoggerAdapter)
|
self.assertIsInstance(calls[0][2], PrefixLoggerAdapter)
|
||||||
@ -1663,23 +1683,30 @@ class TestAuditWatchers(TestAuditorBase):
|
|||||||
|
|
||||||
# The order in which the auditor finds things on the filesystem is
|
# The order in which the auditor finds things on the filesystem is
|
||||||
# irrelevant; what matters is that it finds all the things.
|
# irrelevant; what matters is that it finds all the things.
|
||||||
calls[2:4] = sorted(calls[2:4], key=lambda item: item[1]['name'])
|
calls[2:5] = sorted(calls[2:5], key=lambda item: item[1]['name'])
|
||||||
|
|
||||||
self.assertDictContainsSubset({'name': '/a/c/o',
|
self.assertDictContainsSubset({'name': '/a/c/o0',
|
||||||
'X-Object-Meta-Flavor': 'banana'},
|
'X-Object-Meta-Flavor': 'banana'},
|
||||||
calls[2][1])
|
calls[2][1])
|
||||||
self.assertIn('node/sda/objects/0/', calls[2][2]) # data_file_path
|
self.assertIn('node/sda/objects/0/', calls[2][2]) # data_file_path
|
||||||
self.assertTrue(calls[2][2].endswith('.data')) # data_file_path
|
self.assertTrue(calls[2][2].endswith('.data')) # data_file_path
|
||||||
self.assertEqual({}, calls[2][3])
|
self.assertEqual({}, calls[2][3])
|
||||||
|
|
||||||
self.assertDictContainsSubset({'name': '/a/c2/o',
|
self.assertDictContainsSubset({'name': '/a/c/o1',
|
||||||
'X-Object-Meta-Flavor': 'orange'},
|
'X-Object-Meta-Flavor': 'orange'},
|
||||||
calls[3][1])
|
calls[3][1])
|
||||||
self.assertIn('node/sda/objects-1/0/', calls[3][2]) # data_file_path
|
self.assertIn('node/sda/objects/0/', calls[3][2]) # data_file_path
|
||||||
self.assertTrue(calls[3][2].endswith('.data')) # data_file_path
|
self.assertTrue(calls[3][2].endswith('.data')) # data_file_path
|
||||||
self.assertEqual({}, calls[3][3])
|
self.assertEqual({}, calls[3][3])
|
||||||
|
|
||||||
self.assertEqual(calls[4], ["end"])
|
self.assertDictContainsSubset({'name': '/a/c_ec/o',
|
||||||
|
'X-Object-Meta-Flavor': 'peach'},
|
||||||
|
calls[4][1])
|
||||||
|
self.assertIn('node/sda/objects-2/0/', calls[4][2]) # data_file_path
|
||||||
|
self.assertTrue(calls[4][2].endswith('.data')) # data_file_path
|
||||||
|
self.assertEqual({}, calls[4][3])
|
||||||
|
|
||||||
|
self.assertEqual(calls[5], ["end"])
|
||||||
|
|
||||||
log_lines = self.logger.get_lines_for_level('debug')
|
log_lines = self.logger.get_lines_for_level('debug')
|
||||||
self.assertIn(
|
self.assertIn(
|
||||||
@ -1688,20 +1715,35 @@ class TestAuditWatchers(TestAuditorBase):
|
|||||||
|
|
||||||
def test_builtin_watchers(self):
|
def test_builtin_watchers(self):
|
||||||
|
|
||||||
conf = self.conf.copy()
|
# Yep, back-channel signaling in tests.
|
||||||
conf['watchers'] = 'test_watcher1'
|
sentinel = 'DARK'
|
||||||
conf['__file__'] = '/etc/swift/swift.conf'
|
|
||||||
ret_config = {'swift#dark_data': {'action': 'log'}}
|
timestamp = Timestamp(time.time())
|
||||||
with mock.patch('swift.obj.auditor.parse_prefixed_conf',
|
|
||||||
return_value=ret_config), \
|
disk_file = self.df_mgr.get_diskfile(
|
||||||
mock.patch('swift.obj.auditor.load_pkg_resource',
|
'sda', '0', 'a', sentinel, 'o2', policy=POLICIES.legacy)
|
||||||
side_effect=[DarkDataWatcher]):
|
data = b'2' * 1024
|
||||||
my_auditor = auditor.ObjectAuditor(conf, logger=self.logger)
|
etag = md5()
|
||||||
|
with disk_file.create() as writer:
|
||||||
|
writer.write(data)
|
||||||
|
etag.update(data)
|
||||||
|
metadata = {
|
||||||
|
'ETag': etag.hexdigest(),
|
||||||
|
'X-Timestamp': timestamp.internal,
|
||||||
|
'Content-Length': str(len(data)),
|
||||||
|
'X-Object-Meta-Flavor': 'mango',
|
||||||
|
}
|
||||||
|
writer.put(metadata)
|
||||||
|
writer.commit(timestamp)
|
||||||
|
|
||||||
def fake_direct_get_container(node, part, account, container,
|
def fake_direct_get_container(node, part, account, container,
|
||||||
prefix=None, limit=None):
|
prefix=None, limit=None):
|
||||||
self.assertEqual(part, 1)
|
self.assertEqual(part, 1)
|
||||||
self.assertEqual(limit, 1)
|
self.assertEqual(limit, 1)
|
||||||
|
|
||||||
|
if container == sentinel:
|
||||||
|
return {}, []
|
||||||
|
|
||||||
# The returned entry is not abbreviated, but is full of nonsese.
|
# The returned entry is not abbreviated, but is full of nonsese.
|
||||||
entry = {'bytes': 30968411,
|
entry = {'bytes': 30968411,
|
||||||
'hash': '60303f4122966fe5925f045eb52d1129',
|
'hash': '60303f4122966fe5925f045eb52d1129',
|
||||||
@ -1710,18 +1752,71 @@ class TestAuditWatchers(TestAuditorBase):
|
|||||||
'last_modified': '2017-08-15T03:30:57.693210'}
|
'last_modified': '2017-08-15T03:30:57.693210'}
|
||||||
return {}, [entry]
|
return {}, [entry]
|
||||||
|
|
||||||
|
conf = self.conf.copy()
|
||||||
|
conf['watchers'] = 'test_watcher1'
|
||||||
|
conf['__file__'] = '/etc/swift/swift.conf'
|
||||||
|
|
||||||
|
# with default watcher config the DARK object will not be older than
|
||||||
|
# grace_age so will not be logged
|
||||||
|
ret_config = {'test_watcher1': {'action': 'log'}}
|
||||||
|
with mock.patch('swift.obj.auditor.parse_prefixed_conf',
|
||||||
|
return_value=ret_config), \
|
||||||
|
mock.patch('swift.obj.auditor.load_pkg_resource',
|
||||||
|
side_effect=[DarkDataWatcher]):
|
||||||
|
my_auditor = auditor.ObjectAuditor(conf, logger=self.logger)
|
||||||
|
|
||||||
with mock.patch('swift.obj.watchers.dark_data.Ring', FakeRing1), \
|
with mock.patch('swift.obj.watchers.dark_data.Ring', FakeRing1), \
|
||||||
mock.patch("swift.obj.watchers.dark_data.direct_get_container",
|
mock.patch("swift.obj.watchers.dark_data.direct_get_container",
|
||||||
fake_direct_get_container):
|
fake_direct_get_container):
|
||||||
my_auditor.run_audit(mode='once')
|
my_auditor.run_audit(mode='once')
|
||||||
|
|
||||||
# N.B. We want to check for ok files instead of dark because
|
|
||||||
# if anything goes wrong inside, we want it fail the test.
|
|
||||||
log_lines = self.logger.get_lines_for_level('info')
|
log_lines = self.logger.get_lines_for_level('info')
|
||||||
self.assertIn(
|
self.assertIn(
|
||||||
'[audit-watcher test_watcher1] total unknown 0 ok 2 dark 0',
|
'[audit-watcher test_watcher1] total unknown 0 ok 4 dark 0',
|
||||||
log_lines)
|
log_lines)
|
||||||
|
|
||||||
|
self.logger.clear()
|
||||||
|
# with grace_age=0 the DARK object will be older than
|
||||||
|
# grace_age so will be logged
|
||||||
|
ret_config = {'test_watcher1': {'action': 'log', 'grace_age': '0'}}
|
||||||
|
with mock.patch('swift.obj.auditor.parse_prefixed_conf',
|
||||||
|
return_value=ret_config), \
|
||||||
|
mock.patch('swift.obj.auditor.load_pkg_resource',
|
||||||
|
side_effect=[DarkDataWatcher]):
|
||||||
|
my_auditor = auditor.ObjectAuditor(conf, logger=self.logger)
|
||||||
|
|
||||||
|
with mock.patch('swift.obj.watchers.dark_data.Ring', FakeRing1), \
|
||||||
|
mock.patch("swift.obj.watchers.dark_data.direct_get_container",
|
||||||
|
fake_direct_get_container):
|
||||||
|
my_auditor.run_audit(mode='once')
|
||||||
|
|
||||||
|
log_lines = self.logger.get_lines_for_level('info')
|
||||||
|
self.assertIn(
|
||||||
|
'[audit-watcher test_watcher1] total unknown 0 ok 3 dark 1',
|
||||||
|
log_lines)
|
||||||
|
|
||||||
|
def test_dark_data_watcher_init(self):
|
||||||
|
conf = {}
|
||||||
|
with mock.patch('swift.obj.watchers.dark_data.Ring', FakeRing1):
|
||||||
|
watcher = DarkDataWatcher(conf, self.logger)
|
||||||
|
self.assertEqual(self.logger, watcher.logger)
|
||||||
|
self.assertEqual(604800, watcher.grace_age)
|
||||||
|
self.assertEqual('log', watcher.dark_data_policy)
|
||||||
|
|
||||||
|
conf = {'grace_age': 360, 'action': 'delete'}
|
||||||
|
with mock.patch('swift.obj.watchers.dark_data.Ring', FakeRing1):
|
||||||
|
watcher = DarkDataWatcher(conf, self.logger)
|
||||||
|
self.assertEqual(self.logger, watcher.logger)
|
||||||
|
self.assertEqual(360, watcher.grace_age)
|
||||||
|
self.assertEqual('delete', watcher.dark_data_policy)
|
||||||
|
|
||||||
|
conf = {'grace_age': 0, 'action': 'invalid'}
|
||||||
|
with mock.patch('swift.obj.watchers.dark_data.Ring', FakeRing1):
|
||||||
|
watcher = DarkDataWatcher(conf, self.logger)
|
||||||
|
self.assertEqual(self.logger, watcher.logger)
|
||||||
|
self.assertEqual(0, watcher.grace_age)
|
||||||
|
self.assertEqual('log', watcher.dark_data_policy)
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
unittest.main()
|
unittest.main()
|
||||||
|
Loading…
x
Reference in New Issue
Block a user