Consider tombstone count before shrinking a shard
Previously a shard might be shrunk if its object_count was fell below the shrink_threshold. However, it is possible that a shard with few objects has a large number of tombstones, which would result in a larger than anticipated replication of rows to the acceptor shard. With this patch, a shard's row count (i.e. the sum of tombstones and objects) must be below the shrink_threshold before the shard will be considered for shrinking. A number of changes are made to enable tombstone count to be used in shrinking decisions: - DatabaseBroker reclaim is enhanced to count remaining tombstones after rows have been reclaimed. A new TombstoneReclaimer class is added to encapsulate the reclaim process and tombstone count. - ShardRange has new 'tombstones' and 'row_count' attributes. - A 'tombstones' column is added to the Containerbroker shard_range table. - The sharder performs a reclaim prior to reporting shard container stats to the root container so that the tombstone count can be included. - The sharder uses 'row_count' rather than 'object_count' when evaluating if a shard range is a shrink candidate. Change-Id: I41b86c19c243220b7f1c01c6ecee52835de972b6
This commit is contained in:
parent
7cfdb50f93
commit
bcecddd517
@ -208,9 +208,10 @@ class InvalidSolutionException(ManageShardRangesException):
|
||||
def _print_shard_range(sr, level=0):
|
||||
indent = ' ' * level
|
||||
print(indent + '%r' % sr.name)
|
||||
print(indent + ' objects: %9d lower: %r' % (sr.object_count,
|
||||
sr.lower_str))
|
||||
print(indent + ' state: %9s upper: %r' % (sr.state_text, sr.upper_str))
|
||||
print(indent + ' objects: %9d, tombstones: %9d, lower: %r'
|
||||
% (sr.object_count, sr.tombstones, sr.lower_str))
|
||||
print(indent + ' state: %9s, upper: %r'
|
||||
% (sr.state_text, sr.upper_str))
|
||||
|
||||
|
||||
@contextmanager
|
||||
@ -504,8 +505,8 @@ def compact_shard_ranges(broker, args):
|
||||
for sequence in compactible:
|
||||
acceptor = sequence[-1]
|
||||
donors = sequence[:-1]
|
||||
print('Donor shard range(s) with total of %d objects:'
|
||||
% donors.object_count)
|
||||
print('Donor shard range(s) with total of %d rows:'
|
||||
% donors.row_count)
|
||||
for donor in donors:
|
||||
_print_shard_range(donor, level=1)
|
||||
print('can be compacted into acceptor shard range:')
|
||||
|
@ -227,6 +227,82 @@ def get_db_connection(path, timeout=30, logger=None, okay_to_create=False):
|
||||
return conn
|
||||
|
||||
|
||||
class TombstoneReclaimer(object):
|
||||
"""Encapsulates reclamation of deleted rows in a database."""
|
||||
def __init__(self, broker, age_timestamp):
|
||||
"""
|
||||
Encapsulates reclamation of deleted rows in a database.
|
||||
|
||||
:param broker: an instance of :class:`~swift.common.db.DatabaseBroker`.
|
||||
:param age_timestamp: a float timestamp: tombstones older than this
|
||||
time will be deleted.
|
||||
"""
|
||||
self.broker = broker
|
||||
self.age_timestamp = age_timestamp
|
||||
self.marker = ''
|
||||
self.remaining_tombstones = self.reclaimed = 0
|
||||
self.finished = False
|
||||
# limit 1 offset N gives back the N+1th matching row; that row is used
|
||||
# as an exclusive end_marker for a batch of deletes, so a batch
|
||||
# comprises rows satisfying self.marker <= name < end_marker.
|
||||
self.batch_query = '''
|
||||
SELECT name FROM %s WHERE deleted = 1
|
||||
AND name >= ?
|
||||
ORDER BY NAME LIMIT 1 OFFSET ?
|
||||
''' % self.broker.db_contains_type
|
||||
self.clean_batch_query = '''
|
||||
DELETE FROM %s WHERE deleted = 1
|
||||
AND name >= ? AND %s < %s
|
||||
''' % (self.broker.db_contains_type, self.broker.db_reclaim_timestamp,
|
||||
self.age_timestamp)
|
||||
|
||||
def _reclaim(self, conn):
|
||||
curs = conn.execute(self.batch_query, (self.marker, RECLAIM_PAGE_SIZE))
|
||||
row = curs.fetchone()
|
||||
end_marker = row[0] if row else ''
|
||||
if end_marker:
|
||||
# do a single book-ended DELETE and bounce out
|
||||
curs = conn.execute(self.clean_batch_query + ' AND name < ?',
|
||||
(self.marker, end_marker))
|
||||
self.marker = end_marker
|
||||
self.reclaimed += curs.rowcount
|
||||
self.remaining_tombstones += RECLAIM_PAGE_SIZE - curs.rowcount
|
||||
else:
|
||||
# delete off the end
|
||||
curs = conn.execute(self.clean_batch_query, (self.marker,))
|
||||
self.finished = True
|
||||
self.reclaimed += curs.rowcount
|
||||
|
||||
def reclaim(self):
|
||||
"""
|
||||
Perform reclaim of deleted rows older than ``age_timestamp``.
|
||||
"""
|
||||
while not self.finished:
|
||||
with self.broker.get() as conn:
|
||||
self._reclaim(conn)
|
||||
conn.commit()
|
||||
|
||||
def get_tombstone_count(self):
|
||||
"""
|
||||
Return the number of remaining tombstones newer than ``age_timestamp``.
|
||||
Executes the ``reclaim`` method if it has not already been called on
|
||||
this instance.
|
||||
|
||||
:return: The number of tombstones in the ``broker`` that are newer than
|
||||
``age_timestamp``.
|
||||
"""
|
||||
if not self.finished:
|
||||
self.reclaim()
|
||||
with self.broker.get() as conn:
|
||||
curs = conn.execute('''
|
||||
SELECT COUNT(*) FROM %s WHERE deleted = 1
|
||||
AND name >= ?
|
||||
''' % (self.broker.db_contains_type,), (self.marker,))
|
||||
tombstones = curs.fetchone()[0]
|
||||
self.remaining_tombstones += tombstones
|
||||
return self.remaining_tombstones
|
||||
|
||||
|
||||
class DatabaseBroker(object):
|
||||
"""Encapsulates working with a database."""
|
||||
|
||||
@ -988,47 +1064,22 @@ class DatabaseBroker(object):
|
||||
with lock_parent_directory(self.pending_file,
|
||||
self.pending_timeout):
|
||||
self._commit_puts()
|
||||
marker = ''
|
||||
finished = False
|
||||
while not finished:
|
||||
with self.get() as conn:
|
||||
marker = self._reclaim(conn, age_timestamp, marker)
|
||||
if not marker:
|
||||
finished = True
|
||||
self._reclaim_other_stuff(
|
||||
conn, age_timestamp, sync_timestamp)
|
||||
conn.commit()
|
||||
|
||||
tombstone_reclaimer = TombstoneReclaimer(self, age_timestamp)
|
||||
tombstone_reclaimer.reclaim()
|
||||
with self.get() as conn:
|
||||
self._reclaim_other_stuff(conn, age_timestamp, sync_timestamp)
|
||||
conn.commit()
|
||||
return tombstone_reclaimer
|
||||
|
||||
def _reclaim_other_stuff(self, conn, age_timestamp, sync_timestamp):
|
||||
"""
|
||||
This is only called once at the end of reclaim after _reclaim has been
|
||||
called for each page.
|
||||
This is only called once at the end of reclaim after tombstone reclaim
|
||||
has been completed.
|
||||
"""
|
||||
self._reclaim_sync(conn, sync_timestamp)
|
||||
self._reclaim_metadata(conn, age_timestamp)
|
||||
|
||||
def _reclaim(self, conn, age_timestamp, marker):
|
||||
clean_batch_qry = '''
|
||||
DELETE FROM %s WHERE deleted = 1
|
||||
AND name >= ? AND %s < ?
|
||||
''' % (self.db_contains_type, self.db_reclaim_timestamp)
|
||||
curs = conn.execute('''
|
||||
SELECT name FROM %s WHERE deleted = 1
|
||||
AND name >= ?
|
||||
ORDER BY NAME LIMIT 1 OFFSET ?
|
||||
''' % (self.db_contains_type,), (marker, RECLAIM_PAGE_SIZE))
|
||||
row = curs.fetchone()
|
||||
if row:
|
||||
# do a single book-ended DELETE and bounce out
|
||||
end_marker = row[0]
|
||||
conn.execute(clean_batch_qry + ' AND name < ?', (
|
||||
marker, age_timestamp, end_marker))
|
||||
else:
|
||||
# delete off the end and reset marker to indicate we're done
|
||||
end_marker = ''
|
||||
conn.execute(clean_batch_qry, (marker, age_timestamp))
|
||||
return end_marker
|
||||
|
||||
def _reclaim_sync(self, conn, sync_timestamp):
|
||||
try:
|
||||
conn.execute('''
|
||||
|
@ -566,6 +566,12 @@ class Replicator(Daemon):
|
||||
self.logger.debug('Successfully deleted db %s', broker.db_file)
|
||||
return True
|
||||
|
||||
def _reclaim(self, broker, now=None):
|
||||
if not now:
|
||||
now = time.time()
|
||||
return broker.reclaim(now - self.reclaim_age,
|
||||
now - (self.reclaim_age * 2))
|
||||
|
||||
def _replicate_object(self, partition, object_file, node_id):
|
||||
"""
|
||||
Replicate the db, choosing method based on whether or not it
|
||||
@ -591,8 +597,7 @@ class Replicator(Daemon):
|
||||
try:
|
||||
broker = self.brokerclass(object_file, pending_timeout=30,
|
||||
logger=self.logger)
|
||||
broker.reclaim(now - self.reclaim_age,
|
||||
now - (self.reclaim_age * 2))
|
||||
self._reclaim(broker, now)
|
||||
info = broker.get_replication_info()
|
||||
bpart = self.ring.get_part(
|
||||
info['account'], info.get('container'))
|
||||
|
@ -5042,6 +5042,8 @@ class ShardRange(object):
|
||||
sharding was enabled for a container.
|
||||
:param reported: optional indicator that this shard and its stats have
|
||||
been reported to the root container.
|
||||
:param tombstones: the number of tombstones in the shard range; defaults to
|
||||
-1 to indicate that the value is unknown.
|
||||
"""
|
||||
FOUND = 10
|
||||
CREATED = 20
|
||||
@ -5079,7 +5081,7 @@ class ShardRange(object):
|
||||
def __init__(self, name, timestamp, lower=MIN, upper=MAX,
|
||||
object_count=0, bytes_used=0, meta_timestamp=None,
|
||||
deleted=False, state=None, state_timestamp=None, epoch=None,
|
||||
reported=False):
|
||||
reported=False, tombstones=-1):
|
||||
self.account = self.container = self._timestamp = \
|
||||
self._meta_timestamp = self._state_timestamp = self._epoch = None
|
||||
self._lower = ShardRange.MIN
|
||||
@ -5099,6 +5101,7 @@ class ShardRange(object):
|
||||
self.state_timestamp = state_timestamp
|
||||
self.epoch = epoch
|
||||
self.reported = reported
|
||||
self.tombstones = tombstones
|
||||
|
||||
@classmethod
|
||||
def sort_key(cls, sr):
|
||||
@ -5274,6 +5277,24 @@ class ShardRange(object):
|
||||
raise ValueError('bytes_used cannot be < 0')
|
||||
self._bytes = bytes_used
|
||||
|
||||
@property
|
||||
def tombstones(self):
|
||||
return self._tombstones
|
||||
|
||||
@tombstones.setter
|
||||
def tombstones(self, tombstones):
|
||||
self._tombstones = int(tombstones)
|
||||
|
||||
@property
|
||||
def row_count(self):
|
||||
"""
|
||||
Returns the total number of rows in the shard range i.e. the sum of
|
||||
objects and tombstones.
|
||||
|
||||
:return: the row count
|
||||
"""
|
||||
return self.object_count + max(self.tombstones, 0)
|
||||
|
||||
def update_meta(self, object_count, bytes_used, meta_timestamp=None):
|
||||
"""
|
||||
Set the object stats metadata to the given values and update the
|
||||
@ -5300,6 +5321,27 @@ class ShardRange(object):
|
||||
else:
|
||||
self.meta_timestamp = meta_timestamp
|
||||
|
||||
def update_tombstones(self, tombstones, meta_timestamp=None):
|
||||
"""
|
||||
Set the tombstones metadata to the given values and update the
|
||||
meta_timestamp to the current time.
|
||||
|
||||
:param tombstones: should be an integer
|
||||
:param meta_timestamp: timestamp for metadata; if not given the
|
||||
current time will be set.
|
||||
:raises ValueError: if ``tombstones`` cannot be cast to an int, or
|
||||
if meta_timestamp is neither None nor can be cast to a
|
||||
:class:`~swift.common.utils.Timestamp`.
|
||||
"""
|
||||
tombstones = int(tombstones)
|
||||
if 0 <= tombstones != self.tombstones:
|
||||
self.tombstones = tombstones
|
||||
self.reported = False
|
||||
if meta_timestamp is None:
|
||||
self.meta_timestamp = Timestamp.now()
|
||||
else:
|
||||
self.meta_timestamp = meta_timestamp
|
||||
|
||||
def increment_meta(self, object_count, bytes_used):
|
||||
"""
|
||||
Increment the object stats metadata by the given values and update the
|
||||
@ -5518,6 +5560,7 @@ class ShardRange(object):
|
||||
yield 'state_timestamp', self.state_timestamp.internal
|
||||
yield 'epoch', self.epoch.internal if self.epoch is not None else None
|
||||
yield 'reported', 1 if self.reported else 0
|
||||
yield 'tombstones', self.tombstones
|
||||
|
||||
def copy(self, timestamp=None, **kwargs):
|
||||
"""
|
||||
@ -5550,7 +5593,7 @@ class ShardRange(object):
|
||||
params['upper'], params['object_count'], params['bytes_used'],
|
||||
params['meta_timestamp'], params['deleted'], params['state'],
|
||||
params['state_timestamp'], params['epoch'],
|
||||
params.get('reported', 0))
|
||||
params.get('reported', 0), params.get('tombstones', -1))
|
||||
|
||||
def expand(self, donors):
|
||||
"""
|
||||
@ -5625,6 +5668,15 @@ class ShardRangeList(UserList):
|
||||
"""
|
||||
return sum(sr.object_count for sr in self)
|
||||
|
||||
@property
|
||||
def row_count(self):
|
||||
"""
|
||||
Returns the total number of rows of all items in the list.
|
||||
|
||||
:return: total row count
|
||||
"""
|
||||
return sum(sr.row_count for sr in self)
|
||||
|
||||
@property
|
||||
def bytes_used(self):
|
||||
"""
|
||||
|
@ -66,7 +66,7 @@ SHARD_AUDITING_STATES = [ShardRange.CREATED, ShardRange.CLEAVED,
|
||||
# tuples and vice-versa
|
||||
SHARD_RANGE_KEYS = ('name', 'timestamp', 'lower', 'upper', 'object_count',
|
||||
'bytes_used', 'meta_timestamp', 'deleted', 'state',
|
||||
'state_timestamp', 'epoch', 'reported')
|
||||
'state_timestamp', 'epoch', 'reported', 'tombstones')
|
||||
|
||||
POLICY_STAT_TABLE_CREATE = '''
|
||||
CREATE TABLE policy_stat (
|
||||
@ -287,6 +287,7 @@ def merge_shards(shard_data, existing):
|
||||
if existing['meta_timestamp'] >= shard_data['meta_timestamp']:
|
||||
for k in ('object_count', 'bytes_used', 'meta_timestamp'):
|
||||
shard_data[k] = existing[k]
|
||||
shard_data['tombstones'] = existing.get('tombstones', -1)
|
||||
else:
|
||||
new_content = True
|
||||
|
||||
@ -294,6 +295,7 @@ def merge_shards(shard_data, existing):
|
||||
if existing['reported'] and \
|
||||
existing['object_count'] == shard_data['object_count'] and \
|
||||
existing['bytes_used'] == shard_data['bytes_used'] and \
|
||||
existing.get('tombstones', -1) == shard_data['tombstones'] and \
|
||||
existing['state'] == shard_data['state'] and \
|
||||
existing['epoch'] == shard_data['epoch']:
|
||||
shard_data['reported'] = 1
|
||||
@ -618,7 +620,8 @@ class ContainerBroker(DatabaseBroker):
|
||||
state INTEGER,
|
||||
state_timestamp TEXT,
|
||||
epoch TEXT,
|
||||
reported INTEGER DEFAULT 0
|
||||
reported INTEGER DEFAULT 0,
|
||||
tombstones INTEGER DEFAULT -1
|
||||
);
|
||||
""" % SHARD_RANGE_TABLE)
|
||||
|
||||
@ -1450,22 +1453,34 @@ class ContainerBroker(DatabaseBroker):
|
||||
for item in to_add.values()))
|
||||
conn.commit()
|
||||
|
||||
migrations = {
|
||||
'no such column: reported':
|
||||
self._migrate_add_shard_range_reported,
|
||||
'no such column: tombstones':
|
||||
self._migrate_add_shard_range_tombstones,
|
||||
('no such table: %s' % SHARD_RANGE_TABLE):
|
||||
self.create_shard_range_table,
|
||||
}
|
||||
migrations_done = set()
|
||||
with self.get() as conn:
|
||||
try:
|
||||
return _really_merge_items(conn)
|
||||
except sqlite3.OperationalError as err:
|
||||
# Without the rollback, new enough (>= py37) python/sqlite3
|
||||
# will panic:
|
||||
# sqlite3.OperationalError: cannot start a transaction
|
||||
# within a transaction
|
||||
conn.rollback()
|
||||
if 'no such column: reported' in str(err):
|
||||
self._migrate_add_shard_range_reported(conn)
|
||||
while True:
|
||||
try:
|
||||
return _really_merge_items(conn)
|
||||
if ('no such table: %s' % SHARD_RANGE_TABLE) in str(err):
|
||||
self.create_shard_range_table(conn)
|
||||
return _really_merge_items(conn)
|
||||
raise
|
||||
except sqlite3.OperationalError as err:
|
||||
# Without the rollback, new enough (>= py37) python/sqlite3
|
||||
# will panic:
|
||||
# sqlite3.OperationalError: cannot start a transaction
|
||||
# within a transaction
|
||||
conn.rollback()
|
||||
for err_str, migration in migrations.items():
|
||||
if err_str in migrations_done:
|
||||
continue
|
||||
if err_str in str(err):
|
||||
migration(conn)
|
||||
migrations_done.add(err_str)
|
||||
break
|
||||
else:
|
||||
raise
|
||||
|
||||
def get_reconciler_sync(self):
|
||||
with self.get() as conn:
|
||||
@ -1624,6 +1639,17 @@ class ContainerBroker(DatabaseBroker):
|
||||
COMMIT;
|
||||
''' % SHARD_RANGE_TABLE)
|
||||
|
||||
def _migrate_add_shard_range_tombstones(self, conn):
|
||||
"""
|
||||
Add the tombstones column to the 'shard_range' table.
|
||||
"""
|
||||
conn.executescript('''
|
||||
BEGIN;
|
||||
ALTER TABLE %s
|
||||
ADD COLUMN tombstones INTEGER DEFAULT -1;
|
||||
COMMIT;
|
||||
''' % SHARD_RANGE_TABLE)
|
||||
|
||||
def _reclaim_other_stuff(self, conn, age_timestamp, sync_timestamp):
|
||||
super(ContainerBroker, self)._reclaim_other_stuff(
|
||||
conn, age_timestamp, sync_timestamp)
|
||||
@ -1673,7 +1699,11 @@ class ContainerBroker(DatabaseBroker):
|
||||
elif states is not None:
|
||||
included_states.add(states)
|
||||
|
||||
def do_query(conn, use_reported_column=True):
|
||||
# defaults to be used when legacy db's are missing columns
|
||||
default_values = {'reported': 0,
|
||||
'tombstones': -1}
|
||||
|
||||
def do_query(conn, defaults=None):
|
||||
condition = ''
|
||||
conditions = []
|
||||
params = []
|
||||
@ -1691,10 +1721,13 @@ class ContainerBroker(DatabaseBroker):
|
||||
params.append(self.path)
|
||||
if conditions:
|
||||
condition = ' WHERE ' + ' AND '.join(conditions)
|
||||
if use_reported_column:
|
||||
columns = SHARD_RANGE_KEYS
|
||||
else:
|
||||
columns = SHARD_RANGE_KEYS[:-1] + ('0 as reported', )
|
||||
columns = SHARD_RANGE_KEYS[:-2]
|
||||
for column in SHARD_RANGE_KEYS[-2:]:
|
||||
if column in defaults:
|
||||
columns += (('%s as %s' %
|
||||
(default_values[column], column)),)
|
||||
else:
|
||||
columns += (column,)
|
||||
sql = '''
|
||||
SELECT %s
|
||||
FROM %s%s;
|
||||
@ -1704,14 +1737,26 @@ class ContainerBroker(DatabaseBroker):
|
||||
return [row for row in data]
|
||||
|
||||
with self.maybe_get(connection) as conn:
|
||||
try:
|
||||
return do_query(conn)
|
||||
except sqlite3.OperationalError as err:
|
||||
if ('no such table: %s' % SHARD_RANGE_TABLE) in str(err):
|
||||
return []
|
||||
if 'no such column: reported' in str(err):
|
||||
return do_query(conn, use_reported_column=False)
|
||||
raise
|
||||
defaults = set()
|
||||
attempts = len(default_values) + 1
|
||||
while attempts:
|
||||
attempts -= 1
|
||||
try:
|
||||
return do_query(conn, defaults)
|
||||
except sqlite3.OperationalError as err:
|
||||
if ('no such table: %s' % SHARD_RANGE_TABLE) in str(err):
|
||||
return []
|
||||
if not attempts:
|
||||
raise
|
||||
new_defaults = set()
|
||||
for column in default_values.keys():
|
||||
if 'no such column: %s' % column in str(err):
|
||||
new_defaults.add(column)
|
||||
if not new_defaults:
|
||||
raise
|
||||
if new_defaults.intersection(defaults):
|
||||
raise
|
||||
defaults.update(new_defaults)
|
||||
|
||||
@classmethod
|
||||
def resolve_shard_range_states(cls, states):
|
||||
|
@ -120,6 +120,8 @@ def find_overlapping_ranges(shard_ranges):
|
||||
|
||||
|
||||
def is_sharding_candidate(shard_range, threshold):
|
||||
# note: use *object* count as the condition for sharding: tombstones will
|
||||
# eventually be reclaimed so should not trigger sharding
|
||||
return (shard_range.state == ShardRange.ACTIVE and
|
||||
shard_range.object_count >= threshold)
|
||||
|
||||
@ -127,10 +129,13 @@ def is_sharding_candidate(shard_range, threshold):
|
||||
def is_shrinking_candidate(shard_range, shrink_threshold, merge_size,
|
||||
states=None):
|
||||
# typically shrink_threshold < merge_size but check both just in case
|
||||
# note: use *row* count (objects plus tombstones) as the condition for
|
||||
# shrinking to avoid inadvertently moving large numbers of tombstones into
|
||||
# an acceptor
|
||||
states = states or (ShardRange.ACTIVE,)
|
||||
return (shard_range.state in states and
|
||||
shard_range.object_count < shrink_threshold and
|
||||
shard_range.object_count <= merge_size)
|
||||
shard_range.row_count < shrink_threshold and
|
||||
shard_range.row_count <= merge_size)
|
||||
|
||||
|
||||
def find_sharding_candidates(broker, threshold, shard_ranges=None):
|
||||
@ -186,6 +191,8 @@ def find_compactible_shard_sequences(broker,
|
||||
compacted into each acceptor; -1 implies unlimited.
|
||||
:param max_expanding: the maximum number of acceptors to be found (i.e. the
|
||||
maximum number of sequences to be returned); -1 implies unlimited.
|
||||
:param include_shrinking: if True then existing compactible sequences are
|
||||
included in the results; default is False.
|
||||
:returns: A list of :class:`~swift.common.utils.ShardRangeList` each
|
||||
containing a sequence of neighbouring shard ranges that may be
|
||||
compacted; the final shard range in the list is the acceptor
|
||||
@ -196,10 +203,6 @@ def find_compactible_shard_sequences(broker,
|
||||
# First cut is simple: assume root container shard usage stats are good
|
||||
# enough to make decision; only merge with upper neighbour so that
|
||||
# upper bounds never change (shard names include upper bound).
|
||||
# TODO: object counts may well not be the appropriate metric for
|
||||
# deciding to shrink because a shard with low object_count may have a
|
||||
# large number of deleted object rows that will need to be merged with
|
||||
# a neighbour. We may need to expose row count as well as object count.
|
||||
shard_ranges = broker.get_shard_ranges()
|
||||
own_shard_range = broker.get_own_shard_range()
|
||||
|
||||
@ -216,7 +219,7 @@ def find_compactible_shard_sequences(broker,
|
||||
sequence[-1], shrink_threshold, merge_size,
|
||||
states=(ShardRange.ACTIVE, ShardRange.SHRINKING)) or
|
||||
0 < max_shrinking < len(sequence) or
|
||||
sequence.object_count >= merge_size)):
|
||||
sequence.row_count >= merge_size)):
|
||||
return True
|
||||
return False
|
||||
|
||||
@ -250,7 +253,7 @@ def find_compactible_shard_sequences(broker,
|
||||
if shard_range.state == ShardRange.SHRINKING:
|
||||
# already shrinking: add to sequence unconditionally
|
||||
sequence.append(shard_range)
|
||||
elif (sequence.object_count + shard_range.object_count
|
||||
elif (sequence.row_count + shard_range.row_count
|
||||
<= merge_size):
|
||||
# add to sequence: could be a donor or acceptor
|
||||
sequence.append(shard_range)
|
||||
@ -1825,7 +1828,18 @@ class ContainerSharder(ContainerReplicator):
|
||||
|
||||
def _update_root_container(self, broker):
|
||||
own_shard_range = broker.get_own_shard_range(no_default=True)
|
||||
if not own_shard_range or own_shard_range.reported:
|
||||
if not own_shard_range:
|
||||
return
|
||||
|
||||
# do a reclaim *now* in order to get best estimate of tombstone count
|
||||
# that is consistent with the current object_count
|
||||
reclaimer = self._reclaim(broker)
|
||||
tombstones = reclaimer.get_tombstone_count()
|
||||
self.logger.debug('tombstones in %s = %d',
|
||||
quote(broker.path), tombstones)
|
||||
own_shard_range.update_tombstones(tombstones)
|
||||
|
||||
if own_shard_range.reported:
|
||||
return
|
||||
|
||||
# persist the reported shard metadata
|
||||
|
@ -1682,8 +1682,13 @@ class TestContainerSharding(BaseTestContainerSharding):
|
||||
orig_range_data, range_data,
|
||||
excludes=['meta_timestamp', 'state_timestamp'])
|
||||
|
||||
# ...until the sharders run and update root
|
||||
self.run_sharders(orig_shard_ranges[0])
|
||||
# ...until the sharders run and update root; reclaim tombstones so
|
||||
# that the shard is shrinkable
|
||||
shard_0_part = self.get_part_and_node_numbers(
|
||||
orig_shard_ranges[0])[0]
|
||||
for conf_index in self.configs['container-sharder'].keys():
|
||||
self.run_custom_sharder(conf_index, {'reclaim_age': 0},
|
||||
override_partitions=[shard_0_part])
|
||||
exp_obj_count = len(second_shard_objects) + 1
|
||||
self.assert_container_object_count(exp_obj_count)
|
||||
self.assert_container_listing([alpha] + second_shard_objects)
|
||||
@ -1748,15 +1753,37 @@ class TestContainerSharding(BaseTestContainerSharding):
|
||||
|
||||
self.assert_container_listing([alpha])
|
||||
|
||||
# runs sharders so second range shrinks away, requires up to 3
|
||||
# cycles
|
||||
self.sharders.once() # shard updates root stats
|
||||
# run sharders: second range should not shrink away yet because it
|
||||
# has tombstones
|
||||
self.sharders.once() # second shard updates root stats
|
||||
self.assert_container_listing([alpha])
|
||||
self.sharders.once() # root finds shrinkable shard
|
||||
self.assert_container_listing([alpha])
|
||||
self.sharders.once() # shards shrink themselves
|
||||
self.assert_container_listing([alpha])
|
||||
|
||||
# the acceptor shard is intact...
|
||||
shard_nodes_data = self.direct_get_container_shard_ranges(
|
||||
orig_shard_ranges[1].account, orig_shard_ranges[1].container)
|
||||
obj_count, bytes_used = check_shard_nodes_data(shard_nodes_data)
|
||||
self.assertEqual(1, obj_count)
|
||||
|
||||
# run sharders to reclaim tombstones so that the second shard is
|
||||
# shrinkable
|
||||
shard_1_part = self.get_part_and_node_numbers(
|
||||
orig_shard_ranges[1])[0]
|
||||
for conf_index in self.configs['container-sharder'].keys():
|
||||
self.run_custom_sharder(conf_index, {'reclaim_age': 0},
|
||||
override_partitions=[shard_1_part])
|
||||
self.assert_container_listing([alpha])
|
||||
|
||||
# run sharders so second range shrinks away, requires up to 2
|
||||
# cycles
|
||||
self.sharders.once() # root finds shrinkable shard
|
||||
self.assert_container_listing([alpha])
|
||||
self.sharders.once() # shards shrink themselves
|
||||
self.assert_container_listing([alpha])
|
||||
|
||||
# the second shard range has sharded and is empty
|
||||
shard_nodes_data = self.direct_get_container_shard_ranges(
|
||||
orig_shard_ranges[1].account, orig_shard_ranges[1].container)
|
||||
@ -2215,10 +2242,15 @@ class TestContainerSharding(BaseTestContainerSharding):
|
||||
self.assert_container_listing([])
|
||||
self.assert_container_post_ok('has objects')
|
||||
|
||||
# run sharder on shard containers to update root stats
|
||||
# run sharder on shard containers to update root stats; reclaim
|
||||
# the tombstones so that the shards appear to be shrinkable
|
||||
shard_ranges = self.get_container_shard_ranges()
|
||||
self.assertLengthEqual(shard_ranges, 2)
|
||||
self.run_sharders(shard_ranges)
|
||||
shard_partitions = [self.get_part_and_node_numbers(sr)[0]
|
||||
for sr in shard_ranges]
|
||||
for conf_index in self.configs['container-sharder'].keys():
|
||||
self.run_custom_sharder(conf_index, {'reclaim_age': 0},
|
||||
override_partitions=shard_partitions)
|
||||
self.assert_container_object_count(0)
|
||||
|
||||
# First, test a misplaced object moving from one shard to another.
|
||||
@ -2349,8 +2381,12 @@ class TestContainerSharding(BaseTestContainerSharding):
|
||||
self.assert_container_listing(shard_1_objects)
|
||||
self.assert_container_post_ok('has objects')
|
||||
|
||||
# run sharder on first shard container to update root stats
|
||||
self.run_sharders(shard_ranges[0])
|
||||
# run sharder on first shard container to update root stats; reclaim
|
||||
# the tombstones so that the shard appears to be shrinkable
|
||||
shard_0_part = self.get_part_and_node_numbers(shard_ranges[0])[0]
|
||||
for conf_index in self.configs['container-sharder'].keys():
|
||||
self.run_custom_sharder(conf_index, {'reclaim_age': 0},
|
||||
override_partitions=[shard_0_part])
|
||||
self.assert_container_object_count(len(shard_1_objects))
|
||||
|
||||
# First, test a misplaced object moving from one shard to another.
|
||||
@ -2384,10 +2420,13 @@ class TestContainerSharding(BaseTestContainerSharding):
|
||||
|
||||
# Now we have just one active shard, test a misplaced object moving
|
||||
# from that shard to the root.
|
||||
# delete most objects from second shard range and run sharder on root
|
||||
# to discover second shrink candidate
|
||||
# delete most objects from second shard range, reclaim the tombstones,
|
||||
# and run sharder on root to discover second shrink candidate
|
||||
self.delete_objects(shard_1_objects)
|
||||
self.run_sharders(shard_ranges[1])
|
||||
shard_1_part = self.get_part_and_node_numbers(shard_ranges[1])[0]
|
||||
for conf_index in self.configs['container-sharder'].keys():
|
||||
self.run_custom_sharder(conf_index, {'reclaim_age': 0},
|
||||
override_partitions=[shard_1_part])
|
||||
self.sharders.once(additional_args='--partitions=%s' % self.brain.part)
|
||||
# then run sharder on the shard node to shrink it to root - note this
|
||||
# moves alpha to the root db
|
||||
@ -2457,7 +2496,10 @@ class TestContainerSharding(BaseTestContainerSharding):
|
||||
self.assert_container_post_ok('has objects')
|
||||
|
||||
# run sharder on first shard container to update root stats
|
||||
self.run_sharders(shard_ranges[0])
|
||||
shard_0_part = self.get_part_and_node_numbers(shard_ranges[0])[0]
|
||||
for conf_index in self.configs['container-sharder'].keys():
|
||||
self.run_custom_sharder(conf_index, {'reclaim_age': 0},
|
||||
override_partitions=[shard_0_part])
|
||||
self.assert_container_object_count(len(shard_1_objects))
|
||||
|
||||
# First, test a misplaced object moving from one shard to another.
|
||||
|
@ -36,7 +36,7 @@ import six
|
||||
from swift.account.backend import AccountBroker
|
||||
from swift.common.utils import Timestamp
|
||||
from test.unit import patch_policies, with_tempdir, make_timestamp_iter
|
||||
from swift.common.db import DatabaseConnectionError
|
||||
from swift.common.db import DatabaseConnectionError, TombstoneReclaimer
|
||||
from swift.common.request_helpers import get_reserved_name
|
||||
from swift.common.storage_policy import StoragePolicy, POLICIES
|
||||
from swift.common.utils import md5
|
||||
@ -218,15 +218,17 @@ class TestAccountBroker(unittest.TestCase):
|
||||
self.assertEqual(count_reclaimable(conn, reclaim_age),
|
||||
num_of_containers / 4)
|
||||
|
||||
orig__reclaim = broker._reclaim
|
||||
trace = []
|
||||
|
||||
def tracing_reclaim(conn, age_timestamp, marker):
|
||||
trace.append((age_timestamp, marker,
|
||||
count_reclaimable(conn, age_timestamp)))
|
||||
return orig__reclaim(conn, age_timestamp, marker)
|
||||
class TracingReclaimer(TombstoneReclaimer):
|
||||
def _reclaim(self, conn):
|
||||
trace.append(
|
||||
(self.age_timestamp, self.marker,
|
||||
count_reclaimable(conn, self.age_timestamp)))
|
||||
return super(TracingReclaimer, self)._reclaim(conn)
|
||||
|
||||
with mock.patch.object(broker, '_reclaim', new=tracing_reclaim), \
|
||||
with mock.patch(
|
||||
'swift.common.db.TombstoneReclaimer', TracingReclaimer), \
|
||||
mock.patch('swift.common.db.RECLAIM_PAGE_SIZE', 10):
|
||||
broker.reclaim(reclaim_age, reclaim_age)
|
||||
with broker.get() as conn:
|
||||
|
@ -429,6 +429,7 @@ class TestManageShardRanges(unittest.TestCase):
|
||||
' "state": "sharding",',
|
||||
' "state_timestamp": "%s",' % now.internal,
|
||||
' "timestamp": "%s",' % now.internal,
|
||||
' "tombstones": -1,',
|
||||
' "upper": ""',
|
||||
'}',
|
||||
'db_state = sharding',
|
||||
@ -472,6 +473,7 @@ class TestManageShardRanges(unittest.TestCase):
|
||||
' "state": "sharding",',
|
||||
' "state_timestamp": "%s",' % now.internal,
|
||||
' "timestamp": "%s",' % now.internal,
|
||||
' "tombstones": -1,',
|
||||
' "upper": ""',
|
||||
'}',
|
||||
'db_state = sharded',
|
||||
@ -861,12 +863,37 @@ class TestManageShardRanges(unittest.TestCase):
|
||||
broker = self._make_broker()
|
||||
shard_ranges = make_shard_ranges(broker, self.shard_data, '.shards_')
|
||||
for i, sr in enumerate(shard_ranges):
|
||||
sr.tombstones = 999
|
||||
if i not in small_ranges:
|
||||
sr.object_count = 100001
|
||||
sr.update_state(ShardRange.ACTIVE)
|
||||
broker.merge_shard_ranges(shard_ranges)
|
||||
self._move_broker_to_sharded_state(broker)
|
||||
|
||||
expected_base = [
|
||||
'Donor shard range(s) with total of 2018 rows:',
|
||||
" '.shards_a",
|
||||
" objects: 10, tombstones: 999, lower: 'obj29'",
|
||||
" state: active, upper: 'obj39'",
|
||||
" '.shards_a",
|
||||
" objects: 10, tombstones: 999, lower: 'obj39'",
|
||||
" state: active, upper: 'obj49'",
|
||||
'can be compacted into acceptor shard range:',
|
||||
" '.shards_a",
|
||||
" objects: 100001, tombstones: 999, lower: 'obj49'",
|
||||
" state: active, upper: 'obj59'",
|
||||
'Donor shard range(s) with total of 1009 rows:',
|
||||
" '.shards_a",
|
||||
" objects: 10, tombstones: 999, lower: 'obj69'",
|
||||
" state: active, upper: 'obj79'",
|
||||
'can be compacted into acceptor shard range:',
|
||||
" '.shards_a",
|
||||
" objects: 100001, tombstones: 999, lower: 'obj79'",
|
||||
" state: active, upper: 'obj89'",
|
||||
'Once applied to the broker these changes will result in '
|
||||
'shard range compaction the next time the sharder runs.',
|
||||
]
|
||||
|
||||
def do_compact(user_input, exit_code):
|
||||
out = StringIO()
|
||||
err = StringIO()
|
||||
@ -880,29 +907,7 @@ class TestManageShardRanges(unittest.TestCase):
|
||||
err_lines = err.getvalue().split('\n')
|
||||
self.assert_starts_with(err_lines[0], 'Loaded db broker for ')
|
||||
out_lines = out.getvalue().split('\n')
|
||||
expected = [
|
||||
'Donor shard range(s) with total of 20 objects:',
|
||||
" '.shards_a",
|
||||
" objects: 10 lower: 'obj29'",
|
||||
" state: active upper: 'obj39'",
|
||||
" '.shards_a",
|
||||
" objects: 10 lower: 'obj39'",
|
||||
" state: active upper: 'obj49'",
|
||||
'can be compacted into acceptor shard range:',
|
||||
" '.shards_a",
|
||||
" objects: 100001 lower: 'obj49'",
|
||||
" state: active upper: 'obj59'",
|
||||
'Donor shard range(s) with total of 10 objects:',
|
||||
" '.shards_a",
|
||||
" objects: 10 lower: 'obj69'",
|
||||
" state: active upper: 'obj79'",
|
||||
'can be compacted into acceptor shard range:',
|
||||
" '.shards_a",
|
||||
" objects: 100001 lower: 'obj79'",
|
||||
" state: active upper: 'obj89'",
|
||||
'Once applied to the broker these changes will result in '
|
||||
'shard range compaction the next time the sharder runs.',
|
||||
]
|
||||
expected = list(expected_base)
|
||||
if user_input == 'yes':
|
||||
expected.extend([
|
||||
'Updated 2 shard sequences for compaction.',
|
||||
@ -1334,14 +1339,12 @@ class TestManageShardRanges(unittest.TestCase):
|
||||
['No shards identified for compaction.'],
|
||||
out_lines[:1])
|
||||
|
||||
def test_compact_shrink_threshold(self):
|
||||
def _do_test_compact_shrink_threshold(self, broker, shard_ranges):
|
||||
# verify option to set the shrink threshold for compaction;
|
||||
broker = self._make_broker()
|
||||
shard_ranges = make_shard_ranges(broker, self.shard_data, '.shards_')
|
||||
for i, sr in enumerate(shard_ranges):
|
||||
sr.update_state(ShardRange.ACTIVE)
|
||||
# (n-2)th shard range has one extra object
|
||||
shard_ranges[-2].object_count = 11
|
||||
shard_ranges[-2].object_count = shard_ranges[-2].object_count + 1
|
||||
broker.merge_shard_ranges(shard_ranges)
|
||||
self._move_broker_to_sharded_state(broker)
|
||||
# with threshold set to 10 no shard ranges can be shrunk
|
||||
@ -1384,6 +1387,19 @@ class TestManageShardRanges(unittest.TestCase):
|
||||
self.assertEqual([ShardRange.SHRINKING] * 8 + [ShardRange.ACTIVE] * 2,
|
||||
[sr.state for sr in updated_ranges])
|
||||
|
||||
def test_compact_shrink_threshold(self):
|
||||
broker = self._make_broker()
|
||||
shard_ranges = make_shard_ranges(broker, self.shard_data, '.shards_')
|
||||
self._do_test_compact_shrink_threshold(broker, shard_ranges)
|
||||
|
||||
def test_compact_shrink_threshold_with_tombstones(self):
|
||||
broker = self._make_broker()
|
||||
shard_ranges = make_shard_ranges(broker, self.shard_data, '.shards_')
|
||||
for i, sr in enumerate(shard_ranges):
|
||||
sr.object_count = sr.object_count - i
|
||||
sr.tombstones = i
|
||||
self._do_test_compact_shrink_threshold(broker, shard_ranges)
|
||||
|
||||
def test_repair_not_root(self):
|
||||
broker = self._make_broker()
|
||||
shard_ranges = make_shard_ranges(broker, self.shard_data, '.shards_')
|
||||
|
@ -41,7 +41,7 @@ from swift.common.constraints import \
|
||||
MAX_META_VALUE_LENGTH, MAX_META_COUNT, MAX_META_OVERALL_SIZE
|
||||
from swift.common.db import chexor, dict_factory, get_db_connection, \
|
||||
DatabaseBroker, DatabaseConnectionError, DatabaseAlreadyExists, \
|
||||
GreenDBConnection, PICKLE_PROTOCOL, zero_like
|
||||
GreenDBConnection, PICKLE_PROTOCOL, zero_like, TombstoneReclaimer
|
||||
from swift.common.utils import normalize_timestamp, mkdirs, Timestamp
|
||||
from swift.common.exceptions import LockTimeout
|
||||
from swift.common.swob import HTTPException
|
||||
@ -1093,6 +1093,7 @@ class TestDatabaseBroker(unittest.TestCase):
|
||||
broker = DatabaseBroker(':memory:', account='a')
|
||||
broker.db_type = 'test'
|
||||
broker.db_contains_type = 'test'
|
||||
broker.db_reclaim_timestamp = 'created_at'
|
||||
broker_creation = normalize_timestamp(1)
|
||||
broker_uuid = str(uuid4())
|
||||
broker_metadata = metadata and json.dumps(
|
||||
@ -1183,7 +1184,7 @@ class TestDatabaseBroker(unittest.TestCase):
|
||||
return broker
|
||||
|
||||
# only testing _reclaim_metadata here
|
||||
@patch.object(DatabaseBroker, '_reclaim', return_value='')
|
||||
@patch.object(TombstoneReclaimer, 'reclaim')
|
||||
def test_metadata(self, mock_reclaim):
|
||||
# Initializes a good broker for us
|
||||
broker = self.get_replication_info_tester(metadata=True)
|
||||
@ -1569,7 +1570,7 @@ class TestDatabaseBroker(unittest.TestCase):
|
||||
self.assertFalse(pending)
|
||||
|
||||
|
||||
class TestTombstoneReclaim(unittest.TestCase):
|
||||
class TestTombstoneReclaimer(unittest.TestCase):
|
||||
def _make_object(self, broker, obj_name, ts, deleted):
|
||||
if deleted:
|
||||
broker.delete_test(obj_name, ts.internal)
|
||||
@ -1586,29 +1587,32 @@ class TestTombstoneReclaim(unittest.TestCase):
|
||||
with broker.get() as conn:
|
||||
return self._count_reclaimable(conn, reclaim_age)
|
||||
|
||||
def _setup_reclaimable_active(self):
|
||||
def _setup_tombstones(self, reverse_names=True):
|
||||
broker = ExampleBroker(':memory:', account='test_account',
|
||||
container='test_container')
|
||||
broker.initialize(Timestamp('1').internal, 0)
|
||||
now = time.time()
|
||||
top_of_the_minute = now - (now % 60)
|
||||
|
||||
# namespace:
|
||||
# namespace if reverse:
|
||||
# a-* has 70 'active' tombstones followed by 70 reclaimable
|
||||
# b-* has 70 'active' tombstones followed by 70 reclaimable
|
||||
# else:
|
||||
# a-* has 70 reclaimable followed by 70 'active' tombstones
|
||||
# b-* has 70 reclaimable followed by 70 'active' tombstones
|
||||
for i in range(0, 560, 4):
|
||||
self._make_object(broker, 'a_%3d' % (560 - i),
|
||||
Timestamp(top_of_the_minute - (i * 60)),
|
||||
True)
|
||||
self._make_object(broker, 'a_%3d' % (559 - i),
|
||||
Timestamp(top_of_the_minute - ((i + 1) * 60)),
|
||||
False)
|
||||
self._make_object(broker, 'b_%3d' % (560 - i),
|
||||
Timestamp(top_of_the_minute - ((i + 2) * 60)),
|
||||
True)
|
||||
self._make_object(broker, 'b_%3d' % (559 - i),
|
||||
Timestamp(top_of_the_minute - ((i + 3) * 60)),
|
||||
False)
|
||||
self._make_object(
|
||||
broker, 'a_%3d' % (560 - i if reverse_names else i),
|
||||
Timestamp(top_of_the_minute - (i * 60)), True)
|
||||
self._make_object(
|
||||
broker, 'a_%3d' % (559 - i if reverse_names else i + 1),
|
||||
Timestamp(top_of_the_minute - ((i + 1) * 60)), False)
|
||||
self._make_object(
|
||||
broker, 'b_%3d' % (560 - i if reverse_names else i),
|
||||
Timestamp(top_of_the_minute - ((i + 2) * 60)), True)
|
||||
self._make_object(
|
||||
broker, 'b_%3d' % (559 - i if reverse_names else i + 1),
|
||||
Timestamp(top_of_the_minute - ((i + 3) * 60)), False)
|
||||
broker._commit_puts()
|
||||
|
||||
# divide the set of timestamps exactly in half for reclaim
|
||||
@ -1635,11 +1639,12 @@ class TestTombstoneReclaim(unittest.TestCase):
|
||||
yield reclaimable
|
||||
|
||||
def test_batched_reclaim_several_small_batches(self):
|
||||
broker, totm, reclaim_age = self._setup_reclaimable_active()
|
||||
broker, totm, reclaim_age = self._setup_tombstones()
|
||||
|
||||
with self._mock_broker_get(broker, reclaim_age) as reclaimable:
|
||||
with patch('swift.common.db.RECLAIM_PAGE_SIZE', 50):
|
||||
broker.reclaim(reclaim_age, reclaim_age)
|
||||
reclaimer = TombstoneReclaimer(broker, reclaim_age)
|
||||
reclaimer.reclaim()
|
||||
|
||||
expected_reclaimable = [140, # 0 rows fetched
|
||||
90, # 50 rows fetched, 50 reclaimed
|
||||
@ -1652,11 +1657,12 @@ class TestTombstoneReclaim(unittest.TestCase):
|
||||
self.assertEqual(0, self._get_reclaimable(broker, reclaim_age))
|
||||
|
||||
def test_batched_reclaim_exactly_two_batches(self):
|
||||
broker, totm, reclaim_age = self._setup_reclaimable_active()
|
||||
broker, totm, reclaim_age = self._setup_tombstones()
|
||||
|
||||
with self._mock_broker_get(broker, reclaim_age) as reclaimable:
|
||||
with patch('swift.common.db.RECLAIM_PAGE_SIZE', 140):
|
||||
broker.reclaim(reclaim_age, reclaim_age)
|
||||
reclaimer = TombstoneReclaimer(broker, reclaim_age)
|
||||
reclaimer.reclaim()
|
||||
|
||||
expected_reclaimable = [140, # 0 rows fetched
|
||||
70, # 140 rows fetched, 70 reclaimed
|
||||
@ -1665,16 +1671,57 @@ class TestTombstoneReclaim(unittest.TestCase):
|
||||
self.assertEqual(0, self._get_reclaimable(broker, reclaim_age))
|
||||
|
||||
def test_batched_reclaim_one_large_batch(self):
|
||||
broker, totm, reclaim_age = self._setup_reclaimable_active()
|
||||
broker, totm, reclaim_age = self._setup_tombstones()
|
||||
|
||||
with self._mock_broker_get(broker, reclaim_age) as reclaimable:
|
||||
with patch('swift.common.db.RECLAIM_PAGE_SIZE', 1000):
|
||||
broker.reclaim(reclaim_age, reclaim_age)
|
||||
reclaimer = TombstoneReclaimer(broker, reclaim_age)
|
||||
reclaimer.reclaim()
|
||||
|
||||
expected_reclaimable = [140] # 0 rows fetched
|
||||
self.assertEqual(expected_reclaimable, reclaimable)
|
||||
self.assertEqual(0, self._get_reclaimable(broker, reclaim_age))
|
||||
|
||||
def test_reclaim_get_tombstone_count(self):
|
||||
broker, totm, reclaim_age = self._setup_tombstones(reverse_names=False)
|
||||
with patch('swift.common.db.RECLAIM_PAGE_SIZE', 122):
|
||||
reclaimer = TombstoneReclaimer(broker, reclaim_age)
|
||||
reclaimer.reclaim()
|
||||
self.assertEqual(0, self._get_reclaimable(broker, reclaim_age))
|
||||
tombstones = self._get_reclaimable(broker, totm + 1)
|
||||
self.assertEqual(140, tombstones)
|
||||
# in this scenario the reclaim phase finds the remaining tombstone
|
||||
# count (140)
|
||||
self.assertEqual(140, reclaimer.remaining_tombstones)
|
||||
self.assertEqual(140, reclaimer.get_tombstone_count())
|
||||
|
||||
def test_reclaim_get_tombstone_count_with_leftover(self):
|
||||
broker, totm, reclaim_age = self._setup_tombstones()
|
||||
with patch('swift.common.db.RECLAIM_PAGE_SIZE', 122):
|
||||
reclaimer = TombstoneReclaimer(broker, reclaim_age)
|
||||
reclaimer.reclaim()
|
||||
|
||||
self.assertEqual(0, self._get_reclaimable(broker, reclaim_age))
|
||||
tombstones = self._get_reclaimable(broker, totm + 1)
|
||||
self.assertEqual(140, tombstones)
|
||||
# in this scenario the reclaim phase finds a subset (104) of all
|
||||
# tombstones (140)
|
||||
self.assertEqual(104, reclaimer.remaining_tombstones)
|
||||
# get_tombstone_count finds the rest
|
||||
actual = reclaimer.get_tombstone_count()
|
||||
self.assertEqual(140, actual)
|
||||
|
||||
def test_get_tombstone_count_with_leftover(self):
|
||||
# verify that a call to get_tombstone_count() will invoke a reclaim if
|
||||
# reclaim not already invoked
|
||||
broker, totm, reclaim_age = self._setup_tombstones()
|
||||
with patch('swift.common.db.RECLAIM_PAGE_SIZE', 122):
|
||||
reclaimer = TombstoneReclaimer(broker, reclaim_age)
|
||||
actual = reclaimer.get_tombstone_count()
|
||||
|
||||
self.assertEqual(0, self._get_reclaimable(broker, reclaim_age))
|
||||
self.assertEqual(140, actual)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
@ -7846,7 +7846,7 @@ class TestShardRange(unittest.TestCase):
|
||||
meta_timestamp=ts_1.internal, deleted=0,
|
||||
state=utils.ShardRange.FOUND,
|
||||
state_timestamp=ts_1.internal, epoch=None,
|
||||
reported=0)
|
||||
reported=0, tombstones=-1)
|
||||
assert_initialisation_ok(dict(empty_run, name='a/c', timestamp=ts_1),
|
||||
expect)
|
||||
assert_initialisation_ok(dict(name='a/c', timestamp=ts_1), expect)
|
||||
@ -7856,17 +7856,18 @@ class TestShardRange(unittest.TestCase):
|
||||
meta_timestamp=ts_2, deleted=0,
|
||||
state=utils.ShardRange.CREATED,
|
||||
state_timestamp=ts_3.internal, epoch=ts_4,
|
||||
reported=0)
|
||||
reported=0, tombstones=11)
|
||||
expect.update({'lower': 'l', 'upper': 'u', 'object_count': 2,
|
||||
'bytes_used': 10, 'meta_timestamp': ts_2.internal,
|
||||
'state': utils.ShardRange.CREATED,
|
||||
'state_timestamp': ts_3.internal, 'epoch': ts_4,
|
||||
'reported': 0})
|
||||
'reported': 0, 'tombstones': 11})
|
||||
assert_initialisation_ok(good_run.copy(), expect)
|
||||
|
||||
# obj count and bytes used as int strings
|
||||
# obj count, tombstones and bytes used as int strings
|
||||
good_str_run = good_run.copy()
|
||||
good_str_run.update({'object_count': '2', 'bytes_used': '10'})
|
||||
good_str_run.update({'object_count': '2', 'bytes_used': '10',
|
||||
'tombstones': '11'})
|
||||
assert_initialisation_ok(good_str_run, expect)
|
||||
|
||||
good_no_meta = good_run.copy()
|
||||
@ -7922,7 +7923,7 @@ class TestShardRange(unittest.TestCase):
|
||||
'upper': upper, 'object_count': 10, 'bytes_used': 100,
|
||||
'meta_timestamp': ts_2.internal, 'deleted': 0,
|
||||
'state': utils.ShardRange.FOUND, 'state_timestamp': ts_3.internal,
|
||||
'epoch': ts_4, 'reported': 0}
|
||||
'epoch': ts_4, 'reported': 0, 'tombstones': -1}
|
||||
self.assertEqual(expected, sr_dict)
|
||||
self.assertIsInstance(sr_dict['lower'], six.string_types)
|
||||
self.assertIsInstance(sr_dict['upper'], six.string_types)
|
||||
@ -7937,9 +7938,9 @@ class TestShardRange(unittest.TestCase):
|
||||
for key in sr_dict:
|
||||
bad_dict = dict(sr_dict)
|
||||
bad_dict.pop(key)
|
||||
if key == 'reported':
|
||||
# This was added after the fact, and we need to be able to eat
|
||||
# data from old servers
|
||||
if key in ('reported', 'tombstones'):
|
||||
# These were added after the fact, and we need to be able to
|
||||
# eat data from old servers
|
||||
utils.ShardRange.from_dict(bad_dict)
|
||||
utils.ShardRange(**bad_dict)
|
||||
continue
|
||||
@ -8053,6 +8054,62 @@ class TestShardRange(unittest.TestCase):
|
||||
check_bad_args('bad', 10)
|
||||
check_bad_args(10, 'bad')
|
||||
|
||||
def test_update_tombstones(self):
|
||||
ts_1 = next(self.ts_iter)
|
||||
sr = utils.ShardRange('a/test', ts_1, 'l', 'u', 0, 0, None)
|
||||
self.assertEqual(-1, sr.tombstones)
|
||||
self.assertFalse(sr.reported)
|
||||
|
||||
with mock_timestamp_now(next(self.ts_iter)) as now:
|
||||
sr.update_tombstones(1)
|
||||
self.assertEqual(1, sr.tombstones)
|
||||
self.assertEqual(now, sr.meta_timestamp)
|
||||
self.assertFalse(sr.reported)
|
||||
|
||||
sr.reported = True
|
||||
with mock_timestamp_now(next(self.ts_iter)) as now:
|
||||
sr.update_tombstones(3, None)
|
||||
self.assertEqual(3, sr.tombstones)
|
||||
self.assertEqual(now, sr.meta_timestamp)
|
||||
self.assertFalse(sr.reported)
|
||||
|
||||
sr.reported = True
|
||||
ts_2 = next(self.ts_iter)
|
||||
sr.update_tombstones(5, ts_2)
|
||||
self.assertEqual(5, sr.tombstones)
|
||||
self.assertEqual(ts_2, sr.meta_timestamp)
|
||||
self.assertFalse(sr.reported)
|
||||
|
||||
# no change in value -> no change in reported
|
||||
sr.reported = True
|
||||
ts_3 = next(self.ts_iter)
|
||||
sr.update_tombstones(5, ts_3)
|
||||
self.assertEqual(5, sr.tombstones)
|
||||
self.assertEqual(ts_3, sr.meta_timestamp)
|
||||
self.assertTrue(sr.reported)
|
||||
|
||||
sr.update_meta('11', '12')
|
||||
self.assertEqual(11, sr.object_count)
|
||||
self.assertEqual(12, sr.bytes_used)
|
||||
|
||||
def check_bad_args(*args):
|
||||
with self.assertRaises(ValueError):
|
||||
sr.update_tombstones(*args)
|
||||
check_bad_args('bad')
|
||||
check_bad_args(10, 'bad')
|
||||
|
||||
def test_row_count(self):
|
||||
ts_1 = next(self.ts_iter)
|
||||
sr = utils.ShardRange('a/test', ts_1, 'l', 'u', 0, 0, None)
|
||||
self.assertEqual(0, sr.row_count)
|
||||
|
||||
sr.update_meta(11, 123)
|
||||
self.assertEqual(11, sr.row_count)
|
||||
sr.update_tombstones(13)
|
||||
self.assertEqual(24, sr.row_count)
|
||||
sr.update_meta(0, 0)
|
||||
self.assertEqual(13, sr.row_count)
|
||||
|
||||
def test_state_timestamp_setter(self):
|
||||
ts_1 = next(self.ts_iter)
|
||||
sr = utils.ShardRange('a/test', ts_1, 'l', 'u', 0, 0, None)
|
||||
@ -8662,9 +8719,9 @@ class TestShardRangeList(unittest.TestCase):
|
||||
self.ts_iter = make_timestamp_iter()
|
||||
self.shard_ranges = [
|
||||
utils.ShardRange('a/b', self.t1, 'a', 'b',
|
||||
object_count=2, bytes_used=22),
|
||||
object_count=2, bytes_used=22, tombstones=222),
|
||||
utils.ShardRange('b/c', self.t2, 'b', 'c',
|
||||
object_count=4, bytes_used=44),
|
||||
object_count=4, bytes_used=44, tombstones=444),
|
||||
utils.ShardRange('c/y', self.t1, 'c', 'y',
|
||||
object_count=6, bytes_used=66),
|
||||
]
|
||||
@ -8676,6 +8733,7 @@ class TestShardRangeList(unittest.TestCase):
|
||||
self.assertEqual(utils.ShardRange.MIN, srl.upper)
|
||||
self.assertEqual(0, srl.object_count)
|
||||
self.assertEqual(0, srl.bytes_used)
|
||||
self.assertEqual(0, srl.row_count)
|
||||
|
||||
def test_init_with_list(self):
|
||||
srl = ShardRangeList(self.shard_ranges[:2])
|
||||
@ -8684,6 +8742,7 @@ class TestShardRangeList(unittest.TestCase):
|
||||
self.assertEqual('c', srl.upper)
|
||||
self.assertEqual(6, srl.object_count)
|
||||
self.assertEqual(66, srl.bytes_used)
|
||||
self.assertEqual(672, srl.row_count)
|
||||
|
||||
srl.append(self.shard_ranges[2])
|
||||
self.assertEqual(3, len(srl))
|
||||
@ -8691,6 +8750,8 @@ class TestShardRangeList(unittest.TestCase):
|
||||
self.assertEqual('y', srl.upper)
|
||||
self.assertEqual(12, srl.object_count)
|
||||
self.assertEqual(132, srl.bytes_used)
|
||||
self.assertEqual(-1, self.shard_ranges[2].tombstones) # sanity check
|
||||
self.assertEqual(678, srl.row_count) # NB: tombstones=-1 not counted
|
||||
|
||||
def test_pop(self):
|
||||
srl = ShardRangeList(self.shard_ranges[:2])
|
||||
@ -8700,6 +8761,7 @@ class TestShardRangeList(unittest.TestCase):
|
||||
self.assertEqual('b', srl.upper)
|
||||
self.assertEqual(2, srl.object_count)
|
||||
self.assertEqual(22, srl.bytes_used)
|
||||
self.assertEqual(224, srl.row_count)
|
||||
|
||||
def test_slice(self):
|
||||
srl = ShardRangeList(self.shard_ranges)
|
||||
@ -8710,6 +8772,7 @@ class TestShardRangeList(unittest.TestCase):
|
||||
self.assertEqual('b', sublist.upper)
|
||||
self.assertEqual(2, sublist.object_count)
|
||||
self.assertEqual(22, sublist.bytes_used)
|
||||
self.assertEqual(224, sublist.row_count)
|
||||
|
||||
sublist = srl[1:]
|
||||
self.assertIsInstance(sublist, ShardRangeList)
|
||||
@ -8718,6 +8781,7 @@ class TestShardRangeList(unittest.TestCase):
|
||||
self.assertEqual('y', sublist.upper)
|
||||
self.assertEqual(10, sublist.object_count)
|
||||
self.assertEqual(110, sublist.bytes_used)
|
||||
self.assertEqual(454, sublist.row_count)
|
||||
|
||||
def test_includes(self):
|
||||
srl = ShardRangeList(self.shard_ranges)
|
||||
|
@ -36,7 +36,8 @@ from swift.common.exceptions import LockTimeout
|
||||
from swift.container.backend import ContainerBroker, \
|
||||
update_new_item_from_existing, UNSHARDED, SHARDING, SHARDED, \
|
||||
COLLAPSED, SHARD_LISTING_STATES, SHARD_UPDATE_STATES
|
||||
from swift.common.db import DatabaseAlreadyExists, GreenDBConnection
|
||||
from swift.common.db import DatabaseAlreadyExists, GreenDBConnection, \
|
||||
TombstoneReclaimer
|
||||
from swift.common.request_helpers import get_reserved_name
|
||||
from swift.common.utils import Timestamp, encode_timestamps, hash_path, \
|
||||
ShardRange, make_db_file_path, md5, ShardRangeList
|
||||
@ -715,15 +716,17 @@ class TestContainerBroker(unittest.TestCase):
|
||||
self.assertEqual(count_reclaimable(conn, reclaim_age),
|
||||
num_of_objects / 4)
|
||||
|
||||
orig__reclaim = broker._reclaim
|
||||
trace = []
|
||||
|
||||
def tracing_reclaim(conn, age_timestamp, marker):
|
||||
trace.append((age_timestamp, marker,
|
||||
count_reclaimable(conn, age_timestamp)))
|
||||
return orig__reclaim(conn, age_timestamp, marker)
|
||||
class TracingReclaimer(TombstoneReclaimer):
|
||||
def _reclaim(self, conn):
|
||||
trace.append(
|
||||
(self.age_timestamp, self.marker,
|
||||
count_reclaimable(conn, self.age_timestamp)))
|
||||
return super(TracingReclaimer, self)._reclaim(conn)
|
||||
|
||||
with mock.patch.object(broker, '_reclaim', new=tracing_reclaim), \
|
||||
with mock.patch(
|
||||
'swift.common.db.TombstoneReclaimer', TracingReclaimer), \
|
||||
mock.patch('swift.common.db.RECLAIM_PAGE_SIZE', 10):
|
||||
broker.reclaim(reclaim_age, reclaim_age)
|
||||
|
||||
@ -856,7 +859,8 @@ class TestContainerBroker(unittest.TestCase):
|
||||
TestContainerBrokerBeforeXSync,
|
||||
TestContainerBrokerBeforeSPI,
|
||||
TestContainerBrokerBeforeShardRanges,
|
||||
TestContainerBrokerBeforeShardRangeReportedColumn):
|
||||
TestContainerBrokerBeforeShardRangeReportedColumn,
|
||||
TestContainerBrokerBeforeShardRangeTombstonesColumn):
|
||||
self.assertEqual(info['status_changed_at'], '0')
|
||||
else:
|
||||
self.assertEqual(info['status_changed_at'],
|
||||
@ -2210,7 +2214,8 @@ class TestContainerBroker(unittest.TestCase):
|
||||
TestContainerBrokerBeforeXSync,
|
||||
TestContainerBrokerBeforeSPI,
|
||||
TestContainerBrokerBeforeShardRanges,
|
||||
TestContainerBrokerBeforeShardRangeReportedColumn):
|
||||
TestContainerBrokerBeforeShardRangeReportedColumn,
|
||||
TestContainerBrokerBeforeShardRangeTombstonesColumn):
|
||||
self.assertEqual(info['status_changed_at'], '0')
|
||||
else:
|
||||
self.assertEqual(info['status_changed_at'],
|
||||
@ -3509,7 +3514,8 @@ class TestContainerBroker(unittest.TestCase):
|
||||
TestContainerBrokerBeforeXSync,
|
||||
TestContainerBrokerBeforeSPI,
|
||||
TestContainerBrokerBeforeShardRanges,
|
||||
TestContainerBrokerBeforeShardRangeReportedColumn):
|
||||
TestContainerBrokerBeforeShardRangeReportedColumn,
|
||||
TestContainerBrokerBeforeShardRangeTombstonesColumn):
|
||||
self.assertEqual(info['status_changed_at'], '0')
|
||||
else:
|
||||
self.assertEqual(timestamp.internal, info['status_changed_at'])
|
||||
@ -4843,7 +4849,7 @@ class TestContainerBroker(unittest.TestCase):
|
||||
|
||||
@with_tempdir
|
||||
def test_merge_shard_ranges(self, tempdir):
|
||||
ts = [next(self.ts) for _ in range(14)]
|
||||
ts = [next(self.ts) for _ in range(16)]
|
||||
db_path = os.path.join(
|
||||
tempdir, 'containers', 'part', 'suffix', 'hash', 'container.db')
|
||||
broker = ContainerBroker(
|
||||
@ -4947,6 +4953,20 @@ class TestContainerBroker(unittest.TestCase):
|
||||
broker.merge_shard_ranges(ShardRangeList([sr_c_13, sr_b_13]))
|
||||
self._assert_shard_ranges(
|
||||
broker, [sr_b_13, sr_c_13])
|
||||
# merge with tombstones but same meta_timestamp
|
||||
sr_c_13_tombs = ShardRange('a/c_c', ts[13], lower='b', upper='c',
|
||||
object_count=10, meta_timestamp=ts[13],
|
||||
tombstones=999)
|
||||
broker.merge_shard_ranges(sr_c_13_tombs)
|
||||
self._assert_shard_ranges(
|
||||
broker, [sr_b_13, sr_c_13])
|
||||
# merge with tombstones at newer meta_timestamp
|
||||
sr_c_13_tombs = ShardRange('a/c_c', ts[13], lower='b', upper='c',
|
||||
object_count=1, meta_timestamp=ts[14],
|
||||
tombstones=999)
|
||||
broker.merge_shard_ranges(sr_c_13_tombs)
|
||||
self._assert_shard_ranges(
|
||||
broker, [sr_b_13, sr_c_13_tombs])
|
||||
|
||||
@with_tempdir
|
||||
def test_merge_shard_ranges_state(self, tempdir):
|
||||
@ -5670,7 +5690,7 @@ class TestContainerBrokerBeforeShardRangeReportedColumn(
|
||||
ContainerBrokerMigrationMixin, TestContainerBroker):
|
||||
"""
|
||||
Tests for ContainerBroker against databases created
|
||||
before the shard_ranges table was added.
|
||||
before the shard_ranges table reported column was added.
|
||||
"""
|
||||
# *grumble grumble* This should include container_info/policy_stat :-/
|
||||
expected_db_tables = {'outgoing_sync', 'incoming_sync', 'object',
|
||||
@ -5699,6 +5719,234 @@ class TestContainerBrokerBeforeShardRangeReportedColumn(
|
||||
conn.execute('''SELECT reported
|
||||
FROM shard_range''')
|
||||
|
||||
@with_tempdir
|
||||
def test_get_shard_ranges_attempts(self, tempdir):
|
||||
# verify that old broker handles new sql query for shard range rows
|
||||
db_path = os.path.join(tempdir, 'container.db')
|
||||
broker = ContainerBroker(db_path, account='a', container='c')
|
||||
broker.initialize(next(self.ts).internal, 0)
|
||||
|
||||
@contextmanager
|
||||
def patch_execute():
|
||||
with broker.get() as conn:
|
||||
mock_conn = mock.MagicMock()
|
||||
mock_execute = mock.MagicMock()
|
||||
mock_conn.execute = mock_execute
|
||||
|
||||
@contextmanager
|
||||
def mock_get():
|
||||
yield mock_conn
|
||||
|
||||
with mock.patch.object(broker, 'get', mock_get):
|
||||
yield mock_execute, conn
|
||||
|
||||
with patch_execute() as (mock_execute, conn):
|
||||
mock_execute.side_effect = conn.execute
|
||||
broker.get_shard_ranges()
|
||||
|
||||
expected = [
|
||||
mock.call('\n SELECT name, timestamp, lower, upper, '
|
||||
'object_count, bytes_used, meta_timestamp, deleted, '
|
||||
'state, state_timestamp, epoch, reported, '
|
||||
'tombstones\n '
|
||||
'FROM shard_range WHERE deleted=0 AND name != ?;\n'
|
||||
' ', ['a/c']),
|
||||
mock.call('\n SELECT name, timestamp, lower, upper, '
|
||||
'object_count, bytes_used, meta_timestamp, deleted, '
|
||||
'state, state_timestamp, epoch, 0 as reported, '
|
||||
'tombstones\n '
|
||||
'FROM shard_range WHERE deleted=0 AND name != ?;\n'
|
||||
' ', ['a/c']),
|
||||
mock.call('\n SELECT name, timestamp, lower, upper, '
|
||||
'object_count, bytes_used, meta_timestamp, deleted, '
|
||||
'state, state_timestamp, epoch, 0 as reported, '
|
||||
'-1 as tombstones\n '
|
||||
'FROM shard_range WHERE deleted=0 AND name != ?;\n'
|
||||
' ', ['a/c']),
|
||||
]
|
||||
|
||||
self.assertEqual(expected, mock_execute.call_args_list,
|
||||
mock_execute.call_args_list)
|
||||
|
||||
# if unexpectedly the call to execute continues to fail for reported,
|
||||
# verify that the exception is raised after a retry
|
||||
with patch_execute() as (mock_execute, conn):
|
||||
def mock_execute_handler(*args, **kwargs):
|
||||
if len(mock_execute.call_args_list) < 3:
|
||||
return conn.execute(*args, **kwargs)
|
||||
else:
|
||||
raise sqlite3.OperationalError('no such column: reported')
|
||||
mock_execute.side_effect = mock_execute_handler
|
||||
with self.assertRaises(sqlite3.OperationalError):
|
||||
broker.get_shard_ranges()
|
||||
self.assertEqual(expected, mock_execute.call_args_list,
|
||||
mock_execute.call_args_list)
|
||||
|
||||
# if unexpectedly the call to execute continues to fail for tombstones,
|
||||
# verify that the exception is raised after a retry
|
||||
with patch_execute() as (mock_execute, conn):
|
||||
def mock_execute_handler(*args, **kwargs):
|
||||
if len(mock_execute.call_args_list) < 3:
|
||||
return conn.execute(*args, **kwargs)
|
||||
else:
|
||||
raise sqlite3.OperationalError(
|
||||
'no such column: tombstones')
|
||||
mock_execute.side_effect = mock_execute_handler
|
||||
with self.assertRaises(sqlite3.OperationalError):
|
||||
broker.get_shard_ranges()
|
||||
self.assertEqual(expected, mock_execute.call_args_list,
|
||||
mock_execute.call_args_list)
|
||||
|
||||
@with_tempdir
|
||||
def test_merge_shard_ranges_migrates_table(self, tempdir):
|
||||
# verify that old broker migrates shard range table
|
||||
db_path = os.path.join(tempdir, 'container.db')
|
||||
broker = ContainerBroker(db_path, account='a', container='c')
|
||||
broker.initialize(next(self.ts).internal, 0)
|
||||
shard_ranges = [ShardRange('.shards_a/c_0', next(self.ts), 'a', 'b'),
|
||||
ShardRange('.shards_a/c_1', next(self.ts), 'b', 'c')]
|
||||
|
||||
orig_migrate_reported = broker._migrate_add_shard_range_reported
|
||||
orig_migrate_tombstones = broker._migrate_add_shard_range_tombstones
|
||||
|
||||
with mock.patch.object(
|
||||
broker, '_migrate_add_shard_range_reported',
|
||||
side_effect=orig_migrate_reported) as mocked_reported:
|
||||
with mock.patch.object(
|
||||
broker, '_migrate_add_shard_range_tombstones',
|
||||
side_effect=orig_migrate_tombstones) as mocked_tombstones:
|
||||
broker.merge_shard_ranges(shard_ranges[:1])
|
||||
|
||||
mocked_reported.assert_called_once_with(mock.ANY)
|
||||
mocked_tombstones.assert_called_once_with(mock.ANY)
|
||||
self._assert_shard_ranges(broker, shard_ranges[:1])
|
||||
|
||||
with mock.patch.object(
|
||||
broker, '_migrate_add_shard_range_reported',
|
||||
side_effect=orig_migrate_reported) as mocked_reported:
|
||||
with mock.patch.object(
|
||||
broker, '_migrate_add_shard_range_tombstones',
|
||||
side_effect=orig_migrate_tombstones) as mocked_tombstones:
|
||||
broker.merge_shard_ranges(shard_ranges[1:])
|
||||
|
||||
mocked_reported.assert_not_called()
|
||||
mocked_tombstones.assert_not_called()
|
||||
self._assert_shard_ranges(broker, shard_ranges)
|
||||
|
||||
@with_tempdir
|
||||
def test_merge_shard_ranges_fails_to_migrate_table(self, tempdir):
|
||||
# verify that old broker will raise exception if it unexpectedly fails
|
||||
# to migrate shard range table
|
||||
db_path = os.path.join(tempdir, 'container.db')
|
||||
broker = ContainerBroker(db_path, account='a', container='c')
|
||||
broker.initialize(next(self.ts).internal, 0)
|
||||
shard_ranges = [ShardRange('.shards_a/c_0', next(self.ts), 'a', 'b'),
|
||||
ShardRange('.shards_a/c_1', next(self.ts), 'b', 'c')]
|
||||
|
||||
# unexpected error during migration
|
||||
with mock.patch.object(
|
||||
broker, '_migrate_add_shard_range_reported',
|
||||
side_effect=sqlite3.OperationalError('unexpected')) \
|
||||
as mocked_reported:
|
||||
with self.assertRaises(sqlite3.OperationalError):
|
||||
broker.merge_shard_ranges(shard_ranges)
|
||||
|
||||
# one failed attempt was made to add reported column
|
||||
self.assertEqual(1, mocked_reported.call_count)
|
||||
|
||||
# migration silently fails
|
||||
with mock.patch.object(
|
||||
broker, '_migrate_add_shard_range_reported') \
|
||||
as mocked_reported:
|
||||
with self.assertRaises(sqlite3.OperationalError):
|
||||
broker.merge_shard_ranges(shard_ranges)
|
||||
|
||||
# one failed attempt was made to add reported column
|
||||
self.assertEqual(1, mocked_reported.call_count)
|
||||
|
||||
with mock.patch.object(
|
||||
broker, '_migrate_add_shard_range_tombstones') \
|
||||
as mocked_tombstones:
|
||||
with self.assertRaises(sqlite3.OperationalError):
|
||||
broker.merge_shard_ranges(shard_ranges)
|
||||
|
||||
# first migration adds reported column
|
||||
# one failed attempt was made to add tombstones column
|
||||
self.assertEqual(1, mocked_tombstones.call_count)
|
||||
|
||||
|
||||
def pre_tombstones_create_shard_range_table(self, conn):
|
||||
"""
|
||||
Copied from ContainerBroker before the
|
||||
tombstones column was added; used for testing with
|
||||
TestContainerBrokerBeforeShardRangeTombstonesColumn.
|
||||
|
||||
Create a shard_range table with no 'tombstones' column.
|
||||
|
||||
:param conn: DB connection object
|
||||
"""
|
||||
# Use execute (not executescript) so we get the benefits of our
|
||||
# GreenDBConnection. Creating a table requires a whole-DB lock;
|
||||
# *any* in-progress cursor will otherwise trip a "database is locked"
|
||||
# error.
|
||||
conn.execute("""
|
||||
CREATE TABLE shard_range (
|
||||
ROWID INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
name TEXT,
|
||||
timestamp TEXT,
|
||||
lower TEXT,
|
||||
upper TEXT,
|
||||
object_count INTEGER DEFAULT 0,
|
||||
bytes_used INTEGER DEFAULT 0,
|
||||
meta_timestamp TEXT,
|
||||
deleted INTEGER DEFAULT 0,
|
||||
state INTEGER,
|
||||
state_timestamp TEXT,
|
||||
epoch TEXT,
|
||||
reported INTEGER DEFAULT 0
|
||||
);
|
||||
""")
|
||||
|
||||
conn.execute("""
|
||||
CREATE TRIGGER shard_range_update BEFORE UPDATE ON shard_range
|
||||
BEGIN
|
||||
SELECT RAISE(FAIL, 'UPDATE not allowed; DELETE and INSERT');
|
||||
END;
|
||||
""")
|
||||
|
||||
|
||||
class TestContainerBrokerBeforeShardRangeTombstonesColumn(
|
||||
ContainerBrokerMigrationMixin, TestContainerBroker):
|
||||
"""
|
||||
Tests for ContainerBroker against databases created
|
||||
before the shard_ranges table tombstones column was added.
|
||||
"""
|
||||
expected_db_tables = {'outgoing_sync', 'incoming_sync', 'object',
|
||||
'sqlite_sequence', 'container_stat', 'shard_range'}
|
||||
|
||||
def setUp(self):
|
||||
super(TestContainerBrokerBeforeShardRangeTombstonesColumn,
|
||||
self).setUp()
|
||||
ContainerBroker.create_shard_range_table = \
|
||||
pre_tombstones_create_shard_range_table
|
||||
|
||||
broker = ContainerBroker(':memory:', account='a', container='c')
|
||||
broker.initialize(Timestamp('1').internal, 0)
|
||||
with self.assertRaises(sqlite3.DatabaseError) as raised, \
|
||||
broker.get() as conn:
|
||||
conn.execute('''SELECT tombstones
|
||||
FROM shard_range''')
|
||||
self.assertIn('no such column: tombstones', str(raised.exception))
|
||||
|
||||
def tearDown(self):
|
||||
super(TestContainerBrokerBeforeShardRangeTombstonesColumn,
|
||||
self).tearDown()
|
||||
broker = ContainerBroker(':memory:', account='a', container='c')
|
||||
broker.initialize(Timestamp('1').internal, 0)
|
||||
with broker.get() as conn:
|
||||
conn.execute('''SELECT tombstones
|
||||
FROM shard_range''')
|
||||
|
||||
|
||||
class TestUpdateNewItemFromExisting(unittest.TestCase):
|
||||
# TODO: add test scenarios that have swift_bytes in content_type
|
||||
|
@ -112,13 +112,13 @@ class BaseTestSharder(unittest.TestCase):
|
||||
return broker
|
||||
|
||||
def _make_shard_ranges(self, bounds, state=None, object_count=0,
|
||||
timestamp=Timestamp.now()):
|
||||
timestamp=Timestamp.now(), **kwargs):
|
||||
if not isinstance(state, (tuple, list)):
|
||||
state = [state] * len(bounds)
|
||||
state_iter = iter(state)
|
||||
return [ShardRange('.shards_a/c_%s' % upper, timestamp,
|
||||
lower, upper, state=next(state_iter),
|
||||
object_count=object_count)
|
||||
object_count=object_count, **kwargs)
|
||||
for lower, upper in bounds]
|
||||
|
||||
def ts_encoded(self):
|
||||
@ -4628,10 +4628,39 @@ class TestSharder(BaseTestSharder):
|
||||
bytes_used=own_shard_range.bytes_used + 1)]
|
||||
self.check_shard_ranges_sent(broker, expected_sent)
|
||||
|
||||
# initialise tombstones
|
||||
with mock_timestamp_now(next(self.ts_iter)):
|
||||
own_shard_range = broker.get_own_shard_range()
|
||||
own_shard_range.update_tombstones(0)
|
||||
broker.merge_shard_ranges([own_shard_range])
|
||||
|
||||
for state in ShardRange.STATES:
|
||||
with annotate_failure(state):
|
||||
check_only_own_shard_range_sent(state)
|
||||
|
||||
def check_tombstones_sent(state):
|
||||
own_shard_range = broker.get_own_shard_range()
|
||||
self.assertTrue(own_shard_range.update_state(
|
||||
state, state_timestamp=next(self.ts_iter)))
|
||||
broker.merge_shard_ranges([own_shard_range])
|
||||
# delete an object, expect to see it reflected in the own shard
|
||||
# range that is sent
|
||||
broker.delete_object(str(own_shard_range.object_count),
|
||||
next(self.ts_iter).internal)
|
||||
with mock_timestamp_now() as now:
|
||||
# force own shard range meta updates to be at fixed timestamp
|
||||
expected_sent = [
|
||||
dict(own_shard_range,
|
||||
meta_timestamp=now.internal,
|
||||
object_count=own_shard_range.object_count - 1,
|
||||
bytes_used=own_shard_range.bytes_used - 1,
|
||||
tombstones=own_shard_range.tombstones + 1)]
|
||||
self.check_shard_ranges_sent(broker, expected_sent)
|
||||
|
||||
for state in ShardRange.STATES:
|
||||
with annotate_failure(state):
|
||||
check_tombstones_sent(state)
|
||||
|
||||
def test_update_root_container_already_reported(self):
|
||||
broker = self._make_broker()
|
||||
|
||||
@ -4654,6 +4683,12 @@ class TestSharder(BaseTestSharder):
|
||||
sharder._update_root_container(broker)
|
||||
self.assertFalse(mock_conn.requests)
|
||||
|
||||
# initialise tombstones
|
||||
with mock_timestamp_now(next(self.ts_iter)):
|
||||
own_shard_range = broker.get_own_shard_range()
|
||||
own_shard_range.update_tombstones(0)
|
||||
broker.merge_shard_ranges([own_shard_range])
|
||||
|
||||
for state in ShardRange.STATES:
|
||||
with annotate_failure(state):
|
||||
check_already_reported_not_sent(state)
|
||||
@ -4685,7 +4720,8 @@ class TestSharder(BaseTestSharder):
|
||||
own_shard_range.copy(
|
||||
meta_timestamp=now.internal,
|
||||
object_count=own_shard_range.object_count + 1,
|
||||
bytes_used=own_shard_range.bytes_used + 1)] +
|
||||
bytes_used=own_shard_range.bytes_used + 1,
|
||||
tombstones=0)] +
|
||||
shard_ranges,
|
||||
key=lambda sr: (sr.upper, sr.state, sr.lower))
|
||||
self.check_shard_ranges_sent(
|
||||
@ -5360,8 +5396,11 @@ class TestSharder(BaseTestSharder):
|
||||
shard_bounds = (('', 'here'), ('here', 'there'), ('there', ''))
|
||||
size = (DEFAULT_SHARD_SHRINK_POINT *
|
||||
DEFAULT_SHARD_CONTAINER_THRESHOLD / 100)
|
||||
|
||||
# all shard ranges too big to shrink
|
||||
shard_ranges = self._make_shard_ranges(
|
||||
shard_bounds, state=ShardRange.ACTIVE, object_count=size)
|
||||
shard_bounds, state=ShardRange.ACTIVE, object_count=size - 1,
|
||||
tombstones=1)
|
||||
own_sr = broker.get_own_shard_range()
|
||||
own_sr.update_state(ShardRange.SHARDED, Timestamp.now())
|
||||
broker.merge_shard_ranges(shard_ranges + [own_sr])
|
||||
@ -5373,7 +5412,7 @@ class TestSharder(BaseTestSharder):
|
||||
broker.get_shard_ranges())
|
||||
|
||||
# one range just below threshold
|
||||
shard_ranges[0].update_meta(size - 1, 0)
|
||||
shard_ranges[0].update_meta(size - 2, 0)
|
||||
broker.merge_shard_ranges(shard_ranges[0])
|
||||
with self._mock_sharder() as sharder:
|
||||
with mock_timestamp_now() as now:
|
||||
@ -6732,13 +6771,8 @@ class TestSharderFunctions(BaseTestSharder):
|
||||
sequences = find_compactible_shard_sequences(broker, 10, 999, 3, 3)
|
||||
self.assertEqual([shard_ranges[8:]], sequences)
|
||||
|
||||
def test_find_compactible_shrink_threshold(self):
|
||||
def _do_test_find_compactible_shrink_threshold(self, broker, shard_ranges):
|
||||
# verify option to set the shrink threshold for compaction;
|
||||
broker = self._make_broker()
|
||||
shard_ranges = self._make_shard_ranges(
|
||||
(('', 'b'), ('b', 'c'), ('c', 'd'), ('d', 'e'), ('e', 'f'),
|
||||
('f', 'g'), ('g', 'h'), ('h', 'i'), ('i', 'j'), ('j', '')),
|
||||
state=ShardRange.ACTIVE, object_count=10)
|
||||
# (n-2)th shard range has one extra object
|
||||
shard_ranges[-2].object_count = 11
|
||||
broker.merge_shard_ranges(shard_ranges)
|
||||
@ -6750,13 +6784,24 @@ class TestSharderFunctions(BaseTestSharder):
|
||||
sequences = find_compactible_shard_sequences(broker, 11, 999, -1, -1)
|
||||
self.assertEqual([shard_ranges[:9]], sequences)
|
||||
|
||||
def test_find_compactible_expansion_limit(self):
|
||||
# verify option to limit the size of each acceptor after compaction
|
||||
def test_find_compactible_shrink_threshold(self):
|
||||
broker = self._make_broker()
|
||||
shard_ranges = self._make_shard_ranges(
|
||||
(('', 'b'), ('b', 'c'), ('c', 'd'), ('d', 'e'), ('e', 'f'),
|
||||
('f', 'g'), ('g', 'h'), ('h', 'i'), ('i', 'j'), ('j', '')),
|
||||
state=ShardRange.ACTIVE, object_count=6)
|
||||
state=ShardRange.ACTIVE, object_count=10)
|
||||
self._do_test_find_compactible_shrink_threshold(broker, shard_ranges)
|
||||
|
||||
def test_find_compactible_shrink_threshold_with_tombstones(self):
|
||||
broker = self._make_broker()
|
||||
shard_ranges = self._make_shard_ranges(
|
||||
(('', 'b'), ('b', 'c'), ('c', 'd'), ('d', 'e'), ('e', 'f'),
|
||||
('f', 'g'), ('g', 'h'), ('h', 'i'), ('i', 'j'), ('j', '')),
|
||||
state=ShardRange.ACTIVE, object_count=7, tombstones=3)
|
||||
self._do_test_find_compactible_shrink_threshold(broker, shard_ranges)
|
||||
|
||||
def _do_test_find_compactible_expansion_limit(self, broker, shard_ranges):
|
||||
# verify option to limit the size of each acceptor after compaction
|
||||
broker.merge_shard_ranges(shard_ranges)
|
||||
sequences = find_compactible_shard_sequences(broker, 10, 33, -1, -1)
|
||||
self.assertEqual([shard_ranges[:5], shard_ranges[5:]], sequences)
|
||||
@ -6766,11 +6811,30 @@ class TestSharderFunctions(BaseTestSharder):
|
||||
sequences = find_compactible_shard_sequences(broker, 10, 33, -1, -1)
|
||||
self.assertEqual([shard_ranges[:4], shard_ranges[7:]], sequences)
|
||||
|
||||
def test_find_compactible_expansion_limit(self):
|
||||
# verify option to limit the size of each acceptor after compaction
|
||||
broker = self._make_broker()
|
||||
shard_ranges = self._make_shard_ranges(
|
||||
(('', 'b'), ('b', 'c'), ('c', 'd'), ('d', 'e'), ('e', 'f'),
|
||||
('f', 'g'), ('g', 'h'), ('h', 'i'), ('i', 'j'), ('j', '')),
|
||||
state=ShardRange.ACTIVE, object_count=6)
|
||||
self._do_test_find_compactible_expansion_limit(broker, shard_ranges)
|
||||
|
||||
def test_find_compactible_expansion_limit_with_tombstones(self):
|
||||
# verify option to limit the size of each acceptor after compaction
|
||||
broker = self._make_broker()
|
||||
shard_ranges = self._make_shard_ranges(
|
||||
(('', 'b'), ('b', 'c'), ('c', 'd'), ('d', 'e'), ('e', 'f'),
|
||||
('f', 'g'), ('g', 'h'), ('h', 'i'), ('i', 'j'), ('j', '')),
|
||||
state=ShardRange.ACTIVE, object_count=1, tombstones=5)
|
||||
self._do_test_find_compactible_expansion_limit(broker, shard_ranges)
|
||||
|
||||
def test_is_sharding_candidate(self):
|
||||
for state in ShardRange.STATES:
|
||||
for object_count in (9, 10, 11):
|
||||
sr = ShardRange('.shards_a/c', next(self.ts_iter), '', '',
|
||||
state=state, object_count=object_count)
|
||||
state=state, object_count=object_count,
|
||||
tombstones=100) # tombstones not considered
|
||||
with annotate_failure('%s %s' % (state, object_count)):
|
||||
if state == ShardRange.ACTIVE and object_count >= 10:
|
||||
self.assertTrue(is_sharding_candidate(sr, 10))
|
||||
@ -6783,6 +6847,10 @@ class TestSharderFunctions(BaseTestSharder):
|
||||
sr = ShardRange('.shards_a/c', next(self.ts_iter), '', '',
|
||||
state=state, object_count=9)
|
||||
self.assertTrue(is_shrinking_candidate(sr, 10, 9, ok_states))
|
||||
# shard range has 9 rows
|
||||
sr = ShardRange('.shards_a/c', next(self.ts_iter), '', '',
|
||||
state=state, object_count=4, tombstones=5)
|
||||
self.assertTrue(is_shrinking_candidate(sr, 10, 9, ok_states))
|
||||
|
||||
do_check_true(ShardRange.ACTIVE, (ShardRange.ACTIVE,))
|
||||
do_check_true(ShardRange.ACTIVE,
|
||||
@ -6790,11 +6858,12 @@ class TestSharderFunctions(BaseTestSharder):
|
||||
do_check_true(ShardRange.SHRINKING,
|
||||
(ShardRange.ACTIVE, ShardRange.SHRINKING))
|
||||
|
||||
def do_check_false(state, object_count):
|
||||
def do_check_false(state, object_count, tombstones):
|
||||
states = (ShardRange.ACTIVE, ShardRange.SHRINKING)
|
||||
# shard range has 10 objects
|
||||
sr = ShardRange('.shards_a/c', next(self.ts_iter), '', '',
|
||||
state=state, object_count=object_count)
|
||||
state=state, object_count=object_count,
|
||||
tombstones=tombstones)
|
||||
self.assertFalse(is_shrinking_candidate(sr, 10, 20))
|
||||
self.assertFalse(is_shrinking_candidate(sr, 10, 20, states))
|
||||
self.assertFalse(is_shrinking_candidate(sr, 10, 9))
|
||||
@ -6805,7 +6874,13 @@ class TestSharderFunctions(BaseTestSharder):
|
||||
for state in ShardRange.STATES:
|
||||
for object_count in (10, 11):
|
||||
with annotate_failure('%s %s' % (state, object_count)):
|
||||
do_check_false(state, object_count)
|
||||
do_check_false(state, object_count, 0)
|
||||
for tombstones in (10, 11):
|
||||
with annotate_failure('%s %s' % (state, tombstones)):
|
||||
do_check_false(state, 0, tombstones)
|
||||
for tombstones in (5, 6):
|
||||
with annotate_failure('%s %s' % (state, tombstones)):
|
||||
do_check_false(state, 5, tombstones)
|
||||
|
||||
def test_find_and_rank_whole_path_split(self):
|
||||
ts_0 = next(self.ts_iter)
|
||||
|
Loading…
Reference in New Issue
Block a user