Merge "swift-manage-shard-ranges: add 'merge' subcommand"

This commit is contained in:
Zuul 2022-09-09 05:45:58 +00:00 committed by Gerrit Code Review
commit be88ab7670
4 changed files with 414 additions and 21 deletions

View File

@ -168,7 +168,8 @@ from six.moves import input
from swift.common.utils import Timestamp, get_logger, ShardRange, readconf, \
ShardRangeList, non_negative_int, config_positive_int_value
from swift.container.backend import ContainerBroker, UNSHARDED
from swift.container.backend import ContainerBroker, UNSHARDED, \
sift_shard_ranges
from swift.container.sharder import make_shard_ranges, sharding_enabled, \
CleavingContext, process_compactible_shard_sequences, \
find_compactible_shard_sequences, find_overlapping_ranges, \
@ -427,6 +428,61 @@ def delete_shard_ranges(broker, args):
return EXIT_SUCCESS
def combine_shard_ranges(new_shard_ranges, existing_shard_ranges):
"""
Combines new and existing shard ranges based on most recent state.
:param new_shard_ranges: a list of ShardRange instances.
:param existing_shard_ranges: a list of ShardRange instances.
:return: a list of ShardRange instances.
"""
new_shard_ranges = [dict(sr) for sr in new_shard_ranges]
existing_shard_ranges = [dict(sr) for sr in existing_shard_ranges]
to_add, to_delete = sift_shard_ranges(
new_shard_ranges,
dict((sr['name'], sr) for sr in existing_shard_ranges))
result = [ShardRange.from_dict(existing)
for existing in existing_shard_ranges
if existing['name'] not in to_delete]
result.extend([ShardRange.from_dict(sr) for sr in to_add])
return sorted([sr for sr in result if not sr.deleted],
key=ShardRange.sort_key)
def merge_shard_ranges(broker, args):
_check_own_shard_range(broker, args)
shard_data = _load_and_validate_shard_data(args, require_index=False)
new_shard_ranges = ShardRangeList([ShardRange.from_dict(sr)
for sr in shard_data])
new_shard_ranges.sort(key=ShardRange.sort_key)
# do some checks before merging...
existing_shard_ranges = ShardRangeList(
broker.get_shard_ranges(include_deleted=True))
outcome = combine_shard_ranges(new_shard_ranges, existing_shard_ranges)
if args.verbose:
print('This change will result in the following shard ranges in the '
'affected namespace:')
print(json.dumps([dict(sr) for sr in outcome], indent=2))
overlaps = find_overlapping_ranges(outcome)
if overlaps:
print('WARNING: this change will result in shard ranges overlaps!')
paths_with_gaps = find_paths_with_gaps(outcome)
gaps = [gap for start_path, gap, end_path in paths_with_gaps
if existing_shard_ranges.includes(gap)]
if gaps:
print('WARNING: this change will result in shard ranges gaps!')
if not _proceed(args):
return EXIT_USER_QUIT
with broker.updated_timeout(args.replace_timeout):
broker.merge_shard_ranges(new_shard_ranges)
print('Injected %d shard ranges.' % len(new_shard_ranges))
print('Run container-replicator to replicate them to other nodes.')
return EXIT_SUCCESS
def _replace_shard_ranges(broker, args, shard_data, timeout=0):
own_shard_range = _check_own_shard_range(broker, args)
shard_ranges = make_shard_ranges(
@ -957,6 +1013,22 @@ def _make_parser():
'info', help='Print container db info')
info_parser.set_defaults(func=db_info)
# merge
merge_parser = subparsers.add_parser(
'merge',
help='Merge shard range(s) from file with existing shard ranges. This '
'subcommand should only be used if you are confident that you '
'know what you are doing. Shard ranges should not typically be '
'modified in this way.')
merge_parser.add_argument('input', metavar='input_file',
type=str, help='Name of file')
merge_parser.add_argument(
'--replace-timeout', type=int, default=600,
help='Minimum DB timeout to use when merging shard ranges.')
_add_account_prefix_arg(merge_parser)
_add_prompt_args(merge_parser)
merge_parser.set_defaults(func=merge_shard_ranges)
# replace
replace_parser = subparsers.add_parser(
'replace',

View File

@ -315,6 +315,38 @@ def merge_shards(shard_data, existing):
return new_content
def sift_shard_ranges(new_shard_ranges, existing_shard_ranges):
"""
Compares new and existing shard ranges, updating the new shard ranges with
any more recent state from the existing, and returns shard ranges sorted
into those that need adding because they contain new or updated state and
those that need deleting because their state has been superseded.
:param new_shard_ranges: a list of dicts, each of which represents a shard
range.
:param existing_shard_ranges: a dict mapping shard range names to dicts
representing a shard range.
:return: a tuple (to_add, to_delete); to_add is a list of dicts, each of
which represents a shard range that is to be added to the existing
shard ranges; to_delete is a set of shard range names that are to be
deleted.
"""
to_delete = set()
to_add = {}
for item in new_shard_ranges:
item_ident = item['name']
existing = existing_shard_ranges.get(item_ident)
if merge_shards(item, existing):
# exists with older timestamp
if item_ident in existing_shard_ranges:
to_delete.add(item_ident)
# duplicate entries in item_list
if (item_ident not in to_add or
merge_shards(item, to_add[item_ident])):
to_add[item_ident] = item
return to_add.values(), to_delete
class ContainerBroker(DatabaseBroker):
"""
Encapsulates working with a container database.
@ -1421,28 +1453,14 @@ class ContainerBroker(DatabaseBroker):
chunk = [record['name'] for record
in item_list[offset:offset + SQLITE_ARG_LIMIT]]
records.update(
(rec[0], rec) for rec in curs.execute(
(rec[0], dict(zip(SHARD_RANGE_KEYS, rec)))
for rec in curs.execute(
'SELECT %s FROM %s '
'WHERE deleted IN (0, 1) AND name IN (%s)' %
(', '.join(SHARD_RANGE_KEYS), SHARD_RANGE_TABLE,
','.join('?' * len(chunk))), chunk))
# Sort item_list into things that need adding and deleting
to_delete = set()
to_add = {}
for item in item_list:
item_ident = item['name']
existing = records.get(item_ident)
if existing:
existing = dict(zip(SHARD_RANGE_KEYS, existing))
if merge_shards(item, existing):
# exists with older timestamp
if item_ident in records:
to_delete.add(item_ident)
# duplicate entries in item_list
if (item_ident not in to_add or
merge_shards(item, to_add[item_ident])):
to_add[item_ident] = item
to_add, to_delete = sift_shard_ranges(item_list, records)
if to_delete:
curs.executemany(
@ -1455,7 +1473,7 @@ class ContainerBroker(DatabaseBroker):
'INSERT INTO %s (%s) VALUES (%s)' %
(SHARD_RANGE_TABLE, ','.join(SHARD_RANGE_KEYS), vals),
tuple([item[k] for k in SHARD_RANGE_KEYS]
for item in to_add.values()))
for item in to_add))
conn.commit()
migrations = {

View File

@ -24,7 +24,7 @@ from tempfile import mkdtemp
import six
from six.moves import cStringIO as StringIO
from swift.cli.manage_shard_ranges import main
from swift.cli.manage_shard_ranges import main, combine_shard_ranges
from swift.common import utils
from swift.common.utils import Timestamp, ShardRange
from swift.container.backend import ContainerBroker
@ -730,6 +730,230 @@ class TestManageShardRanges(unittest.TestCase):
self.assertEqual(expected, err.getvalue().splitlines())
self.assertEqual(expected_shard_ranges[:1], json.loads(out.getvalue()))
def test_merge(self):
broker = self._make_broker()
broker.update_metadata({'X-Container-Sysmeta-Sharding':
(True, Timestamp.now().internal)})
good_shard_ranges = []
for shard in self.shard_data[:3]:
good_shard_ranges.append(ShardRange(name='a/c_' + shard['lower'],
timestamp=next(self.ts_iter),
state=ShardRange.ACTIVE,
lower=shard['lower'],
upper=shard['upper']))
# insert an overlap..
bad_shard_range = ShardRange(
name='a/c_bad_' + self.shard_data[1]['lower'],
timestamp=next(self.ts_iter),
state=ShardRange.ACTIVE,
lower=self.shard_data[1]['lower'],
upper=self.shard_data[2]['upper'])
broker.merge_shard_ranges(good_shard_ranges + [bad_shard_range])
self.assertEqual(
[('', 'obj09'),
('obj09', 'obj19'),
('obj09', 'obj29'),
('obj19', 'obj29')],
[(sr.lower_str, sr.upper_str) for sr in broker.get_shard_ranges()])
# use command to merge in a deleted version of the bad shard range
bad_shard_range.update_state(ShardRange.SHRUNK,
state_timestamp=next(self.ts_iter))
bad_shard_range.set_deleted(next(self.ts_iter))
bad_shard_range.update_meta(0, 0, next(self.ts_iter))
input_file = os.path.join(self.testdir, 'shards')
with open(input_file, 'w') as fd:
json.dump([dict(bad_shard_range)], fd)
out = StringIO()
err = StringIO()
with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err):
ret = main([broker.db_file, '-v', 'merge', input_file,
'--replace-timeout', '1', '--yes'])
self.assertEqual(0, ret)
affected_shard_ranges = [dict(sr) for sr in good_shard_ranges]
expected_msg = [
'This change will result in the following shard ranges in the '
'affected namespace:']
expected_msg.extend(
json.dumps(affected_shard_ranges, indent=2).splitlines())
expected_msg.extend(
['Injected 1 shard ranges.',
'Run container-replicator to replicate them to other nodes.'])
self.assertEqual(expected_msg, out.getvalue().splitlines())
self.assertEqual(['Loaded db broker for a/c'],
err.getvalue().splitlines())
self.assertEqual(
[dict(sr) for sr in good_shard_ranges],
[dict(sr) for sr in broker.get_shard_ranges()])
self.assertEqual(
dict(bad_shard_range),
dict(broker.get_shard_ranges(include_deleted=True)[3]))
def test_merge_fills_gap(self):
broker = self._make_broker()
broker.update_metadata({'X-Container-Sysmeta-Sharding':
(True, Timestamp.now().internal)})
old_shard_ranges = []
for shard in self.shard_data[:1]:
old_shard_ranges.append(ShardRange(name='a/c_' + shard['lower'],
timestamp=next(self.ts_iter),
state=ShardRange.ACTIVE,
lower=shard['lower'],
upper=shard['upper']))
# use command to merge in a deleted version of the existing and two
# new ranges
new_shard_ranges = [
old_shard_ranges[0].copy(deleted=True,
timestamp=next(self.ts_iter)),
ShardRange(
name='a/c_1_' + self.shard_data[0]['lower'],
timestamp=next(self.ts_iter),
state=ShardRange.ACTIVE,
lower=self.shard_data[0]['lower'],
upper=self.shard_data[0]['upper'] + 'a'),
ShardRange(
name='a/c_1_' + self.shard_data[0]['upper'] + 'a',
timestamp=next(self.ts_iter),
state=ShardRange.ACTIVE,
lower=self.shard_data[0]['upper'] + 'a',
upper=self.shard_data[1]['upper'] + 'a'),
]
input_file = os.path.join(self.testdir, 'shards')
with open(input_file, 'w') as fd:
json.dump([dict(sr) for sr in new_shard_ranges], fd)
out = StringIO()
err = StringIO()
with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err):
ret = main([broker.db_file, '-v', 'merge', input_file,
'--replace-timeout', '1', '--yes'])
self.assertEqual(0, ret)
affected_shard_ranges = [dict(sr) for sr in new_shard_ranges[1:]]
expected_msg = [
'This change will result in the following shard ranges in the '
'affected namespace:']
expected_msg.extend(
json.dumps(affected_shard_ranges, indent=2).splitlines())
expected_msg.extend(
['Injected 3 shard ranges.',
'Run container-replicator to replicate them to other nodes.'])
self.assertEqual(expected_msg, out.getvalue().splitlines())
self.assertEqual(['Loaded db broker for a/c'],
err.getvalue().splitlines())
self.assertEqual(
[dict(sr) for sr in new_shard_ranges[1:]],
[dict(sr) for sr in broker.get_shard_ranges()])
self.assertEqual(
[dict(sr) for sr in new_shard_ranges],
[dict(sr) for sr in broker.get_shard_ranges(include_deleted=True)])
def test_merge_warns_of_overlap(self):
broker = self._make_broker()
broker.update_metadata({'X-Container-Sysmeta-Sharding':
(True, Timestamp.now().internal)})
old_shard_ranges = []
for shard in self.shard_data[:3]:
old_shard_ranges.append(ShardRange(name='a/c_' + shard['lower'],
timestamp=next(self.ts_iter),
state=ShardRange.ACTIVE,
lower=shard['lower'],
upper=shard['upper']))
broker.merge_shard_ranges(old_shard_ranges)
# use command to merge in a new range that overlaps...
new_shard_range = ShardRange(
name='a/c_bad_' + self.shard_data[1]['lower'],
timestamp=next(self.ts_iter),
state=ShardRange.ACTIVE,
lower=self.shard_data[1]['lower'] + 'a',
upper=self.shard_data[1]['upper'])
input_file = os.path.join(self.testdir, 'shards')
with open(input_file, 'w') as fd:
json.dump([dict(new_shard_range)], fd)
out = StringIO()
err = StringIO()
with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err):
ret = main([broker.db_file, '-v', 'merge', input_file,
'--replace-timeout', '1', '-n'])
self.assertEqual(3, ret)
affected_shard_ranges = [
dict(sr) for sr in [old_shard_ranges[0], old_shard_ranges[1],
new_shard_range, old_shard_ranges[2]]]
expected_msg = [
'This change will result in the following shard ranges in the '
'affected namespace:']
expected_msg.extend(
json.dumps(affected_shard_ranges, indent=2).splitlines())
expected_msg.extend(
['WARNING: this change will result in shard ranges overlaps!',
'No changes applied'])
self.assertEqual(expected_msg, out.getvalue().splitlines())
self.assertEqual(['Loaded db broker for a/c'],
err.getvalue().splitlines())
self.assertEqual(
[dict(sr) for sr in old_shard_ranges],
[dict(sr) for sr in broker.get_shard_ranges()])
# repeat without -v flag
out = StringIO()
err = StringIO()
with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err):
ret = main([broker.db_file, 'merge', input_file,
'--replace-timeout', '1', '-n'])
self.assertEqual(3, ret)
expected_msg = [
'WARNING: this change will result in shard ranges overlaps!',
'No changes applied']
self.assertEqual(expected_msg, out.getvalue().splitlines())
self.assertEqual(['Loaded db broker for a/c'],
err.getvalue().splitlines())
self.assertEqual(
[dict(sr) for sr in old_shard_ranges],
[dict(sr) for sr in broker.get_shard_ranges()])
def test_merge_warns_of_gap(self):
broker = self._make_broker()
broker.update_metadata({'X-Container-Sysmeta-Sharding':
(True, Timestamp.now().internal)})
old_shard_ranges = []
for shard in self.shard_data[:3]:
old_shard_ranges.append(ShardRange(name='a/c_' + shard['lower'],
timestamp=next(self.ts_iter),
state=ShardRange.ACTIVE,
lower=shard['lower'],
upper=shard['upper']))
broker.merge_shard_ranges(old_shard_ranges)
# use command to merge in a deleted range that creates a gap...
new_shard_range = old_shard_ranges[1].copy(
timestamp=next(self.ts_iter), deleted=True)
input_file = os.path.join(self.testdir, 'shards')
with open(input_file, 'w') as fd:
json.dump([dict(new_shard_range)], fd)
out = StringIO()
err = StringIO()
with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err):
ret = main([broker.db_file, '-v', 'merge', input_file,
'--replace-timeout', '1', '-n'])
self.assertEqual(3, ret)
affected_shard_ranges = [
dict(sr) for sr in [old_shard_ranges[0], old_shard_ranges[2]]]
expected_msg = [
'This change will result in the following shard ranges in the '
'affected namespace:']
expected_msg.extend(
json.dumps(affected_shard_ranges, indent=2).splitlines())
expected_msg.extend(
['WARNING: this change will result in shard ranges gaps!',
'No changes applied'])
self.assertEqual(expected_msg, out.getvalue().splitlines())
self.assertEqual(['Loaded db broker for a/c'],
err.getvalue().splitlines())
self.assertEqual(
[dict(sr) for sr in old_shard_ranges],
[dict(sr) for sr in broker.get_shard_ranges()])
def test_replace(self):
broker = self._make_broker()
broker.update_metadata({'X-Container-Sysmeta-Sharding':
@ -2523,3 +2747,46 @@ class TestManageShardRanges(unittest.TestCase):
self.assertIn(
"argument --yes/-y: not allowed with argument --dry-run/-n",
err_lines[-2], err_lines)
def test_combine_shard_ranges(self):
ts_iter = make_timestamp_iter()
this = ShardRange('a/o', next(ts_iter).internal)
that = ShardRange('a/o', next(ts_iter).internal)
actual = combine_shard_ranges([dict(this)], [dict(that)])
self.assertEqual([dict(that)], [dict(sr) for sr in actual])
actual = combine_shard_ranges([dict(that)], [dict(this)])
self.assertEqual([dict(that)], [dict(sr) for sr in actual])
ts = next(ts_iter).internal
this = ShardRange('a/o', ts, state=ShardRange.ACTIVE,
state_timestamp=next(ts_iter))
that = ShardRange('a/o', ts, state=ShardRange.CREATED,
state_timestamp=next(ts_iter))
actual = combine_shard_ranges([dict(this)], [dict(that)])
self.assertEqual([dict(that)], [dict(sr) for sr in actual])
actual = combine_shard_ranges([dict(that)], [dict(this)])
self.assertEqual([dict(that)], [dict(sr) for sr in actual])
that.update_meta(1, 2, meta_timestamp=next(ts_iter))
this.update_meta(3, 4, meta_timestamp=next(ts_iter))
expected = that.copy(object_count=this.object_count,
bytes_used=this.bytes_used,
meta_timestamp=this.meta_timestamp)
actual = combine_shard_ranges([dict(this)], [dict(that)])
self.assertEqual([dict(expected)], [dict(sr) for sr in actual])
actual = combine_shard_ranges([dict(that)], [dict(this)])
self.assertEqual([dict(expected)], [dict(sr) for sr in actual])
this = ShardRange('a/o', next(ts_iter).internal)
that = ShardRange('a/o', next(ts_iter).internal, deleted=True)
actual = combine_shard_ranges([dict(this)], [dict(that)])
self.assertFalse(actual, [dict(sr) for sr in actual])
actual = combine_shard_ranges([dict(that)], [dict(this)])
self.assertFalse(actual, [dict(sr) for sr in actual])
this = ShardRange('a/o', next(ts_iter).internal, deleted=True)
that = ShardRange('a/o', next(ts_iter).internal)
actual = combine_shard_ranges([dict(this)], [dict(that)])
self.assertEqual([dict(that)], [dict(sr) for sr in actual])
actual = combine_shard_ranges([dict(that)], [dict(this)])
self.assertEqual([dict(that)], [dict(sr) for sr in actual])

View File

@ -36,7 +36,7 @@ import six
from swift.common.exceptions import LockTimeout
from swift.container.backend import ContainerBroker, \
update_new_item_from_existing, UNSHARDED, SHARDING, SHARDED, \
COLLAPSED, SHARD_LISTING_STATES, SHARD_UPDATE_STATES
COLLAPSED, SHARD_LISTING_STATES, SHARD_UPDATE_STATES, sift_shard_ranges
from swift.common.db import DatabaseAlreadyExists, GreenDBConnection, \
TombstoneReclaimer
from swift.common.request_helpers import get_reserved_name
@ -6484,3 +6484,39 @@ class TestUpdateNewItemFromExisting(unittest.TestCase):
for scenario in self.scenarios_when_some_new_item_wins:
self._test_scenario(scenario, True)
class TestModuleFunctions(unittest.TestCase):
def test_sift_shard_ranges(self):
ts_iter = make_timestamp_iter()
existing_shards = {}
sr1 = dict(ShardRange('a/o', next(ts_iter).internal))
sr2 = dict(ShardRange('a/o2', next(ts_iter).internal))
new_shard_ranges = [sr1, sr2]
# first empty existing shards will just add the shards
to_add, to_delete = sift_shard_ranges(new_shard_ranges,
existing_shards)
self.assertEqual(2, len(to_add))
self.assertIn(sr1, to_add)
self.assertIn(sr2, to_add)
self.assertFalse(to_delete)
# if there is a newer version in the existing shards then it won't be
# added to to_add
existing_shards['a/o'] = dict(
ShardRange('a/o', next(ts_iter).internal))
to_add, to_delete = sift_shard_ranges(new_shard_ranges,
existing_shards)
self.assertEqual([sr2], list(to_add))
self.assertFalse(to_delete)
# But if a newer version is in new_shard_ranges then the old will be
# added to to_delete and new is added to to_add.
sr1['timestamp'] = next(ts_iter).internal
to_add, to_delete = sift_shard_ranges(new_shard_ranges,
existing_shards)
self.assertEqual(2, len(to_add))
self.assertIn(sr1, to_add)
self.assertIn(sr2, to_add)
self.assertEqual({'a/o'}, to_delete)