Only move too-close-together replicas when they can spread out.
Imagine a 3-zone ring, and consider a partition in that ring with replicas placed as follows: * replica 0 is on device A (zone 2) * replica 1 is on device B (zone 1) * replica 2 is on device C (zone 2) Further, imagine that there are zero parts_wanted in all of zone 3; that is, zone 3 is completely full. However, zones 1 and 2 each have at least one parts_wanted on at least one device. When the ring builder goes to gather replicas to move, it gathers replica 0 because there are three zones available, but the replicas are only in two of them. Then, it places replica 0 in zone 1 or 2 somewhere because those are the only zones with parts_wanted. Notice that this does *not* do anything to spread the partition out better. Then, on the next rebalance, replica 0 gets picked up and moved (again) but doesn't improve its placement (again). If your builder has min_part_hours > 0 (and it should), then replicas 1 and 2 cannot move at all. A coworker observed the bug because a customer had such a partition, and its replica 2 was on a zero-weight device. He thought it odd that a zero-weight device should still have one partition on it despite the ring having been rebalanced dozens of times. Even if you don't have zero-weight devices, having a bunch of partitions trade places on each rebalance isn't particularly good. Note that this only happens with an unbalanceable ring; if the ring *can* balance, the gathered partitions will swap places, but they will get spread across more zones, so they won't get gathered up again on the next rebalance. Change-Id: I8f44f032caac25c44778a497dedf23f5cb61b6bb Closes-Bug: 1400083
This commit is contained in:
parent
cc2f0f4ed6
commit
1880351f1a
@ -646,23 +646,21 @@ class RingBuilder(object):
|
|||||||
|
|
||||||
def _get_available_parts(self):
|
def _get_available_parts(self):
|
||||||
"""
|
"""
|
||||||
Returns a tuple (wanted_parts_total, dict of (tier: available parts in
|
Returns a dict of (tier: available parts in other tiers) for all tiers
|
||||||
other tiers) for all tiers in the ring.
|
in the ring.
|
||||||
|
|
||||||
Devices that have too much partitions (negative parts_wanted) are
|
Devices that have too much partitions (negative parts_wanted) are
|
||||||
ignored, otherwise the sum of all parts_wanted is 0 +/- rounding
|
ignored, otherwise the sum of all parts_wanted is 0 +/- rounding
|
||||||
errors.
|
errors.
|
||||||
|
|
||||||
"""
|
"""
|
||||||
wanted_parts_total = 0
|
|
||||||
wanted_parts_for_tier = {}
|
wanted_parts_for_tier = {}
|
||||||
for dev in self._iter_devs():
|
for dev in self._iter_devs():
|
||||||
wanted_parts_total += max(0, dev['parts_wanted'])
|
pw = max(0, dev['parts_wanted'])
|
||||||
for tier in tiers_for_dev(dev):
|
for tier in tiers_for_dev(dev):
|
||||||
if tier not in wanted_parts_for_tier:
|
if tier not in wanted_parts_for_tier:
|
||||||
wanted_parts_for_tier[tier] = 0
|
wanted_parts_for_tier[tier] = 0
|
||||||
wanted_parts_for_tier[tier] += max(0, dev['parts_wanted'])
|
wanted_parts_for_tier[tier] += pw
|
||||||
return (wanted_parts_total, wanted_parts_for_tier)
|
return wanted_parts_for_tier
|
||||||
|
|
||||||
def _gather_reassign_parts(self):
|
def _gather_reassign_parts(self):
|
||||||
"""
|
"""
|
||||||
@ -674,6 +672,22 @@ class RingBuilder(object):
|
|||||||
# as a hot-spot).
|
# as a hot-spot).
|
||||||
tfd = {}
|
tfd = {}
|
||||||
|
|
||||||
|
tiers_by_len = defaultdict(set)
|
||||||
|
for dev in self._iter_devs():
|
||||||
|
tiers = tiers_for_dev(dev)
|
||||||
|
tfd[dev['id']] = tiers
|
||||||
|
for tier in tiers:
|
||||||
|
tiers_by_len[len(tier)].add(tier)
|
||||||
|
|
||||||
|
tiers_by_len = dict((length, list(tiers))
|
||||||
|
for length, tiers in tiers_by_len.items())
|
||||||
|
|
||||||
|
sibling_tiers = {}
|
||||||
|
for length, tiers in tiers_by_len.items():
|
||||||
|
for i, tier in enumerate(tiers):
|
||||||
|
sibling_tiers[tier] = [t for t in (tiers[:i] + tiers[(i + 1):])
|
||||||
|
if t[:-1] == tier[:-1]]
|
||||||
|
|
||||||
# First we gather partitions from removed devices. Since removed
|
# First we gather partitions from removed devices. Since removed
|
||||||
# devices usually indicate device failures, we have no choice but to
|
# devices usually indicate device failures, we have no choice but to
|
||||||
# reassign these partitions. However, we mark them as moved so later
|
# reassign these partitions. However, we mark them as moved so later
|
||||||
@ -692,8 +706,7 @@ class RingBuilder(object):
|
|||||||
# currently sufficient spread out across the cluster.
|
# currently sufficient spread out across the cluster.
|
||||||
spread_out_parts = defaultdict(list)
|
spread_out_parts = defaultdict(list)
|
||||||
max_allowed_replicas = self._build_max_replicas_by_tier()
|
max_allowed_replicas = self._build_max_replicas_by_tier()
|
||||||
wanted_parts_total, wanted_parts_for_tier = \
|
wanted_parts_for_tier = self._get_available_parts()
|
||||||
self._get_available_parts()
|
|
||||||
moved_parts = 0
|
moved_parts = 0
|
||||||
for part in xrange(self.parts):
|
for part in xrange(self.parts):
|
||||||
# Only move one replica at a time if possible.
|
# Only move one replica at a time if possible.
|
||||||
@ -706,8 +719,6 @@ class RingBuilder(object):
|
|||||||
# revealed the lambda invocation as a significant cost.
|
# revealed the lambda invocation as a significant cost.
|
||||||
replicas_at_tier = {}
|
replicas_at_tier = {}
|
||||||
for dev in self._devs_for_part(part):
|
for dev in self._devs_for_part(part):
|
||||||
if dev['id'] not in tfd:
|
|
||||||
tfd[dev['id']] = tiers_for_dev(dev)
|
|
||||||
for tier in tfd[dev['id']]:
|
for tier in tfd[dev['id']]:
|
||||||
if tier not in replicas_at_tier:
|
if tier not in replicas_at_tier:
|
||||||
replicas_at_tier[tier] = 1
|
replicas_at_tier[tier] = 1
|
||||||
@ -719,16 +730,42 @@ class RingBuilder(object):
|
|||||||
for replica in self._replicas_for_part(part):
|
for replica in self._replicas_for_part(part):
|
||||||
dev = self.devs[self._replica2part2dev[replica][part]]
|
dev = self.devs[self._replica2part2dev[replica][part]]
|
||||||
removed_replica = False
|
removed_replica = False
|
||||||
if dev['id'] not in tfd:
|
|
||||||
tfd[dev['id']] = tiers_for_dev(dev)
|
|
||||||
for tier in tfd[dev['id']]:
|
for tier in tfd[dev['id']]:
|
||||||
rep_at_tier = 0
|
rep_at_tier = replicas_at_tier.get(tier, 0)
|
||||||
if tier in replicas_at_tier:
|
|
||||||
rep_at_tier = replicas_at_tier[tier]
|
# If this tier's not overcrowded, there's nothing to
|
||||||
# Only allowing parts to be gathered if
|
# gather, so we can avoid some calculation here as an
|
||||||
# there are wanted parts on other tiers
|
# optimization.
|
||||||
available_parts_for_tier = wanted_parts_total - \
|
if rep_at_tier <= max_allowed_replicas[tier]:
|
||||||
wanted_parts_for_tier[tier] - moved_parts
|
continue
|
||||||
|
|
||||||
|
available_parts_for_tier = sum(
|
||||||
|
wanted_parts_for_tier[t]
|
||||||
|
for t in sibling_tiers[tier]
|
||||||
|
# If a sibling tier is "full" with respect to
|
||||||
|
# partition dispersion, but not "full" with respect
|
||||||
|
# to parts_wanted, we don't count it as a possible
|
||||||
|
# destination.
|
||||||
|
#
|
||||||
|
# Otherwise, we gather a partition from tier X
|
||||||
|
# (because its replicas are not spread out), and
|
||||||
|
# then we may place it right back in tier X or in
|
||||||
|
# another tier that already has replicas (because
|
||||||
|
# that tier has parts_wanted). Then, on the next
|
||||||
|
# rebalance, it'll happen again, and then again...
|
||||||
|
#
|
||||||
|
# Worse yet, this "dancing replica" immobilizes
|
||||||
|
# other replicas of the partition that want to move
|
||||||
|
# because they're on devices with negative
|
||||||
|
# parts_wanted. This can lead to a replica that
|
||||||
|
# sticks to a zero-weight device no matter how often
|
||||||
|
# the ring is rebalanced.
|
||||||
|
if (max_allowed_replicas[t] >
|
||||||
|
replicas_at_tier.get(t, 0))
|
||||||
|
) - moved_parts
|
||||||
|
|
||||||
|
# Only allow a part to be gathered if there are wanted
|
||||||
|
# parts on other tiers.
|
||||||
if (rep_at_tier > max_allowed_replicas[tier] and
|
if (rep_at_tier > max_allowed_replicas[tier] and
|
||||||
self._last_part_moves[part] >=
|
self._last_part_moves[part] >=
|
||||||
self.min_part_hours and
|
self.min_part_hours and
|
||||||
@ -741,8 +778,6 @@ class RingBuilder(object):
|
|||||||
moved_parts += 1
|
moved_parts += 1
|
||||||
break
|
break
|
||||||
if removed_replica:
|
if removed_replica:
|
||||||
if dev['id'] not in tfd:
|
|
||||||
tfd[dev['id']] = tiers_for_dev(dev)
|
|
||||||
for tier in tfd[dev['id']]:
|
for tier in tfd[dev['id']]:
|
||||||
replicas_at_tier[tier] -= 1
|
replicas_at_tier[tier] -= 1
|
||||||
|
|
||||||
|
@ -19,6 +19,7 @@ import operator
|
|||||||
import os
|
import os
|
||||||
import unittest
|
import unittest
|
||||||
import cPickle as pickle
|
import cPickle as pickle
|
||||||
|
from array import array
|
||||||
from collections import defaultdict
|
from collections import defaultdict
|
||||||
from math import ceil
|
from math import ceil
|
||||||
from tempfile import mkdtemp
|
from tempfile import mkdtemp
|
||||||
@ -656,6 +657,58 @@ class TestRingBuilder(unittest.TestCase):
|
|||||||
|
|
||||||
rb.rebalance()
|
rb.rebalance()
|
||||||
|
|
||||||
|
def test_remove_last_partition_from_zero_weight(self):
|
||||||
|
rb = ring.RingBuilder(4, 3, 1)
|
||||||
|
rb.add_dev({'id': 0, 'region': 0, 'zone': 1, 'weight': 1.0,
|
||||||
|
'ip': '127.0.0.1', 'port': 10000, 'device': 'sda'})
|
||||||
|
rb.add_dev({'id': 1, 'region': 0, 'zone': 2, 'weight': 2.0,
|
||||||
|
'ip': '127.0.0.2', 'port': 10000, 'device': 'sda'})
|
||||||
|
rb.add_dev({'id': 2, 'region': 0, 'zone': 3, 'weight': 3.0,
|
||||||
|
'ip': '127.0.0.3', 'port': 10000, 'device': 'sda'})
|
||||||
|
|
||||||
|
rb.add_dev({'id': 3, 'region': 0, 'zone': 3, 'weight': 0.5,
|
||||||
|
'ip': '127.0.0.3', 'port': 10001, 'device': 'zero'})
|
||||||
|
|
||||||
|
zero_weight_dev = 3
|
||||||
|
|
||||||
|
rb.rebalance()
|
||||||
|
|
||||||
|
# We want at least one partition with replicas only in zone 2 and 3
|
||||||
|
# due to device weights. It would *like* to spread out into zone 1,
|
||||||
|
# but can't, due to device weight.
|
||||||
|
#
|
||||||
|
# Also, we want such a partition to have a replica on device 3,
|
||||||
|
# which we will then reduce to zero weight. This should cause the
|
||||||
|
# removal of the replica from device 3.
|
||||||
|
#
|
||||||
|
# Getting this to happen by chance is hard, so let's just set up a
|
||||||
|
# builder so that it's in the state we want. This is a synthetic
|
||||||
|
# example; while the bug has happened on a real cluster, that
|
||||||
|
# builder file had a part_power of 16, so its contents are much too
|
||||||
|
# big to include here.
|
||||||
|
rb._replica2part2dev = [
|
||||||
|
# these are the relevant ones
|
||||||
|
# | | | |
|
||||||
|
# v v v v
|
||||||
|
array('H', [2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2]),
|
||||||
|
array('H', [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2]),
|
||||||
|
array('H', [0, 0, 0, 0, 0, 0, 0, 0, 3, 3, 3, 3, 2, 2, 2, 2])]
|
||||||
|
|
||||||
|
rb.set_dev_weight(zero_weight_dev, 0.0)
|
||||||
|
rb.pretend_min_part_hours_passed()
|
||||||
|
rb.rebalance(seed=1)
|
||||||
|
|
||||||
|
node_counts = defaultdict(int)
|
||||||
|
for part2dev_id in rb._replica2part2dev:
|
||||||
|
for dev_id in part2dev_id:
|
||||||
|
node_counts[dev_id] += 1
|
||||||
|
self.assertEqual(node_counts[zero_weight_dev], 0)
|
||||||
|
|
||||||
|
# it's as balanced as it gets, so nothing moves anymore
|
||||||
|
rb.pretend_min_part_hours_passed()
|
||||||
|
parts_moved, _balance = rb.rebalance(seed=1)
|
||||||
|
self.assertEqual(parts_moved, 0)
|
||||||
|
|
||||||
def test_region_fullness_with_balanceable_ring(self):
|
def test_region_fullness_with_balanceable_ring(self):
|
||||||
rb = ring.RingBuilder(8, 3, 1)
|
rb = ring.RingBuilder(8, 3, 1)
|
||||||
rb.add_dev({'id': 0, 'region': 0, 'zone': 0, 'weight': 1,
|
rb.add_dev({'id': 0, 'region': 0, 'zone': 0, 'weight': 1,
|
||||||
|
Loading…
Reference in New Issue
Block a user