Add container_count to policy_stat table

Start tracking the container count per policy, report it in account
HEAD, and support installations where the DB existed before the
updated schema.

Migration is triggered by the account auditor; if the database is
un-migrated it will continue to report policy_stats without the
per-policy container_count keys.

Closes-Bug: #1367514
Change-Id: I07331cea177e19b3df303609a4ac510765a19162
paul luse 2014-09-11 06:55:45 -07:00 committed by Darrell Bishop
parent 4f16239280
commit 15fbf9fe7c
7 changed files with 691 additions and 29 deletions
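Before the per-file hunks, a minimal sketch of what this change surfaces to clients, assuming a hypothetical storage policy named 'gold' that holds a single container; the per-policy header names follow the account server and utils tests further down in this diff, and all values are made up.

# Illustrative only, not part of the commit: account HEAD headers once the
# account DB is migrated ('gold' and the numbers are hypothetical; header
# names are case-insensitive on the wire).
expected_head_headers = {
    'X-Account-Container-Count': '1',
    'X-Account-Object-Count': '8',
    'X-Account-Bytes-Used': '6085379',
    # new in this change: a per-policy container count next to the existing
    # per-policy object count and bytes used
    'X-Account-Storage-Policy-Gold-Container-Count': '1',
    'X-Account-Storage-Policy-Gold-Object-Count': '8',
    'X-Account-Storage-Policy-Gold-Bytes-Used': '6085379',
}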


@ -20,6 +20,7 @@ from random import random
import swift.common.db
from swift.account.backend import AccountBroker, DATADIR
from swift.common.exceptions import InvalidAccountInfo
from swift.common.utils import get_logger, audit_location_generator, \
config_true_value, dump_recon_cache, ratelimit_sleep
from swift.common.daemon import Daemon
@ -30,9 +31,9 @@ from eventlet import Timeout
class AccountAuditor(Daemon):
"""Audit accounts."""
def __init__(self, conf):
def __init__(self, conf, logger=None):
self.conf = conf
self.logger = get_logger(conf, log_route='account-auditor')
self.logger = logger or get_logger(conf, log_route='account-auditor')
self.devices = conf.get('devices', '/srv/node')
self.mount_check = config_true_value(conf.get('mount_check', 'true'))
self.interval = int(conf.get('interval', 1800))
@ -104,6 +105,29 @@ class AccountAuditor(Daemon):
dump_recon_cache({'account_auditor_pass_completed': elapsed},
self.rcache, self.logger)
def validate_per_policy_counts(self, broker):
info = broker.get_info()
policy_stats = broker.get_policy_stats(do_migrations=True)
policy_totals = {
'container_count': 0,
'object_count': 0,
'bytes_used': 0,
}
for policy_stat in policy_stats.values():
for key in policy_totals:
policy_totals[key] += policy_stat[key]
for key in policy_totals:
if policy_totals[key] == info[key]:
continue
raise InvalidAccountInfo(_(
'The total %(key)s for the container (%(total)s) does not '
'match the sum of %(key)s across policies (%(sum)s)') % {
'key': key,
'total': info[key],
'sum': policy_totals[key],
})
def account_audit(self, path):
"""
Audits the given account path
@ -114,10 +138,15 @@ class AccountAuditor(Daemon):
try:
broker = AccountBroker(path)
if not broker.is_deleted():
broker.get_info()
self.validate_per_policy_counts(broker)
self.logger.increment('passes')
self.account_passes += 1
self.logger.debug('Audit passed for %s' % broker)
except InvalidAccountInfo as e:
self.logger.increment('failures')
self.account_failures += 1
self.logger.error(
_('Audit Failed for %s: %s'), path, str(e))
except (Exception, Timeout):
self.logger.increment('failures')
self.account_failures += 1
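In plain terms, validate_per_policy_counts() above sums container_count, object_count and bytes_used across the per-policy rows and requires each sum to match the account-level totals from get_info(); any mismatch surfaces as the 'Audit Failed' error handled in account_audit(). A tiny sketch with made-up numbers (plain Python, not Swift code):

# Hypothetical totals to illustrate the cross-check the auditor now performs.
info = {'container_count': 5, 'object_count': 20, 'bytes_used': 1024}
policy_stats = {
    0: {'container_count': 3, 'object_count': 12, 'bytes_used': 600},
    1: {'container_count': 1, 'object_count': 8, 'bytes_used': 424},
}
totals = {key: sum(stat[key] for stat in policy_stats.values())
          for key in ('container_count', 'object_count', 'bytes_used')}
# container_count sums to 4 but the account row says 5, so the real auditor
# would raise InvalidAccountInfo, bump its failure counters and log the error.
assert totals == {'container_count': 4, 'object_count': 20, 'bytes_used': 1024}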


@ -32,17 +32,19 @@ POLICY_STAT_TRIGGER_SCRIPT = """
CREATE TRIGGER container_insert_ps AFTER INSERT ON container
BEGIN
INSERT OR IGNORE INTO policy_stat
(storage_policy_index, object_count, bytes_used)
VALUES (new.storage_policy_index, 0, 0);
(storage_policy_index, container_count, object_count, bytes_used)
VALUES (new.storage_policy_index, 0, 0, 0);
UPDATE policy_stat
SET object_count = object_count + new.object_count,
SET container_count = container_count + (1 - new.deleted),
object_count = object_count + new.object_count,
bytes_used = bytes_used + new.bytes_used
WHERE storage_policy_index = new.storage_policy_index;
END;
CREATE TRIGGER container_delete_ps AFTER DELETE ON container
BEGIN
UPDATE policy_stat
SET object_count = object_count - old.object_count,
SET container_count = container_count - (1 - old.deleted),
object_count = object_count - old.object_count,
bytes_used = bytes_used - old.bytes_used
WHERE storage_policy_index = old.storage_policy_index;
END;
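A standalone sqlite3 sketch (toy schema, illustrative only) of what the reworked insert trigger does; the (1 - new.deleted) term is what keeps tombstoned container rows from being counted:

import sqlite3

conn = sqlite3.connect(':memory:')
conn.executescript('''
    CREATE TABLE policy_stat (
        storage_policy_index INTEGER PRIMARY KEY,
        container_count INTEGER DEFAULT 0,
        object_count INTEGER DEFAULT 0,
        bytes_used INTEGER DEFAULT 0
    );
    CREATE TABLE container (
        name TEXT,
        object_count INTEGER,
        bytes_used INTEGER,
        deleted INTEGER DEFAULT 0,
        storage_policy_index INTEGER DEFAULT 0
    );
    CREATE TRIGGER container_insert_ps AFTER INSERT ON container
    BEGIN
        INSERT OR IGNORE INTO policy_stat
            (storage_policy_index, container_count, object_count, bytes_used)
            VALUES (new.storage_policy_index, 0, 0, 0);
        UPDATE policy_stat
        SET container_count = container_count + (1 - new.deleted),
            object_count = object_count + new.object_count,
            bytes_used = bytes_used + new.bytes_used
        WHERE storage_policy_index = new.storage_policy_index;
    END;
''')

# a live container row bumps container_count for its policy...
conn.execute("INSERT INTO container VALUES ('c1', 3, 30, 0, 1)")
# ...while a deleted row contributes (1 - 1) = 0 containers
conn.execute("INSERT INTO container VALUES ('c2', 0, 0, 1, 1)")
print(conn.execute('SELECT * FROM policy_stat').fetchall())  # [(1, 1, 3, 30)]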
@ -165,13 +167,15 @@ class AccountBroker(DatabaseBroker):
conn.executescript("""
CREATE TABLE policy_stat (
storage_policy_index INTEGER PRIMARY KEY,
container_count INTEGER DEFAULT 0,
object_count INTEGER DEFAULT 0,
bytes_used INTEGER DEFAULT 0
);
INSERT OR IGNORE INTO policy_stat (
storage_policy_index, object_count, bytes_used
storage_policy_index, container_count, object_count,
bytes_used
)
SELECT 0, object_count, bytes_used
SELECT 0, container_count, object_count, bytes_used
FROM account_stat
WHERE container_count > 0;
""")
@ -296,24 +300,45 @@ class AccountBroker(DatabaseBroker):
return row['status'] == "DELETED" or (
row['delete_timestamp'] > row['put_timestamp'])
def get_policy_stats(self):
def get_policy_stats(self, do_migrations=False):
"""
Get global policy stats for the account.
:param do_migrations: boolean, if True the policy stat dicts will
always include the 'container_count' key;
otherwise it may be omitted on legacy databases
until they are migrated.
:returns: dict of policy stats where the key is the policy index and
the value is a dictionary like {'object_count': M,
'bytes_used': N}
'bytes_used': N, 'container_count': L}
"""
info = []
columns = [
'storage_policy_index',
'container_count',
'object_count',
'bytes_used',
]
def run_query():
return (conn.execute('''
SELECT %s
FROM policy_stat
''' % ', '.join(columns)).fetchall())
self._commit_puts_stale_ok()
info = []
with self.get() as conn:
try:
info = (conn.execute('''
SELECT storage_policy_index, object_count, bytes_used
FROM policy_stat
''').fetchall())
info = run_query()
except sqlite3.OperationalError as err:
if "no such table: policy_stat" not in str(err):
if "no such column: container_count" in str(err):
if do_migrations:
self._migrate_add_container_count(conn)
else:
columns.remove('container_count')
info = run_query()
elif "no such table: policy_stat" not in str(err):
raise
policy_stats = {}
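A hedged usage sketch of the fallback above (the DB path and numbers are hypothetical): read-only callers see whatever columns the database already has, while the auditor opts in to the migration.

from swift.account.backend import AccountBroker

# assume this DB was created before this change
broker = AccountBroker('/srv/node/sda/accounts/0/0/0/legacy.db')

# default path (e.g. account HEAD): the un-migrated DB is left alone and the
# per-policy dicts simply omit 'container_count'
stats = broker.get_policy_stats()
# e.g. {0: {'object_count': 12, 'bytes_used': 600}}

# the account auditor passes do_migrations=True, which adds and backfills the
# column, so the key is present from then on
stats = broker.get_policy_stats(do_migrations=True)
# e.g. {0: {'container_count': 3, 'object_count': 12, 'bytes_used': 600}}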
@ -501,10 +526,72 @@ class AccountBroker(DatabaseBroker):
self._migrate_add_storage_policy_index(conn)
_really_merge_items(conn)
def _migrate_add_container_count(self, conn):
"""
Add the container_count column to the 'policy_stat' table and
update it
:param conn: DB connection object
"""
# add the container_count column
curs = conn.cursor()
curs.executescript('''
DROP TRIGGER container_delete_ps;
DROP TRIGGER container_insert_ps;
ALTER TABLE policy_stat
ADD COLUMN container_count INTEGER DEFAULT 0;
''' + POLICY_STAT_TRIGGER_SCRIPT)
# keep the simple case simple: if there's only one entry in the
# policy_stat table we just copy the total container count from the
# account_stat table
# if that UPDATE changed a row, changes() <> 0 will hold below and the
# INSERT OR REPLACE built from the per-policy count subquery won't execute
curs.executescript("""
UPDATE policy_stat
SET container_count = (
SELECT container_count
FROM account_stat)
WHERE (
SELECT COUNT(storage_policy_index)
FROM policy_stat
) <= 1;
INSERT OR REPLACE INTO policy_stat (
storage_policy_index,
container_count,
object_count,
bytes_used
)
SELECT p.storage_policy_index,
c.count,
p.object_count,
p.bytes_used
FROM (
SELECT storage_policy_index,
COUNT(*) as count
FROM container
WHERE deleted = 0
GROUP BY storage_policy_index
) c
JOIN policy_stat p
ON p.storage_policy_index = c.storage_policy_index
WHERE NOT EXISTS(
SELECT changes() as change
FROM policy_stat
WHERE change <> 0
);
""")
conn.commit()
def _migrate_add_storage_policy_index(self, conn):
"""
Add the storage_policy_index column to the 'container' table and
set up triggers, creating the policy_stat table if needed.
:param conn: DB connection object
"""
try:
self.create_policy_stat_table(conn)


@ -79,6 +79,10 @@ class DeviceUnavailable(SwiftException):
pass
class InvalidAccountInfo(SwiftException):
pass
class PathNotDir(OSError):
pass


@ -13,6 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from collections import defaultdict
import itertools
import unittest
import mock
import time
@ -23,7 +25,11 @@ from shutil import rmtree
from eventlet import Timeout
from swift.account import auditor
from test.unit import FakeLogger
from swift.common.storage_policy import POLICIES
from swift.common.utils import Timestamp
from test.unit import debug_logger, patch_policies, with_tempdir
from test.unit.account.test_backend import (
AccountBrokerPreTrackContainerCountSetup)
class FakeAccountBroker(object):
@ -37,16 +43,22 @@ class FakeAccountBroker(object):
def get_info(self):
if self.file.startswith('fail'):
raise ValueError
raise ValueError()
if self.file.startswith('true'):
return 'ok'
return defaultdict(int)
def get_policy_stats(self, **kwargs):
if self.file.startswith('fail'):
raise ValueError()
if self.file.startswith('true'):
return defaultdict(int)
class TestAuditor(unittest.TestCase):
def setUp(self):
self.testdir = os.path.join(mkdtemp(), 'tmp_test_account_auditor')
self.logger = FakeLogger()
self.logger = debug_logger()
rmtree(self.testdir, ignore_errors=1)
os.mkdir(self.testdir)
fnames = ['true1.db', 'true2.db', 'true3.db',
@ -69,9 +81,7 @@ class TestAuditor(unittest.TestCase):
def sleep(self, sec):
self.times += 1
if self.times < sleep_times:
time.sleep(0.1)
else:
if self.times >= sleep_times:
# stop forever by an error
raise ValueError()
@ -79,7 +89,7 @@ class TestAuditor(unittest.TestCase):
return time.time()
conf = {}
test_auditor = auditor.AccountAuditor(conf)
test_auditor = auditor.AccountAuditor(conf, logger=self.logger)
with mock.patch('swift.account.auditor.time', FakeTime()):
def fake_audit_location_generator(*args, **kwargs):
@ -106,7 +116,7 @@ class TestAuditor(unittest.TestCase):
@mock.patch('swift.account.auditor.AccountBroker', FakeAccountBroker)
def test_run_once(self):
conf = {}
test_auditor = auditor.AccountAuditor(conf)
test_auditor = auditor.AccountAuditor(conf, logger=self.logger)
def fake_audit_location_generator(*args, **kwargs):
files = os.listdir(self.testdir)
@ -121,7 +131,7 @@ class TestAuditor(unittest.TestCase):
@mock.patch('swift.account.auditor.AccountBroker', FakeAccountBroker)
def test_one_audit_pass(self):
conf = {}
test_auditor = auditor.AccountAuditor(conf)
test_auditor = auditor.AccountAuditor(conf, logger=self.logger)
def fake_audit_location_generator(*args, **kwargs):
files = os.listdir(self.testdir)
@ -138,7 +148,7 @@ class TestAuditor(unittest.TestCase):
@mock.patch('swift.account.auditor.AccountBroker', FakeAccountBroker)
def test_account_auditor(self):
conf = {}
test_auditor = auditor.AccountAuditor(conf)
test_auditor = auditor.AccountAuditor(conf, logger=self.logger)
files = os.listdir(self.testdir)
for f in files:
path = os.path.join(self.testdir, f)
@ -146,5 +156,108 @@ class TestAuditor(unittest.TestCase):
self.assertEqual(test_auditor.account_failures, 2)
self.assertEqual(test_auditor.account_passes, 3)
@patch_policies
class TestAuditorRealBrokerMigration(
AccountBrokerPreTrackContainerCountSetup, unittest.TestCase):
def test_db_migration(self):
# add a few containers
policies = itertools.cycle(POLICIES)
num_containers = len(POLICIES) * 3
per_policy_container_counts = defaultdict(int)
for i in range(num_containers):
name = 'test-container-%02d' % i
policy = next(policies)
self.broker.put_container(name, next(self.ts),
0, 0, 0, int(policy))
per_policy_container_counts[int(policy)] += 1
self.broker._commit_puts()
self.assertEqual(num_containers,
self.broker.get_info()['container_count'])
# still un-migrated
self.assertUnmigrated(self.broker)
# run auditor, and validate migration
conf = {'devices': self.tempdir, 'mount_check': False,
'recon_cache_path': self.tempdir}
test_auditor = auditor.AccountAuditor(conf, logger=debug_logger())
test_auditor.run_once()
self.restore_account_broker()
broker = auditor.AccountBroker(self.db_path)
# go after rows directly to avoid unintentional migration
with broker.get() as conn:
rows = conn.execute('''
SELECT storage_policy_index, container_count
FROM policy_stat
''').fetchall()
for policy_index, container_count in rows:
self.assertEqual(container_count,
per_policy_container_counts[policy_index])
class TestAuditorRealBroker(unittest.TestCase):
def setUp(self):
self.logger = debug_logger()
@with_tempdir
def test_db_validate_fails(self, tempdir):
ts = (Timestamp(t).internal for t in itertools.count(int(time.time())))
db_path = os.path.join(tempdir, 'sda', 'accounts',
'0', '0', '0', 'test.db')
broker = auditor.AccountBroker(db_path, account='a')
broker.initialize(next(ts))
# add a few containers
policies = itertools.cycle(POLICIES)
num_containers = len(POLICIES) * 3
per_policy_container_counts = defaultdict(int)
for i in range(num_containers):
name = 'test-container-%02d' % i
policy = next(policies)
broker.put_container(name, next(ts), 0, 0, 0, int(policy))
per_policy_container_counts[int(policy)] += 1
broker._commit_puts()
self.assertEqual(broker.get_info()['container_count'], num_containers)
messed_up_policy = random.choice(list(POLICIES))
# now mess up a policy_stats table count
with broker.get() as conn:
conn.executescript('''
UPDATE policy_stat
SET container_count = container_count - 1
WHERE storage_policy_index = %d;
''' % int(messed_up_policy))
# validate it's messed up
policy_stats = broker.get_policy_stats()
self.assertEqual(
policy_stats[int(messed_up_policy)]['container_count'],
per_policy_container_counts[int(messed_up_policy)] - 1)
# do an audit
conf = {'devices': tempdir, 'mount_check': False,
'recon_cache_path': tempdir}
test_auditor = auditor.AccountAuditor(conf, logger=self.logger)
test_auditor.run_once()
# validate errors
self.assertEqual(test_auditor.account_failures, 1)
error_lines = test_auditor.logger.get_lines_for_level('error')
self.assertEqual(len(error_lines), 1)
error_message = error_lines[0]
self.assert_(broker.db_file in error_message)
self.assert_('container_count' in error_message)
self.assert_('does not match' in error_message)
self.assertEqual(test_auditor.logger.get_increment_counts(),
{'failures': 1})
if __name__ == '__main__':
unittest.main()


@ -15,6 +15,7 @@
""" Tests for swift.account.backend """
from collections import defaultdict
import hashlib
import unittest
import pickle
@ -627,9 +628,10 @@ class TestAccountBroker(unittest.TestCase):
put_timestamp, 0,
0, 0,
policy.idx)
policy_stats = broker.get_policy_stats()
stats = policy_stats[policy.idx]
if 'container_count' in stats:
self.assertEqual(stats['container_count'], 1)
self.assertEqual(stats['object_count'], 0)
self.assertEqual(stats['bytes_used'], 0)
@ -645,6 +647,8 @@ class TestAccountBroker(unittest.TestCase):
policy_stats = broker.get_policy_stats()
stats = policy_stats[policy.idx]
if 'container_count' in stats:
self.assertEqual(stats['container_count'], 1)
self.assertEqual(stats['object_count'], count)
self.assertEqual(stats['bytes_used'], count)
@ -652,6 +656,8 @@ class TestAccountBroker(unittest.TestCase):
for policy_index, stats in policy_stats.items():
policy = POLICIES[policy_index]
count = policy.idx * 100 # coupled with policy for test
if 'container_count' in stats:
self.assertEqual(stats['container_count'], 1)
self.assertEqual(stats['object_count'], count)
self.assertEqual(stats['bytes_used'], count)
@ -666,6 +672,8 @@ class TestAccountBroker(unittest.TestCase):
policy_stats = broker.get_policy_stats()
stats = policy_stats[policy.idx]
if 'container_count' in stats:
self.assertEqual(stats['container_count'], 0)
self.assertEqual(stats['object_count'], 0)
self.assertEqual(stats['bytes_used'], 0)
@ -685,8 +693,12 @@ class TestAccountBroker(unittest.TestCase):
stats = broker.get_policy_stats()
self.assertEqual(len(stats), 2)
if 'container_count' in stats[0]:
self.assertEqual(stats[0]['container_count'], 1)
self.assertEqual(stats[0]['object_count'], 13)
self.assertEqual(stats[0]['bytes_used'], 8156441)
if 'container_count' in stats[1]:
self.assertEqual(stats[1]['container_count'], 1)
self.assertEqual(stats[1]['object_count'], 8)
self.assertEqual(stats[1]['bytes_used'], 6085379)
@ -990,8 +1002,12 @@ class TestAccountBrokerBeforeSPI(TestAccountBroker):
# we should have stats for both containers
stats = broker.get_policy_stats()
self.assertEqual(len(stats), 2)
if 'container_count' in stats[0]:
self.assertEqual(stats[0]['container_count'], 1)
self.assertEqual(stats[0]['object_count'], 1)
self.assertEqual(stats[0]['bytes_used'], 2)
if 'container_count' in stats[1]:
self.assertEqual(stats[1]['container_count'], 1)
self.assertEqual(stats[1]['object_count'], 3)
self.assertEqual(stats[1]['bytes_used'], 4)
@ -1003,8 +1019,12 @@ class TestAccountBrokerBeforeSPI(TestAccountBroker):
conn.commit()
stats = broker.get_policy_stats()
self.assertEqual(len(stats), 2)
if 'container_count' in stats[0]:
self.assertEqual(stats[0]['container_count'], 0)
self.assertEqual(stats[0]['object_count'], 0)
self.assertEqual(stats[0]['bytes_used'], 0)
if 'container_count' in stats[1]:
self.assertEqual(stats[1]['container_count'], 1)
self.assertEqual(stats[1]['object_count'], 3)
self.assertEqual(stats[1]['bytes_used'], 4)
@ -1070,3 +1090,345 @@ class TestAccountBrokerBeforeSPI(TestAccountBroker):
with broker.get() as conn:
conn.execute('SELECT * FROM policy_stat')
conn.execute('SELECT storage_policy_index FROM container')
def pre_track_containers_create_policy_stat(self, conn):
"""
Copied from AccountBroker before the container_count column was
added.
Create policy_stat table which is specific to the account DB.
Not a part of Pluggable Back-ends, internal to the baseline code.
:param conn: DB connection object
"""
conn.executescript("""
CREATE TABLE policy_stat (
storage_policy_index INTEGER PRIMARY KEY,
object_count INTEGER DEFAULT 0,
bytes_used INTEGER DEFAULT 0
);
INSERT OR IGNORE INTO policy_stat (
storage_policy_index, object_count, bytes_used
)
SELECT 0, object_count, bytes_used
FROM account_stat
WHERE container_count > 0;
""")
def pre_track_containers_create_container_table(self, conn):
"""
Copied from AccountBroker before the container_count column was
added (using old stat trigger script)
Create container table which is specific to the account DB.
:param conn: DB connection object
"""
# revert to old trigger script to support one of the tests
OLD_POLICY_STAT_TRIGGER_SCRIPT = """
CREATE TRIGGER container_insert_ps AFTER INSERT ON container
BEGIN
INSERT OR IGNORE INTO policy_stat
(storage_policy_index, object_count, bytes_used)
VALUES (new.storage_policy_index, 0, 0);
UPDATE policy_stat
SET object_count = object_count + new.object_count,
bytes_used = bytes_used + new.bytes_used
WHERE storage_policy_index = new.storage_policy_index;
END;
CREATE TRIGGER container_delete_ps AFTER DELETE ON container
BEGIN
UPDATE policy_stat
SET object_count = object_count - old.object_count,
bytes_used = bytes_used - old.bytes_used
WHERE storage_policy_index = old.storage_policy_index;
END;
"""
conn.executescript("""
CREATE TABLE container (
ROWID INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT,
put_timestamp TEXT,
delete_timestamp TEXT,
object_count INTEGER,
bytes_used INTEGER,
deleted INTEGER DEFAULT 0,
storage_policy_index INTEGER DEFAULT 0
);
CREATE INDEX ix_container_deleted_name ON
container (deleted, name);
CREATE TRIGGER container_insert AFTER INSERT ON container
BEGIN
UPDATE account_stat
SET container_count = container_count + (1 - new.deleted),
object_count = object_count + new.object_count,
bytes_used = bytes_used + new.bytes_used,
hash = chexor(hash, new.name,
new.put_timestamp || '-' ||
new.delete_timestamp || '-' ||
new.object_count || '-' || new.bytes_used);
END;
CREATE TRIGGER container_update BEFORE UPDATE ON container
BEGIN
SELECT RAISE(FAIL, 'UPDATE not allowed; DELETE and INSERT');
END;
CREATE TRIGGER container_delete AFTER DELETE ON container
BEGIN
UPDATE account_stat
SET container_count = container_count - (1 - old.deleted),
object_count = object_count - old.object_count,
bytes_used = bytes_used - old.bytes_used,
hash = chexor(hash, old.name,
old.put_timestamp || '-' ||
old.delete_timestamp || '-' ||
old.object_count || '-' || old.bytes_used);
END;
""" + OLD_POLICY_STAT_TRIGGER_SCRIPT)
class AccountBrokerPreTrackContainerCountSetup(object):
def assertUnmigrated(self, broker):
with broker.get() as conn:
try:
conn.execute('''
SELECT container_count FROM policy_stat
''').fetchone()[0]
except sqlite3.OperationalError as err:
# confirm that the column really isn't there
self.assert_('no such column: container_count' in str(err))
else:
self.fail('broker did not raise sqlite3.OperationalError '
'trying to select container_count from policy_stat!')
def setUp(self):
# use old version of policy_stat
self._imported_create_policy_stat_table = \
AccountBroker.create_policy_stat_table
AccountBroker.create_policy_stat_table = \
pre_track_containers_create_policy_stat
# use old container table so we use old trigger for
# updating policy_stat
self._imported_create_container_table = \
AccountBroker.create_container_table
AccountBroker.create_container_table = \
pre_track_containers_create_container_table
broker = AccountBroker(':memory:', account='a')
broker.initialize(Timestamp('1').internal)
self.assertUnmigrated(broker)
self.tempdir = mkdtemp()
self.ts = (Timestamp(t).internal for t in itertools.count(int(time())))
self.db_path = os.path.join(self.tempdir, 'sda', 'accounts',
'0', '0', '0', 'test.db')
self.broker = AccountBroker(self.db_path, account='a')
self.broker.initialize(next(self.ts))
# Common sanity-check that our starting, pre-migration state correctly
# does not have the container_count column.
self.assertUnmigrated(self.broker)
def tearDown(self):
rmtree(self.tempdir, ignore_errors=True)
self.restore_account_broker()
broker = AccountBroker(':memory:', account='a')
broker.initialize(Timestamp('1').internal)
with broker.get() as conn:
conn.execute('SELECT container_count FROM policy_stat')
def restore_account_broker(self):
AccountBroker.create_policy_stat_table = \
self._imported_create_policy_stat_table
AccountBroker.create_container_table = \
self._imported_create_container_table
@patch_policies([StoragePolicy(0, 'zero', False),
StoragePolicy(1, 'one', True),
StoragePolicy(2, 'two', False),
StoragePolicy(3, 'three', False)])
class TestAccountBrokerBeforePerPolicyContainerTrack(
AccountBrokerPreTrackContainerCountSetup, TestAccountBroker):
"""
Tests for AccountBroker against databases created before
the container_count column was added to the policy_stat table.
"""
def test_policy_table_cont_count_do_migrations(self):
# add a few containers
num_containers = 8
policies = itertools.cycle(POLICIES)
per_policy_container_counts = defaultdict(int)
# add a few container entries
for i in range(num_containers):
name = 'test-container-%02d' % i
policy = next(policies)
self.broker.put_container(name, next(self.ts),
0, 0, 0, int(policy))
per_policy_container_counts[int(policy)] += 1
total_container_count = self.broker.get_info()['container_count']
self.assertEqual(total_container_count, num_containers)
# still un-migrated
self.assertUnmigrated(self.broker)
policy_stats = self.broker.get_policy_stats()
self.assertEqual(len(policy_stats), len(per_policy_container_counts))
for stats in policy_stats.values():
self.assertEqual(stats['object_count'], 0)
self.assertEqual(stats['bytes_used'], 0)
# un-migrated dbs should not return container_count
self.assertFalse('container_count' in stats)
# now force the migration
policy_stats = self.broker.get_policy_stats(do_migrations=True)
self.assertEqual(len(policy_stats), len(per_policy_container_counts))
for policy_index, stats in policy_stats.items():
self.assertEqual(stats['object_count'], 0)
self.assertEqual(stats['bytes_used'], 0)
self.assertEqual(stats['container_count'],
per_policy_container_counts[policy_index])
def test_policy_table_cont_count_update_get_stats(self):
# add a few container entries
for policy in POLICIES:
for i in range(0, policy.idx + 1):
container_name = 'c%s_0' % policy.idx
self.broker.put_container('c%s_%s' % (policy.idx, i),
0, 0, 0, 0, policy.idx)
# _commit_puts_stale_ok() called by get_policy_stats()
# calling get_policy_stats() with do_migrations will alter the table
# and populate it based on what's in the container table now
stats = self.broker.get_policy_stats(do_migrations=True)
# now confirm that the column was created
with self.broker.get() as conn:
conn.execute('SELECT container_count FROM policy_stat')
# confirm stats reporting back correctly
self.assertEqual(len(stats), 4)
for policy in POLICIES:
self.assertEqual(stats[policy.idx]['container_count'],
policy.idx + 1)
# now delete one from each policy and check the stats
with self.broker.get() as conn:
for policy in POLICIES:
container_name = 'c%s_0' % policy.idx
conn.execute('''
DELETE FROM container
WHERE name = ?
''', (container_name,))
conn.commit()
stats = self.broker.get_policy_stats()
self.assertEqual(len(stats), 4)
for policy in POLICIES:
self.assertEqual(stats[policy.idx]['container_count'],
policy.idx)
# now put them back and make sure things are still cool
for policy in POLICIES:
container_name = 'c%s_0' % policy.idx
self.broker.put_container(container_name, 0, 0, 0, 0, policy.idx)
# _commit_puts_stale_ok() called by get_policy_stats()
# confirm stats reporting back correctly
stats = self.broker.get_policy_stats()
self.assertEqual(len(stats), 4)
for policy in POLICIES:
self.assertEqual(stats[policy.idx]['container_count'],
policy.idx + 1)
def test_per_policy_cont_count_migration_with_deleted(self):
num_containers = 15
policies = itertools.cycle(POLICIES)
container_policy_map = {}
# add a few container entries
for i in range(num_containers):
name = 'test-container-%02d' % i
policy = next(policies)
self.broker.put_container(name, next(self.ts),
0, 0, 0, int(policy))
# keep track of stub container policies
container_policy_map[name] = policy
# delete about half of the containers
for i in range(0, num_containers, 2):
name = 'test-container-%02d' % i
policy = container_policy_map[name]
self.broker.put_container(name, 0, next(self.ts),
0, 0, int(policy))
total_container_count = self.broker.get_info()['container_count']
self.assertEqual(total_container_count, num_containers / 2)
# trigger migration
policy_info = self.broker.get_policy_stats(do_migrations=True)
self.assertEqual(len(policy_info), min(num_containers, len(POLICIES)))
policy_container_count = sum(p['container_count'] for p in
policy_info.values())
self.assertEqual(total_container_count, policy_container_count)
def test_per_policy_cont_count_migration_with_single_policy(self):
num_containers = 100
with patch_policies(legacy_only=True):
policy = POLICIES[0]
# add a few container entries
for i in range(num_containers):
name = 'test-container-%02d' % i
self.broker.put_container(name, next(self.ts),
0, 0, 0, int(policy))
# delete about half of the containers
for i in range(0, num_containers, 2):
name = 'test-container-%02d' % i
self.broker.put_container(name, 0, next(self.ts),
0, 0, int(policy))
total_container_count = self.broker.get_info()['container_count']
# trigger migration
policy_info = self.broker.get_policy_stats(do_migrations=True)
self.assertEqual(total_container_count, num_containers / 2)
self.assertEqual(len(policy_info), 1)
policy_container_count = sum(p['container_count'] for p in
policy_info.values())
self.assertEqual(total_container_count, policy_container_count)
def test_per_policy_cont_count_migration_impossible(self):
with patch_policies(legacy_only=True):
# add a container for the legacy policy
policy = POLICIES[0]
self.broker.put_container('test-legacy-container', next(self.ts),
0, 0, 0, int(policy))
# now create an impossible situation by adding a container for a
# policy index that doesn't exist
non_existant_policy_index = int(policy) + 1
self.broker.put_container('test-non-existant-policy',
next(self.ts), 0, 0, 0,
non_existant_policy_index)
total_container_count = self.broker.get_info()['container_count']
# trigger migration
policy_info = self.broker.get_policy_stats(do_migrations=True)
self.assertEqual(total_container_count, 2)
self.assertEqual(len(policy_info), 2)
for policy_stat in policy_info.values():
self.assertEqual(policy_stat['container_count'], 1)


@ -1708,6 +1708,9 @@ class TestAccountController(unittest.TestCase):
self.assertEquals(
resp.headers['X-Account-Storage-Policy-%s-Bytes-Used' %
POLICIES[0].name], '4')
self.assertEquals(
resp.headers['X-Account-Storage-Policy-%s-Container-Count' %
POLICIES[0].name], '1')
def test_policy_stats_non_default(self):
ts = itertools.count()
@ -1743,6 +1746,9 @@ class TestAccountController(unittest.TestCase):
self.assertEquals(
resp.headers['X-Account-Storage-Policy-%s-Bytes-Used' %
policy.name], '4')
self.assertEquals(
resp.headers['X-Account-Storage-Policy-%s-Container-Count' %
policy.name], '1')
def test_empty_policy_stats(self):
ts = itertools.count()


@ -117,9 +117,70 @@ class TestAccountUtils(unittest.TestCase):
})
for policy in POLICIES:
prefix = 'X-Account-Storage-Policy-%s-' % policy.name
expected[prefix + 'Container-Count'] = 1
expected[prefix + 'Object-Count'] = int(policy)
expected[prefix + 'Bytes-Used'] = int(policy) * 10
resp_headers = utils.get_response_headers(broker)
per_policy_container_headers = [
h for h in resp_headers if
h.lower().startswith('x-account-storage-policy-') and
h.lower().endswith('-container-count')]
self.assertTrue(per_policy_container_headers)
for key, value in resp_headers.items():
expected_value = expected.pop(key)
self.assertEqual(expected_value, str(value),
'value for %r was %r not %r' % (
key, value, expected_value))
self.assertFalse(expected)
@patch_policies
def test_get_response_headers_with_legacy_data(self):
broker = backend.AccountBroker(':memory:', account='a')
now = time.time()
with mock.patch('time.time', new=lambda: now):
broker.initialize(Timestamp(now).internal)
# add some container data
ts = (Timestamp(t).internal for t in itertools.count(int(now)))
total_containers = 0
total_objects = 0
total_bytes = 0
for policy in POLICIES:
delete_timestamp = ts.next()
put_timestamp = ts.next()
object_count = int(policy)
bytes_used = int(policy) * 10
broker.put_container('c-%s' % policy.name, put_timestamp,
delete_timestamp, object_count, bytes_used,
int(policy))
total_containers += 1
total_objects += object_count
total_bytes += bytes_used
expected = HeaderKeyDict({
'X-Account-Container-Count': total_containers,
'X-Account-Object-Count': total_objects,
'X-Account-Bytes-Used': total_bytes,
'X-Timestamp': Timestamp(now).normal,
'X-PUT-Timestamp': Timestamp(now).normal,
})
for policy in POLICIES:
prefix = 'X-Account-Storage-Policy-%s-' % policy.name
expected[prefix + 'Object-Count'] = int(policy)
expected[prefix + 'Bytes-Used'] = int(policy) * 10
orig_policy_stats = broker.get_policy_stats
def stub_policy_stats(*args, **kwargs):
policy_stats = orig_policy_stats(*args, **kwargs)
for stats in policy_stats.values():
# legacy db's won't return container_count
del stats['container_count']
return policy_stats
broker.get_policy_stats = stub_policy_stats
resp_headers = utils.get_response_headers(broker)
per_policy_container_headers = [
h for h in resp_headers if
h.lower().startswith('x-account-storage-policy-') and
h.lower().endswith('-container-count')]
self.assertFalse(per_policy_container_headers)
for key, value in resp_headers.items():
expected_value = expected.pop(key)
self.assertEqual(expected_value, str(value),