diff --git a/bin/swift-dispersion-report b/bin/swift-dispersion-report
index 08af4a7b24..21e3f0669c 100755
--- a/bin/swift-dispersion-report
+++ b/bin/swift-dispersion-report
@@ -80,6 +80,7 @@ def container_dispersion_report(coropool, connpool, account, container_ring,
     retries_done = [0]
     containers_queried = [0]
     container_copies_found = [0] * (container_ring.replica_count + 1)
+    container_copies_expected = [0] * (container_ring.replica_count + 1)
     begun = time()
     next_report = [time() + 2]
 
@@ -101,8 +102,8 @@ def container_dispersion_report(coropool, connpool, account, container_ring,
                 error_log('Giving up on /%s/%s/%s: %s' %
                           (part, account, container, err))
         if output_missing_partitions and \
-                found_count < container_ring.replica_count:
-            missing = container_ring.replica_count - found_count
+                found_count < len(nodes):
+            missing = len(nodes) - found_count
             print '\r\x1B[K',
             stdout.flush()
             print >>stderr, '# Container partition %s missing %s cop%s' % (
@@ -121,13 +122,15 @@ def container_dispersion_report(coropool, connpool, account, container_ring,
     container_parts = {}
     for container in containers:
         part, nodes = container_ring.get_nodes(account, container)
+        container_copies_expected[len(nodes)] += 1
         if part not in container_parts:
             container_parts[part] = part
             coropool.spawn(direct, container, part, nodes)
     coropool.waitall()
     distinct_partitions = len(container_parts)
-    copies_expected = distinct_partitions * container_ring.replica_count
     copies_found = sum(a * b for a, b in enumerate(container_copies_found))
+    copies_expected = sum(a * b for a, b
+                          in enumerate(container_copies_expected))
     value = 100.0 * copies_found / copies_expected
     elapsed, elapsed_unit = get_time_units(time() - begun)
     if not json_output:
@@ -138,11 +141,12 @@ def container_dispersion_report(coropool, connpool, account, container_ring,
             print 'There were %d overlapping partitions' % (
                 containers_listed - distinct_partitions)
         for copies in xrange(container_ring.replica_count - 1, -1, -1):
-            missing_copies = container_ring.replica_count - copies
+            missing_copies = (container_copies_expected[copies] -
+                              container_copies_found[copies])
             if container_copies_found[copies]:
                 print missing_string(container_copies_found[copies],
                                      missing_copies,
-                                     container_ring.replica_count)
+                                     container_copies_expected[copies])
         print '%.02f%% of container copies found (%d of %d)' % (
             value, copies_found, copies_expected)
         print 'Sample represents %.02f%% of the container partition space' % (
@@ -156,7 +160,8 @@ def container_dispersion_report(coropool, connpool, account, container_ring,
                    'copies_found': copies_found,
                    'copies_expected': copies_expected}
         for copies in xrange(container_ring.replica_count):
-            missing_copies = container_ring.replica_count - copies
+            missing_copies = (container_copies_expected[copies] -
+                              container_copies_found[copies])
             results['missing_%d' % (missing_copies)] = \
                 container_copies_found[copies]
         return results
@@ -185,6 +190,7 @@ def object_dispersion_report(coropool, connpool, account, object_ring,
     retries_done = [0]
     objects_queried = [0]
     object_copies_found = [0] * (object_ring.replica_count + 1)
+    object_copies_expected = [0] * (object_ring.replica_count + 1)
     begun = time()
     next_report = [time() + 2]
 
@@ -226,6 +232,7 @@ def object_dispersion_report(coropool, connpool, account, object_ring,
     object_parts = {}
     for obj in objects:
         part, nodes = object_ring.get_nodes(account, container, obj)
+        object_copies_expected[len(nodes)] += 1
         if part not in object_parts:
             object_parts[part] = part
             coropool.spawn(direct, obj, part, nodes)
@@ -233,6 +240,8 @@ def object_dispersion_report(coropool, connpool, account, object_ring,
     distinct_partitions = len(object_parts)
     copies_expected = distinct_partitions * object_ring.replica_count
     copies_found = sum(a * b for a, b in enumerate(object_copies_found))
+    copies_expected = sum(a * b for a, b
+                          in enumerate(object_copies_expected))
     value = 100.0 * copies_found / copies_expected
     elapsed, elapsed_unit = get_time_units(time() - begun)
     if not json_output:
@@ -243,7 +252,8 @@ def object_dispersion_report(coropool, connpool, account, object_ring,
             print 'There were %d overlapping partitions' % (
                 objects_listed - distinct_partitions)
         for copies in xrange(object_ring.replica_count - 1, -1, -1):
-            missing_copies = object_ring.replica_count - copies
+            missing_copies = (object_copies_expected[copies] -
+                              object_copies_found[copies])
             if object_copies_found[copies]:
                 print missing_string(object_copies_found[copies],
                                      missing_copies, object_ring.replica_count)
@@ -260,7 +270,8 @@ def object_dispersion_report(coropool, connpool, account, object_ring,
                    'copies_found': copies_found,
                    'copies_expected': copies_expected}
         for copies in xrange(object_ring.replica_count):
-            missing_copies = object_ring.replica_count - copies
+            missing_copies = (object_copies_expected[copies] -
+                              object_copies_found[copies])
             results['missing_%d' % (missing_copies)] = \
                 object_copies_found[copies]
         return results
diff --git a/bin/swift-get-nodes b/bin/swift-get-nodes
index 8c0b8ac582..cb46820c58 100755
--- a/bin/swift-get-nodes
+++ b/bin/swift-get-nodes
@@ -98,7 +98,7 @@ elif len(args) == 1:
 more_nodes = []
 for more_node in ring.get_more_nodes(part):
     more_nodes.append(more_node)
-    if not options.all and len(more_nodes) >= ring.replica_count:
+    if not options.all and len(more_nodes) >= len(nodes):
         break
 
 print '\nAccount \t%s' % account
diff --git a/bin/swift-ring-builder b/bin/swift-ring-builder
index d2ec782fbe..16859924f5 100755
--- a/bin/swift-ring-builder
+++ b/bin/swift-ring-builder
@@ -62,7 +62,7 @@ swift-ring-builder <builder_file> create <part_power> <replicas>
         if len(argv) < 6:
             print Commands.create.__doc__.strip()
             exit(EXIT_ERROR)
-        builder = RingBuilder(int(argv[3]), int(argv[4]), int(argv[5]))
+        builder = RingBuilder(int(argv[3]), float(argv[4]), int(argv[5]))
         backup_dir = pathjoin(dirname(argv[1]), 'backups')
         try:
             mkdir(backup_dir)
@@ -85,7 +85,7 @@ swift-ring-builder <builder_file>
         if builder.devs:
             zones = len(set(d['zone'] for d in builder.devs if d is not None))
             balance = builder.get_balance()
-        print '%d partitions, %d replicas, %d zones, %d devices, %.02f ' \
+        print '%d partitions, %.6f replicas, %d zones, %d devices, %.02f ' \
               'balance' % (builder.parts, builder.replicas, zones,
                            len([d for d in builder.devs if d]), balance)
         print 'The minimum number of hours before a partition can be ' \
@@ -586,6 +586,37 @@ swift-ring-builder <builder_file> set_min_part_hours <hours>
         pickle.dump(builder.to_dict(), open(argv[1], 'wb'), protocol=2)
         exit(EXIT_SUCCESS)
 
+    def set_replicas():
+        """
+swift-ring-builder <builder_file> set_replicas <replicas>
+    Changes the replica count to the given <replicas>. <replicas> may
+    be a floating-point value, in which case some partitions will have
+    floor(<replicas>) replicas and some will have ceiling(<replicas>)
+    in the correct proportions.
+
+    A rebalance is needed to make the change take effect.
+        """
+        if len(argv) < 4:
+            print Commands.set_replicas.__doc__.strip()
+            exit(EXIT_ERROR)
+
+        new_replicas = argv[3]
+        try:
+            new_replicas = float(new_replicas)
+        except ValueError:
+            print Commands.set_replicas.__doc__.strip()
+            print "\"%s\" is not a valid number." % new_replicas
+            exit(EXIT_ERROR)
+
+        if new_replicas < 1:
+            print "Replica count must be at least 1."
+            exit(EXIT_ERROR)
+
+        builder.set_replicas(new_replicas)
+        print 'The replica count is now %.6f.' % builder.replicas
+        print 'The change will take effect after the next rebalance.'
+        pickle.dump(builder.to_dict(), open(argv[1], 'wb'), protocol=2)
+        exit(EXIT_SUCCESS)
 
 if __name__ == '__main__':
     if len(argv) < 2:
diff --git a/swift/common/ring/builder.py b/swift/common/ring/builder.py
index e83c653ea0..0e0ac51592 100644
--- a/swift/common/ring/builder.py
+++ b/swift/common/ring/builder.py
@@ -166,6 +166,23 @@ class RingBuilder(object):
         """
         self.min_part_hours = min_part_hours
 
+    def set_replicas(self, new_replica_count):
+        """
+        Changes the number of replicas in this ring.
+
+        If the new replica count is sufficiently different that
+        self._replica2part2dev will change size, sets
+        self.devs_changed. This is so tools like
+        bin/swift-ring-builder can know to write out the new ring
+        rather than bailing out due to lack of balance change.
+        """
+        old_slots_used = int(self.parts * self.replicas)
+        new_slots_used = int(self.parts * new_replica_count)
+        if old_slots_used != new_slots_used:
+            self.devs_changed = True
+
+        self.replicas = new_replica_count
+
     def get_ring(self):
         """
         Get the ring, or more specifically, the swift.common.ring.RingData.
@@ -305,6 +322,10 @@ class RingBuilder(object):
         retval = 0
         self._update_last_part_moves()
         last_balance = 0
+        new_parts, removed_part_count = self._adjust_replica2part2dev_size()
+        retval += removed_part_count
+        self._reassign_parts(new_parts)
+        retval += len(new_parts)
         while True:
             reassign_parts = self._gather_reassign_parts()
             self._reassign_parts(reassign_parts)
@@ -340,12 +361,13 @@ class RingBuilder(object):
 
         # "len" showed up in profiling, so it's just computed once.
         dev_len = len(self.devs)
-        if sum(d['parts'] for d in self._iter_devs()) != \
-                self.parts * self.replicas:
+
+        parts_on_devs = sum(d['parts'] for d in self._iter_devs())
+        parts_in_map = sum(len(p2d) for p2d in self._replica2part2dev)
+        if parts_on_devs != parts_in_map:
             raise exceptions.RingValidationError(
                 'All partitions are not double accounted for: %d != %d' %
-                (sum(d['parts'] for d in self._iter_devs()),
-                 self.parts * self.replicas))
+                (parts_on_devs, parts_in_map))
         if stats:
             # dev_usage[dev_id] will equal the number of partitions assigned to
             # that device.
@@ -354,14 +376,13 @@ class RingBuilder(object):
                 for dev_id in part2dev:
                     dev_usage[dev_id] += 1
 
-        for part in xrange(self.parts):
-            for replica in xrange(self.replicas):
-                dev_id = self._replica2part2dev[replica][part]
-                if dev_id >= dev_len or not self.devs[dev_id]:
-                    raise exceptions.RingValidationError(
-                        "Partition %d, replica %d was not allocated "
-                        "to a device." %
-                        (part, replica))
+        for part, replica in self._each_part_replica():
+            dev_id = self._replica2part2dev[replica][part]
+            if dev_id >= dev_len or not self.devs[dev_id]:
+                raise exceptions.RingValidationError(
+                    "Partition %d, replica %d was not allocated "
+                    "to a device." %
+                    (part, replica))
 
         for dev in self._iter_devs():
             if not isinstance(dev['port'], int):
@@ -428,12 +449,17 @@ class RingBuilder(object):
 
     def get_part_devices(self, part):
         """
-        Get the devices that are responsible for the partition.
+        Get the devices that are responsible for the partition,
+        filtering out duplicates.
 
         :param part: partition to get devices for
         :returns: list of device dicts
         """
-        return [self.devs[r[part]] for r in self._replica2part2dev]
+        devices = []
+        for dev in self._devs_for_part(part):
+            if dev not in devices:
+                devices.append(dev)
+        return devices
 
     def _iter_devs(self):
         """
@@ -466,19 +492,83 @@ class RingBuilder(object):
             dev['parts_wanted'] = \
                 int(weight_of_one_part * dev['weight']) - dev['parts']
 
+    def _adjust_replica2part2dev_size(self):
+        """
+        Make sure that the lengths of the arrays in _replica2part2dev
+        are correct for the current value of self.replicas.
+
+        Example:
+        self.part_power = 8
+        self.replicas = 2.25
+
+        self._replica2part2dev will contain 3 arrays: the first 2 of
+        length 256 (2**8), and the last of length 64 (0.25 * 2**8).
+
+        Returns a 2-tuple: the first element is a list of (partition,
+        replicas) tuples indicating which replicas need to be
+        (re)assigned to devices, and the second element is a count of
+        how many replicas were removed.
+        """
+        removed_replicas = 0
+
+        fractional_replicas, whole_replicas = math.modf(self.replicas)
+        whole_replicas = int(whole_replicas)
+
+        desired_lengths = [self.parts] * whole_replicas
+        if fractional_replicas:
+            desired_lengths.append(int(self.parts * fractional_replicas))
+
+        to_assign = defaultdict(list)
+
+        if self._replica2part2dev is not None:
+            # If we crossed an integer threshold (say, 4.1 --> 4),
+            # we'll have a partial extra replica clinging on here. Clean
+            # up any such extra stuff.
+            for part2dev in self._replica2part2dev[len(desired_lengths):]:
+                for dev_id in part2dev:
+                    dev_losing_part = self.devs[dev_id]
+                    dev_losing_part['parts'] -= 1
+                    removed_replicas += 1
+            self._replica2part2dev = \
+                self._replica2part2dev[:len(desired_lengths)]
+        else:
+            self._replica2part2dev = []
+
+        for replica, desired_length in enumerate(desired_lengths):
+            if replica < len(self._replica2part2dev):
+                part2dev = self._replica2part2dev[replica]
+                if len(part2dev) < desired_length:
+                    # Not long enough: needs to be extended and the
+                    # newly-added pieces assigned to devices.
+                    for part in xrange(len(part2dev), desired_length):
+                        to_assign[part].append(replica)
+                        part2dev.append(0)
+                elif len(part2dev) > desired_length:
+                    # Too long: truncate this mapping.
+                    for part in xrange(desired_length, len(part2dev)):
+                        dev_losing_part = self.devs[part2dev[part]]
+                        dev_losing_part['parts'] -= 1
+                        removed_replicas += 1
+                    self._replica2part2dev[replica] = part2dev[:desired_length]
+            else:
+                # Mapping not present at all: make one up and assign
+                # all of it.
+                for part in xrange(desired_length):
+                    to_assign[part].append(replica)
+                self._replica2part2dev.append(
+                    array('H', (0 for _junk in xrange(desired_length))))
+
+        return (list(to_assign.iteritems()), removed_replicas)
+
     def _initial_balance(self):
         """
         Initial partition assignment is the same as rebalancing an
         existing ring, but with some initial setup beforehand.
""" - self._replica2part2dev = \ - [array('H', (0 for _junk in xrange(self.parts))) - for _junk in xrange(self.replicas)] - - replicas = range(self.replicas) self._last_part_moves = array('B', (0 for _junk in xrange(self.parts))) self._last_part_moves_epoch = int(time()) - self._reassign_parts((p, replicas) for p in xrange(self.parts)) + + self._reassign_parts(self._adjust_replica2part2dev_size()[0]) def _update_last_part_moves(self): """ @@ -515,12 +605,11 @@ class RingBuilder(object): if self._remove_devs: dev_ids = [d['id'] for d in self._remove_devs if d['parts']] if dev_ids: - for replica in xrange(self.replicas): - part2dev = self._replica2part2dev[replica] - for part in xrange(self.parts): - if part2dev[part] in dev_ids: - self._last_part_moves[part] = 0 - removed_dev_parts[part].append(replica) + for part, replica in self._each_part_replica(): + dev_id = self._replica2part2dev[replica][part] + if dev_id in dev_ids: + self._last_part_moves[part] = 0 + removed_dev_parts[part].append(replica) # Now we gather partitions that are "at risk" because they aren't # currently sufficient spread out across the cluster. @@ -536,8 +625,7 @@ class RingBuilder(object): # replicas_at_tier was a "lambda: 0" defaultdict, but profiling # revealed the lambda invocation as a significant cost. replicas_at_tier = {} - for replica in xrange(self.replicas): - dev = self.devs[self._replica2part2dev[replica][part]] + for dev in self._devs_for_part(part): if dev['id'] not in tfd: tfd[dev['id']] = tiers_for_dev(dev) for tier in tfd[dev['id']]: @@ -548,7 +636,7 @@ class RingBuilder(object): # Now, look for partitions not yet spread out enough and not # recently moved. - for replica in xrange(self.replicas): + for replica in self._replicas_for_part(part): dev = self.devs[self._replica2part2dev[replica][part]] removed_replica = False if dev['id'] not in tfd: @@ -584,10 +672,14 @@ class RingBuilder(object): start += random.randint(0, self.parts / 2) # GRAH PEP8!!! self._last_part_gather_start = start - for replica in xrange(self.replicas): - part2dev = self._replica2part2dev[replica] - for part in itertools.chain(xrange(start, self.parts), - xrange(0, start)): + for replica, part2dev in enumerate(self._replica2part2dev): + # If we've got a partial replica, start may be out of + # range. Scale it down so that we get a similar movement + # pattern (but scaled down) on sequential runs. + this_start = int(float(start) * len(part2dev) / self.parts) + + for part in itertools.chain(xrange(this_start, len(part2dev)), + xrange(0, this_start)): if self._last_part_moves[part] < self.min_part_hours: continue if part in removed_dev_parts or part in spread_out_parts: @@ -673,7 +765,7 @@ class RingBuilder(object): # replicas not-to-be-moved are in for this part. other_replicas = defaultdict(int) unique_tiers_by_tier_len = defaultdict(set) - for replica in xrange(self.replicas): + for replica in self._replicas_for_part(part): if replica not in replace_replicas: dev = self.devs[self._replica2part2dev[replica][part]] for tier in tiers_for_dev(dev): @@ -833,6 +925,35 @@ class RingBuilder(object): return mr return walk_tree((), self.replicas) + def _devs_for_part(self, part): + """ + Returns a list of devices for a specified partition. + + Deliberately includes duplicates. + """ + return [self.devs[part2dev[part]] + for part2dev in self._replica2part2dev + if part < len(part2dev)] + + def _replicas_for_part(self, part): + """ + Returns a list of replicas for a specified partition. 
+
+        These can be used as indices into self._replica2part2dev
+        without worrying about IndexErrors.
+        """
+        return [replica for replica, part2dev
+                in enumerate(self._replica2part2dev)
+                if part < len(part2dev)]
+
+    def _each_part_replica(self):
+        """
+        Generator yielding every (partition, replica) pair in the ring.
+        """
+        for replica, part2dev in enumerate(self._replica2part2dev):
+            for part in xrange(len(part2dev)):
+                yield (part, replica)
+
     @classmethod
     def load(cls, builder_file, open=open):
         """
diff --git a/swift/common/ring/ring.py b/swift/common/ring/ring.py
index 1f72b1c3b8..c2f8c657c6 100644
--- a/swift/common/ring/ring.py
+++ b/swift/common/ring/ring.py
@@ -163,7 +163,7 @@ class Ring(object):
 
     @property
     def replica_count(self):
-        """Number of replicas used in the ring."""
+        """Number of replicas (full or partial) used in the ring."""
         return len(self._replica2part2dev_id)
 
     @property
@@ -189,7 +189,8 @@ class Ring(object):
 
     def _get_part_nodes(self, part):
         seen_ids = set()
-        return [self._devs[r[part]] for r in self._replica2part2dev_id
+        return [self._devs[r[part]] for r in
+                (rpd for rpd in self._replica2part2dev_id if len(rpd) > part)
                 if not (r[part] in seen_ids or seen_ids.add(r[part]))]
 
     def get_part_nodes(self, part):
@@ -255,8 +256,9 @@ class Ring(object):
         self._reload()
         used_tiers = set()
         for part2dev_id in self._replica2part2dev_id:
-            for tier in tiers_for_dev(self._devs[part2dev_id[part]]):
-                used_tiers.add(tier)
+            if len(part2dev_id) > part:
+                for tier in tiers_for_dev(self._devs[part2dev_id[part]]):
+                    used_tiers.add(tier)
 
         for level in self.tiers_by_length:
             tiers = list(level)
diff --git a/test/unit/common/ring/test_builder.py b/test/unit/common/ring/test_builder.py
index f580082376..2e6ab58397 100644
--- a/test/unit/common/ring/test_builder.py
+++ b/test/unit/common/ring/test_builder.py
@@ -13,6 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import operator
 import os
 import unittest
 import cPickle as pickle
@@ -97,6 +98,16 @@ class TestRingBuilder(unittest.TestCase):
         self.assertNotEquals(r0.to_dict(), r1.to_dict())
         self.assertEquals(r1.to_dict(), r2.to_dict())
 
+    def test_set_replicas(self):
+        rb = ring.RingBuilder(8, 3.2, 1)
+        rb.devs_changed = False
+        rb.set_replicas(3.25)
+        self.assertTrue(rb.devs_changed)
+
+        rb.devs_changed = False
+        rb.set_replicas(3.2500001)
+        self.assertFalse(rb.devs_changed)
+
     def test_add_dev(self):
         rb = ring.RingBuilder(8, 3, 1)
         dev = \
@@ -532,6 +543,65 @@ class TestRingBuilder(unittest.TestCase):
 
         rb.rebalance()
 
+    def test_set_replicas_increase(self):
+        rb = ring.RingBuilder(8, 2, 0)
+        rb.add_dev({'id': 0, 'zone': 0, 'weight': 1, 'ip': '127.0.0.1',
+                    'port': 10000, 'device': 'sda1'})
+        rb.add_dev({'id': 1, 'zone': 1, 'weight': 1, 'ip': '127.0.0.1',
+                    'port': 10001, 'device': 'sda1'})
+        rb.rebalance()
+        rb.validate()
+
+        rb.replicas = 2.1
+        rb.rebalance()
+        rb.validate()
+
+        self.assertEqual([len(p2d) for p2d in rb._replica2part2dev],
+                         [256, 256, 25])
+
+        rb.replicas = 2.2
+        rb.rebalance()
+        rb.validate()
+        self.assertEqual([len(p2d) for p2d in rb._replica2part2dev],
+                         [256, 256, 51])
+
+    def test_set_replicas_decrease(self):
+        rb = ring.RingBuilder(4, 5, 0)
+        rb.add_dev({'id': 0, 'zone': 0, 'weight': 1, 'ip': '127.0.0.1',
+                    'port': 10000, 'device': 'sda1'})
+        rb.add_dev({'id': 1, 'zone': 1, 'weight': 1, 'ip': '127.0.0.1',
+                    'port': 10001, 'device': 'sda1'})
+        rb.rebalance()
+        rb.validate()
+
+        rb.replicas = 4.9
+        rb.rebalance()
+        print repr(rb._replica2part2dev)
+        print repr(rb.devs)
+        rb.validate()
+
+        self.assertEqual([len(p2d) for p2d in rb._replica2part2dev],
+                         [16, 16, 16, 16, 14])
+
+        # cross a couple of integer thresholds (4 and 3)
+        rb.replicas = 2.5
+        rb.rebalance()
+        rb.validate()
+
+        self.assertEqual([len(p2d) for p2d in rb._replica2part2dev],
+                         [16, 16, 8])
+
+    def test_fractional_replicas_rebalance(self):
+        rb = ring.RingBuilder(8, 2.5, 0)
+        rb.add_dev({'id': 0, 'zone': 0, 'weight': 1, 'ip': '127.0.0.1',
+                    'port': 10000, 'device': 'sda1'})
+        rb.add_dev({'id': 1, 'zone': 1, 'weight': 1, 'ip': '127.0.0.1',
+                    'port': 10001, 'device': 'sda1'})
+        rb.rebalance()  # passes by not crashing
+        rb.validate()  # also passes by not crashing
+        self.assertEqual([len(p2d) for p2d in rb._replica2part2dev],
+                         [256, 256, 128])
+
     def test_load(self):
         rb = ring.RingBuilder(8, 3, 1)
         devs = [{'id': 0, 'zone': 0, 'weight': 1, 'ip': '127.0.0.0',
@@ -680,6 +750,31 @@ class TestRingBuilder(unittest.TestCase):
         rb.rebalance()
         self.assertNotEquals(rb.validate(stats=True)[1], 999.99)
 
+    def test_get_part_devices(self):
+        rb = ring.RingBuilder(8, 3, 1)
+        rb.add_dev({'id': 0, 'zone': 0, 'weight': 1, 'ip': '127.0.0.1',
+                    'port': 10000, 'device': 'sda1'})
+        rb.add_dev({'id': 1, 'zone': 1, 'weight': 1, 'ip': '127.0.0.1',
+                    'port': 10001, 'device': 'sda1'})
+        rb.rebalance()
+
+        part_devs = sorted(rb.get_part_devices(0),
+                           key=operator.itemgetter('id'))
+        self.assertEqual(part_devs, [rb.devs[0], rb.devs[1]])
+
+    def test_get_part_devices_partial_replicas(self):
+        rb = ring.RingBuilder(8, 2.5, 1)
+        rb.add_dev({'id': 0, 'zone': 0, 'weight': 1, 'ip': '127.0.0.1',
+                    'port': 10000, 'device': 'sda1'})
+        rb.add_dev({'id': 1, 'zone': 1, 'weight': 1, 'ip': '127.0.0.1',
+                    'port': 10001, 'device': 'sda1'})
+        rb.rebalance()
+
+        # note: partition 255 will only have 2 replicas
+        part_devs = sorted(rb.get_part_devices(255),
+                           key=operator.itemgetter('id'))
+        self.assertEqual(part_devs, [rb.devs[0], rb.devs[1]])
+
 
 if __name__ == '__main__':
     unittest.main()
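
Note for readers of the patch: the core of the fractional-replica scheme is that _replica2part2dev keeps one row per replica, and only the last row may be shorter than the partition count. The snippet below is a standalone illustration, not part of the change itself (the helper name desired_row_lengths is invented here); it mirrors the math.modf() arithmetic used by RingBuilder._adjust_replica2part2dev_size() to compute those row lengths.

    import math

    def desired_row_lengths(part_power, replicas):
        # One full-length row per whole replica...
        parts = 2 ** part_power
        fractional, whole = math.modf(replicas)
        lengths = [parts] * int(whole)
        # ...plus, for a fractional count, one short row covering only
        # that fraction of the partitions.
        if fractional:
            lengths.append(int(parts * fractional))
        return lengths

    print desired_row_lengths(8, 2.25)  # [256, 256, 64], the docstring example
    print desired_row_lengths(4, 4.9)   # [16, 16, 16, 16, 14], as asserted in
                                        # test_set_replicas_decrease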