Fix reclaim on deleted containers

The common db replicator's code path for reclaiming deleted dbs beyond the
reclaim age was not covered by unit tests, and an AttributeError snuck in.
While writing the test to cover the common code for both accounts and
containers, I discovered another KeyError in the container-specific
conditional that validates whether the container's stats have been fully
reported.

This fixes both of those issues and adds additional tests for the cleanup
of empty account and container partition and suffix directories.
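
For illustration only (not code from the patch itself, though the pre-fix
loop is copied from the second hunk below): the KeyError stems from the
container broker exposing its row count as 'count' in replication info,
while the reported_* bookkeeping columns keep their full names, so the old
reported_-prefix lookup blows up; the AttributeError stems from handing
delete_db() the db file path where the broker object is now expected (see
the first hunk). A minimal sketch of the KeyError, assuming a dict shaped
like the one the container broker's get_replication_info() returns:

    # Sketch only -- the dict values are illustrative, not taken from a real db.
    full_info = {
        'put_timestamp': '1', 'reported_put_timestamp': '1',
        'delete_timestamp': '0', 'reported_delete_timestamp': '0',
        'bytes_used': 0, 'reported_bytes_used': 0,
        # the row count is exposed as 'count', not 'object_count'
        'count': 0, 'reported_object_count': 0,
    }

    def old_report_up_to_date(full_info):
        # pre-fix logic from ContainerReplicator.report_up_to_date
        for key in ('put_timestamp', 'delete_timestamp', 'object_count',
                    'bytes_used'):
            if full_info['reported_' + key] != full_info[key]:
                return False
        return True

    try:
        old_report_up_to_date(full_info)
    except KeyError as err:
        # raises KeyError: 'object_count' -- the explicit mapping fixes this
        print('report_up_to_date raised KeyError: %s' % err)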

Change-Id: I2a1bfaefebd05b01231bf71dd908fcc49adb4c36
Clay Gerrard 2014-12-03 16:54:43 -08:00
parent d40cebfe55
commit 233e0aebf7
5 changed files with 161 additions and 103 deletions

View File

@@ -461,7 +461,7 @@ class Replicator(Daemon):
delete_timestamp > put_timestamp and \
info['count'] in (None, '', 0, '0'):
if self.report_up_to_date(info):
self.delete_db(object_file)
self.delete_db(broker)
self.logger.timing_since('timing', start_time)
return
responses = []

View File

@@ -39,9 +39,14 @@ class ContainerReplicator(db_replicator.Replicator):
default_port = 6001
def report_up_to_date(self, full_info):
for key in ('put_timestamp', 'delete_timestamp', 'object_count',
'bytes_used'):
if full_info['reported_' + key] != full_info[key]:
reported_key_map = {
'reported_put_timestamp': 'put_timestamp',
'reported_delete_timestamp': 'delete_timestamp',
'reported_bytes_used': 'bytes_used',
'reported_object_count': 'count',
}
for reported, value_key in reported_key_map.items():
if full_info[reported] != full_info[value_key]:
return False
return True

View File

@@ -187,17 +187,23 @@ class ExampleBroker(DatabaseBroker):
db_type = 'test'
db_contains_type = 'test'
db_reclaim_timestamp = 'created_at'
def _initialize(self, conn, put_timestamp, **kwargs):
if not self.account:
raise ValueError(
'Attempting to create a new database with no account set')
conn.executescript('''
CREATE TABLE test_stat (
account TEXT,
test_count INTEGER DEFAULT 0,
created_at TEXT,
put_timestamp TEXT DEFAULT '0',
delete_timestamp TEXT DEFAULT '0',
hash TEXT default '00000000000000000000000000000000',
id TEXT,
status_changed_at TEXT DEFAULT '0',
metadata TEXT DEFAULT ''
);
CREATE TABLE test (
ROWID INTEGER PRIMARY KEY AUTOINCREMENT,
@@ -218,10 +224,10 @@ class ExampleBroker(DatabaseBroker):
''')
conn.execute("""
INSERT INTO test_stat (
created_at, put_timestamp, status_changed_at)
VALUES (?, ?, ?);
""", (Timestamp(time.time()).internal, put_timestamp,
put_timestamp))
account, created_at, id, put_timestamp, status_changed_at)
VALUES (?, ?, ?, ?, ?);
""", (self.account, Timestamp(time.time()).internal, str(uuid4()),
put_timestamp, put_timestamp))
def merge_items(self, item_list):
with self.get() as conn:
@@ -268,6 +274,13 @@ class ExampleBroker(DatabaseBroker):
def delete_test(self, name, timestamp):
self._load_item(name, timestamp, 1)
def _delete_db(self, conn, timestamp):
conn.execute("""
UPDATE test_stat
SET delete_timestamp = ?,
status_changed_at = ?
WHERE delete_timestamp < ? """, (timestamp, timestamp, timestamp))
def _is_deleted(self, conn):
info = conn.execute('SELECT * FROM test_stat').fetchone()
return (info['test_count'] in (None, '', 0, '0')) and \
@@ -284,10 +297,18 @@ class TestExampleBroker(unittest.TestCase):
broker_class = ExampleBroker
policy = 0
def setUp(self):
self.ts = (Timestamp(t).internal for t in
itertools.count(int(time.time())))
def test_delete_db(self):
broker = self.broker_class(':memory:', account='a', container='c')
broker.initialize(next(self.ts))
broker.delete_db(next(self.ts))
self.assertTrue(broker.is_deleted())
def test_merge_timestamps_simple_delete(self):
ts = (Timestamp(t).internal for t in
itertools.count(int(time.time())))
put_timestamp = ts.next()
put_timestamp = next(self.ts)
broker = self.broker_class(':memory:', account='a', container='c')
broker.initialize(put_timestamp)
created_at = broker.get_info()['created_at']
@@ -298,7 +319,7 @@ class TestExampleBroker(unittest.TestCase):
self.assertEqual(info['delete_timestamp'], '0')
self.assertEqual(info['status_changed_at'], put_timestamp)
# delete
delete_timestamp = ts.next()
delete_timestamp = next(self.ts)
broker.merge_timestamps(created_at, put_timestamp, delete_timestamp)
self.assert_(broker.is_deleted())
info = broker.get_info()
@@ -314,9 +335,7 @@ class TestExampleBroker(unittest.TestCase):
broker.delete_test('test', timestamp)
def test_merge_timestamps_delete_with_objects(self):
ts = (Timestamp(t).internal for t in
itertools.count(int(time.time())))
put_timestamp = ts.next()
put_timestamp = next(self.ts)
broker = self.broker_class(':memory:', account='a', container='c')
broker.initialize(put_timestamp, storage_policy_index=int(self.policy))
created_at = broker.get_info()['created_at']
@@ -327,11 +346,11 @@ class TestExampleBroker(unittest.TestCase):
self.assertEqual(info['delete_timestamp'], '0')
self.assertEqual(info['status_changed_at'], put_timestamp)
# add object
self.put_item(broker, ts.next())
self.put_item(broker, next(self.ts))
self.assertEqual(broker.get_info()[
'%s_count' % broker.db_contains_type], 1)
# delete
delete_timestamp = ts.next()
delete_timestamp = next(self.ts)
broker.merge_timestamps(created_at, put_timestamp, delete_timestamp)
self.assertFalse(broker.is_deleted())
info = broker.get_info()
@@ -341,20 +360,18 @@ class TestExampleBroker(unittest.TestCase):
# status is unchanged
self.assertEqual(info['status_changed_at'], put_timestamp)
# count is causing status to hold on
self.delete_item(broker, ts.next())
self.delete_item(broker, next(self.ts))
self.assertEqual(broker.get_info()[
'%s_count' % broker.db_contains_type], 0)
self.assert_(broker.is_deleted())
def test_merge_timestamps_simple_recreate(self):
ts = (Timestamp(t).internal for t in
itertools.count(int(time.time())))
put_timestamp = ts.next()
put_timestamp = next(self.ts)
broker = self.broker_class(':memory:', account='a', container='c')
broker.initialize(put_timestamp, storage_policy_index=int(self.policy))
virgin_status_changed_at = broker.get_info()['status_changed_at']
created_at = broker.get_info()['created_at']
delete_timestamp = ts.next()
delete_timestamp = next(self.ts)
broker.merge_timestamps(created_at, put_timestamp, delete_timestamp)
self.assert_(broker.is_deleted())
info = broker.get_info()
@@ -365,7 +382,7 @@ class TestExampleBroker(unittest.TestCase):
self.assert_(orig_status_changed_at >
Timestamp(virgin_status_changed_at))
# recreate
recreate_timestamp = ts.next()
recreate_timestamp = next(self.ts)
status_changed_at = time.time()
with patch('swift.common.db.time.time', new=lambda: status_changed_at):
broker.merge_timestamps(created_at, recreate_timestamp, '0')
@@ -377,14 +394,12 @@ class TestExampleBroker(unittest.TestCase):
self.assert_(info['status_changed_at'], status_changed_at)
def test_merge_timestamps_recreate_with_objects(self):
ts = (Timestamp(t).internal for t in
itertools.count(int(time.time())))
put_timestamp = ts.next()
put_timestamp = next(self.ts)
broker = self.broker_class(':memory:', account='a', container='c')
broker.initialize(put_timestamp, storage_policy_index=int(self.policy))
created_at = broker.get_info()['created_at']
# delete
delete_timestamp = ts.next()
delete_timestamp = next(self.ts)
broker.merge_timestamps(created_at, put_timestamp, delete_timestamp)
self.assert_(broker.is_deleted())
info = broker.get_info()
@@ -395,12 +410,12 @@ class TestExampleBroker(unittest.TestCase):
self.assert_(Timestamp(orig_status_changed_at) >=
Timestamp(put_timestamp))
# add object
self.put_item(broker, ts.next())
self.put_item(broker, next(self.ts))
count_key = '%s_count' % broker.db_contains_type
self.assertEqual(broker.get_info()[count_key], 1)
self.assertFalse(broker.is_deleted())
# recreate
recreate_timestamp = ts.next()
recreate_timestamp = next(self.ts)
broker.merge_timestamps(created_at, recreate_timestamp, '0')
self.assertFalse(broker.is_deleted())
info = broker.get_info()
@@ -409,34 +424,30 @@ class TestExampleBroker(unittest.TestCase):
self.assertEqual(info['delete_timestamp'], delete_timestamp)
self.assertEqual(info['status_changed_at'], orig_status_changed_at)
# count is not causing status to hold on
self.delete_item(broker, ts.next())
self.delete_item(broker, next(self.ts))
self.assertFalse(broker.is_deleted())
def test_merge_timestamps_update_put_no_status_change(self):
ts = (Timestamp(t).internal for t in
itertools.count(int(time.time())))
put_timestamp = ts.next()
put_timestamp = next(self.ts)
broker = self.broker_class(':memory:', account='a', container='c')
broker.initialize(put_timestamp, storage_policy_index=int(self.policy))
info = broker.get_info()
orig_status_changed_at = info['status_changed_at']
created_at = info['created_at']
new_put_timestamp = ts.next()
new_put_timestamp = next(self.ts)
broker.merge_timestamps(created_at, new_put_timestamp, '0')
info = broker.get_info()
self.assertEqual(new_put_timestamp, info['put_timestamp'])
self.assertEqual(orig_status_changed_at, info['status_changed_at'])
def test_merge_timestamps_update_delete_no_status_change(self):
ts = (Timestamp(t).internal for t in
itertools.count(int(time.time())))
put_timestamp = ts.next()
put_timestamp = next(self.ts)
broker = self.broker_class(':memory:', account='a', container='c')
broker.initialize(put_timestamp, storage_policy_index=int(self.policy))
created_at = broker.get_info()['created_at']
broker.merge_timestamps(created_at, put_timestamp, ts.next())
broker.merge_timestamps(created_at, put_timestamp, next(self.ts))
orig_status_changed_at = broker.get_info()['status_changed_at']
new_delete_timestamp = ts.next()
new_delete_timestamp = next(self.ts)
broker.merge_timestamps(created_at, put_timestamp,
new_delete_timestamp)
info = broker.get_info()
@@ -444,16 +455,14 @@ class TestExampleBroker(unittest.TestCase):
self.assertEqual(orig_status_changed_at, info['status_changed_at'])
def test_get_max_row(self):
ts = (normalize_timestamp(t) for t in
itertools.count(int(time.time())))
broker = self.broker_class(':memory:', account='a', container='c')
broker.initialize(ts.next(), storage_policy_index=int(self.policy))
broker.initialize(next(self.ts), storage_policy_index=int(self.policy))
self.assertEquals(-1, broker.get_max_row())
self.put_item(broker, ts.next())
self.put_item(broker, next(self.ts))
self.assertEquals(1, broker.get_max_row())
self.delete_item(broker, ts.next())
self.delete_item(broker, next(self.ts))
self.assertEquals(2, broker.get_max_row())
self.put_item(broker, ts.next())
self.put_item(broker, next(self.ts))
self.assertEquals(3, broker.get_max_row())
def test_get_info(self):
@@ -493,10 +502,8 @@ class TestExampleBroker(unittest.TestCase):
json.dumps(metadata))
def test_put_timestamp(self):
ts = (Timestamp(t).internal for t in
itertools.count(int(time.time())))
broker = self.broker_class(':memory:', account='a', container='c')
orig_put_timestamp = ts.next()
orig_put_timestamp = next(self.ts)
broker.initialize(orig_put_timestamp,
storage_policy_index=int(self.policy))
self.assertEqual(broker.get_info()['put_timestamp'],
@@ -506,7 +513,7 @@ class TestExampleBroker(unittest.TestCase):
self.assertEqual(broker.get_info()['put_timestamp'],
orig_put_timestamp)
# put_timestamp newer - gets newer
newer_put_timestamp = ts.next()
newer_put_timestamp = next(self.ts)
broker.update_put_timestamp(newer_put_timestamp)
self.assertEqual(broker.get_info()['put_timestamp'],
newer_put_timestamp)
@@ -516,10 +523,8 @@ class TestExampleBroker(unittest.TestCase):
newer_put_timestamp)
def test_status_changed_at(self):
ts = (Timestamp(t).internal for t in
itertools.count(int(time.time())))
broker = self.broker_class(':memory:', account='test', container='c')
put_timestamp = ts.next()
put_timestamp = next(self.ts)
created_at = time.time()
with patch('swift.common.db.time.time', new=lambda: created_at):
broker.initialize(put_timestamp,
@@ -528,13 +533,13 @@ class TestExampleBroker(unittest.TestCase):
put_timestamp)
self.assertEquals(broker.get_info()['created_at'],
Timestamp(created_at).internal)
status_changed_at = ts.next()
status_changed_at = next(self.ts)
broker.update_status_changed_at(status_changed_at)
self.assertEqual(broker.get_info()['status_changed_at'],
status_changed_at)
# save the old and get a new status_changed_at
old_status_changed_at, status_changed_at = \
status_changed_at, ts.next()
status_changed_at, next(self.ts)
broker.update_status_changed_at(status_changed_at)
self.assertEqual(broker.get_info()['status_changed_at'],
status_changed_at)
@@ -559,12 +564,11 @@ class TestExampleBroker(unittest.TestCase):
@with_tempdir
def test_commit_pending(self, tempdir):
ts = (Timestamp(t).internal for t in
itertools.count(int(time.time())))
broker = self.broker_class(os.path.join(tempdir, 'test.db'),
account='a', container='c')
broker.initialize(ts.next(), storage_policy_index=int(self.policy))
self.put_item(broker, ts.next())
broker.initialize(next(self.ts),
storage_policy_index=int(self.policy))
self.put_item(broker, next(self.ts))
qry = 'select * from %s_stat' % broker.db_type
with broker.get() as conn:
rows = [dict(x) for x in conn.execute(qry)]

View File

@@ -34,6 +34,7 @@ from swift.common.exceptions import DriveNotMounted
from swift.common.swob import HTTPException
from test import unit
from test.unit.common.test_db import ExampleBroker
TEST_ACCOUNT_NAME = 'a c t'
@@ -1297,11 +1298,19 @@ def attach_fake_replication_rpc(rpc, replicate_hook=None):
return FakeReplConnection
class ExampleReplicator(db_replicator.Replicator):
server_type = 'fake'
brokerclass = ExampleBroker
datadir = 'fake'
default_port = 1000
class TestReplicatorSync(unittest.TestCase):
backend = None # override in subclass
datadir = None
replicator_daemon = db_replicator.Replicator
# override in subclass
backend = ExampleReplicator.brokerclass
datadir = ExampleReplicator.datadir
replicator_daemon = ExampleReplicator
replicator_rpc = db_replicator.ReplicatorRpc
def setUp(self):
@@ -1365,9 +1374,6 @@ class TestReplicatorSync(unittest.TestCase):
return daemon
def test_local_ids(self):
if self.datadir is None:
# base test case
return
for drive in ('sda', 'sdb', 'sdd'):
os.makedirs(os.path.join(self.root, drive, self.datadir))
for node in self._ring.devs:
@@ -1378,5 +1384,46 @@ class TestReplicatorSync(unittest.TestCase):
self.assertEqual(daemon._local_device_ids,
set([node['id']]))
def test_clean_up_after_deleted_brokers(self):
broker = self._get_broker('a', 'c', node_index=0)
part, node = self._get_broker_part_node(broker)
part = str(part)
daemon = self._run_once(node)
# create a super old broker and delete it!
forever_ago = time.time() - daemon.reclaim_age
put_timestamp = normalize_timestamp(forever_ago - 2)
delete_timestamp = normalize_timestamp(forever_ago - 1)
broker.initialize(put_timestamp)
broker.delete_db(delete_timestamp)
# if we have a container broker make sure it's reported
if hasattr(broker, 'reported'):
info = broker.get_info()
broker.reported(info['put_timestamp'],
info['delete_timestamp'],
info['object_count'],
info['bytes_used'])
info = broker.get_replication_info()
self.assertTrue(daemon.report_up_to_date(info))
# we have a part dir
part_root = os.path.join(self.root, node['device'], self.datadir)
parts = os.listdir(part_root)
self.assertEqual([part], parts)
# with a single suffix
suff = os.listdir(os.path.join(part_root, part))
self.assertEqual(1, len(suff))
# running replicator will remove the deleted db
daemon = self._run_once(node, daemon=daemon)
self.assertEqual(1, daemon.stats['remove'])
# we still have a part dir (but it's empty)
suff = os.listdir(os.path.join(part_root, part))
self.assertEqual(0, len(suff))
# run it again and there's nothing to do...
daemon = self._run_once(node, daemon=daemon)
self.assertEqual(0, daemon.stats['attempted'])
# but empty part dir is cleaned up!
parts = os.listdir(part_root)
self.assertEqual(0, len(parts))
if __name__ == '__main__':
unittest.main()

View File

@@ -34,44 +34,6 @@ from test.unit import patch_policies
from contextlib import contextmanager
class TestReplicator(unittest.TestCase):
def setUp(self):
self.orig_ring = replicator.db_replicator.ring.Ring
replicator.db_replicator.ring.Ring = lambda *args, **kwargs: None
def tearDown(self):
replicator.db_replicator.ring.Ring = self.orig_ring
def test_report_up_to_date(self):
repl = replicator.ContainerReplicator({})
info = {'put_timestamp': Timestamp(1).internal,
'delete_timestamp': Timestamp(0).internal,
'object_count': 0,
'bytes_used': 0,
'reported_put_timestamp': Timestamp(1).internal,
'reported_delete_timestamp': Timestamp(0).internal,
'reported_object_count': 0,
'reported_bytes_used': 0}
self.assertTrue(repl.report_up_to_date(info))
info['delete_timestamp'] = Timestamp(2).internal
self.assertFalse(repl.report_up_to_date(info))
info['reported_delete_timestamp'] = Timestamp(2).internal
self.assertTrue(repl.report_up_to_date(info))
info['object_count'] = 1
self.assertFalse(repl.report_up_to_date(info))
info['reported_object_count'] = 1
self.assertTrue(repl.report_up_to_date(info))
info['bytes_used'] = 1
self.assertFalse(repl.report_up_to_date(info))
info['reported_bytes_used'] = 1
self.assertTrue(repl.report_up_to_date(info))
info['put_timestamp'] = Timestamp(3).internal
self.assertFalse(repl.report_up_to_date(info))
info['reported_put_timestamp'] = Timestamp(3).internal
self.assertTrue(repl.report_up_to_date(info))
@patch_policies
class TestReplicatorSync(test_db_replicator.TestReplicatorSync):
@@ -80,6 +42,46 @@ class TestReplicatorSync(test_db_replicator.TestReplicatorSync):
replicator_daemon = replicator.ContainerReplicator
replicator_rpc = replicator.ContainerReplicatorRpc
def test_report_up_to_date(self):
broker = self._get_broker('a', 'c', node_index=0)
broker.initialize(Timestamp(1).internal, int(POLICIES.default))
info = broker.get_info()
broker.reported(info['put_timestamp'],
info['delete_timestamp'],
info['object_count'],
info['bytes_used'])
full_info = broker.get_replication_info()
expected_info = {'put_timestamp': Timestamp(1).internal,
'delete_timestamp': '0',
'count': 0,
'bytes_used': 0,
'reported_put_timestamp': Timestamp(1).internal,
'reported_delete_timestamp': '0',
'reported_object_count': 0,
'reported_bytes_used': 0}
for key, value in expected_info.items():
msg = 'expected value for %r, %r != %r' % (
key, full_info[key], value)
self.assertEqual(full_info[key], value, msg)
repl = replicator.ContainerReplicator({})
self.assertTrue(repl.report_up_to_date(full_info))
full_info['delete_timestamp'] = Timestamp(2).internal
self.assertFalse(repl.report_up_to_date(full_info))
full_info['reported_delete_timestamp'] = Timestamp(2).internal
self.assertTrue(repl.report_up_to_date(full_info))
full_info['count'] = 1
self.assertFalse(repl.report_up_to_date(full_info))
full_info['reported_object_count'] = 1
self.assertTrue(repl.report_up_to_date(full_info))
full_info['bytes_used'] = 1
self.assertFalse(repl.report_up_to_date(full_info))
full_info['reported_bytes_used'] = 1
self.assertTrue(repl.report_up_to_date(full_info))
full_info['put_timestamp'] = Timestamp(3).internal
self.assertFalse(repl.report_up_to_date(full_info))
full_info['reported_put_timestamp'] = Timestamp(3).internal
self.assertTrue(repl.report_up_to_date(full_info))
def test_sync_remote_in_sync(self):
# setup a local container
broker = self._get_broker('a', 'c', node_index=0)