From d79a67ebf6cbfbe13d2321832f783a57516b67de Mon Sep 17 00:00:00 2001 From: gholt Date: Sat, 6 Apr 2013 01:35:58 +0000 Subject: [PATCH] Refactored lists of nodes to contact for requests Extensive refactor here to consolidate what nodes are contacted for any request. This consolidation means reads will contact the same set of nodes that writes would, giving a very good chance that read-your-write behavior will succeed. This also means that writes will not necessarily try all nodes in the cluster as it would previously, which really wasn't desirable anyway. (If you really want that, you can set request_node_count to a really big number, but understand that also means reads will contact every node looking for something that might not exist.) * Added a request_node_count proxy-server conf value that allows control of how many nodes are contacted for a normal request. In proxy.controllers.base.Controller: * Got rid of error_increment since it was only used in one spot by another method and just served to confuse. * Made error_occurred also log the device name. * Made error_limit require an error message and also documented a bit better. * Changed iter_nodes to just take a ring and a partition and yield all the nodes itself so it could control the number of nodes used in a given request. Also happens to consolidate where sort_nodes is called. * Updated account_info and container_info to use all nodes from iter_nodes and to call error_occurred appropriately. * Updated GETorHEAD_base to not track attempts on its own and just stop when iter_nodes tells it to stop. Also, it doesn't take the nodes to contact anymore; instead it takes the ring and gets the nodes from iter_nodes itself. Elsewhere: * Ring now has a get_part method. * Made changes to reflect all of the above. Change-Id: I37f76c99286b6456311abf25167cd0485bfcafac --- doc/source/deployment_guide.rst | 7 ++ etc/proxy-server.conf-sample | 4 + swift/common/ring/ring.py | 20 ++++- swift/proxy/controllers/account.py | 9 +- swift/proxy/controllers/base.py | 119 ++++++++++++++------------- swift/proxy/controllers/container.py | 5 +- swift/proxy/controllers/obj.py | 38 ++++----- swift/proxy/server.py | 10 +++ test/unit/common/ring/test_ring.py | 7 ++ test/unit/proxy/test_server.py | 66 ++++++++++++--- 10 files changed, 180 insertions(+), 105 deletions(-) diff --git a/doc/source/deployment_guide.rst b/doc/source/deployment_guide.rst index 8c6191f4eb..f179625632 100644 --- a/doc/source/deployment_guide.rst +++ b/doc/source/deployment_guide.rst @@ -669,6 +669,13 @@ rate_limit_after_segment 10 Rate limit the download of this segment is downloaded. rate_limit_segments_per_sec 1 Rate limit large object downloads at this rate. +request_node_count 2 * replicas Set to the number of nodes to + contact for a normal request. + You can use '* replicas' at the + end to have it use the number + given times the number of + replicas for the ring being used + for the request. ============================ =============== ============================= [tempauth] diff --git a/etc/proxy-server.conf-sample b/etc/proxy-server.conf-sample index edeaabbdaf..1b731ee735 100644 --- a/etc/proxy-server.conf-sample +++ b/etc/proxy-server.conf-sample @@ -98,6 +98,10 @@ use = egg:swift#proxy # as a regular object on GETs, i.e. will return that object's contents. Should # be set to false if slo is not used in pipeline. # allow_static_large_object = true +# Set to the number of nodes to contact for a normal request. You can use +# '* replicas' at the end to have it use the number given times the number of +# replicas for the ring being used for the request. +# request_node_count = 2 * replicas [filter:tempauth] use = egg:swift#tempauth diff --git a/swift/common/ring/ring.py b/swift/common/ring/ring.py index 61cc106c47..8d395607a0 100644 --- a/swift/common/ring/ring.py +++ b/swift/common/ring/ring.py @@ -204,6 +204,21 @@ class Ring(object): seen_ids.add(dev_id) return part_nodes + def get_part(self, account, container=None, obj=None): + """ + Get the partition for an account/container/object. + + :param account: account name + :param container: container name + :param obj: object name + :returns: the partition number + """ + key = hash_path(account, container, obj, raw_digest=True) + if time() > self._rtime: + self._reload() + part = struct.unpack_from('>I', key)[0] >> self._part_shift + return part + def get_part_nodes(self, part): """ Get the nodes that are responsible for the partition. If one @@ -248,10 +263,7 @@ class Ring(object): hardware description ====== =============================================================== """ - key = hash_path(account, container, obj, raw_digest=True) - if time() > self._rtime: - self._reload() - part = struct.unpack_from('>I', key)[0] >> self._part_shift + part = self.get_part(account, container, obj) return part, self._get_part_nodes(part) def get_more_nodes(self, part): diff --git a/swift/proxy/controllers/account.py b/swift/proxy/controllers/account.py index adb23733ff..ee5995f1eb 100644 --- a/swift/proxy/controllers/account.py +++ b/swift/proxy/controllers/account.py @@ -48,10 +48,9 @@ class AccountController(Controller): def GETorHEAD(self, req): """Handler for HTTP GET/HEAD requests.""" partition, nodes = self.app.account_ring.get_nodes(self.account_name) - nodes = self.app.sort_nodes(nodes) resp = self.GETorHEAD_base( - req, _('Account'), partition, nodes, req.path_info.rstrip('/'), - len(nodes)) + req, _('Account'), self.app.account_ring, partition, + req.path_info.rstrip('/')) if resp.status_int == HTTP_NOT_FOUND and self.app.account_autocreate: if len(self.account_name) > MAX_ACCOUNT_NAME_LENGTH: resp = HTTPBadRequest(request=req) @@ -70,8 +69,8 @@ class AccountController(Controller): self.account_name) return resp resp = self.GETorHEAD_base( - req, _('Account'), partition, nodes, req.path_info.rstrip('/'), - len(nodes)) + req, _('Account'), self.app.account_ring, partition, + req.path_info.rstrip('/')) return resp @public diff --git a/swift/proxy/controllers/base.py b/swift/proxy/controllers/base.py index 4a4d13ffa5..9032ad4ab7 100644 --- a/swift/proxy/controllers/base.py +++ b/swift/proxy/controllers/base.py @@ -242,7 +242,7 @@ def get_account_info(env, app, swift_source=None): cache = cache_from_env(env) if not cache: return None - (version, account, container, _) = \ + (version, account, _junk, _junk) = \ split_path(env['PATH_INFO'], 2, 4, True) cache_key = get_account_memcache_key(account) # Use a unique environment cache key per account. If you copy this env @@ -295,15 +295,6 @@ class Controller(object): if k.lower() in self.pass_through_headers or k.lower().startswith(x_meta)) - def error_increment(self, node): - """ - Handles incrementing error counts when talking to nodes. - - :param node: dictionary of node to increment the error count for - """ - node['errors'] = node.get('errors', 0) + 1 - node['last_error'] = time.time() - def error_occurred(self, node, msg): """ Handle logging, and handling of errors. @@ -311,10 +302,11 @@ class Controller(object): :param node: dictionary of node to handle errors for :param msg: error message """ - self.error_increment(node) - self.app.logger.error(_('%(msg)s %(ip)s:%(port)s'), + node['errors'] = node.get('errors', 0) + 1 + node['last_error'] = time.time() + self.app.logger.error(_('%(msg)s %(ip)s:%(port)s/%(device)s'), {'msg': msg, 'ip': node['ip'], - 'port': node['port']}) + 'port': node['port'], 'device': node['device']}) def exception_occurred(self, node, typ, additional_info): """ @@ -352,14 +344,21 @@ class Controller(object): _('Node error limited %(ip)s:%(port)s (%(device)s)'), node) return limited - def error_limit(self, node): + def error_limit(self, node, msg): """ - Mark a node as error limited. + Mark a node as error limited. This immediately pretends the + node received enough errors to trigger error suppression. Use + this for errors like Insufficient Storage. For other errors + use :func:`error_occurred`. :param node: dictionary of node to error limit + :param msg: error message """ node['errors'] = self.app.error_suppression_limit + 1 node['last_error'] = time.time() + self.app.logger.error(_('%(msg)s %(ip)s:%(port)s/%(device)s'), + {'msg': msg, 'ip': node['ip'], + 'port': node['port'], 'device': node['device']}) def account_info(self, account, autocreate=False): """ @@ -393,16 +392,9 @@ class Controller(object): elif result_code == HTTP_NOT_FOUND and not autocreate: return None, None, None result_code = 0 - attempts_left = len(nodes) path = '/%s' % account headers = {'x-trans-id': self.trans_id, 'Connection': 'close'} - iternodes = self.iter_nodes(partition, nodes, self.app.account_ring) - while attempts_left > 0: - try: - node = iternodes.next() - except StopIteration: - break - attempts_left -= 1 + for node in self.iter_nodes(self.app.account_ring, partition): try: start_node_timing = time.time() with ConnectionTimeout(self.app.conn_timeout): @@ -412,7 +404,7 @@ class Controller(object): self.app.set_node_timing(node, time.time() - start_node_timing) with Timeout(self.app.node_timeout): resp = conn.getresponse() - resp.read() + body = resp.read() if is_success(resp.status): result_code = HTTP_OK account_info.update( @@ -424,10 +416,16 @@ class Controller(object): elif result_code != HTTP_NOT_FOUND: result_code = -1 elif resp.status == HTTP_INSUFFICIENT_STORAGE: - self.error_limit(node) + self.error_limit(node, _('ERROR Insufficient Storage')) continue else: result_code = -1 + if is_server_error(resp.status): + self.error_occurred( + node, + _('ERROR %(status)d %(body)s From Account ' + 'Server') % + {'status': resp.status, 'body': body[:1024]}) except (Exception, Timeout): self.exception_occurred(node, _('Account'), _('Trying to get account info for %s') @@ -497,9 +495,8 @@ class Controller(object): return container_info if not self.account_info(account, autocreate=account_autocreate)[1]: return container_info - attempts_left = len(nodes) headers = {'x-trans-id': self.trans_id, 'Connection': 'close'} - for node in self.iter_nodes(part, nodes, self.app.container_ring): + for node in self.iter_nodes(self.app.container_ring, part): try: start_node_timing = time.time() with ConnectionTimeout(self.app.conn_timeout): @@ -509,7 +506,7 @@ class Controller(object): self.app.set_node_timing(node, time.time() - start_node_timing) with Timeout(self.app.node_timeout): resp = conn.getresponse() - resp.read() + body = resp.read() if is_success(resp.status): container_info.update( headers_to_container_info(resp.getheaders())) @@ -519,14 +516,16 @@ class Controller(object): else: container_info['status'] = -1 if resp.status == HTTP_INSUFFICIENT_STORAGE: - self.error_limit(node) + self.error_limit(node, _('ERROR Insufficient Storage')) + elif is_server_error(resp.status): + self.error_occurred(node, _( + 'ERROR %(status)d %(body)s From Container ' + 'Server') % + {'status': resp.status, 'body': body[:1024]}) except (Exception, Timeout): self.exception_occurred( node, _('Container'), _('Trying to get container info for %s') % path) - attempts_left -= 1 - if attempts_left <= 0: - break if self.app.memcache: if container_info['status'] == HTTP_OK: self.app.memcache.set( @@ -541,18 +540,25 @@ class Controller(object): container_info['nodes'] = nodes return container_info - def iter_nodes(self, partition, nodes, ring): + def iter_nodes(self, ring, partition): """ - Node iterator that will first iterate over the normal nodes for a - partition and then the handoff partitions for the node. + Yields nodes for a ring partition, skipping over error + limited nodes and stopping at the configurable number of + nodes. If a node yielded subsequently gets error limited, an + extra node will be yielded to take its place. - :param partition: partition to iterate nodes for - :param nodes: list of node dicts from the ring - :param ring: ring to get handoff nodes from + :param ring: ring to get yield nodes from + :param partition: ring partition to yield nodes for """ - for node in nodes: + primary_nodes = self.app.sort_nodes(ring.get_part_nodes(partition)) + nodes_left = self.app.request_node_count(ring) + for node in primary_nodes: if not self.error_limited(node): yield node + if not self.error_limited(node): + nodes_left -= 1 + if nodes_left <= 0: + return handoffs = 0 for node in ring.get_more_nodes(partition): if not self.error_limited(node): @@ -561,9 +567,13 @@ class Controller(object): self.app.logger.increment('handoff_count') self.app.logger.warning( 'Handoff requested (%d)' % handoffs) - if handoffs == len(nodes): + if handoffs == len(primary_nodes): self.app.logger.increment('handoff_all_count') yield node + if not self.error_limited(node): + nodes_left -= 1 + if nodes_left <= 0: + return def _make_request(self, nodes, part, method, path, headers, query, logger_thread_locals): @@ -583,7 +593,7 @@ class Controller(object): not is_server_error(resp.status): return resp.status, resp.reason, resp.read() elif resp.status == HTTP_INSUFFICIENT_STORAGE: - self.error_limit(node) + self.error_limit(node, _('ERROR Insufficient Storage')) except (Exception, Timeout): self.exception_occurred(node, self.server_type, _('Trying to %(method)s %(path)s') % @@ -601,7 +611,7 @@ class Controller(object): :returns: a swob.Response object """ start_nodes = ring.get_part_nodes(part) - nodes = self.iter_nodes(part, start_nodes, ring) + nodes = self.iter_nodes(ring, part) pile = GreenPile(len(start_nodes)) for head in headers: pile.spawn(self._make_request, nodes, part, method, path, @@ -755,17 +765,15 @@ class Controller(object): """ return is_success(src.status) or is_redirection(src.status) - def GETorHEAD_base(self, req, server_type, partition, nodes, path, - attempts): + def GETorHEAD_base(self, req, server_type, ring, partition, path): """ Base handler for HTTP GET or HEAD requests. :param req: swob.Request object :param server_type: server type + :param ring: the ring to obtain nodes from :param partition: partition - :param nodes: nodes :param path: path for the request - :param attempts: number of attempts to try :returns: swob.Response object """ statuses = [] @@ -773,14 +781,7 @@ class Controller(object): bodies = [] sources = [] newest = config_true_value(req.headers.get('x-newest', 'f')) - nodes = iter(nodes) - while len(statuses) < attempts: - try: - node = nodes.next() - except StopIteration: - break - if self.error_limited(node): - continue + for node in self.iter_nodes(ring, partition): start_node_timing = time.time() try: with ConnectionTimeout(self.app.conn_timeout): @@ -811,7 +812,7 @@ class Controller(object): statuses.append(possible_source.status) reasons.append(possible_source.reason) bodies.append('') - sources.append(possible_source) + sources.append((possible_source, node)) if not newest: # one good source is enough break else: @@ -819,7 +820,7 @@ class Controller(object): reasons.append(possible_source.reason) bodies.append(possible_source.read()) if possible_source.status == HTTP_INSUFFICIENT_STORAGE: - self.error_limit(node) + self.error_limit(node, _('ERROR Insufficient Storage')) elif is_server_error(possible_source.status): self.error_occurred(node, _('ERROR %(status)d %(body)s ' 'From %(type)s Server') % @@ -827,9 +828,9 @@ class Controller(object): 'body': bodies[-1][:1024], 'type': server_type}) if sources: - sources.sort(key=source_key) - source = sources.pop() - for src in sources: + sources.sort(key=lambda s: source_key(s[0])) + source, node = sources.pop() + for src, _junk in sources: self.close_swift_conn(src) res = Response(request=req, conditional_response=True) if req.method == 'GET' and \ diff --git a/swift/proxy/controllers/container.py b/swift/proxy/controllers/container.py index 61d1aa1198..b0ce7bea97 100644 --- a/swift/proxy/controllers/container.py +++ b/swift/proxy/controllers/container.py @@ -66,11 +66,10 @@ class ContainerController(Controller): """Handler for HTTP GET/HEAD requests.""" if not self.account_info(self.account_name)[1]: return HTTPNotFound(request=req) - part, nodes = self.app.container_ring.get_nodes( + part = self.app.container_ring.get_part( self.account_name, self.container_name) - nodes = self.app.sort_nodes(nodes) resp = self.GETorHEAD_base( - req, _('Container'), part, nodes, req.path_info, len(nodes)) + req, _('Container'), self.app.container_ring, part, req.path_info) if self.app.memcache: # set the memcache container size for ratelimiting cache_key = get_container_memcache_key(self.account_name, diff --git a/swift/proxy/controllers/obj.py b/swift/proxy/controllers/obj.py index 16d864ded7..9fb13b2af6 100644 --- a/swift/proxy/controllers/obj.py +++ b/swift/proxy/controllers/obj.py @@ -140,7 +140,7 @@ class SegmentedIterable(object): self.segment_dict['name'].lstrip('/').split('/', 1) else: container, obj = self.container, self.segment_dict['name'] - partition, nodes = self.controller.app.object_ring.get_nodes( + partition = self.controller.app.object_ring.get_part( self.controller.account_name, container, obj) path = '/%s/%s/%s' % (self.controller.account_name, container, obj) req = Request.blank(path) @@ -152,12 +152,9 @@ class SegmentedIterable(object): sleep(max(self.next_get_time - time.time(), 0)) self.next_get_time = time.time() + \ 1.0 / self.controller.app.rate_limit_segments_per_sec - nodes = self.controller.app.sort_nodes(nodes) resp = self.controller.GETorHEAD_base( - req, _('Object'), partition, - self.controller.iter_nodes(partition, nodes, - self.controller.app.object_ring), - path, len(nodes)) + req, _('Object'), self.controller.app.object_ring, partition, + path) if self.is_slo and resp.status_int == HTTP_NOT_FOUND: raise SloSegmentError(_( 'Could not load object segment %(path)s:' @@ -309,7 +306,7 @@ class ObjectController(Controller): yield item def _listing_pages_iter(self, lcontainer, lprefix, env): - lpartition, lnodes = self.app.container_ring.get_nodes( + lpartition = self.app.container_ring.get_part( self.account_name, lcontainer) marker = '' while True: @@ -321,10 +318,9 @@ class ObjectController(Controller): lreq.environ['QUERY_STRING'] = \ 'format=json&prefix=%s&marker=%s' % (quote(lprefix), quote(marker)) - lnodes = self.app.sort_nodes(lnodes) lresp = self.GETorHEAD_base( - lreq, _('Container'), lpartition, lnodes, lreq.path_info, - len(lnodes)) + lreq, _('Container'), self.app.container_ring, lpartition, + lreq.path_info) if 'swift.authorize' in env: lreq.acl = lresp.headers.get('x-container-read') aresp = env['swift.authorize'](lreq) @@ -385,13 +381,10 @@ class ObjectController(Controller): if aresp: return aresp - partition, nodes = self.app.object_ring.get_nodes( + partition = self.app.object_ring.get_part( self.account_name, self.container_name, self.object_name) - nodes = self.app.sort_nodes(nodes) resp = self.GETorHEAD_base( - req, _('Object'), partition, - self.iter_nodes(partition, nodes, self.app.object_ring), - req.path_info, len(nodes)) + req, _('Object'), self.app.object_ring, partition, req.path_info) if ';' in resp.headers.get('content-type', ''): # strip off swift_bytes from content-type @@ -424,11 +417,9 @@ class ObjectController(Controller): new_req = req.copy_get() new_req.method = 'GET' new_req.range = None - nodes = self.app.sort_nodes(nodes) new_resp = self.GETorHEAD_base( - new_req, _('Object'), partition, - self.iter_nodes(partition, nodes, self.app.object_ring), - req.path_info, len(nodes)) + new_req, _('Object'), self.app.object_ring, partition, + req.path_info) if new_resp.status_int // 100 == 2: try: listing = json.loads(new_resp.body) @@ -685,7 +676,7 @@ class ObjectController(Controller): conn.node = node return conn elif resp.status == HTTP_INSUFFICIENT_STORAGE: - self.error_limit(node) + self.error_limit(node, _('ERROR Insufficient Storage')) except: self.exception_occurred(node, _('Object'), _('Expect: 100-continue on %s') % path) @@ -744,8 +735,9 @@ class ObjectController(Controller): req.environ.get('swift_versioned_copy')): hreq = Request.blank(req.path_info, headers={'X-Newest': 'True'}, environ={'REQUEST_METHOD': 'HEAD'}) - hresp = self.GETorHEAD_base(hreq, _('Object'), partition, nodes, - hreq.path_info, len(nodes)) + hresp = self.GETorHEAD_base( + hreq, _('Object'), self.app.object_ring, partition, + hreq.path_info) # Used by container sync feature if 'x-timestamp' in req.headers: try: @@ -867,7 +859,7 @@ class ObjectController(Controller): source_resp.headers['X-Static-Large-Object'] req = new_req - node_iter = self.iter_nodes(partition, nodes, self.app.object_ring) + node_iter = self.iter_nodes(self.app.object_ring, partition) pile = GreenPile(len(nodes)) chunked = req.headers.get('transfer-encoding') diff --git a/swift/proxy/server.py b/swift/proxy/server.py index 607236ca47..b8fd8ad09b 100644 --- a/swift/proxy/server.py +++ b/swift/proxy/server.py @@ -115,6 +115,16 @@ class Application(object): self.sorting_method = conf.get('sorting_method', 'shuffle').lower() self.allow_static_large_object = config_true_value( conf.get('allow_static_large_object', 'true')) + value = conf.get('request_node_count', '2 * replicas').lower().split() + if len(value) == 1: + value = int(value[0]) + self.request_node_count = lambda r: value + elif len(value) == 3 and value[1] == '*' and value[2] == 'replicas': + value = int(value[0]) + self.request_node_count = lambda r: value * r.replica_count + else: + raise ValueError( + 'Invalid request_node_count value: %r' % ''.join(value)) def get_controller(self, path): """ diff --git a/test/unit/common/ring/test_ring.py b/test/unit/common/ring/test_ring.py index 4ea10c5d60..11ae6f6bd8 100644 --- a/test/unit/common/ring/test_ring.py +++ b/test/unit/common/ring/test_ring.py @@ -201,6 +201,13 @@ class TestRing(unittest.TestCase): self.assertEquals(len(self.ring.devs), 9) self.assertNotEquals(self.ring._mtime, orig_mtime) + def test_get_part(self): + part1 = self.ring.get_part('a') + nodes1 = self.ring.get_part_nodes(part1) + part2, nodes2 = self.ring.get_nodes('a') + self.assertEquals(part1, part2) + self.assertEquals(nodes1, nodes2) + def test_get_part_nodes(self): part, nodes = self.ring.get_nodes('a') self.assertEquals(nodes, self.ring.get_part_nodes(part)) diff --git a/test/unit/proxy/test_server.py b/test/unit/proxy/test_server.py index e1ed81a525..0a1d322eab 100644 --- a/test/unit/proxy/test_server.py +++ b/test/unit/proxy/test_server.py @@ -21,7 +21,7 @@ import sys import unittest import urlparse import signal -from contextlib import contextmanager +from contextlib import contextmanager, nested from gzip import GzipFile from shutil import rmtree import time @@ -30,6 +30,7 @@ from hashlib import md5 from tempfile import mkdtemp import random +import mock from eventlet import sleep, spawn, wsgi, listen import simplejson @@ -204,6 +205,13 @@ class FakeRing(object): self.replicas = replicas self.devs = {} + @property + def replica_count(self): + return self.replicas + + def get_part(self, account, container=None, obj=None): + return 1 + def get_nodes(self, account, container=None, obj=None): devs = [] for x in xrange(self.replicas): @@ -1872,12 +1880,13 @@ class TestObjectController(unittest.TestCase): 'container', 'object') collected_nodes = [] - for node in controller.iter_nodes(partition, nodes, - self.app.object_ring): + for node in controller.iter_nodes(self.app.object_ring, + partition): collected_nodes.append(node) self.assertEquals(len(collected_nodes), 5) self.app.object_ring.max_more_nodes = 20 + self.app.request_node_count = lambda r: 20 controller = proxy_server.ObjectController(self.app, 'account', 'container', 'object') @@ -1885,8 +1894,8 @@ class TestObjectController(unittest.TestCase): 'container', 'object') collected_nodes = [] - for node in controller.iter_nodes(partition, nodes, - self.app.object_ring): + for node in controller.iter_nodes(self.app.object_ring, + partition): collected_nodes.append(node) self.assertEquals(len(collected_nodes), 9) @@ -1900,8 +1909,8 @@ class TestObjectController(unittest.TestCase): 'container', 'object') collected_nodes = [] - for node in controller.iter_nodes(partition, nodes, - self.app.object_ring): + for node in controller.iter_nodes(self.app.object_ring, + partition): collected_nodes.append(node) self.assertEquals(len(collected_nodes), 5) self.assertEquals( @@ -1919,14 +1928,49 @@ class TestObjectController(unittest.TestCase): 'container', 'object') collected_nodes = [] - for node in controller.iter_nodes(partition, nodes, - self.app.object_ring): + for node in controller.iter_nodes(self.app.object_ring, + partition): collected_nodes.append(node) self.assertEquals(len(collected_nodes), 5) self.assertEquals(self.app.logger.log_dict['warning'], []) finally: self.app.object_ring.max_more_nodes = 0 + def test_iter_nodes_calls_sort_nodes(self): + with mock.patch.object(self.app, 'sort_nodes') as sort_nodes: + controller = proxy_server.ObjectController(self.app, 'a', 'c', 'o') + for node in controller.iter_nodes(self.app.object_ring, 0): + pass + sort_nodes.assert_called_once_with( + self.app.object_ring.get_part_nodes(0)) + + def test_iter_nodes_skips_error_limited(self): + with mock.patch.object(self.app, 'sort_nodes', lambda n: n): + controller = proxy_server.ObjectController(self.app, 'a', 'c', 'o') + first_nodes = list(controller.iter_nodes(self.app.object_ring, 0)) + second_nodes = list(controller.iter_nodes(self.app.object_ring, 0)) + self.assertTrue(first_nodes[0] in second_nodes) + + controller.error_limit(first_nodes[0], 'test') + second_nodes = list(controller.iter_nodes(self.app.object_ring, 0)) + self.assertTrue(first_nodes[0] not in second_nodes) + + def test_iter_nodes_gives_extra_if_error_limited_inline(self): + with nested( + mock.patch.object(self.app, 'sort_nodes', lambda n: n), + mock.patch.object(self.app, 'request_node_count', + lambda r: 6), + mock.patch.object(self.app.object_ring, 'max_more_nodes', 99)): + controller = proxy_server.ObjectController(self.app, 'a', 'c', 'o') + first_nodes = list(controller.iter_nodes(self.app.object_ring, 0)) + second_nodes = [] + for node in controller.iter_nodes(self.app.object_ring, 0): + if not second_nodes: + controller.error_limit(node, 'test') + second_nodes.append(node) + self.assertEquals(len(first_nodes), 6) + self.assertEquals(len(second_nodes), 7) + def test_best_response_sets_etag(self): controller = proxy_server.ObjectController(self.app, 'account', 'container', 'object') @@ -5425,8 +5469,8 @@ class FakeObjectController(object): resp = Response(app_iter=iter(body)) return resp - def iter_nodes(self, partition, nodes, ring): - for node in nodes: + def iter_nodes(self, ring, partition): + for node in ring.get_part_nodes(partition): yield node for node in ring.get_more_nodes(partition): yield node