Local read affinity for GET/HEAD requests.
Now you can configure the proxy server to read from "local" primary nodes first, where "local" is governed by the newly-introduced "read_affinity" setting in the proxy config. This is desirable when the network links between regions/zones are of varying capacities; in such a case, it's a good idea to prefer fetching data from closer backends.

The new setting looks like rN[zM]=P, where N is the region number, M is the optional zone number, and P is the priority. Multiple values can be specified by separating them with commas. The priority for nodes that don't match anything is a very large number, so they'll sort last.

This only affects the ordering of the primary nodes; it doesn't affect handoffs at all. Further, while the primary nodes are reordered for all requests, it only matters for GET/HEAD requests since handling the other verbs ends up making concurrent requests to *all* the primary nodes, so ordering is irrelevant.

Note that the default proxy config does not have this setting turned on, so the default configuration's behavior is unaffected.

blueprint multi-region

Change-Id: Iea4cd367ed37fe5ee69b63234541d358d29963a4
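For illustration only (not part of the change itself): with the example value from the sample config below, the new affinity_key_function helper orders primaries so that region 1 zone 1 sorts first, then region 1 zone 2, then anything in region 2, then everything else. The node dicts and IPs in this sketch are made up.

# Sketch only: affinity_key_function is the helper added in
# swift.common.utils below; node dicts and IPs are invented examples.
from swift.common.utils import affinity_key_function

keyfn = affinity_key_function("r1z1=100, r1z2=200, r2=300")
nodes = [{'region': 2, 'zone': 1, 'ip': '10.0.2.1'},
         {'region': 1, 'zone': 2, 'ip': '10.0.1.2'},
         {'region': 1, 'zone': 1, 'ip': '10.0.1.1'},
         {'region': 3, 'zone': 1, 'ip': '10.0.3.1'}]
# Lower priorities sort first; nodes matching no rule get a very large
# key, so they sort last (region 3 here).
print([n['ip'] for n in sorted(nodes, key=keyfn)])
# -> ['10.0.1.1', '10.0.1.2', '10.0.2.1', '10.0.3.1']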
parent 4077252f23
commit f559c50acb
@@ -115,12 +115,16 @@ use = egg:swift#proxy
 # to N per second.
 # rate_limit_segments_per_sec = 1
 #
-# Storage nodes can be chosen at random (shuffle) or by using timing
-# measurements. Using timing measurements may allow for lower overall latency.
-# The valid values for sorting_method are "shuffle" and "timing"
+# Storage nodes can be chosen at random (shuffle), by using timing
+# measurements (timing), or by using an explicit match (affinity).
+# Using timing measurements may allow for lower overall latency, while
+# using affinity allows for finer control. In both the timing and
+# affinity cases, equally-sorting nodes are still randomly chosen to
+# spread load.
+# The valid values for sorting_method are "affinity", "shuffle", and "timing".
 # sorting_method = shuffle
 #
-# If the timing sorting_method is used, the timings will only be valid for
+# If the "timing" sorting_method is used, the timings will only be valid for
 # the number of seconds configured by timing_expiry.
 # timing_expiry = 300
 #
@@ -133,6 +137,16 @@ use = egg:swift#proxy
 # '* replicas' at the end to have it use the number given times the number of
 # replicas for the ring being used for the request.
 # request_node_count = 2 * replicas
+#
+# Which backend servers to prefer on reads. Format is r<N> for region
+# N or r<N>z<M> for region N, zone M. The value after the equals is
+# the priority; lower numbers are higher priority.
+#
+# Example: first read from region 1 zone 1, then region 1 zone 2, then
+# anything in region 2, then everything else:
+# read_affinity = r1z1=100, r1z2=200, r2=300
+# Default is empty, meaning no preference.
+# read_affinity =
 
 [filter:tempauth]
 use = egg:swift#tempauth
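A small sketch, not part of the commit: since affinity_key_function raises ValueError on malformed values (which the proxy wraps in an "Invalid read_affinity value" error, as shown further down), a candidate setting can be sanity-checked before it goes into the config. The candidate strings below are arbitrary examples; the malformed ones mirror the unit tests added in this change.

# Sketch: check candidate read_affinity values up front.
from swift.common.utils import affinity_key_function

for candidate in ("r1z1=100, r1z2=200, r2=300", "r3", "r3=elephant"):
    try:
        affinity_key_function(candidate)
        print("%r: ok" % (candidate,))
    except ValueError as err:
        print("%r: rejected (%s)" % (candidate, err))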
@@ -17,8 +17,10 @@
 
 import errno
 import fcntl
+import operator
 import os
 import pwd
+import re
 import sys
 import time
 import uuid
@@ -1523,6 +1525,63 @@ def validate_sync_to(value, allowed_sync_hosts):
     return None
 
 
+def affinity_key_function(affinity_str):
+    """Turns an affinity config value into a function suitable for passing to
+    sort(). After doing so, the array will be sorted with respect to the given
+    ordering.
+
+    For example, if affinity_str is "r1=1, r2z7=2, r2z8=2", then the array
+    will be sorted with all nodes from region 1 (r1=1) first, then all the
+    nodes from region 2 zones 7 and 8 (r2z7=2 and r2z8=2), then everything
+    else.
+
+    Note that the order of the pieces of affinity_str is irrelevant; the
+    priority values are what comes after the equals sign.
+
+    If affinity_str is empty or all whitespace, then the resulting function
+    will not alter the ordering of the nodes. However, if affinity_str
+    contains an invalid value, then None is returned.
+
+    :param affinity_str: affinity config value, e.g. "r1z2=3"
+                         or "r1=1, r2z1=2, r2z2=2"
+    :returns: single-argument function, or None if argument invalid
+
+    """
+    affinity_str = affinity_str.strip()
+
+    if not affinity_str:
+        return lambda x: 0
+
+    priority_matchers = []
+    pieces = [s.strip() for s in affinity_str.split(',')]
+    for piece in pieces:
+        # matches r<number>=<number> or r<number>z<number>=<number>
+        match = re.match("r(\d+)(?:z(\d+))?=(\d+)$", piece)
+        if match:
+            region, zone, priority = match.groups()
+            region = int(region)
+            priority = int(priority)
+            zone = int(zone) if zone else None
+
+            matcher = {'region': region, 'priority': priority}
+            if zone is not None:
+                matcher['zone'] = zone
+            priority_matchers.append(matcher)
+        else:
+            raise ValueError("Invalid affinity value: %r" % affinity_str)
+
+    priority_matchers.sort(key=operator.itemgetter('priority'))
+
+    def keyfn(ring_node):
+        for matcher in priority_matchers:
+            if (matcher['region'] == ring_node['region']
+                and ('zone' not in matcher
+                     or matcher['zone'] == ring_node['zone'])):
+                return matcher['priority']
+        return 4294967296  # 2^32, i.e. "a big number"
+    return keyfn
+
+
 def get_remote_client(req):
     # remote host for zeus
     client = req.headers.get('x-cluster-client-ip')
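As a quick reference, here is the docstring's own example ("r1=1, r2z7=2, r2z8=2") worked out; a sketch only, with made-up node dicts.

# Sketch of the docstring example above; node dicts are invented.
from swift.common.utils import affinity_key_function

keyfn = affinity_key_function("r1=1, r2z7=2, r2z8=2")
nodes = [{'region': 2, 'zone': 9},   # matches nothing -> sorts last
         {'region': 2, 'zone': 7},   # r2z7=2
         {'region': 1, 'zone': 5},   # r1=1
         {'region': 2, 'zone': 8}]   # r2z8=2
print(sorted(nodes, key=keyfn))
# region 1 first, then region 2 zones 7 and 8, then everything else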
@@ -34,7 +34,8 @@ from eventlet import Timeout
 
 from swift.common.ring import Ring
 from swift.common.utils import cache_from_env, get_logger, \
-    get_remote_client, split_path, config_true_value, generate_trans_id
+    get_remote_client, split_path, config_true_value, generate_trans_id, \
+    affinity_key_function
 from swift.common.constraints import check_utf8
 from swift.proxy.controllers import AccountController, ObjectController, \
     ContainerController
@@ -125,6 +126,13 @@ class Application(object):
         else:
             raise ValueError(
                 'Invalid request_node_count value: %r' % ''.join(value))
+        try:
+            read_affinity = conf.get('read_affinity', '')
+            self.read_affinity_sort_key = affinity_key_function(read_affinity)
+        except ValueError as err:
+            # make the message a little more useful
+            raise ValueError("Invalid read_affinity value: %r (%s)" %
+                             (read_affinity, err.message))
 
     def get_controller(self, path):
         """
@@ -277,6 +285,8 @@ class Application(object):
                 timing, expires = self.node_timings.get(node['ip'], (-1.0, 0))
                 return timing if expires > now else -1.0
             nodes.sort(key=key_func)
+        elif self.sorting_method == 'affinity':
+            nodes.sort(key=self.read_affinity_sort_key)
         return nodes
 
     def set_node_timing(self, node, timing):
@@ -1494,6 +1494,57 @@ class UnsafeXrange(object):
         self.concurrent_calls -= 1
 
 
+class TestAffinityKeyFunction(unittest.TestCase):
+    def setUp(self):
+        self.nodes = [dict(id=0, region=1, zone=1),
+                      dict(id=1, region=1, zone=2),
+                      dict(id=2, region=2, zone=1),
+                      dict(id=3, region=2, zone=2),
+                      dict(id=4, region=3, zone=1),
+                      dict(id=5, region=3, zone=2),
+                      dict(id=6, region=4, zone=0),
+                      dict(id=7, region=4, zone=1)]
+
+    def test_single_region(self):
+        keyfn = utils.affinity_key_function("r3=1")
+        ids = [n['id'] for n in sorted(self.nodes, key=keyfn)]
+        self.assertEqual([4, 5, 0, 1, 2, 3, 6, 7], ids)
+
+    def test_bogus_value(self):
+        self.assertRaises(ValueError,
+                          utils.affinity_key_function, "r3")
+        self.assertRaises(ValueError,
+                          utils.affinity_key_function, "r3=elephant")
+
+    def test_empty_value(self):
+        # Empty's okay, it just means no preference
+        keyfn = utils.affinity_key_function("")
+        self.assert_(callable(keyfn))
+        ids = [n['id'] for n in sorted(self.nodes, key=keyfn)]
+        self.assertEqual([0, 1, 2, 3, 4, 5, 6, 7], ids)
+
+    def test_all_whitespace_value(self):
+        # Empty's okay, it just means no preference
+        keyfn = utils.affinity_key_function(" \n")
+        self.assert_(callable(keyfn))
+        ids = [n['id'] for n in sorted(self.nodes, key=keyfn)]
+        self.assertEqual([0, 1, 2, 3, 4, 5, 6, 7], ids)
+
+    def test_with_zone_zero(self):
+        keyfn = utils.affinity_key_function("r4z0=1")
+        ids = [n['id'] for n in sorted(self.nodes, key=keyfn)]
+        self.assertEqual([6, 0, 1, 2, 3, 4, 5, 7], ids)
+
+    def test_multiple(self):
+        keyfn = utils.affinity_key_function("r1=100, r4=200, r3z1=1")
+        ids = [n['id'] for n in sorted(self.nodes, key=keyfn)]
+        self.assertEqual([4, 0, 1, 6, 7, 2, 3, 5], ids)
+
+    def test_more_specific_after_less_specific(self):
+        keyfn = utils.affinity_key_function("r2=100, r2z2=50")
+        ids = [n['id'] for n in sorted(self.nodes, key=keyfn)]
+        self.assertEqual([3, 2, 0, 1, 4, 5, 6, 7], ids)
+
 class TestGreenthreadSafeIterator(unittest.TestCase):
     def increment(self, iterable):
         plus_ones = []
@@ -661,25 +661,34 @@ class TestProxyServer(unittest.TestCase):
         exp_timings = {}
         self.assertEquals(baseapp.node_timings, exp_timings)
 
-        proxy_server.time = lambda: times.pop(0)
-        try:
-            times = [time.time()]
-            exp_timings = {'127.0.0.1': (0.1,
-                                         times[0] + baseapp.timing_expiry)}
+        times = [time.time()]
+        exp_timings = {'127.0.0.1': (0.1, times[0] + baseapp.timing_expiry)}
+        with mock.patch('swift.proxy.server.time', lambda: times.pop(0)):
             baseapp.set_node_timing({'ip': '127.0.0.1'}, 0.1)
-            self.assertEquals(baseapp.node_timings, exp_timings)
-        finally:
-            proxy_server.time = time.time
+        self.assertEquals(baseapp.node_timings, exp_timings)
 
-        proxy_server.shuffle = lambda l: l
-        try:
-            nodes = [{'ip': '127.0.0.1'}, {'ip': '127.0.0.2'}, {'ip': '127.0.0.3'}]
+        nodes = [{'ip': '127.0.0.1'}, {'ip': '127.0.0.2'}, {'ip': '127.0.0.3'}]
+        with mock.patch('swift.proxy.server.shuffle', lambda l: l):
             res = baseapp.sort_nodes(nodes)
-            exp_sorting = [{'ip': '127.0.0.2'}, {'ip': '127.0.0.3'},
-                           {'ip': '127.0.0.1'}]
-            self.assertEquals(res, exp_sorting)
-        finally:
-            proxy_server.shuffle = random.shuffle
+        exp_sorting = [{'ip': '127.0.0.2'}, {'ip': '127.0.0.3'},
+                       {'ip': '127.0.0.1'}]
+        self.assertEquals(res, exp_sorting)
+
+    def test_node_affinity(self):
+        baseapp = proxy_server.Application({'sorting_method': 'affinity',
+                                            'read_affinity': 'r1=1'},
+                                           FakeMemcache(),
+                                           container_ring=FakeRing(),
+                                           object_ring=FakeRing(),
+                                           account_ring=FakeRing())
+
+        nodes = [{'region': 2, 'zone': 1, 'ip': '127.0.0.1'},
+                 {'region': 1, 'zone': 2, 'ip': '127.0.0.2'}]
+        with mock.patch('swift.proxy.server.shuffle', lambda x:x):
+            app_sorted = baseapp.sort_nodes(nodes)
+        exp_sorted = [{'region': 1, 'zone': 2, 'ip': '127.0.0.2'},
+                      {'region': 2, 'zone': 1, 'ip': '127.0.0.1'}]
+        self.assertEquals(exp_sorted, app_sorted)
 
 
 class TestObjectController(unittest.TestCase):