Make rings' replica counts adjustable.
Example: $ swift-ring-builder account.builder set_replicas 4 $ swift-ring-builder rebalance This is a prerequisite for supporting globally-distributed clusters, as operators of such clusters will probably want at least as many replicas as they have regions. Therefore, adding a region requires adding a replica. Similarly, removing a region lets an operator remove a replica and save some money on disks. In order to not hose clusters with lots of data, swift-ring-builder now allows for setting of fractional replicas. Thus, one can gradually increase the replica count at a rate that does not adversely affect cluster performance. Example: $ swift-ring-builder object.builder set_replicas 3.01 $ swift-ring-builder object.builder rebalance <distribute rings and wait> $ swift-ring-builder object.builder set_replicas 3.02 $ swift-ring-builder object.builder rebalance <distribute rings and wait>... Obviously, fractional replicas are nonsensical for a single partition. A fractional replica count is for the whole ring, not for any individual partition, and indicates the average number of replicas of each partition. For example, a replica count of 3.2 means that 20% of partitions have 4 replicas and 80% have 3 replicas. Changes do not take effect until after the ring is rebalanced. Thus, if you mean to go from 3 replicas to 3.01 but you accidentally type 2.01, no data is lost. Additionally, 'swift-ring-builder X.builder create' can now take a decimal argument for the number of replicas. DocImpact Change-Id: I12b34dacf60350a297a46be493d5d171580243ff
This commit is contained in:
parent
d14c0c062e
commit
7548cb9c47
@ -80,6 +80,7 @@ def container_dispersion_report(coropool, connpool, account, container_ring,
|
|||||||
retries_done = [0]
|
retries_done = [0]
|
||||||
containers_queried = [0]
|
containers_queried = [0]
|
||||||
container_copies_found = [0] * (container_ring.replica_count + 1)
|
container_copies_found = [0] * (container_ring.replica_count + 1)
|
||||||
|
container_copies_expected = [0] * (container_ring.replica_count + 1)
|
||||||
begun = time()
|
begun = time()
|
||||||
next_report = [time() + 2]
|
next_report = [time() + 2]
|
||||||
|
|
||||||
@ -101,8 +102,8 @@ def container_dispersion_report(coropool, connpool, account, container_ring,
|
|||||||
error_log('Giving up on /%s/%s/%s: %s' % (part, account,
|
error_log('Giving up on /%s/%s/%s: %s' % (part, account,
|
||||||
container, err))
|
container, err))
|
||||||
if output_missing_partitions and \
|
if output_missing_partitions and \
|
||||||
found_count < container_ring.replica_count:
|
found_count < len(nodes):
|
||||||
missing = container_ring.replica_count - found_count
|
missing = len(nodes) - found_count
|
||||||
print '\r\x1B[K',
|
print '\r\x1B[K',
|
||||||
stdout.flush()
|
stdout.flush()
|
||||||
print >>stderr, '# Container partition %s missing %s cop%s' % (
|
print >>stderr, '# Container partition %s missing %s cop%s' % (
|
||||||
@ -121,13 +122,15 @@ def container_dispersion_report(coropool, connpool, account, container_ring,
|
|||||||
container_parts = {}
|
container_parts = {}
|
||||||
for container in containers:
|
for container in containers:
|
||||||
part, nodes = container_ring.get_nodes(account, container)
|
part, nodes = container_ring.get_nodes(account, container)
|
||||||
|
container_copies_expected[len(nodes)] += 1
|
||||||
if part not in container_parts:
|
if part not in container_parts:
|
||||||
container_parts[part] = part
|
container_parts[part] = part
|
||||||
coropool.spawn(direct, container, part, nodes)
|
coropool.spawn(direct, container, part, nodes)
|
||||||
coropool.waitall()
|
coropool.waitall()
|
||||||
distinct_partitions = len(container_parts)
|
distinct_partitions = len(container_parts)
|
||||||
copies_expected = distinct_partitions * container_ring.replica_count
|
|
||||||
copies_found = sum(a * b for a, b in enumerate(container_copies_found))
|
copies_found = sum(a * b for a, b in enumerate(container_copies_found))
|
||||||
|
copies_expected = sum(a * b for a, b
|
||||||
|
in enumerate(container_copies_expected))
|
||||||
value = 100.0 * copies_found / copies_expected
|
value = 100.0 * copies_found / copies_expected
|
||||||
elapsed, elapsed_unit = get_time_units(time() - begun)
|
elapsed, elapsed_unit = get_time_units(time() - begun)
|
||||||
if not json_output:
|
if not json_output:
|
||||||
@ -138,11 +141,12 @@ def container_dispersion_report(coropool, connpool, account, container_ring,
|
|||||||
print 'There were %d overlapping partitions' % (
|
print 'There were %d overlapping partitions' % (
|
||||||
containers_listed - distinct_partitions)
|
containers_listed - distinct_partitions)
|
||||||
for copies in xrange(container_ring.replica_count - 1, -1, -1):
|
for copies in xrange(container_ring.replica_count - 1, -1, -1):
|
||||||
missing_copies = container_ring.replica_count - copies
|
missing_copies = (container_copies_expected[copies] -
|
||||||
|
container_copies_found[copies])
|
||||||
if container_copies_found[copies]:
|
if container_copies_found[copies]:
|
||||||
print missing_string(container_copies_found[copies],
|
print missing_string(container_copies_found[copies],
|
||||||
missing_copies,
|
missing_copies,
|
||||||
container_ring.replica_count)
|
container_copies_expected[copies])
|
||||||
print '%.02f%% of container copies found (%d of %d)' % (
|
print '%.02f%% of container copies found (%d of %d)' % (
|
||||||
value, copies_found, copies_expected)
|
value, copies_found, copies_expected)
|
||||||
print 'Sample represents %.02f%% of the container partition space' % (
|
print 'Sample represents %.02f%% of the container partition space' % (
|
||||||
@ -156,7 +160,8 @@ def container_dispersion_report(coropool, connpool, account, container_ring,
|
|||||||
'copies_found': copies_found,
|
'copies_found': copies_found,
|
||||||
'copies_expected': copies_expected}
|
'copies_expected': copies_expected}
|
||||||
for copies in xrange(container_ring.replica_count):
|
for copies in xrange(container_ring.replica_count):
|
||||||
missing_copies = container_ring.replica_count - copies
|
missing_copies = (container_copies_expected[copies] -
|
||||||
|
container_copies_found[copies])
|
||||||
results['missing_%d' % (missing_copies)] = \
|
results['missing_%d' % (missing_copies)] = \
|
||||||
container_copies_found[copies]
|
container_copies_found[copies]
|
||||||
return results
|
return results
|
||||||
@ -185,6 +190,7 @@ def object_dispersion_report(coropool, connpool, account, object_ring,
|
|||||||
retries_done = [0]
|
retries_done = [0]
|
||||||
objects_queried = [0]
|
objects_queried = [0]
|
||||||
object_copies_found = [0] * (object_ring.replica_count + 1)
|
object_copies_found = [0] * (object_ring.replica_count + 1)
|
||||||
|
object_copies_expected = [0] * (object_ring.replica_count + 1)
|
||||||
begun = time()
|
begun = time()
|
||||||
next_report = [time() + 2]
|
next_report = [time() + 2]
|
||||||
|
|
||||||
@ -226,6 +232,7 @@ def object_dispersion_report(coropool, connpool, account, object_ring,
|
|||||||
object_parts = {}
|
object_parts = {}
|
||||||
for obj in objects:
|
for obj in objects:
|
||||||
part, nodes = object_ring.get_nodes(account, container, obj)
|
part, nodes = object_ring.get_nodes(account, container, obj)
|
||||||
|
object_copies_expected[len(nodes)] += 1
|
||||||
if part not in object_parts:
|
if part not in object_parts:
|
||||||
object_parts[part] = part
|
object_parts[part] = part
|
||||||
coropool.spawn(direct, obj, part, nodes)
|
coropool.spawn(direct, obj, part, nodes)
|
||||||
@ -233,6 +240,8 @@ def object_dispersion_report(coropool, connpool, account, object_ring,
|
|||||||
distinct_partitions = len(object_parts)
|
distinct_partitions = len(object_parts)
|
||||||
copies_expected = distinct_partitions * object_ring.replica_count
|
copies_expected = distinct_partitions * object_ring.replica_count
|
||||||
copies_found = sum(a * b for a, b in enumerate(object_copies_found))
|
copies_found = sum(a * b for a, b in enumerate(object_copies_found))
|
||||||
|
copies_expected = sum(a * b for a, b
|
||||||
|
in enumerate(object_copies_expected))
|
||||||
value = 100.0 * copies_found / copies_expected
|
value = 100.0 * copies_found / copies_expected
|
||||||
elapsed, elapsed_unit = get_time_units(time() - begun)
|
elapsed, elapsed_unit = get_time_units(time() - begun)
|
||||||
if not json_output:
|
if not json_output:
|
||||||
@ -243,7 +252,8 @@ def object_dispersion_report(coropool, connpool, account, object_ring,
|
|||||||
print 'There were %d overlapping partitions' % (
|
print 'There were %d overlapping partitions' % (
|
||||||
objects_listed - distinct_partitions)
|
objects_listed - distinct_partitions)
|
||||||
for copies in xrange(object_ring.replica_count - 1, -1, -1):
|
for copies in xrange(object_ring.replica_count - 1, -1, -1):
|
||||||
missing_copies = object_ring.replica_count - copies
|
missing_copies = (object_copies_expected[copies] -
|
||||||
|
object_copies_found[copies])
|
||||||
if object_copies_found[copies]:
|
if object_copies_found[copies]:
|
||||||
print missing_string(object_copies_found[copies],
|
print missing_string(object_copies_found[copies],
|
||||||
missing_copies, object_ring.replica_count)
|
missing_copies, object_ring.replica_count)
|
||||||
@ -260,7 +270,8 @@ def object_dispersion_report(coropool, connpool, account, object_ring,
|
|||||||
'copies_found': copies_found,
|
'copies_found': copies_found,
|
||||||
'copies_expected': copies_expected}
|
'copies_expected': copies_expected}
|
||||||
for copies in xrange(object_ring.replica_count):
|
for copies in xrange(object_ring.replica_count):
|
||||||
missing_copies = object_ring.replica_count - copies
|
missing_copies = (object_copies_expected[copies] -
|
||||||
|
object_copies_found[copies])
|
||||||
results['missing_%d' % (missing_copies)] = \
|
results['missing_%d' % (missing_copies)] = \
|
||||||
object_copies_found[copies]
|
object_copies_found[copies]
|
||||||
return results
|
return results
|
||||||
|
@ -98,7 +98,7 @@ elif len(args) == 1:
|
|||||||
more_nodes = []
|
more_nodes = []
|
||||||
for more_node in ring.get_more_nodes(part):
|
for more_node in ring.get_more_nodes(part):
|
||||||
more_nodes.append(more_node)
|
more_nodes.append(more_node)
|
||||||
if not options.all and len(more_nodes) >= ring.replica_count:
|
if not options.all and len(more_nodes) >= len(nodes):
|
||||||
break
|
break
|
||||||
|
|
||||||
print '\nAccount \t%s' % account
|
print '\nAccount \t%s' % account
|
||||||
|
@ -62,7 +62,7 @@ swift-ring-builder <builder_file> create <part_power> <replicas>
|
|||||||
if len(argv) < 6:
|
if len(argv) < 6:
|
||||||
print Commands.create.__doc__.strip()
|
print Commands.create.__doc__.strip()
|
||||||
exit(EXIT_ERROR)
|
exit(EXIT_ERROR)
|
||||||
builder = RingBuilder(int(argv[3]), int(argv[4]), int(argv[5]))
|
builder = RingBuilder(int(argv[3]), float(argv[4]), int(argv[5]))
|
||||||
backup_dir = pathjoin(dirname(argv[1]), 'backups')
|
backup_dir = pathjoin(dirname(argv[1]), 'backups')
|
||||||
try:
|
try:
|
||||||
mkdir(backup_dir)
|
mkdir(backup_dir)
|
||||||
@ -85,7 +85,7 @@ swift-ring-builder <builder_file>
|
|||||||
if builder.devs:
|
if builder.devs:
|
||||||
zones = len(set(d['zone'] for d in builder.devs if d is not None))
|
zones = len(set(d['zone'] for d in builder.devs if d is not None))
|
||||||
balance = builder.get_balance()
|
balance = builder.get_balance()
|
||||||
print '%d partitions, %d replicas, %d zones, %d devices, %.02f ' \
|
print '%d partitions, %.6f replicas, %d zones, %d devices, %.02f ' \
|
||||||
'balance' % (builder.parts, builder.replicas, zones,
|
'balance' % (builder.parts, builder.replicas, zones,
|
||||||
len([d for d in builder.devs if d]), balance)
|
len([d for d in builder.devs if d]), balance)
|
||||||
print 'The minimum number of hours before a partition can be ' \
|
print 'The minimum number of hours before a partition can be ' \
|
||||||
@ -586,6 +586,37 @@ swift-ring-builder <builder_file> set_min_part_hours <hours>
|
|||||||
pickle.dump(builder.to_dict(), open(argv[1], 'wb'), protocol=2)
|
pickle.dump(builder.to_dict(), open(argv[1], 'wb'), protocol=2)
|
||||||
exit(EXIT_SUCCESS)
|
exit(EXIT_SUCCESS)
|
||||||
|
|
||||||
|
def set_replicas():
|
||||||
|
"""
|
||||||
|
swift-ring-builder <builder_file> set_replicas <replicas>
|
||||||
|
Changes the replica count to the given <replicas>. <replicas> may
|
||||||
|
be a floating-point value, in which case some partitions will have
|
||||||
|
floor(<replicas>) replicas and some will have ceiling(<replicas>)
|
||||||
|
in the correct proportions.
|
||||||
|
|
||||||
|
A rebalance is needed to make the change take effect.
|
||||||
|
"""
|
||||||
|
if len(argv) < 4:
|
||||||
|
print Commands.set_replicas.__doc__.strip()
|
||||||
|
exit(EXIT_ERROR)
|
||||||
|
|
||||||
|
new_replicas = argv[3]
|
||||||
|
try:
|
||||||
|
new_replicas = float(new_replicas)
|
||||||
|
except ValueError:
|
||||||
|
print Commands.set_replicas.__doc__.strip()
|
||||||
|
print "\"%s\" is not a valid number." % new_replicas
|
||||||
|
exit(EXIT_ERROR)
|
||||||
|
|
||||||
|
if new_replicas < 1:
|
||||||
|
print "Replica count must be at least 1."
|
||||||
|
exit(EXIT_ERROR)
|
||||||
|
|
||||||
|
builder.set_replicas(new_replicas)
|
||||||
|
print 'The replica count is now %.6f.' % builder.replicas
|
||||||
|
print 'The change will take effect after the next rebalance.'
|
||||||
|
pickle.dump(builder.to_dict(), open(argv[1], 'wb'), protocol=2)
|
||||||
|
exit(EXIT_SUCCESS)
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
if len(argv) < 2:
|
if len(argv) < 2:
|
||||||
|
@ -166,6 +166,23 @@ class RingBuilder(object):
|
|||||||
"""
|
"""
|
||||||
self.min_part_hours = min_part_hours
|
self.min_part_hours = min_part_hours
|
||||||
|
|
||||||
|
def set_replicas(self, new_replica_count):
|
||||||
|
"""
|
||||||
|
Changes the number of replicas in this ring.
|
||||||
|
|
||||||
|
If the new replica count is sufficiently different that
|
||||||
|
self._replica2part2dev will change size, sets
|
||||||
|
self.devs_changed. This is so tools like
|
||||||
|
bin/swift-ring-builder can know to write out the new ring
|
||||||
|
rather than bailing out due to lack of balance change.
|
||||||
|
"""
|
||||||
|
old_slots_used = int(self.parts * self.replicas)
|
||||||
|
new_slots_used = int(self.parts * new_replica_count)
|
||||||
|
if old_slots_used != new_slots_used:
|
||||||
|
self.devs_changed = True
|
||||||
|
|
||||||
|
self.replicas = new_replica_count
|
||||||
|
|
||||||
def get_ring(self):
|
def get_ring(self):
|
||||||
"""
|
"""
|
||||||
Get the ring, or more specifically, the swift.common.ring.RingData.
|
Get the ring, or more specifically, the swift.common.ring.RingData.
|
||||||
@ -305,6 +322,10 @@ class RingBuilder(object):
|
|||||||
retval = 0
|
retval = 0
|
||||||
self._update_last_part_moves()
|
self._update_last_part_moves()
|
||||||
last_balance = 0
|
last_balance = 0
|
||||||
|
new_parts, removed_part_count = self._adjust_replica2part2dev_size()
|
||||||
|
retval += removed_part_count
|
||||||
|
self._reassign_parts(new_parts)
|
||||||
|
retval += len(new_parts)
|
||||||
while True:
|
while True:
|
||||||
reassign_parts = self._gather_reassign_parts()
|
reassign_parts = self._gather_reassign_parts()
|
||||||
self._reassign_parts(reassign_parts)
|
self._reassign_parts(reassign_parts)
|
||||||
@ -340,12 +361,13 @@ class RingBuilder(object):
|
|||||||
|
|
||||||
# "len" showed up in profiling, so it's just computed once.
|
# "len" showed up in profiling, so it's just computed once.
|
||||||
dev_len = len(self.devs)
|
dev_len = len(self.devs)
|
||||||
if sum(d['parts'] for d in self._iter_devs()) != \
|
|
||||||
self.parts * self.replicas:
|
parts_on_devs = sum(d['parts'] for d in self._iter_devs())
|
||||||
|
parts_in_map = sum(len(p2d) for p2d in self._replica2part2dev)
|
||||||
|
if parts_on_devs != parts_in_map:
|
||||||
raise exceptions.RingValidationError(
|
raise exceptions.RingValidationError(
|
||||||
'All partitions are not double accounted for: %d != %d' %
|
'All partitions are not double accounted for: %d != %d' %
|
||||||
(sum(d['parts'] for d in self._iter_devs()),
|
(parts_on_devs, parts_in_map))
|
||||||
self.parts * self.replicas))
|
|
||||||
if stats:
|
if stats:
|
||||||
# dev_usage[dev_id] will equal the number of partitions assigned to
|
# dev_usage[dev_id] will equal the number of partitions assigned to
|
||||||
# that device.
|
# that device.
|
||||||
@ -354,14 +376,13 @@ class RingBuilder(object):
|
|||||||
for dev_id in part2dev:
|
for dev_id in part2dev:
|
||||||
dev_usage[dev_id] += 1
|
dev_usage[dev_id] += 1
|
||||||
|
|
||||||
for part in xrange(self.parts):
|
for part, replica in self._each_part_replica():
|
||||||
for replica in xrange(self.replicas):
|
dev_id = self._replica2part2dev[replica][part]
|
||||||
dev_id = self._replica2part2dev[replica][part]
|
if dev_id >= dev_len or not self.devs[dev_id]:
|
||||||
if dev_id >= dev_len or not self.devs[dev_id]:
|
raise exceptions.RingValidationError(
|
||||||
raise exceptions.RingValidationError(
|
"Partition %d, replica %d was not allocated "
|
||||||
"Partition %d, replica %d was not allocated "
|
"to a device." %
|
||||||
"to a device." %
|
(part, replica))
|
||||||
(part, replica))
|
|
||||||
|
|
||||||
for dev in self._iter_devs():
|
for dev in self._iter_devs():
|
||||||
if not isinstance(dev['port'], int):
|
if not isinstance(dev['port'], int):
|
||||||
@ -428,12 +449,17 @@ class RingBuilder(object):
|
|||||||
|
|
||||||
def get_part_devices(self, part):
|
def get_part_devices(self, part):
|
||||||
"""
|
"""
|
||||||
Get the devices that are responsible for the partition.
|
Get the devices that are responsible for the partition,
|
||||||
|
filtering out duplicates.
|
||||||
|
|
||||||
:param part: partition to get devices for
|
:param part: partition to get devices for
|
||||||
:returns: list of device dicts
|
:returns: list of device dicts
|
||||||
"""
|
"""
|
||||||
return [self.devs[r[part]] for r in self._replica2part2dev]
|
devices = []
|
||||||
|
for dev in self._devs_for_part(part):
|
||||||
|
if dev not in devices:
|
||||||
|
devices.append(dev)
|
||||||
|
return devices
|
||||||
|
|
||||||
def _iter_devs(self):
|
def _iter_devs(self):
|
||||||
"""
|
"""
|
||||||
@ -466,19 +492,83 @@ class RingBuilder(object):
|
|||||||
dev['parts_wanted'] = \
|
dev['parts_wanted'] = \
|
||||||
int(weight_of_one_part * dev['weight']) - dev['parts']
|
int(weight_of_one_part * dev['weight']) - dev['parts']
|
||||||
|
|
||||||
|
def _adjust_replica2part2dev_size(self):
|
||||||
|
"""
|
||||||
|
Make sure that the lengths of the arrays in _replica2part2dev
|
||||||
|
are correct for the current value of self.replicas.
|
||||||
|
|
||||||
|
Example:
|
||||||
|
self.part_power = 8
|
||||||
|
self.replicas = 2.25
|
||||||
|
|
||||||
|
self._replica2part2dev will contain 3 arrays: the first 2 of
|
||||||
|
length 256 (2**8), and the last of length 64 (0.25 * 2**8).
|
||||||
|
|
||||||
|
Returns a 2-tuple: the first element is a list of (partition,
|
||||||
|
replicas) tuples indicating which replicas need to be
|
||||||
|
(re)assigned to devices, and the second element is a count of
|
||||||
|
how many replicas were removed.
|
||||||
|
"""
|
||||||
|
removed_replicas = 0
|
||||||
|
|
||||||
|
fractional_replicas, whole_replicas = math.modf(self.replicas)
|
||||||
|
whole_replicas = int(whole_replicas)
|
||||||
|
|
||||||
|
desired_lengths = [self.parts] * whole_replicas
|
||||||
|
if fractional_replicas:
|
||||||
|
desired_lengths.append(int(self.parts * fractional_replicas))
|
||||||
|
|
||||||
|
to_assign = defaultdict(list)
|
||||||
|
|
||||||
|
if self._replica2part2dev is not None:
|
||||||
|
# If we crossed an integer threshold (say, 4.1 --> 4),
|
||||||
|
# we'll have a partial extra replica clinging on here. Clean
|
||||||
|
# up any such extra stuff.
|
||||||
|
for part2dev in self._replica2part2dev[len(desired_lengths):]:
|
||||||
|
for dev_id in part2dev:
|
||||||
|
dev_losing_part = self.devs[dev_id]
|
||||||
|
dev_losing_part['parts'] -= 1
|
||||||
|
removed_replicas += 1
|
||||||
|
self._replica2part2dev = \
|
||||||
|
self._replica2part2dev[:len(desired_lengths)]
|
||||||
|
else:
|
||||||
|
self._replica2part2dev = []
|
||||||
|
|
||||||
|
for replica, desired_length in enumerate(desired_lengths):
|
||||||
|
if replica < len(self._replica2part2dev):
|
||||||
|
part2dev = self._replica2part2dev[replica]
|
||||||
|
if len(part2dev) < desired_length:
|
||||||
|
# Not long enough: needs to be extended and the
|
||||||
|
# newly-added pieces assigned to devices.
|
||||||
|
for part in xrange(len(part2dev), desired_length):
|
||||||
|
to_assign[part].append(replica)
|
||||||
|
part2dev.append(0)
|
||||||
|
elif len(part2dev) > desired_length:
|
||||||
|
# Too long: truncate this mapping.
|
||||||
|
for part in xrange(desired_length, len(part2dev)):
|
||||||
|
dev_losing_part = self.devs[part2dev[part]]
|
||||||
|
dev_losing_part['parts'] -= 1
|
||||||
|
removed_replicas += 1
|
||||||
|
self._replica2part2dev[replica] = part2dev[:desired_length]
|
||||||
|
else:
|
||||||
|
# Mapping not present at all: make one up and assign
|
||||||
|
# all of it.
|
||||||
|
for part in xrange(desired_length):
|
||||||
|
to_assign[part].append(replica)
|
||||||
|
self._replica2part2dev.append(
|
||||||
|
array('H', (0 for _junk in xrange(desired_length))))
|
||||||
|
|
||||||
|
return (list(to_assign.iteritems()), removed_replicas)
|
||||||
|
|
||||||
def _initial_balance(self):
|
def _initial_balance(self):
|
||||||
"""
|
"""
|
||||||
Initial partition assignment is the same as rebalancing an
|
Initial partition assignment is the same as rebalancing an
|
||||||
existing ring, but with some initial setup beforehand.
|
existing ring, but with some initial setup beforehand.
|
||||||
"""
|
"""
|
||||||
self._replica2part2dev = \
|
|
||||||
[array('H', (0 for _junk in xrange(self.parts)))
|
|
||||||
for _junk in xrange(self.replicas)]
|
|
||||||
|
|
||||||
replicas = range(self.replicas)
|
|
||||||
self._last_part_moves = array('B', (0 for _junk in xrange(self.parts)))
|
self._last_part_moves = array('B', (0 for _junk in xrange(self.parts)))
|
||||||
self._last_part_moves_epoch = int(time())
|
self._last_part_moves_epoch = int(time())
|
||||||
self._reassign_parts((p, replicas) for p in xrange(self.parts))
|
|
||||||
|
self._reassign_parts(self._adjust_replica2part2dev_size()[0])
|
||||||
|
|
||||||
def _update_last_part_moves(self):
|
def _update_last_part_moves(self):
|
||||||
"""
|
"""
|
||||||
@ -515,12 +605,11 @@ class RingBuilder(object):
|
|||||||
if self._remove_devs:
|
if self._remove_devs:
|
||||||
dev_ids = [d['id'] for d in self._remove_devs if d['parts']]
|
dev_ids = [d['id'] for d in self._remove_devs if d['parts']]
|
||||||
if dev_ids:
|
if dev_ids:
|
||||||
for replica in xrange(self.replicas):
|
for part, replica in self._each_part_replica():
|
||||||
part2dev = self._replica2part2dev[replica]
|
dev_id = self._replica2part2dev[replica][part]
|
||||||
for part in xrange(self.parts):
|
if dev_id in dev_ids:
|
||||||
if part2dev[part] in dev_ids:
|
self._last_part_moves[part] = 0
|
||||||
self._last_part_moves[part] = 0
|
removed_dev_parts[part].append(replica)
|
||||||
removed_dev_parts[part].append(replica)
|
|
||||||
|
|
||||||
# Now we gather partitions that are "at risk" because they aren't
|
# Now we gather partitions that are "at risk" because they aren't
|
||||||
# currently sufficient spread out across the cluster.
|
# currently sufficient spread out across the cluster.
|
||||||
@ -536,8 +625,7 @@ class RingBuilder(object):
|
|||||||
# replicas_at_tier was a "lambda: 0" defaultdict, but profiling
|
# replicas_at_tier was a "lambda: 0" defaultdict, but profiling
|
||||||
# revealed the lambda invocation as a significant cost.
|
# revealed the lambda invocation as a significant cost.
|
||||||
replicas_at_tier = {}
|
replicas_at_tier = {}
|
||||||
for replica in xrange(self.replicas):
|
for dev in self._devs_for_part(part):
|
||||||
dev = self.devs[self._replica2part2dev[replica][part]]
|
|
||||||
if dev['id'] not in tfd:
|
if dev['id'] not in tfd:
|
||||||
tfd[dev['id']] = tiers_for_dev(dev)
|
tfd[dev['id']] = tiers_for_dev(dev)
|
||||||
for tier in tfd[dev['id']]:
|
for tier in tfd[dev['id']]:
|
||||||
@ -548,7 +636,7 @@ class RingBuilder(object):
|
|||||||
|
|
||||||
# Now, look for partitions not yet spread out enough and not
|
# Now, look for partitions not yet spread out enough and not
|
||||||
# recently moved.
|
# recently moved.
|
||||||
for replica in xrange(self.replicas):
|
for replica in self._replicas_for_part(part):
|
||||||
dev = self.devs[self._replica2part2dev[replica][part]]
|
dev = self.devs[self._replica2part2dev[replica][part]]
|
||||||
removed_replica = False
|
removed_replica = False
|
||||||
if dev['id'] not in tfd:
|
if dev['id'] not in tfd:
|
||||||
@ -584,10 +672,14 @@ class RingBuilder(object):
|
|||||||
start += random.randint(0, self.parts / 2) # GRAH PEP8!!!
|
start += random.randint(0, self.parts / 2) # GRAH PEP8!!!
|
||||||
|
|
||||||
self._last_part_gather_start = start
|
self._last_part_gather_start = start
|
||||||
for replica in xrange(self.replicas):
|
for replica, part2dev in enumerate(self._replica2part2dev):
|
||||||
part2dev = self._replica2part2dev[replica]
|
# If we've got a partial replica, start may be out of
|
||||||
for part in itertools.chain(xrange(start, self.parts),
|
# range. Scale it down so that we get a similar movement
|
||||||
xrange(0, start)):
|
# pattern (but scaled down) on sequential runs.
|
||||||
|
this_start = int(float(start) * len(part2dev) / self.parts)
|
||||||
|
|
||||||
|
for part in itertools.chain(xrange(this_start, len(part2dev)),
|
||||||
|
xrange(0, this_start)):
|
||||||
if self._last_part_moves[part] < self.min_part_hours:
|
if self._last_part_moves[part] < self.min_part_hours:
|
||||||
continue
|
continue
|
||||||
if part in removed_dev_parts or part in spread_out_parts:
|
if part in removed_dev_parts or part in spread_out_parts:
|
||||||
@ -673,7 +765,7 @@ class RingBuilder(object):
|
|||||||
# replicas not-to-be-moved are in for this part.
|
# replicas not-to-be-moved are in for this part.
|
||||||
other_replicas = defaultdict(int)
|
other_replicas = defaultdict(int)
|
||||||
unique_tiers_by_tier_len = defaultdict(set)
|
unique_tiers_by_tier_len = defaultdict(set)
|
||||||
for replica in xrange(self.replicas):
|
for replica in self._replicas_for_part(part):
|
||||||
if replica not in replace_replicas:
|
if replica not in replace_replicas:
|
||||||
dev = self.devs[self._replica2part2dev[replica][part]]
|
dev = self.devs[self._replica2part2dev[replica][part]]
|
||||||
for tier in tiers_for_dev(dev):
|
for tier in tiers_for_dev(dev):
|
||||||
@ -833,6 +925,35 @@ class RingBuilder(object):
|
|||||||
return mr
|
return mr
|
||||||
return walk_tree((), self.replicas)
|
return walk_tree((), self.replicas)
|
||||||
|
|
||||||
|
def _devs_for_part(self, part):
|
||||||
|
"""
|
||||||
|
Returns a list of devices for a specified partition.
|
||||||
|
|
||||||
|
Deliberately includes duplicates.
|
||||||
|
"""
|
||||||
|
return [self.devs[part2dev[part]]
|
||||||
|
for part2dev in self._replica2part2dev
|
||||||
|
if part < len(part2dev)]
|
||||||
|
|
||||||
|
def _replicas_for_part(self, part):
|
||||||
|
"""
|
||||||
|
Returns a list of replicas for a specified partition.
|
||||||
|
|
||||||
|
These can be used as indices into self._replica2part2dev
|
||||||
|
without worrying about IndexErrors.
|
||||||
|
"""
|
||||||
|
return [replica for replica, part2dev
|
||||||
|
in enumerate(self._replica2part2dev)
|
||||||
|
if part < len(part2dev)]
|
||||||
|
|
||||||
|
def _each_part_replica(self):
|
||||||
|
"""
|
||||||
|
Generator yielding every (partition, replica) pair in the ring.
|
||||||
|
"""
|
||||||
|
for replica, part2dev in enumerate(self._replica2part2dev):
|
||||||
|
for part in xrange(len(part2dev)):
|
||||||
|
yield (part, replica)
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def load(cls, builder_file, open=open):
|
def load(cls, builder_file, open=open):
|
||||||
"""
|
"""
|
||||||
|
@ -163,7 +163,7 @@ class Ring(object):
|
|||||||
|
|
||||||
@property
|
@property
|
||||||
def replica_count(self):
|
def replica_count(self):
|
||||||
"""Number of replicas used in the ring."""
|
"""Number of replicas (full or partial) used in the ring."""
|
||||||
return len(self._replica2part2dev_id)
|
return len(self._replica2part2dev_id)
|
||||||
|
|
||||||
@property
|
@property
|
||||||
@ -189,7 +189,8 @@ class Ring(object):
|
|||||||
|
|
||||||
def _get_part_nodes(self, part):
|
def _get_part_nodes(self, part):
|
||||||
seen_ids = set()
|
seen_ids = set()
|
||||||
return [self._devs[r[part]] for r in self._replica2part2dev_id
|
return [self._devs[r[part]] for r in
|
||||||
|
(rpd for rpd in self._replica2part2dev_id if len(rpd) > part)
|
||||||
if not (r[part] in seen_ids or seen_ids.add(r[part]))]
|
if not (r[part] in seen_ids or seen_ids.add(r[part]))]
|
||||||
|
|
||||||
def get_part_nodes(self, part):
|
def get_part_nodes(self, part):
|
||||||
@ -255,8 +256,9 @@ class Ring(object):
|
|||||||
self._reload()
|
self._reload()
|
||||||
used_tiers = set()
|
used_tiers = set()
|
||||||
for part2dev_id in self._replica2part2dev_id:
|
for part2dev_id in self._replica2part2dev_id:
|
||||||
for tier in tiers_for_dev(self._devs[part2dev_id[part]]):
|
if len(part2dev_id) > part:
|
||||||
used_tiers.add(tier)
|
for tier in tiers_for_dev(self._devs[part2dev_id[part]]):
|
||||||
|
used_tiers.add(tier)
|
||||||
|
|
||||||
for level in self.tiers_by_length:
|
for level in self.tiers_by_length:
|
||||||
tiers = list(level)
|
tiers = list(level)
|
||||||
|
@ -13,6 +13,7 @@
|
|||||||
# See the License for the specific language governing permissions and
|
# See the License for the specific language governing permissions and
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
|
||||||
|
import operator
|
||||||
import os
|
import os
|
||||||
import unittest
|
import unittest
|
||||||
import cPickle as pickle
|
import cPickle as pickle
|
||||||
@ -97,6 +98,16 @@ class TestRingBuilder(unittest.TestCase):
|
|||||||
self.assertNotEquals(r0.to_dict(), r1.to_dict())
|
self.assertNotEquals(r0.to_dict(), r1.to_dict())
|
||||||
self.assertEquals(r1.to_dict(), r2.to_dict())
|
self.assertEquals(r1.to_dict(), r2.to_dict())
|
||||||
|
|
||||||
|
def test_set_replicas(self):
|
||||||
|
rb = ring.RingBuilder(8, 3.2, 1)
|
||||||
|
rb.devs_changed = False
|
||||||
|
rb.set_replicas(3.25)
|
||||||
|
self.assertTrue(rb.devs_changed)
|
||||||
|
|
||||||
|
rb.devs_changed = False
|
||||||
|
rb.set_replicas(3.2500001)
|
||||||
|
self.assertFalse(rb.devs_changed)
|
||||||
|
|
||||||
def test_add_dev(self):
|
def test_add_dev(self):
|
||||||
rb = ring.RingBuilder(8, 3, 1)
|
rb = ring.RingBuilder(8, 3, 1)
|
||||||
dev = \
|
dev = \
|
||||||
@ -532,6 +543,65 @@ class TestRingBuilder(unittest.TestCase):
|
|||||||
|
|
||||||
rb.rebalance()
|
rb.rebalance()
|
||||||
|
|
||||||
|
def test_set_replicas_increase(self):
|
||||||
|
rb = ring.RingBuilder(8, 2, 0)
|
||||||
|
rb.add_dev({'id': 0, 'zone': 0, 'weight': 1, 'ip': '127.0.0.1',
|
||||||
|
'port': 10000, 'device': 'sda1'})
|
||||||
|
rb.add_dev({'id': 1, 'zone': 1, 'weight': 1, 'ip': '127.0.0.1',
|
||||||
|
'port': 10001, 'device': 'sda1'})
|
||||||
|
rb.rebalance()
|
||||||
|
rb.validate()
|
||||||
|
|
||||||
|
rb.replicas = 2.1
|
||||||
|
rb.rebalance()
|
||||||
|
rb.validate()
|
||||||
|
|
||||||
|
self.assertEqual([len(p2d) for p2d in rb._replica2part2dev],
|
||||||
|
[256, 256, 25])
|
||||||
|
|
||||||
|
rb.replicas = 2.2
|
||||||
|
rb.rebalance()
|
||||||
|
rb.validate()
|
||||||
|
self.assertEqual([len(p2d) for p2d in rb._replica2part2dev],
|
||||||
|
[256, 256, 51])
|
||||||
|
|
||||||
|
def test_set_replicas_decrease(self):
|
||||||
|
rb = ring.RingBuilder(4, 5, 0)
|
||||||
|
rb.add_dev({'id': 0, 'zone': 0, 'weight': 1, 'ip': '127.0.0.1',
|
||||||
|
'port': 10000, 'device': 'sda1'})
|
||||||
|
rb.add_dev({'id': 1, 'zone': 1, 'weight': 1, 'ip': '127.0.0.1',
|
||||||
|
'port': 10001, 'device': 'sda1'})
|
||||||
|
rb.rebalance()
|
||||||
|
rb.validate()
|
||||||
|
|
||||||
|
rb.replicas = 4.9
|
||||||
|
rb.rebalance()
|
||||||
|
print repr(rb._replica2part2dev)
|
||||||
|
print repr(rb.devs)
|
||||||
|
rb.validate()
|
||||||
|
|
||||||
|
self.assertEqual([len(p2d) for p2d in rb._replica2part2dev],
|
||||||
|
[16, 16, 16, 16, 14])
|
||||||
|
|
||||||
|
# cross a couple of integer thresholds (4 and 3)
|
||||||
|
rb.replicas = 2.5
|
||||||
|
rb.rebalance()
|
||||||
|
rb.validate()
|
||||||
|
|
||||||
|
self.assertEqual([len(p2d) for p2d in rb._replica2part2dev],
|
||||||
|
[16, 16, 8])
|
||||||
|
|
||||||
|
def test_fractional_replicas_rebalance(self):
|
||||||
|
rb = ring.RingBuilder(8, 2.5, 0)
|
||||||
|
rb.add_dev({'id': 0, 'zone': 0, 'weight': 1, 'ip': '127.0.0.1',
|
||||||
|
'port': 10000, 'device': 'sda1'})
|
||||||
|
rb.add_dev({'id': 1, 'zone': 1, 'weight': 1, 'ip': '127.0.0.1',
|
||||||
|
'port': 10001, 'device': 'sda1'})
|
||||||
|
rb.rebalance() # passes by not crashing
|
||||||
|
rb.validate() # also passes by not crashing
|
||||||
|
self.assertEqual([len(p2d) for p2d in rb._replica2part2dev],
|
||||||
|
[256, 256, 128])
|
||||||
|
|
||||||
def test_load(self):
|
def test_load(self):
|
||||||
rb = ring.RingBuilder(8, 3, 1)
|
rb = ring.RingBuilder(8, 3, 1)
|
||||||
devs = [{'id': 0, 'zone': 0, 'weight': 1, 'ip': '127.0.0.0',
|
devs = [{'id': 0, 'zone': 0, 'weight': 1, 'ip': '127.0.0.0',
|
||||||
@ -680,6 +750,31 @@ class TestRingBuilder(unittest.TestCase):
|
|||||||
rb.rebalance()
|
rb.rebalance()
|
||||||
self.assertNotEquals(rb.validate(stats=True)[1], 999.99)
|
self.assertNotEquals(rb.validate(stats=True)[1], 999.99)
|
||||||
|
|
||||||
|
def test_get_part_devices(self):
|
||||||
|
rb = ring.RingBuilder(8, 3, 1)
|
||||||
|
rb.add_dev({'id': 0, 'zone': 0, 'weight': 1, 'ip': '127.0.0.1',
|
||||||
|
'port': 10000, 'device': 'sda1'})
|
||||||
|
rb.add_dev({'id': 1, 'zone': 1, 'weight': 1, 'ip': '127.0.0.1',
|
||||||
|
'port': 10001, 'device': 'sda1'})
|
||||||
|
rb.rebalance()
|
||||||
|
|
||||||
|
part_devs = sorted(rb.get_part_devices(0),
|
||||||
|
key=operator.itemgetter('id'))
|
||||||
|
self.assertEqual(part_devs, [rb.devs[0], rb.devs[1]])
|
||||||
|
|
||||||
|
def test_get_part_devices_partial_replicas(self):
|
||||||
|
rb = ring.RingBuilder(8, 2.5, 1)
|
||||||
|
rb.add_dev({'id': 0, 'zone': 0, 'weight': 1, 'ip': '127.0.0.1',
|
||||||
|
'port': 10000, 'device': 'sda1'})
|
||||||
|
rb.add_dev({'id': 1, 'zone': 1, 'weight': 1, 'ip': '127.0.0.1',
|
||||||
|
'port': 10001, 'device': 'sda1'})
|
||||||
|
rb.rebalance()
|
||||||
|
|
||||||
|
# note: partition 255 will only have 2 replicas
|
||||||
|
part_devs = sorted(rb.get_part_devices(255),
|
||||||
|
key=operator.itemgetter('id'))
|
||||||
|
self.assertEqual(part_devs, [rb.devs[0], rb.devs[1]])
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
unittest.main()
|
unittest.main()
|
||||||
|
Loading…
x
Reference in New Issue
Block a user