diff --git a/doc/source/deployment_guide.rst b/doc/source/deployment_guide.rst index 8c6191f4eb..f179625632 100644 --- a/doc/source/deployment_guide.rst +++ b/doc/source/deployment_guide.rst @@ -669,6 +669,13 @@ rate_limit_after_segment 10 Rate limit the download of this segment is downloaded. rate_limit_segments_per_sec 1 Rate limit large object downloads at this rate. +request_node_count 2 * replicas Set to the number of nodes to + contact for a normal request. + You can use '* replicas' at the + end to have it use the number + given times the number of + replicas for the ring being used + for the request. ============================ =============== ============================= [tempauth] diff --git a/etc/proxy-server.conf-sample b/etc/proxy-server.conf-sample index edeaabbdaf..1b731ee735 100644 --- a/etc/proxy-server.conf-sample +++ b/etc/proxy-server.conf-sample @@ -98,6 +98,10 @@ use = egg:swift#proxy # as a regular object on GETs, i.e. will return that object's contents. Should # be set to false if slo is not used in pipeline. # allow_static_large_object = true +# Set to the number of nodes to contact for a normal request. You can use +# '* replicas' at the end to have it use the number given times the number of +# replicas for the ring being used for the request. +# request_node_count = 2 * replicas [filter:tempauth] use = egg:swift#tempauth diff --git a/swift/common/ring/ring.py b/swift/common/ring/ring.py index 61cc106c47..8d395607a0 100644 --- a/swift/common/ring/ring.py +++ b/swift/common/ring/ring.py @@ -204,6 +204,21 @@ class Ring(object): seen_ids.add(dev_id) return part_nodes + def get_part(self, account, container=None, obj=None): + """ + Get the partition for an account/container/object. + + :param account: account name + :param container: container name + :param obj: object name + :returns: the partition number + """ + key = hash_path(account, container, obj, raw_digest=True) + if time() > self._rtime: + self._reload() + part = struct.unpack_from('>I', key)[0] >> self._part_shift + return part + def get_part_nodes(self, part): """ Get the nodes that are responsible for the partition. If one @@ -248,10 +263,7 @@ class Ring(object): hardware description ====== =============================================================== """ - key = hash_path(account, container, obj, raw_digest=True) - if time() > self._rtime: - self._reload() - part = struct.unpack_from('>I', key)[0] >> self._part_shift + part = self.get_part(account, container, obj) return part, self._get_part_nodes(part) def get_more_nodes(self, part): diff --git a/swift/proxy/controllers/account.py b/swift/proxy/controllers/account.py index adb23733ff..ee5995f1eb 100644 --- a/swift/proxy/controllers/account.py +++ b/swift/proxy/controllers/account.py @@ -48,10 +48,9 @@ class AccountController(Controller): def GETorHEAD(self, req): """Handler for HTTP GET/HEAD requests.""" partition, nodes = self.app.account_ring.get_nodes(self.account_name) - nodes = self.app.sort_nodes(nodes) resp = self.GETorHEAD_base( - req, _('Account'), partition, nodes, req.path_info.rstrip('/'), - len(nodes)) + req, _('Account'), self.app.account_ring, partition, + req.path_info.rstrip('/')) if resp.status_int == HTTP_NOT_FOUND and self.app.account_autocreate: if len(self.account_name) > MAX_ACCOUNT_NAME_LENGTH: resp = HTTPBadRequest(request=req) @@ -70,8 +69,8 @@ class AccountController(Controller): self.account_name) return resp resp = self.GETorHEAD_base( - req, _('Account'), partition, nodes, req.path_info.rstrip('/'), - len(nodes)) + req, _('Account'), self.app.account_ring, partition, + req.path_info.rstrip('/')) return resp @public diff --git a/swift/proxy/controllers/base.py b/swift/proxy/controllers/base.py index 4a4d13ffa5..9032ad4ab7 100644 --- a/swift/proxy/controllers/base.py +++ b/swift/proxy/controllers/base.py @@ -242,7 +242,7 @@ def get_account_info(env, app, swift_source=None): cache = cache_from_env(env) if not cache: return None - (version, account, container, _) = \ + (version, account, _junk, _junk) = \ split_path(env['PATH_INFO'], 2, 4, True) cache_key = get_account_memcache_key(account) # Use a unique environment cache key per account. If you copy this env @@ -295,15 +295,6 @@ class Controller(object): if k.lower() in self.pass_through_headers or k.lower().startswith(x_meta)) - def error_increment(self, node): - """ - Handles incrementing error counts when talking to nodes. - - :param node: dictionary of node to increment the error count for - """ - node['errors'] = node.get('errors', 0) + 1 - node['last_error'] = time.time() - def error_occurred(self, node, msg): """ Handle logging, and handling of errors. @@ -311,10 +302,11 @@ class Controller(object): :param node: dictionary of node to handle errors for :param msg: error message """ - self.error_increment(node) - self.app.logger.error(_('%(msg)s %(ip)s:%(port)s'), + node['errors'] = node.get('errors', 0) + 1 + node['last_error'] = time.time() + self.app.logger.error(_('%(msg)s %(ip)s:%(port)s/%(device)s'), {'msg': msg, 'ip': node['ip'], - 'port': node['port']}) + 'port': node['port'], 'device': node['device']}) def exception_occurred(self, node, typ, additional_info): """ @@ -352,14 +344,21 @@ class Controller(object): _('Node error limited %(ip)s:%(port)s (%(device)s)'), node) return limited - def error_limit(self, node): + def error_limit(self, node, msg): """ - Mark a node as error limited. + Mark a node as error limited. This immediately pretends the + node received enough errors to trigger error suppression. Use + this for errors like Insufficient Storage. For other errors + use :func:`error_occurred`. :param node: dictionary of node to error limit + :param msg: error message """ node['errors'] = self.app.error_suppression_limit + 1 node['last_error'] = time.time() + self.app.logger.error(_('%(msg)s %(ip)s:%(port)s/%(device)s'), + {'msg': msg, 'ip': node['ip'], + 'port': node['port'], 'device': node['device']}) def account_info(self, account, autocreate=False): """ @@ -393,16 +392,9 @@ class Controller(object): elif result_code == HTTP_NOT_FOUND and not autocreate: return None, None, None result_code = 0 - attempts_left = len(nodes) path = '/%s' % account headers = {'x-trans-id': self.trans_id, 'Connection': 'close'} - iternodes = self.iter_nodes(partition, nodes, self.app.account_ring) - while attempts_left > 0: - try: - node = iternodes.next() - except StopIteration: - break - attempts_left -= 1 + for node in self.iter_nodes(self.app.account_ring, partition): try: start_node_timing = time.time() with ConnectionTimeout(self.app.conn_timeout): @@ -412,7 +404,7 @@ class Controller(object): self.app.set_node_timing(node, time.time() - start_node_timing) with Timeout(self.app.node_timeout): resp = conn.getresponse() - resp.read() + body = resp.read() if is_success(resp.status): result_code = HTTP_OK account_info.update( @@ -424,10 +416,16 @@ class Controller(object): elif result_code != HTTP_NOT_FOUND: result_code = -1 elif resp.status == HTTP_INSUFFICIENT_STORAGE: - self.error_limit(node) + self.error_limit(node, _('ERROR Insufficient Storage')) continue else: result_code = -1 + if is_server_error(resp.status): + self.error_occurred( + node, + _('ERROR %(status)d %(body)s From Account ' + 'Server') % + {'status': resp.status, 'body': body[:1024]}) except (Exception, Timeout): self.exception_occurred(node, _('Account'), _('Trying to get account info for %s') @@ -497,9 +495,8 @@ class Controller(object): return container_info if not self.account_info(account, autocreate=account_autocreate)[1]: return container_info - attempts_left = len(nodes) headers = {'x-trans-id': self.trans_id, 'Connection': 'close'} - for node in self.iter_nodes(part, nodes, self.app.container_ring): + for node in self.iter_nodes(self.app.container_ring, part): try: start_node_timing = time.time() with ConnectionTimeout(self.app.conn_timeout): @@ -509,7 +506,7 @@ class Controller(object): self.app.set_node_timing(node, time.time() - start_node_timing) with Timeout(self.app.node_timeout): resp = conn.getresponse() - resp.read() + body = resp.read() if is_success(resp.status): container_info.update( headers_to_container_info(resp.getheaders())) @@ -519,14 +516,16 @@ class Controller(object): else: container_info['status'] = -1 if resp.status == HTTP_INSUFFICIENT_STORAGE: - self.error_limit(node) + self.error_limit(node, _('ERROR Insufficient Storage')) + elif is_server_error(resp.status): + self.error_occurred(node, _( + 'ERROR %(status)d %(body)s From Container ' + 'Server') % + {'status': resp.status, 'body': body[:1024]}) except (Exception, Timeout): self.exception_occurred( node, _('Container'), _('Trying to get container info for %s') % path) - attempts_left -= 1 - if attempts_left <= 0: - break if self.app.memcache: if container_info['status'] == HTTP_OK: self.app.memcache.set( @@ -541,18 +540,25 @@ class Controller(object): container_info['nodes'] = nodes return container_info - def iter_nodes(self, partition, nodes, ring): + def iter_nodes(self, ring, partition): """ - Node iterator that will first iterate over the normal nodes for a - partition and then the handoff partitions for the node. + Yields nodes for a ring partition, skipping over error + limited nodes and stopping at the configurable number of + nodes. If a node yielded subsequently gets error limited, an + extra node will be yielded to take its place. - :param partition: partition to iterate nodes for - :param nodes: list of node dicts from the ring - :param ring: ring to get handoff nodes from + :param ring: ring to get yield nodes from + :param partition: ring partition to yield nodes for """ - for node in nodes: + primary_nodes = self.app.sort_nodes(ring.get_part_nodes(partition)) + nodes_left = self.app.request_node_count(ring) + for node in primary_nodes: if not self.error_limited(node): yield node + if not self.error_limited(node): + nodes_left -= 1 + if nodes_left <= 0: + return handoffs = 0 for node in ring.get_more_nodes(partition): if not self.error_limited(node): @@ -561,9 +567,13 @@ class Controller(object): self.app.logger.increment('handoff_count') self.app.logger.warning( 'Handoff requested (%d)' % handoffs) - if handoffs == len(nodes): + if handoffs == len(primary_nodes): self.app.logger.increment('handoff_all_count') yield node + if not self.error_limited(node): + nodes_left -= 1 + if nodes_left <= 0: + return def _make_request(self, nodes, part, method, path, headers, query, logger_thread_locals): @@ -583,7 +593,7 @@ class Controller(object): not is_server_error(resp.status): return resp.status, resp.reason, resp.read() elif resp.status == HTTP_INSUFFICIENT_STORAGE: - self.error_limit(node) + self.error_limit(node, _('ERROR Insufficient Storage')) except (Exception, Timeout): self.exception_occurred(node, self.server_type, _('Trying to %(method)s %(path)s') % @@ -601,7 +611,7 @@ class Controller(object): :returns: a swob.Response object """ start_nodes = ring.get_part_nodes(part) - nodes = self.iter_nodes(part, start_nodes, ring) + nodes = self.iter_nodes(ring, part) pile = GreenPile(len(start_nodes)) for head in headers: pile.spawn(self._make_request, nodes, part, method, path, @@ -755,17 +765,15 @@ class Controller(object): """ return is_success(src.status) or is_redirection(src.status) - def GETorHEAD_base(self, req, server_type, partition, nodes, path, - attempts): + def GETorHEAD_base(self, req, server_type, ring, partition, path): """ Base handler for HTTP GET or HEAD requests. :param req: swob.Request object :param server_type: server type + :param ring: the ring to obtain nodes from :param partition: partition - :param nodes: nodes :param path: path for the request - :param attempts: number of attempts to try :returns: swob.Response object """ statuses = [] @@ -773,14 +781,7 @@ class Controller(object): bodies = [] sources = [] newest = config_true_value(req.headers.get('x-newest', 'f')) - nodes = iter(nodes) - while len(statuses) < attempts: - try: - node = nodes.next() - except StopIteration: - break - if self.error_limited(node): - continue + for node in self.iter_nodes(ring, partition): start_node_timing = time.time() try: with ConnectionTimeout(self.app.conn_timeout): @@ -811,7 +812,7 @@ class Controller(object): statuses.append(possible_source.status) reasons.append(possible_source.reason) bodies.append('') - sources.append(possible_source) + sources.append((possible_source, node)) if not newest: # one good source is enough break else: @@ -819,7 +820,7 @@ class Controller(object): reasons.append(possible_source.reason) bodies.append(possible_source.read()) if possible_source.status == HTTP_INSUFFICIENT_STORAGE: - self.error_limit(node) + self.error_limit(node, _('ERROR Insufficient Storage')) elif is_server_error(possible_source.status): self.error_occurred(node, _('ERROR %(status)d %(body)s ' 'From %(type)s Server') % @@ -827,9 +828,9 @@ class Controller(object): 'body': bodies[-1][:1024], 'type': server_type}) if sources: - sources.sort(key=source_key) - source = sources.pop() - for src in sources: + sources.sort(key=lambda s: source_key(s[0])) + source, node = sources.pop() + for src, _junk in sources: self.close_swift_conn(src) res = Response(request=req, conditional_response=True) if req.method == 'GET' and \ diff --git a/swift/proxy/controllers/container.py b/swift/proxy/controllers/container.py index 61d1aa1198..b0ce7bea97 100644 --- a/swift/proxy/controllers/container.py +++ b/swift/proxy/controllers/container.py @@ -66,11 +66,10 @@ class ContainerController(Controller): """Handler for HTTP GET/HEAD requests.""" if not self.account_info(self.account_name)[1]: return HTTPNotFound(request=req) - part, nodes = self.app.container_ring.get_nodes( + part = self.app.container_ring.get_part( self.account_name, self.container_name) - nodes = self.app.sort_nodes(nodes) resp = self.GETorHEAD_base( - req, _('Container'), part, nodes, req.path_info, len(nodes)) + req, _('Container'), self.app.container_ring, part, req.path_info) if self.app.memcache: # set the memcache container size for ratelimiting cache_key = get_container_memcache_key(self.account_name, diff --git a/swift/proxy/controllers/obj.py b/swift/proxy/controllers/obj.py index 16d864ded7..9fb13b2af6 100644 --- a/swift/proxy/controllers/obj.py +++ b/swift/proxy/controllers/obj.py @@ -140,7 +140,7 @@ class SegmentedIterable(object): self.segment_dict['name'].lstrip('/').split('/', 1) else: container, obj = self.container, self.segment_dict['name'] - partition, nodes = self.controller.app.object_ring.get_nodes( + partition = self.controller.app.object_ring.get_part( self.controller.account_name, container, obj) path = '/%s/%s/%s' % (self.controller.account_name, container, obj) req = Request.blank(path) @@ -152,12 +152,9 @@ class SegmentedIterable(object): sleep(max(self.next_get_time - time.time(), 0)) self.next_get_time = time.time() + \ 1.0 / self.controller.app.rate_limit_segments_per_sec - nodes = self.controller.app.sort_nodes(nodes) resp = self.controller.GETorHEAD_base( - req, _('Object'), partition, - self.controller.iter_nodes(partition, nodes, - self.controller.app.object_ring), - path, len(nodes)) + req, _('Object'), self.controller.app.object_ring, partition, + path) if self.is_slo and resp.status_int == HTTP_NOT_FOUND: raise SloSegmentError(_( 'Could not load object segment %(path)s:' @@ -309,7 +306,7 @@ class ObjectController(Controller): yield item def _listing_pages_iter(self, lcontainer, lprefix, env): - lpartition, lnodes = self.app.container_ring.get_nodes( + lpartition = self.app.container_ring.get_part( self.account_name, lcontainer) marker = '' while True: @@ -321,10 +318,9 @@ class ObjectController(Controller): lreq.environ['QUERY_STRING'] = \ 'format=json&prefix=%s&marker=%s' % (quote(lprefix), quote(marker)) - lnodes = self.app.sort_nodes(lnodes) lresp = self.GETorHEAD_base( - lreq, _('Container'), lpartition, lnodes, lreq.path_info, - len(lnodes)) + lreq, _('Container'), self.app.container_ring, lpartition, + lreq.path_info) if 'swift.authorize' in env: lreq.acl = lresp.headers.get('x-container-read') aresp = env['swift.authorize'](lreq) @@ -385,13 +381,10 @@ class ObjectController(Controller): if aresp: return aresp - partition, nodes = self.app.object_ring.get_nodes( + partition = self.app.object_ring.get_part( self.account_name, self.container_name, self.object_name) - nodes = self.app.sort_nodes(nodes) resp = self.GETorHEAD_base( - req, _('Object'), partition, - self.iter_nodes(partition, nodes, self.app.object_ring), - req.path_info, len(nodes)) + req, _('Object'), self.app.object_ring, partition, req.path_info) if ';' in resp.headers.get('content-type', ''): # strip off swift_bytes from content-type @@ -424,11 +417,9 @@ class ObjectController(Controller): new_req = req.copy_get() new_req.method = 'GET' new_req.range = None - nodes = self.app.sort_nodes(nodes) new_resp = self.GETorHEAD_base( - new_req, _('Object'), partition, - self.iter_nodes(partition, nodes, self.app.object_ring), - req.path_info, len(nodes)) + new_req, _('Object'), self.app.object_ring, partition, + req.path_info) if new_resp.status_int // 100 == 2: try: listing = json.loads(new_resp.body) @@ -685,7 +676,7 @@ class ObjectController(Controller): conn.node = node return conn elif resp.status == HTTP_INSUFFICIENT_STORAGE: - self.error_limit(node) + self.error_limit(node, _('ERROR Insufficient Storage')) except: self.exception_occurred(node, _('Object'), _('Expect: 100-continue on %s') % path) @@ -744,8 +735,9 @@ class ObjectController(Controller): req.environ.get('swift_versioned_copy')): hreq = Request.blank(req.path_info, headers={'X-Newest': 'True'}, environ={'REQUEST_METHOD': 'HEAD'}) - hresp = self.GETorHEAD_base(hreq, _('Object'), partition, nodes, - hreq.path_info, len(nodes)) + hresp = self.GETorHEAD_base( + hreq, _('Object'), self.app.object_ring, partition, + hreq.path_info) # Used by container sync feature if 'x-timestamp' in req.headers: try: @@ -867,7 +859,7 @@ class ObjectController(Controller): source_resp.headers['X-Static-Large-Object'] req = new_req - node_iter = self.iter_nodes(partition, nodes, self.app.object_ring) + node_iter = self.iter_nodes(self.app.object_ring, partition) pile = GreenPile(len(nodes)) chunked = req.headers.get('transfer-encoding') diff --git a/swift/proxy/server.py b/swift/proxy/server.py index 607236ca47..b8fd8ad09b 100644 --- a/swift/proxy/server.py +++ b/swift/proxy/server.py @@ -115,6 +115,16 @@ class Application(object): self.sorting_method = conf.get('sorting_method', 'shuffle').lower() self.allow_static_large_object = config_true_value( conf.get('allow_static_large_object', 'true')) + value = conf.get('request_node_count', '2 * replicas').lower().split() + if len(value) == 1: + value = int(value[0]) + self.request_node_count = lambda r: value + elif len(value) == 3 and value[1] == '*' and value[2] == 'replicas': + value = int(value[0]) + self.request_node_count = lambda r: value * r.replica_count + else: + raise ValueError( + 'Invalid request_node_count value: %r' % ''.join(value)) def get_controller(self, path): """ diff --git a/test/unit/common/ring/test_ring.py b/test/unit/common/ring/test_ring.py index 4ea10c5d60..11ae6f6bd8 100644 --- a/test/unit/common/ring/test_ring.py +++ b/test/unit/common/ring/test_ring.py @@ -201,6 +201,13 @@ class TestRing(unittest.TestCase): self.assertEquals(len(self.ring.devs), 9) self.assertNotEquals(self.ring._mtime, orig_mtime) + def test_get_part(self): + part1 = self.ring.get_part('a') + nodes1 = self.ring.get_part_nodes(part1) + part2, nodes2 = self.ring.get_nodes('a') + self.assertEquals(part1, part2) + self.assertEquals(nodes1, nodes2) + def test_get_part_nodes(self): part, nodes = self.ring.get_nodes('a') self.assertEquals(nodes, self.ring.get_part_nodes(part)) diff --git a/test/unit/proxy/test_server.py b/test/unit/proxy/test_server.py index e1ed81a525..0a1d322eab 100644 --- a/test/unit/proxy/test_server.py +++ b/test/unit/proxy/test_server.py @@ -21,7 +21,7 @@ import sys import unittest import urlparse import signal -from contextlib import contextmanager +from contextlib import contextmanager, nested from gzip import GzipFile from shutil import rmtree import time @@ -30,6 +30,7 @@ from hashlib import md5 from tempfile import mkdtemp import random +import mock from eventlet import sleep, spawn, wsgi, listen import simplejson @@ -204,6 +205,13 @@ class FakeRing(object): self.replicas = replicas self.devs = {} + @property + def replica_count(self): + return self.replicas + + def get_part(self, account, container=None, obj=None): + return 1 + def get_nodes(self, account, container=None, obj=None): devs = [] for x in xrange(self.replicas): @@ -1872,12 +1880,13 @@ class TestObjectController(unittest.TestCase): 'container', 'object') collected_nodes = [] - for node in controller.iter_nodes(partition, nodes, - self.app.object_ring): + for node in controller.iter_nodes(self.app.object_ring, + partition): collected_nodes.append(node) self.assertEquals(len(collected_nodes), 5) self.app.object_ring.max_more_nodes = 20 + self.app.request_node_count = lambda r: 20 controller = proxy_server.ObjectController(self.app, 'account', 'container', 'object') @@ -1885,8 +1894,8 @@ class TestObjectController(unittest.TestCase): 'container', 'object') collected_nodes = [] - for node in controller.iter_nodes(partition, nodes, - self.app.object_ring): + for node in controller.iter_nodes(self.app.object_ring, + partition): collected_nodes.append(node) self.assertEquals(len(collected_nodes), 9) @@ -1900,8 +1909,8 @@ class TestObjectController(unittest.TestCase): 'container', 'object') collected_nodes = [] - for node in controller.iter_nodes(partition, nodes, - self.app.object_ring): + for node in controller.iter_nodes(self.app.object_ring, + partition): collected_nodes.append(node) self.assertEquals(len(collected_nodes), 5) self.assertEquals( @@ -1919,14 +1928,49 @@ class TestObjectController(unittest.TestCase): 'container', 'object') collected_nodes = [] - for node in controller.iter_nodes(partition, nodes, - self.app.object_ring): + for node in controller.iter_nodes(self.app.object_ring, + partition): collected_nodes.append(node) self.assertEquals(len(collected_nodes), 5) self.assertEquals(self.app.logger.log_dict['warning'], []) finally: self.app.object_ring.max_more_nodes = 0 + def test_iter_nodes_calls_sort_nodes(self): + with mock.patch.object(self.app, 'sort_nodes') as sort_nodes: + controller = proxy_server.ObjectController(self.app, 'a', 'c', 'o') + for node in controller.iter_nodes(self.app.object_ring, 0): + pass + sort_nodes.assert_called_once_with( + self.app.object_ring.get_part_nodes(0)) + + def test_iter_nodes_skips_error_limited(self): + with mock.patch.object(self.app, 'sort_nodes', lambda n: n): + controller = proxy_server.ObjectController(self.app, 'a', 'c', 'o') + first_nodes = list(controller.iter_nodes(self.app.object_ring, 0)) + second_nodes = list(controller.iter_nodes(self.app.object_ring, 0)) + self.assertTrue(first_nodes[0] in second_nodes) + + controller.error_limit(first_nodes[0], 'test') + second_nodes = list(controller.iter_nodes(self.app.object_ring, 0)) + self.assertTrue(first_nodes[0] not in second_nodes) + + def test_iter_nodes_gives_extra_if_error_limited_inline(self): + with nested( + mock.patch.object(self.app, 'sort_nodes', lambda n: n), + mock.patch.object(self.app, 'request_node_count', + lambda r: 6), + mock.patch.object(self.app.object_ring, 'max_more_nodes', 99)): + controller = proxy_server.ObjectController(self.app, 'a', 'c', 'o') + first_nodes = list(controller.iter_nodes(self.app.object_ring, 0)) + second_nodes = [] + for node in controller.iter_nodes(self.app.object_ring, 0): + if not second_nodes: + controller.error_limit(node, 'test') + second_nodes.append(node) + self.assertEquals(len(first_nodes), 6) + self.assertEquals(len(second_nodes), 7) + def test_best_response_sets_etag(self): controller = proxy_server.ObjectController(self.app, 'account', 'container', 'object') @@ -5425,8 +5469,8 @@ class FakeObjectController(object): resp = Response(app_iter=iter(body)) return resp - def iter_nodes(self, partition, nodes, ring): - for node in nodes: + def iter_nodes(self, ring, partition): + for node in ring.get_part_nodes(partition): yield node for node in ring.get_more_nodes(partition): yield node