As-unique-as-possible partition replica placement.

This commit introduces a new algorithm for assigning partition
replicas to devices. Basically, the ring builder organizes the devices
into tiers (first zone, then IP/port, then device ID). When placing a
replica, the ring builder looks for the emptiest device (biggest
parts_wanted) in the furthest-away tier.
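
To make the "least-occupied tier, then hungriest device" rule concrete, here is a toy sketch reduced to a single tier level (zones); the real builder recurses zone -> ip:port -> device, and every name and number below is invented rather than taken from the builder's code.

def pick_device(zone2devs, replicas_in_zone):
    # Prefer the zone holding the fewest replicas of this partition; break
    # ties toward the zone whose hungriest device wants the most partitions.
    best_zone = min(
        zone2devs,
        key=lambda z: (replicas_in_zone.get(z, 0),
                       -max(d['parts_wanted'] for d in zone2devs[z])))
    # Within that zone, pick the device with the biggest parts_wanted.
    return max(zone2devs[best_zone], key=lambda d: d['parts_wanted'])

zone2devs = {0: [{'id': 0, 'parts_wanted': 10}, {'id': 1, 'parts_wanted': 4}],
             1: [{'id': 2, 'parts_wanted': 7}]}
print(pick_device(zone2devs, {0: 1, 1: 0}))   # zone 1 holds no replica yet -> dev 2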

In the case where zone-count >= replica-count, the new algorithm will
give the same results as the one it replaces. Thus, no migration is
needed.

In the case where zone-count < replica-count, the new algorithm
behaves differently from the old algorithm. The new algorithm will
distribute things evenly at each tier so that the replication is as
high-quality as possible, given the circumstances. The old algorithm
would just crash, so again, no migration is needed.

Handoffs have also been updated to use the new algorithm. When
generating handoff nodes, first the ring looks for nodes in other
zones, then other ips/ports, then any other drive. The first handoff
nodes (the ones in other zones) will be the same as before; this
commit just extends the list of handoff nodes.
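
A toy sketch of that ordering (handoff_order and distance are invented names; the real get_more_nodes(), shown later in this commit, walks precomputed tier structures and offsets by the partition number instead of sorting):

def handoff_order(devs, primaries):
    used_zones = set(d['zone'] for d in primaries)
    used_ips = set((d['zone'], d['ip'], d['port']) for d in primaries)
    used_ids = set(d['id'] for d in primaries)

    def distance(dev):
        if dev['zone'] not in used_zones:
            return 0                  # farthest away: a zone with no replica
        if (dev['zone'], dev['ip'], dev['port']) not in used_ips:
            return 1                  # same zone, but a different server
        return 2                      # same server, just a different drive

    return sorted((d for d in devs if d['id'] not in used_ids), key=distance)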

The proxy server and replicators have been altered to avoid looking at
the ring's replica count directly. Previously, with a replica count of
C, Ring.get_nodes() and Ring.get_part_nodes() would return
lists of length C, so some other code used the replica count when it
needed the number of nodes. If two of a partition's replicas are on
the same device (e.g. with 3 replicas, 2 devices), then that
assumption is no longer true. Fortunately, all the proxy server and
replicators really needed was the number of nodes returned, which they
already had. (Bonus: now the only code that mentions replica_count
directly is in the ring and the ring builder.)
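
The node lists are now de-duplicated, which is why callers can no longer assume their length equals the replica count; here is a standalone toy illustration of the same set-based trick used in get_part_nodes()/get_nodes() later in this commit:

devs = [{'id': 0, 'zone': 0}, {'id': 1, 'zone': 1}]
replica2part2dev_id = [[0], [1], [0]]   # 3 replicas, 1 partition, 2 devices

part = 0
seen_ids = set()
nodes = [devs[r[part]] for r in replica2part2dev_id
         if not (r[part] in seen_ids or seen_ids.add(r[part]))]
assert len(nodes) == 2                  # shorter than the replica count of 3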

Change-Id: Iba2929edfc6ece89791890d0635d4763d821a3aa
Samuel Merritt 2012-04-23 10:41:44 -07:00
parent 47f0dbb125
commit bb509dd863
12 changed files with 826 additions and 214 deletions

View File

@ -134,15 +134,20 @@ If there are 1,000 devices of equal weight they will each desire 1,048.576
partitions. The devices are then sorted by the number of partitions they desire
and kept in order throughout the initialization process.
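
To make the "desired partitions" arithmetic concrete, here is a small worked example using the same formula as the builder's weight_of_one_part() shown later in this commit (the ring size and weights below are invented):

parts, replicas = 2 ** 16, 3
weights = [1.0, 1.0, 1.0, 1.0]          # four devices of equal weight

weight_of_one_part = parts * replicas / sum(weights)
print(weight_of_one_part)               # 49152.0 partition-replicas per unit of weight
desired = [int(weight_of_one_part * w) for w in weights]
print(desired)                          # [49152, 49152, 49152, 49152]
# A device's parts_wanted is then its desired count minus the number of
# partition-replicas it currently holds.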
Then, the ring builder assigns each partition's replica to the device that
desires the most partitions at that point, with the restriction that the device
is not in the same zone as any other replica for that partition. Once assigned,
the device's desired partition count is decremented and moved to its new sorted
location in the list of devices and the process continues.
Then, the ring builder assigns each replica of each partition to the device
that desires the most partitions at that point while keeping it as far away as
possible from other replicas. The ring builder prefers to assign a replica to a
device in a zone that has no replicas already; should there be no such zone
available, the ring builder will try to find a device on a different server;
failing that, it will just look for a device that has no replicas; finally, if
all other options are exhausted, the ring builder will assign the replica to
the device that has the fewest replicas already assigned.
When building a new ring based on an old ring, the desired number of partitions
each device wants is recalculated. Next the partitions to be reassigned are
gathered up. Any removed devices have all their assigned partitions unassigned
and added to the gathered list. Any partition replicas that (due to the
addition of new devices) can be spread out for better durability are unassigned
and added to the gathered list. Any devices that have more partitions than they
now desire have random partitions unassigned from them and added to the
gathered list. Lastly, the gathered partitions are then reassigned to devices

View File

@ -13,6 +13,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import bisect
import itertools
import math
from array import array
from collections import defaultdict
from random import randint, shuffle
@ -20,6 +24,7 @@ from time import time
from swift.common import exceptions
from swift.common.ring import RingData
from swift.common.ring.utils import tiers_for_dev, build_tier_tree
class RingBuilder(object):
@ -71,7 +76,11 @@ class RingBuilder(object):
self._remove_devs = []
self._ring = None
def weighted_parts(self):
def weight_of_one_part(self):
"""
Returns the weight of each partition as calculated from the
total weight of all the devices.
"""
try:
return self.parts * self.replicas / \
sum(d['weight'] for d in self._iter_devs())
@ -81,6 +90,16 @@ class RingBuilder(object):
'deleted')
def copy_from(self, builder):
"""
Reinitializes this RingBuilder instance from data obtained from the
builder dict given. Code example::
b = RingBuilder(1, 1, 1) # Dummy values
b.copy_from(builder)
This is to restore a RingBuilder that has had its b.to_dict()
previously saved.
"""
if hasattr(builder, 'devs'):
self.part_power = builder.part_power
self.replicas = builder.replicas
@ -110,6 +129,12 @@ class RingBuilder(object):
self._ring = None
def to_dict(self):
"""
Returns a dict that can be used later with copy_from to
restore a RingBuilder. swift-ring-builder uses this to
pickle.dump the dict to a file and later load that dict into
copy_from.
"""
return {'part_power': self.part_power,
'replicas': self.replicas,
'min_part_hours': self.min_part_hours,
@ -147,11 +172,20 @@ class RingBuilder(object):
builder itself keeps additional data such as when partitions were last
moved.
"""
# We cache the self._ring value so multiple requests for it don't build
# it multiple times. Be sure to set self._ring = None whenever the ring
# will need to be rebuilt.
if not self._ring:
# Make devs list (with holes for deleted devices) and not including
# builder-specific extra attributes.
devs = [None] * len(self.devs)
for dev in self._iter_devs():
devs[dev['id']] = dict((k, v) for k, v in dev.items()
if k not in ('parts', 'parts_wanted'))
# Copy over the replica+partition->device assignments, the device
# information, and the part_shift value (the number of bits to
# shift an unsigned int >I right to obtain the partition for the
# int).
if not self._replica2part2dev:
self._ring = RingData([], devs, 32 - self.part_power)
else:
@ -189,6 +223,7 @@ class RingBuilder(object):
if dev['id'] < len(self.devs) and self.devs[dev['id']] is not None:
raise exceptions.DuplicateDeviceError(
'Duplicate device id: %d' % dev['id'])
# Add holes to self.devs to ensure self.devs[dev['id']] will be the dev
while dev['id'] >= len(self.devs):
self.devs.append(None)
dev['weight'] = float(dev['weight'])
@ -248,6 +283,8 @@ class RingBuilder(object):
below 1% or doesn't change by more than 1% (only happens with a ring that
can't be balanced no matter what -- like with 3 zones of differing
weights with replicas set to 3).
:returns: (number_of_partitions_altered, resulting_balance)
"""
self._ring = None
if self._last_part_moves_epoch is None:
@ -277,15 +314,19 @@ class RingBuilder(object):
Validate the ring.
This is a safety function to try to catch any bugs in the building
process. It ensures partitions have been assigned to distinct zones,
process. It ensures partitions have been assigned to real devices,
aren't doubly assigned, etc. It can also optionally check the even
distribution of partitions across devices.
:param stats: if True, check distribution of partitions across devices
:returns: if stats is True, a tuple of (device usage, worst stat), else
(None, None)
:returns: if stats is True, a tuple of (device_usage, worst_stat), else
(None, None). device_usage[dev_id] will equal the number of
partitions assigned to that device. worst_stat will equal the
number of partitions the worst device is skewed from the
number it should have.
:raises RingValidationError: problem was found with the ring.
"""
if sum(d['parts'] for d in self._iter_devs()) != \
self.parts * self.replicas:
raise exceptions.RingValidationError(
@ -293,33 +334,37 @@ class RingBuilder(object):
(sum(d['parts'] for d in self._iter_devs()),
self.parts * self.replicas))
if stats:
# dev_usage[dev_id] will equal the number of partitions assigned to
# that device.
dev_usage = array('I', (0 for _junk in xrange(len(self.devs))))
for part2dev in self._replica2part2dev:
for dev_id in part2dev:
dev_usage[dev_id] += 1
for part in xrange(self.parts):
zones = {}
for replica in xrange(self.replicas):
dev_id = self._replica2part2dev[replica][part]
if stats:
dev_usage[dev_id] += 1
zone = self.devs[dev_id]['zone']
if zone in zones:
if dev_id >= len(self.devs) or not self.devs[dev_id]:
raise exceptions.RingValidationError(
'Partition %d not in %d distinct zones. ' \
'Zones were: %s' %
(part, self.replicas,
[self.devs[self._replica2part2dev[r][part]]['zone']
for r in xrange(self.replicas)]))
zones[zone] = True
"Partition %d, replica %d was not allocated "
"to a device." %
(part, replica))
if stats:
weighted_parts = self.weighted_parts()
weight_of_one_part = self.weight_of_one_part()
worst = 0
for dev in self._iter_devs():
if not dev['weight']:
if dev_usage[dev['id']]:
# If a device has no weight, but has partitions, then
# its overage is considered "infinity" and therefore
# always the worst possible. We show 999.99 for
# convenience.
worst = 999.99
break
continue
skew = abs(100.0 * dev_usage[dev['id']] /
(dev['weight'] * weighted_parts) - 100.0)
(dev['weight'] * weight_of_one_part) - 100.0)
if skew > worst:
worst = skew
return dev_usage, worst
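
A small worked example of the skew being computed here (all numbers invented): a device holding 50,000 partition-replicas when its weight entitles it to 49,152.

weight_of_one_part = 2 ** 16 * 3 / 4.0   # 49152.0, four devices of weight 1.0
dev_weight, dev_usage = 1.0, 50000

skew = abs(100.0 * dev_usage / (dev_weight * weight_of_one_part) - 100.0)
print(round(skew, 2))                    # 1.73, i.e. about 1.73% over its fair share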
@ -337,15 +382,18 @@ class RingBuilder(object):
:returns: balance of the ring
"""
balance = 0
weighted_parts = self.weighted_parts()
weight_of_one_part = self.weight_of_one_part()
for dev in self._iter_devs():
if not dev['weight']:
if dev['parts']:
# If a device has no weight, but has partitions, then its
# overage is considered "infinity" and therefore always the
# worst possible. We show 999.99 for convenience.
balance = 999.99
break
continue
dev_balance = abs(100.0 * dev['parts'] /
(dev['weight'] * weighted_parts) - 100.0)
(dev['weight'] * weight_of_one_part) - 100.0)
if dev_balance > balance:
balance = dev_balance
return balance
@ -369,6 +417,12 @@ class RingBuilder(object):
return [self.devs[r[part]] for r in self._replica2part2dev]
def _iter_devs(self):
"""
Returns an iterator over all the non-None devices in the ring. Note that
this means list(b._iter_devs())[some_id] may not equal b.devs[some_id];
you will have to check the 'id' key of each device to obtain its
dev_id.
"""
for dev in self.devs:
if dev is not None:
yield dev
@ -378,16 +432,20 @@ class RingBuilder(object):
Sets the parts_wanted key for each of the devices to the number of
partitions the device wants based on its relative weight. This key is
used to sort the devices according to "most wanted" during rebalancing
to best distribute partitions.
to best distribute partitions. A negative parts_wanted indicates the
device is "overweight" and wishes to give partitions away if possible.
"""
weighted_parts = self.weighted_parts()
weight_of_one_part = self.weight_of_one_part()
for dev in self._iter_devs():
if not dev['weight']:
dev['parts_wanted'] = self.parts * -2
# With no weight, that means we wish to "drain" the device. So
# we set the parts_wanted to a really large negative number to
# indicate its strong desire to give up everything it has.
dev['parts_wanted'] = -self.parts * self.replicas
else:
dev['parts_wanted'] = \
int(weighted_parts * dev['weight']) - dev['parts']
int(weight_of_one_part * dev['weight']) - dev['parts']
def _initial_balance(self):
"""
@ -417,10 +475,14 @@ class RingBuilder(object):
def _gather_reassign_parts(self):
"""
Returns a list of (partition, replicas) pairs to be reassigned
by gathering them from removed devices and overweight devices.
Returns a list of (partition, replicas) pairs to be reassigned by
gathering from removed devices, insufficiently-far-apart replicas, and
overweight drives.
"""
reassign_parts = defaultdict(list)
# First we gather partitions from removed devices. Since removed
# devices usually indicate device failures, we have no choice but to
# reassign these partitions. However, we mark them as moved so later
# choices will skip other replicas of the same partition if possible.
removed_dev_parts = defaultdict(list)
if self._remove_devs:
dev_ids = [d['id'] for d in self._remove_devs if d['parts']]
@ -432,26 +494,76 @@ class RingBuilder(object):
self._last_part_moves[part] = 0
removed_dev_parts[part].append(replica)
# Now we gather partitions that are "at risk" because they aren't
# currently sufficiently spread out across the cluster.
spread_out_parts = defaultdict(list)
max_allowed_replicas = self._build_max_replicas_by_tier()
for part in xrange(self.parts):
# Only move one replica at a time if possible.
if part in removed_dev_parts:
continue
# First, add up the count of replicas at each tier for each
# partition.
replicas_at_tier = defaultdict(lambda: 0)
for replica in xrange(self.replicas):
dev = self.devs[self._replica2part2dev[replica][part]]
for tier in tiers_for_dev(dev):
replicas_at_tier[tier] += 1
# Now, look for partitions not yet spread out enough and not
# recently moved.
for replica in xrange(self.replicas):
dev = self.devs[self._replica2part2dev[replica][part]]
removed_replica = False
for tier in tiers_for_dev(dev):
if (replicas_at_tier[tier] > max_allowed_replicas[tier] and
self._last_part_moves[part] >= self.min_part_hours):
self._last_part_moves[part] = 0
spread_out_parts[part].append(replica)
dev['parts_wanted'] += 1
dev['parts'] -= 1
removed_replica = True
break
if removed_replica:
for tier in tiers_for_dev(dev):
replicas_at_tier[tier] -= 1
# Last, we gather partitions from devices that are "overweight" because
# they have more partitions than their parts_wanted.
reassign_parts = defaultdict(list)
# We randomly pick a new starting point in the "circular" ring of
# partitions to try to get a better rebalance when called multiple
# times.
start = self._last_part_gather_start / 4 + randint(0, self.parts / 2)
self._last_part_gather_start = start
for replica in xrange(self.replicas):
part2dev = self._replica2part2dev[replica]
for half in (xrange(start, self.parts), xrange(0, start)):
for part in half:
if self._last_part_moves[part] < self.min_part_hours:
continue
if part in removed_dev_parts:
continue
dev = self.devs[part2dev[part]]
if dev['parts_wanted'] < 0:
self._last_part_moves[part] = 0
dev['parts_wanted'] += 1
dev['parts'] -= 1
reassign_parts[part].append(replica)
for part in itertools.chain(xrange(start, self.parts),
xrange(0, start)):
if self._last_part_moves[part] < self.min_part_hours:
continue
if part in removed_dev_parts or part in spread_out_parts:
continue
dev = self.devs[part2dev[part]]
if dev['parts_wanted'] < 0:
self._last_part_moves[part] = 0
dev['parts_wanted'] += 1
dev['parts'] -= 1
reassign_parts[part].append(replica)
reassign_parts.update(spread_out_parts)
reassign_parts.update(removed_dev_parts)
reassign_parts_list = list(reassign_parts.iteritems())
# We shuffle the partitions to reassign so we get a more even
# distribution later. There has been discussion of trying to distribute
# partitions more "regularly" because that would actually reduce risk
# but 1) it is really difficult to do this with uneven clusters and 2)
# it would concentrate load during failure recovery scenarios
# (increasing risk). The "right" answer has yet to be debated to
# conclusion, but working code wins for now.
shuffle(reassign_parts_list)
return reassign_parts_list
@ -461,7 +573,20 @@ class RingBuilder(object):
the initial assignment. The devices are ordered by how many partitions
they still want and kept in that order throughout the process. The
gathered partitions are iterated through, assigning them to devices
according to the "most wanted" and distinct zone restrictions.
according to the "most wanted" while keeping the replicas as "far
apart" as possible. Two different zones are considered the
farthest-apart things, followed by different ip/port pairs within a
zone; the least-far-apart things are different devices with the same
ip/port pair in the same zone.
If you want more replicas than devices, you won't get all your
replicas on distinct devices; some devices will end up holding more
than one replica of the same partition.
:param reassign_parts: An iterable of (part, replicas_to_replace)
pairs. replicas_to_replace is an iterable of the
replica (an int) to replace for that partition.
replicas_to_replace may be shared for multiple
partitions, so be sure you do not modify it.
"""
for dev in self._iter_devs():
dev['sort_key'] = self._sort_key_for(dev)
@ -469,42 +594,146 @@ class RingBuilder(object):
sorted((d for d in self._iter_devs() if d['weight']),
key=lambda x: x['sort_key'])
for part, replace_replicas in reassign_parts:
other_zones = array('H')
replace = []
for replica in xrange(self.replicas):
if replica in replace_replicas:
replace.append(replica)
else:
other_zones.append(self.devs[
self._replica2part2dev[replica][part]]['zone'])
tier2children = build_tier_tree(available_devs)
for replica in replace:
index = len(available_devs) - 1
while available_devs[index]['zone'] in other_zones:
index -= 1
dev = available_devs.pop(index)
other_zones.append(dev['zone'])
self._replica2part2dev[replica][part] = dev['id']
tier2devs = defaultdict(list)
tier2sort_key = defaultdict(list)
tiers_by_depth = defaultdict(set)
for dev in available_devs:
for tier in tiers_for_dev(dev):
tier2devs[tier].append(dev) # <-- starts out sorted!
tier2sort_key[tier].append(dev['sort_key'])
tiers_by_depth[len(tier)].add(tier)
for part, replace_replicas in reassign_parts:
# Gather up what other tiers (zones, ip_ports, and devices) the
# replicas not-to-be-moved are in for this part.
other_replicas = defaultdict(lambda: 0)
for replica in xrange(self.replicas):
if replica not in replace_replicas:
dev = self.devs[self._replica2part2dev[replica][part]]
for tier in tiers_for_dev(dev):
other_replicas[tier] += 1
def find_home_for_replica(tier=(), depth=1):
# Order the tiers by how many replicas of this
# partition they already have. Then, of the ones
# with the smallest number of replicas, pick the
# tier with the hungriest drive and then continue
# searching in that subtree.
#
# There are other strategies we could use here,
# such as hungriest-tier (i.e. biggest
# sum-of-parts-wanted) or picking one at random.
# However, hungriest-drive is what was used here
# before, and it worked pretty well in practice.
#
# Note that this allocator will balance things as
# evenly as possible at each level of the device
# layout. If your layout is extremely unbalanced,
# this may produce poor results.
candidate_tiers = tier2children[tier]
min_count = min(other_replicas[t] for t in candidate_tiers)
candidate_tiers = [t for t in candidate_tiers
if other_replicas[t] == min_count]
candidate_tiers.sort(
key=lambda t: tier2devs[t][-1]['parts_wanted'])
if depth == max(tiers_by_depth.keys()):
return tier2devs[candidate_tiers[-1]][-1]
return find_home_for_replica(tier=candidate_tiers[-1],
depth=depth + 1)
for replica in replace_replicas:
dev = find_home_for_replica()
dev['parts_wanted'] -= 1
dev['parts'] += 1
dev['sort_key'] = self._sort_key_for(dev)
self._insert_dev_sorted(available_devs, dev)
old_sort_key = dev['sort_key']
new_sort_key = dev['sort_key'] = self._sort_key_for(dev)
for tier in tiers_for_dev(dev):
other_replicas[tier] += 1
index = bisect.bisect_left(tier2sort_key[tier],
old_sort_key)
tier2devs[tier].pop(index)
tier2sort_key[tier].pop(index)
new_index = bisect.bisect_left(tier2sort_key[tier],
new_sort_key)
tier2devs[tier].insert(new_index, dev)
tier2sort_key[tier].insert(new_index, new_sort_key)
self._replica2part2dev[replica][part] = dev['id']
# Just to save memory and keep from accidental reuse.
for dev in self._iter_devs():
del dev['sort_key']
def _insert_dev_sorted(self, devs, dev):
index = 0
end = len(devs)
while index < end:
mid = (index + end) // 2
if dev['sort_key'] < devs[mid]['sort_key']:
end = mid
else:
index = mid + 1
devs.insert(index, dev)
def _sort_key_for(self, dev):
return '%08x.%04x' % (self.parts + dev['parts_wanted'],
randint(0, 0xffff))
# The maximum value of self.parts is 2^32, which is 9 hex
# digits wide (0x100000000). Using a width of 16 here gives us
# plenty of breathing room; you'd need more than 2^28 replicas
# to overflow it.
# Since the sort key is a string, an ASCII sort applies. Adding the
# maximum possible parts_wanted (self.parts * self.replicas) to
# parts_wanted keeps the value non-negative, so devices with negative
# parts_wanted still sort below devices with positive parts_wanted.
return '%016x.%04x.%04x' % (
(self.parts * self.replicas) + dev['parts_wanted'],
randint(0, 0xffff),
dev['id'])
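
A quick illustration of why both the fixed width and the parts * replicas offset matter (toy values; the random tiebreaker and device id fields are left out):

# Fixed-width hex of non-negative numbers sorts the same way the numbers do,
# but a negative value picks up a '-' sign and ASCII order no longer matches
# numeric order:
assert ('%016x' % -5) < ('%016x' % -20)     # -5's key sorts before -20's,
                                            # even though -20 < -5

# Adding parts * replicas (a lower bound on parts_wanted, since a drained
# device is set to -parts * replicas) keeps the field non-negative, so
# string order matches numeric order again:
parts, replicas = 2 ** 16, 3
key = lambda parts_wanted: '%016x' % (parts * replicas + parts_wanted)
assert key(-20) < key(-5) < key(0) < key(300)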
def _build_max_replicas_by_tier(self):
"""
Returns a dict of (tier: replica_count) for all tiers in the ring.
There will always be a () entry as the root of the structure, whose
replica_count will equal the ring's replica_count.
Then there will be a (zone,) entry for each zone, indicating the
maximum number of replicas any single zone might hold for a given
partition. Anything greater than 1 indicates a partition at slightly
elevated risk, as losing that zone would make multiple replicas of that
partition unreachable.
Next there will be a (zone, ip_port) entry for each server, indicating
the maximum number of replicas the devices sharing that ip_port within
the zone might hold for a given partition. Anything greater than 1
indicates a partition at elevated risk, as losing that ip_port would
make multiple replicas of that partition unreachable.
Last there will be a (zone, ip_port, dev_id) entry for each device,
indicating the maximum number of replicas that single device might hold
for a given partition. Anything greater than 1 indicates a partition at
serious risk, as its data will not be stored on the ring's full
replica_count of distinct devices.
Example return dict for the common SAIO setup::
{(): 3,
(1,): 1.0,
(1, '127.0.0.1:6010'): 1.0,
(1, '127.0.0.1:6010', 0): 1.0,
(2,): 1.0,
(2, '127.0.0.1:6020'): 1.0,
(2, '127.0.0.1:6020', 1): 1.0,
(3,): 1.0,
(3, '127.0.0.1:6030'): 1.0,
(3, '127.0.0.1:6030', 2): 1.0,
(4,): 1.0,
(4, '127.0.0.1:6040'): 1.0,
(4, '127.0.0.1:6040', 3): 1.0}
"""
# Used by walk_tree to know what entries to create for each recursive
# call.
tier2children = build_tier_tree(self._iter_devs())
def walk_tree(tier, replica_count):
mr = {tier: replica_count}
if tier in tier2children:
subtiers = tier2children[tier]
for subtier in subtiers:
submax = math.ceil(float(replica_count) / len(subtiers))
mr.update(walk_tree(subtier, submax))
return mr
return walk_tree((), self.replicas)
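
To make the ceil() arithmetic above concrete, here is a toy run of the same walk over an invented two-zone, two-servers-per-zone layout (device tiers omitted to keep the output short); this is a standalone sketch, not code from this commit:

import math

tier2children = {
    (): [(1,), (2,)],
    (1,): [(1, '10.0.1.1:6000'), (1, '10.0.1.2:6000')],
    (2,): [(2, '10.0.2.1:6000'), (2, '10.0.2.2:6000')],
}

def walk_tree(tier, replica_count):
    mr = {tier: replica_count}
    for subtier in tier2children.get(tier, ()):
        submax = math.ceil(float(replica_count) / len(tier2children[tier]))
        mr.update(walk_tree(subtier, submax))
    return mr

# With 3 replicas, each zone may hold at most ceil(3/2) == 2 replicas of any
# partition, and each server at most ceil(2/2) == 1.
print(walk_tree((), 3))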

View File

@ -14,6 +14,7 @@
# limitations under the License.
import cPickle as pickle
from collections import defaultdict
from gzip import GzipFile
from os.path import getmtime
from struct import unpack_from
@ -21,6 +22,7 @@ from time import time
import os
from swift.common.utils import hash_path, validate_configuration
from swift.common.ring.utils import tiers_for_dev
class RingData(object):
@ -65,16 +67,26 @@ class Ring(object):
ring_data['devs'], ring_data['part_shift'])
self._mtime = getmtime(self.pickle_gz_path)
self.devs = ring_data.devs
self.zone2devs = {}
for dev in self.devs:
if not dev:
continue
if dev['zone'] in self.zone2devs:
self.zone2devs[dev['zone']].append(dev)
else:
self.zone2devs[dev['zone']] = [dev]
self._replica2part2dev_id = ring_data._replica2part2dev_id
self._part_shift = ring_data._part_shift
self._rebuild_tier_data()
def _rebuild_tier_data(self):
self.tier2devs = defaultdict(list)
for dev in self.devs:
if not dev:
continue
for tier in tiers_for_dev(dev):
self.tier2devs[tier].append(dev)
tiers_by_length = defaultdict(list)
for tier in self.tier2devs.keys():
tiers_by_length[len(tier)].append(tier)
self.tiers_by_length = sorted(tiers_by_length.values(),
key=lambda x: len(x[0]))
for tiers in self.tiers_by_length:
tiers.sort()
@property
def replica_count(self):
@ -97,20 +109,27 @@ class Ring(object):
def get_part_nodes(self, part):
"""
Get the nodes that are responsible for the partition.
Get the nodes that are responsible for the partition. If one
node is responsible for more than one replica of the same
partition, it will only appear in the output once.
:param part: partition to get nodes for
:returns: list of node dicts
See :func:`get_nodes` for a description of the node dicts.
"""
if time() > self._rtime:
self._reload()
return [self.devs[r[part]] for r in self._replica2part2dev_id]
seen_ids = set()
return [self.devs[r[part]] for r in self._replica2part2dev_id
if not (r[part] in seen_ids or seen_ids.add(r[part]))]
def get_nodes(self, account, container=None, obj=None):
"""
Get the partition and nodes for an account/container/object.
If a node is responsible for more than one replica, it will
only appear in the output once.
:param account: account name
:param container: container name
@ -138,7 +157,9 @@ class Ring(object):
if time() > self._rtime:
self._reload()
part = unpack_from('>I', key)[0] >> self._part_shift
return part, [self.devs[r[part]] for r in self._replica2part2dev_id]
seen_ids = set()
return part, [self.devs[r[part]] for r in self._replica2part2dev_id
if not (r[part] in seen_ids or seen_ids.add(r[part]))]
def get_more_nodes(self, part):
"""
@ -151,17 +172,22 @@ class Ring(object):
"""
if time() > self._rtime:
self._reload()
zones = sorted(self.zone2devs.keys())
used_tiers = set()
for part2dev_id in self._replica2part2dev_id:
zones.remove(self.devs[part2dev_id[part]]['zone'])
while zones:
zone = zones.pop(part % len(zones))
weighted_node = None
for i in xrange(len(self.zone2devs[zone])):
node = self.zone2devs[zone][(part + i) %
len(self.zone2devs[zone])]
if node.get('weight'):
weighted_node = node
for tier in tiers_for_dev(self.devs[part2dev_id[part]]):
used_tiers.add(tier)
for level in self.tiers_by_length:
tiers = list(level)
while tiers:
tier = tiers.pop(part % len(tiers))
if tier in used_tiers:
continue
for i in xrange(len(self.tier2devs[tier])):
dev = self.tier2devs[tier][(part + i) %
len(self.tier2devs[tier])]
if not dev.get('weight'):
continue
yield dev
used_tiers.update(tiers_for_dev(dev))
break
if weighted_node:
yield weighted_node
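
For context on how callers consume this, a minimal sketch of the primaries-then-handoffs pattern the proxy and replicators follow (first_success and try_node are invented names; ring stands for a loaded swift.common.ring.Ring):

import itertools

def first_success(ring, account, container, obj, try_node):
    # Try the primary nodes first, then the extended handoff list that
    # get_more_nodes() now yields: other zones, then other ip:ports, then
    # any remaining device.
    part, primaries = ring.get_nodes(account, container, obj)
    for node in itertools.chain(primaries, ring.get_more_nodes(part)):
        if try_node(node, part):
            return node
    return None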

View File

@ -0,0 +1,89 @@
from collections import defaultdict
def tiers_for_dev(dev):
"""
Returns a tuple of tiers for a given device in ascending order by
length.
:returns: tuple of tiers
"""
t1 = dev['zone']
t2 = "{ip}:{port}".format(ip=dev.get('ip'), port=dev.get('port'))
t3 = dev['id']
return ((t1,),
(t1, t2),
(t1, t2, t3))
def build_tier_tree(devices):
"""
Construct the tier tree from the zone layout.
The tier tree is a dictionary that maps tiers to their child tiers.
A synthetic root node of () is generated so that there's one tree,
not a forest.
Example:
zone 1 -+---- 192.168.1.1:6000 -+---- device id 0
| |
| +---- device id 1
| |
| +---- device id 2
|
+---- 192.168.1.2:6000 -+---- device id 3
|
+---- device id 4
|
+---- device id 5
zone 2 -+---- 192.168.2.1:6000 -+---- device id 6
| |
| +---- device id 7
| |
| +---- device id 8
|
+---- 192.168.2.2:6000 -+---- device id 9
|
+---- device id 10
|
+---- device id 11
The tier tree would look like:
{
(): [(1,), (2,)],
(1,): [(1, 192.168.1.1:6000),
(1, 192.168.1.2:6000)],
(2,): [(2, 192.168.2.1:6000),
(2, 192.168.2.2:6000)],
(1, 192.168.1.1:6000): [(1, 192.168.1.1:6000, 0),
(1, 192.168.1.1:6000, 1),
(1, 192.168.1.1:6000, 2)],
(1, 192.168.1.2:6000): [(1, 192.168.1.2:6000, 3),
(1, 192.168.1.2:6000, 4),
(1, 192.168.1.2:6000, 5)],
(2, 192.168.2.1:6000): [(2, 192.168.2.1:6000, 6),
(2, 192.168.2.1:6000, 7),
(2, 192.168.2.1:6000, 8)],
(2, 192.168.2.2:6000): [(2, 192.168.2.2:6000, 9),
(2, 192.168.2.2:6000, 10),
(2, 192.168.2.2:6000, 11)],
}
:param devices: device dicts from which to generate the tree
:returns: tier tree
"""
tier2children = defaultdict(set)
for dev in devices:
for tier in tiers_for_dev(dev):
if len(tier) > 1:
tier2children[tier[0:-1]].add(tier)
else:
tier2children[()].add(tier)
return tier2children
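
A small usage example of the two helpers above, with device dicts trimmed to just the keys they read:

from swift.common.ring.utils import tiers_for_dev, build_tier_tree

devs = [{'id': 0, 'zone': 1, 'ip': '192.168.1.1', 'port': 6000},
        {'id': 1, 'zone': 1, 'ip': '192.168.1.2', 'port': 6000},
        {'id': 2, 'zone': 2, 'ip': '192.168.2.1', 'port': 6000}]

print(tiers_for_dev(devs[0]))
# ((1,), (1, '192.168.1.1:6000'), (1, '192.168.1.1:6000', 0))

tree = build_tier_tree(devs)
print(sorted(tree[()]))    # [(1,), (2,)]
print(sorted(tree[(1,)]))  # [(1, '192.168.1.1:6000'), (1, '192.168.1.2:6000')]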

View File

@ -298,7 +298,7 @@ class ContainerSync(Daemon):
# will attempt to sync previously skipped rows in case the
# other nodes didn't succeed.
if unpack_from('>I', key)[0] % \
self.container_ring.replica_count != ordinal:
len(nodes) != ordinal:
if not self.container_sync_row(row, sync_to, sync_key,
broker, info):
return
@ -317,7 +317,7 @@ class ContainerSync(Daemon):
# previously skipped rows in case the other nodes didn't
# succeed.
if unpack_from('>I', key)[0] % \
self.container_ring.replica_count == ordinal:
len(nodes) == ordinal:
if not self.container_sync_row(row, sync_to, sync_key,
broker, info):
return
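
The ordinal check above shards sync work among a partition's nodes by hashing each row; here is a minimal standalone sketch of the idea (md5 stands in for Swift's hash_path(), and takes_row is an invented name):

from hashlib import md5
from struct import unpack_from

def takes_row(name, num_nodes, ordinal):
    key = md5(name.encode('utf-8')).digest()
    return unpack_from('>I', key)[0] % num_nodes == ordinal

# Each of the partition's three nodes claims a disjoint share of the rows:
rows = ['o1', 'o2', 'o3', 'o4', 'o5', 'o6']
for ordinal in range(3):
    print('%d: %r' % (ordinal, [r for r in rows if takes_row(r, 3, ordinal)]))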

View File

@ -373,7 +373,7 @@ class ObjectReplicator(Daemon):
headers={'Content-Length': '0'}).getresponse().read()
responses.append(success)
if not suffixes or (len(responses) == \
self.object_ring.replica_count and all(responses)):
len(job['nodes']) and all(responses)):
self.logger.info(_("Removing partition: %s"), job['path'])
tpool.execute(shutil.rmtree, job['path'], ignore_errors=True)
except (Exception, Timeout):
@ -397,7 +397,7 @@ class ObjectReplicator(Daemon):
if isinstance(hashed, BaseException):
raise hashed
self.suffix_hash += hashed
attempts_left = self.object_ring.replica_count - 1
attempts_left = len(job['nodes'])
nodes = itertools.chain(job['nodes'],
self.object_ring.get_more_nodes(int(job['partition'])))
while attempts_left > 0:
@ -533,12 +533,13 @@ class ObjectReplicator(Daemon):
continue
for partition in os.listdir(obj_path):
try:
nodes = [node for node in
part_nodes = \
self.object_ring.get_part_nodes(int(partition))
nodes = [node for node in part_nodes
if node['id'] != local_dev['id']]
jobs.append(dict(path=join(obj_path, partition),
nodes=nodes,
delete=len(nodes) > self.object_ring.replica_count - 1,
delete=len(nodes) > len(part_nodes) - 1,
partition=partition))
except ValueError:
continue

View File

@ -186,7 +186,7 @@ class SegmentedIterable(object):
resp = self.controller.GETorHEAD_base(req, _('Object'), partition,
self.controller.iter_nodes(partition, nodes,
self.controller.app.object_ring), path,
self.controller.app.object_ring.replica_count)
len(nodes))
if not is_success(resp.status_int):
raise Exception(_('Could not load object segment %(path)s:' \
' %(status)s') % {'path': path, 'status': resp.status_int})
@ -414,7 +414,7 @@ class Controller(object):
return None, None, None
result_code = 0
container_count = 0
attempts_left = self.app.account_ring.replica_count
attempts_left = len(nodes)
path = '/%s' % account
headers = {'x-trans-id': self.trans_id, 'Connection': 'close'}
for node in self.iter_nodes(partition, nodes, self.app.account_ring):
@ -507,7 +507,7 @@ class Controller(object):
sync_key = None
container_size = None
versions = None
attempts_left = self.app.container_ring.replica_count
attempts_left = len(nodes)
headers = {'x-trans-id': self.trans_id, 'Connection': 'close'}
for node in self.iter_nodes(partition, nodes, self.app.container_ring):
try:
@ -606,13 +606,14 @@ class Controller(object):
backend request that should be made.
:returns: a webob Response object
"""
nodes = self.iter_nodes(part, ring.get_part_nodes(part), ring)
pile = GreenPile(ring.replica_count)
start_nodes = ring.get_part_nodes(part)
nodes = self.iter_nodes(part, start_nodes, ring)
pile = GreenPile(len(start_nodes))
for head in headers:
pile.spawn(self._make_request, nodes, part, method, path,
head, query_string)
response = [resp for resp in pile if resp]
while len(response) < ring.replica_count:
while len(response) < len(start_nodes):
response.append((HTTP_SERVICE_UNAVAILABLE, '', ''))
statuses, reasons, bodies = zip(*response)
return self.best_response(req, statuses, reasons, bodies,
@ -886,7 +887,7 @@ class ObjectController(Controller):
shuffle(lnodes)
lresp = self.GETorHEAD_base(lreq, _('Container'),
lpartition, lnodes, lreq.path_info,
self.app.container_ring.replica_count)
len(lnodes))
if 'swift.authorize' in env:
lreq.acl = lresp.headers.get('x-container-read')
aresp = env['swift.authorize'](lreq)
@ -918,7 +919,7 @@ class ObjectController(Controller):
shuffle(nodes)
resp = self.GETorHEAD_base(req, _('Object'), partition,
self.iter_nodes(partition, nodes, self.app.object_ring),
req.path_info, self.app.object_ring.replica_count)
req.path_info, len(nodes))
# If we get a 416 Requested Range Not Satisfiable we have to check if
# we were actually requesting a manifest and then redo
@ -928,7 +929,7 @@ class ObjectController(Controller):
req.range = None
resp2 = self.GETorHEAD_base(req, _('Object'), partition,
self.iter_nodes(partition, nodes, self.app.object_ring),
req.path_info, self.app.object_ring.replica_count)
req.path_info, len(nodes))
if 'x-object-manifest' not in resp2.headers:
return resp
resp = resp2
@ -1163,7 +1164,7 @@ class ObjectController(Controller):
hreq = Request.blank(req.path_info, headers={'X-Newest': 'True'},
environ={'REQUEST_METHOD': 'HEAD'})
hresp = self.GETorHEAD_base(hreq, _('Object'), partition, nodes,
hreq.path_info, self.app.object_ring.replica_count)
hreq.path_info, len(nodes))
# Used by container sync feature
if 'x-timestamp' in req.headers:
try:
@ -1544,7 +1545,7 @@ class ContainerController(Controller):
self.account_name, self.container_name)
shuffle(nodes)
resp = self.GETorHEAD_base(req, _('Container'), part, nodes,
req.path_info, self.app.container_ring.replica_count)
req.path_info, len(nodes))
if self.app.memcache:
# set the memcache container size for ratelimiting
@ -1694,7 +1695,7 @@ class AccountController(Controller):
partition, nodes = self.app.account_ring.get_nodes(self.account_name)
shuffle(nodes)
resp = self.GETorHEAD_base(req, _('Account'), partition, nodes,
req.path_info.rstrip('/'), self.app.account_ring.replica_count)
req.path_info.rstrip('/'), len(nodes))
if resp.status_int == HTTP_NOT_FOUND and self.app.account_autocreate:
if len(self.account_name) > MAX_ACCOUNT_NAME_LENGTH:
resp = HTTPBadRequest(request=req)
@ -1712,7 +1713,7 @@ class AccountController(Controller):
raise Exception('Could not autocreate account %r' %
self.account_name)
resp = self.GETorHEAD_base(req, _('Account'), partition, nodes,
req.path_info.rstrip('/'), self.app.account_ring.replica_count)
req.path_info.rstrip('/'), len(nodes))
return resp
@public

View File

@ -15,6 +15,7 @@
import os
import unittest
from collections import defaultdict
from shutil import rmtree
from swift.common import exceptions
@ -191,6 +192,247 @@ class TestRingBuilder(unittest.TestCase):
max_run = run
return max_run > len(parts) / 2
def test_multitier_partial(self):
# Multitier test, zones full, nodes not full
rb = ring.RingBuilder(8, 6, 1)
rb.add_dev({'id': 0, 'zone': 0, 'weight': 1, 'ip': '127.0.0.1',
'port': 10000, 'device': 'sda'})
rb.add_dev({'id': 1, 'zone': 0, 'weight': 1, 'ip': '127.0.0.1',
'port': 10000, 'device': 'sdb'})
rb.add_dev({'id': 2, 'zone': 0, 'weight': 1, 'ip': '127.0.0.1',
'port': 10000, 'device': 'sdc'})
rb.add_dev({'id': 3, 'zone': 1, 'weight': 1, 'ip': '127.0.0.1',
'port': 10001, 'device': 'sdd'})
rb.add_dev({'id': 4, 'zone': 1, 'weight': 1, 'ip': '127.0.0.1',
'port': 10001, 'device': 'sde'})
rb.add_dev({'id': 5, 'zone': 1, 'weight': 1, 'ip': '127.0.0.1',
'port': 10001, 'device': 'sdf'})
rb.add_dev({'id': 6, 'zone': 2, 'weight': 1, 'ip': '127.0.0.1',
'port': 10002, 'device': 'sdg'})
rb.add_dev({'id': 7, 'zone': 2, 'weight': 1, 'ip': '127.0.0.1',
'port': 10002, 'device': 'sdh'})
rb.add_dev({'id': 8, 'zone': 2, 'weight': 1, 'ip': '127.0.0.1',
'port': 10002, 'device': 'sdi'})
rb.rebalance()
rb.validate()
for part in xrange(rb.parts):
counts = defaultdict(lambda: defaultdict(lambda: 0))
for replica in xrange(rb.replicas):
dev = rb.devs[rb._replica2part2dev[replica][part]]
counts['zone'][dev['zone']] += 1
counts['dev_id'][dev['id']] += 1
if counts['zone'] != {0: 2, 1: 2, 2: 2}:
raise AssertionError(
"Partition %d not evenly distributed (got %r)" %
(part, counts['zone']))
for dev_id, replica_count in counts['dev_id'].iteritems():
if replica_count > 1:
raise AssertionError(
"Partition %d is on device %d more than once (%r)" %
(part, dev_id, counts['dev_id']))
def test_multitier_full(self):
# Multitier test, #replicas == #devs
rb = ring.RingBuilder(8, 6, 1)
rb.add_dev({'id': 0, 'zone': 0, 'weight': 1, 'ip': '127.0.0.1',
'port': 10000, 'device': 'sda'})
rb.add_dev({'id': 1, 'zone': 0, 'weight': 1, 'ip': '127.0.0.1',
'port': 10000, 'device': 'sdb'})
rb.add_dev({'id': 2, 'zone': 1, 'weight': 1, 'ip': '127.0.0.1',
'port': 10000, 'device': 'sdc'})
rb.add_dev({'id': 3, 'zone': 1, 'weight': 1, 'ip': '127.0.0.1',
'port': 10001, 'device': 'sdd'})
rb.add_dev({'id': 4, 'zone': 2, 'weight': 1, 'ip': '127.0.0.1',
'port': 10001, 'device': 'sde'})
rb.add_dev({'id': 5, 'zone': 2, 'weight': 1, 'ip': '127.0.0.1',
'port': 10001, 'device': 'sdf'})
rb.rebalance()
rb.validate()
for part in xrange(rb.parts):
counts = defaultdict(lambda: defaultdict(lambda: 0))
for replica in xrange(rb.replicas):
dev = rb.devs[rb._replica2part2dev[replica][part]]
counts['zone'][dev['zone']] += 1
counts['dev_id'][dev['id']] += 1
if counts['zone'] != {0: 2, 1: 2, 2: 2}:
raise AssertionError(
"Partition %d not evenly distributed (got %r)" %
(part, counts['zone']))
for dev_id, replica_count in counts['dev_id'].iteritems():
if replica_count != 1:
raise AssertionError(
"Partition %d is on device %d %d times, not 1 (%r)" %
(part, dev_id, replica_count, counts['dev_id']))
def test_multitier_overfull(self):
# Multitier test, #replicas == #devs + 2 (to prove even distribution)
rb = ring.RingBuilder(8, 8, 1)
rb.add_dev({'id': 0, 'zone': 0, 'weight': 1, 'ip': '127.0.0.1',
'port': 10000, 'device': 'sda'})
rb.add_dev({'id': 1, 'zone': 0, 'weight': 1, 'ip': '127.0.0.1',
'port': 10000, 'device': 'sdb'})
rb.add_dev({'id': 2, 'zone': 1, 'weight': 1, 'ip': '127.0.0.1',
'port': 10000, 'device': 'sdc'})
rb.add_dev({'id': 3, 'zone': 1, 'weight': 1, 'ip': '127.0.0.1',
'port': 10001, 'device': 'sdd'})
rb.add_dev({'id': 4, 'zone': 2, 'weight': 1, 'ip': '127.0.0.1',
'port': 10001, 'device': 'sde'})
rb.add_dev({'id': 5, 'zone': 2, 'weight': 1, 'ip': '127.0.0.1',
'port': 10001, 'device': 'sdf'})
rb.rebalance()
rb.validate()
for part in xrange(rb.parts):
counts = defaultdict(lambda: defaultdict(lambda: 0))
for replica in xrange(rb.replicas):
dev = rb.devs[rb._replica2part2dev[replica][part]]
counts['zone'][dev['zone']] += 1
counts['dev_id'][dev['id']] += 1
self.assertEquals(8, sum(counts['zone'].values()))
for zone, replica_count in counts['zone'].iteritems():
if replica_count not in (2, 3):
raise AssertionError(
"Partition %d not evenly distributed (got %r)" %
(part, counts['zone']))
for dev_id, replica_count in counts['dev_id'].iteritems():
if replica_count not in (1, 2):
raise AssertionError(
"Partition %d is on device %d %d times, "
"not 1 or 2 (%r)" %
(part, dev_id, replica_count, counts['dev_id']))
def test_multitier_expansion_more_devices(self):
rb = ring.RingBuilder(8, 6, 1)
rb.add_dev({'id': 0, 'zone': 0, 'weight': 1, 'ip': '127.0.0.1',
'port': 10000, 'device': 'sda'})
rb.add_dev({'id': 1, 'zone': 1, 'weight': 1, 'ip': '127.0.0.1',
'port': 10000, 'device': 'sdb'})
rb.add_dev({'id': 2, 'zone': 2, 'weight': 1, 'ip': '127.0.0.1',
'port': 10000, 'device': 'sdc'})
rb.rebalance()
rb.validate()
rb.add_dev({'id': 3, 'zone': 0, 'weight': 1, 'ip': '127.0.0.1',
'port': 10000, 'device': 'sdd'})
rb.add_dev({'id': 4, 'zone': 1, 'weight': 1, 'ip': '127.0.0.1',
'port': 10000, 'device': 'sde'})
rb.add_dev({'id': 5, 'zone': 2, 'weight': 1, 'ip': '127.0.0.1',
'port': 10000, 'device': 'sdf'})
for _ in xrange(5):
rb.pretend_min_part_hours_passed()
rb.rebalance()
rb.validate()
for part in xrange(rb.parts):
counts = dict(zone=defaultdict(lambda: 0),
dev_id=defaultdict(lambda: 0))
for replica in xrange(rb.replicas):
dev = rb.devs[rb._replica2part2dev[replica][part]]
counts['zone'][dev['zone']] += 1
counts['dev_id'][dev['id']] += 1
self.assertEquals({0: 2, 1: 2, 2: 2}, dict(counts['zone']))
self.assertEquals({0: 1, 1: 1, 2: 1, 3: 1, 4: 1, 5: 1},
dict(counts['dev_id']))
def test_multitier_part_moves_with_0_min_part_hours(self):
rb = ring.RingBuilder(8, 3, 0)
rb.add_dev({'id': 0, 'zone': 0, 'weight': 1, 'ip': '127.0.0.1',
'port': 10000, 'device': 'sda1'})
rb.rebalance()
rb.validate()
# min_part_hours is 0, so we're clear to move 2 replicas to
# new devs
rb.add_dev({'id': 1, 'zone': 0, 'weight': 1, 'ip': '127.0.0.1',
'port': 10000, 'device': 'sdb1'})
rb.add_dev({'id': 2, 'zone': 0, 'weight': 1, 'ip': '127.0.0.1',
'port': 10000, 'device': 'sdc1'})
rb.rebalance()
rb.validate()
for part in xrange(rb.parts):
devs = set()
for replica in xrange(rb.replicas):
devs.add(rb._replica2part2dev[replica][part])
if len(devs) != 3:
raise AssertionError(
"Partition %d not on 3 devs (got %r)" % (part, devs))
def test_multitier_part_moves_with_positive_min_part_hours(self):
rb = ring.RingBuilder(8, 3, 99)
rb.add_dev({'id': 0, 'zone': 0, 'weight': 1, 'ip': '127.0.0.1',
'port': 10000, 'device': 'sda1'})
rb.rebalance()
rb.validate()
# min_part_hours is >0, so we'll only be able to move 1
# replica to a new home
rb.add_dev({'id': 1, 'zone': 0, 'weight': 1, 'ip': '127.0.0.1',
'port': 10000, 'device': 'sdb1'})
rb.add_dev({'id': 2, 'zone': 0, 'weight': 1, 'ip': '127.0.0.1',
'port': 10000, 'device': 'sdc1'})
rb.pretend_min_part_hours_passed()
rb.rebalance()
rb.validate()
for part in xrange(rb.parts):
devs = set()
for replica in xrange(rb.replicas):
devs.add(rb._replica2part2dev[replica][part])
if len(devs) != 2:
raise AssertionError(
"Partition %d not on 2 devs (got %r)" % (part, devs))
def test_multitier_dont_move_too_many_replicas(self):
rb = ring.RingBuilder(8, 3, 0)
# there'll be at least one replica in z0 and z1
rb.add_dev({'id': 0, 'zone': 0, 'weight': 1, 'ip': '127.0.0.1',
'port': 10000, 'device': 'sda1'})
rb.add_dev({'id': 1, 'zone': 1, 'weight': 1, 'ip': '127.0.0.1',
'port': 10000, 'device': 'sdb1'})
rb.rebalance()
rb.validate()
# only 1 replica should move
rb.add_dev({'id': 2, 'zone': 2, 'weight': 1, 'ip': '127.0.0.1',
'port': 10000, 'device': 'sdd1'})
rb.add_dev({'id': 3, 'zone': 3, 'weight': 1, 'ip': '127.0.0.1',
'port': 10000, 'device': 'sde1'})
rb.add_dev({'id': 4, 'zone': 4, 'weight': 1, 'ip': '127.0.0.1',
'port': 10000, 'device': 'sdf1'})
rb.rebalance()
rb.validate()
for part in xrange(rb.parts):
zones = set()
for replica in xrange(rb.replicas):
zones.add(rb.devs[rb._replica2part2dev[replica][part]]['zone'])
if len(zones) != 3:
raise AssertionError(
"Partition %d not in 3 zones (got %r)" % (part, zones))
if 0 not in zones or 1 not in zones:
raise AssertionError(
"Partition %d not in zones 0 and 1 (got %r)" %
(part, zones))
def test_rerebalance(self):
rb = ring.RingBuilder(8, 3, 1)
rb.add_dev({'id': 0, 'zone': 0, 'weight': 1, 'ip': '127.0.0.1',
@ -286,42 +528,28 @@ class TestRingBuilder(unittest.TestCase):
self.assertRaises(exceptions.RingValidationError, rb.validate)
rb.devs[1]['parts'] += 1
# Test duplicate device for partition
# Test partition on nonexistent device
rb.pretend_min_part_hours_passed()
orig_dev_id = rb._replica2part2dev[0][0]
rb._replica2part2dev[0][0] = rb._replica2part2dev[1][0]
rb._replica2part2dev[0][0] = len(rb.devs)
self.assertRaises(exceptions.RingValidationError, rb.validate)
rb._replica2part2dev[0][0] = orig_dev_id
# Test duplicate zone for partition
rb.add_dev({'id': 5, 'zone': 0, 'weight': 2, 'ip': '127.0.0.1',
'port': 10005, 'device': 'sda1'})
rb.pretend_min_part_hours_passed()
rb.rebalance()
rb.validate()
orig_replica = orig_partition = orig_device = None
for part2dev in rb._replica2part2dev:
for p in xrange(2**8):
if part2dev[p] == 5:
for r in xrange(len(rb._replica2part2dev)):
if rb._replica2part2dev[r][p] != 5:
orig_replica = r
orig_partition = p
orig_device = rb._replica2part2dev[r][p]
rb._replica2part2dev[r][p] = 0
break
if orig_replica is not None:
break
if orig_replica is not None:
break
self.assertRaises(exceptions.RingValidationError, rb.validate)
rb._replica2part2dev[orig_replica][orig_partition] = orig_device
# Tests that validate can handle 'holes' in .devs
rb.remove_dev(2)
rb.pretend_min_part_hours_passed()
rb.rebalance()
rb.validate(stats=True)
# Test partition assigned to a hole
if rb.devs[2]:
rb.remove_dev(2)
rb.pretend_min_part_hours_passed()
orig_dev_id = rb._replica2part2dev[0][0]
rb._replica2part2dev[0][0] = 2
self.assertRaises(exceptions.RingValidationError, rb.validate)
rb._replica2part2dev[0][0] = orig_dev_id
# Validate that zero weight devices with no partitions don't count on
# the 'worst' value.
self.assertNotEquals(rb.validate(stats=True)[1], 999.99)

View File

@ -49,9 +49,18 @@ class TestRing(unittest.TestCase):
rmtree(self.testdir, ignore_errors=1)
os.mkdir(self.testdir)
self.testgz = os.path.join(self.testdir, 'whatever.ring.gz')
self.intended_replica2part2dev_id = [[0, 2, 0, 2], [2, 0, 2, 0]]
self.intended_devs = [{'id': 0, 'zone': 0, 'weight': 1.0}, None,
{'id': 2, 'zone': 2, 'weight': 1.0}]
self.intended_replica2part2dev_id = [[0, 1, 0, 1],
[0, 1, 0, 1],
[3, 4, 3, 4]]
self.intended_devs = [{'id': 0, 'zone': 0, 'weight': 1.0,
'ip': '10.1.1.1', 'port': 6000},
{'id': 1, 'zone': 0, 'weight': 1.0,
'ip': '10.1.1.1', 'port': 6000},
None,
{'id': 3, 'zone': 2, 'weight': 1.0,
'ip': '10.1.2.1', 'port': 6000},
{'id': 4, 'zone': 2, 'weight': 1.0,
'ip': '10.1.2.2', 'port': 6000}]
self.intended_part_shift = 30
self.intended_reload_time = 15
pickle.dump(ring.RingData(self.intended_replica2part2dev_id,
@ -88,28 +97,28 @@ class TestRing(unittest.TestCase):
self.ring = ring.Ring(self.testdir, reload_time=0.001,
ring_name='whatever')
orig_mtime = self.ring._mtime
self.assertEquals(len(self.ring.devs), 3)
self.assertEquals(len(self.ring.devs), 5)
self.intended_devs.append({'id': 3, 'zone': 3, 'weight': 1.0})
pickle.dump(ring.RingData(self.intended_replica2part2dev_id,
self.intended_devs, self.intended_part_shift),
GzipFile(self.testgz, 'wb'))
sleep(0.1)
self.ring.get_nodes('a')
self.assertEquals(len(self.ring.devs), 4)
self.assertEquals(len(self.ring.devs), 6)
self.assertNotEquals(self.ring._mtime, orig_mtime)
os.utime(self.testgz, (time() - 300, time() - 300))
self.ring = ring.Ring(self.testdir, reload_time=0.001,
ring_name='whatever')
orig_mtime = self.ring._mtime
self.assertEquals(len(self.ring.devs), 4)
self.intended_devs.append({'id': 4, 'zone': 4, 'weight': 1.0})
self.assertEquals(len(self.ring.devs), 6)
self.intended_devs.append({'id': 5, 'zone': 4, 'weight': 1.0})
pickle.dump(ring.RingData(self.intended_replica2part2dev_id,
self.intended_devs, self.intended_part_shift),
GzipFile(self.testgz, 'wb'))
sleep(0.1)
self.ring.get_part_nodes(0)
self.assertEquals(len(self.ring.devs), 5)
self.assertEquals(len(self.ring.devs), 7)
self.assertNotEquals(self.ring._mtime, orig_mtime)
os.utime(self.testgz, (time() - 300, time() - 300))
@ -117,14 +126,14 @@ class TestRing(unittest.TestCase):
ring_name='whatever')
orig_mtime = self.ring._mtime
part, nodes = self.ring.get_nodes('a')
self.assertEquals(len(self.ring.devs), 5)
self.intended_devs.append({'id': 5, 'zone': 5, 'weight': 1.0})
self.assertEquals(len(self.ring.devs), 7)
self.intended_devs.append({'id': 6, 'zone': 5, 'weight': 1.0})
pickle.dump(ring.RingData(self.intended_replica2part2dev_id,
self.intended_devs, self.intended_part_shift),
GzipFile(self.testgz, 'wb'))
sleep(0.1)
self.ring.get_more_nodes(part).next()
self.assertEquals(len(self.ring.devs), 6)
self.assertEquals(len(self.ring.devs), 8)
self.assertNotEquals(self.ring._mtime, orig_mtime)
def test_get_part_nodes(self):
@ -137,109 +146,137 @@ class TestRing(unittest.TestCase):
self.assertRaises(TypeError, self.ring.get_nodes)
part, nodes = self.ring.get_nodes('a')
self.assertEquals(part, 0)
self.assertEquals(nodes, [{'id': 0, 'zone': 0, 'weight': 1.0},
{'id': 2, 'zone': 2, 'weight': 1.0}])
self.assertEquals(nodes, [self.intended_devs[0],
self.intended_devs[3]])
part, nodes = self.ring.get_nodes('a1')
self.assertEquals(part, 0)
self.assertEquals(nodes, [{'id': 0, 'zone': 0, 'weight': 1.0},
{'id': 2, 'zone': 2, 'weight': 1.0}])
self.assertEquals(nodes, [self.intended_devs[0],
self.intended_devs[3]])
part, nodes = self.ring.get_nodes('a4')
self.assertEquals(part, 1)
self.assertEquals(nodes, [{'id': 2, 'zone': 2, 'weight': 1.0},
{'id': 0, 'zone': 0, 'weight': 1.0}])
self.assertEquals(nodes, [self.intended_devs[1],
self.intended_devs[4]])
part, nodes = self.ring.get_nodes('aa')
self.assertEquals(part, 1)
self.assertEquals(nodes, [{'id': 2, 'zone': 2, 'weight': 1.0},
{'id': 0, 'zone': 0, 'weight': 1.0}])
self.assertEquals(nodes, [self.intended_devs[1],
self.intended_devs[4]])
part, nodes = self.ring.get_nodes('a', 'c1')
self.assertEquals(part, 0)
self.assertEquals(nodes, [{'id': 0, 'zone': 0, 'weight': 1.0},
{'id': 2, 'zone': 2, 'weight': 1.0}])
self.assertEquals(nodes, [self.intended_devs[0],
self.intended_devs[3]])
part, nodes = self.ring.get_nodes('a', 'c0')
self.assertEquals(part, 3)
self.assertEquals(nodes, [{'id': 2, 'zone': 2, 'weight': 1.0},
{'id': 0, 'zone': 0, 'weight': 1.0}])
self.assertEquals(nodes, [self.intended_devs[1],
self.intended_devs[4]])
part, nodes = self.ring.get_nodes('a', 'c3')
self.assertEquals(part, 2)
self.assertEquals(nodes, [{'id': 0, 'zone': 0, 'weight': 1.0},
{'id': 2, 'zone': 2, 'weight': 1.0}])
self.assertEquals(nodes, [self.intended_devs[0],
self.intended_devs[3]])
part, nodes = self.ring.get_nodes('a', 'c2')
self.assertEquals(part, 2)
self.assertEquals(nodes, [{'id': 0, 'zone': 0, 'weight': 1.0},
{'id': 2, 'zone': 2, 'weight': 1.0}])
self.assertEquals(nodes, [self.intended_devs[0],
self.intended_devs[3]])
part, nodes = self.ring.get_nodes('a', 'c', 'o1')
self.assertEquals(part, 1)
self.assertEquals(nodes, [{'id': 2, 'zone': 2, 'weight': 1.0},
{'id': 0, 'zone': 0, 'weight': 1.0}])
self.assertEquals(nodes, [self.intended_devs[1],
self.intended_devs[4]])
part, nodes = self.ring.get_nodes('a', 'c', 'o5')
self.assertEquals(part, 0)
self.assertEquals(nodes, [{'id': 0, 'zone': 0, 'weight': 1.0},
{'id': 2, 'zone': 2, 'weight': 1.0}])
self.assertEquals(nodes, [self.intended_devs[0],
self.intended_devs[3]])
part, nodes = self.ring.get_nodes('a', 'c', 'o0')
self.assertEquals(part, 0)
self.assertEquals(nodes, [{'id': 0, 'zone': 0, 'weight': 1.0},
{'id': 2, 'zone': 2, 'weight': 1.0}])
self.assertEquals(nodes, [self.intended_devs[0],
self.intended_devs[3]])
part, nodes = self.ring.get_nodes('a', 'c', 'o2')
self.assertEquals(part, 2)
self.assertEquals(nodes, [{'id': 0, 'zone': 0, 'weight': 1.0},
{'id': 2, 'zone': 2, 'weight': 1.0}])
self.assertEquals(nodes, [self.intended_devs[0],
self.intended_devs[3]])
def add_dev_to_ring(self, new_dev):
self.ring.devs.append(new_dev)
self.ring._rebuild_tier_data()
def test_get_more_nodes(self):
# Yes, these tests are deliberately very fragile. We want to make sure
# that if someone changes the results the ring produces, they know it.
part, nodes = self.ring.get_nodes('a', 'c', 'o2')
self.assertEquals(part, 2)
self.assertEquals(nodes, [{'id': 0, 'zone': 0, 'weight': 1.0},
{'id': 2, 'zone': 2, 'weight': 1.0}])
self.assertEquals(nodes, [self.intended_devs[0],
self.intended_devs[3]])
nodes = list(self.ring.get_more_nodes(part))
self.assertEquals(nodes, [])
self.assertEquals(nodes, [self.intended_devs[4],
self.intended_devs[1]])
self.ring.devs.append({'id': 3, 'zone': 0, 'weight': 1.0})
self.ring.zone2devs[0].append(self.ring.devs[3])
new_dev = {'id': 5, 'zone': 0, 'weight': 1.0,
'ip': '10.1.1.1', 'port': 6000}
self.add_dev_to_ring(new_dev)
part, nodes = self.ring.get_nodes('a', 'c', 'o2')
self.assertEquals(part, 2)
self.assertEquals(nodes, [{'id': 0, 'zone': 0, 'weight': 1.0},
{'id': 2, 'zone': 2, 'weight': 1.0}])
self.assertEquals(nodes, [self.intended_devs[0],
self.intended_devs[3]])
nodes = list(self.ring.get_more_nodes(part))
self.assertEquals(nodes, [])
self.assertEquals(nodes, [self.intended_devs[4],
new_dev,
self.intended_devs[1]])
self.ring.zone2devs[self.ring.devs[3]['zone']].remove(self.ring.devs[3])
self.ring.devs[3]['zone'] = 3
self.ring.zone2devs[3] = [self.ring.devs[3]]
self.ring.devs[5]['zone'] = 3
self.ring._rebuild_tier_data()
part, nodes = self.ring.get_nodes('a', 'c', 'o2')
self.assertEquals(part, 2)
self.assertEquals(nodes, [{'id': 0, 'zone': 0, 'weight': 1.0},
{'id': 2, 'zone': 2, 'weight': 1.0}])
self.assertEquals(nodes, [self.intended_devs[0],
self.intended_devs[3]])
nodes = list(self.ring.get_more_nodes(part))
self.assertEquals(nodes, [{'id': 3, 'zone': 3, 'weight': 1.0}])
self.assertEquals(nodes, [new_dev,
self.intended_devs[4],
self.intended_devs[1]])
self.ring.devs.append(None)
self.ring.devs.append({'id': 5, 'zone': 5, 'weight': 1.0})
self.ring.zone2devs[5] = [self.ring.devs[5]]
new_dev2 = {'id': 6, 'zone': 6, 'weight': 1.0,
'ip': '10.1.6.1', 'port': 6000}
self.add_dev_to_ring(new_dev2)
part, nodes = self.ring.get_nodes('a', 'c', 'o2')
self.assertEquals(part, 2)
self.assertEquals(nodes, [{'id': 0, 'zone': 0, 'weight': 1.0},
{'id': 2, 'zone': 2, 'weight': 1.0}])
self.assertEquals(nodes, [self.intended_devs[0],
self.intended_devs[3]])
nodes = list(self.ring.get_more_nodes(part))
self.assertEquals(nodes, [{'id': 3, 'zone': 3, 'weight': 1.0},
{'id': 5, 'zone': 5, 'weight': 1.0}])
self.assertEquals(nodes, [new_dev,
new_dev2,
self.intended_devs[4],
self.intended_devs[1]])
self.ring.devs.append({'id': 6, 'zone': 5, 'weight': 1.0})
self.ring.zone2devs[5].append(self.ring.devs[6])
new_dev3 = {'id': 7, 'zone': 7, 'weight': 1.0,
'ip': '10.1.7.1', 'port': 6000}
self.add_dev_to_ring(new_dev3)
nodes = list(self.ring.get_more_nodes(part))
self.assertEquals(nodes, [{'id': 3, 'zone': 3, 'weight': 1.0},
{'id': 5, 'zone': 5, 'weight': 1.0}])
self.ring.devs[5]['weight'] = 0
self.assertEquals(nodes, [new_dev, new_dev2, new_dev3,
self.intended_devs[4],
self.intended_devs[1]])
new_dev3['weight'] = 0
nodes = list(self.ring.get_more_nodes(part))
self.assertEquals(nodes, [{'id': 3, 'zone': 3, 'weight': 1.0},
{'id': 6, 'zone': 5, 'weight': 1.0}])
self.ring.devs[3]['weight'] = 0
self.ring.devs.append({'id': 7, 'zone': 6, 'weight': 0.0})
self.ring.zone2devs[6] = [self.ring.devs[7]]
self.assertEquals(nodes, [new_dev, new_dev2,
self.intended_devs[4],
self.intended_devs[1]])
self.ring.devs[7]['weight'] = 1.0
new_dev4 = {'id': 8, 'zone': 8, 'weight': 0.0,
'ip': '10.1.8.1', 'port': 6000}
self.add_dev_to_ring(new_dev4)
nodes = list(self.ring.get_more_nodes(part))
self.assertEquals(nodes, [{'id': 6, 'zone': 5, 'weight': 1.0}])
self.assertEquals(nodes, [new_dev, new_dev2,
self.intended_devs[4],
self.intended_devs[1]])
if __name__ == '__main__':

View File

@ -26,7 +26,6 @@ utils.HASH_PATH_SUFFIX = 'endcap'
class FakeRing(object):
def __init__(self):
self.replica_count = 3
self.devs = [{'ip': '10.0.0.%s' % x, 'port': 1000 + x, 'device': 'sda'}
for x in xrange(3)]

View File

@ -433,7 +433,7 @@ class TestObjectReplicator(unittest.TestCase):
replicator.logger.exception = \
lambda *args, **kwargs: fake_exc(self, *args, **kwargs)
# Write some files into '1' and run replicate- they should be moved
# to the other partitoins and then node should get deleted.
# to the other partitions and then node should get deleted.
cur_part = '1'
df = DiskFile(self.devices, 'sda', cur_part, 'a', 'c', 'o',
FakeLogger())

View File

@ -252,7 +252,6 @@ class FakeRing(object):
# this is set higher.
self.max_more_nodes = 0
self.devs = {}
self.replica_count = 3
def get_nodes(self, account, container=None, obj=None):
devs = []
@ -852,7 +851,6 @@ class TestObjectController(unittest.TestCase):
test_status_map((200, 200, 204, 500, 404), 503)
def test_PUT_connect_exceptions(self):
with save_globals():
controller = proxy_server.ObjectController(self.app, 'account',
'container', 'object')
@ -873,7 +871,6 @@ class TestObjectController(unittest.TestCase):
test_status_map((200, 200, 503, 503, -1), 503)
def test_PUT_send_exceptions(self):
with save_globals():
controller = proxy_server.ObjectController(self.app, 'account',
'container', 'object')