Refactor db auditors into a db_auditor base class
The container and account auditors, and their tests are almost identical. This patch reduces the code duplication by refactoring into a base class called DatabaseAuditor. This also means the container and account auditor tests have also mostly been refactored. Change-Id: I9765d65f12afec295d9eaae52858e4e7272c9c4c
This commit is contained in:
parent
d9a6fe4362
commit
4cb52b44dd
@ -13,100 +13,21 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import os
|
||||
import time
|
||||
|
||||
from swift import gettext_ as _
|
||||
from random import random
|
||||
|
||||
import swift.common.db
|
||||
from swift.account.backend import AccountBroker, DATADIR
|
||||
from swift.account.backend import AccountBroker
|
||||
from swift.common.exceptions import InvalidAccountInfo
|
||||
from swift.common.utils import get_logger, audit_location_generator, \
|
||||
config_true_value, dump_recon_cache, ratelimit_sleep
|
||||
from swift.common.daemon import Daemon
|
||||
|
||||
from eventlet import Timeout
|
||||
from swift.common.db_auditor import DatabaseAuditor
|
||||
|
||||
|
||||
class AccountAuditor(Daemon):
|
||||
class AccountAuditor(DatabaseAuditor):
|
||||
"""Audit accounts."""
|
||||
|
||||
def __init__(self, conf, logger=None):
|
||||
self.conf = conf
|
||||
self.logger = logger or get_logger(conf, log_route='account-auditor')
|
||||
self.devices = conf.get('devices', '/srv/node')
|
||||
self.mount_check = config_true_value(conf.get('mount_check', 'true'))
|
||||
self.interval = int(conf.get('interval', 1800))
|
||||
self.logging_interval = 3600 # once an hour
|
||||
self.account_passes = 0
|
||||
self.account_failures = 0
|
||||
self.accounts_running_time = 0
|
||||
self.max_accounts_per_second = \
|
||||
float(conf.get('accounts_per_second', 200))
|
||||
swift.common.db.DB_PREALLOCATION = \
|
||||
config_true_value(conf.get('db_preallocation', 'f'))
|
||||
self.recon_cache_path = conf.get('recon_cache_path',
|
||||
'/var/cache/swift')
|
||||
self.rcache = os.path.join(self.recon_cache_path, "account.recon")
|
||||
server_type = "account"
|
||||
broker_class = AccountBroker
|
||||
|
||||
def _one_audit_pass(self, reported):
|
||||
all_locs = audit_location_generator(self.devices, DATADIR, '.db',
|
||||
mount_check=self.mount_check,
|
||||
logger=self.logger)
|
||||
for path, device, partition in all_locs:
|
||||
self.account_audit(path)
|
||||
if time.time() - reported >= self.logging_interval:
|
||||
self.logger.info(_('Since %(time)s: Account audits: '
|
||||
'%(passed)s passed audit,'
|
||||
'%(failed)s failed audit'),
|
||||
{'time': time.ctime(reported),
|
||||
'passed': self.account_passes,
|
||||
'failed': self.account_failures})
|
||||
dump_recon_cache({'account_audits_since': reported,
|
||||
'account_audits_passed': self.account_passes,
|
||||
'account_audits_failed':
|
||||
self.account_failures},
|
||||
self.rcache, self.logger)
|
||||
reported = time.time()
|
||||
self.account_passes = 0
|
||||
self.account_failures = 0
|
||||
self.accounts_running_time = ratelimit_sleep(
|
||||
self.accounts_running_time, self.max_accounts_per_second)
|
||||
return reported
|
||||
|
||||
def run_forever(self, *args, **kwargs):
|
||||
"""Run the account audit until stopped."""
|
||||
reported = time.time()
|
||||
time.sleep(random() * self.interval)
|
||||
while True:
|
||||
self.logger.info(_('Begin account audit pass.'))
|
||||
begin = time.time()
|
||||
try:
|
||||
reported = self._one_audit_pass(reported)
|
||||
except (Exception, Timeout):
|
||||
self.logger.increment('errors')
|
||||
self.logger.exception(_('ERROR auditing'))
|
||||
elapsed = time.time() - begin
|
||||
if elapsed < self.interval:
|
||||
time.sleep(self.interval - elapsed)
|
||||
self.logger.info(
|
||||
_('Account audit pass completed: %.02fs'), elapsed)
|
||||
dump_recon_cache({'account_auditor_pass_completed': elapsed},
|
||||
self.rcache, self.logger)
|
||||
|
||||
def run_once(self, *args, **kwargs):
|
||||
"""Run the account audit once."""
|
||||
self.logger.info(_('Begin account audit "once" mode'))
|
||||
begin = reported = time.time()
|
||||
self._one_audit_pass(reported)
|
||||
elapsed = time.time() - begin
|
||||
self.logger.info(
|
||||
_('Account audit "once" mode completed: %.02fs'), elapsed)
|
||||
dump_recon_cache({'account_auditor_pass_completed': elapsed},
|
||||
self.rcache, self.logger)
|
||||
|
||||
def validate_per_policy_counts(self, broker):
|
||||
info = broker.get_info()
|
||||
def _audit(self, info, broker):
|
||||
# Validate per policy counts
|
||||
policy_stats = broker.get_policy_stats(do_migrations=True)
|
||||
policy_totals = {
|
||||
'container_count': 0,
|
||||
@ -120,36 +41,7 @@ class AccountAuditor(Daemon):
|
||||
for key in policy_totals:
|
||||
if policy_totals[key] == info[key]:
|
||||
continue
|
||||
raise InvalidAccountInfo(_(
|
||||
return InvalidAccountInfo(_(
|
||||
'The total %(key)s for the container (%(total)s) does not '
|
||||
'match the sum of %(key)s across policies (%(sum)s)')
|
||||
% {'key': key,
|
||||
'total': info[key],
|
||||
'sum': policy_totals[key]})
|
||||
|
||||
def account_audit(self, path):
|
||||
"""
|
||||
Audits the given account path
|
||||
|
||||
:param path: the path to an account db
|
||||
"""
|
||||
start_time = time.time()
|
||||
try:
|
||||
broker = AccountBroker(path, logger=self.logger)
|
||||
if not broker.is_deleted():
|
||||
self.validate_per_policy_counts(broker)
|
||||
self.logger.increment('passes')
|
||||
self.account_passes += 1
|
||||
self.logger.debug(_('Audit passed for %s'), broker)
|
||||
except InvalidAccountInfo as e:
|
||||
self.logger.increment('failures')
|
||||
self.account_failures += 1
|
||||
self.logger.error(
|
||||
_('Audit Failed for %(path)s: %(err)s'),
|
||||
{'path': path, 'err': str(e)})
|
||||
except (Exception, Timeout):
|
||||
self.logger.increment('failures')
|
||||
self.account_failures += 1
|
||||
self.logger.exception(_('ERROR Could not get account info %s'),
|
||||
path)
|
||||
self.logger.timing_since('timing', start_time)
|
||||
% {'key': key, 'total': info[key], 'sum': policy_totals[key]})
|
||||
|
165
swift/common/db_auditor.py
Normal file
165
swift/common/db_auditor.py
Normal file
@ -0,0 +1,165 @@
|
||||
# Copyright (c) 2010-2018 OpenStack Foundation
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import os
|
||||
import time
|
||||
from swift import gettext_ as _
|
||||
from random import random
|
||||
|
||||
from eventlet import Timeout
|
||||
|
||||
import swift.common.db
|
||||
from swift.common.utils import get_logger, audit_location_generator, \
|
||||
config_true_value, dump_recon_cache, ratelimit_sleep
|
||||
from swift.common.daemon import Daemon
|
||||
from swift.common.exceptions import DatabaseAuditorException
|
||||
|
||||
|
||||
class DatabaseAuditor(Daemon):
|
||||
"""Base Database Auditor."""
|
||||
|
||||
@property
|
||||
def server_type(self):
|
||||
raise NotImplementedError
|
||||
|
||||
@property
|
||||
def broker_class(self):
|
||||
raise NotImplementedError
|
||||
|
||||
def __init__(self, conf, logger=None):
|
||||
self.conf = conf
|
||||
self.logger = logger or get_logger(conf, log_route='{}-auditor'.format(
|
||||
self.server_type))
|
||||
self.devices = conf.get('devices', '/srv/node')
|
||||
self.mount_check = config_true_value(conf.get('mount_check', 'true'))
|
||||
self.interval = int(conf.get('interval', 1800))
|
||||
self.logging_interval = 3600 # once an hour
|
||||
self.passes = 0
|
||||
self.failures = 0
|
||||
self.running_time = 0
|
||||
self.max_dbs_per_second = \
|
||||
float(conf.get('{}s_per_second'.format(self.server_type), 200))
|
||||
swift.common.db.DB_PREALLOCATION = \
|
||||
config_true_value(conf.get('db_preallocation', 'f'))
|
||||
self.recon_cache_path = conf.get('recon_cache_path',
|
||||
'/var/cache/swift')
|
||||
self.rcache = os.path.join(self.recon_cache_path,
|
||||
"{}.recon".format(self.server_type))
|
||||
|
||||
self.datadir = '{}s'.format(self.server_type)
|
||||
|
||||
def _one_audit_pass(self, reported):
|
||||
all_locs = audit_location_generator(self.devices, self.datadir, '.db',
|
||||
mount_check=self.mount_check,
|
||||
logger=self.logger)
|
||||
for path, device, partition in all_locs:
|
||||
self.audit(path)
|
||||
if time.time() - reported >= self.logging_interval:
|
||||
self.logger.info(
|
||||
_('Since %(time)s: %(server_type)s audits: %(pass)s '
|
||||
'passed audit, %(fail)s failed audit'),
|
||||
{'time': time.ctime(reported),
|
||||
'pass': self.passes,
|
||||
'fail': self.failures,
|
||||
'server_type': self.server_type})
|
||||
dump_recon_cache(
|
||||
{'{}_audits_since'.format(self.server_type): reported,
|
||||
'{}_audits_passed'.format(self.server_type): self.passes,
|
||||
'{}_audits_failed'.format(self.server_type):
|
||||
self.failures},
|
||||
self.rcache, self.logger)
|
||||
reported = time.time()
|
||||
self.passes = 0
|
||||
self.failures = 0
|
||||
self.running_time = ratelimit_sleep(
|
||||
self.running_time, self.max_dbs_per_second)
|
||||
return reported
|
||||
|
||||
def run_forever(self, *args, **kwargs):
|
||||
"""Run the database audit until stopped."""
|
||||
reported = time.time()
|
||||
time.sleep(random() * self.interval)
|
||||
while True:
|
||||
self.logger.info(
|
||||
_('Begin {} audit pass.').format(self.server_type))
|
||||
begin = time.time()
|
||||
try:
|
||||
reported = self._one_audit_pass(reported)
|
||||
except (Exception, Timeout):
|
||||
self.logger.increment('errors')
|
||||
self.logger.exception(_('ERROR auditing'))
|
||||
elapsed = time.time() - begin
|
||||
if elapsed < self.interval:
|
||||
time.sleep(self.interval - elapsed)
|
||||
self.logger.info(
|
||||
_('%(server_type)s audit pass completed: %(elapsed).02fs'),
|
||||
{'elapsed': elapsed, 'server_type': self.server_type.title()})
|
||||
dump_recon_cache({
|
||||
'{}_auditor_pass_completed'.format(self.server_type): elapsed},
|
||||
self.rcache, self.logger)
|
||||
|
||||
def run_once(self, *args, **kwargs):
|
||||
"""Run the database audit once."""
|
||||
self.logger.info(
|
||||
_('Begin {} audit "once" mode').format(self.server_type))
|
||||
begin = reported = time.time()
|
||||
self._one_audit_pass(reported)
|
||||
elapsed = time.time() - begin
|
||||
self.logger.info(
|
||||
_('%(server_type)s audit "once" mode completed: %(elapsed).02fs'),
|
||||
{'elapsed': elapsed, 'server_type': self.server_type.title()})
|
||||
dump_recon_cache(
|
||||
{'{}_auditor_pass_completed'.format(self.server_type): elapsed},
|
||||
self.rcache, self.logger)
|
||||
|
||||
def audit(self, path):
|
||||
"""
|
||||
Audits the given database path
|
||||
|
||||
:param path: the path to a db
|
||||
"""
|
||||
start_time = time.time()
|
||||
try:
|
||||
broker = self.broker_class(path, logger=self.logger)
|
||||
if not broker.is_deleted():
|
||||
info = broker.get_info()
|
||||
err = self._audit(info, broker)
|
||||
if err:
|
||||
raise err
|
||||
self.logger.increment('passes')
|
||||
self.passes += 1
|
||||
self.logger.debug('Audit passed for %s', broker)
|
||||
except DatabaseAuditorException as e:
|
||||
self.logger.increment('failures')
|
||||
self.failures += 1
|
||||
self.logger.error(_('Audit Failed for %(path)s: %(err)s'),
|
||||
{'path': path, 'err': str(e)})
|
||||
except (Exception, Timeout):
|
||||
self.logger.increment('failures')
|
||||
self.failures += 1
|
||||
self.logger.exception(
|
||||
_('ERROR Could not get %(server_type)s info %(path)s'),
|
||||
{'server_type': self.server_type, 'path': path})
|
||||
self.logger.timing_since('timing', start_time)
|
||||
|
||||
def _audit(self, info, broker):
|
||||
"""
|
||||
Run any additional audit checks in sub auditor classes
|
||||
|
||||
:param info: The DB <account/container>_info
|
||||
:param broker: The broker
|
||||
:return: None on success, otherwise an exception to throw.
|
||||
"""
|
||||
raise NotImplementedError
|
@ -113,7 +113,11 @@ class DeviceUnavailable(SwiftException):
|
||||
pass
|
||||
|
||||
|
||||
class InvalidAccountInfo(SwiftException):
|
||||
class DatabaseAuditorException(SwiftException):
|
||||
pass
|
||||
|
||||
|
||||
class InvalidAccountInfo(DatabaseAuditorException):
|
||||
pass
|
||||
|
||||
|
||||
|
@ -13,113 +13,16 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import os
|
||||
import time
|
||||
from swift import gettext_ as _
|
||||
from random import random
|
||||
|
||||
from eventlet import Timeout
|
||||
|
||||
import swift.common.db
|
||||
from swift.container.backend import ContainerBroker, DATADIR
|
||||
from swift.common.utils import get_logger, audit_location_generator, \
|
||||
config_true_value, dump_recon_cache, ratelimit_sleep
|
||||
from swift.common.daemon import Daemon
|
||||
from swift.container.backend import ContainerBroker
|
||||
from swift.common.db_auditor import DatabaseAuditor
|
||||
|
||||
|
||||
class ContainerAuditor(Daemon):
|
||||
class ContainerAuditor(DatabaseAuditor):
|
||||
"""Audit containers."""
|
||||
|
||||
def __init__(self, conf, logger=None):
|
||||
self.conf = conf
|
||||
self.logger = logger or get_logger(conf, log_route='container-auditor')
|
||||
self.devices = conf.get('devices', '/srv/node')
|
||||
self.mount_check = config_true_value(conf.get('mount_check', 'true'))
|
||||
self.interval = int(conf.get('interval', 1800))
|
||||
self.container_passes = 0
|
||||
self.container_failures = 0
|
||||
self.containers_running_time = 0
|
||||
self.max_containers_per_second = \
|
||||
float(conf.get('containers_per_second', 200))
|
||||
swift.common.db.DB_PREALLOCATION = \
|
||||
config_true_value(conf.get('db_preallocation', 'f'))
|
||||
self.recon_cache_path = conf.get('recon_cache_path',
|
||||
'/var/cache/swift')
|
||||
self.rcache = os.path.join(self.recon_cache_path, "container.recon")
|
||||
server_type = "container"
|
||||
broker_class = ContainerBroker
|
||||
|
||||
def _one_audit_pass(self, reported):
|
||||
all_locs = audit_location_generator(self.devices, DATADIR, '.db',
|
||||
mount_check=self.mount_check,
|
||||
logger=self.logger)
|
||||
for path, device, partition in all_locs:
|
||||
self.container_audit(path)
|
||||
if time.time() - reported >= 3600: # once an hour
|
||||
self.logger.info(
|
||||
_('Since %(time)s: Container audits: %(pass)s passed '
|
||||
'audit, %(fail)s failed audit'),
|
||||
{'time': time.ctime(reported),
|
||||
'pass': self.container_passes,
|
||||
'fail': self.container_failures})
|
||||
dump_recon_cache(
|
||||
{'container_audits_since': reported,
|
||||
'container_audits_passed': self.container_passes,
|
||||
'container_audits_failed': self.container_failures},
|
||||
self.rcache, self.logger)
|
||||
reported = time.time()
|
||||
self.container_passes = 0
|
||||
self.container_failures = 0
|
||||
self.containers_running_time = ratelimit_sleep(
|
||||
self.containers_running_time, self.max_containers_per_second)
|
||||
return reported
|
||||
|
||||
def run_forever(self, *args, **kwargs):
|
||||
"""Run the container audit until stopped."""
|
||||
reported = time.time()
|
||||
time.sleep(random() * self.interval)
|
||||
while True:
|
||||
self.logger.info(_('Begin container audit pass.'))
|
||||
begin = time.time()
|
||||
try:
|
||||
reported = self._one_audit_pass(reported)
|
||||
except (Exception, Timeout):
|
||||
self.logger.increment('errors')
|
||||
self.logger.exception(_('ERROR auditing'))
|
||||
elapsed = time.time() - begin
|
||||
if elapsed < self.interval:
|
||||
time.sleep(self.interval - elapsed)
|
||||
self.logger.info(
|
||||
_('Container audit pass completed: %.02fs'), elapsed)
|
||||
dump_recon_cache({'container_auditor_pass_completed': elapsed},
|
||||
self.rcache, self.logger)
|
||||
|
||||
def run_once(self, *args, **kwargs):
|
||||
"""Run the container audit once."""
|
||||
self.logger.info(_('Begin container audit "once" mode'))
|
||||
begin = reported = time.time()
|
||||
self._one_audit_pass(reported)
|
||||
elapsed = time.time() - begin
|
||||
self.logger.info(
|
||||
_('Container audit "once" mode completed: %.02fs'), elapsed)
|
||||
dump_recon_cache({'container_auditor_pass_completed': elapsed},
|
||||
self.rcache, self.logger)
|
||||
|
||||
def container_audit(self, path):
|
||||
"""
|
||||
Audits the given container path
|
||||
|
||||
:param path: the path to a container db
|
||||
"""
|
||||
start_time = time.time()
|
||||
try:
|
||||
broker = ContainerBroker(path, logger=self.logger)
|
||||
if not broker.is_deleted():
|
||||
broker.get_info()
|
||||
self.logger.increment('passes')
|
||||
self.container_passes += 1
|
||||
self.logger.debug('Audit passed for %s', broker)
|
||||
except (Exception, Timeout):
|
||||
self.logger.increment('failures')
|
||||
self.container_failures += 1
|
||||
self.logger.exception(_('ERROR Could not get container info %s'),
|
||||
path)
|
||||
self.logger.timing_since('timing', start_time)
|
||||
def _audit(self, job, broker):
|
||||
return None
|
||||
|
@ -16,13 +16,9 @@
|
||||
from collections import defaultdict
|
||||
import itertools
|
||||
import unittest
|
||||
import mock
|
||||
import time
|
||||
import os
|
||||
import random
|
||||
from tempfile import mkdtemp
|
||||
from shutil import rmtree
|
||||
from eventlet import Timeout
|
||||
|
||||
from swift.account import auditor
|
||||
from swift.common.storage_policy import POLICIES
|
||||
@ -32,132 +28,6 @@ from test.unit.account.test_backend import (
|
||||
AccountBrokerPreTrackContainerCountSetup)
|
||||
|
||||
|
||||
class FakeAccountBroker(object):
|
||||
def __init__(self, path, logger):
|
||||
self.path = path
|
||||
self.db_file = path
|
||||
self.file = os.path.basename(path)
|
||||
self.logger = logger
|
||||
|
||||
def is_deleted(self):
|
||||
return False
|
||||
|
||||
def get_info(self):
|
||||
if self.file.startswith('fail'):
|
||||
raise ValueError()
|
||||
if self.file.startswith('true'):
|
||||
return defaultdict(int)
|
||||
|
||||
def get_policy_stats(self, **kwargs):
|
||||
if self.file.startswith('fail'):
|
||||
raise ValueError()
|
||||
if self.file.startswith('true'):
|
||||
return defaultdict(int)
|
||||
|
||||
|
||||
class TestAuditor(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.testdir = os.path.join(mkdtemp(), 'tmp_test_account_auditor')
|
||||
self.logger = debug_logger()
|
||||
rmtree(self.testdir, ignore_errors=1)
|
||||
os.mkdir(self.testdir)
|
||||
fnames = ['true1.db', 'true2.db', 'true3.db',
|
||||
'fail1.db', 'fail2.db']
|
||||
for fn in fnames:
|
||||
with open(os.path.join(self.testdir, fn), 'w+') as f:
|
||||
f.write(' ')
|
||||
|
||||
def tearDown(self):
|
||||
rmtree(os.path.dirname(self.testdir), ignore_errors=1)
|
||||
|
||||
@mock.patch('swift.account.auditor.AccountBroker', FakeAccountBroker)
|
||||
def test_run_forever(self):
|
||||
sleep_times = random.randint(5, 10)
|
||||
call_times = sleep_times - 1
|
||||
|
||||
class FakeTime(object):
|
||||
def __init__(self):
|
||||
self.times = 0
|
||||
|
||||
def sleep(self, sec):
|
||||
self.times += 1
|
||||
if self.times >= sleep_times:
|
||||
# stop forever by an error
|
||||
raise ValueError()
|
||||
|
||||
def time(self):
|
||||
return time.time()
|
||||
|
||||
conf = {}
|
||||
test_auditor = auditor.AccountAuditor(conf, logger=self.logger)
|
||||
|
||||
with mock.patch('swift.account.auditor.time', FakeTime()):
|
||||
def fake_audit_location_generator(*args, **kwargs):
|
||||
files = os.listdir(self.testdir)
|
||||
return [(os.path.join(self.testdir, f), '', '') for f in files]
|
||||
|
||||
with mock.patch('swift.account.auditor.audit_location_generator',
|
||||
fake_audit_location_generator):
|
||||
self.assertRaises(ValueError, test_auditor.run_forever)
|
||||
self.assertEqual(test_auditor.account_failures, 2 * call_times)
|
||||
self.assertEqual(test_auditor.account_passes, 3 * call_times)
|
||||
|
||||
# now force timeout path code coverage
|
||||
def fake_one_audit_pass(reported):
|
||||
raise Timeout()
|
||||
|
||||
with mock.patch('swift.account.auditor.AccountAuditor._one_audit_pass',
|
||||
fake_one_audit_pass):
|
||||
with mock.patch('swift.account.auditor.time', FakeTime()):
|
||||
self.assertRaises(ValueError, test_auditor.run_forever)
|
||||
self.assertEqual(test_auditor.account_failures, 2 * call_times)
|
||||
self.assertEqual(test_auditor.account_passes, 3 * call_times)
|
||||
|
||||
@mock.patch('swift.account.auditor.AccountBroker', FakeAccountBroker)
|
||||
def test_run_once(self):
|
||||
conf = {}
|
||||
test_auditor = auditor.AccountAuditor(conf, logger=self.logger)
|
||||
|
||||
def fake_audit_location_generator(*args, **kwargs):
|
||||
files = os.listdir(self.testdir)
|
||||
return [(os.path.join(self.testdir, f), '', '') for f in files]
|
||||
|
||||
with mock.patch('swift.account.auditor.audit_location_generator',
|
||||
fake_audit_location_generator):
|
||||
test_auditor.run_once()
|
||||
self.assertEqual(test_auditor.account_failures, 2)
|
||||
self.assertEqual(test_auditor.account_passes, 3)
|
||||
|
||||
@mock.patch('swift.account.auditor.AccountBroker', FakeAccountBroker)
|
||||
def test_one_audit_pass(self):
|
||||
conf = {}
|
||||
test_auditor = auditor.AccountAuditor(conf, logger=self.logger)
|
||||
|
||||
def fake_audit_location_generator(*args, **kwargs):
|
||||
files = os.listdir(self.testdir)
|
||||
return [(os.path.join(self.testdir, f), '', '') for f in files]
|
||||
|
||||
# force code coverage for logging path
|
||||
test_auditor.logging_interval = 0
|
||||
with mock.patch('swift.account.auditor.audit_location_generator',
|
||||
fake_audit_location_generator):
|
||||
test_auditor._one_audit_pass(test_auditor.logging_interval)
|
||||
self.assertEqual(test_auditor.account_failures, 0)
|
||||
self.assertEqual(test_auditor.account_passes, 0)
|
||||
|
||||
@mock.patch('swift.account.auditor.AccountBroker', FakeAccountBroker)
|
||||
def test_account_auditor(self):
|
||||
conf = {}
|
||||
test_auditor = auditor.AccountAuditor(conf, logger=self.logger)
|
||||
files = os.listdir(self.testdir)
|
||||
for f in files:
|
||||
path = os.path.join(self.testdir, f)
|
||||
test_auditor.account_audit(path)
|
||||
self.assertEqual(test_auditor.account_failures, 2)
|
||||
self.assertEqual(test_auditor.account_passes, 3)
|
||||
|
||||
|
||||
@patch_policies
|
||||
class TestAuditorRealBrokerMigration(
|
||||
AccountBrokerPreTrackContainerCountSetup, unittest.TestCase):
|
||||
@ -249,7 +119,7 @@ class TestAuditorRealBroker(unittest.TestCase):
|
||||
test_auditor.run_once()
|
||||
|
||||
# validate errors
|
||||
self.assertEqual(test_auditor.account_failures, 1)
|
||||
self.assertEqual(test_auditor.failures, 1)
|
||||
error_lines = test_auditor.logger.get_lines_for_level('error')
|
||||
self.assertEqual(len(error_lines), 1)
|
||||
error_message = error_lines[0]
|
||||
|
151
test/unit/common/test_db_auditor.py
Normal file
151
test/unit/common/test_db_auditor.py
Normal file
@ -0,0 +1,151 @@
|
||||
# Copyright (c) 2010-2018 OpenStack Foundation
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import unittest
|
||||
import mock
|
||||
import time
|
||||
import os
|
||||
import random
|
||||
from tempfile import mkdtemp
|
||||
from shutil import rmtree
|
||||
from eventlet import Timeout
|
||||
|
||||
from swift.common.db_auditor import DatabaseAuditor
|
||||
from test.unit import debug_logger
|
||||
|
||||
|
||||
class FakeDatabaseBroker(object):
|
||||
def __init__(self, path, logger):
|
||||
self.path = path
|
||||
self.db_file = path
|
||||
self.file = os.path.basename(path)
|
||||
|
||||
def is_deleted(self):
|
||||
return False
|
||||
|
||||
def get_info(self):
|
||||
if self.file.startswith('fail'):
|
||||
raise ValueError
|
||||
if self.file.startswith('true'):
|
||||
return 'ok'
|
||||
|
||||
|
||||
class FakeDatabaseAuditor(DatabaseAuditor):
|
||||
server_type = "test"
|
||||
broker_class = FakeDatabaseBroker
|
||||
|
||||
def _audit(self, info, broker):
|
||||
return None
|
||||
|
||||
|
||||
class TestAuditor(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.testdir = os.path.join(mkdtemp(), 'tmp_test_database_auditor')
|
||||
self.logger = debug_logger()
|
||||
rmtree(self.testdir, ignore_errors=1)
|
||||
os.mkdir(self.testdir)
|
||||
fnames = ['true1.db', 'true2.db', 'true3.db',
|
||||
'fail1.db', 'fail2.db']
|
||||
for fn in fnames:
|
||||
with open(os.path.join(self.testdir, fn), 'w+') as f:
|
||||
f.write(' ')
|
||||
|
||||
def tearDown(self):
|
||||
rmtree(os.path.dirname(self.testdir), ignore_errors=1)
|
||||
|
||||
def test_run_forever(self):
|
||||
sleep_times = random.randint(5, 10)
|
||||
call_times = sleep_times - 1
|
||||
|
||||
class FakeTime(object):
|
||||
def __init__(self):
|
||||
self.times = 0
|
||||
|
||||
def sleep(self, sec):
|
||||
self.times += 1
|
||||
if self.times < sleep_times:
|
||||
time.sleep(0.1)
|
||||
else:
|
||||
# stop forever by an error
|
||||
raise ValueError()
|
||||
|
||||
def time(self):
|
||||
return time.time()
|
||||
|
||||
conf = {}
|
||||
test_auditor = FakeDatabaseAuditor(conf, logger=self.logger)
|
||||
|
||||
with mock.patch('swift.common.db_auditor.time', FakeTime()):
|
||||
def fake_audit_location_generator(*args, **kwargs):
|
||||
files = os.listdir(self.testdir)
|
||||
return [(os.path.join(self.testdir, f), '', '') for f in files]
|
||||
|
||||
with mock.patch('swift.common.db_auditor.audit_location_generator',
|
||||
fake_audit_location_generator):
|
||||
self.assertRaises(ValueError, test_auditor.run_forever)
|
||||
self.assertEqual(test_auditor.failures, 2 * call_times)
|
||||
self.assertEqual(test_auditor.passes, 3 * call_times)
|
||||
|
||||
# now force timeout path code coverage
|
||||
with mock.patch('swift.common.db_auditor.DatabaseAuditor.'
|
||||
'_one_audit_pass', side_effect=Timeout()):
|
||||
with mock.patch('swift.common.db_auditor.time', FakeTime()):
|
||||
self.assertRaises(ValueError, test_auditor.run_forever)
|
||||
|
||||
def test_run_once(self):
|
||||
conf = {}
|
||||
test_auditor = FakeDatabaseAuditor(conf, logger=self.logger)
|
||||
|
||||
def fake_audit_location_generator(*args, **kwargs):
|
||||
files = os.listdir(self.testdir)
|
||||
return [(os.path.join(self.testdir, f), '', '') for f in files]
|
||||
|
||||
with mock.patch('swift.common.db_auditor.audit_location_generator',
|
||||
fake_audit_location_generator):
|
||||
test_auditor.run_once()
|
||||
self.assertEqual(test_auditor.failures, 2)
|
||||
self.assertEqual(test_auditor.passes, 3)
|
||||
|
||||
def test_one_audit_pass(self):
|
||||
conf = {}
|
||||
test_auditor = FakeDatabaseAuditor(conf, logger=self.logger)
|
||||
|
||||
def fake_audit_location_generator(*args, **kwargs):
|
||||
files = sorted(os.listdir(self.testdir))
|
||||
return [(os.path.join(self.testdir, f), '', '') for f in files]
|
||||
|
||||
# force code coverage for logging path
|
||||
with mock.patch('swift.common.db_auditor.audit_location_generator',
|
||||
fake_audit_location_generator),\
|
||||
mock.patch('time.time',
|
||||
return_value=(test_auditor.logging_interval * 2)):
|
||||
test_auditor._one_audit_pass(0)
|
||||
self.assertEqual(test_auditor.failures, 1)
|
||||
self.assertEqual(test_auditor.passes, 3)
|
||||
|
||||
def test_container_auditor(self):
|
||||
conf = {}
|
||||
test_auditor = FakeDatabaseAuditor(conf, logger=self.logger)
|
||||
files = os.listdir(self.testdir)
|
||||
for f in files:
|
||||
path = os.path.join(self.testdir, f)
|
||||
test_auditor.audit(path)
|
||||
self.assertEqual(test_auditor.failures, 2)
|
||||
self.assertEqual(test_auditor.passes, 3)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
@ -15,12 +15,7 @@
|
||||
|
||||
import unittest
|
||||
import mock
|
||||
import time
|
||||
import os
|
||||
import random
|
||||
from tempfile import mkdtemp
|
||||
from shutil import rmtree
|
||||
from eventlet import Timeout
|
||||
|
||||
from swift.common.utils import normalize_timestamp
|
||||
from swift.container import auditor
|
||||
@ -28,129 +23,10 @@ from test.unit import debug_logger, with_tempdir
|
||||
from test.unit.container import test_backend
|
||||
|
||||
|
||||
class FakeContainerBroker(object):
|
||||
def __init__(self, path, logger):
|
||||
self.path = path
|
||||
self.db_file = path
|
||||
self.file = os.path.basename(path)
|
||||
|
||||
def is_deleted(self):
|
||||
return False
|
||||
|
||||
def get_info(self):
|
||||
if self.file.startswith('fail'):
|
||||
raise ValueError
|
||||
if self.file.startswith('true'):
|
||||
return 'ok'
|
||||
|
||||
|
||||
class TestAuditor(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.testdir = os.path.join(mkdtemp(), 'tmp_test_container_auditor')
|
||||
self.logger = debug_logger()
|
||||
rmtree(self.testdir, ignore_errors=1)
|
||||
os.mkdir(self.testdir)
|
||||
fnames = ['true1.db', 'true2.db', 'true3.db',
|
||||
'fail1.db', 'fail2.db']
|
||||
for fn in fnames:
|
||||
with open(os.path.join(self.testdir, fn), 'w+') as f:
|
||||
f.write(' ')
|
||||
|
||||
def tearDown(self):
|
||||
rmtree(os.path.dirname(self.testdir), ignore_errors=1)
|
||||
|
||||
@mock.patch('swift.container.auditor.dump_recon_cache')
|
||||
@mock.patch('swift.container.auditor.ContainerBroker', FakeContainerBroker)
|
||||
def test_run_forever(self, mock_recon):
|
||||
sleep_times = random.randint(5, 10)
|
||||
call_times = sleep_times - 1
|
||||
|
||||
class FakeTime(object):
|
||||
def __init__(self):
|
||||
self.times = 0
|
||||
|
||||
def sleep(self, sec):
|
||||
self.times += 1
|
||||
if self.times < sleep_times:
|
||||
time.sleep(0.1)
|
||||
else:
|
||||
# stop forever by an error
|
||||
raise ValueError()
|
||||
|
||||
def time(self):
|
||||
return time.time()
|
||||
|
||||
conf = {}
|
||||
test_auditor = auditor.ContainerAuditor(conf, logger=self.logger)
|
||||
|
||||
with mock.patch('swift.container.auditor.time', FakeTime()):
|
||||
def fake_audit_location_generator(*args, **kwargs):
|
||||
files = os.listdir(self.testdir)
|
||||
return [(os.path.join(self.testdir, f), '', '') for f in files]
|
||||
|
||||
with mock.patch('swift.container.auditor.audit_location_generator',
|
||||
fake_audit_location_generator):
|
||||
self.assertRaises(ValueError, test_auditor.run_forever)
|
||||
self.assertEqual(test_auditor.container_failures, 2 * call_times)
|
||||
self.assertEqual(test_auditor.container_passes, 3 * call_times)
|
||||
|
||||
# now force timeout path code coverage
|
||||
with mock.patch('swift.container.auditor.ContainerAuditor.'
|
||||
'_one_audit_pass', side_effect=Timeout()):
|
||||
with mock.patch('swift.container.auditor.time', FakeTime()):
|
||||
self.assertRaises(ValueError, test_auditor.run_forever)
|
||||
|
||||
@mock.patch('swift.container.auditor.dump_recon_cache')
|
||||
@mock.patch('swift.container.auditor.ContainerBroker', FakeContainerBroker)
|
||||
def test_run_once(self, mock_recon):
|
||||
conf = {}
|
||||
test_auditor = auditor.ContainerAuditor(conf, logger=self.logger)
|
||||
|
||||
def fake_audit_location_generator(*args, **kwargs):
|
||||
files = os.listdir(self.testdir)
|
||||
return [(os.path.join(self.testdir, f), '', '') for f in files]
|
||||
|
||||
with mock.patch('swift.container.auditor.audit_location_generator',
|
||||
fake_audit_location_generator):
|
||||
test_auditor.run_once()
|
||||
self.assertEqual(test_auditor.container_failures, 2)
|
||||
self.assertEqual(test_auditor.container_passes, 3)
|
||||
|
||||
@mock.patch('swift.container.auditor.dump_recon_cache')
|
||||
@mock.patch('swift.container.auditor.ContainerBroker', FakeContainerBroker)
|
||||
def test_one_audit_pass(self, mock_recon):
|
||||
conf = {}
|
||||
test_auditor = auditor.ContainerAuditor(conf, logger=self.logger)
|
||||
|
||||
def fake_audit_location_generator(*args, **kwargs):
|
||||
files = sorted(os.listdir(self.testdir))
|
||||
return [(os.path.join(self.testdir, f), '', '') for f in files]
|
||||
|
||||
# force code coverage for logging path
|
||||
test_auditor.logging_interval = 0
|
||||
with mock.patch('swift.container.auditor.audit_location_generator',
|
||||
fake_audit_location_generator):
|
||||
test_auditor._one_audit_pass(test_auditor.logging_interval)
|
||||
self.assertEqual(test_auditor.container_failures, 1)
|
||||
self.assertEqual(test_auditor.container_passes, 3)
|
||||
|
||||
@mock.patch('swift.container.auditor.ContainerBroker', FakeContainerBroker)
|
||||
def test_container_auditor(self):
|
||||
conf = {}
|
||||
test_auditor = auditor.ContainerAuditor(conf, logger=self.logger)
|
||||
files = os.listdir(self.testdir)
|
||||
for f in files:
|
||||
path = os.path.join(self.testdir, f)
|
||||
test_auditor.container_audit(path)
|
||||
self.assertEqual(test_auditor.container_failures, 2)
|
||||
self.assertEqual(test_auditor.container_passes, 3)
|
||||
|
||||
|
||||
class TestAuditorMigrations(unittest.TestCase):
|
||||
|
||||
@with_tempdir
|
||||
@mock.patch('swift.container.auditor.dump_recon_cache')
|
||||
@mock.patch('swift.common.db_auditor.dump_recon_cache')
|
||||
def test_db_migration(self, tempdir, mock_recon):
|
||||
db_path = os.path.join(tempdir, 'sda', 'containers', '0', '0', '0',
|
||||
'test.db')
|
||||
|
Loading…
Reference in New Issue
Block a user