Breakup reclaim into batches
We want to do the table scan without locking and group the locking deletes into small indexed operations to minimize the impact of background processes calling reclaim each cycle. Change-Id: I3ccd145c14a9b68ff8a9da61f79034549c9bc127 Co-Authored-By: Tim Burke <tim.burke@gmail.com> Closes-Bug: #1877651
This commit is contained in:
parent
70dede1ae9
commit
aab45880f8
@ -53,6 +53,9 @@ PICKLE_PROTOCOL = 2
|
||||
# records will be merged.
|
||||
PENDING_CAP = 131072
|
||||
|
||||
SQLITE_ARG_LIMIT = 999
|
||||
RECLAIM_PAGE_SIZE = 10000
|
||||
|
||||
|
||||
def utf8encode(*args):
|
||||
return [(s.encode('utf8') if isinstance(s, six.text_type) else s)
|
||||
@ -981,16 +984,48 @@ class DatabaseBroker(object):
|
||||
with lock_parent_directory(self.pending_file,
|
||||
self.pending_timeout):
|
||||
self._commit_puts()
|
||||
with self.get() as conn:
|
||||
self._reclaim(conn, age_timestamp, sync_timestamp)
|
||||
self._reclaim_metadata(conn, age_timestamp)
|
||||
conn.commit()
|
||||
marker = ''
|
||||
finished = False
|
||||
while not finished:
|
||||
with self.get() as conn:
|
||||
marker = self._reclaim(conn, age_timestamp, marker)
|
||||
if not marker:
|
||||
finished = True
|
||||
self._reclaim_other_stuff(
|
||||
conn, age_timestamp, sync_timestamp)
|
||||
conn.commit()
|
||||
|
||||
def _reclaim(self, conn, age_timestamp, sync_timestamp):
|
||||
conn.execute('''
|
||||
DELETE FROM %s WHERE deleted = 1 AND %s < ?
|
||||
''' % (self.db_contains_type, self.db_reclaim_timestamp),
|
||||
(age_timestamp,))
|
||||
def _reclaim_other_stuff(self, conn, age_timestamp, sync_timestamp):
|
||||
"""
|
||||
This is only called once at the end of reclaim after _reclaim has been
|
||||
called for each page.
|
||||
"""
|
||||
self._reclaim_sync(conn, sync_timestamp)
|
||||
self._reclaim_metadata(conn, age_timestamp)
|
||||
|
||||
def _reclaim(self, conn, age_timestamp, marker):
|
||||
clean_batch_qry = '''
|
||||
DELETE FROM %s WHERE deleted = 1
|
||||
AND name > ? AND %s < ?
|
||||
''' % (self.db_contains_type, self.db_reclaim_timestamp)
|
||||
curs = conn.execute('''
|
||||
SELECT name FROM %s WHERE deleted = 1
|
||||
AND name > ?
|
||||
ORDER BY NAME LIMIT 1 OFFSET ?
|
||||
''' % (self.db_contains_type,), (marker, RECLAIM_PAGE_SIZE))
|
||||
row = curs.fetchone()
|
||||
if row:
|
||||
# do a single book-ended DELETE and bounce out
|
||||
end_marker = row[0]
|
||||
conn.execute(clean_batch_qry + ' AND name <= ?', (
|
||||
marker, age_timestamp, end_marker))
|
||||
else:
|
||||
# delete off the end and reset marker to indicate we're done
|
||||
end_marker = ''
|
||||
conn.execute(clean_batch_qry, (marker, age_timestamp))
|
||||
return end_marker
|
||||
|
||||
def _reclaim_sync(self, conn, sync_timestamp):
|
||||
try:
|
||||
conn.execute('''
|
||||
DELETE FROM outgoing_sync WHERE updated_at < ?
|
||||
|
@ -34,9 +34,7 @@ from swift.common.utils import Timestamp, encode_timestamps, \
|
||||
get_db_files, parse_db_filename, make_db_file_path, split_path, \
|
||||
RESERVED_BYTE
|
||||
from swift.common.db import DatabaseBroker, utf8encode, BROKER_TIMEOUT, \
|
||||
zero_like, DatabaseAlreadyExists
|
||||
|
||||
SQLITE_ARG_LIMIT = 999
|
||||
zero_like, DatabaseAlreadyExists, SQLITE_ARG_LIMIT
|
||||
|
||||
DATADIR = 'containers'
|
||||
|
||||
@ -1581,9 +1579,9 @@ class ContainerBroker(DatabaseBroker):
|
||||
CONTAINER_STAT_VIEW_SCRIPT +
|
||||
'COMMIT;')
|
||||
|
||||
def _reclaim(self, conn, age_timestamp, sync_timestamp):
|
||||
super(ContainerBroker, self)._reclaim(conn, age_timestamp,
|
||||
sync_timestamp)
|
||||
def _reclaim_other_stuff(self, conn, age_timestamp, sync_timestamp):
|
||||
super(ContainerBroker, self)._reclaim_other_stuff(
|
||||
conn, age_timestamp, sync_timestamp)
|
||||
# populate instance cache, but use existing conn to avoid deadlock
|
||||
# when it has a pending update
|
||||
self._populate_instance_cache(conn=conn)
|
||||
|
@ -180,6 +180,72 @@ class TestAccountBroker(unittest.TestCase):
|
||||
broker.delete_db(Timestamp.now().internal)
|
||||
broker.reclaim(Timestamp.now().internal, time())
|
||||
|
||||
def test_batched_reclaim(self):
|
||||
num_of_containers = 60
|
||||
container_specs = []
|
||||
now = time()
|
||||
top_of_the_minute = now - (now % 60)
|
||||
c = itertools.cycle([True, False])
|
||||
for m, is_deleted in six.moves.zip(range(num_of_containers), c):
|
||||
offset = top_of_the_minute - (m * 60)
|
||||
container_specs.append((Timestamp(offset), is_deleted))
|
||||
random.seed(now)
|
||||
random.shuffle(container_specs)
|
||||
policy_indexes = list(p.idx for p in POLICIES)
|
||||
broker = AccountBroker(':memory:', account='test_account')
|
||||
broker.initialize(Timestamp('1').internal)
|
||||
for i, container_spec in enumerate(container_specs):
|
||||
# with container12 before container2 and shuffled ts.internal we
|
||||
# shouldn't be able to accidently rely on any implicit ordering
|
||||
name = 'container%s' % i
|
||||
pidx = random.choice(policy_indexes)
|
||||
ts, is_deleted = container_spec
|
||||
if is_deleted:
|
||||
broker.put_container(name, 0, ts.internal, 0, 0, pidx)
|
||||
else:
|
||||
broker.put_container(name, ts.internal, 0, 0, 0, pidx)
|
||||
|
||||
def count_reclaimable(conn, reclaim_age):
|
||||
return conn.execute(
|
||||
"SELECT count(*) FROM container "
|
||||
"WHERE deleted = 1 AND delete_timestamp < ?", (reclaim_age,)
|
||||
).fetchone()[0]
|
||||
|
||||
# This is intended to divide the set of timestamps exactly in half
|
||||
# regardless of the value of now
|
||||
reclaim_age = top_of_the_minute + 1 - (num_of_containers / 2 * 60)
|
||||
with broker.get() as conn:
|
||||
self.assertEqual(count_reclaimable(conn, reclaim_age),
|
||||
num_of_containers / 4)
|
||||
|
||||
orig__reclaim = broker._reclaim
|
||||
trace = []
|
||||
|
||||
def tracing_reclaim(conn, age_timestamp, marker):
|
||||
trace.append((age_timestamp, marker,
|
||||
count_reclaimable(conn, age_timestamp)))
|
||||
return orig__reclaim(conn, age_timestamp, marker)
|
||||
|
||||
with mock.patch.object(broker, '_reclaim', new=tracing_reclaim), \
|
||||
mock.patch('swift.common.db.RECLAIM_PAGE_SIZE', 10):
|
||||
broker.reclaim(reclaim_age, reclaim_age)
|
||||
with broker.get() as conn:
|
||||
self.assertEqual(count_reclaimable(conn, reclaim_age), 0)
|
||||
self.assertEqual(3, len(trace), trace)
|
||||
self.assertEqual([age for age, marker, reclaimable in trace],
|
||||
[reclaim_age] * 3)
|
||||
# markers are in-order
|
||||
self.assertLess(trace[0][1], trace[1][1])
|
||||
self.assertLess(trace[1][1], trace[2][1])
|
||||
# reclaimable count gradually decreases
|
||||
# generally, count1 > count2 > count3, but because of the randomness
|
||||
# we may occassionally have count1 == count2 or count2 == count3
|
||||
self.assertGreaterEqual(trace[0][2], trace[1][2])
|
||||
self.assertGreaterEqual(trace[1][2], trace[2][2])
|
||||
# technically, this might happen occasionally, but *really* rarely
|
||||
self.assertTrue(trace[0][2] > trace[1][2] or
|
||||
trace[1][2] > trace[2][2])
|
||||
|
||||
def test_delete_db_status(self):
|
||||
start = next(self.ts)
|
||||
broker = AccountBroker(':memory:', account='a')
|
||||
|
@ -1154,7 +1154,7 @@ class TestDatabaseBroker(unittest.TestCase):
|
||||
return broker
|
||||
|
||||
# only testing _reclaim_metadata here
|
||||
@patch.object(DatabaseBroker, '_reclaim')
|
||||
@patch.object(DatabaseBroker, '_reclaim', return_value='')
|
||||
def test_metadata(self, mock_reclaim):
|
||||
# Initializes a good broker for us
|
||||
broker = self.get_replication_info_tester(metadata=True)
|
||||
|
@ -28,6 +28,7 @@ from contextlib import contextmanager
|
||||
import sqlite3
|
||||
import pickle
|
||||
import json
|
||||
import itertools
|
||||
|
||||
import six
|
||||
|
||||
@ -558,6 +559,98 @@ class TestContainerBroker(unittest.TestCase):
|
||||
broker.reclaim(Timestamp.now().internal, time())
|
||||
broker.delete_db(Timestamp.now().internal)
|
||||
|
||||
def test_batch_reclaim(self):
|
||||
num_of_objects = 60
|
||||
obj_specs = []
|
||||
now = time()
|
||||
top_of_the_minute = now - (now % 60)
|
||||
c = itertools.cycle([True, False])
|
||||
for m, is_deleted in six.moves.zip(range(num_of_objects), c):
|
||||
offset = top_of_the_minute - (m * 60)
|
||||
obj_specs.append((Timestamp(offset), is_deleted))
|
||||
random.seed(now)
|
||||
random.shuffle(obj_specs)
|
||||
policy_indexes = list(p.idx for p in POLICIES)
|
||||
broker = ContainerBroker(':memory:', account='test_account',
|
||||
container='test_container')
|
||||
broker.initialize(Timestamp('1').internal, 0)
|
||||
for i, obj_spec in enumerate(obj_specs):
|
||||
# with object12 before object2 and shuffled ts.internal we
|
||||
# shouldn't be able to accidently rely on any implicit ordering
|
||||
obj_name = 'object%s' % i
|
||||
pidx = random.choice(policy_indexes)
|
||||
ts, is_deleted = obj_spec
|
||||
if is_deleted:
|
||||
broker.delete_object(obj_name, ts.internal, pidx)
|
||||
else:
|
||||
broker.put_object(obj_name, ts.internal, 0, 'text/plain',
|
||||
'etag', storage_policy_index=pidx)
|
||||
|
||||
def count_reclaimable(conn, reclaim_age):
|
||||
return conn.execute(
|
||||
"SELECT count(*) FROM object "
|
||||
"WHERE deleted = 1 AND created_at < ?", (reclaim_age,)
|
||||
).fetchone()[0]
|
||||
|
||||
# This is intended to divide the set of timestamps exactly in half
|
||||
# regardless of the value of now
|
||||
reclaim_age = top_of_the_minute + 1 - (num_of_objects / 2 * 60)
|
||||
with broker.get() as conn:
|
||||
self.assertEqual(count_reclaimable(conn, reclaim_age),
|
||||
num_of_objects / 4)
|
||||
|
||||
orig__reclaim = broker._reclaim
|
||||
trace = []
|
||||
|
||||
def tracing_reclaim(conn, age_timestamp, marker):
|
||||
trace.append((age_timestamp, marker,
|
||||
count_reclaimable(conn, age_timestamp)))
|
||||
return orig__reclaim(conn, age_timestamp, marker)
|
||||
|
||||
with mock.patch.object(broker, '_reclaim', new=tracing_reclaim), \
|
||||
mock.patch('swift.common.db.RECLAIM_PAGE_SIZE', 10):
|
||||
broker.reclaim(reclaim_age, reclaim_age)
|
||||
|
||||
with broker.get() as conn:
|
||||
self.assertEqual(count_reclaimable(conn, reclaim_age), 0)
|
||||
self.assertEqual(3, len(trace), trace)
|
||||
self.assertEqual([age for age, marker, reclaimable in trace],
|
||||
[reclaim_age] * 3)
|
||||
# markers are in-order
|
||||
self.assertLess(trace[0][1], trace[1][1])
|
||||
self.assertLess(trace[1][1], trace[2][1])
|
||||
# reclaimable count gradually decreases
|
||||
# generally, count1 > count2 > count3, but because of the randomness
|
||||
# we may occassionally have count1 == count2 or count2 == count3
|
||||
self.assertGreaterEqual(trace[0][2], trace[1][2])
|
||||
self.assertGreaterEqual(trace[1][2], trace[2][2])
|
||||
# technically, this might happen occasionally, but *really* rarely
|
||||
self.assertTrue(trace[0][2] > trace[1][2] or
|
||||
trace[1][2] > trace[2][2])
|
||||
|
||||
def test_reclaim_with_duplicate_names(self):
|
||||
broker = ContainerBroker(':memory:', account='test_account',
|
||||
container='test_container')
|
||||
broker.initialize(Timestamp('1').internal, 0)
|
||||
now = time()
|
||||
ages_ago = Timestamp(now - (3 * 7 * 24 * 60 * 60))
|
||||
for i in range(10):
|
||||
for spidx in range(10):
|
||||
obj_name = 'object%s' % i
|
||||
broker.delete_object(obj_name, ages_ago.internal, spidx)
|
||||
reclaim_age = now - (2 * 7 * 24 * 60 * 60)
|
||||
with broker.get() as conn:
|
||||
self.assertEqual(conn.execute(
|
||||
"SELECT count(*) FROM object "
|
||||
"WHERE created_at < ?", (reclaim_age,)
|
||||
).fetchone()[0], 100)
|
||||
with mock.patch('swift.common.db.RECLAIM_PAGE_SIZE', 10):
|
||||
broker.reclaim(reclaim_age, reclaim_age)
|
||||
with broker.get() as conn:
|
||||
self.assertEqual(conn.execute(
|
||||
"SELECT count(*) FROM object "
|
||||
).fetchone()[0], 0)
|
||||
|
||||
@with_tempdir
|
||||
def test_reclaim_deadlock(self, tempdir):
|
||||
db_path = os.path.join(
|
||||
|
Loading…
Reference in New Issue
Block a user