diff --git a/swift/account/auditor.py b/swift/account/auditor.py index af38ed3bde..261acf7e92 100644 --- a/swift/account/auditor.py +++ b/swift/account/auditor.py @@ -20,6 +20,7 @@ from random import random import swift.common.db from swift.account.backend import AccountBroker, DATADIR +from swift.common.exceptions import InvalidAccountInfo from swift.common.utils import get_logger, audit_location_generator, \ config_true_value, dump_recon_cache, ratelimit_sleep from swift.common.daemon import Daemon @@ -30,9 +31,9 @@ from eventlet import Timeout class AccountAuditor(Daemon): """Audit accounts.""" - def __init__(self, conf): + def __init__(self, conf, logger=None): self.conf = conf - self.logger = get_logger(conf, log_route='account-auditor') + self.logger = logger or get_logger(conf, log_route='account-auditor') self.devices = conf.get('devices', '/srv/node') self.mount_check = config_true_value(conf.get('mount_check', 'true')) self.interval = int(conf.get('interval', 1800)) @@ -104,6 +105,29 @@ class AccountAuditor(Daemon): dump_recon_cache({'account_auditor_pass_completed': elapsed}, self.rcache, self.logger) + def validate_per_policy_counts(self, broker): + info = broker.get_info() + policy_stats = broker.get_policy_stats(do_migrations=True) + policy_totals = { + 'container_count': 0, + 'object_count': 0, + 'bytes_used': 0, + } + for policy_stat in policy_stats.values(): + for key in policy_totals: + policy_totals[key] += policy_stat[key] + + for key in policy_totals: + if policy_totals[key] == info[key]: + continue + raise InvalidAccountInfo(_( + 'The total %(key)s for the container (%(total)s) does not ' + 'match the sum of %(key)s across policies (%(sum)s)') % { + 'key': key, + 'total': info[key], + 'sum': policy_totals[key], + }) + def account_audit(self, path): """ Audits the given account path @@ -114,10 +138,15 @@ class AccountAuditor(Daemon): try: broker = AccountBroker(path) if not broker.is_deleted(): - broker.get_info() + self.validate_per_policy_counts(broker) self.logger.increment('passes') self.account_passes += 1 self.logger.debug('Audit passed for %s' % broker) + except InvalidAccountInfo as e: + self.logger.increment('failures') + self.account_failures += 1 + self.logger.error( + _('Audit Failed for %s: %s'), path, str(e)) except (Exception, Timeout): self.logger.increment('failures') self.account_failures += 1 diff --git a/swift/account/backend.py b/swift/account/backend.py index 1ad37c22c1..89c6cfb65e 100644 --- a/swift/account/backend.py +++ b/swift/account/backend.py @@ -32,17 +32,19 @@ POLICY_STAT_TRIGGER_SCRIPT = """ CREATE TRIGGER container_insert_ps AFTER INSERT ON container BEGIN INSERT OR IGNORE INTO policy_stat - (storage_policy_index, object_count, bytes_used) - VALUES (new.storage_policy_index, 0, 0); + (storage_policy_index, container_count, object_count, bytes_used) + VALUES (new.storage_policy_index, 0, 0, 0); UPDATE policy_stat - SET object_count = object_count + new.object_count, + SET container_count = container_count + (1 - new.deleted), + object_count = object_count + new.object_count, bytes_used = bytes_used + new.bytes_used WHERE storage_policy_index = new.storage_policy_index; END; CREATE TRIGGER container_delete_ps AFTER DELETE ON container BEGIN UPDATE policy_stat - SET object_count = object_count - old.object_count, + SET container_count = container_count - (1 - old.deleted), + object_count = object_count - old.object_count, bytes_used = bytes_used - old.bytes_used WHERE storage_policy_index = old.storage_policy_index; END; @@ -165,13 +167,15 @@ class AccountBroker(DatabaseBroker): conn.executescript(""" CREATE TABLE policy_stat ( storage_policy_index INTEGER PRIMARY KEY, + container_count INTEGER DEFAULT 0, object_count INTEGER DEFAULT 0, bytes_used INTEGER DEFAULT 0 ); INSERT OR IGNORE INTO policy_stat ( - storage_policy_index, object_count, bytes_used + storage_policy_index, container_count, object_count, + bytes_used ) - SELECT 0, object_count, bytes_used + SELECT 0, container_count, object_count, bytes_used FROM account_stat WHERE container_count > 0; """) @@ -296,24 +300,45 @@ class AccountBroker(DatabaseBroker): return row['status'] == "DELETED" or ( row['delete_timestamp'] > row['put_timestamp']) - def get_policy_stats(self): + def get_policy_stats(self, do_migrations=False): """ Get global policy stats for the account. + :param do_migrations: boolean, if True the policy stat dicts will + always include the 'container_count' key; + otherwise it may be ommited on legacy databases + until they are migrated. + :returns: dict of policy stats where the key is the policy index and the value is a dictionary like {'object_count': M, - 'bytes_used': N} + 'bytes_used': N, 'container_count': L} """ - info = [] + columns = [ + 'storage_policy_index', + 'container_count', + 'object_count', + 'bytes_used', + ] + + def run_query(): + return (conn.execute(''' + SELECT %s + FROM policy_stat + ''' % ', '.join(columns)).fetchall()) + self._commit_puts_stale_ok() + info = [] with self.get() as conn: try: - info = (conn.execute(''' - SELECT storage_policy_index, object_count, bytes_used - FROM policy_stat - ''').fetchall()) + info = run_query() except sqlite3.OperationalError as err: - if "no such table: policy_stat" not in str(err): + if "no such column: container_count" in str(err): + if do_migrations: + self._migrate_add_container_count(conn) + else: + columns.remove('container_count') + info = run_query() + elif "no such table: policy_stat" not in str(err): raise policy_stats = {} @@ -501,10 +526,72 @@ class AccountBroker(DatabaseBroker): self._migrate_add_storage_policy_index(conn) _really_merge_items(conn) + def _migrate_add_container_count(self, conn): + """ + Add the container_count column to the 'policy_stat' table and + update it + + :param conn: DB connection object + """ + # add the container_count column + curs = conn.cursor() + curs.executescript(''' + DROP TRIGGER container_delete_ps; + DROP TRIGGER container_insert_ps; + ALTER TABLE policy_stat + ADD COLUMN container_count INTEGER DEFAULT 0; + ''' + POLICY_STAT_TRIGGER_SCRIPT) + + # keep the simple case simple, if there's only one entry in the + # policy_stat table we just copy the total container count from the + # account_stat table + + # if that triggers an update then the where changes <> 0 *would* exist + # and the insert or replace from the count subqueries won't execute + + curs.executescript(""" + UPDATE policy_stat + SET container_count = ( + SELECT container_count + FROM account_stat) + WHERE ( + SELECT COUNT(storage_policy_index) + FROM policy_stat + ) <= 1; + + INSERT OR REPLACE INTO policy_stat ( + storage_policy_index, + container_count, + object_count, + bytes_used + ) + SELECT p.storage_policy_index, + c.count, + p.object_count, + p.bytes_used + FROM ( + SELECT storage_policy_index, + COUNT(*) as count + FROM container + WHERE deleted = 0 + GROUP BY storage_policy_index + ) c + JOIN policy_stat p + ON p.storage_policy_index = c.storage_policy_index + WHERE NOT EXISTS( + SELECT changes() as change + FROM policy_stat + WHERE change <> 0 + ); + """) + conn.commit() + def _migrate_add_storage_policy_index(self, conn): """ Add the storage_policy_index column to the 'container' table and set up triggers, creating the policy_stat table if needed. + + :param conn: DB connection object """ try: self.create_policy_stat_table(conn) diff --git a/swift/common/exceptions.py b/swift/common/exceptions.py index e46df933b7..856c489757 100644 --- a/swift/common/exceptions.py +++ b/swift/common/exceptions.py @@ -79,6 +79,10 @@ class DeviceUnavailable(SwiftException): pass +class InvalidAccountInfo(SwiftException): + pass + + class PathNotDir(OSError): pass diff --git a/test/unit/account/test_auditor.py b/test/unit/account/test_auditor.py index 499b44155d..c79209bc09 100644 --- a/test/unit/account/test_auditor.py +++ b/test/unit/account/test_auditor.py @@ -13,6 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +from collections import defaultdict +import itertools import unittest import mock import time @@ -23,7 +25,11 @@ from shutil import rmtree from eventlet import Timeout from swift.account import auditor -from test.unit import FakeLogger +from swift.common.storage_policy import POLICIES +from swift.common.utils import Timestamp +from test.unit import debug_logger, patch_policies, with_tempdir +from test.unit.account.test_backend import ( + AccountBrokerPreTrackContainerCountSetup) class FakeAccountBroker(object): @@ -37,16 +43,22 @@ class FakeAccountBroker(object): def get_info(self): if self.file.startswith('fail'): - raise ValueError + raise ValueError() if self.file.startswith('true'): - return 'ok' + return defaultdict(int) + + def get_policy_stats(self, **kwargs): + if self.file.startswith('fail'): + raise ValueError() + if self.file.startswith('true'): + return defaultdict(int) class TestAuditor(unittest.TestCase): def setUp(self): self.testdir = os.path.join(mkdtemp(), 'tmp_test_account_auditor') - self.logger = FakeLogger() + self.logger = debug_logger() rmtree(self.testdir, ignore_errors=1) os.mkdir(self.testdir) fnames = ['true1.db', 'true2.db', 'true3.db', @@ -69,9 +81,7 @@ class TestAuditor(unittest.TestCase): def sleep(self, sec): self.times += 1 - if self.times < sleep_times: - time.sleep(0.1) - else: + if self.times >= sleep_times: # stop forever by an error raise ValueError() @@ -79,7 +89,7 @@ class TestAuditor(unittest.TestCase): return time.time() conf = {} - test_auditor = auditor.AccountAuditor(conf) + test_auditor = auditor.AccountAuditor(conf, logger=self.logger) with mock.patch('swift.account.auditor.time', FakeTime()): def fake_audit_location_generator(*args, **kwargs): @@ -106,7 +116,7 @@ class TestAuditor(unittest.TestCase): @mock.patch('swift.account.auditor.AccountBroker', FakeAccountBroker) def test_run_once(self): conf = {} - test_auditor = auditor.AccountAuditor(conf) + test_auditor = auditor.AccountAuditor(conf, logger=self.logger) def fake_audit_location_generator(*args, **kwargs): files = os.listdir(self.testdir) @@ -121,7 +131,7 @@ class TestAuditor(unittest.TestCase): @mock.patch('swift.account.auditor.AccountBroker', FakeAccountBroker) def test_one_audit_pass(self): conf = {} - test_auditor = auditor.AccountAuditor(conf) + test_auditor = auditor.AccountAuditor(conf, logger=self.logger) def fake_audit_location_generator(*args, **kwargs): files = os.listdir(self.testdir) @@ -138,7 +148,7 @@ class TestAuditor(unittest.TestCase): @mock.patch('swift.account.auditor.AccountBroker', FakeAccountBroker) def test_account_auditor(self): conf = {} - test_auditor = auditor.AccountAuditor(conf) + test_auditor = auditor.AccountAuditor(conf, logger=self.logger) files = os.listdir(self.testdir) for f in files: path = os.path.join(self.testdir, f) @@ -146,5 +156,108 @@ class TestAuditor(unittest.TestCase): self.assertEqual(test_auditor.account_failures, 2) self.assertEqual(test_auditor.account_passes, 3) + +@patch_policies +class TestAuditorRealBrokerMigration( + AccountBrokerPreTrackContainerCountSetup, unittest.TestCase): + + def test_db_migration(self): + # add a few containers + policies = itertools.cycle(POLICIES) + num_containers = len(POLICIES) * 3 + per_policy_container_counts = defaultdict(int) + for i in range(num_containers): + name = 'test-container-%02d' % i + policy = next(policies) + self.broker.put_container(name, next(self.ts), + 0, 0, 0, int(policy)) + per_policy_container_counts[int(policy)] += 1 + + self.broker._commit_puts() + self.assertEqual(num_containers, + self.broker.get_info()['container_count']) + + # still un-migrated + self.assertUnmigrated(self.broker) + + # run auditor, and validate migration + conf = {'devices': self.tempdir, 'mount_check': False, + 'recon_cache_path': self.tempdir} + test_auditor = auditor.AccountAuditor(conf, logger=debug_logger()) + test_auditor.run_once() + + self.restore_account_broker() + + broker = auditor.AccountBroker(self.db_path) + # go after rows directly to avoid unintentional migration + with broker.get() as conn: + rows = conn.execute(''' + SELECT storage_policy_index, container_count + FROM policy_stat + ''').fetchall() + for policy_index, container_count in rows: + self.assertEqual(container_count, + per_policy_container_counts[policy_index]) + + +class TestAuditorRealBroker(unittest.TestCase): + + def setUp(self): + self.logger = debug_logger() + + @with_tempdir + def test_db_validate_fails(self, tempdir): + ts = (Timestamp(t).internal for t in itertools.count(int(time.time()))) + db_path = os.path.join(tempdir, 'sda', 'accounts', + '0', '0', '0', 'test.db') + broker = auditor.AccountBroker(db_path, account='a') + broker.initialize(next(ts)) + # add a few containers + policies = itertools.cycle(POLICIES) + num_containers = len(POLICIES) * 3 + per_policy_container_counts = defaultdict(int) + for i in range(num_containers): + name = 'test-container-%02d' % i + policy = next(policies) + broker.put_container(name, next(ts), 0, 0, 0, int(policy)) + per_policy_container_counts[int(policy)] += 1 + + broker._commit_puts() + self.assertEqual(broker.get_info()['container_count'], num_containers) + + messed_up_policy = random.choice(list(POLICIES)) + + # now mess up a policy_stats table count + with broker.get() as conn: + conn.executescript(''' + UPDATE policy_stat + SET container_count = container_count - 1 + WHERE storage_policy_index = %d; + ''' % int(messed_up_policy)) + + # validate it's messed up + policy_stats = broker.get_policy_stats() + self.assertEqual( + policy_stats[int(messed_up_policy)]['container_count'], + per_policy_container_counts[int(messed_up_policy)] - 1) + + # do an audit + conf = {'devices': tempdir, 'mount_check': False, + 'recon_cache_path': tempdir} + test_auditor = auditor.AccountAuditor(conf, logger=self.logger) + test_auditor.run_once() + + # validate errors + self.assertEqual(test_auditor.account_failures, 1) + error_lines = test_auditor.logger.get_lines_for_level('error') + self.assertEqual(len(error_lines), 1) + error_message = error_lines[0] + self.assert_(broker.db_file in error_message) + self.assert_('container_count' in error_message) + self.assert_('does not match' in error_message) + self.assertEqual(test_auditor.logger.get_increment_counts(), + {'failures': 1}) + + if __name__ == '__main__': unittest.main() diff --git a/test/unit/account/test_backend.py b/test/unit/account/test_backend.py index 1fb65260ac..40fac76731 100644 --- a/test/unit/account/test_backend.py +++ b/test/unit/account/test_backend.py @@ -15,6 +15,7 @@ """ Tests for swift.account.backend """ +from collections import defaultdict import hashlib import json import unittest @@ -656,9 +657,10 @@ class TestAccountBroker(unittest.TestCase): put_timestamp, 0, 0, 0, policy.idx) - policy_stats = broker.get_policy_stats() stats = policy_stats[policy.idx] + if 'container_count' in stats: + self.assertEqual(stats['container_count'], 1) self.assertEqual(stats['object_count'], 0) self.assertEqual(stats['bytes_used'], 0) @@ -674,6 +676,8 @@ class TestAccountBroker(unittest.TestCase): policy_stats = broker.get_policy_stats() stats = policy_stats[policy.idx] + if 'container_count' in stats: + self.assertEqual(stats['container_count'], 1) self.assertEqual(stats['object_count'], count) self.assertEqual(stats['bytes_used'], count) @@ -681,6 +685,8 @@ class TestAccountBroker(unittest.TestCase): for policy_index, stats in policy_stats.items(): policy = POLICIES[policy_index] count = policy.idx * 100 # coupled with policy for test + if 'container_count' in stats: + self.assertEqual(stats['container_count'], 1) self.assertEqual(stats['object_count'], count) self.assertEqual(stats['bytes_used'], count) @@ -695,6 +701,8 @@ class TestAccountBroker(unittest.TestCase): policy_stats = broker.get_policy_stats() stats = policy_stats[policy.idx] + if 'container_count' in stats: + self.assertEqual(stats['container_count'], 0) self.assertEqual(stats['object_count'], 0) self.assertEqual(stats['bytes_used'], 0) @@ -714,8 +722,12 @@ class TestAccountBroker(unittest.TestCase): stats = broker.get_policy_stats() self.assertEqual(len(stats), 2) + if 'container_count' in stats[0]: + self.assertEqual(stats[0]['container_count'], 1) self.assertEqual(stats[0]['object_count'], 13) self.assertEqual(stats[0]['bytes_used'], 8156441) + if 'container_count' in stats[1]: + self.assertEqual(stats[1]['container_count'], 1) self.assertEqual(stats[1]['object_count'], 8) self.assertEqual(stats[1]['bytes_used'], 6085379) @@ -1019,8 +1031,12 @@ class TestAccountBrokerBeforeSPI(TestAccountBroker): # we should have stats for both containers stats = broker.get_policy_stats() self.assertEqual(len(stats), 2) + if 'container_count' in stats[0]: + self.assertEqual(stats[0]['container_count'], 1) self.assertEqual(stats[0]['object_count'], 1) self.assertEqual(stats[0]['bytes_used'], 2) + if 'container_count' in stats[1]: + self.assertEqual(stats[1]['container_count'], 1) self.assertEqual(stats[1]['object_count'], 3) self.assertEqual(stats[1]['bytes_used'], 4) @@ -1032,8 +1048,12 @@ class TestAccountBrokerBeforeSPI(TestAccountBroker): conn.commit() stats = broker.get_policy_stats() self.assertEqual(len(stats), 2) + if 'container_count' in stats[0]: + self.assertEqual(stats[0]['container_count'], 0) self.assertEqual(stats[0]['object_count'], 0) self.assertEqual(stats[0]['bytes_used'], 0) + if 'container_count' in stats[1]: + self.assertEqual(stats[1]['container_count'], 1) self.assertEqual(stats[1]['object_count'], 3) self.assertEqual(stats[1]['bytes_used'], 4) @@ -1099,3 +1119,345 @@ class TestAccountBrokerBeforeSPI(TestAccountBroker): with broker.get() as conn: conn.execute('SELECT * FROM policy_stat') conn.execute('SELECT storage_policy_index FROM container') + + +def pre_track_containers_create_policy_stat(self, conn): + """ + Copied from AccountBroker before the container_count column was + added. + Create policy_stat table which is specific to the account DB. + Not a part of Pluggable Back-ends, internal to the baseline code. + + :param conn: DB connection object + """ + conn.executescript(""" + CREATE TABLE policy_stat ( + storage_policy_index INTEGER PRIMARY KEY, + object_count INTEGER DEFAULT 0, + bytes_used INTEGER DEFAULT 0 + ); + INSERT OR IGNORE INTO policy_stat ( + storage_policy_index, object_count, bytes_used + ) + SELECT 0, object_count, bytes_used + FROM account_stat + WHERE container_count > 0; + """) + + +def pre_track_containers_create_container_table(self, conn): + """ + Copied from AccountBroker before the container_count column was + added (using old stat trigger script) + Create container table which is specific to the account DB. + + :param conn: DB connection object + """ + # revert to old trigger script to support one of the tests + OLD_POLICY_STAT_TRIGGER_SCRIPT = """ + CREATE TRIGGER container_insert_ps AFTER INSERT ON container + BEGIN + INSERT OR IGNORE INTO policy_stat + (storage_policy_index, object_count, bytes_used) + VALUES (new.storage_policy_index, 0, 0); + UPDATE policy_stat + SET object_count = object_count + new.object_count, + bytes_used = bytes_used + new.bytes_used + WHERE storage_policy_index = new.storage_policy_index; + END; + CREATE TRIGGER container_delete_ps AFTER DELETE ON container + BEGIN + UPDATE policy_stat + SET object_count = object_count - old.object_count, + bytes_used = bytes_used - old.bytes_used + WHERE storage_policy_index = old.storage_policy_index; + END; + + """ + conn.executescript(""" + CREATE TABLE container ( + ROWID INTEGER PRIMARY KEY AUTOINCREMENT, + name TEXT, + put_timestamp TEXT, + delete_timestamp TEXT, + object_count INTEGER, + bytes_used INTEGER, + deleted INTEGER DEFAULT 0, + storage_policy_index INTEGER DEFAULT 0 + ); + + CREATE INDEX ix_container_deleted_name ON + container (deleted, name); + + CREATE TRIGGER container_insert AFTER INSERT ON container + BEGIN + UPDATE account_stat + SET container_count = container_count + (1 - new.deleted), + object_count = object_count + new.object_count, + bytes_used = bytes_used + new.bytes_used, + hash = chexor(hash, new.name, + new.put_timestamp || '-' || + new.delete_timestamp || '-' || + new.object_count || '-' || new.bytes_used); + END; + + CREATE TRIGGER container_update BEFORE UPDATE ON container + BEGIN + SELECT RAISE(FAIL, 'UPDATE not allowed; DELETE and INSERT'); + END; + + + CREATE TRIGGER container_delete AFTER DELETE ON container + BEGIN + UPDATE account_stat + SET container_count = container_count - (1 - old.deleted), + object_count = object_count - old.object_count, + bytes_used = bytes_used - old.bytes_used, + hash = chexor(hash, old.name, + old.put_timestamp || '-' || + old.delete_timestamp || '-' || + old.object_count || '-' || old.bytes_used); + END; + """ + OLD_POLICY_STAT_TRIGGER_SCRIPT) + + +class AccountBrokerPreTrackContainerCountSetup(object): + def assertUnmigrated(self, broker): + with broker.get() as conn: + try: + conn.execute(''' + SELECT container_count FROM policy_stat + ''').fetchone()[0] + except sqlite3.OperationalError as err: + # confirm that the column really isn't there + self.assert_('no such column: container_count' in str(err)) + else: + self.fail('broker did not raise sqlite3.OperationalError ' + 'trying to select container_count from policy_stat!') + + def setUp(self): + # use old version of policy_stat + self._imported_create_policy_stat_table = \ + AccountBroker.create_policy_stat_table + AccountBroker.create_policy_stat_table = \ + pre_track_containers_create_policy_stat + # use old container table so we use old trigger for + # updating policy_stat + self._imported_create_container_table = \ + AccountBroker.create_container_table + AccountBroker.create_container_table = \ + pre_track_containers_create_container_table + + broker = AccountBroker(':memory:', account='a') + broker.initialize(Timestamp('1').internal) + self.assertUnmigrated(broker) + + self.tempdir = mkdtemp() + self.ts = (Timestamp(t).internal for t in itertools.count(int(time()))) + + self.db_path = os.path.join(self.tempdir, 'sda', 'accounts', + '0', '0', '0', 'test.db') + self.broker = AccountBroker(self.db_path, account='a') + self.broker.initialize(next(self.ts)) + + # Common sanity-check that our starting, pre-migration state correctly + # does not have the container_count column. + self.assertUnmigrated(self.broker) + + def tearDown(self): + rmtree(self.tempdir, ignore_errors=True) + + self.restore_account_broker() + + broker = AccountBroker(':memory:', account='a') + broker.initialize(Timestamp('1').internal) + with broker.get() as conn: + conn.execute('SELECT container_count FROM policy_stat') + + def restore_account_broker(self): + AccountBroker.create_policy_stat_table = \ + self._imported_create_policy_stat_table + AccountBroker.create_container_table = \ + self._imported_create_container_table + + +@patch_policies([StoragePolicy(0, 'zero', False), + StoragePolicy(1, 'one', True), + StoragePolicy(2, 'two', False), + StoragePolicy(3, 'three', False)]) +class TestAccountBrokerBeforePerPolicyContainerTrack( + AccountBrokerPreTrackContainerCountSetup, TestAccountBroker): + """ + Tests for AccountBroker against databases created before + the container_count column was added to the policy_stat table. + """ + + def test_policy_table_cont_count_do_migrations(self): + # add a few containers + num_containers = 8 + policies = itertools.cycle(POLICIES) + per_policy_container_counts = defaultdict(int) + + # add a few container entries + for i in range(num_containers): + name = 'test-container-%02d' % i + policy = next(policies) + self.broker.put_container(name, next(self.ts), + 0, 0, 0, int(policy)) + per_policy_container_counts[int(policy)] += 1 + + total_container_count = self.broker.get_info()['container_count'] + self.assertEqual(total_container_count, num_containers) + + # still un-migrated + self.assertUnmigrated(self.broker) + + policy_stats = self.broker.get_policy_stats() + self.assertEqual(len(policy_stats), len(per_policy_container_counts)) + for stats in policy_stats.values(): + self.assertEqual(stats['object_count'], 0) + self.assertEqual(stats['bytes_used'], 0) + # un-migrated dbs should not return container_count + self.assertFalse('container_count' in stats) + + # now force the migration + policy_stats = self.broker.get_policy_stats(do_migrations=True) + self.assertEqual(len(policy_stats), len(per_policy_container_counts)) + for policy_index, stats in policy_stats.items(): + self.assertEqual(stats['object_count'], 0) + self.assertEqual(stats['bytes_used'], 0) + self.assertEqual(stats['container_count'], + per_policy_container_counts[policy_index]) + + def test_policy_table_cont_count_update_get_stats(self): + # add a few container entries + for policy in POLICIES: + for i in range(0, policy.idx + 1): + container_name = 'c%s_0' % policy.idx + self.broker.put_container('c%s_%s' % (policy.idx, i), + 0, 0, 0, 0, policy.idx) + # _commit_puts_stale_ok() called by get_policy_stats() + + # calling get_policy_stats() with do_migrations will alter the table + # and populate it based on what's in the container table now + stats = self.broker.get_policy_stats(do_migrations=True) + + # now confirm that the column was created + with self.broker.get() as conn: + conn.execute('SELECT container_count FROM policy_stat') + + # confirm stats reporting back correctly + self.assertEqual(len(stats), 4) + for policy in POLICIES: + self.assertEqual(stats[policy.idx]['container_count'], + policy.idx + 1) + + # now delete one from each policy and check the stats + with self.broker.get() as conn: + for policy in POLICIES: + container_name = 'c%s_0' % policy.idx + conn.execute(''' + DELETE FROM container + WHERE name = ? + ''', (container_name,)) + conn.commit() + stats = self.broker.get_policy_stats() + self.assertEqual(len(stats), 4) + for policy in POLICIES: + self.assertEqual(stats[policy.idx]['container_count'], + policy.idx) + + # now put them back and make sure things are still cool + for policy in POLICIES: + container_name = 'c%s_0' % policy.idx + self.broker.put_container(container_name, 0, 0, 0, 0, policy.idx) + # _commit_puts_stale_ok() called by get_policy_stats() + + # confirm stats reporting back correctly + stats = self.broker.get_policy_stats() + self.assertEqual(len(stats), 4) + for policy in POLICIES: + self.assertEqual(stats[policy.idx]['container_count'], + policy.idx + 1) + + def test_per_policy_cont_count_migration_with_deleted(self): + num_containers = 15 + policies = itertools.cycle(POLICIES) + container_policy_map = {} + + # add a few container entries + for i in range(num_containers): + name = 'test-container-%02d' % i + policy = next(policies) + self.broker.put_container(name, next(self.ts), + 0, 0, 0, int(policy)) + # keep track of stub container policies + container_policy_map[name] = policy + + # delete about half of the containers + for i in range(0, num_containers, 2): + name = 'test-container-%02d' % i + policy = container_policy_map[name] + self.broker.put_container(name, 0, next(self.ts), + 0, 0, int(policy)) + + total_container_count = self.broker.get_info()['container_count'] + self.assertEqual(total_container_count, num_containers / 2) + + # trigger migration + policy_info = self.broker.get_policy_stats(do_migrations=True) + self.assertEqual(len(policy_info), min(num_containers, len(POLICIES))) + policy_container_count = sum(p['container_count'] for p in + policy_info.values()) + self.assertEqual(total_container_count, policy_container_count) + + def test_per_policy_cont_count_migration_with_single_policy(self): + num_containers = 100 + + with patch_policies(legacy_only=True): + policy = POLICIES[0] + # add a few container entries + for i in range(num_containers): + name = 'test-container-%02d' % i + self.broker.put_container(name, next(self.ts), + 0, 0, 0, int(policy)) + # delete about half of the containers + for i in range(0, num_containers, 2): + name = 'test-container-%02d' % i + self.broker.put_container(name, 0, next(self.ts), + 0, 0, int(policy)) + + total_container_count = self.broker.get_info()['container_count'] + # trigger migration + policy_info = self.broker.get_policy_stats(do_migrations=True) + + self.assertEqual(total_container_count, num_containers / 2) + + self.assertEqual(len(policy_info), 1) + policy_container_count = sum(p['container_count'] for p in + policy_info.values()) + self.assertEqual(total_container_count, policy_container_count) + + def test_per_policy_cont_count_migration_impossible(self): + with patch_policies(legacy_only=True): + # add a container for the legacy policy + policy = POLICIES[0] + self.broker.put_container('test-legacy-container', next(self.ts), + 0, 0, 0, int(policy)) + + # now create an impossible situation by adding a container for a + # policy index that doesn't exist + non_existant_policy_index = int(policy) + 1 + self.broker.put_container('test-non-existant-policy', + next(self.ts), 0, 0, 0, + non_existant_policy_index) + + total_container_count = self.broker.get_info()['container_count'] + + # trigger migration + policy_info = self.broker.get_policy_stats(do_migrations=True) + + self.assertEqual(total_container_count, 2) + self.assertEqual(len(policy_info), 2) + for policy_stat in policy_info.values(): + self.assertEqual(policy_stat['container_count'], 1) diff --git a/test/unit/account/test_server.py b/test/unit/account/test_server.py index e515b3d221..c18c57edb1 100644 --- a/test/unit/account/test_server.py +++ b/test/unit/account/test_server.py @@ -1708,6 +1708,9 @@ class TestAccountController(unittest.TestCase): self.assertEquals( resp.headers['X-Account-Storage-Policy-%s-Bytes-Used' % POLICIES[0].name], '4') + self.assertEquals( + resp.headers['X-Account-Storage-Policy-%s-Container-Count' % + POLICIES[0].name], '1') def test_policy_stats_non_default(self): ts = itertools.count() @@ -1743,6 +1746,9 @@ class TestAccountController(unittest.TestCase): self.assertEquals( resp.headers['X-Account-Storage-Policy-%s-Bytes-Used' % policy.name], '4') + self.assertEquals( + resp.headers['X-Account-Storage-Policy-%s-Container-Count' % + policy.name], '1') def test_empty_policy_stats(self): ts = itertools.count() diff --git a/test/unit/account/test_utils.py b/test/unit/account/test_utils.py index 46f8835bff..ea90decfc9 100644 --- a/test/unit/account/test_utils.py +++ b/test/unit/account/test_utils.py @@ -117,9 +117,70 @@ class TestAccountUtils(unittest.TestCase): }) for policy in POLICIES: prefix = 'X-Account-Storage-Policy-%s-' % policy.name + expected[prefix + 'Container-Count'] = 1 expected[prefix + 'Object-Count'] = int(policy) expected[prefix + 'Bytes-Used'] = int(policy) * 10 resp_headers = utils.get_response_headers(broker) + per_policy_container_headers = [ + h for h in resp_headers if + h.lower().startswith('x-account-storage-policy-') and + h.lower().endswith('-container-count')] + self.assertTrue(per_policy_container_headers) + for key, value in resp_headers.items(): + expected_value = expected.pop(key) + self.assertEqual(expected_value, str(value), + 'value for %r was %r not %r' % ( + key, value, expected_value)) + self.assertFalse(expected) + + @patch_policies + def test_get_response_headers_with_legacy_data(self): + broker = backend.AccountBroker(':memory:', account='a') + now = time.time() + with mock.patch('time.time', new=lambda: now): + broker.initialize(Timestamp(now).internal) + # add some container data + ts = (Timestamp(t).internal for t in itertools.count(int(now))) + total_containers = 0 + total_objects = 0 + total_bytes = 0 + for policy in POLICIES: + delete_timestamp = ts.next() + put_timestamp = ts.next() + object_count = int(policy) + bytes_used = int(policy) * 10 + broker.put_container('c-%s' % policy.name, put_timestamp, + delete_timestamp, object_count, bytes_used, + int(policy)) + total_containers += 1 + total_objects += object_count + total_bytes += bytes_used + expected = HeaderKeyDict({ + 'X-Account-Container-Count': total_containers, + 'X-Account-Object-Count': total_objects, + 'X-Account-Bytes-Used': total_bytes, + 'X-Timestamp': Timestamp(now).normal, + 'X-PUT-Timestamp': Timestamp(now).normal, + }) + for policy in POLICIES: + prefix = 'X-Account-Storage-Policy-%s-' % policy.name + expected[prefix + 'Object-Count'] = int(policy) + expected[prefix + 'Bytes-Used'] = int(policy) * 10 + orig_policy_stats = broker.get_policy_stats + + def stub_policy_stats(*args, **kwargs): + policy_stats = orig_policy_stats(*args, **kwargs) + for stats in policy_stats.values(): + # legacy db's won't return container_count + del stats['container_count'] + return policy_stats + broker.get_policy_stats = stub_policy_stats + resp_headers = utils.get_response_headers(broker) + per_policy_container_headers = [ + h for h in resp_headers if + h.lower().startswith('x-account-storage-policy-') and + h.lower().endswith('-container-count')] + self.assertFalse(per_policy_container_headers) for key, value in resp_headers.items(): expected_value = expected.pop(key) self.assertEqual(expected_value, str(value),