Merge "Add ring_ip option to object services"
This commit is contained in:
commit
73b2730f71
@ -22,6 +22,12 @@ bind_port = 6200
|
||||
# feature.
|
||||
# servers_per_port = 0
|
||||
#
|
||||
# If running in a container, servers_per_port may not be able to use the
|
||||
# bind_ip to lookup the ports in the ring. You may instead override the port
|
||||
# lookup in the ring using the ring_ip. Any devices/ports associted with the
|
||||
# ring_ip will be used when listening on the configured bind_ip address.
|
||||
# ring_ip = <bind_ip>
|
||||
#
|
||||
# Maximum concurrent requests per worker
|
||||
# max_clients = 1024
|
||||
#
|
||||
|
@ -37,11 +37,11 @@ DEFAULT_EC_OBJECT_SEGMENT_SIZE = 1048576
|
||||
|
||||
|
||||
class BindPortsCache(object):
|
||||
def __init__(self, swift_dir, bind_ip):
|
||||
def __init__(self, swift_dir, ring_ip):
|
||||
self.swift_dir = swift_dir
|
||||
self.mtimes_by_ring_path = {}
|
||||
self.portsets_by_ring_path = {}
|
||||
self.my_ips = set(whataremyips(bind_ip))
|
||||
self.my_ips = set(whataremyips(ring_ip))
|
||||
|
||||
def all_bind_ports_for_node(self):
|
||||
"""
|
||||
|
@ -2745,25 +2745,25 @@ def expand_ipv6(address):
|
||||
return socket.inet_ntop(socket.AF_INET6, packed_ip)
|
||||
|
||||
|
||||
def whataremyips(bind_ip=None):
|
||||
def whataremyips(ring_ip=None):
|
||||
"""
|
||||
Get "our" IP addresses ("us" being the set of services configured by
|
||||
one `*.conf` file). If our REST listens on a specific address, return it.
|
||||
Otherwise, if listen on '0.0.0.0' or '::' return all addresses, including
|
||||
the loopback.
|
||||
|
||||
:param str bind_ip: Optional bind_ip from a config file; may be IP address
|
||||
or hostname.
|
||||
:param str ring_ip: Optional ring_ip/bind_ip from a config file; may be
|
||||
IP address or hostname.
|
||||
:returns: list of Strings of ip addresses
|
||||
"""
|
||||
if bind_ip:
|
||||
if ring_ip:
|
||||
# See if bind_ip is '0.0.0.0'/'::'
|
||||
try:
|
||||
_, _, _, _, sockaddr = socket.getaddrinfo(
|
||||
bind_ip, None, 0, socket.SOCK_STREAM, 0,
|
||||
ring_ip, None, 0, socket.SOCK_STREAM, 0,
|
||||
socket.AI_NUMERICHOST)[0]
|
||||
if sockaddr[0] not in ('0.0.0.0', '::'):
|
||||
return [bind_ip]
|
||||
return [ring_ip]
|
||||
except socket.gaierror:
|
||||
pass
|
||||
|
||||
|
@ -866,8 +866,11 @@ class ServersPerPortStrategy(StrategyBase):
|
||||
self.swift_dir = conf.get('swift_dir', '/etc/swift')
|
||||
self.ring_check_interval = float(conf.get('ring_check_interval', 15))
|
||||
|
||||
bind_ip = conf.get('bind_ip', '0.0.0.0')
|
||||
self.cache = BindPortsCache(self.swift_dir, bind_ip)
|
||||
# typically ring_ip will be the same as bind_ip, but in a container the
|
||||
# bind_ip might be differnt than the host ip address used to lookup
|
||||
# devices/ports in the ring
|
||||
ring_ip = conf.get('ring_ip', conf.get('bind_ip', '0.0.0.0'))
|
||||
self.cache = BindPortsCache(self.swift_dir, ring_ip)
|
||||
|
||||
def _reload_bind_ports(self):
|
||||
self.bind_ports = self.cache.all_bind_ports_for_node()
|
||||
|
@ -2146,7 +2146,7 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
|
||||
self._zero_stats()
|
||||
self._local_device_ids = set()
|
||||
dirs = []
|
||||
self.ips = whataremyips(bind_ip=self.bind_ip)
|
||||
self.ips = whataremyips(self.bind_ip)
|
||||
for node in self.ring.devs:
|
||||
device_path = self._check_node(node)
|
||||
if not device_path:
|
||||
|
@ -169,7 +169,7 @@ class ObjectReconstructor(Daemon):
|
||||
self.devices_dir = conf.get('devices', '/srv/node')
|
||||
self.mount_check = config_true_value(conf.get('mount_check', 'true'))
|
||||
self.swift_dir = conf.get('swift_dir', '/etc/swift')
|
||||
self.bind_ip = conf.get('bind_ip', '0.0.0.0')
|
||||
self.ring_ip = conf.get('ring_ip', conf.get('bind_ip', '0.0.0.0'))
|
||||
self.servers_per_port = int(conf.get('servers_per_port', '0') or 0)
|
||||
self.port = None if self.servers_per_port else \
|
||||
int(conf.get('bind_port', 6200))
|
||||
@ -1275,7 +1275,7 @@ class ObjectReconstructor(Daemon):
|
||||
return jobs
|
||||
|
||||
def get_policy2devices(self):
|
||||
ips = whataremyips(self.bind_ip)
|
||||
ips = whataremyips(self.ring_ip)
|
||||
policy2devices = {}
|
||||
for policy in self.policies:
|
||||
self.load_object_ring(policy)
|
||||
|
@ -134,7 +134,7 @@ class ObjectReplicator(Daemon):
|
||||
self.devices_dir = conf.get('devices', '/srv/node')
|
||||
self.mount_check = config_true_value(conf.get('mount_check', 'true'))
|
||||
self.swift_dir = conf.get('swift_dir', '/etc/swift')
|
||||
self.bind_ip = conf.get('bind_ip', '0.0.0.0')
|
||||
self.ring_ip = conf.get('ring_ip', conf.get('bind_ip', '0.0.0.0'))
|
||||
self.servers_per_port = int(conf.get('servers_per_port', '0') or 0)
|
||||
self.port = None if self.servers_per_port else \
|
||||
int(conf.get('bind_port', 6200))
|
||||
@ -307,7 +307,7 @@ class ObjectReplicator(Daemon):
|
||||
This is the device names, e.g. "sdq" or "d1234" or something, not
|
||||
the full ring entries.
|
||||
"""
|
||||
ips = whataremyips(self.bind_ip)
|
||||
ips = whataremyips(self.ring_ip)
|
||||
local_devices = set()
|
||||
for policy in self.policies:
|
||||
self.load_object_ring(policy)
|
||||
@ -901,7 +901,7 @@ class ObjectReplicator(Daemon):
|
||||
policies will be returned
|
||||
"""
|
||||
jobs = []
|
||||
ips = whataremyips(self.bind_ip)
|
||||
ips = whataremyips(self.ring_ip)
|
||||
for policy in self.policies:
|
||||
# Skip replication if next_part_power is set. In this case
|
||||
# every object is hard-linked twice, but the replicator can't
|
||||
|
@ -1604,6 +1604,35 @@ class TestServersPerPortStrategy(unittest.TestCase, CommonTestMixin):
|
||||
# This is one of the workers for port 6006 that already got reaped.
|
||||
self.assertIsNone(self.strategy.register_worker_exit(89))
|
||||
|
||||
def test_servers_per_port_in_container(self):
|
||||
# normally there's no configured ring_ip
|
||||
conf = {
|
||||
'bind_ip': '1.2.3.4',
|
||||
}
|
||||
self.strategy = wsgi.ServersPerPortStrategy(conf, self.logger, 1)
|
||||
self.assertEqual(self.mock_cache_class.call_args,
|
||||
mock.call('/etc/swift', '1.2.3.4'))
|
||||
self.assertEqual({6006, 6007},
|
||||
self.strategy.cache.all_bind_ports_for_node())
|
||||
ports = {item[1][0] for item in self.strategy.new_worker_socks()}
|
||||
self.assertEqual({6006, 6007}, ports)
|
||||
|
||||
# but in a container we can override it
|
||||
conf = {
|
||||
'bind_ip': '1.2.3.4',
|
||||
'ring_ip': '2.3.4.5'
|
||||
}
|
||||
self.strategy = wsgi.ServersPerPortStrategy(conf, self.logger, 1)
|
||||
# N.B. our fake BindPortsCache always returns {6006, 6007}, but a real
|
||||
# BindPortsCache would only return ports for devices that match the ip
|
||||
# address in the ring
|
||||
self.assertEqual(self.mock_cache_class.call_args,
|
||||
mock.call('/etc/swift', '2.3.4.5'))
|
||||
self.assertEqual({6006, 6007},
|
||||
self.strategy.cache.all_bind_ports_for_node())
|
||||
ports = {item[1][0] for item in self.strategy.new_worker_socks()}
|
||||
self.assertEqual({6006, 6007}, ports)
|
||||
|
||||
def test_shutdown_sockets(self):
|
||||
pid = 88
|
||||
for s, i in self.strategy.new_worker_socks():
|
||||
|
@ -271,7 +271,7 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
|
||||
policy=policy, frag_index=scenarios[part_num](obj_set),
|
||||
timestamp=utils.Timestamp(t))
|
||||
|
||||
ips = utils.whataremyips(self.reconstructor.bind_ip)
|
||||
ips = utils.whataremyips(self.reconstructor.ring_ip)
|
||||
for policy in [p for p in POLICIES if p.policy_type == EC_POLICY]:
|
||||
self.ec_policy = policy
|
||||
self.ec_obj_ring = self.reconstructor.load_object_ring(
|
||||
@ -1286,7 +1286,7 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
|
||||
# verify reconstructor only deletes reverted nondurable fragments older
|
||||
# commit_window
|
||||
shutil.rmtree(self.ec_obj_path)
|
||||
ips = utils.whataremyips(self.reconstructor.bind_ip)
|
||||
ips = utils.whataremyips(self.reconstructor.ring_ip)
|
||||
local_devs = [dev for dev in self.ec_obj_ring.devs
|
||||
if dev and dev['replication_ip'] in ips and
|
||||
dev['replication_port'] ==
|
||||
@ -1348,7 +1348,7 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
|
||||
# visited by the reconstructor, despite having timestamp older than
|
||||
# reclaim_age
|
||||
shutil.rmtree(self.ec_obj_path)
|
||||
ips = utils.whataremyips(self.reconstructor.bind_ip)
|
||||
ips = utils.whataremyips(self.reconstructor.ring_ip)
|
||||
local_devs = [dev for dev in self.ec_obj_ring.devs
|
||||
if dev and dev['replication_ip'] in ips and
|
||||
dev['replication_port'] ==
|
||||
@ -1396,7 +1396,7 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
|
||||
# reclaim_age and commit_window is zero; this test illustrates the
|
||||
# potential data loss bug that commit_window addresses
|
||||
shutil.rmtree(self.ec_obj_path)
|
||||
ips = utils.whataremyips(self.reconstructor.bind_ip)
|
||||
ips = utils.whataremyips(self.reconstructor.ring_ip)
|
||||
local_devs = [dev for dev in self.ec_obj_ring.devs
|
||||
if dev and dev['replication_ip'] in ips and
|
||||
dev['replication_port'] ==
|
||||
@ -1446,7 +1446,7 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
|
||||
# survive being visited by the reconstructor if its timestamp is older
|
||||
# than reclaim_age
|
||||
shutil.rmtree(self.ec_obj_path)
|
||||
ips = utils.whataremyips(self.reconstructor.bind_ip)
|
||||
ips = utils.whataremyips(self.reconstructor.ring_ip)
|
||||
local_devs = [dev for dev in self.ec_obj_ring.devs
|
||||
if dev and dev['replication_ip'] in ips and
|
||||
dev['replication_port'] ==
|
||||
@ -1494,7 +1494,7 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
|
||||
# verify reconstructor only deletes objects that were actually reverted
|
||||
# when ssync is limited by max_objects_per_revert
|
||||
shutil.rmtree(self.ec_obj_path)
|
||||
ips = utils.whataremyips(self.reconstructor.bind_ip)
|
||||
ips = utils.whataremyips(self.reconstructor.ring_ip)
|
||||
local_devs = [dev for dev in self.ec_obj_ring.devs
|
||||
if dev and dev['replication_ip'] in ips and
|
||||
dev['replication_port'] ==
|
||||
@ -3084,6 +3084,36 @@ class BaseTestObjectReconstructor(unittest.TestCase):
|
||||
|
||||
|
||||
class TestObjectReconstructor(BaseTestObjectReconstructor):
|
||||
def test_ring_ip_and_bind_ip(self):
|
||||
# make clean base_conf
|
||||
base_conf = dict(self.conf)
|
||||
for key in ('bind_ip', 'ring_ip'):
|
||||
base_conf.pop(key, None)
|
||||
|
||||
# default ring_ip is always 0.0.0.0
|
||||
self.conf = base_conf
|
||||
self._configure_reconstructor()
|
||||
self.assertEqual('0.0.0.0', self.reconstructor.ring_ip)
|
||||
|
||||
# bind_ip works fine for legacy configs
|
||||
self.conf = dict(base_conf)
|
||||
self.conf['bind_ip'] = '192.168.1.42'
|
||||
self._configure_reconstructor()
|
||||
self.assertEqual('192.168.1.42', self.reconstructor.ring_ip)
|
||||
|
||||
# ring_ip works fine by-itself
|
||||
self.conf = dict(base_conf)
|
||||
self.conf['ring_ip'] = '192.168.1.43'
|
||||
self._configure_reconstructor()
|
||||
self.assertEqual('192.168.1.43', self.reconstructor.ring_ip)
|
||||
|
||||
# if you have both ring_ip wins
|
||||
self.conf = dict(base_conf)
|
||||
self.conf['bind_ip'] = '192.168.1.44'
|
||||
self.conf['ring_ip'] = '192.168.1.45'
|
||||
self._configure_reconstructor()
|
||||
self.assertEqual('192.168.1.45', self.reconstructor.ring_ip)
|
||||
|
||||
def test_handoffs_only_default(self):
|
||||
# sanity neither option added to default conf
|
||||
self.conf.pop('handoffs_first', None)
|
||||
@ -3276,7 +3306,7 @@ class TestObjectReconstructor(BaseTestObjectReconstructor):
|
||||
'replication_ip': '127.0.0.88', # not local via IP
|
||||
'replication_port': self.port,
|
||||
})
|
||||
self.reconstructor.bind_ip = '0.0.0.0' # use whataremyips
|
||||
self.reconstructor.ring_ip = '0.0.0.0' # use whataremyips
|
||||
with mock.patch('swift.obj.reconstructor.whataremyips',
|
||||
return_value=[self.ip]), \
|
||||
mock.patch.object(self.policy.object_ring, '_devs',
|
||||
@ -3331,7 +3361,7 @@ class TestObjectReconstructor(BaseTestObjectReconstructor):
|
||||
'replication_ip': '127.0.0.88', # not local via IP
|
||||
'replication_port': self.port,
|
||||
})
|
||||
self.reconstructor.bind_ip = '0.0.0.0' # use whataremyips
|
||||
self.reconstructor.ring_ip = '0.0.0.0' # use whataremyips
|
||||
with mock.patch('swift.obj.reconstructor.whataremyips',
|
||||
return_value=[self.ip]), \
|
||||
mock.patch.object(self.policy.object_ring, '_devs',
|
||||
@ -3372,7 +3402,7 @@ class TestObjectReconstructor(BaseTestObjectReconstructor):
|
||||
'replication_ip': self.ip,
|
||||
'replication_port': self.port,
|
||||
} for i, dev in enumerate(local_devs)]
|
||||
self.reconstructor.bind_ip = '0.0.0.0' # use whataremyips
|
||||
self.reconstructor.ring_ip = '0.0.0.0' # use whataremyips
|
||||
with mock.patch('swift.obj.reconstructor.whataremyips',
|
||||
return_value=[self.ip]), \
|
||||
mock.patch.object(self.policy.object_ring, '_devs',
|
||||
|
@ -261,6 +261,36 @@ class TestObjectReplicator(unittest.TestCase):
|
||||
rmtree(self.testdir, ignore_errors=1)
|
||||
rmtree(self.recon_cache, ignore_errors=1)
|
||||
|
||||
def test_ring_ip_and_bind_ip(self):
|
||||
# make clean base_conf
|
||||
base_conf = dict(self.conf)
|
||||
for key in ('bind_ip', 'ring_ip'):
|
||||
base_conf.pop(key, None)
|
||||
|
||||
# default ring_ip is always 0.0.0.0
|
||||
self.conf = base_conf
|
||||
self._create_replicator()
|
||||
self.assertEqual('0.0.0.0', self.replicator.ring_ip)
|
||||
|
||||
# bind_ip works fine for legacy configs
|
||||
self.conf = dict(base_conf)
|
||||
self.conf['bind_ip'] = '192.168.1.42'
|
||||
self._create_replicator()
|
||||
self.assertEqual('192.168.1.42', self.replicator.ring_ip)
|
||||
|
||||
# ring_ip works fine by-itself
|
||||
self.conf = dict(base_conf)
|
||||
self.conf['ring_ip'] = '192.168.1.43'
|
||||
self._create_replicator()
|
||||
self.assertEqual('192.168.1.43', self.replicator.ring_ip)
|
||||
|
||||
# if you have both ring_ip wins
|
||||
self.conf = dict(base_conf)
|
||||
self.conf['bind_ip'] = '192.168.1.44'
|
||||
self.conf['ring_ip'] = '192.168.1.45'
|
||||
self._create_replicator()
|
||||
self.assertEqual('192.168.1.45', self.replicator.ring_ip)
|
||||
|
||||
def test_handoff_replication_setting_warnings(self):
|
||||
conf_tests = [
|
||||
# (config, expected_warning)
|
||||
|
Loading…
Reference in New Issue
Block a user