diff --git a/swift/common/request_helpers.py b/swift/common/request_helpers.py index 0d4d538eb7..7fa97c7317 100644 --- a/swift/common/request_helpers.py +++ b/swift/common/request_helpers.py @@ -40,7 +40,7 @@ from swift.common.utils import split_path, validate_device_partition, \ close_if_possible, maybe_multipart_byteranges_to_document_iters, \ multipart_byteranges_to_document_iters, parse_content_type, \ parse_content_range, csv_append, list_from_csv, Spliterator, quote, \ - RESERVED, config_true_value, md5, CloseableChain + RESERVED, config_true_value, md5, CloseableChain, select_ip_port from swift.common.wsgi import make_subrequest @@ -901,13 +901,39 @@ def update_ignore_range_header(req, name): req.headers[hdr] = csv_append(req.headers.get(hdr), name) +def is_use_replication_network(headers=None): + """ + Determine if replication network should be used. + + :param headers: a dict of headers + :return: the value of the ``x-backend-use-replication-network`` item from + ``headers``. If no ``headers`` are given or the item is not found then + False is returned. + """ + if headers: + for h, v in headers.items(): + if h.lower() == USE_REPLICATION_NETWORK_HEADER: + return config_true_value(v) + return False + + def get_ip_port(node, headers): - use_replication_network = False - for h, v in headers.items(): - if h.lower() == USE_REPLICATION_NETWORK_HEADER: - use_replication_network = config_true_value(v) - break - if use_replication_network: - return node['replication_ip'], node['replication_port'] - else: - return node['ip'], node['port'] + """ + Get the ip address and port that should be used for the given ``node``. + The normal ip address and port are returned unless the ``node`` or + ``headers`` indicate that the replication ip address and port should be + used. + + If the ``headers`` dict has an item with key + ``x-backend-use-replication-network`` and a truthy value then the + replication ip address and port are returned. Otherwise if the ``node`` + dict has an item with key ``use_replication`` and truthy value then the + replication ip address and port are returned. Otherwise the normal ip + address and port are returned. + + :param node: a dict describing a node + :param headers: a dict of headers + :return: a tuple of (ip address, port) + """ + return select_ip_port( + node, use_replication=is_use_replication_network(headers)) diff --git a/swift/common/utils.py b/swift/common/utils.py index d6e9a515ff..f6139b0f4a 100644 --- a/swift/common/utils.py +++ b/swift/common/utils.py @@ -2813,17 +2813,49 @@ def parse_socket_string(socket_string, default_port): return (host, port) -def node_to_string(node_dict, replication=False): - if replication: - ip = node_dict['replication_ip'] - port = node_dict['replication_port'] +def select_ip_port(node_dict, use_replication=False): + """ + Get the ip address and port that should be used for the given + ``node_dict``. + + If ``use_replication`` is True then the replication ip address and port are + returned. + + If ``use_replication`` is False (the default) and the ``node`` dict has an + item with key ``use_replication`` then that item's value will determine if + the replication ip address and port are returned. + + If neither ``use_replication`` nor ``node_dict['use_replication']`` + indicate otherwise then the normal ip address and port are returned. + + :param node_dict: a dict describing a node + :param use_replication: if True then the replication ip address and port + are returned. + :return: a tuple of (ip address, port) + """ + if use_replication or node_dict.get('use_replication', False): + node_ip = node_dict['replication_ip'] + node_port = node_dict['replication_port'] else: - ip = node_dict['ip'] - port = node_dict['port'] - if ':' in ip: + node_ip = node_dict['ip'] + node_port = node_dict['port'] + return node_ip, node_port + + +def node_to_string(node_dict, replication=False): + """ + Get a string representation of a node's location. + + :param node_dict: a dict describing a node + :param replication: if True then the replication ip address and port are + used, otherwise the normal ip address and port are used. + :return: a string of the form :/ + """ + node_ip, node_port = select_ip_port(node_dict, use_replication=replication) + if ':' in node_ip: # IPv6 - ip = '[%s]' % ip - return '{}:{}/{}'.format(ip, port, node_dict['device']) + node_ip = '[%s]' % node_ip + return '{}:{}/{}'.format(node_ip, node_port, node_dict['device']) def storage_directory(datadir, partition, name_hash): diff --git a/swift/proxy/controllers/account.py b/swift/proxy/controllers/account.py index f6dd3b2cfa..c14669f899 100644 --- a/swift/proxy/controllers/account.py +++ b/swift/proxy/controllers/account.py @@ -64,7 +64,7 @@ class AccountController(Controller): concurrency = self.app.account_ring.replica_count \ if self.app.get_policy_options(None).concurrent_gets else 1 node_iter = self.app.iter_nodes(self.app.account_ring, partition, - self.logger) + self.logger, req) params = req.params params['format'] = 'json' req.params = params diff --git a/swift/proxy/controllers/base.py b/swift/proxy/controllers/base.py index 8c40072e7b..d7d893b726 100644 --- a/swift/proxy/controllers/base.py +++ b/swift/proxy/controllers/base.py @@ -63,10 +63,9 @@ from swift.common.request_helpers import strip_sys_meta_prefix, \ strip_user_meta_prefix, is_user_meta, is_sys_meta, is_sys_or_user_meta, \ http_response_to_document_iters, is_object_transient_sysmeta, \ strip_object_transient_sysmeta_prefix, get_ip_port, get_user_meta_prefix, \ - get_sys_meta_prefix + get_sys_meta_prefix, is_use_replication_network from swift.common.storage_policy import POLICIES - DEFAULT_RECHECK_ACCOUNT_EXISTENCE = 60 # seconds DEFAULT_RECHECK_CONTAINER_EXISTENCE = 60 # seconds DEFAULT_RECHECK_UPDATING_SHARD_RANGES = 3600 # seconds @@ -1635,18 +1634,21 @@ class NodeIter(object): :param ring: ring to get yield nodes from :param partition: ring partition to yield nodes for :param logger: a logger instance + :param request: yielded nodes will be annotated with `use_replication` + based on the `request` headers. :param node_iter: optional iterable of nodes to try. Useful if you want to filter or reorder the nodes. :param policy: an instance of :class:`BaseStoragePolicy`. This should be None for an account or container ring. """ - def __init__(self, app, ring, partition, logger, node_iter=None, + def __init__(self, app, ring, partition, logger, request, node_iter=None, policy=None): self.app = app self.ring = ring self.partition = partition self.logger = logger + self.request = request part_nodes = ring.get_part_nodes(partition) if node_iter is None: @@ -1726,13 +1728,27 @@ class NodeIter(object): if self.nodes_left <= 0: return + def _annotate_node(self, node): + """ + Helper function to set use_replication dict value for a node by looking + up the header value for x-backend-use-replication-network. + + :param node: node dictionary from the ring or node_iter. + :returns: node dictionary with replication network enabled/disabled + """ + # nodes may have come from a ring or a node_iter passed to the + # constructor: be careful not to mutate them! + return dict(node, use_replication=is_use_replication_network( + self.request.headers)) + def next(self): + node = None if self._node_provider: # give node provider the opportunity to inject a node node = self._node_provider() - if node: - return node - return next(self._node_iter) + if not node: + node = next(self._node_iter) + return self._annotate_node(node) def __next__(self): return self.next() @@ -1971,7 +1987,7 @@ class Controller(object): :returns: a swob.Response object """ nodes = GreenthreadSafeIterator( - node_iterator or self.app.iter_nodes(ring, part, self.logger) + node_iterator or self.app.iter_nodes(ring, part, self.logger, req) ) node_number = node_count or len(ring.get_part_nodes(part)) pile = GreenAsyncPile(node_number) diff --git a/swift/proxy/controllers/container.py b/swift/proxy/controllers/container.py index ff40b59d32..3803f40d59 100644 --- a/swift/proxy/controllers/container.py +++ b/swift/proxy/controllers/container.py @@ -103,7 +103,7 @@ class ContainerController(Controller): concurrency = self.app.container_ring.replica_count \ if self.app.get_policy_options(None).concurrent_gets else 1 node_iter = self.app.iter_nodes(self.app.container_ring, part, - self.logger) + self.logger, req) resp = self.GETorHEAD_base( req, 'Container', node_iter, part, req.swift_entity_path, concurrency) diff --git a/swift/proxy/controllers/obj.py b/swift/proxy/controllers/obj.py index c95e51b2b1..974680364a 100644 --- a/swift/proxy/controllers/obj.py +++ b/swift/proxy/controllers/obj.py @@ -176,7 +176,7 @@ class BaseObjectController(Controller): validate_internal_obj( self.account_name, self.container_name, self.object_name) - def iter_nodes_local_first(self, ring, partition, policy=None, + def iter_nodes_local_first(self, ring, partition, request, policy=None, local_handoffs_first=False): """ Yields nodes for a ring partition. @@ -190,6 +190,8 @@ class BaseObjectController(Controller): :param ring: ring to get nodes from :param partition: ring partition to yield nodes for + :param request: nodes will be annotated with `use_replication` based on + the `request` headers :param policy: optional, an instance of :class:`~swift.common.storage_policy.BaseStoragePolicy` :param local_handoffs_first: optional, if True prefer primaries and @@ -198,7 +200,7 @@ class BaseObjectController(Controller): policy_options = self.app.get_policy_options(policy) is_local = policy_options.write_affinity_is_local_fn if is_local is None: - return self.app.iter_nodes(ring, partition, self.logger, + return self.app.iter_nodes(ring, partition, self.logger, request, policy=policy) primary_nodes = ring.get_part_nodes(partition) @@ -232,7 +234,7 @@ class BaseObjectController(Controller): (node for node in all_nodes if node not in preferred_nodes) ) - return self.app.iter_nodes(ring, partition, self.logger, + return self.app.iter_nodes(ring, partition, self.logger, request, node_iter=node_iter, policy=policy) def GETorHEAD(self, req): @@ -252,7 +254,7 @@ class BaseObjectController(Controller): return aresp partition = obj_ring.get_part( self.account_name, self.container_name, self.object_name) - node_iter = self.app.iter_nodes(obj_ring, partition, self.logger, + node_iter = self.app.iter_nodes(obj_ring, partition, self.logger, req, policy=policy) resp = self._get_or_head_response(req, node_iter, partition, policy) @@ -720,7 +722,8 @@ class BaseObjectController(Controller): """ obj_ring = policy.object_ring node_iter = GreenthreadSafeIterator( - self.iter_nodes_local_first(obj_ring, partition, policy=policy)) + self.iter_nodes_local_first(obj_ring, partition, req, + policy=policy)) pile = GreenPile(len(nodes)) for nheaders in outgoing_headers: @@ -921,8 +924,8 @@ class BaseObjectController(Controller): local_handoffs = len(nodes) - len(local_primaries) node_count += local_handoffs node_iterator = self.iter_nodes_local_first( - obj_ring, partition, policy=policy, local_handoffs_first=True - ) + obj_ring, partition, req, policy=policy, + local_handoffs_first=True) headers = self._backend_requests( req, node_count, container_partition, container_nodes, diff --git a/swift/proxy/server.py b/swift/proxy/server.py index 0f10d87798..a5ba1d858d 100644 --- a/swift/proxy/server.py +++ b/swift/proxy/server.py @@ -743,9 +743,10 @@ class Application(object): return ok - def iter_nodes(self, ring, partition, logger, node_iter=None, policy=None): - return NodeIter(self, ring, partition, logger, node_iter=node_iter, - policy=policy) + def iter_nodes(self, ring, partition, logger, request, node_iter=None, + policy=None): + return NodeIter(self, ring, partition, logger, request=request, + node_iter=node_iter, policy=policy, ) def exception_occurred(self, node, typ, additional_info, **kwargs): diff --git a/test/unit/common/test_request_helpers.py b/test/unit/common/test_request_helpers.py index c7f268ea11..f526b02d0f 100644 --- a/test/unit/common/test_request_helpers.py +++ b/test/unit/common/test_request_helpers.py @@ -188,6 +188,21 @@ class TestRequestHelpers(unittest.TestCase): self.assertFalse('c' in to_req.headers) self.assertFalse('C' in to_req.headers) + def test_is_use_replication_network(self): + self.assertFalse(rh.is_use_replication_network()) + self.assertFalse(rh.is_use_replication_network({})) + self.assertFalse(rh.is_use_replication_network( + {'x-backend-use-replication-network': 'false'})) + self.assertFalse(rh.is_use_replication_network( + {'x-backend-use-replication-network': 'no'})) + + self.assertTrue(rh.is_use_replication_network( + {'x-backend-use-replication-network': 'true'})) + self.assertTrue(rh.is_use_replication_network( + {'x-backend-use-replication-network': 'yes'})) + self.assertTrue(rh.is_use_replication_network( + {'X-Backend-Use-Replication-Network': 'True'})) + def test_get_ip_port(self): node = { 'ip': '1.2.3.4', @@ -201,6 +216,17 @@ class TestRequestHelpers(unittest.TestCase): self.assertEqual(('1.2.3.4', 6000), rh.get_ip_port(node, { rh.USE_REPLICATION_NETWORK_HEADER: 'false'})) + # node trumps absent header and False header + node['use_replication'] = True + self.assertEqual(('5.6.7.8', 7000), rh.get_ip_port(node, {})) + self.assertEqual(('5.6.7.8', 7000), rh.get_ip_port(node, { + rh.USE_REPLICATION_NETWORK_HEADER: 'false'})) + + # True header trumps node + node['use_replication'] = False + self.assertEqual(('5.6.7.8', 7000), rh.get_ip_port(node, { + rh.USE_REPLICATION_NETWORK_HEADER: 'true'})) + @patch_policies(with_ec_default=True) def test_get_name_and_placement_object_req(self): path = '/device/part/account/container/object' diff --git a/test/unit/common/test_utils.py b/test/unit/common/test_utils.py index 3c1ec8228f..018a0804cc 100644 --- a/test/unit/common/test_utils.py +++ b/test/unit/common/test_utils.py @@ -2208,6 +2208,25 @@ class TestUtils(unittest.TestCase): self.assertEqual(utils.storage_directory('objects', '1', 'ABCDEF'), 'objects/1/DEF/ABCDEF') + def test_select_node_ip(self): + dev = { + 'ip': '127.0.0.1', + 'port': 6200, + 'replication_ip': '127.0.1.1', + 'replication_port': 6400, + 'device': 'sdb', + } + self.assertEqual(('127.0.0.1', 6200), utils.select_ip_port(dev)) + self.assertEqual(('127.0.1.1', 6400), + utils.select_ip_port(dev, use_replication=True)) + dev['use_replication'] = False + self.assertEqual(('127.0.1.1', 6400), + utils.select_ip_port(dev, use_replication=True)) + dev['use_replication'] = True + self.assertEqual(('127.0.1.1', 6400), utils.select_ip_port(dev)) + self.assertEqual(('127.0.1.1', 6400), + utils.select_ip_port(dev, use_replication=False)) + def test_node_to_string(self): dev = { 'id': 3, @@ -2225,6 +2244,16 @@ class TestUtils(unittest.TestCase): self.assertEqual(utils.node_to_string(dev), '127.0.0.1:6200/sdb') self.assertEqual(utils.node_to_string(dev, replication=True), '127.0.1.1:6400/sdb') + dev['use_replication'] = False + self.assertEqual(utils.node_to_string(dev), '127.0.0.1:6200/sdb') + self.assertEqual(utils.node_to_string(dev, replication=True), + '127.0.1.1:6400/sdb') + dev['use_replication'] = True + self.assertEqual(utils.node_to_string(dev), '127.0.1.1:6400/sdb') + # Node dict takes precedence + self.assertEqual(utils.node_to_string(dev, replication=False), + '127.0.1.1:6400/sdb') + dev = { 'id': 3, 'region': 1, diff --git a/test/unit/proxy/controllers/test_base.py b/test/unit/proxy/controllers/test_base.py index fbc0a8c186..8ed3528422 100644 --- a/test/unit/proxy/controllers/test_base.py +++ b/test/unit/proxy/controllers/test_base.py @@ -1547,7 +1547,8 @@ class TestNodeIter(BaseTest): def test_iter_default_fake_ring(self): for ring in (self.account_ring, self.container_ring): self.assertEqual(ring.replica_count, 3.0) - node_iter = NodeIter(self.app, ring, 0, self.logger) + node_iter = NodeIter(self.app, ring, 0, self.logger, + request=Request.blank('')) self.assertEqual(6, node_iter.nodes_left) self.assertEqual(3, node_iter.primaries_left) count = 0 @@ -1562,7 +1563,7 @@ class TestNodeIter(BaseTest): ring = FakeRing(replicas=3, max_more_nodes=20) # handoffs available policy = StoragePolicy(0, 'zero', object_ring=ring) node_iter = NodeIter(self.app, policy.object_ring, 0, self.logger, - policy=policy) + policy=policy, request=Request.blank('')) self.assertEqual(6, node_iter.nodes_left) self.assertEqual(3, node_iter.primaries_left) primary_indexes = set() @@ -1586,11 +1587,11 @@ class TestNodeIter(BaseTest): # sanity node_iter = NodeIter(self.app, policy.object_ring, 0, self.logger, - policy=policy) + policy=policy, request=Request.blank('')) self.assertEqual(16, len([n for n in node_iter])) node_iter = NodeIter(self.app, policy.object_ring, 0, self.logger, - policy=policy) + policy=policy, request=Request.blank('')) self.assertEqual(16, node_iter.nodes_left) self.assertEqual(8, node_iter.primaries_left) pile = GreenAsyncPile(5) @@ -1615,3 +1616,50 @@ class TestNodeIter(BaseTest): for node in node_iter: nodes.append(node) self.assertEqual(17, len(nodes)) + + def test_annotate_node_with_use_replication(self): + ring = FakeRing(replicas=8, max_more_nodes=20) + policy = StoragePolicy(0, 'ec', object_ring=ring) + + node_iter = NodeIter(self.app, policy.object_ring, 0, self.logger, + policy=policy, request=Request.blank('')) + for node in node_iter: + self.assertIn('use_replication', node) + self.assertFalse(node['use_replication']) + + req = Request.blank('a/c') + node_iter = NodeIter(self.app, policy.object_ring, 0, self.logger, + policy=policy, request=req) + for node in node_iter: + self.assertIn('use_replication', node) + self.assertFalse(node['use_replication']) + + req = Request.blank( + 'a/c', headers={'x-backend-use-replication-network': 'False'}) + node_iter = NodeIter(self.app, policy.object_ring, 0, self.logger, + policy=policy, request=req) + for node in node_iter: + self.assertIn('use_replication', node) + self.assertFalse(node['use_replication']) + + req = Request.blank( + 'a/c', headers={'x-backend-use-replication-network': 'yes'}) + node_iter = NodeIter(self.app, policy.object_ring, 0, self.logger, + policy=policy, request=req) + for node in node_iter: + self.assertIn('use_replication', node) + self.assertTrue(node['use_replication']) + + def test_iter_does_not_mutate_supplied_nodes(self): + ring = FakeRing(replicas=8, max_more_nodes=20) + policy = StoragePolicy(0, 'ec', object_ring=ring) + other_iter = ring.get_part_nodes(0) + node_iter = NodeIter(self.app, policy.object_ring, 0, self.logger, + policy=policy, node_iter=iter(other_iter), + request=Request.blank('')) + nodes = list(node_iter) + self.assertEqual(len(other_iter), len(nodes)) + for node in nodes: + self.assertIn('use_replication', node) + self.assertFalse(node['use_replication']) + self.assertEqual(other_iter, ring.get_part_nodes(0)) diff --git a/test/unit/proxy/controllers/test_obj.py b/test/unit/proxy/controllers/test_obj.py index 365cba36d6..bf32a059a4 100644 --- a/test/unit/proxy/controllers/test_obj.py +++ b/test/unit/proxy/controllers/test_obj.py @@ -47,6 +47,7 @@ from swift.proxy.controllers.base import \ get_container_info as _real_get_container_info from swift.common.storage_policy import POLICIES, ECDriverError, \ StoragePolicy, ECStoragePolicy +from swift.common.swob import Request from test.debug_logger import debug_logger from test.unit import ( FakeRing, fake_http_connect, patch_policies, SlowBody, FakeStatus, @@ -219,8 +220,10 @@ class CommonObjectControllerMixin(BaseObjectControllerMixin): all_nodes = object_ring.get_part_nodes(1) all_nodes.extend(object_ring.get_more_nodes(1)) + for node in all_nodes: + node['use_replication'] = False local_first_nodes = list(controller.iter_nodes_local_first( - object_ring, 1)) + object_ring, 1, Request.blank(''))) self.maxDiff = None @@ -244,6 +247,8 @@ class CommonObjectControllerMixin(BaseObjectControllerMixin): all_nodes = object_ring.get_part_nodes(1) all_nodes.extend(object_ring.get_more_nodes(1)) + for node in all_nodes: + node['use_replication'] = False # limit to the number we're going to look at in this request nodes_requested = self.app.request_node_count(object_ring.replicas) @@ -256,7 +261,7 @@ class CommonObjectControllerMixin(BaseObjectControllerMixin): # finally, create the local_first_nodes iter and flatten it out local_first_nodes = list(controller.iter_nodes_local_first( - object_ring, 1)) + object_ring, 1, Request.blank(''))) # the local nodes move up in the ordering self.assertEqual([1] * (self.replicas() + 1), [ @@ -267,6 +272,21 @@ class CommonObjectControllerMixin(BaseObjectControllerMixin): self.assertEqual(sorted(all_nodes, key=lambda dev: dev['id']), sorted(local_first_nodes, key=lambda dev: dev['id'])) + for node in all_nodes: + node['use_replication'] = True + + req = Request.blank( + '/v1/a/c', headers={'x-backend-use-replication-network': 'yes'}) + local_first_nodes = list(controller.iter_nodes_local_first( + object_ring, 1, request=req)) + self.assertEqual([1] * (self.replicas() + 1), [ + node['region'] for node in local_first_nodes[ + :self.replicas() + 1]]) + # we don't skip any nodes + self.assertEqual(len(all_nodes), len(local_first_nodes)) + self.assertEqual(sorted(all_nodes, key=lambda dev: dev['id']), + sorted(local_first_nodes, key=lambda dev: dev['id'])) + def test_iter_nodes_local_first_best_effort(self): controller = self.controller_cls( self.app, 'a', 'c', 'o') @@ -277,9 +297,11 @@ class CommonObjectControllerMixin(BaseObjectControllerMixin): object_ring = self.policy.object_ring all_nodes = object_ring.get_part_nodes(1) all_nodes.extend(object_ring.get_more_nodes(1)) + for node in all_nodes: + node['use_replication'] = False local_first_nodes = list(controller.iter_nodes_local_first( - object_ring, 1)) + object_ring, 1, request=Request.blank(''))) # we won't have quite enough local nodes... self.assertEqual(len(all_nodes), self.replicas() + @@ -307,9 +329,12 @@ class CommonObjectControllerMixin(BaseObjectControllerMixin): object_ring = policy.object_ring all_nodes = object_ring.get_part_nodes(1) all_nodes.extend(object_ring.get_more_nodes(1)) + for node in all_nodes: + node['use_replication'] = False local_first_nodes = list(controller.iter_nodes_local_first( - object_ring, 1, local_handoffs_first=True)) + object_ring, 1, local_handoffs_first=True, + request=Request.blank(''))) self.maxDiff = None @@ -326,12 +351,15 @@ class CommonObjectControllerMixin(BaseObjectControllerMixin): primary_nodes = object_ring.get_part_nodes(1) handoff_nodes_iter = object_ring.get_more_nodes(1) all_nodes = primary_nodes + list(handoff_nodes_iter) + for node in all_nodes: + node['use_replication'] = False handoff_nodes_iter = object_ring.get_more_nodes(1) local_handoffs = [n for n in handoff_nodes_iter if policy_conf.write_affinity_is_local_fn(n)] prefered_nodes = list(controller.iter_nodes_local_first( - object_ring, 1, local_handoffs_first=True)) + object_ring, 1, local_handoffs_first=True, + request=Request.blank(''))) self.assertEqual(len(all_nodes), self.replicas() + POLICIES.default.object_ring.max_more_nodes) @@ -362,12 +390,17 @@ class CommonObjectControllerMixin(BaseObjectControllerMixin): primary_nodes = object_ring.get_part_nodes(1) handoff_nodes_iter = object_ring.get_more_nodes(1) all_nodes = primary_nodes + list(handoff_nodes_iter) + for node in all_nodes: + node['use_replication'] = False handoff_nodes_iter = object_ring.get_more_nodes(1) local_handoffs = [n for n in handoff_nodes_iter if policy_conf.write_affinity_is_local_fn(n)] + for node in local_handoffs: + node['use_replication'] = False prefered_nodes = list(controller.iter_nodes_local_first( - object_ring, 1, local_handoffs_first=True)) + object_ring, 1, local_handoffs_first=True, + request=Request.blank(''))) self.assertEqual(len(all_nodes), self.replicas() + POLICIES.default.object_ring.max_more_nodes) @@ -991,7 +1024,7 @@ class CommonObjectControllerMixin(BaseObjectControllerMixin): # finally, create the local_first_nodes iter and flatten it out local_first_nodes = list(controller.iter_nodes_local_first( - object_ring, 1, policy)) + object_ring, 1, Request.blank(''), policy)) # check that the required number of local nodes were moved up the order node_regions = [node['region'] for node in local_first_nodes] @@ -2586,7 +2619,8 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase): controller = self.controller_cls( self.app, 'a', 'c', 'o') safe_iter = utils.GreenthreadSafeIterator(self.app.iter_nodes( - self.policy.object_ring, 0, self.logger, policy=self.policy)) + self.policy.object_ring, 0, self.logger, policy=self.policy, + request=Request.blank(''))) controller._fragment_GET_request = lambda *a, **k: next(safe_iter) pile = utils.GreenAsyncPile(self.policy.ec_ndata) for i in range(self.policy.ec_ndata): diff --git a/test/unit/proxy/test_server.py b/test/unit/proxy/test_server.py index e913258278..5ac90171dc 100644 --- a/test/unit/proxy/test_server.py +++ b/test/unit/proxy/test_server.py @@ -248,7 +248,7 @@ class TestController(unittest.TestCase): self.account_ring = FakeRing() self.container_ring = FakeRing() self.memcache = FakeMemcache() - app = proxy_server.Application(None, + app = proxy_server.Application(None, logger=debug_logger(), account_ring=self.account_ring, container_ring=self.container_ring) self.controller = swift.proxy.controllers.Controller(app) @@ -1166,6 +1166,56 @@ class TestProxyServer(unittest.TestCase): self.assertEqual(controller.__name__, 'InfoController') + def test_exception_occurred_replication_ip_port_logging(self): + logger = debug_logger('test') + app = proxy_server.Application( + {}, + account_ring=FakeRing(separate_replication=True), + container_ring=FakeRing(separate_replication=True), + logger=logger) + app.sort_nodes = lambda nodes, policy: nodes + part = app.container_ring.get_part('a', 'c') + nodes = app.container_ring.get_part_nodes(part) + self.assertNotEqual(nodes[0]['ip'], nodes[0]['replication_ip']) + self.assertEqual(0, sum([node_error_count(app, node) + for node in nodes])) # sanity + + # no use_replication header... + req = Request.blank('/v1/a/c') + with mocked_http_conn(200, 503, 200) as mocked_conn: + req.get_response(app) + + expected = [(n['ip'], n['port']) for n in nodes[:2]] + actual = [(req['ip'], req['port']) for req in mocked_conn.requests[1:]] + self.assertEqual(expected, actual) + line = logger.get_lines_for_level('error')[-1] + self.assertIn('Container Server', line) + self.assertIn('%s:%s/%s' % (nodes[0]['ip'], + nodes[0]['port'], + nodes[0]['device']), line) + self.assertEqual(1, sum([node_error_count(app, node) + for node in nodes])) + annotated_nodes = [dict(node, use_replication=True) for node in nodes] + self.assertEqual(0, sum([node_error_count(app, node) + for node in annotated_nodes])) + + logger.clear() + req = Request.blank( + '/v1/a/c', + headers={'x-backend-use-replication-network': True}) + with mocked_http_conn(200, 503, 200): + req.get_response(app) + line = logger.get_lines_for_level('error')[-1] + self.assertIn('Container Server', line) + self.assertIn('%s:%s/%s' % (nodes[0]['replication_ip'], + nodes[0]['replication_port'], + nodes[0]['device']), line) + self.assertEqual(1, sum([node_error_count(app, node) + for node in nodes])) + annotated_nodes = [dict(node, use_replication=True) for node in nodes] + self.assertEqual(1, sum([node_error_count(app, node) + for node in annotated_nodes])) + def test_exception_occurred(self): def do_test(additional_info): logger = debug_logger('test') @@ -5302,7 +5352,8 @@ class TestReplicatedObjectController( 'object') collected_nodes = [] for node in self.app.iter_nodes(object_ring, partition, - self.logger): + self.logger, + request=Request.blank('')): collected_nodes.append(node) self.assertEqual(len(collected_nodes), 5) @@ -5313,7 +5364,8 @@ class TestReplicatedObjectController( 'object') collected_nodes = [] for node in self.app.iter_nodes(object_ring, partition, - self.logger): + self.logger, + request=Request.blank('')): collected_nodes.append(node) self.assertEqual(len(collected_nodes), 9) @@ -5327,7 +5379,8 @@ class TestReplicatedObjectController( 'object') collected_nodes = [] for node in self.app.iter_nodes(object_ring, partition, - self.logger): + self.logger, + request=Request.blank('')): collected_nodes.append(node) self.assertEqual(len(collected_nodes), 7) self.assertEqual(self.app.logger.log_dict['warning'], []) @@ -5343,7 +5396,8 @@ class TestReplicatedObjectController( collected_nodes = [] for node in self.app.iter_nodes(object_ring, partition, - self.logger): + self.logger, + request=Request.blank('')): collected_nodes.append(node) self.assertEqual(len(collected_nodes), 7) self.assertEqual( @@ -5364,7 +5418,8 @@ class TestReplicatedObjectController( collected_nodes = [] for node in self.app.iter_nodes(object_ring, partition, - self.logger): + self.logger, + request=Request.blank('')): collected_nodes.append(node) self.assertEqual(len(collected_nodes), 7) self.assertEqual( @@ -5389,7 +5444,8 @@ class TestReplicatedObjectController( collected_nodes = [] for node in self.app.iter_nodes(object_ring, partition, - self.logger): + self.logger, + request=Request.blank('')): collected_nodes.append(node) self.assertEqual(len(collected_nodes), 10) self.assertEqual( @@ -5418,7 +5474,8 @@ class TestReplicatedObjectController( with mock.patch.object(self.app, 'sort_nodes', side_effect=fake_sort_nodes): object_ring = self.app.get_object_ring(None) - for node in self.app.iter_nodes(object_ring, 0, self.logger): + for node in self.app.iter_nodes(object_ring, 0, self.logger, + request=Request.blank('')): pass self.assertEqual(called, [ mock.call(object_ring.get_part_nodes(0), policy=None) @@ -5429,9 +5486,9 @@ class TestReplicatedObjectController( lambda n, *args, **kwargs: n): object_ring = self.app.get_object_ring(None) first_nodes = list(self.app.iter_nodes( - object_ring, 0, self.logger)) + object_ring, 0, self.logger, request=Request.blank(''))) second_nodes = list(self.app.iter_nodes( - object_ring, 0, self.logger)) + object_ring, 0, self.logger, request=Request.blank(''))) self.assertIn(first_nodes[0], second_nodes) self.assertEqual( @@ -5451,13 +5508,13 @@ class TestReplicatedObjectController( % (node_to_string(first_nodes[0]), 'test')), line) second_nodes = list(self.app.iter_nodes( - object_ring, 0, self.logger)) + object_ring, 0, self.logger, request=Request.blank(''))) self.assertNotIn(first_nodes[0], second_nodes) self.assertEqual( 1, self.logger.get_increment_counts().get( 'error_limiter.is_limited', 0)) third_nodes = list(self.app.iter_nodes( - object_ring, 0, self.logger)) + object_ring, 0, self.logger, request=Request.blank(''))) self.assertNotIn(first_nodes[0], third_nodes) self.assertEqual( 2, self.logger.get_increment_counts().get( @@ -5471,34 +5528,67 @@ class TestReplicatedObjectController( lambda r: 6), \ mock.patch.object(object_ring, 'max_more_nodes', 99): first_nodes = list(self.app.iter_nodes( - object_ring, 0, self.logger)) + object_ring, 0, self.logger, request=Request.blank(''))) second_nodes = [] - for node in self.app.iter_nodes(object_ring, 0, self.logger): + for node in self.app.iter_nodes(object_ring, 0, self.logger, + request=Request.blank('')): if not second_nodes: self.app.error_limit(node, 'test') second_nodes.append(node) self.assertEqual(len(first_nodes), 6) self.assertEqual(len(second_nodes), 7) - def test_iter_nodes_with_custom_node_iter(self): + def test_iter_nodes_without_replication_network(self): object_ring = self.app.get_object_ring(None) + node_list = [dict(id=n, ip='1.2.3.4', port=n, device='D', + use_replication=False) + for n in range(10)] + expected = [dict(n) for n in node_list] + with mock.patch.object(self.app, 'sort_nodes', + lambda n, *args, **kwargs: n), \ + mock.patch.object(self.app, 'request_node_count', + lambda r: 3): + got_nodes = list(self.app.iter_nodes( + object_ring, 0, self.logger, Request.blank(''), + node_iter=iter(node_list))) + self.assertEqual(expected[:3], got_nodes) + + req = Request.blank('/v1/a/c') node_list = [dict(id=n, ip='1.2.3.4', port=n, device='D') for n in range(10)] with mock.patch.object(self.app, 'sort_nodes', lambda n, *args, **kwargs: n), \ mock.patch.object(self.app, 'request_node_count', - lambda r: 3): - got_nodes = list(self.app.iter_nodes(object_ring, 0, self.logger, - node_iter=iter(node_list))) - self.assertEqual(node_list[:3], got_nodes) + lambda r: 1000000): + got_nodes = list(self.app.iter_nodes( + object_ring, 0, self.logger, req, node_iter=iter(node_list))) + self.assertEqual(expected, got_nodes) + def test_iter_nodes_with_replication_network(self): + object_ring = self.app.get_object_ring(None) + node_list = [dict(id=n, ip='1.2.3.4', port=n, device='D', + use_replication=False) + for n in range(10)] + req = Request.blank( + '/v1/a/c', headers={'x-backend-use-replication-network': 'true'}) with mock.patch.object(self.app, 'sort_nodes', lambda n, *args, **kwargs: n), \ mock.patch.object(self.app, 'request_node_count', - lambda r: 1000000): - got_nodes = list(self.app.iter_nodes(object_ring, 0, self.logger, - node_iter=iter(node_list))) - self.assertEqual(node_list, got_nodes) + lambda r: 3): + got_nodes = list(self.app.iter_nodes( + object_ring, 0, self.logger, req, node_iter=iter(node_list))) + expected = [dict(n, use_replication=True) for n in node_list] + self.assertEqual(expected[:3], got_nodes) + req = Request.blank( + '/v1/a/c', headers={'x-backend-use-replication-network': 'false'}) + expected = [dict(n, use_replication=False) for n in node_list] + with mock.patch.object(self.app, 'sort_nodes', + lambda n, *args, **kwargs: n), \ + mock.patch.object(self.app, 'request_node_count', + lambda r: 13): + got_nodes = list(self.app.iter_nodes( + object_ring, 0, self.logger, req, node_iter=iter(node_list))) + self.assertEqual(expected, got_nodes) def test_best_response_sets_headers(self): controller = ReplicatedObjectController(