proxy-server exception logging shows replication_ip/port
Adding a "use_replication" field to the node dict, a helper function to set use_replication dict value for a node copy by looking up the header value for x-backend-use-replication-network Change-Id: Ie05af464765dc10cf585be851f462033fc6bdec7
This commit is contained in:
parent
d9bf70ae2b
commit
9ec90d4d56
@ -40,7 +40,7 @@ from swift.common.utils import split_path, validate_device_partition, \
|
||||
close_if_possible, maybe_multipart_byteranges_to_document_iters, \
|
||||
multipart_byteranges_to_document_iters, parse_content_type, \
|
||||
parse_content_range, csv_append, list_from_csv, Spliterator, quote, \
|
||||
RESERVED, config_true_value, md5, CloseableChain
|
||||
RESERVED, config_true_value, md5, CloseableChain, select_ip_port
|
||||
from swift.common.wsgi import make_subrequest
|
||||
|
||||
|
||||
@ -901,13 +901,39 @@ def update_ignore_range_header(req, name):
|
||||
req.headers[hdr] = csv_append(req.headers.get(hdr), name)
|
||||
|
||||
|
||||
def is_use_replication_network(headers=None):
|
||||
"""
|
||||
Determine if replication network should be used.
|
||||
|
||||
:param headers: a dict of headers
|
||||
:return: the value of the ``x-backend-use-replication-network`` item from
|
||||
``headers``. If no ``headers`` are given or the item is not found then
|
||||
False is returned.
|
||||
"""
|
||||
if headers:
|
||||
for h, v in headers.items():
|
||||
if h.lower() == USE_REPLICATION_NETWORK_HEADER:
|
||||
return config_true_value(v)
|
||||
return False
|
||||
|
||||
|
||||
def get_ip_port(node, headers):
|
||||
use_replication_network = False
|
||||
for h, v in headers.items():
|
||||
if h.lower() == USE_REPLICATION_NETWORK_HEADER:
|
||||
use_replication_network = config_true_value(v)
|
||||
break
|
||||
if use_replication_network:
|
||||
return node['replication_ip'], node['replication_port']
|
||||
else:
|
||||
return node['ip'], node['port']
|
||||
"""
|
||||
Get the ip address and port that should be used for the given ``node``.
|
||||
The normal ip address and port are returned unless the ``node`` or
|
||||
``headers`` indicate that the replication ip address and port should be
|
||||
used.
|
||||
|
||||
If the ``headers`` dict has an item with key
|
||||
``x-backend-use-replication-network`` and a truthy value then the
|
||||
replication ip address and port are returned. Otherwise if the ``node``
|
||||
dict has an item with key ``use_replication`` and truthy value then the
|
||||
replication ip address and port are returned. Otherwise the normal ip
|
||||
address and port are returned.
|
||||
|
||||
:param node: a dict describing a node
|
||||
:param headers: a dict of headers
|
||||
:return: a tuple of (ip address, port)
|
||||
"""
|
||||
return select_ip_port(
|
||||
node, use_replication=is_use_replication_network(headers))
|
||||
|
@ -2813,17 +2813,49 @@ def parse_socket_string(socket_string, default_port):
|
||||
return (host, port)
|
||||
|
||||
|
||||
def node_to_string(node_dict, replication=False):
|
||||
if replication:
|
||||
ip = node_dict['replication_ip']
|
||||
port = node_dict['replication_port']
|
||||
def select_ip_port(node_dict, use_replication=False):
|
||||
"""
|
||||
Get the ip address and port that should be used for the given
|
||||
``node_dict``.
|
||||
|
||||
If ``use_replication`` is True then the replication ip address and port are
|
||||
returned.
|
||||
|
||||
If ``use_replication`` is False (the default) and the ``node`` dict has an
|
||||
item with key ``use_replication`` then that item's value will determine if
|
||||
the replication ip address and port are returned.
|
||||
|
||||
If neither ``use_replication`` nor ``node_dict['use_replication']``
|
||||
indicate otherwise then the normal ip address and port are returned.
|
||||
|
||||
:param node_dict: a dict describing a node
|
||||
:param use_replication: if True then the replication ip address and port
|
||||
are returned.
|
||||
:return: a tuple of (ip address, port)
|
||||
"""
|
||||
if use_replication or node_dict.get('use_replication', False):
|
||||
node_ip = node_dict['replication_ip']
|
||||
node_port = node_dict['replication_port']
|
||||
else:
|
||||
ip = node_dict['ip']
|
||||
port = node_dict['port']
|
||||
if ':' in ip:
|
||||
node_ip = node_dict['ip']
|
||||
node_port = node_dict['port']
|
||||
return node_ip, node_port
|
||||
|
||||
|
||||
def node_to_string(node_dict, replication=False):
|
||||
"""
|
||||
Get a string representation of a node's location.
|
||||
|
||||
:param node_dict: a dict describing a node
|
||||
:param replication: if True then the replication ip address and port are
|
||||
used, otherwise the normal ip address and port are used.
|
||||
:return: a string of the form <ip address>:<port>/<device>
|
||||
"""
|
||||
node_ip, node_port = select_ip_port(node_dict, use_replication=replication)
|
||||
if ':' in node_ip:
|
||||
# IPv6
|
||||
ip = '[%s]' % ip
|
||||
return '{}:{}/{}'.format(ip, port, node_dict['device'])
|
||||
node_ip = '[%s]' % node_ip
|
||||
return '{}:{}/{}'.format(node_ip, node_port, node_dict['device'])
|
||||
|
||||
|
||||
def storage_directory(datadir, partition, name_hash):
|
||||
|
@ -64,7 +64,7 @@ class AccountController(Controller):
|
||||
concurrency = self.app.account_ring.replica_count \
|
||||
if self.app.get_policy_options(None).concurrent_gets else 1
|
||||
node_iter = self.app.iter_nodes(self.app.account_ring, partition,
|
||||
self.logger)
|
||||
self.logger, req)
|
||||
params = req.params
|
||||
params['format'] = 'json'
|
||||
req.params = params
|
||||
|
@ -63,10 +63,9 @@ from swift.common.request_helpers import strip_sys_meta_prefix, \
|
||||
strip_user_meta_prefix, is_user_meta, is_sys_meta, is_sys_or_user_meta, \
|
||||
http_response_to_document_iters, is_object_transient_sysmeta, \
|
||||
strip_object_transient_sysmeta_prefix, get_ip_port, get_user_meta_prefix, \
|
||||
get_sys_meta_prefix
|
||||
get_sys_meta_prefix, is_use_replication_network
|
||||
from swift.common.storage_policy import POLICIES
|
||||
|
||||
|
||||
DEFAULT_RECHECK_ACCOUNT_EXISTENCE = 60 # seconds
|
||||
DEFAULT_RECHECK_CONTAINER_EXISTENCE = 60 # seconds
|
||||
DEFAULT_RECHECK_UPDATING_SHARD_RANGES = 3600 # seconds
|
||||
@ -1635,18 +1634,21 @@ class NodeIter(object):
|
||||
:param ring: ring to get yield nodes from
|
||||
:param partition: ring partition to yield nodes for
|
||||
:param logger: a logger instance
|
||||
:param request: yielded nodes will be annotated with `use_replication`
|
||||
based on the `request` headers.
|
||||
:param node_iter: optional iterable of nodes to try. Useful if you
|
||||
want to filter or reorder the nodes.
|
||||
:param policy: an instance of :class:`BaseStoragePolicy`. This should be
|
||||
None for an account or container ring.
|
||||
"""
|
||||
|
||||
def __init__(self, app, ring, partition, logger, node_iter=None,
|
||||
def __init__(self, app, ring, partition, logger, request, node_iter=None,
|
||||
policy=None):
|
||||
self.app = app
|
||||
self.ring = ring
|
||||
self.partition = partition
|
||||
self.logger = logger
|
||||
self.request = request
|
||||
|
||||
part_nodes = ring.get_part_nodes(partition)
|
||||
if node_iter is None:
|
||||
@ -1726,13 +1728,27 @@ class NodeIter(object):
|
||||
if self.nodes_left <= 0:
|
||||
return
|
||||
|
||||
def _annotate_node(self, node):
|
||||
"""
|
||||
Helper function to set use_replication dict value for a node by looking
|
||||
up the header value for x-backend-use-replication-network.
|
||||
|
||||
:param node: node dictionary from the ring or node_iter.
|
||||
:returns: node dictionary with replication network enabled/disabled
|
||||
"""
|
||||
# nodes may have come from a ring or a node_iter passed to the
|
||||
# constructor: be careful not to mutate them!
|
||||
return dict(node, use_replication=is_use_replication_network(
|
||||
self.request.headers))
|
||||
|
||||
def next(self):
|
||||
node = None
|
||||
if self._node_provider:
|
||||
# give node provider the opportunity to inject a node
|
||||
node = self._node_provider()
|
||||
if node:
|
||||
return node
|
||||
return next(self._node_iter)
|
||||
if not node:
|
||||
node = next(self._node_iter)
|
||||
return self._annotate_node(node)
|
||||
|
||||
def __next__(self):
|
||||
return self.next()
|
||||
@ -1971,7 +1987,7 @@ class Controller(object):
|
||||
:returns: a swob.Response object
|
||||
"""
|
||||
nodes = GreenthreadSafeIterator(
|
||||
node_iterator or self.app.iter_nodes(ring, part, self.logger)
|
||||
node_iterator or self.app.iter_nodes(ring, part, self.logger, req)
|
||||
)
|
||||
node_number = node_count or len(ring.get_part_nodes(part))
|
||||
pile = GreenAsyncPile(node_number)
|
||||
|
@ -103,7 +103,7 @@ class ContainerController(Controller):
|
||||
concurrency = self.app.container_ring.replica_count \
|
||||
if self.app.get_policy_options(None).concurrent_gets else 1
|
||||
node_iter = self.app.iter_nodes(self.app.container_ring, part,
|
||||
self.logger)
|
||||
self.logger, req)
|
||||
resp = self.GETorHEAD_base(
|
||||
req, 'Container', node_iter, part,
|
||||
req.swift_entity_path, concurrency)
|
||||
|
@ -176,7 +176,7 @@ class BaseObjectController(Controller):
|
||||
validate_internal_obj(
|
||||
self.account_name, self.container_name, self.object_name)
|
||||
|
||||
def iter_nodes_local_first(self, ring, partition, policy=None,
|
||||
def iter_nodes_local_first(self, ring, partition, request, policy=None,
|
||||
local_handoffs_first=False):
|
||||
"""
|
||||
Yields nodes for a ring partition.
|
||||
@ -190,6 +190,8 @@ class BaseObjectController(Controller):
|
||||
|
||||
:param ring: ring to get nodes from
|
||||
:param partition: ring partition to yield nodes for
|
||||
:param request: nodes will be annotated with `use_replication` based on
|
||||
the `request` headers
|
||||
:param policy: optional, an instance of
|
||||
:class:`~swift.common.storage_policy.BaseStoragePolicy`
|
||||
:param local_handoffs_first: optional, if True prefer primaries and
|
||||
@ -198,7 +200,7 @@ class BaseObjectController(Controller):
|
||||
policy_options = self.app.get_policy_options(policy)
|
||||
is_local = policy_options.write_affinity_is_local_fn
|
||||
if is_local is None:
|
||||
return self.app.iter_nodes(ring, partition, self.logger,
|
||||
return self.app.iter_nodes(ring, partition, self.logger, request,
|
||||
policy=policy)
|
||||
|
||||
primary_nodes = ring.get_part_nodes(partition)
|
||||
@ -232,7 +234,7 @@ class BaseObjectController(Controller):
|
||||
(node for node in all_nodes if node not in preferred_nodes)
|
||||
)
|
||||
|
||||
return self.app.iter_nodes(ring, partition, self.logger,
|
||||
return self.app.iter_nodes(ring, partition, self.logger, request,
|
||||
node_iter=node_iter, policy=policy)
|
||||
|
||||
def GETorHEAD(self, req):
|
||||
@ -252,7 +254,7 @@ class BaseObjectController(Controller):
|
||||
return aresp
|
||||
partition = obj_ring.get_part(
|
||||
self.account_name, self.container_name, self.object_name)
|
||||
node_iter = self.app.iter_nodes(obj_ring, partition, self.logger,
|
||||
node_iter = self.app.iter_nodes(obj_ring, partition, self.logger, req,
|
||||
policy=policy)
|
||||
|
||||
resp = self._get_or_head_response(req, node_iter, partition, policy)
|
||||
@ -720,7 +722,8 @@ class BaseObjectController(Controller):
|
||||
"""
|
||||
obj_ring = policy.object_ring
|
||||
node_iter = GreenthreadSafeIterator(
|
||||
self.iter_nodes_local_first(obj_ring, partition, policy=policy))
|
||||
self.iter_nodes_local_first(obj_ring, partition, req,
|
||||
policy=policy))
|
||||
pile = GreenPile(len(nodes))
|
||||
|
||||
for nheaders in outgoing_headers:
|
||||
@ -921,8 +924,8 @@ class BaseObjectController(Controller):
|
||||
local_handoffs = len(nodes) - len(local_primaries)
|
||||
node_count += local_handoffs
|
||||
node_iterator = self.iter_nodes_local_first(
|
||||
obj_ring, partition, policy=policy, local_handoffs_first=True
|
||||
)
|
||||
obj_ring, partition, req, policy=policy,
|
||||
local_handoffs_first=True)
|
||||
|
||||
headers = self._backend_requests(
|
||||
req, node_count, container_partition, container_nodes,
|
||||
|
@ -743,9 +743,10 @@ class Application(object):
|
||||
|
||||
return ok
|
||||
|
||||
def iter_nodes(self, ring, partition, logger, node_iter=None, policy=None):
|
||||
return NodeIter(self, ring, partition, logger, node_iter=node_iter,
|
||||
policy=policy)
|
||||
def iter_nodes(self, ring, partition, logger, request, node_iter=None,
|
||||
policy=None):
|
||||
return NodeIter(self, ring, partition, logger, request=request,
|
||||
node_iter=node_iter, policy=policy, )
|
||||
|
||||
def exception_occurred(self, node, typ, additional_info,
|
||||
**kwargs):
|
||||
|
@ -188,6 +188,21 @@ class TestRequestHelpers(unittest.TestCase):
|
||||
self.assertFalse('c' in to_req.headers)
|
||||
self.assertFalse('C' in to_req.headers)
|
||||
|
||||
def test_is_use_replication_network(self):
|
||||
self.assertFalse(rh.is_use_replication_network())
|
||||
self.assertFalse(rh.is_use_replication_network({}))
|
||||
self.assertFalse(rh.is_use_replication_network(
|
||||
{'x-backend-use-replication-network': 'false'}))
|
||||
self.assertFalse(rh.is_use_replication_network(
|
||||
{'x-backend-use-replication-network': 'no'}))
|
||||
|
||||
self.assertTrue(rh.is_use_replication_network(
|
||||
{'x-backend-use-replication-network': 'true'}))
|
||||
self.assertTrue(rh.is_use_replication_network(
|
||||
{'x-backend-use-replication-network': 'yes'}))
|
||||
self.assertTrue(rh.is_use_replication_network(
|
||||
{'X-Backend-Use-Replication-Network': 'True'}))
|
||||
|
||||
def test_get_ip_port(self):
|
||||
node = {
|
||||
'ip': '1.2.3.4',
|
||||
@ -201,6 +216,17 @@ class TestRequestHelpers(unittest.TestCase):
|
||||
self.assertEqual(('1.2.3.4', 6000), rh.get_ip_port(node, {
|
||||
rh.USE_REPLICATION_NETWORK_HEADER: 'false'}))
|
||||
|
||||
# node trumps absent header and False header
|
||||
node['use_replication'] = True
|
||||
self.assertEqual(('5.6.7.8', 7000), rh.get_ip_port(node, {}))
|
||||
self.assertEqual(('5.6.7.8', 7000), rh.get_ip_port(node, {
|
||||
rh.USE_REPLICATION_NETWORK_HEADER: 'false'}))
|
||||
|
||||
# True header trumps node
|
||||
node['use_replication'] = False
|
||||
self.assertEqual(('5.6.7.8', 7000), rh.get_ip_port(node, {
|
||||
rh.USE_REPLICATION_NETWORK_HEADER: 'true'}))
|
||||
|
||||
@patch_policies(with_ec_default=True)
|
||||
def test_get_name_and_placement_object_req(self):
|
||||
path = '/device/part/account/container/object'
|
||||
|
@ -2208,6 +2208,25 @@ class TestUtils(unittest.TestCase):
|
||||
self.assertEqual(utils.storage_directory('objects', '1', 'ABCDEF'),
|
||||
'objects/1/DEF/ABCDEF')
|
||||
|
||||
def test_select_node_ip(self):
|
||||
dev = {
|
||||
'ip': '127.0.0.1',
|
||||
'port': 6200,
|
||||
'replication_ip': '127.0.1.1',
|
||||
'replication_port': 6400,
|
||||
'device': 'sdb',
|
||||
}
|
||||
self.assertEqual(('127.0.0.1', 6200), utils.select_ip_port(dev))
|
||||
self.assertEqual(('127.0.1.1', 6400),
|
||||
utils.select_ip_port(dev, use_replication=True))
|
||||
dev['use_replication'] = False
|
||||
self.assertEqual(('127.0.1.1', 6400),
|
||||
utils.select_ip_port(dev, use_replication=True))
|
||||
dev['use_replication'] = True
|
||||
self.assertEqual(('127.0.1.1', 6400), utils.select_ip_port(dev))
|
||||
self.assertEqual(('127.0.1.1', 6400),
|
||||
utils.select_ip_port(dev, use_replication=False))
|
||||
|
||||
def test_node_to_string(self):
|
||||
dev = {
|
||||
'id': 3,
|
||||
@ -2225,6 +2244,16 @@ class TestUtils(unittest.TestCase):
|
||||
self.assertEqual(utils.node_to_string(dev), '127.0.0.1:6200/sdb')
|
||||
self.assertEqual(utils.node_to_string(dev, replication=True),
|
||||
'127.0.1.1:6400/sdb')
|
||||
dev['use_replication'] = False
|
||||
self.assertEqual(utils.node_to_string(dev), '127.0.0.1:6200/sdb')
|
||||
self.assertEqual(utils.node_to_string(dev, replication=True),
|
||||
'127.0.1.1:6400/sdb')
|
||||
dev['use_replication'] = True
|
||||
self.assertEqual(utils.node_to_string(dev), '127.0.1.1:6400/sdb')
|
||||
# Node dict takes precedence
|
||||
self.assertEqual(utils.node_to_string(dev, replication=False),
|
||||
'127.0.1.1:6400/sdb')
|
||||
|
||||
dev = {
|
||||
'id': 3,
|
||||
'region': 1,
|
||||
|
@ -1547,7 +1547,8 @@ class TestNodeIter(BaseTest):
|
||||
def test_iter_default_fake_ring(self):
|
||||
for ring in (self.account_ring, self.container_ring):
|
||||
self.assertEqual(ring.replica_count, 3.0)
|
||||
node_iter = NodeIter(self.app, ring, 0, self.logger)
|
||||
node_iter = NodeIter(self.app, ring, 0, self.logger,
|
||||
request=Request.blank(''))
|
||||
self.assertEqual(6, node_iter.nodes_left)
|
||||
self.assertEqual(3, node_iter.primaries_left)
|
||||
count = 0
|
||||
@ -1562,7 +1563,7 @@ class TestNodeIter(BaseTest):
|
||||
ring = FakeRing(replicas=3, max_more_nodes=20) # handoffs available
|
||||
policy = StoragePolicy(0, 'zero', object_ring=ring)
|
||||
node_iter = NodeIter(self.app, policy.object_ring, 0, self.logger,
|
||||
policy=policy)
|
||||
policy=policy, request=Request.blank(''))
|
||||
self.assertEqual(6, node_iter.nodes_left)
|
||||
self.assertEqual(3, node_iter.primaries_left)
|
||||
primary_indexes = set()
|
||||
@ -1586,11 +1587,11 @@ class TestNodeIter(BaseTest):
|
||||
|
||||
# sanity
|
||||
node_iter = NodeIter(self.app, policy.object_ring, 0, self.logger,
|
||||
policy=policy)
|
||||
policy=policy, request=Request.blank(''))
|
||||
self.assertEqual(16, len([n for n in node_iter]))
|
||||
|
||||
node_iter = NodeIter(self.app, policy.object_ring, 0, self.logger,
|
||||
policy=policy)
|
||||
policy=policy, request=Request.blank(''))
|
||||
self.assertEqual(16, node_iter.nodes_left)
|
||||
self.assertEqual(8, node_iter.primaries_left)
|
||||
pile = GreenAsyncPile(5)
|
||||
@ -1615,3 +1616,50 @@ class TestNodeIter(BaseTest):
|
||||
for node in node_iter:
|
||||
nodes.append(node)
|
||||
self.assertEqual(17, len(nodes))
|
||||
|
||||
def test_annotate_node_with_use_replication(self):
|
||||
ring = FakeRing(replicas=8, max_more_nodes=20)
|
||||
policy = StoragePolicy(0, 'ec', object_ring=ring)
|
||||
|
||||
node_iter = NodeIter(self.app, policy.object_ring, 0, self.logger,
|
||||
policy=policy, request=Request.blank(''))
|
||||
for node in node_iter:
|
||||
self.assertIn('use_replication', node)
|
||||
self.assertFalse(node['use_replication'])
|
||||
|
||||
req = Request.blank('a/c')
|
||||
node_iter = NodeIter(self.app, policy.object_ring, 0, self.logger,
|
||||
policy=policy, request=req)
|
||||
for node in node_iter:
|
||||
self.assertIn('use_replication', node)
|
||||
self.assertFalse(node['use_replication'])
|
||||
|
||||
req = Request.blank(
|
||||
'a/c', headers={'x-backend-use-replication-network': 'False'})
|
||||
node_iter = NodeIter(self.app, policy.object_ring, 0, self.logger,
|
||||
policy=policy, request=req)
|
||||
for node in node_iter:
|
||||
self.assertIn('use_replication', node)
|
||||
self.assertFalse(node['use_replication'])
|
||||
|
||||
req = Request.blank(
|
||||
'a/c', headers={'x-backend-use-replication-network': 'yes'})
|
||||
node_iter = NodeIter(self.app, policy.object_ring, 0, self.logger,
|
||||
policy=policy, request=req)
|
||||
for node in node_iter:
|
||||
self.assertIn('use_replication', node)
|
||||
self.assertTrue(node['use_replication'])
|
||||
|
||||
def test_iter_does_not_mutate_supplied_nodes(self):
|
||||
ring = FakeRing(replicas=8, max_more_nodes=20)
|
||||
policy = StoragePolicy(0, 'ec', object_ring=ring)
|
||||
other_iter = ring.get_part_nodes(0)
|
||||
node_iter = NodeIter(self.app, policy.object_ring, 0, self.logger,
|
||||
policy=policy, node_iter=iter(other_iter),
|
||||
request=Request.blank(''))
|
||||
nodes = list(node_iter)
|
||||
self.assertEqual(len(other_iter), len(nodes))
|
||||
for node in nodes:
|
||||
self.assertIn('use_replication', node)
|
||||
self.assertFalse(node['use_replication'])
|
||||
self.assertEqual(other_iter, ring.get_part_nodes(0))
|
||||
|
@ -47,6 +47,7 @@ from swift.proxy.controllers.base import \
|
||||
get_container_info as _real_get_container_info
|
||||
from swift.common.storage_policy import POLICIES, ECDriverError, \
|
||||
StoragePolicy, ECStoragePolicy
|
||||
from swift.common.swob import Request
|
||||
from test.debug_logger import debug_logger
|
||||
from test.unit import (
|
||||
FakeRing, fake_http_connect, patch_policies, SlowBody, FakeStatus,
|
||||
@ -219,8 +220,10 @@ class CommonObjectControllerMixin(BaseObjectControllerMixin):
|
||||
all_nodes = object_ring.get_part_nodes(1)
|
||||
all_nodes.extend(object_ring.get_more_nodes(1))
|
||||
|
||||
for node in all_nodes:
|
||||
node['use_replication'] = False
|
||||
local_first_nodes = list(controller.iter_nodes_local_first(
|
||||
object_ring, 1))
|
||||
object_ring, 1, Request.blank('')))
|
||||
|
||||
self.maxDiff = None
|
||||
|
||||
@ -244,6 +247,8 @@ class CommonObjectControllerMixin(BaseObjectControllerMixin):
|
||||
|
||||
all_nodes = object_ring.get_part_nodes(1)
|
||||
all_nodes.extend(object_ring.get_more_nodes(1))
|
||||
for node in all_nodes:
|
||||
node['use_replication'] = False
|
||||
|
||||
# limit to the number we're going to look at in this request
|
||||
nodes_requested = self.app.request_node_count(object_ring.replicas)
|
||||
@ -256,7 +261,7 @@ class CommonObjectControllerMixin(BaseObjectControllerMixin):
|
||||
|
||||
# finally, create the local_first_nodes iter and flatten it out
|
||||
local_first_nodes = list(controller.iter_nodes_local_first(
|
||||
object_ring, 1))
|
||||
object_ring, 1, Request.blank('')))
|
||||
|
||||
# the local nodes move up in the ordering
|
||||
self.assertEqual([1] * (self.replicas() + 1), [
|
||||
@ -267,6 +272,21 @@ class CommonObjectControllerMixin(BaseObjectControllerMixin):
|
||||
self.assertEqual(sorted(all_nodes, key=lambda dev: dev['id']),
|
||||
sorted(local_first_nodes, key=lambda dev: dev['id']))
|
||||
|
||||
for node in all_nodes:
|
||||
node['use_replication'] = True
|
||||
|
||||
req = Request.blank(
|
||||
'/v1/a/c', headers={'x-backend-use-replication-network': 'yes'})
|
||||
local_first_nodes = list(controller.iter_nodes_local_first(
|
||||
object_ring, 1, request=req))
|
||||
self.assertEqual([1] * (self.replicas() + 1), [
|
||||
node['region'] for node in local_first_nodes[
|
||||
:self.replicas() + 1]])
|
||||
# we don't skip any nodes
|
||||
self.assertEqual(len(all_nodes), len(local_first_nodes))
|
||||
self.assertEqual(sorted(all_nodes, key=lambda dev: dev['id']),
|
||||
sorted(local_first_nodes, key=lambda dev: dev['id']))
|
||||
|
||||
def test_iter_nodes_local_first_best_effort(self):
|
||||
controller = self.controller_cls(
|
||||
self.app, 'a', 'c', 'o')
|
||||
@ -277,9 +297,11 @@ class CommonObjectControllerMixin(BaseObjectControllerMixin):
|
||||
object_ring = self.policy.object_ring
|
||||
all_nodes = object_ring.get_part_nodes(1)
|
||||
all_nodes.extend(object_ring.get_more_nodes(1))
|
||||
for node in all_nodes:
|
||||
node['use_replication'] = False
|
||||
|
||||
local_first_nodes = list(controller.iter_nodes_local_first(
|
||||
object_ring, 1))
|
||||
object_ring, 1, request=Request.blank('')))
|
||||
|
||||
# we won't have quite enough local nodes...
|
||||
self.assertEqual(len(all_nodes), self.replicas() +
|
||||
@ -307,9 +329,12 @@ class CommonObjectControllerMixin(BaseObjectControllerMixin):
|
||||
object_ring = policy.object_ring
|
||||
all_nodes = object_ring.get_part_nodes(1)
|
||||
all_nodes.extend(object_ring.get_more_nodes(1))
|
||||
for node in all_nodes:
|
||||
node['use_replication'] = False
|
||||
|
||||
local_first_nodes = list(controller.iter_nodes_local_first(
|
||||
object_ring, 1, local_handoffs_first=True))
|
||||
object_ring, 1, local_handoffs_first=True,
|
||||
request=Request.blank('')))
|
||||
|
||||
self.maxDiff = None
|
||||
|
||||
@ -326,12 +351,15 @@ class CommonObjectControllerMixin(BaseObjectControllerMixin):
|
||||
primary_nodes = object_ring.get_part_nodes(1)
|
||||
handoff_nodes_iter = object_ring.get_more_nodes(1)
|
||||
all_nodes = primary_nodes + list(handoff_nodes_iter)
|
||||
for node in all_nodes:
|
||||
node['use_replication'] = False
|
||||
handoff_nodes_iter = object_ring.get_more_nodes(1)
|
||||
local_handoffs = [n for n in handoff_nodes_iter if
|
||||
policy_conf.write_affinity_is_local_fn(n)]
|
||||
|
||||
prefered_nodes = list(controller.iter_nodes_local_first(
|
||||
object_ring, 1, local_handoffs_first=True))
|
||||
object_ring, 1, local_handoffs_first=True,
|
||||
request=Request.blank('')))
|
||||
|
||||
self.assertEqual(len(all_nodes), self.replicas() +
|
||||
POLICIES.default.object_ring.max_more_nodes)
|
||||
@ -362,12 +390,17 @@ class CommonObjectControllerMixin(BaseObjectControllerMixin):
|
||||
primary_nodes = object_ring.get_part_nodes(1)
|
||||
handoff_nodes_iter = object_ring.get_more_nodes(1)
|
||||
all_nodes = primary_nodes + list(handoff_nodes_iter)
|
||||
for node in all_nodes:
|
||||
node['use_replication'] = False
|
||||
handoff_nodes_iter = object_ring.get_more_nodes(1)
|
||||
local_handoffs = [n for n in handoff_nodes_iter if
|
||||
policy_conf.write_affinity_is_local_fn(n)]
|
||||
for node in local_handoffs:
|
||||
node['use_replication'] = False
|
||||
|
||||
prefered_nodes = list(controller.iter_nodes_local_first(
|
||||
object_ring, 1, local_handoffs_first=True))
|
||||
object_ring, 1, local_handoffs_first=True,
|
||||
request=Request.blank('')))
|
||||
|
||||
self.assertEqual(len(all_nodes), self.replicas() +
|
||||
POLICIES.default.object_ring.max_more_nodes)
|
||||
@ -991,7 +1024,7 @@ class CommonObjectControllerMixin(BaseObjectControllerMixin):
|
||||
|
||||
# finally, create the local_first_nodes iter and flatten it out
|
||||
local_first_nodes = list(controller.iter_nodes_local_first(
|
||||
object_ring, 1, policy))
|
||||
object_ring, 1, Request.blank(''), policy))
|
||||
|
||||
# check that the required number of local nodes were moved up the order
|
||||
node_regions = [node['region'] for node in local_first_nodes]
|
||||
@ -2586,7 +2619,8 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase):
|
||||
controller = self.controller_cls(
|
||||
self.app, 'a', 'c', 'o')
|
||||
safe_iter = utils.GreenthreadSafeIterator(self.app.iter_nodes(
|
||||
self.policy.object_ring, 0, self.logger, policy=self.policy))
|
||||
self.policy.object_ring, 0, self.logger, policy=self.policy,
|
||||
request=Request.blank('')))
|
||||
controller._fragment_GET_request = lambda *a, **k: next(safe_iter)
|
||||
pile = utils.GreenAsyncPile(self.policy.ec_ndata)
|
||||
for i in range(self.policy.ec_ndata):
|
||||
|
@ -248,7 +248,7 @@ class TestController(unittest.TestCase):
|
||||
self.account_ring = FakeRing()
|
||||
self.container_ring = FakeRing()
|
||||
self.memcache = FakeMemcache()
|
||||
app = proxy_server.Application(None,
|
||||
app = proxy_server.Application(None, logger=debug_logger(),
|
||||
account_ring=self.account_ring,
|
||||
container_ring=self.container_ring)
|
||||
self.controller = swift.proxy.controllers.Controller(app)
|
||||
@ -1166,6 +1166,56 @@ class TestProxyServer(unittest.TestCase):
|
||||
|
||||
self.assertEqual(controller.__name__, 'InfoController')
|
||||
|
||||
def test_exception_occurred_replication_ip_port_logging(self):
|
||||
logger = debug_logger('test')
|
||||
app = proxy_server.Application(
|
||||
{},
|
||||
account_ring=FakeRing(separate_replication=True),
|
||||
container_ring=FakeRing(separate_replication=True),
|
||||
logger=logger)
|
||||
app.sort_nodes = lambda nodes, policy: nodes
|
||||
part = app.container_ring.get_part('a', 'c')
|
||||
nodes = app.container_ring.get_part_nodes(part)
|
||||
self.assertNotEqual(nodes[0]['ip'], nodes[0]['replication_ip'])
|
||||
self.assertEqual(0, sum([node_error_count(app, node)
|
||||
for node in nodes])) # sanity
|
||||
|
||||
# no use_replication header...
|
||||
req = Request.blank('/v1/a/c')
|
||||
with mocked_http_conn(200, 503, 200) as mocked_conn:
|
||||
req.get_response(app)
|
||||
|
||||
expected = [(n['ip'], n['port']) for n in nodes[:2]]
|
||||
actual = [(req['ip'], req['port']) for req in mocked_conn.requests[1:]]
|
||||
self.assertEqual(expected, actual)
|
||||
line = logger.get_lines_for_level('error')[-1]
|
||||
self.assertIn('Container Server', line)
|
||||
self.assertIn('%s:%s/%s' % (nodes[0]['ip'],
|
||||
nodes[0]['port'],
|
||||
nodes[0]['device']), line)
|
||||
self.assertEqual(1, sum([node_error_count(app, node)
|
||||
for node in nodes]))
|
||||
annotated_nodes = [dict(node, use_replication=True) for node in nodes]
|
||||
self.assertEqual(0, sum([node_error_count(app, node)
|
||||
for node in annotated_nodes]))
|
||||
|
||||
logger.clear()
|
||||
req = Request.blank(
|
||||
'/v1/a/c',
|
||||
headers={'x-backend-use-replication-network': True})
|
||||
with mocked_http_conn(200, 503, 200):
|
||||
req.get_response(app)
|
||||
line = logger.get_lines_for_level('error')[-1]
|
||||
self.assertIn('Container Server', line)
|
||||
self.assertIn('%s:%s/%s' % (nodes[0]['replication_ip'],
|
||||
nodes[0]['replication_port'],
|
||||
nodes[0]['device']), line)
|
||||
self.assertEqual(1, sum([node_error_count(app, node)
|
||||
for node in nodes]))
|
||||
annotated_nodes = [dict(node, use_replication=True) for node in nodes]
|
||||
self.assertEqual(1, sum([node_error_count(app, node)
|
||||
for node in annotated_nodes]))
|
||||
|
||||
def test_exception_occurred(self):
|
||||
def do_test(additional_info):
|
||||
logger = debug_logger('test')
|
||||
@ -5302,7 +5352,8 @@ class TestReplicatedObjectController(
|
||||
'object')
|
||||
collected_nodes = []
|
||||
for node in self.app.iter_nodes(object_ring, partition,
|
||||
self.logger):
|
||||
self.logger,
|
||||
request=Request.blank('')):
|
||||
collected_nodes.append(node)
|
||||
self.assertEqual(len(collected_nodes), 5)
|
||||
|
||||
@ -5313,7 +5364,8 @@ class TestReplicatedObjectController(
|
||||
'object')
|
||||
collected_nodes = []
|
||||
for node in self.app.iter_nodes(object_ring, partition,
|
||||
self.logger):
|
||||
self.logger,
|
||||
request=Request.blank('')):
|
||||
collected_nodes.append(node)
|
||||
self.assertEqual(len(collected_nodes), 9)
|
||||
|
||||
@ -5327,7 +5379,8 @@ class TestReplicatedObjectController(
|
||||
'object')
|
||||
collected_nodes = []
|
||||
for node in self.app.iter_nodes(object_ring, partition,
|
||||
self.logger):
|
||||
self.logger,
|
||||
request=Request.blank('')):
|
||||
collected_nodes.append(node)
|
||||
self.assertEqual(len(collected_nodes), 7)
|
||||
self.assertEqual(self.app.logger.log_dict['warning'], [])
|
||||
@ -5343,7 +5396,8 @@ class TestReplicatedObjectController(
|
||||
|
||||
collected_nodes = []
|
||||
for node in self.app.iter_nodes(object_ring, partition,
|
||||
self.logger):
|
||||
self.logger,
|
||||
request=Request.blank('')):
|
||||
collected_nodes.append(node)
|
||||
self.assertEqual(len(collected_nodes), 7)
|
||||
self.assertEqual(
|
||||
@ -5364,7 +5418,8 @@ class TestReplicatedObjectController(
|
||||
|
||||
collected_nodes = []
|
||||
for node in self.app.iter_nodes(object_ring, partition,
|
||||
self.logger):
|
||||
self.logger,
|
||||
request=Request.blank('')):
|
||||
collected_nodes.append(node)
|
||||
self.assertEqual(len(collected_nodes), 7)
|
||||
self.assertEqual(
|
||||
@ -5389,7 +5444,8 @@ class TestReplicatedObjectController(
|
||||
|
||||
collected_nodes = []
|
||||
for node in self.app.iter_nodes(object_ring, partition,
|
||||
self.logger):
|
||||
self.logger,
|
||||
request=Request.blank('')):
|
||||
collected_nodes.append(node)
|
||||
self.assertEqual(len(collected_nodes), 10)
|
||||
self.assertEqual(
|
||||
@ -5418,7 +5474,8 @@ class TestReplicatedObjectController(
|
||||
with mock.patch.object(self.app, 'sort_nodes',
|
||||
side_effect=fake_sort_nodes):
|
||||
object_ring = self.app.get_object_ring(None)
|
||||
for node in self.app.iter_nodes(object_ring, 0, self.logger):
|
||||
for node in self.app.iter_nodes(object_ring, 0, self.logger,
|
||||
request=Request.blank('')):
|
||||
pass
|
||||
self.assertEqual(called, [
|
||||
mock.call(object_ring.get_part_nodes(0), policy=None)
|
||||
@ -5429,9 +5486,9 @@ class TestReplicatedObjectController(
|
||||
lambda n, *args, **kwargs: n):
|
||||
object_ring = self.app.get_object_ring(None)
|
||||
first_nodes = list(self.app.iter_nodes(
|
||||
object_ring, 0, self.logger))
|
||||
object_ring, 0, self.logger, request=Request.blank('')))
|
||||
second_nodes = list(self.app.iter_nodes(
|
||||
object_ring, 0, self.logger))
|
||||
object_ring, 0, self.logger, request=Request.blank('')))
|
||||
self.assertIn(first_nodes[0], second_nodes)
|
||||
|
||||
self.assertEqual(
|
||||
@ -5451,13 +5508,13 @@ class TestReplicatedObjectController(
|
||||
% (node_to_string(first_nodes[0]), 'test')), line)
|
||||
|
||||
second_nodes = list(self.app.iter_nodes(
|
||||
object_ring, 0, self.logger))
|
||||
object_ring, 0, self.logger, request=Request.blank('')))
|
||||
self.assertNotIn(first_nodes[0], second_nodes)
|
||||
self.assertEqual(
|
||||
1, self.logger.get_increment_counts().get(
|
||||
'error_limiter.is_limited', 0))
|
||||
third_nodes = list(self.app.iter_nodes(
|
||||
object_ring, 0, self.logger))
|
||||
object_ring, 0, self.logger, request=Request.blank('')))
|
||||
self.assertNotIn(first_nodes[0], third_nodes)
|
||||
self.assertEqual(
|
||||
2, self.logger.get_increment_counts().get(
|
||||
@ -5471,34 +5528,67 @@ class TestReplicatedObjectController(
|
||||
lambda r: 6), \
|
||||
mock.patch.object(object_ring, 'max_more_nodes', 99):
|
||||
first_nodes = list(self.app.iter_nodes(
|
||||
object_ring, 0, self.logger))
|
||||
object_ring, 0, self.logger, request=Request.blank('')))
|
||||
second_nodes = []
|
||||
for node in self.app.iter_nodes(object_ring, 0, self.logger):
|
||||
for node in self.app.iter_nodes(object_ring, 0, self.logger,
|
||||
request=Request.blank('')):
|
||||
if not second_nodes:
|
||||
self.app.error_limit(node, 'test')
|
||||
second_nodes.append(node)
|
||||
self.assertEqual(len(first_nodes), 6)
|
||||
self.assertEqual(len(second_nodes), 7)
|
||||
|
||||
def test_iter_nodes_with_custom_node_iter(self):
|
||||
def test_iter_nodes_without_replication_network(self):
|
||||
object_ring = self.app.get_object_ring(None)
|
||||
node_list = [dict(id=n, ip='1.2.3.4', port=n, device='D',
|
||||
use_replication=False)
|
||||
for n in range(10)]
|
||||
expected = [dict(n) for n in node_list]
|
||||
with mock.patch.object(self.app, 'sort_nodes',
|
||||
lambda n, *args, **kwargs: n), \
|
||||
mock.patch.object(self.app, 'request_node_count',
|
||||
lambda r: 3):
|
||||
got_nodes = list(self.app.iter_nodes(
|
||||
object_ring, 0, self.logger, Request.blank(''),
|
||||
node_iter=iter(node_list)))
|
||||
self.assertEqual(expected[:3], got_nodes)
|
||||
|
||||
req = Request.blank('/v1/a/c')
|
||||
node_list = [dict(id=n, ip='1.2.3.4', port=n, device='D')
|
||||
for n in range(10)]
|
||||
with mock.patch.object(self.app, 'sort_nodes',
|
||||
lambda n, *args, **kwargs: n), \
|
||||
mock.patch.object(self.app, 'request_node_count',
|
||||
lambda r: 3):
|
||||
got_nodes = list(self.app.iter_nodes(object_ring, 0, self.logger,
|
||||
node_iter=iter(node_list)))
|
||||
self.assertEqual(node_list[:3], got_nodes)
|
||||
lambda r: 1000000):
|
||||
got_nodes = list(self.app.iter_nodes(
|
||||
object_ring, 0, self.logger, req, node_iter=iter(node_list)))
|
||||
self.assertEqual(expected, got_nodes)
|
||||
|
||||
def test_iter_nodes_with_replication_network(self):
|
||||
object_ring = self.app.get_object_ring(None)
|
||||
node_list = [dict(id=n, ip='1.2.3.4', port=n, device='D',
|
||||
use_replication=False)
|
||||
for n in range(10)]
|
||||
req = Request.blank(
|
||||
'/v1/a/c', headers={'x-backend-use-replication-network': 'true'})
|
||||
with mock.patch.object(self.app, 'sort_nodes',
|
||||
lambda n, *args, **kwargs: n), \
|
||||
mock.patch.object(self.app, 'request_node_count',
|
||||
lambda r: 1000000):
|
||||
got_nodes = list(self.app.iter_nodes(object_ring, 0, self.logger,
|
||||
node_iter=iter(node_list)))
|
||||
self.assertEqual(node_list, got_nodes)
|
||||
lambda r: 3):
|
||||
got_nodes = list(self.app.iter_nodes(
|
||||
object_ring, 0, self.logger, req, node_iter=iter(node_list)))
|
||||
expected = [dict(n, use_replication=True) for n in node_list]
|
||||
self.assertEqual(expected[:3], got_nodes)
|
||||
req = Request.blank(
|
||||
'/v1/a/c', headers={'x-backend-use-replication-network': 'false'})
|
||||
expected = [dict(n, use_replication=False) for n in node_list]
|
||||
with mock.patch.object(self.app, 'sort_nodes',
|
||||
lambda n, *args, **kwargs: n), \
|
||||
mock.patch.object(self.app, 'request_node_count',
|
||||
lambda r: 13):
|
||||
got_nodes = list(self.app.iter_nodes(
|
||||
object_ring, 0, self.logger, req, node_iter=iter(node_list)))
|
||||
self.assertEqual(expected, got_nodes)
|
||||
|
||||
def test_best_response_sets_headers(self):
|
||||
controller = ReplicatedObjectController(
|
||||
|
Loading…
x
Reference in New Issue
Block a user