Add handoffs-only mode to DB replicators.
The object reconstructor has a handoffs-only mode that is very useful when a cluster requires rapid rebalancing, like when disks are nearing fullness. This mode's goal is to remove handoff partitions from disks without spending effort on primary partitions. The object replicator has a similar mode, though it varies in some details. This commit adds a handoffs-only mode to the account and container replicators. Change-Id: I588b151ee65ae49d204bd6bf58555504c15edf9f Closes-Bug: 1668399
This commit is contained in:
parent
2bfd9c6a9b
commit
47fed6f2f9
@ -163,6 +163,25 @@ use = egg:swift#recon
|
|||||||
# Work only with ionice_class.
|
# Work only with ionice_class.
|
||||||
# ionice_class =
|
# ionice_class =
|
||||||
# ionice_priority =
|
# ionice_priority =
|
||||||
|
#
|
||||||
|
# The handoffs_only mode option is for special-case emergency
|
||||||
|
# situations such as full disks in the cluster. This option SHOULD NOT
|
||||||
|
# BE ENABLED except in emergencies. When handoffs_only mode is enabled
|
||||||
|
# the replicator will *only* replicate from handoff nodes to primary
|
||||||
|
# nodes and will not sync primary nodes with other primary nodes.
|
||||||
|
#
|
||||||
|
# This has two main effects: first, the replicator becomes much more
|
||||||
|
# effective at removing misplaced databases, thereby freeing up disk
|
||||||
|
# space at a much faster pace than normal. Second, the replicator does
|
||||||
|
# not sync data between primary nodes, so out-of-sync account and
|
||||||
|
# container listings will not resolve while handoffs_only is enabled.
|
||||||
|
#
|
||||||
|
# This mode is intended to allow operators to temporarily sacrifice
|
||||||
|
# consistency in order to gain faster rebalancing, such as during a
|
||||||
|
# capacity addition with nearly-full disks. It is not intended for
|
||||||
|
# long-term use.
|
||||||
|
#
|
||||||
|
# handoffs_only = no
|
||||||
|
|
||||||
[account-auditor]
|
[account-auditor]
|
||||||
# You can override the default log routing for this app here (don't use set!):
|
# You can override the default log routing for this app here (don't use set!):
|
||||||
|
@ -172,6 +172,25 @@ use = egg:swift#recon
|
|||||||
# Work only with ionice_class.
|
# Work only with ionice_class.
|
||||||
# ionice_class =
|
# ionice_class =
|
||||||
# ionice_priority =
|
# ionice_priority =
|
||||||
|
#
|
||||||
|
# The handoffs_only mode option is for special-case emergency
|
||||||
|
# situations such as full disks in the cluster. This option SHOULD NOT
|
||||||
|
# BE ENABLED except in emergencies. When handoffs_only mode is enabled
|
||||||
|
# the replicator will *only* replicate from handoff nodes to primary
|
||||||
|
# nodes and will not sync primary nodes with other primary nodes.
|
||||||
|
#
|
||||||
|
# This has two main effects: first, the replicator becomes much more
|
||||||
|
# effective at removing misplaced databases, thereby freeing up disk
|
||||||
|
# space at a much faster pace than normal. Second, the replicator does
|
||||||
|
# not sync data between primary nodes, so out-of-sync account and
|
||||||
|
# container listings will not resolve while handoffs_only is enabled.
|
||||||
|
#
|
||||||
|
# This mode is intended to allow operators to temporarily sacrifice
|
||||||
|
# consistency in order to gain faster rebalancing, such as during a
|
||||||
|
# capacity addition with nearly-full disks. It is not intended for
|
||||||
|
# long-term use.
|
||||||
|
#
|
||||||
|
# handoffs_only = no
|
||||||
|
|
||||||
[container-updater]
|
[container-updater]
|
||||||
# You can override the default log routing for this app here (don't use set!):
|
# You can override the default log routing for this app here (don't use set!):
|
||||||
|
@ -87,13 +87,14 @@ def roundrobin_datadirs(datadirs):
|
|||||||
found (in their proper places). The partitions within each data
|
found (in their proper places). The partitions within each data
|
||||||
dir are walked randomly, however.
|
dir are walked randomly, however.
|
||||||
|
|
||||||
:param datadirs: a list of (path, node_id) to walk
|
:param datadirs: a list of (path, node_id, partition_filter) to walk
|
||||||
:returns: A generator of (partition, path_to_db_file, node_id)
|
:returns: A generator of (partition, path_to_db_file, node_id)
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def walk_datadir(datadir, node_id):
|
def walk_datadir(datadir, node_id, part_filter):
|
||||||
partitions = [pd for pd in os.listdir(datadir)
|
partitions = [pd for pd in os.listdir(datadir)
|
||||||
if looks_like_partition(pd)]
|
if looks_like_partition(pd)
|
||||||
|
and (part_filter is None or part_filter(pd))]
|
||||||
random.shuffle(partitions)
|
random.shuffle(partitions)
|
||||||
for partition in partitions:
|
for partition in partitions:
|
||||||
part_dir = os.path.join(datadir, partition)
|
part_dir = os.path.join(datadir, partition)
|
||||||
@ -125,7 +126,8 @@ def roundrobin_datadirs(datadirs):
|
|||||||
if e.errno != errno.ENOTEMPTY:
|
if e.errno != errno.ENOTEMPTY:
|
||||||
raise
|
raise
|
||||||
|
|
||||||
its = [walk_datadir(datadir, node_id) for datadir, node_id in datadirs]
|
its = [walk_datadir(datadir, node_id, filt)
|
||||||
|
for datadir, node_id, filt in datadirs]
|
||||||
while its:
|
while its:
|
||||||
for it in its:
|
for it in its:
|
||||||
try:
|
try:
|
||||||
@ -206,6 +208,7 @@ class Replicator(Daemon):
|
|||||||
self.recon_replicator)
|
self.recon_replicator)
|
||||||
self.extract_device_re = re.compile('%s%s([^%s]+)' % (
|
self.extract_device_re = re.compile('%s%s([^%s]+)' % (
|
||||||
self.root, os.path.sep, os.path.sep))
|
self.root, os.path.sep, os.path.sep))
|
||||||
|
self.handoffs_only = config_true_value(conf.get('handoffs_only', 'no'))
|
||||||
|
|
||||||
def _zero_stats(self):
|
def _zero_stats(self):
|
||||||
"""Zero out the stats."""
|
"""Zero out the stats."""
|
||||||
@ -631,6 +634,14 @@ class Replicator(Daemon):
|
|||||||
return match.groups()[0]
|
return match.groups()[0]
|
||||||
return "UNKNOWN"
|
return "UNKNOWN"
|
||||||
|
|
||||||
|
def handoffs_only_filter(self, device_id):
|
||||||
|
def filt(partition_dir):
|
||||||
|
partition = int(partition_dir)
|
||||||
|
primary_node_ids = [
|
||||||
|
d['id'] for d in self.ring.get_part_nodes(partition)]
|
||||||
|
return device_id not in primary_node_ids
|
||||||
|
return filt
|
||||||
|
|
||||||
def report_up_to_date(self, full_info):
|
def report_up_to_date(self, full_info):
|
||||||
return True
|
return True
|
||||||
|
|
||||||
@ -642,6 +653,13 @@ class Replicator(Daemon):
|
|||||||
if not ips:
|
if not ips:
|
||||||
self.logger.error(_('ERROR Failed to get my own IPs?'))
|
self.logger.error(_('ERROR Failed to get my own IPs?'))
|
||||||
return
|
return
|
||||||
|
|
||||||
|
if self.handoffs_only:
|
||||||
|
self.logger.warning(
|
||||||
|
'Starting replication pass with handoffs_only enabled. '
|
||||||
|
'This mode is not intended for normal '
|
||||||
|
'operation; use handoffs_only with care.')
|
||||||
|
|
||||||
self._local_device_ids = set()
|
self._local_device_ids = set()
|
||||||
found_local = False
|
found_local = False
|
||||||
for node in self.ring.devs:
|
for node in self.ring.devs:
|
||||||
@ -664,7 +682,9 @@ class Replicator(Daemon):
|
|||||||
datadir = os.path.join(self.root, node['device'], self.datadir)
|
datadir = os.path.join(self.root, node['device'], self.datadir)
|
||||||
if os.path.isdir(datadir):
|
if os.path.isdir(datadir):
|
||||||
self._local_device_ids.add(node['id'])
|
self._local_device_ids.add(node['id'])
|
||||||
dirs.append((datadir, node['id']))
|
filt = (self.handoffs_only_filter(node['id'])
|
||||||
|
if self.handoffs_only else None)
|
||||||
|
dirs.append((datadir, node['id'], filt))
|
||||||
if not found_local:
|
if not found_local:
|
||||||
self.logger.error("Can't find itself %s with port %s in ring "
|
self.logger.error("Can't find itself %s with port %s in ring "
|
||||||
"file, not replicating",
|
"file, not replicating",
|
||||||
@ -675,6 +695,10 @@ class Replicator(Daemon):
|
|||||||
self._replicate_object, part, object_file, node_id)
|
self._replicate_object, part, object_file, node_id)
|
||||||
self.cpool.waitall()
|
self.cpool.waitall()
|
||||||
self.logger.info(_('Replication run OVER'))
|
self.logger.info(_('Replication run OVER'))
|
||||||
|
if self.handoffs_only:
|
||||||
|
self.logger.warning(
|
||||||
|
'Finished replication pass with handoffs_only enabled. '
|
||||||
|
'If handoffs_only is no longer required, disable it.')
|
||||||
self._report_stats()
|
self._report_stats()
|
||||||
|
|
||||||
def run_forever(self, *args, **kwargs):
|
def run_forever(self, *args, **kwargs):
|
||||||
|
@ -1220,7 +1220,8 @@ class TestDBReplicator(unittest.TestCase):
|
|||||||
self.assertTrue(os.path.isdir(dirpath))
|
self.assertTrue(os.path.isdir(dirpath))
|
||||||
|
|
||||||
node_id = 1
|
node_id = 1
|
||||||
results = list(db_replicator.roundrobin_datadirs([(datadir, node_id)]))
|
results = list(db_replicator.roundrobin_datadirs(
|
||||||
|
[(datadir, node_id, None)]))
|
||||||
expected = [
|
expected = [
|
||||||
('450', os.path.join(datadir, db_path), node_id),
|
('450', os.path.join(datadir, db_path), node_id),
|
||||||
]
|
]
|
||||||
@ -1241,12 +1242,14 @@ class TestDBReplicator(unittest.TestCase):
|
|||||||
self.assertEqual({'18', '1054', '1060', '450'},
|
self.assertEqual({'18', '1054', '1060', '450'},
|
||||||
set(os.listdir(datadir)))
|
set(os.listdir(datadir)))
|
||||||
|
|
||||||
results = list(db_replicator.roundrobin_datadirs([(datadir, node_id)]))
|
results = list(db_replicator.roundrobin_datadirs(
|
||||||
|
[(datadir, node_id, None)]))
|
||||||
self.assertEqual(results, expected)
|
self.assertEqual(results, expected)
|
||||||
self.assertEqual({'1054', '1060', '450'},
|
self.assertEqual({'1054', '1060', '450'},
|
||||||
set(os.listdir(datadir)))
|
set(os.listdir(datadir)))
|
||||||
|
|
||||||
results = list(db_replicator.roundrobin_datadirs([(datadir, node_id)]))
|
results = list(db_replicator.roundrobin_datadirs(
|
||||||
|
[(datadir, node_id, None)]))
|
||||||
self.assertEqual(results, expected)
|
self.assertEqual(results, expected)
|
||||||
# non db file in '1060' dir is not deleted and exception is handled
|
# non db file in '1060' dir is not deleted and exception is handled
|
||||||
self.assertEqual({'1060', '450'},
|
self.assertEqual({'1060', '450'},
|
||||||
@ -1333,8 +1336,8 @@ class TestDBReplicator(unittest.TestCase):
|
|||||||
mock.patch(base + 'random.shuffle', _shuffle), \
|
mock.patch(base + 'random.shuffle', _shuffle), \
|
||||||
mock.patch(base + 'os.rmdir', _rmdir):
|
mock.patch(base + 'os.rmdir', _rmdir):
|
||||||
|
|
||||||
datadirs = [('/srv/node/sda/containers', 1),
|
datadirs = [('/srv/node/sda/containers', 1, None),
|
||||||
('/srv/node/sdb/containers', 2)]
|
('/srv/node/sdb/containers', 2, None)]
|
||||||
results = list(db_replicator.roundrobin_datadirs(datadirs))
|
results = list(db_replicator.roundrobin_datadirs(datadirs))
|
||||||
# The results show that the .db files are returned, the devices
|
# The results show that the .db files are returned, the devices
|
||||||
# interleaved.
|
# interleaved.
|
||||||
@ -1438,6 +1441,150 @@ class TestDBReplicator(unittest.TestCase):
|
|||||||
replicator.logger)])
|
replicator.logger)])
|
||||||
|
|
||||||
|
|
||||||
|
class TestHandoffsOnly(unittest.TestCase):
|
||||||
|
class FakeRing3Nodes(object):
|
||||||
|
_replicas = 3
|
||||||
|
|
||||||
|
# Three nodes, two disks each
|
||||||
|
devs = [
|
||||||
|
dict(id=0, region=1, zone=1,
|
||||||
|
meta='', weight=500.0, ip='10.0.0.1', port=6201,
|
||||||
|
replication_ip='10.0.0.1', replication_port=6201,
|
||||||
|
device='sdp'),
|
||||||
|
dict(id=1, region=1, zone=1,
|
||||||
|
meta='', weight=500.0, ip='10.0.0.1', port=6201,
|
||||||
|
replication_ip='10.0.0.1', replication_port=6201,
|
||||||
|
device='sdq'),
|
||||||
|
|
||||||
|
dict(id=2, region=1, zone=1,
|
||||||
|
meta='', weight=500.0, ip='10.0.0.2', port=6201,
|
||||||
|
replication_ip='10.0.0.2', replication_port=6201,
|
||||||
|
device='sdp'),
|
||||||
|
dict(id=3, region=1, zone=1,
|
||||||
|
meta='', weight=500.0, ip='10.0.0.2', port=6201,
|
||||||
|
replication_ip='10.0.0.2', replication_port=6201,
|
||||||
|
device='sdq'),
|
||||||
|
|
||||||
|
dict(id=4, region=1, zone=1,
|
||||||
|
meta='', weight=500.0, ip='10.0.0.3', port=6201,
|
||||||
|
replication_ip='10.0.0.3', replication_port=6201,
|
||||||
|
device='sdp'),
|
||||||
|
dict(id=5, region=1, zone=1,
|
||||||
|
meta='', weight=500.0, ip='10.0.0.3', port=6201,
|
||||||
|
replication_ip='10.0.0.3', replication_port=6201,
|
||||||
|
device='sdq'),
|
||||||
|
]
|
||||||
|
|
||||||
|
def __init__(self, *a, **kw):
|
||||||
|
pass
|
||||||
|
|
||||||
|
def get_part(self, account, container=None, obj=None):
|
||||||
|
return 0
|
||||||
|
|
||||||
|
def get_part_nodes(self, part):
|
||||||
|
nodes = []
|
||||||
|
for offset in range(self._replicas):
|
||||||
|
i = (part + offset) % len(self.devs)
|
||||||
|
nodes.append(self.devs[i])
|
||||||
|
return nodes
|
||||||
|
|
||||||
|
def get_more_nodes(self, part):
|
||||||
|
for offset in range(self._replicas, len(self.devs)):
|
||||||
|
i = (part + offset) % len(self.devs)
|
||||||
|
yield self.devs[i]
|
||||||
|
|
||||||
|
def _make_fake_db(self, disk, partition, db_hash):
|
||||||
|
directories = [
|
||||||
|
os.path.join(self.root, disk),
|
||||||
|
os.path.join(self.root, disk, 'containers'),
|
||||||
|
os.path.join(self.root, disk, 'containers', str(partition)),
|
||||||
|
os.path.join(self.root, disk, 'containers', str(partition),
|
||||||
|
db_hash[-3:]),
|
||||||
|
os.path.join(self.root, disk, 'containers', str(partition),
|
||||||
|
db_hash[-3:], db_hash)]
|
||||||
|
|
||||||
|
for d in directories:
|
||||||
|
try:
|
||||||
|
os.mkdir(d)
|
||||||
|
except OSError as err:
|
||||||
|
if err.errno != errno.EEXIST:
|
||||||
|
raise
|
||||||
|
file_path = os.path.join(directories[-1], db_hash + ".db")
|
||||||
|
with open(file_path, 'w'):
|
||||||
|
pass
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
self.root = mkdtemp()
|
||||||
|
|
||||||
|
# object disks; they're just here to make sure they don't trip us up
|
||||||
|
os.mkdir(os.path.join(self.root, 'sdc'))
|
||||||
|
os.mkdir(os.path.join(self.root, 'sdc', 'objects'))
|
||||||
|
os.mkdir(os.path.join(self.root, 'sdd'))
|
||||||
|
os.mkdir(os.path.join(self.root, 'sdd', 'objects'))
|
||||||
|
|
||||||
|
# part 0 belongs on sdp
|
||||||
|
self._make_fake_db('sdp', 0, '010101013cf2b7979af9eaa71cb67220')
|
||||||
|
|
||||||
|
# part 1 does not belong on sdp
|
||||||
|
self._make_fake_db('sdp', 1, 'abababab2b5368158355e799323b498d')
|
||||||
|
|
||||||
|
# part 1 belongs on sdq
|
||||||
|
self._make_fake_db('sdq', 1, '02020202e30f696a3cfa63d434a3c94e')
|
||||||
|
|
||||||
|
# part 2 does not belong on sdq
|
||||||
|
self._make_fake_db('sdq', 2, 'bcbcbcbc15d3835053d568c57e2c83b5')
|
||||||
|
|
||||||
|
def cleanUp(self):
|
||||||
|
rmtree(self.root, ignore_errors=True)
|
||||||
|
|
||||||
|
def test_scary_warnings(self):
|
||||||
|
logger = unit.FakeLogger()
|
||||||
|
replicator = TestReplicator({
|
||||||
|
'handoffs_only': 'yes',
|
||||||
|
'devices': self.root,
|
||||||
|
'bind_port': 6201,
|
||||||
|
'mount_check': 'no',
|
||||||
|
}, logger=logger)
|
||||||
|
|
||||||
|
with patch.object(db_replicator, 'whataremyips',
|
||||||
|
return_value=['10.0.0.1']), \
|
||||||
|
patch.object(replicator, '_replicate_object'), \
|
||||||
|
patch.object(replicator, 'ring', self.FakeRing3Nodes()):
|
||||||
|
replicator.run_once()
|
||||||
|
|
||||||
|
self.assertEqual(
|
||||||
|
logger.get_lines_for_level('warning'),
|
||||||
|
[('Starting replication pass with handoffs_only enabled. This '
|
||||||
|
'mode is not intended for normal operation; use '
|
||||||
|
'handoffs_only with care.'),
|
||||||
|
('Finished replication pass with handoffs_only enabled. '
|
||||||
|
'If handoffs_only is no longer required, disable it.')])
|
||||||
|
|
||||||
|
def test_skips_primary_partitions(self):
|
||||||
|
replicator = TestReplicator({
|
||||||
|
'handoffs_only': 'yes',
|
||||||
|
'devices': self.root,
|
||||||
|
'bind_port': 6201,
|
||||||
|
'mount_check': 'no',
|
||||||
|
})
|
||||||
|
|
||||||
|
with patch.object(db_replicator, 'whataremyips',
|
||||||
|
return_value=['10.0.0.1']), \
|
||||||
|
patch.object(replicator, '_replicate_object') as mock_repl, \
|
||||||
|
patch.object(replicator, 'ring', self.FakeRing3Nodes()):
|
||||||
|
replicator.run_once()
|
||||||
|
|
||||||
|
self.assertEqual(sorted(mock_repl.mock_calls), [
|
||||||
|
mock.call('1', os.path.join(
|
||||||
|
self.root, 'sdp', 'containers', '1', '98d',
|
||||||
|
'abababab2b5368158355e799323b498d',
|
||||||
|
'abababab2b5368158355e799323b498d.db'), 0),
|
||||||
|
mock.call('2', os.path.join(
|
||||||
|
self.root, 'sdq', 'containers', '2', '3b5',
|
||||||
|
'bcbcbcbc15d3835053d568c57e2c83b5',
|
||||||
|
'bcbcbcbc15d3835053d568c57e2c83b5.db'), 1)])
|
||||||
|
|
||||||
|
|
||||||
class TestReplToNode(unittest.TestCase):
|
class TestReplToNode(unittest.TestCase):
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
db_replicator.ring = FakeRing()
|
db_replicator.ring = FakeRing()
|
||||||
|
Loading…
x
Reference in New Issue
Block a user