Merge "Allow 1+ object-servers-per-disk deployment"
This commit is contained in:
commit
0009a43eb4
@ -139,6 +139,72 @@ swift-ring-builder with no options will display help text with available
|
||||
commands and options. More information on how the ring works internally
|
||||
can be found in the :doc:`Ring Overview <overview_ring>`.
|
||||
|
||||
.. _server-per-port-configuration:
|
||||
|
||||
-------------------------------
|
||||
Running object-servers Per Disk
|
||||
-------------------------------
|
||||
|
||||
The lack of true asynchronous file I/O on Linux leaves the object-server
|
||||
workers vulnerable to misbehaving disks. Because any object-server worker can
|
||||
service a request for any disk, and a slow I/O request blocks the eventlet hub,
|
||||
a single slow disk can impair an entire storage node. This also prevents
|
||||
object servers from fully utilizing all their disks during heavy load.
|
||||
|
||||
The :ref:`threads_per_disk <object-server-options>` option was one way to
|
||||
address this, but came with severe performance overhead which was worse
|
||||
than the benefit of I/O isolation. Any clusters using threads_per_disk should
|
||||
switch to using `servers_per_port`.
|
||||
|
||||
Another way to get full I/O isolation is to give each disk on a storage node a
|
||||
different port in the storage policy rings. Then set the
|
||||
:ref:`servers_per_port <object-server-default-options>`
|
||||
option in the object-server config. NOTE: while the purpose of this config
|
||||
setting is to run one or more object-server worker processes per *disk*, the
|
||||
implementation just runs object-servers per unique port of local devices in the
|
||||
rings. The deployer must combine this option with appropriately-configured
|
||||
rings to benefit from this feature.
|
||||
|
||||
Here's an example (abbreviated) old-style ring (2 node cluster with 2 disks
|
||||
each)::
|
||||
|
||||
Devices: id region zone ip address port replication ip replication port name
|
||||
0 1 1 1.1.0.1 6000 1.1.0.1 6000 d1
|
||||
1 1 1 1.1.0.1 6000 1.1.0.1 6000 d2
|
||||
2 1 2 1.1.0.2 6000 1.1.0.2 6000 d3
|
||||
3 1 2 1.1.0.2 6000 1.1.0.2 6000 d4
|
||||
|
||||
And here's the same ring set up for `servers_per_port`::
|
||||
|
||||
Devices: id region zone ip address port replication ip replication port name
|
||||
0 1 1 1.1.0.1 6000 1.1.0.1 6000 d1
|
||||
1 1 1 1.1.0.1 6001 1.1.0.1 6001 d2
|
||||
2 1 2 1.1.0.2 6000 1.1.0.2 6000 d3
|
||||
3 1 2 1.1.0.2 6001 1.1.0.2 6001 d4
|
||||
|
||||
When migrating from normal to `servers_per_port`, perform these steps in order:
|
||||
|
||||
#. Upgrade Swift code to a version capable of doing `servers_per_port`.
|
||||
|
||||
#. Enable `servers_per_port` with a > 0 value
|
||||
|
||||
#. Restart `swift-object-server` processes with a SIGHUP. At this point, you
|
||||
will have the `servers_per_port` number of `swift-object-server` processes
|
||||
serving all requests for all disks on each node. This preserves
|
||||
availability, but you should perform the next step as quickly as possible.
|
||||
|
||||
#. Push out new rings that actually have different ports per disk on each
|
||||
server. One of the ports in the new ring should be the same as the port
|
||||
used in the old ring ("6000" in the example above). This will cover
|
||||
existing proxy-server processes who haven't loaded the new ring yet. They
|
||||
can still talk to any storage node regardless of whether or not that
|
||||
storage node has loaded the ring and started object-server processes on the
|
||||
new ports.
|
||||
|
||||
If you do not run a separate object-server for replication, then this setting
|
||||
must be available to the object-replicator and object-reconstructor (i.e.
|
||||
appear in the [DEFAULT] config section).
|
||||
|
||||
.. _general-service-configuration:
|
||||
|
||||
-----------------------------
|
||||
@ -149,14 +215,14 @@ Most Swift services fall into two categories. Swift's wsgi servers and
|
||||
background daemons.
|
||||
|
||||
For more information specific to the configuration of Swift's wsgi servers
|
||||
with paste deploy see :ref:`general-server-configuration`
|
||||
with paste deploy see :ref:`general-server-configuration`.
|
||||
|
||||
Configuration for servers and daemons can be expressed together in the same
|
||||
file for each type of server, or separately. If a required section for the
|
||||
service trying to start is missing there will be an error. The sections not
|
||||
used by the service are ignored.
|
||||
|
||||
Consider the example of an object storage node. By convention configuration
|
||||
Consider the example of an object storage node. By convention, configuration
|
||||
for the object-server, object-updater, object-replicator, and object-auditor
|
||||
exist in a single file ``/etc/swift/object-server.conf``::
|
||||
|
||||
@ -323,7 +389,7 @@ max_header_size 8192 max_header_size is the max number of bytes in
|
||||
tokens including more than 7 catalog entries.
|
||||
See also include_service_catalog in
|
||||
proxy-server.conf-sample (documented in
|
||||
overview_auth.rst)
|
||||
overview_auth.rst).
|
||||
=================== ========== =============================================
|
||||
|
||||
---------------------------
|
||||
@ -335,6 +401,8 @@ etc/object-server.conf-sample in the source code repository.
|
||||
|
||||
The following configuration options are available:
|
||||
|
||||
.. _object-server-default-options:
|
||||
|
||||
[DEFAULT]
|
||||
|
||||
=================== ========== =============================================
|
||||
@ -353,12 +421,30 @@ workers auto Override the number of pre-forked workers
|
||||
should be an integer, zero means no fork. If
|
||||
unset, it will try to default to the number
|
||||
of effective cpu cores and fallback to one.
|
||||
Increasing the number of workers may reduce
|
||||
the possibility of slow file system
|
||||
operations in one request from negatively
|
||||
impacting other requests, but may not be as
|
||||
efficient as tuning :ref:`threads_per_disk
|
||||
<object-server-options>`
|
||||
Increasing the number of workers helps slow
|
||||
filesystem operations in one request from
|
||||
negatively impacting other requests, but only
|
||||
the :ref:`servers_per_port
|
||||
<server-per-port-configuration>`
|
||||
option provides complete I/O isolation with
|
||||
no measurable overhead.
|
||||
servers_per_port 0 If each disk in each storage policy ring has
|
||||
unique port numbers for its "ip" value, you
|
||||
can use this setting to have each
|
||||
object-server worker only service requests
|
||||
for the single disk matching the port in the
|
||||
ring. The value of this setting determines
|
||||
how many worker processes run for each port
|
||||
(disk) in the ring. If you have 24 disks
|
||||
per server, and this setting is 4, then
|
||||
each storage node will have 1 + (24 * 4) =
|
||||
97 total object-server processes running.
|
||||
This gives complete I/O isolation, drastically
|
||||
reducing the impact of slow disks on storage
|
||||
node performance. The object-replicator and
|
||||
object-reconstructor need to see this setting
|
||||
too, so it must be in the [DEFAULT] section.
|
||||
See :ref:`server-per-port-configuration`.
|
||||
max_clients 1024 Maximum number of clients one worker can
|
||||
process simultaneously (it will actually
|
||||
accept(2) N + 1). Setting this to one (1)
|
||||
@ -421,13 +507,12 @@ keep_cache_private false Allow non-public objects to stay
|
||||
threads_per_disk 0 Size of the per-disk thread pool
|
||||
used for performing disk I/O. The
|
||||
default of 0 means to not use a
|
||||
per-disk thread pool. It is
|
||||
recommended to keep this value
|
||||
small, as large values can result
|
||||
in high read latencies due to
|
||||
large queue depths. A good
|
||||
starting point is 4 threads per
|
||||
disk.
|
||||
per-disk thread pool.
|
||||
This option is no longer
|
||||
recommended and the
|
||||
:ref:`servers_per_port
|
||||
<server-per-port-configuration>`
|
||||
should be used instead.
|
||||
replication_concurrency 4 Set to restrict the number of
|
||||
concurrent incoming REPLICATION
|
||||
requests; set to 0 for unlimited
|
||||
@ -562,7 +647,7 @@ workers auto Override the number of pre-forked workers
|
||||
the possibility of slow file system
|
||||
operations in one request from negatively
|
||||
impacting other requests. See
|
||||
:ref:`general-service-tuning`
|
||||
:ref:`general-service-tuning`.
|
||||
max_clients 1024 Maximum number of clients one worker can
|
||||
process simultaneously (it will actually
|
||||
accept(2) N + 1). Setting this to one (1)
|
||||
@ -690,7 +775,7 @@ workers auto Override the number of pre-forked workers
|
||||
the possibility of slow file system
|
||||
operations in one request from negatively
|
||||
impacting other requests. See
|
||||
:ref:`general-service-tuning`
|
||||
:ref:`general-service-tuning`.
|
||||
max_clients 1024 Maximum number of clients one worker can
|
||||
process simultaneously (it will actually
|
||||
accept(2) N + 1). Setting this to one (1)
|
||||
@ -813,7 +898,7 @@ workers auto Override the number of
|
||||
will try to default to the
|
||||
number of effective cpu cores
|
||||
and fallback to one. See
|
||||
:ref:`general-service-tuning`
|
||||
:ref:`general-service-tuning`.
|
||||
max_clients 1024 Maximum number of clients one
|
||||
worker can process
|
||||
simultaneously (it will
|
||||
|
@ -12,9 +12,16 @@ bind_port = 6000
|
||||
# expiring_objects_account_name = expiring_objects
|
||||
#
|
||||
# Use an integer to override the number of pre-forked processes that will
|
||||
# accept connections.
|
||||
# accept connections. NOTE: if servers_per_port is set, this setting is
|
||||
# ignored.
|
||||
# workers = auto
|
||||
#
|
||||
# Make object-server run this many worker processes per unique port of
|
||||
# "local" ring devices across all storage policies. This can help provide
|
||||
# the isolation of threads_per_disk without the severe overhead. The default
|
||||
# value of 0 disables this feature.
|
||||
# servers_per_port = 0
|
||||
#
|
||||
# Maximum concurrent requests per worker
|
||||
# max_clients = 1024
|
||||
#
|
||||
|
@ -69,7 +69,7 @@ class AccountReaper(Daemon):
|
||||
self.object_ring = None
|
||||
self.node_timeout = int(conf.get('node_timeout', 10))
|
||||
self.conn_timeout = float(conf.get('conn_timeout', 0.5))
|
||||
self.myips = whataremyips()
|
||||
self.myips = whataremyips(conf.get('bind_ip', '0.0.0.0'))
|
||||
self.concurrency = int(conf.get('concurrency', 25))
|
||||
self.container_concurrency = self.object_concurrency = \
|
||||
sqrt(self.concurrency)
|
||||
|
@ -154,6 +154,7 @@ class Replicator(Daemon):
|
||||
self.logger = logger or get_logger(conf, log_route='replicator')
|
||||
self.root = conf.get('devices', '/srv/node')
|
||||
self.mount_check = config_true_value(conf.get('mount_check', 'true'))
|
||||
self.bind_ip = conf.get('bind_ip', '0.0.0.0')
|
||||
self.port = int(conf.get('bind_port', self.default_port))
|
||||
concurrency = int(conf.get('concurrency', 8))
|
||||
self.cpool = GreenPool(size=concurrency)
|
||||
@ -580,7 +581,7 @@ class Replicator(Daemon):
|
||||
"""Run a replication pass once."""
|
||||
self._zero_stats()
|
||||
dirs = []
|
||||
ips = whataremyips()
|
||||
ips = whataremyips(self.bind_ip)
|
||||
if not ips:
|
||||
self.logger.error(_('ERROR Failed to get my own IPs?'))
|
||||
return
|
||||
|
@ -44,10 +44,29 @@ class RingData(object):
|
||||
dev.setdefault("region", 1)
|
||||
|
||||
@classmethod
|
||||
def deserialize_v1(cls, gz_file):
|
||||
def deserialize_v1(cls, gz_file, metadata_only=False):
|
||||
"""
|
||||
Deserialize a v1 ring file into a dictionary with `devs`, `part_shift`,
|
||||
and `replica2part2dev_id` keys.
|
||||
|
||||
If the optional kwarg `metadata_only` is True, then the
|
||||
`replica2part2dev_id` is not loaded and that key in the returned
|
||||
dictionary just has the value `[]`.
|
||||
|
||||
:param file gz_file: An opened file-like object which has already
|
||||
consumed the 6 bytes of magic and version.
|
||||
:param bool metadata_only: If True, only load `devs` and `part_shift`
|
||||
:returns: A dict containing `devs`, `part_shift`, and
|
||||
`replica2part2dev_id`
|
||||
"""
|
||||
|
||||
json_len, = struct.unpack('!I', gz_file.read(4))
|
||||
ring_dict = json.loads(gz_file.read(json_len))
|
||||
ring_dict['replica2part2dev_id'] = []
|
||||
|
||||
if metadata_only:
|
||||
return ring_dict
|
||||
|
||||
partition_count = 1 << (32 - ring_dict['part_shift'])
|
||||
for x in xrange(ring_dict['replica_count']):
|
||||
ring_dict['replica2part2dev_id'].append(
|
||||
@ -55,11 +74,12 @@ class RingData(object):
|
||||
return ring_dict
|
||||
|
||||
@classmethod
|
||||
def load(cls, filename):
|
||||
def load(cls, filename, metadata_only=False):
|
||||
"""
|
||||
Load ring data from a file.
|
||||
|
||||
:param filename: Path to a file serialized by the save() method.
|
||||
:param bool metadata_only: If True, only load `devs` and `part_shift`.
|
||||
:returns: A RingData instance containing the loaded data.
|
||||
"""
|
||||
gz_file = GzipFile(filename, 'rb')
|
||||
@ -70,15 +90,18 @@ class RingData(object):
|
||||
# See if the file is in the new format
|
||||
magic = gz_file.read(4)
|
||||
if magic == 'R1NG':
|
||||
version, = struct.unpack('!H', gz_file.read(2))
|
||||
if version == 1:
|
||||
ring_data = cls.deserialize_v1(gz_file)
|
||||
format_version, = struct.unpack('!H', gz_file.read(2))
|
||||
if format_version == 1:
|
||||
ring_data = cls.deserialize_v1(
|
||||
gz_file, metadata_only=metadata_only)
|
||||
else:
|
||||
raise Exception('Unknown ring format version %d' % version)
|
||||
raise Exception('Unknown ring format version %d' %
|
||||
format_version)
|
||||
else:
|
||||
# Assume old-style pickled ring
|
||||
gz_file.seek(0)
|
||||
ring_data = pickle.load(gz_file)
|
||||
|
||||
if not hasattr(ring_data, 'devs'):
|
||||
ring_data = RingData(ring_data['replica2part2dev_id'],
|
||||
ring_data['devs'], ring_data['part_shift'])
|
||||
|
@ -235,9 +235,14 @@ def is_local_device(my_ips, my_port, dev_ip, dev_port):
|
||||
Return True if the provided dev_ip and dev_port are among the IP
|
||||
addresses specified in my_ips and my_port respectively.
|
||||
|
||||
To support accurate locality determination in the server-per-port
|
||||
deployment, when my_port is None, only IP addresses are used for
|
||||
determining locality (dev_port is ignored).
|
||||
|
||||
If dev_ip is a hostname then it is first translated to an IP
|
||||
address before checking it against my_ips.
|
||||
"""
|
||||
candidate_ips = []
|
||||
if not is_valid_ip(dev_ip) and is_valid_hostname(dev_ip):
|
||||
try:
|
||||
# get the ip for this host; use getaddrinfo so that
|
||||
@ -248,12 +253,19 @@ def is_local_device(my_ips, my_port, dev_ip, dev_port):
|
||||
dev_ip = addr[4][0] # get the ip-address
|
||||
if family == socket.AF_INET6:
|
||||
dev_ip = expand_ipv6(dev_ip)
|
||||
if dev_ip in my_ips and dev_port == my_port:
|
||||
return True
|
||||
return False
|
||||
candidate_ips.append(dev_ip)
|
||||
except socket.gaierror:
|
||||
return False
|
||||
return dev_ip in my_ips and dev_port == my_port
|
||||
else:
|
||||
if is_valid_ipv6(dev_ip):
|
||||
dev_ip = expand_ipv6(dev_ip)
|
||||
candidate_ips = [dev_ip]
|
||||
|
||||
for dev_ip in candidate_ips:
|
||||
if dev_ip in my_ips and (my_port is None or dev_port == my_port):
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
|
||||
def parse_search_value(search_value):
|
||||
|
@ -12,11 +12,13 @@
|
||||
# limitations under the License.
|
||||
|
||||
from ConfigParser import ConfigParser
|
||||
import textwrap
|
||||
import os
|
||||
import string
|
||||
import textwrap
|
||||
|
||||
from swift.common.utils import config_true_value, SWIFT_CONF_FILE
|
||||
from swift.common.ring import Ring
|
||||
from swift.common.utils import (
|
||||
config_true_value, SWIFT_CONF_FILE, whataremyips)
|
||||
from swift.common.ring import Ring, RingData
|
||||
from swift.common.utils import quorum_size
|
||||
from swift.common.exceptions import RingValidationError
|
||||
from pyeclib.ec_iface import ECDriver, ECDriverError, VALID_EC_TYPES
|
||||
@ -30,6 +32,53 @@ EC_POLICY = 'erasure_coding'
|
||||
DEFAULT_EC_OBJECT_SEGMENT_SIZE = 1048576
|
||||
|
||||
|
||||
class BindPortsCache(object):
|
||||
def __init__(self, swift_dir, bind_ip):
|
||||
self.swift_dir = swift_dir
|
||||
self.mtimes_by_ring_path = {}
|
||||
self.portsets_by_ring_path = {}
|
||||
self.my_ips = set(whataremyips(bind_ip))
|
||||
|
||||
def all_bind_ports_for_node(self):
|
||||
"""
|
||||
Given an iterable of IP addresses identifying a storage backend server,
|
||||
return a set of all bind ports defined in all rings for this storage
|
||||
backend server.
|
||||
|
||||
The caller is responsible for not calling this method (which performs
|
||||
at least a stat on all ring files) too frequently.
|
||||
"""
|
||||
# NOTE: we don't worry about disappearing rings here because you can't
|
||||
# ever delete a storage policy.
|
||||
|
||||
for policy in POLICIES:
|
||||
# NOTE: we must NOT use policy.load_ring to load the ring. Users
|
||||
# of this utility function will not need the actual ring data, just
|
||||
# the bind ports.
|
||||
#
|
||||
# This is duplicated with Ring.__init__ just a bit...
|
||||
serialized_path = os.path.join(self.swift_dir,
|
||||
policy.ring_name + '.ring.gz')
|
||||
try:
|
||||
new_mtime = os.path.getmtime(serialized_path)
|
||||
except OSError:
|
||||
continue
|
||||
old_mtime = self.mtimes_by_ring_path.get(serialized_path)
|
||||
if not old_mtime or old_mtime != new_mtime:
|
||||
self.portsets_by_ring_path[serialized_path] = set(
|
||||
dev['port']
|
||||
for dev in RingData.load(serialized_path,
|
||||
metadata_only=True).devs
|
||||
if dev and dev['ip'] in self.my_ips)
|
||||
self.mtimes_by_ring_path[serialized_path] = new_mtime
|
||||
# No "break" here so that the above line will update the
|
||||
# mtimes_by_ring_path entry for any ring that changes, not just
|
||||
# the first one we notice.
|
||||
|
||||
# Return the requested set of ports from our (now-freshened) cache
|
||||
return reduce(set.union, self.portsets_by_ring_path.values(), set())
|
||||
|
||||
|
||||
class PolicyError(ValueError):
|
||||
|
||||
def __init__(self, msg, index=None):
|
||||
@ -291,7 +340,7 @@ class ECStoragePolicy(BaseStoragePolicy):
|
||||
if ec_type not in VALID_EC_TYPES:
|
||||
raise PolicyError('Wrong ec_type %s for policy %s, should be one'
|
||||
' of "%s"' % (ec_type, self.name,
|
||||
', '.join(VALID_EC_TYPES)))
|
||||
', '.join(VALID_EC_TYPES)))
|
||||
self._ec_type = ec_type
|
||||
|
||||
# Define _ec_ndata as the number of EC data fragments
|
||||
@ -427,8 +476,9 @@ class ECStoragePolicy(BaseStoragePolicy):
|
||||
if nodes_configured != (self.ec_ndata + self.ec_nparity):
|
||||
raise RingValidationError(
|
||||
'EC ring for policy %s needs to be configured with '
|
||||
'exactly %d nodes. Got %d.' % (self.name,
|
||||
self.ec_ndata + self.ec_nparity, nodes_configured))
|
||||
'exactly %d nodes. Got %d.' % (
|
||||
self.name, self.ec_ndata + self.ec_nparity,
|
||||
nodes_configured))
|
||||
|
||||
@property
|
||||
def quorum(self):
|
||||
|
@ -1589,7 +1589,7 @@ def get_hub():
|
||||
return None
|
||||
|
||||
|
||||
def drop_privileges(user):
|
||||
def drop_privileges(user, call_setsid=True):
|
||||
"""
|
||||
Sets the userid/groupid of the current process, get session leader, etc.
|
||||
|
||||
@ -1602,10 +1602,11 @@ def drop_privileges(user):
|
||||
os.setgid(user[3])
|
||||
os.setuid(user[2])
|
||||
os.environ['HOME'] = user[5]
|
||||
try:
|
||||
os.setsid()
|
||||
except OSError:
|
||||
pass
|
||||
if call_setsid:
|
||||
try:
|
||||
os.setsid()
|
||||
except OSError:
|
||||
pass
|
||||
os.chdir('/') # in case you need to rmdir on where you started the daemon
|
||||
os.umask(0o22) # ensure files are created with the correct privileges
|
||||
|
||||
@ -1706,12 +1707,28 @@ def expand_ipv6(address):
|
||||
return socket.inet_ntop(socket.AF_INET6, packed_ip)
|
||||
|
||||
|
||||
def whataremyips():
|
||||
def whataremyips(bind_ip=None):
|
||||
"""
|
||||
Get the machine's ip addresses
|
||||
Get "our" IP addresses ("us" being the set of services configured by
|
||||
one *.conf file). If our REST listens on a specific address, return it.
|
||||
Otherwise, if listen on '0.0.0.0' or '::' return all addresses, including
|
||||
the loopback.
|
||||
|
||||
:param str bind_ip: Optional bind_ip from a config file; may be IP address
|
||||
or hostname.
|
||||
:returns: list of Strings of ip addresses
|
||||
"""
|
||||
if bind_ip:
|
||||
# See if bind_ip is '0.0.0.0'/'::'
|
||||
try:
|
||||
_, _, _, _, sockaddr = socket.getaddrinfo(
|
||||
bind_ip, None, 0, socket.SOCK_STREAM, 0,
|
||||
socket.AI_NUMERICHOST)[0]
|
||||
if sockaddr[0] not in ('0.0.0.0', '::'):
|
||||
return [bind_ip]
|
||||
except socket.gaierror:
|
||||
pass
|
||||
|
||||
addresses = []
|
||||
for interface in netifaces.interfaces():
|
||||
try:
|
||||
|
@ -29,12 +29,13 @@ from textwrap import dedent
|
||||
|
||||
import eventlet
|
||||
import eventlet.debug
|
||||
from eventlet import greenio, GreenPool, sleep, wsgi, listen
|
||||
from eventlet import greenio, GreenPool, sleep, wsgi, listen, Timeout
|
||||
from paste.deploy import loadwsgi
|
||||
from eventlet.green import socket, ssl
|
||||
from eventlet.green import socket, ssl, os as green_os
|
||||
from urllib import unquote
|
||||
|
||||
from swift.common import utils, constraints
|
||||
from swift.common.storage_policy import BindPortsCache
|
||||
from swift.common.swob import Request
|
||||
from swift.common.utils import capture_stdio, disable_fallocate, \
|
||||
drop_privileges, get_logger, NullLogger, config_true_value, \
|
||||
@ -437,10 +438,414 @@ def run_server(conf, logger, sock, global_conf=None):
|
||||
pool.waitall()
|
||||
|
||||
|
||||
#TODO(clayg): pull more pieces of this to test more
|
||||
class WorkersStrategy(object):
|
||||
"""
|
||||
WSGI server management strategy object for a single bind port and listen
|
||||
socket shared by a configured number of forked-off workers.
|
||||
|
||||
Used in :py:func:`run_wsgi`.
|
||||
|
||||
:param dict conf: Server configuration dictionary.
|
||||
:param logger: The server's :py:class:`~swift.common.utils.LogAdaptor`
|
||||
object.
|
||||
"""
|
||||
|
||||
def __init__(self, conf, logger):
|
||||
self.conf = conf
|
||||
self.logger = logger
|
||||
self.sock = None
|
||||
self.children = []
|
||||
self.worker_count = config_auto_int_value(conf.get('workers'),
|
||||
CPU_COUNT)
|
||||
|
||||
def loop_timeout(self):
|
||||
"""
|
||||
:returns: None; to block in :py:func:`green.os.wait`
|
||||
"""
|
||||
|
||||
return None
|
||||
|
||||
def bind_ports(self):
|
||||
"""
|
||||
Bind the one listen socket for this strategy and drop privileges
|
||||
(since the parent process will never need to bind again).
|
||||
"""
|
||||
|
||||
try:
|
||||
self.sock = get_socket(self.conf)
|
||||
except ConfigFilePortError:
|
||||
msg = 'bind_port wasn\'t properly set in the config file. ' \
|
||||
'It must be explicitly set to a valid port number.'
|
||||
return msg
|
||||
drop_privileges(self.conf.get('user', 'swift'))
|
||||
|
||||
def no_fork_sock(self):
|
||||
"""
|
||||
Return a server listen socket if the server should run in the
|
||||
foreground (no fork).
|
||||
"""
|
||||
|
||||
# Useful for profiling [no forks].
|
||||
if self.worker_count == 0:
|
||||
return self.sock
|
||||
|
||||
def new_worker_socks(self):
|
||||
"""
|
||||
Yield a sequence of (socket, opqaue_data) tuples for each server which
|
||||
should be forked-off and started.
|
||||
|
||||
The opaque_data item for each socket will passed into the
|
||||
:py:meth:`log_sock_exit` and :py:meth:`register_worker_start` methods
|
||||
where it will be ignored.
|
||||
"""
|
||||
|
||||
while len(self.children) < self.worker_count:
|
||||
yield self.sock, None
|
||||
|
||||
def post_fork_hook(self):
|
||||
"""
|
||||
Perform any initialization in a forked-off child process prior to
|
||||
starting the wsgi server.
|
||||
"""
|
||||
|
||||
pass
|
||||
|
||||
def log_sock_exit(self, sock, _unused):
|
||||
"""
|
||||
Log a server's exit.
|
||||
|
||||
:param socket sock: The listen socket for the worker just started.
|
||||
:param _unused: The socket's opaque_data yielded by
|
||||
:py:meth:`new_worker_socks`.
|
||||
"""
|
||||
|
||||
self.logger.notice('Child %d exiting normally' % os.getpid())
|
||||
|
||||
def register_worker_start(self, sock, _unused, pid):
|
||||
"""
|
||||
Called when a new worker is started.
|
||||
|
||||
:param socket sock: The listen socket for the worker just started.
|
||||
:param _unused: The socket's opaque_data yielded by new_worker_socks().
|
||||
:param int pid: The new worker process' PID
|
||||
"""
|
||||
|
||||
self.logger.notice('Started child %s' % pid)
|
||||
self.children.append(pid)
|
||||
|
||||
def register_worker_exit(self, pid):
|
||||
"""
|
||||
Called when a worker has exited.
|
||||
|
||||
:param int pid: The PID of the worker that exited.
|
||||
"""
|
||||
|
||||
self.logger.error('Removing dead child %s' % pid)
|
||||
self.children.remove(pid)
|
||||
|
||||
def shutdown_sockets(self):
|
||||
"""
|
||||
Shutdown any listen sockets.
|
||||
"""
|
||||
|
||||
greenio.shutdown_safe(self.sock)
|
||||
self.sock.close()
|
||||
|
||||
|
||||
class PortPidState(object):
|
||||
"""
|
||||
A helper class for :py:class:`ServersPerPortStrategy` to track listen
|
||||
sockets and PIDs for each port.
|
||||
|
||||
:param int servers_per_port: The configured number of servers per port.
|
||||
:param logger: The server's :py:class:`~swift.common.utils.LogAdaptor`
|
||||
"""
|
||||
|
||||
def __init__(self, servers_per_port, logger):
|
||||
self.servers_per_port = servers_per_port
|
||||
self.logger = logger
|
||||
self.sock_data_by_port = {}
|
||||
|
||||
def sock_for_port(self, port):
|
||||
"""
|
||||
:param int port: The port whose socket is desired.
|
||||
:returns: The bound listen socket for the given port.
|
||||
"""
|
||||
|
||||
return self.sock_data_by_port[port]['sock']
|
||||
|
||||
def port_for_sock(self, sock):
|
||||
"""
|
||||
:param socket sock: A tracked bound listen socket
|
||||
:returns: The port the socket is bound to.
|
||||
"""
|
||||
|
||||
for port, sock_data in self.sock_data_by_port.iteritems():
|
||||
if sock_data['sock'] == sock:
|
||||
return port
|
||||
|
||||
def _pid_to_port_and_index(self, pid):
|
||||
for port, sock_data in self.sock_data_by_port.iteritems():
|
||||
for server_idx, a_pid in enumerate(sock_data['pids']):
|
||||
if pid == a_pid:
|
||||
return port, server_idx
|
||||
|
||||
def port_index_pairs(self):
|
||||
"""
|
||||
:returns: A set of (port, server_idx) tuples for currently-tracked
|
||||
ports, sockets, and PIDs.
|
||||
"""
|
||||
|
||||
current_port_index_pairs = set()
|
||||
for port, pid_state in self.sock_data_by_port.iteritems():
|
||||
current_port_index_pairs |= set(
|
||||
(port, i)
|
||||
for i, pid in enumerate(pid_state['pids'])
|
||||
if pid is not None)
|
||||
return current_port_index_pairs
|
||||
|
||||
def track_port(self, port, sock):
|
||||
"""
|
||||
Start tracking servers for the given port and listen socket.
|
||||
|
||||
:param int port: The port to start tracking
|
||||
:param socket sock: The bound listen socket for the port.
|
||||
"""
|
||||
|
||||
self.sock_data_by_port[port] = {
|
||||
'sock': sock,
|
||||
'pids': [None] * self.servers_per_port,
|
||||
}
|
||||
|
||||
def not_tracking(self, port):
|
||||
"""
|
||||
Return True if the specified port is not being tracked.
|
||||
|
||||
:param int port: A port to check.
|
||||
"""
|
||||
|
||||
return port not in self.sock_data_by_port
|
||||
|
||||
def all_socks(self):
|
||||
"""
|
||||
Yield all current listen sockets.
|
||||
"""
|
||||
|
||||
for orphan_data in self.sock_data_by_port.itervalues():
|
||||
yield orphan_data['sock']
|
||||
|
||||
def forget_port(self, port):
|
||||
"""
|
||||
Idempotently forget a port, closing the listen socket at most once.
|
||||
"""
|
||||
|
||||
orphan_data = self.sock_data_by_port.pop(port, None)
|
||||
if orphan_data:
|
||||
greenio.shutdown_safe(orphan_data['sock'])
|
||||
orphan_data['sock'].close()
|
||||
self.logger.notice('Closing unnecessary sock for port %d', port)
|
||||
|
||||
def add_pid(self, port, index, pid):
|
||||
self.sock_data_by_port[port]['pids'][index] = pid
|
||||
|
||||
def forget_pid(self, pid):
|
||||
"""
|
||||
Idempotently forget a PID. It's okay if the PID is no longer in our
|
||||
data structure (it could have been removed by the "orphan port" removal
|
||||
in :py:meth:`new_worker_socks`).
|
||||
|
||||
:param int pid: The PID which exited.
|
||||
"""
|
||||
|
||||
port_server_idx = self._pid_to_port_and_index(pid)
|
||||
if port_server_idx is None:
|
||||
# This method can lose a race with the "orphan port" removal, when
|
||||
# a ring reload no longer contains a port. So it's okay if we were
|
||||
# unable to find a (port, server_idx) pair.
|
||||
return
|
||||
dead_port, server_idx = port_server_idx
|
||||
self.logger.error('Removing dead child %d (PID: %s) for port %s',
|
||||
server_idx, pid, dead_port)
|
||||
self.sock_data_by_port[dead_port]['pids'][server_idx] = None
|
||||
|
||||
|
||||
class ServersPerPortStrategy(object):
|
||||
"""
|
||||
WSGI server management strategy object for an object-server with one listen
|
||||
port per unique local port in the storage policy rings. The
|
||||
`servers_per_port` integer config setting determines how many workers are
|
||||
run per port.
|
||||
|
||||
Used in :py:func:`run_wsgi`.
|
||||
|
||||
:param dict conf: Server configuration dictionary.
|
||||
:param logger: The server's :py:class:`~swift.common.utils.LogAdaptor`
|
||||
object.
|
||||
:param int servers_per_port: The number of workers to run per port.
|
||||
"""
|
||||
|
||||
def __init__(self, conf, logger, servers_per_port):
|
||||
self.conf = conf
|
||||
self.logger = logger
|
||||
self.servers_per_port = servers_per_port
|
||||
self.swift_dir = conf.get('swift_dir', '/etc/swift')
|
||||
self.ring_check_interval = int(conf.get('ring_check_interval', 15))
|
||||
self.port_pid_state = PortPidState(servers_per_port, logger)
|
||||
|
||||
bind_ip = conf.get('bind_ip', '0.0.0.0')
|
||||
self.cache = BindPortsCache(self.swift_dir, bind_ip)
|
||||
|
||||
def _reload_bind_ports(self):
|
||||
self.bind_ports = self.cache.all_bind_ports_for_node()
|
||||
|
||||
def _bind_port(self, port):
|
||||
new_conf = self.conf.copy()
|
||||
new_conf['bind_port'] = port
|
||||
sock = get_socket(new_conf)
|
||||
self.port_pid_state.track_port(port, sock)
|
||||
|
||||
def loop_timeout(self):
|
||||
"""
|
||||
:returns: The time to wait for a child to exit before checking for
|
||||
reloaded rings (new ports).
|
||||
"""
|
||||
|
||||
return self.ring_check_interval
|
||||
|
||||
def bind_ports(self):
|
||||
"""
|
||||
Bind one listen socket per unique local storage policy ring port. Then
|
||||
do all the work of drop_privileges except the actual dropping of
|
||||
privileges (each forked-off worker will do that post-fork in
|
||||
:py:meth:`post_fork_hook`).
|
||||
"""
|
||||
|
||||
self._reload_bind_ports()
|
||||
for port in self.bind_ports:
|
||||
self._bind_port(port)
|
||||
|
||||
# The workers strategy drops privileges here, which we obviously cannot
|
||||
# do if we want to support binding to low ports. But we do want some
|
||||
# of the actions that drop_privileges did.
|
||||
try:
|
||||
os.setsid()
|
||||
except OSError:
|
||||
pass
|
||||
# In case you need to rmdir where you started the daemon:
|
||||
os.chdir('/')
|
||||
# Ensure files are created with the correct privileges:
|
||||
os.umask(0o22)
|
||||
|
||||
def no_fork_sock(self):
|
||||
"""
|
||||
This strategy does not support running in the foreground.
|
||||
"""
|
||||
|
||||
pass
|
||||
|
||||
def new_worker_socks(self):
|
||||
"""
|
||||
Yield a sequence of (socket, server_idx) tuples for each server which
|
||||
should be forked-off and started.
|
||||
|
||||
Any sockets for "orphaned" ports no longer in any ring will be closed
|
||||
(causing their associated workers to gracefully exit) after all new
|
||||
sockets have been yielded.
|
||||
|
||||
The server_idx item for each socket will passed into the
|
||||
:py:meth:`log_sock_exit` and :py:meth:`register_worker_start` methods.
|
||||
"""
|
||||
|
||||
self._reload_bind_ports()
|
||||
desired_port_index_pairs = set(
|
||||
(p, i) for p in self.bind_ports
|
||||
for i in range(self.servers_per_port))
|
||||
|
||||
current_port_index_pairs = self.port_pid_state.port_index_pairs()
|
||||
|
||||
if desired_port_index_pairs != current_port_index_pairs:
|
||||
# Orphan ports are ports which had object-server processes running,
|
||||
# but which no longer appear in the ring. We'll kill them after we
|
||||
# start missing workers.
|
||||
orphan_port_index_pairs = current_port_index_pairs - \
|
||||
desired_port_index_pairs
|
||||
|
||||
# Fork off worker(s) for every port who's supposed to have
|
||||
# worker(s) but doesn't
|
||||
missing_port_index_pairs = desired_port_index_pairs - \
|
||||
current_port_index_pairs
|
||||
for port, server_idx in sorted(missing_port_index_pairs):
|
||||
if self.port_pid_state.not_tracking(port):
|
||||
try:
|
||||
self._bind_port(port)
|
||||
except Exception as e:
|
||||
self.logger.critical('Unable to bind to port %d: %s',
|
||||
port, e)
|
||||
continue
|
||||
yield self.port_pid_state.sock_for_port(port), server_idx
|
||||
|
||||
for orphan_pair in orphan_port_index_pairs:
|
||||
# For any port in orphan_port_index_pairs, it is guaranteed
|
||||
# that there should be no listen socket for that port, so we
|
||||
# can close and forget them.
|
||||
self.port_pid_state.forget_port(orphan_pair[0])
|
||||
|
||||
def post_fork_hook(self):
|
||||
"""
|
||||
Called in each child process, prior to starting the actual wsgi server,
|
||||
to drop privileges.
|
||||
"""
|
||||
|
||||
drop_privileges(self.conf.get('user', 'swift'), call_setsid=False)
|
||||
|
||||
def log_sock_exit(self, sock, server_idx):
|
||||
"""
|
||||
Log a server's exit.
|
||||
"""
|
||||
|
||||
port = self.port_pid_state.port_for_sock(sock)
|
||||
self.logger.notice('Child %d (PID %d, port %d) exiting normally',
|
||||
server_idx, os.getpid(), port)
|
||||
|
||||
def register_worker_start(self, sock, server_idx, pid):
|
||||
"""
|
||||
Called when a new worker is started.
|
||||
|
||||
:param socket sock: The listen socket for the worker just started.
|
||||
:param server_idx: The socket's server_idx as yielded by
|
||||
:py:meth:`new_worker_socks`.
|
||||
:param int pid: The new worker process' PID
|
||||
"""
|
||||
port = self.port_pid_state.port_for_sock(sock)
|
||||
self.logger.notice('Started child %d (PID %d) for port %d',
|
||||
server_idx, pid, port)
|
||||
self.port_pid_state.add_pid(port, server_idx, pid)
|
||||
|
||||
def register_worker_exit(self, pid):
|
||||
"""
|
||||
Called when a worker has exited.
|
||||
|
||||
:param int pid: The PID of the worker that exited.
|
||||
"""
|
||||
|
||||
self.port_pid_state.forget_pid(pid)
|
||||
|
||||
def shutdown_sockets(self):
|
||||
"""
|
||||
Shutdown any listen sockets.
|
||||
"""
|
||||
|
||||
for sock in self.port_pid_state.all_socks():
|
||||
greenio.shutdown_safe(sock)
|
||||
sock.close()
|
||||
|
||||
|
||||
def run_wsgi(conf_path, app_section, *args, **kwargs):
|
||||
"""
|
||||
Runs the server using the specified number of workers.
|
||||
Runs the server according to some strategy. The default strategy runs a
|
||||
specified number of workers in pre-fork model. The object-server (only)
|
||||
may use a servers-per-port strategy if its config has a servers_per_port
|
||||
setting with a value greater than zero.
|
||||
|
||||
:param conf_path: Path to paste.deploy style configuration file/directory
|
||||
:param app_section: App name from conf file to load config from
|
||||
@ -454,17 +859,22 @@ def run_wsgi(conf_path, app_section, *args, **kwargs):
|
||||
print(e)
|
||||
return 1
|
||||
|
||||
# bind to address and port
|
||||
try:
|
||||
sock = get_socket(conf)
|
||||
except ConfigFilePortError:
|
||||
msg = 'bind_port wasn\'t properly set in the config file. ' \
|
||||
'It must be explicitly set to a valid port number.'
|
||||
logger.error(msg)
|
||||
print(msg)
|
||||
servers_per_port = int(conf.get('servers_per_port', '0') or 0)
|
||||
|
||||
# NOTE: for now servers_per_port is object-server-only; future work could
|
||||
# be done to test and allow it to be used for account and container
|
||||
# servers, but that has not been done yet.
|
||||
if servers_per_port and app_section == 'object-server':
|
||||
strategy = ServersPerPortStrategy(
|
||||
conf, logger, servers_per_port=servers_per_port)
|
||||
else:
|
||||
strategy = WorkersStrategy(conf, logger)
|
||||
|
||||
error_msg = strategy.bind_ports()
|
||||
if error_msg:
|
||||
logger.error(error_msg)
|
||||
print(error_msg)
|
||||
return 1
|
||||
# remaining tasks should not require elevated privileges
|
||||
drop_privileges(conf.get('user', 'swift'))
|
||||
|
||||
# Ensure the configuration and application can be loaded before proceeding.
|
||||
global_conf = {'log_name': log_name}
|
||||
@ -479,11 +889,9 @@ def run_wsgi(conf_path, app_section, *args, **kwargs):
|
||||
# redirect errors to logger and close stdio
|
||||
capture_stdio(logger)
|
||||
|
||||
worker_count = config_auto_int_value(conf.get('workers'), CPU_COUNT)
|
||||
|
||||
# Useful for profiling [no forks].
|
||||
if worker_count == 0:
|
||||
run_server(conf, logger, sock, global_conf=global_conf)
|
||||
no_fork_sock = strategy.no_fork_sock()
|
||||
if no_fork_sock:
|
||||
run_server(conf, logger, no_fork_sock, global_conf=global_conf)
|
||||
return 0
|
||||
|
||||
def kill_children(*args):
|
||||
@ -502,32 +910,42 @@ def run_wsgi(conf_path, app_section, *args, **kwargs):
|
||||
running = [True]
|
||||
signal.signal(signal.SIGTERM, kill_children)
|
||||
signal.signal(signal.SIGHUP, hup)
|
||||
children = []
|
||||
|
||||
while running[0]:
|
||||
while len(children) < worker_count:
|
||||
for sock, sock_info in strategy.new_worker_socks():
|
||||
pid = os.fork()
|
||||
if pid == 0:
|
||||
signal.signal(signal.SIGHUP, signal.SIG_DFL)
|
||||
signal.signal(signal.SIGTERM, signal.SIG_DFL)
|
||||
strategy.post_fork_hook()
|
||||
run_server(conf, logger, sock)
|
||||
logger.notice('Child %d exiting normally' % os.getpid())
|
||||
strategy.log_sock_exit(sock, sock_info)
|
||||
return 0
|
||||
else:
|
||||
logger.notice('Started child %s' % pid)
|
||||
children.append(pid)
|
||||
try:
|
||||
pid, status = os.wait()
|
||||
if os.WIFEXITED(status) or os.WIFSIGNALED(status):
|
||||
logger.error('Removing dead child %s' % pid)
|
||||
children.remove(pid)
|
||||
except OSError as err:
|
||||
if err.errno not in (errno.EINTR, errno.ECHILD):
|
||||
raise
|
||||
except KeyboardInterrupt:
|
||||
logger.notice('User quit')
|
||||
break
|
||||
greenio.shutdown_safe(sock)
|
||||
sock.close()
|
||||
strategy.register_worker_start(sock, sock_info, pid)
|
||||
|
||||
# The strategy may need to pay attention to something in addition to
|
||||
# child process exits (like new ports showing up in a ring).
|
||||
#
|
||||
# NOTE: a timeout value of None will just instantiate the Timeout
|
||||
# object and not actually schedule it, which is equivalent to no
|
||||
# timeout for the green_os.wait().
|
||||
loop_timeout = strategy.loop_timeout()
|
||||
|
||||
with Timeout(loop_timeout, exception=False):
|
||||
try:
|
||||
pid, status = green_os.wait()
|
||||
if os.WIFEXITED(status) or os.WIFSIGNALED(status):
|
||||
strategy.register_worker_exit(pid)
|
||||
except OSError as err:
|
||||
if err.errno not in (errno.EINTR, errno.ECHILD):
|
||||
raise
|
||||
except KeyboardInterrupt:
|
||||
logger.notice('User quit')
|
||||
running[0] = False
|
||||
break
|
||||
|
||||
strategy.shutdown_sockets()
|
||||
logger.notice('Exited')
|
||||
return 0
|
||||
|
||||
|
@ -204,7 +204,8 @@ class ContainerSync(Daemon):
|
||||
#: swift.common.ring.Ring for locating containers.
|
||||
self.container_ring = container_ring or Ring(self.swift_dir,
|
||||
ring_name='container')
|
||||
self._myips = whataremyips()
|
||||
bind_ip = conf.get('bind_ip', '0.0.0.0')
|
||||
self._myips = whataremyips(bind_ip)
|
||||
self._myport = int(conf.get('bind_port', 6001))
|
||||
swift.common.db.DB_PREALLOCATION = \
|
||||
config_true_value(conf.get('db_preallocation', 'f'))
|
||||
|
@ -119,7 +119,10 @@ class ObjectReconstructor(Daemon):
|
||||
self.devices_dir = conf.get('devices', '/srv/node')
|
||||
self.mount_check = config_true_value(conf.get('mount_check', 'true'))
|
||||
self.swift_dir = conf.get('swift_dir', '/etc/swift')
|
||||
self.port = int(conf.get('bind_port', 6000))
|
||||
self.bind_ip = conf.get('bind_ip', '0.0.0.0')
|
||||
self.servers_per_port = int(conf.get('servers_per_port', '0') or 0)
|
||||
self.port = None if self.servers_per_port else \
|
||||
int(conf.get('bind_port', 6000))
|
||||
self.concurrency = int(conf.get('concurrency', 1))
|
||||
self.stats_interval = int(conf.get('stats_interval', '300'))
|
||||
self.ring_check_interval = int(conf.get('ring_check_interval', 15))
|
||||
@ -764,7 +767,7 @@ class ObjectReconstructor(Daemon):
|
||||
"""
|
||||
override_devices = override_devices or []
|
||||
override_partitions = override_partitions or []
|
||||
ips = whataremyips()
|
||||
ips = whataremyips(self.bind_ip)
|
||||
for policy in POLICIES:
|
||||
if policy.policy_type != EC_POLICY:
|
||||
continue
|
||||
@ -776,6 +779,7 @@ class ObjectReconstructor(Daemon):
|
||||
ips, self.port,
|
||||
dev['replication_ip'], dev['replication_port']),
|
||||
policy.object_ring.devs)
|
||||
|
||||
for local_dev in local_devices:
|
||||
if override_devices and (local_dev['device'] not in
|
||||
override_devices):
|
||||
|
@ -65,7 +65,10 @@ class ObjectReplicator(Daemon):
|
||||
self.mount_check = config_true_value(conf.get('mount_check', 'true'))
|
||||
self.vm_test_mode = config_true_value(conf.get('vm_test_mode', 'no'))
|
||||
self.swift_dir = conf.get('swift_dir', '/etc/swift')
|
||||
self.port = int(conf.get('bind_port', 6000))
|
||||
self.bind_ip = conf.get('bind_ip', '0.0.0.0')
|
||||
self.servers_per_port = int(conf.get('servers_per_port', '0') or 0)
|
||||
self.port = None if self.servers_per_port else \
|
||||
int(conf.get('bind_port', 6000))
|
||||
self.concurrency = int(conf.get('concurrency', 1))
|
||||
self.stats_interval = int(conf.get('stats_interval', '300'))
|
||||
self.ring_check_interval = int(conf.get('ring_check_interval', 15))
|
||||
@ -539,7 +542,7 @@ class ObjectReplicator(Daemon):
|
||||
policies will be returned
|
||||
"""
|
||||
jobs = []
|
||||
ips = whataremyips()
|
||||
ips = whataremyips(self.bind_ip)
|
||||
for policy in POLICIES:
|
||||
if policy.policy_type == REPL_POLICY:
|
||||
if (override_policies is not None and
|
||||
|
@ -39,8 +39,8 @@ for p in POLICIES:
|
||||
POLICIES_BY_TYPE[p.policy_type].append(p)
|
||||
|
||||
|
||||
def get_server_number(port, port2server):
|
||||
server_number = port2server[port]
|
||||
def get_server_number(ipport, ipport2server):
|
||||
server_number = ipport2server[ipport]
|
||||
server, number = server_number[:-1], server_number[-1:]
|
||||
try:
|
||||
number = int(number)
|
||||
@ -50,19 +50,19 @@ def get_server_number(port, port2server):
|
||||
return server, number
|
||||
|
||||
|
||||
def start_server(port, port2server, pids, check=True):
|
||||
server, number = get_server_number(port, port2server)
|
||||
def start_server(ipport, ipport2server, pids, check=True):
|
||||
server, number = get_server_number(ipport, ipport2server)
|
||||
err = Manager([server]).start(number=number, wait=False)
|
||||
if err:
|
||||
raise Exception('unable to start %s' % (
|
||||
server if not number else '%s%s' % (server, number)))
|
||||
if check:
|
||||
return check_server(port, port2server, pids)
|
||||
return check_server(ipport, ipport2server, pids)
|
||||
return None
|
||||
|
||||
|
||||
def check_server(port, port2server, pids, timeout=CHECK_SERVER_TIMEOUT):
|
||||
server = port2server[port]
|
||||
def check_server(ipport, ipport2server, pids, timeout=CHECK_SERVER_TIMEOUT):
|
||||
server = ipport2server[ipport]
|
||||
if server[:-1] in ('account', 'container', 'object'):
|
||||
if int(server[-1]) > 4:
|
||||
return None
|
||||
@ -74,7 +74,7 @@ def check_server(port, port2server, pids, timeout=CHECK_SERVER_TIMEOUT):
|
||||
try_until = time() + timeout
|
||||
while True:
|
||||
try:
|
||||
conn = HTTPConnection('127.0.0.1', port)
|
||||
conn = HTTPConnection(*ipport)
|
||||
conn.request('GET', path)
|
||||
resp = conn.getresponse()
|
||||
# 404 because it's a nonsense path (and mount_check is false)
|
||||
@ -87,14 +87,14 @@ def check_server(port, port2server, pids, timeout=CHECK_SERVER_TIMEOUT):
|
||||
if time() > try_until:
|
||||
print err
|
||||
print 'Giving up on %s:%s after %s seconds.' % (
|
||||
server, port, timeout)
|
||||
server, ipport, timeout)
|
||||
raise err
|
||||
sleep(0.1)
|
||||
else:
|
||||
try_until = time() + timeout
|
||||
while True:
|
||||
try:
|
||||
url, token = get_auth('http://127.0.0.1:8080/auth/v1.0',
|
||||
url, token = get_auth('http://%s:%d/auth/v1.0' % ipport,
|
||||
'test:tester', 'testing')
|
||||
account = url.split('/')[-1]
|
||||
head_account(url, token)
|
||||
@ -108,8 +108,8 @@ def check_server(port, port2server, pids, timeout=CHECK_SERVER_TIMEOUT):
|
||||
return None
|
||||
|
||||
|
||||
def kill_server(port, port2server, pids):
|
||||
server, number = get_server_number(port, port2server)
|
||||
def kill_server(ipport, ipport2server, pids):
|
||||
server, number = get_server_number(ipport, ipport2server)
|
||||
err = Manager([server]).kill(number=number)
|
||||
if err:
|
||||
raise Exception('unable to kill %s' % (server if not number else
|
||||
@ -117,47 +117,77 @@ def kill_server(port, port2server, pids):
|
||||
try_until = time() + 30
|
||||
while True:
|
||||
try:
|
||||
conn = HTTPConnection('127.0.0.1', port)
|
||||
conn = HTTPConnection(*ipport)
|
||||
conn.request('GET', '/')
|
||||
conn.getresponse()
|
||||
except Exception as err:
|
||||
break
|
||||
if time() > try_until:
|
||||
raise Exception(
|
||||
'Still answering on port %s after 30 seconds' % port)
|
||||
'Still answering on %s:%s after 30 seconds' % ipport)
|
||||
sleep(0.1)
|
||||
|
||||
|
||||
def kill_nonprimary_server(primary_nodes, port2server, pids):
|
||||
primary_ports = [n['port'] for n in primary_nodes]
|
||||
for port, server in port2server.iteritems():
|
||||
if port in primary_ports:
|
||||
def kill_nonprimary_server(primary_nodes, ipport2server, pids):
|
||||
primary_ipports = [(n['ip'], n['port']) for n in primary_nodes]
|
||||
for ipport, server in ipport2server.iteritems():
|
||||
if ipport in primary_ipports:
|
||||
server_type = server[:-1]
|
||||
break
|
||||
else:
|
||||
raise Exception('Cannot figure out server type for %r' % primary_nodes)
|
||||
for port, server in list(port2server.iteritems()):
|
||||
if server[:-1] == server_type and port not in primary_ports:
|
||||
kill_server(port, port2server, pids)
|
||||
return port
|
||||
for ipport, server in list(ipport2server.iteritems()):
|
||||
if server[:-1] == server_type and ipport not in primary_ipports:
|
||||
kill_server(ipport, ipport2server, pids)
|
||||
return ipport
|
||||
|
||||
|
||||
def build_port_to_conf(server):
|
||||
# map server to config by port
|
||||
port_to_config = {}
|
||||
for server_ in Manager([server]):
|
||||
for config_path in server_.conf_files():
|
||||
conf = readconf(config_path,
|
||||
section_name='%s-replicator' % server_.type)
|
||||
port_to_config[int(conf['bind_port'])] = conf
|
||||
return port_to_config
|
||||
def add_ring_devs_to_ipport2server(ring, server_type, ipport2server,
|
||||
servers_per_port=0):
|
||||
# We'll number the servers by order of unique occurrence of:
|
||||
# IP, if servers_per_port > 0 OR there > 1 IP in ring
|
||||
# ipport, otherwise
|
||||
unique_ip_count = len(set(dev['ip'] for dev in ring.devs if dev))
|
||||
things_to_number = {}
|
||||
number = 0
|
||||
for dev in filter(None, ring.devs):
|
||||
ip = dev['ip']
|
||||
ipport = (ip, dev['port'])
|
||||
unique_by = ip if servers_per_port or unique_ip_count > 1 else ipport
|
||||
if unique_by not in things_to_number:
|
||||
number += 1
|
||||
things_to_number[unique_by] = number
|
||||
ipport2server[ipport] = '%s%d' % (server_type,
|
||||
things_to_number[unique_by])
|
||||
|
||||
|
||||
def store_config_paths(name, configs):
|
||||
for server_name in (name, '%s-replicator' % name):
|
||||
for server in Manager([server_name]):
|
||||
for i, conf in enumerate(server.conf_files(), 1):
|
||||
configs[server.server][i] = conf
|
||||
|
||||
|
||||
def get_ring(ring_name, required_replicas, required_devices,
|
||||
server=None, force_validate=None):
|
||||
server=None, force_validate=None, ipport2server=None,
|
||||
config_paths=None):
|
||||
if not server:
|
||||
server = ring_name
|
||||
ring = Ring('/etc/swift', ring_name=ring_name)
|
||||
if ipport2server is None:
|
||||
ipport2server = {} # used internally, even if not passed in
|
||||
if config_paths is None:
|
||||
config_paths = defaultdict(dict)
|
||||
store_config_paths(server, config_paths)
|
||||
|
||||
repl_name = '%s-replicator' % server
|
||||
repl_configs = {i: readconf(c, section_name=repl_name)
|
||||
for i, c in config_paths[repl_name].iteritems()}
|
||||
servers_per_port = any(int(c.get('servers_per_port', '0'))
|
||||
for c in repl_configs.values())
|
||||
|
||||
add_ring_devs_to_ipport2server(ring, server, ipport2server,
|
||||
servers_per_port=servers_per_port)
|
||||
if not VALIDATE_RSYNC and not force_validate:
|
||||
return ring
|
||||
# easy sanity checks
|
||||
@ -167,10 +197,11 @@ def get_ring(ring_name, required_replicas, required_devices,
|
||||
if len(ring.devs) != required_devices:
|
||||
raise SkipTest('%s has %s devices instead of %s' % (
|
||||
ring.serialized_path, len(ring.devs), required_devices))
|
||||
port_to_config = build_port_to_conf(server)
|
||||
for dev in ring.devs:
|
||||
# verify server is exposing mounted device
|
||||
conf = port_to_config[dev['port']]
|
||||
ipport = (dev['ip'], dev['port'])
|
||||
_, server_number = get_server_number(ipport, ipport2server)
|
||||
conf = repl_configs[server_number]
|
||||
for device in os.listdir(conf['devices']):
|
||||
if device == dev['device']:
|
||||
dev_path = os.path.join(conf['devices'], device)
|
||||
@ -185,7 +216,7 @@ def get_ring(ring_name, required_replicas, required_devices,
|
||||
"unable to find ring device %s under %s's devices (%s)" % (
|
||||
dev['device'], server, conf['devices']))
|
||||
# verify server is exposing rsync device
|
||||
if port_to_config[dev['port']].get('vm_test_mode', False):
|
||||
if conf.get('vm_test_mode', False):
|
||||
rsync_export = '%s%s' % (server, dev['replication_port'])
|
||||
else:
|
||||
rsync_export = server
|
||||
@ -235,46 +266,45 @@ class ProbeTest(unittest.TestCase):
|
||||
Manager(['all']).stop()
|
||||
self.pids = {}
|
||||
try:
|
||||
self.ipport2server = {}
|
||||
self.configs = defaultdict(dict)
|
||||
self.account_ring = get_ring(
|
||||
'account',
|
||||
self.acct_cont_required_replicas,
|
||||
self.acct_cont_required_devices)
|
||||
self.acct_cont_required_devices,
|
||||
ipport2server=self.ipport2server,
|
||||
config_paths=self.configs)
|
||||
self.container_ring = get_ring(
|
||||
'container',
|
||||
self.acct_cont_required_replicas,
|
||||
self.acct_cont_required_devices)
|
||||
self.acct_cont_required_devices,
|
||||
ipport2server=self.ipport2server,
|
||||
config_paths=self.configs)
|
||||
self.policy = get_policy(**self.policy_requirements)
|
||||
self.object_ring = get_ring(
|
||||
self.policy.ring_name,
|
||||
self.obj_required_replicas,
|
||||
self.obj_required_devices,
|
||||
server='object')
|
||||
server='object',
|
||||
ipport2server=self.ipport2server,
|
||||
config_paths=self.configs)
|
||||
|
||||
self.servers_per_port = any(
|
||||
int(readconf(c, section_name='object-replicator').get(
|
||||
'servers_per_port', '0'))
|
||||
for c in self.configs['object-replicator'].values())
|
||||
|
||||
Manager(['main']).start(wait=False)
|
||||
self.port2server = {}
|
||||
for server, port in [('account', 6002), ('container', 6001),
|
||||
('object', 6000)]:
|
||||
for number in xrange(1, 9):
|
||||
self.port2server[port + (number * 10)] = \
|
||||
'%s%d' % (server, number)
|
||||
for port in self.port2server:
|
||||
check_server(port, self.port2server, self.pids)
|
||||
self.port2server[8080] = 'proxy'
|
||||
self.url, self.token, self.account = \
|
||||
check_server(8080, self.port2server, self.pids)
|
||||
self.configs = defaultdict(dict)
|
||||
for name in ('account', 'container', 'object'):
|
||||
for server_name in (name, '%s-replicator' % name):
|
||||
for server in Manager([server_name]):
|
||||
for i, conf in enumerate(server.conf_files(), 1):
|
||||
self.configs[server.server][i] = conf
|
||||
for ipport in self.ipport2server:
|
||||
check_server(ipport, self.ipport2server, self.pids)
|
||||
proxy_ipport = ('127.0.0.1', 8080)
|
||||
self.ipport2server[proxy_ipport] = 'proxy'
|
||||
self.url, self.token, self.account = check_server(
|
||||
proxy_ipport, self.ipport2server, self.pids)
|
||||
self.replicators = Manager(
|
||||
['account-replicator', 'container-replicator',
|
||||
'object-replicator'])
|
||||
self.updaters = Manager(['container-updater', 'object-updater'])
|
||||
self.server_port_to_conf = {}
|
||||
# get some configs backend daemon configs loaded up
|
||||
for server in ('account', 'container', 'object'):
|
||||
self.server_port_to_conf[server] = build_port_to_conf(server)
|
||||
except BaseException:
|
||||
try:
|
||||
raise
|
||||
@ -288,7 +318,11 @@ class ProbeTest(unittest.TestCase):
|
||||
Manager(['all']).kill()
|
||||
|
||||
def device_dir(self, server, node):
|
||||
conf = self.server_port_to_conf[server][node['port']]
|
||||
server_type, config_number = get_server_number(
|
||||
(node['ip'], node['port']), self.ipport2server)
|
||||
repl_server = '%s-replicator' % server_type
|
||||
conf = readconf(self.configs[repl_server][config_number],
|
||||
section_name=repl_server)
|
||||
return os.path.join(conf['devices'], node['device'])
|
||||
|
||||
def storage_dir(self, server, node, part=None, policy=None):
|
||||
@ -301,9 +335,24 @@ class ProbeTest(unittest.TestCase):
|
||||
|
||||
def config_number(self, node):
|
||||
_server_type, config_number = get_server_number(
|
||||
node['port'], self.port2server)
|
||||
(node['ip'], node['port']), self.ipport2server)
|
||||
return config_number
|
||||
|
||||
def is_local_to(self, node1, node2):
|
||||
"""
|
||||
Return True if both ring devices are "local" to each other (on the same
|
||||
"server".
|
||||
"""
|
||||
if self.servers_per_port:
|
||||
return node1['ip'] == node2['ip']
|
||||
|
||||
# Without a disambiguating IP, for SAIOs, we have to assume ports
|
||||
# uniquely identify "servers". SAIOs should be configured to *either*
|
||||
# have unique IPs per node (e.g. 127.0.0.1, 127.0.0.2, etc.) OR unique
|
||||
# ports per server (i.e. sdb1 & sdb5 would have same port numbers in
|
||||
# the 8-disk EC ring).
|
||||
return node1['port'] == node2['port']
|
||||
|
||||
def get_to_final_state(self):
|
||||
# these .stop()s are probably not strictly necessary,
|
||||
# but may prevent race conditions
|
||||
|
@ -97,8 +97,9 @@ class TestAccountFailures(ReplProbeTest):
|
||||
self.assert_(found2)
|
||||
|
||||
apart, anodes = self.account_ring.get_nodes(self.account)
|
||||
kill_nonprimary_server(anodes, self.port2server, self.pids)
|
||||
kill_server(anodes[0]['port'], self.port2server, self.pids)
|
||||
kill_nonprimary_server(anodes, self.ipport2server, self.pids)
|
||||
kill_server((anodes[0]['ip'], anodes[0]['port']),
|
||||
self.ipport2server, self.pids)
|
||||
# Kill account servers excepting two of the primaries
|
||||
|
||||
# Delete container1
|
||||
@ -146,7 +147,8 @@ class TestAccountFailures(ReplProbeTest):
|
||||
self.assert_(found2)
|
||||
|
||||
# Restart other primary account server
|
||||
start_server(anodes[0]['port'], self.port2server, self.pids)
|
||||
start_server((anodes[0]['ip'], anodes[0]['port']),
|
||||
self.ipport2server, self.pids)
|
||||
|
||||
# Assert that server doesn't know about container1's deletion or the
|
||||
# new container2/object2 yet
|
||||
|
@ -49,14 +49,16 @@ class TestContainerFailures(ReplProbeTest):
|
||||
client.put_container(self.url, self.token, container1)
|
||||
|
||||
# Kill container1 servers excepting two of the primaries
|
||||
kill_nonprimary_server(cnodes, self.port2server, self.pids)
|
||||
kill_server(cnodes[0]['port'], self.port2server, self.pids)
|
||||
kill_nonprimary_server(cnodes, self.ipport2server, self.pids)
|
||||
kill_server((cnodes[0]['ip'], cnodes[0]['port']),
|
||||
self.ipport2server, self.pids)
|
||||
|
||||
# Delete container1
|
||||
client.delete_container(self.url, self.token, container1)
|
||||
|
||||
# Restart other container1 primary server
|
||||
start_server(cnodes[0]['port'], self.port2server, self.pids)
|
||||
start_server((cnodes[0]['ip'], cnodes[0]['port']),
|
||||
self.ipport2server, self.pids)
|
||||
|
||||
# Create container1/object1 (allowed because at least server thinks the
|
||||
# container exists)
|
||||
@ -87,18 +89,23 @@ class TestContainerFailures(ReplProbeTest):
|
||||
client.put_container(self.url, self.token, container1)
|
||||
|
||||
# Kill container1 servers excepting one of the primaries
|
||||
cnp_port = kill_nonprimary_server(cnodes, self.port2server, self.pids)
|
||||
kill_server(cnodes[0]['port'], self.port2server, self.pids)
|
||||
kill_server(cnodes[1]['port'], self.port2server, self.pids)
|
||||
cnp_ipport = kill_nonprimary_server(cnodes, self.ipport2server,
|
||||
self.pids)
|
||||
kill_server((cnodes[0]['ip'], cnodes[0]['port']),
|
||||
self.ipport2server, self.pids)
|
||||
kill_server((cnodes[1]['ip'], cnodes[1]['port']),
|
||||
self.ipport2server, self.pids)
|
||||
|
||||
# Delete container1 directly to the one primary still up
|
||||
direct_client.direct_delete_container(cnodes[2], cpart, self.account,
|
||||
container1)
|
||||
|
||||
# Restart other container1 servers
|
||||
start_server(cnodes[0]['port'], self.port2server, self.pids)
|
||||
start_server(cnodes[1]['port'], self.port2server, self.pids)
|
||||
start_server(cnp_port, self.port2server, self.pids)
|
||||
start_server((cnodes[0]['ip'], cnodes[0]['port']),
|
||||
self.ipport2server, self.pids)
|
||||
start_server((cnodes[1]['ip'], cnodes[1]['port']),
|
||||
self.ipport2server, self.pids)
|
||||
start_server(cnp_ipport, self.ipport2server, self.pids)
|
||||
|
||||
# Get to a final state
|
||||
self.get_to_final_state()
|
||||
|
@ -26,7 +26,8 @@ from swiftclient import client
|
||||
from swift.common import direct_client
|
||||
from swift.obj.diskfile import get_data_dir
|
||||
from swift.common.exceptions import ClientException
|
||||
from test.probe.common import kill_server, ReplProbeTest, start_server
|
||||
from test.probe.common import (
|
||||
kill_server, ReplProbeTest, start_server, get_server_number)
|
||||
from swift.common.utils import readconf
|
||||
from swift.common.manager import Manager
|
||||
|
||||
@ -35,7 +36,8 @@ class TestEmptyDevice(ReplProbeTest):
|
||||
|
||||
def _get_objects_dir(self, onode):
|
||||
device = onode['device']
|
||||
node_id = (onode['port'] - 6000) / 10
|
||||
_, node_id = get_server_number((onode['ip'], onode['port']),
|
||||
self.ipport2server)
|
||||
obj_server_conf = readconf(self.configs['object-server'][node_id])
|
||||
devices = obj_server_conf['app:object-server']['devices']
|
||||
obj_dir = '%s/%s' % (devices, device)
|
||||
@ -56,7 +58,8 @@ class TestEmptyDevice(ReplProbeTest):
|
||||
onode = onodes[0]
|
||||
|
||||
# Kill one container/obj primary server
|
||||
kill_server(onode['port'], self.port2server, self.pids)
|
||||
kill_server((onode['ip'], onode['port']),
|
||||
self.ipport2server, self.pids)
|
||||
|
||||
# Delete the default data directory for objects on the primary server
|
||||
obj_dir = '%s/%s' % (self._get_objects_dir(onode),
|
||||
@ -74,7 +77,8 @@ class TestEmptyDevice(ReplProbeTest):
|
||||
# Kill other two container/obj primary servers
|
||||
# to ensure GET handoff works
|
||||
for node in onodes[1:]:
|
||||
kill_server(node['port'], self.port2server, self.pids)
|
||||
kill_server((node['ip'], node['port']),
|
||||
self.ipport2server, self.pids)
|
||||
|
||||
# Indirectly through proxy assert we can get container/obj
|
||||
odata = client.get_object(self.url, self.token, container, obj)[-1]
|
||||
@ -83,7 +87,8 @@ class TestEmptyDevice(ReplProbeTest):
|
||||
'returned: %s' % repr(odata))
|
||||
# Restart those other two container/obj primary servers
|
||||
for node in onodes[1:]:
|
||||
start_server(node['port'], self.port2server, self.pids)
|
||||
start_server((node['ip'], node['port']),
|
||||
self.ipport2server, self.pids)
|
||||
self.assertFalse(os.path.exists(obj_dir))
|
||||
# We've indirectly verified the handoff node has the object, but
|
||||
# let's directly verify it.
|
||||
@ -122,7 +127,8 @@ class TestEmptyDevice(ReplProbeTest):
|
||||
missing)
|
||||
|
||||
# Bring the first container/obj primary server back up
|
||||
start_server(onode['port'], self.port2server, self.pids)
|
||||
start_server((onode['ip'], onode['port']),
|
||||
self.ipport2server, self.pids)
|
||||
|
||||
# Assert that it doesn't have container/obj yet
|
||||
self.assertFalse(os.path.exists(obj_dir))
|
||||
@ -136,21 +142,17 @@ class TestEmptyDevice(ReplProbeTest):
|
||||
else:
|
||||
self.fail("Expected ClientException but didn't get it")
|
||||
|
||||
try:
|
||||
port_num = onode['replication_port']
|
||||
except KeyError:
|
||||
port_num = onode['port']
|
||||
try:
|
||||
another_port_num = another_onode['replication_port']
|
||||
except KeyError:
|
||||
another_port_num = another_onode['port']
|
||||
|
||||
# Run object replication for first container/obj primary server
|
||||
num = (port_num - 6000) / 10
|
||||
_, num = get_server_number(
|
||||
(onode['ip'], onode.get('replication_port', onode['port'])),
|
||||
self.ipport2server)
|
||||
Manager(['object-replicator']).once(number=num)
|
||||
|
||||
# Run object replication for handoff node
|
||||
another_num = (another_port_num - 6000) / 10
|
||||
_, another_num = get_server_number(
|
||||
(another_onode['ip'],
|
||||
another_onode.get('replication_port', another_onode['port'])),
|
||||
self.ipport2server)
|
||||
Manager(['object-replicator']).once(number=another_num)
|
||||
|
||||
# Assert the first container/obj primary server now has container/obj
|
||||
|
@ -41,15 +41,17 @@ class TestObjectAsyncUpdate(ReplProbeTest):
|
||||
# Kill container servers excepting two of the primaries
|
||||
cpart, cnodes = self.container_ring.get_nodes(self.account, container)
|
||||
cnode = cnodes[0]
|
||||
kill_nonprimary_server(cnodes, self.port2server, self.pids)
|
||||
kill_server(cnode['port'], self.port2server, self.pids)
|
||||
kill_nonprimary_server(cnodes, self.ipport2server, self.pids)
|
||||
kill_server((cnode['ip'], cnode['port']),
|
||||
self.ipport2server, self.pids)
|
||||
|
||||
# Create container/obj
|
||||
obj = 'object-%s' % uuid4()
|
||||
client.put_object(self.url, self.token, container, obj, '')
|
||||
|
||||
# Restart other primary server
|
||||
start_server(cnode['port'], self.port2server, self.pids)
|
||||
start_server((cnode['ip'], cnode['port']),
|
||||
self.ipport2server, self.pids)
|
||||
|
||||
# Assert it does not know about container/obj
|
||||
self.assert_(not direct_client.direct_get_container(
|
||||
|
@ -41,7 +41,8 @@ class TestObjectHandoff(ReplProbeTest):
|
||||
opart, onodes = self.object_ring.get_nodes(
|
||||
self.account, container, obj)
|
||||
onode = onodes[0]
|
||||
kill_server(onode['port'], self.port2server, self.pids)
|
||||
kill_server((onode['ip'], onode['port']),
|
||||
self.ipport2server, self.pids)
|
||||
|
||||
# Create container/obj (goes to two primary servers and one handoff)
|
||||
client.put_object(self.url, self.token, container, obj, 'VERIFY')
|
||||
@ -53,7 +54,8 @@ class TestObjectHandoff(ReplProbeTest):
|
||||
# Kill other two container/obj primary servers
|
||||
# to ensure GET handoff works
|
||||
for node in onodes[1:]:
|
||||
kill_server(node['port'], self.port2server, self.pids)
|
||||
kill_server((node['ip'], node['port']),
|
||||
self.ipport2server, self.pids)
|
||||
|
||||
# Indirectly through proxy assert we can get container/obj
|
||||
odata = client.get_object(self.url, self.token, container, obj)[-1]
|
||||
@ -63,7 +65,8 @@ class TestObjectHandoff(ReplProbeTest):
|
||||
|
||||
# Restart those other two container/obj primary servers
|
||||
for node in onodes[1:]:
|
||||
start_server(node['port'], self.port2server, self.pids)
|
||||
start_server((node['ip'], node['port']),
|
||||
self.ipport2server, self.pids)
|
||||
|
||||
# We've indirectly verified the handoff node has the container/object,
|
||||
# but let's directly verify it.
|
||||
@ -90,7 +93,8 @@ class TestObjectHandoff(ReplProbeTest):
|
||||
(cnode['ip'], cnode['port']))
|
||||
|
||||
# Bring the first container/obj primary server back up
|
||||
start_server(onode['port'], self.port2server, self.pids)
|
||||
start_server((onode['ip'], onode['port']),
|
||||
self.ipport2server, self.pids)
|
||||
|
||||
# Assert that it doesn't have container/obj yet
|
||||
try:
|
||||
@ -138,7 +142,8 @@ class TestObjectHandoff(ReplProbeTest):
|
||||
|
||||
# Kill the first container/obj primary server again (we have two
|
||||
# primaries and the handoff up now)
|
||||
kill_server(onode['port'], self.port2server, self.pids)
|
||||
kill_server((onode['ip'], onode['port']),
|
||||
self.ipport2server, self.pids)
|
||||
|
||||
# Delete container/obj
|
||||
try:
|
||||
@ -175,7 +180,8 @@ class TestObjectHandoff(ReplProbeTest):
|
||||
(cnode['ip'], cnode['port']))
|
||||
|
||||
# Restart the first container/obj primary server again
|
||||
start_server(onode['port'], self.port2server, self.pids)
|
||||
start_server((onode['ip'], onode['port']),
|
||||
self.ipport2server, self.pids)
|
||||
|
||||
# Assert it still has container/obj
|
||||
direct_client.direct_get_object(
|
||||
|
@ -294,7 +294,7 @@ class TestReconstructorRevert(ECProbeTest):
|
||||
# the same server
|
||||
handoff_fragment_etag = None
|
||||
for node in onodes:
|
||||
if node['port'] == hnode['port']:
|
||||
if self.is_local_to(node, hnode):
|
||||
# we'll keep track of the etag of this fragment we're removing
|
||||
# in case we need it later (queue forshadowing music)...
|
||||
try:
|
||||
@ -327,7 +327,7 @@ class TestReconstructorRevert(ECProbeTest):
|
||||
raise
|
||||
# partner already had it's fragment removed
|
||||
if (handoff_fragment_etag is not None and
|
||||
hnode['port'] == partner['port']):
|
||||
self.is_local_to(hnode, partner)):
|
||||
# oh, well that makes sense then...
|
||||
rebuilt_fragment_etag = handoff_fragment_etag
|
||||
else:
|
||||
|
@ -30,7 +30,7 @@ import eventlet
|
||||
from eventlet.green import socket
|
||||
from tempfile import mkdtemp
|
||||
from shutil import rmtree
|
||||
from swift.common.utils import Timestamp
|
||||
from swift.common.utils import Timestamp, NOTICE
|
||||
from test import get_config
|
||||
from swift.common import swob, utils
|
||||
from swift.common.ring import Ring, RingData
|
||||
@ -478,8 +478,18 @@ class FakeLogger(logging.Logger, object):
|
||||
logging.INFO: 'info',
|
||||
logging.DEBUG: 'debug',
|
||||
logging.CRITICAL: 'critical',
|
||||
NOTICE: 'notice',
|
||||
}
|
||||
|
||||
def notice(self, msg, *args, **kwargs):
|
||||
"""
|
||||
Convenience function for syslog priority LOG_NOTICE. The python
|
||||
logging lvl is set to 25, just above info. SysLogHandler is
|
||||
monkey patched to map this log lvl to the LOG_NOTICE syslog
|
||||
priority.
|
||||
"""
|
||||
self.log(NOTICE, msg, *args, **kwargs)
|
||||
|
||||
def _log(self, level, msg, *args, **kwargs):
|
||||
store_name = self.store_in[level]
|
||||
cargs = [msg]
|
||||
@ -495,7 +505,7 @@ class FakeLogger(logging.Logger, object):
|
||||
def _clear(self):
|
||||
self.log_dict = defaultdict(list)
|
||||
self.lines_dict = {'critical': [], 'error': [], 'info': [],
|
||||
'warning': [], 'debug': []}
|
||||
'warning': [], 'debug': [], 'notice': []}
|
||||
|
||||
def get_lines_for_level(self, level):
|
||||
if level not in self.lines_dict:
|
||||
|
@ -77,6 +77,15 @@ class TestRingData(unittest.TestCase):
|
||||
for p in xrange(pickle.HIGHEST_PROTOCOL):
|
||||
with closing(GzipFile(ring_fname, 'wb')) as f:
|
||||
pickle.dump(rd, f, protocol=p)
|
||||
meta_only = ring.RingData.load(ring_fname, metadata_only=True)
|
||||
self.assertEqual([
|
||||
{'id': 0, 'zone': 0, 'region': 1, 'ip': '10.1.1.0',
|
||||
'port': 7000},
|
||||
{'id': 1, 'zone': 1, 'region': 1, 'ip': '10.1.1.1',
|
||||
'port': 7000},
|
||||
], meta_only.devs)
|
||||
# Pickled rings can't load only metadata, so you get it all
|
||||
self.assert_ring_data_equal(rd, meta_only)
|
||||
ring_data = ring.RingData.load(ring_fname)
|
||||
self.assert_ring_data_equal(rd, ring_data)
|
||||
|
||||
@ -86,6 +95,12 @@ class TestRingData(unittest.TestCase):
|
||||
[array.array('H', [0, 1, 0, 1]), array.array('H', [0, 1, 0, 1])],
|
||||
[{'id': 0, 'zone': 0}, {'id': 1, 'zone': 1}], 30)
|
||||
rd.save(ring_fname)
|
||||
meta_only = ring.RingData.load(ring_fname, metadata_only=True)
|
||||
self.assertEqual([
|
||||
{'id': 0, 'zone': 0, 'region': 1},
|
||||
{'id': 1, 'zone': 1, 'region': 1},
|
||||
], meta_only.devs)
|
||||
self.assertEqual([], meta_only._replica2part2dev_id)
|
||||
rd2 = ring.RingData.load(ring_fname)
|
||||
self.assert_ring_data_equal(rd, rd2)
|
||||
|
||||
|
@ -185,22 +185,41 @@ class TestUtils(unittest.TestCase):
|
||||
self.assertFalse(is_valid_hostname("$blah#"))
|
||||
|
||||
def test_is_local_device(self):
|
||||
my_ips = ["127.0.0.1",
|
||||
"0000:0000:0000:0000:0000:0000:0000:0001"]
|
||||
# localhost shows up in whataremyips() output as "::1" for IPv6
|
||||
my_ips = ["127.0.0.1", "::1"]
|
||||
my_port = 6000
|
||||
self.assertTrue(is_local_device(my_ips, my_port,
|
||||
"localhost",
|
||||
my_port))
|
||||
"127.0.0.1", my_port))
|
||||
self.assertTrue(is_local_device(my_ips, my_port,
|
||||
"::1", my_port))
|
||||
self.assertTrue(is_local_device(
|
||||
my_ips, my_port,
|
||||
"0000:0000:0000:0000:0000:0000:0000:0001", my_port))
|
||||
self.assertTrue(is_local_device(my_ips, my_port,
|
||||
"localhost", my_port))
|
||||
self.assertFalse(is_local_device(my_ips, my_port,
|
||||
"localhost",
|
||||
my_port + 1))
|
||||
"localhost", my_port + 1))
|
||||
self.assertFalse(is_local_device(my_ips, my_port,
|
||||
"127.0.0.2",
|
||||
my_port))
|
||||
"127.0.0.2", my_port))
|
||||
# for those that don't have a local port
|
||||
self.assertTrue(is_local_device(my_ips, None,
|
||||
my_ips[0], None))
|
||||
|
||||
# When servers_per_port is active, the "my_port" passed in is None
|
||||
# which means "don't include port in the determination of locality
|
||||
# because it's not reliable in this deployment scenario"
|
||||
self.assertTrue(is_local_device(my_ips, None,
|
||||
"127.0.0.1", 6666))
|
||||
self.assertTrue(is_local_device(my_ips, None,
|
||||
"::1", 6666))
|
||||
self.assertTrue(is_local_device(
|
||||
my_ips, None,
|
||||
"0000:0000:0000:0000:0000:0000:0000:0001", 6666))
|
||||
self.assertTrue(is_local_device(my_ips, None,
|
||||
"localhost", 6666))
|
||||
self.assertFalse(is_local_device(my_ips, None,
|
||||
"127.0.0.2", my_port))
|
||||
|
||||
def test_validate_and_normalize_ip(self):
|
||||
ipv4 = "10.0.0.1"
|
||||
self.assertEqual(ipv4, validate_and_normalize_ip(ipv4))
|
||||
|
@ -477,7 +477,7 @@ class TestDBReplicator(unittest.TestCase):
|
||||
def test_run_once_no_ips(self):
|
||||
replicator = TestReplicator({}, logger=unit.FakeLogger())
|
||||
self._patch(patch.object, db_replicator, 'whataremyips',
|
||||
lambda *args: [])
|
||||
lambda *a, **kw: [])
|
||||
|
||||
replicator.run_once()
|
||||
|
||||
@ -487,7 +487,9 @@ class TestDBReplicator(unittest.TestCase):
|
||||
|
||||
def test_run_once_node_is_not_mounted(self):
|
||||
db_replicator.ring = FakeRingWithSingleNode()
|
||||
conf = {'mount_check': 'true', 'bind_port': 6000}
|
||||
# If a bind_ip is specified, it's plumbed into whataremyips() and
|
||||
# returned by itself.
|
||||
conf = {'mount_check': 'true', 'bind_ip': '1.1.1.1', 'bind_port': 6000}
|
||||
replicator = TestReplicator(conf, logger=unit.FakeLogger())
|
||||
self.assertEqual(replicator.mount_check, True)
|
||||
self.assertEqual(replicator.port, 6000)
|
||||
@ -498,8 +500,6 @@ class TestDBReplicator(unittest.TestCase):
|
||||
replicator.ring.devs[0]['device']))
|
||||
return False
|
||||
|
||||
self._patch(patch.object, db_replicator, 'whataremyips',
|
||||
lambda *args: ['1.1.1.1'])
|
||||
self._patch(patch.object, db_replicator, 'ismount', mock_ismount)
|
||||
replicator.run_once()
|
||||
|
||||
@ -528,7 +528,7 @@ class TestDBReplicator(unittest.TestCase):
|
||||
self.assertEquals(1, node_id)
|
||||
|
||||
self._patch(patch.object, db_replicator, 'whataremyips',
|
||||
lambda *args: ['1.1.1.1'])
|
||||
lambda *a, **kw: ['1.1.1.1'])
|
||||
self._patch(patch.object, db_replicator, 'ismount', lambda *args: True)
|
||||
self._patch(patch.object, db_replicator, 'unlink_older_than',
|
||||
mock_unlink_older_than)
|
||||
@ -1390,7 +1390,7 @@ class TestReplicatorSync(unittest.TestCase):
|
||||
return True
|
||||
daemon._rsync_file = _rsync_file
|
||||
with mock.patch('swift.common.db_replicator.whataremyips',
|
||||
new=lambda: [node['replication_ip']]):
|
||||
new=lambda *a, **kw: [node['replication_ip']]):
|
||||
daemon.run_once()
|
||||
return daemon
|
||||
|
||||
|
@ -15,14 +15,17 @@
|
||||
import unittest
|
||||
import StringIO
|
||||
from ConfigParser import ConfigParser
|
||||
import os
|
||||
import mock
|
||||
from functools import partial
|
||||
from tempfile import NamedTemporaryFile
|
||||
from test.unit import patch_policies, FakeRing
|
||||
from test.unit import patch_policies, FakeRing, temptree
|
||||
from swift.common.storage_policy import (
|
||||
StoragePolicyCollection, POLICIES, PolicyError, parse_storage_policies,
|
||||
reload_storage_policies, get_policy_string, split_policy_string,
|
||||
BaseStoragePolicy, StoragePolicy, ECStoragePolicy, REPL_POLICY, EC_POLICY,
|
||||
VALID_EC_TYPES, DEFAULT_EC_OBJECT_SEGMENT_SIZE)
|
||||
VALID_EC_TYPES, DEFAULT_EC_OBJECT_SEGMENT_SIZE, BindPortsCache)
|
||||
from swift.common.ring import RingData
|
||||
from swift.common.exceptions import RingValidationError
|
||||
|
||||
|
||||
@ -740,6 +743,139 @@ class TestStoragePolicies(unittest.TestCase):
|
||||
self.assertRaises(PolicyError, policies.get_object_ring, 99,
|
||||
'/path/not/used')
|
||||
|
||||
def test_bind_ports_cache(self):
|
||||
test_policies = [StoragePolicy(0, 'aay', True),
|
||||
StoragePolicy(1, 'bee', False),
|
||||
StoragePolicy(2, 'cee', False)]
|
||||
|
||||
my_ips = ['1.2.3.4', '2.3.4.5']
|
||||
other_ips = ['3.4.5.6', '4.5.6.7']
|
||||
bind_ip = my_ips[1]
|
||||
devs_by_ring_name1 = {
|
||||
'object': [ # 'aay'
|
||||
{'id': 0, 'zone': 0, 'region': 1, 'ip': my_ips[0],
|
||||
'port': 6006},
|
||||
{'id': 0, 'zone': 0, 'region': 1, 'ip': other_ips[0],
|
||||
'port': 6007},
|
||||
{'id': 0, 'zone': 0, 'region': 1, 'ip': my_ips[1],
|
||||
'port': 6008},
|
||||
None,
|
||||
{'id': 0, 'zone': 0, 'region': 1, 'ip': other_ips[1],
|
||||
'port': 6009}],
|
||||
'object-1': [ # 'bee'
|
||||
{'id': 0, 'zone': 0, 'region': 1, 'ip': my_ips[1],
|
||||
'port': 6006}, # dupe
|
||||
{'id': 0, 'zone': 0, 'region': 1, 'ip': other_ips[0],
|
||||
'port': 6010},
|
||||
{'id': 0, 'zone': 0, 'region': 1, 'ip': my_ips[1],
|
||||
'port': 6011},
|
||||
{'id': 0, 'zone': 0, 'region': 1, 'ip': other_ips[1],
|
||||
'port': 6012}],
|
||||
'object-2': [ # 'cee'
|
||||
{'id': 0, 'zone': 0, 'region': 1, 'ip': my_ips[0],
|
||||
'port': 6010}, # on our IP and a not-us IP
|
||||
{'id': 0, 'zone': 0, 'region': 1, 'ip': other_ips[0],
|
||||
'port': 6013},
|
||||
None,
|
||||
{'id': 0, 'zone': 0, 'region': 1, 'ip': my_ips[1],
|
||||
'port': 6014},
|
||||
{'id': 0, 'zone': 0, 'region': 1, 'ip': other_ips[1],
|
||||
'port': 6015}],
|
||||
}
|
||||
devs_by_ring_name2 = {
|
||||
'object': [ # 'aay'
|
||||
{'id': 0, 'zone': 0, 'region': 1, 'ip': my_ips[0],
|
||||
'port': 6016},
|
||||
{'id': 0, 'zone': 0, 'region': 1, 'ip': other_ips[1],
|
||||
'port': 6019}],
|
||||
'object-1': [ # 'bee'
|
||||
{'id': 0, 'zone': 0, 'region': 1, 'ip': my_ips[1],
|
||||
'port': 6016}, # dupe
|
||||
{'id': 0, 'zone': 0, 'region': 1, 'ip': other_ips[1],
|
||||
'port': 6022}],
|
||||
'object-2': [ # 'cee'
|
||||
{'id': 0, 'zone': 0, 'region': 1, 'ip': my_ips[0],
|
||||
'port': 6020},
|
||||
{'id': 0, 'zone': 0, 'region': 1, 'ip': other_ips[1],
|
||||
'port': 6025}],
|
||||
}
|
||||
ring_files = [ring_name + '.ring.gz'
|
||||
for ring_name in sorted(devs_by_ring_name1)]
|
||||
|
||||
def _fake_load(gz_path, stub_objs, metadata_only=False):
|
||||
return RingData(
|
||||
devs=stub_objs[os.path.basename(gz_path)[:-8]],
|
||||
replica2part2dev_id=[],
|
||||
part_shift=24)
|
||||
|
||||
with mock.patch(
|
||||
'swift.common.storage_policy.RingData.load'
|
||||
) as mock_ld, \
|
||||
patch_policies(test_policies), \
|
||||
mock.patch('swift.common.storage_policy.whataremyips') \
|
||||
as mock_whataremyips, \
|
||||
temptree(ring_files) as tempdir:
|
||||
mock_whataremyips.return_value = my_ips
|
||||
|
||||
cache = BindPortsCache(tempdir, bind_ip)
|
||||
|
||||
self.assertEqual([
|
||||
mock.call(bind_ip),
|
||||
], mock_whataremyips.mock_calls)
|
||||
mock_whataremyips.reset_mock()
|
||||
|
||||
mock_ld.side_effect = partial(_fake_load,
|
||||
stub_objs=devs_by_ring_name1)
|
||||
self.assertEqual(set([
|
||||
6006, 6008, 6011, 6010, 6014,
|
||||
]), cache.all_bind_ports_for_node())
|
||||
self.assertEqual([
|
||||
mock.call(os.path.join(tempdir, ring_files[0]),
|
||||
metadata_only=True),
|
||||
mock.call(os.path.join(tempdir, ring_files[1]),
|
||||
metadata_only=True),
|
||||
mock.call(os.path.join(tempdir, ring_files[2]),
|
||||
metadata_only=True),
|
||||
], mock_ld.mock_calls)
|
||||
mock_ld.reset_mock()
|
||||
|
||||
mock_ld.side_effect = partial(_fake_load,
|
||||
stub_objs=devs_by_ring_name2)
|
||||
self.assertEqual(set([
|
||||
6006, 6008, 6011, 6010, 6014,
|
||||
]), cache.all_bind_ports_for_node())
|
||||
self.assertEqual([], mock_ld.mock_calls)
|
||||
|
||||
# but when all the file mtimes are made different, it'll
|
||||
# reload
|
||||
for gz_file in [os.path.join(tempdir, n)
|
||||
for n in ring_files]:
|
||||
os.utime(gz_file, (88, 88))
|
||||
|
||||
self.assertEqual(set([
|
||||
6016, 6020,
|
||||
]), cache.all_bind_ports_for_node())
|
||||
self.assertEqual([
|
||||
mock.call(os.path.join(tempdir, ring_files[0]),
|
||||
metadata_only=True),
|
||||
mock.call(os.path.join(tempdir, ring_files[1]),
|
||||
metadata_only=True),
|
||||
mock.call(os.path.join(tempdir, ring_files[2]),
|
||||
metadata_only=True),
|
||||
], mock_ld.mock_calls)
|
||||
mock_ld.reset_mock()
|
||||
|
||||
# Don't do something stupid like crash if a ring file is missing.
|
||||
os.unlink(os.path.join(tempdir, 'object-2.ring.gz'))
|
||||
|
||||
self.assertEqual(set([
|
||||
6016, 6020,
|
||||
]), cache.all_bind_ports_for_node())
|
||||
self.assertEqual([], mock_ld.mock_calls)
|
||||
|
||||
# whataremyips() is only called in the constructor
|
||||
self.assertEqual([], mock_whataremyips.mock_calls)
|
||||
|
||||
def test_singleton_passthrough(self):
|
||||
test_policies = [StoragePolicy(0, 'aay', True),
|
||||
StoragePolicy(1, 'bee', False),
|
||||
|
@ -1488,6 +1488,18 @@ class TestUtils(unittest.TestCase):
|
||||
self.assert_(len(myips) > 1)
|
||||
self.assert_('127.0.0.1' in myips)
|
||||
|
||||
def test_whataremyips_bind_to_all(self):
|
||||
for any_addr in ('0.0.0.0', '0000:0000:0000:0000:0000:0000:0000:0000',
|
||||
'::0', '::0000', '::',
|
||||
# Wacky parse-error input produces all IPs
|
||||
'I am a bear'):
|
||||
myips = utils.whataremyips(any_addr)
|
||||
self.assert_(len(myips) > 1)
|
||||
self.assert_('127.0.0.1' in myips)
|
||||
|
||||
def test_whataremyips_bind_ip_specific(self):
|
||||
self.assertEqual(['1.2.3.4'], utils.whataremyips('1.2.3.4'))
|
||||
|
||||
def test_whataremyips_error(self):
|
||||
def my_interfaces():
|
||||
return ['eth0']
|
||||
@ -1725,6 +1737,21 @@ log_name = %(yarr)s'''
|
||||
for func in required_func_calls:
|
||||
self.assert_(utils.os.called_funcs[func])
|
||||
|
||||
def test_drop_privileges_no_call_setsid(self):
|
||||
user = getuser()
|
||||
# over-ride os with mock
|
||||
required_func_calls = ('setgroups', 'setgid', 'setuid', 'chdir',
|
||||
'umask')
|
||||
bad_func_calls = ('setsid',)
|
||||
utils.os = MockOs(called_funcs=required_func_calls,
|
||||
raise_funcs=bad_func_calls)
|
||||
# exercise the code
|
||||
utils.drop_privileges(user, call_setsid=False)
|
||||
for func in required_func_calls:
|
||||
self.assert_(utils.os.called_funcs[func])
|
||||
for func in bad_func_calls:
|
||||
self.assert_(func not in utils.os.called_funcs)
|
||||
|
||||
@reset_logger_state
|
||||
def test_capture_stdio(self):
|
||||
# stubs
|
||||
|
@ -42,7 +42,8 @@ from swift.common.swob import Request
|
||||
from swift.common import wsgi, utils
|
||||
from swift.common.storage_policy import POLICIES
|
||||
|
||||
from test.unit import temptree, with_tempdir, write_fake_ring, patch_policies
|
||||
from test.unit import (
|
||||
temptree, with_tempdir, write_fake_ring, patch_policies, FakeLogger)
|
||||
|
||||
from paste.deploy import loadwsgi
|
||||
|
||||
@ -688,6 +689,65 @@ class TestWSGI(unittest.TestCase):
|
||||
self.assertEqual(calls['_loadapp'], 1)
|
||||
self.assertEqual(rc, 0)
|
||||
|
||||
@mock.patch('swift.common.wsgi.run_server')
|
||||
@mock.patch('swift.common.wsgi.WorkersStrategy')
|
||||
@mock.patch('swift.common.wsgi.ServersPerPortStrategy')
|
||||
def test_run_server_strategy_plumbing(self, mock_per_port, mock_workers,
|
||||
mock_run_server):
|
||||
# Make sure the right strategy gets used in a number of different
|
||||
# config cases.
|
||||
mock_per_port().bind_ports.return_value = 'stop early'
|
||||
mock_workers().bind_ports.return_value = 'stop early'
|
||||
logger = FakeLogger()
|
||||
stub__initrp = [
|
||||
{'__file__': 'test', 'workers': 2}, # conf
|
||||
logger,
|
||||
'log_name',
|
||||
]
|
||||
with mock.patch.object(wsgi, '_initrp', return_value=stub__initrp):
|
||||
for server_type in ('account-server', 'container-server',
|
||||
'object-server'):
|
||||
mock_per_port.reset_mock()
|
||||
mock_workers.reset_mock()
|
||||
logger._clear()
|
||||
self.assertEqual(1, wsgi.run_wsgi('conf_file', server_type))
|
||||
self.assertEqual([
|
||||
'stop early',
|
||||
], logger.get_lines_for_level('error'))
|
||||
self.assertEqual([], mock_per_port.mock_calls)
|
||||
self.assertEqual([
|
||||
mock.call(stub__initrp[0], logger),
|
||||
mock.call().bind_ports(),
|
||||
], mock_workers.mock_calls)
|
||||
|
||||
stub__initrp[0]['servers_per_port'] = 3
|
||||
for server_type in ('account-server', 'container-server'):
|
||||
mock_per_port.reset_mock()
|
||||
mock_workers.reset_mock()
|
||||
logger._clear()
|
||||
self.assertEqual(1, wsgi.run_wsgi('conf_file', server_type))
|
||||
self.assertEqual([
|
||||
'stop early',
|
||||
], logger.get_lines_for_level('error'))
|
||||
self.assertEqual([], mock_per_port.mock_calls)
|
||||
self.assertEqual([
|
||||
mock.call(stub__initrp[0], logger),
|
||||
mock.call().bind_ports(),
|
||||
], mock_workers.mock_calls)
|
||||
|
||||
mock_per_port.reset_mock()
|
||||
mock_workers.reset_mock()
|
||||
logger._clear()
|
||||
self.assertEqual(1, wsgi.run_wsgi('conf_file', 'object-server'))
|
||||
self.assertEqual([
|
||||
'stop early',
|
||||
], logger.get_lines_for_level('error'))
|
||||
self.assertEqual([
|
||||
mock.call(stub__initrp[0], logger, servers_per_port=3),
|
||||
mock.call().bind_ports(),
|
||||
], mock_per_port.mock_calls)
|
||||
self.assertEqual([], mock_workers.mock_calls)
|
||||
|
||||
def test_run_server_failure1(self):
|
||||
calls = defaultdict(lambda: 0)
|
||||
|
||||
@ -751,6 +811,380 @@ class TestWSGI(unittest.TestCase):
|
||||
self.assertEquals(r.environ['PATH_INFO'], '/override')
|
||||
|
||||
|
||||
class TestServersPerPortStrategy(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.logger = FakeLogger()
|
||||
self.conf = {
|
||||
'workers': 100, # ignored
|
||||
'user': 'bob',
|
||||
'swift_dir': '/jim/cricket',
|
||||
'ring_check_interval': '76',
|
||||
'bind_ip': '2.3.4.5',
|
||||
}
|
||||
self.servers_per_port = 3
|
||||
self.s1, self.s2 = mock.MagicMock(), mock.MagicMock()
|
||||
patcher = mock.patch('swift.common.wsgi.get_socket',
|
||||
side_effect=[self.s1, self.s2])
|
||||
self.mock_get_socket = patcher.start()
|
||||
self.addCleanup(patcher.stop)
|
||||
patcher = mock.patch('swift.common.wsgi.drop_privileges')
|
||||
self.mock_drop_privileges = patcher.start()
|
||||
self.addCleanup(patcher.stop)
|
||||
patcher = mock.patch('swift.common.wsgi.BindPortsCache')
|
||||
self.mock_cache_class = patcher.start()
|
||||
self.addCleanup(patcher.stop)
|
||||
patcher = mock.patch('swift.common.wsgi.os.setsid')
|
||||
self.mock_setsid = patcher.start()
|
||||
self.addCleanup(patcher.stop)
|
||||
patcher = mock.patch('swift.common.wsgi.os.chdir')
|
||||
self.mock_chdir = patcher.start()
|
||||
self.addCleanup(patcher.stop)
|
||||
patcher = mock.patch('swift.common.wsgi.os.umask')
|
||||
self.mock_umask = patcher.start()
|
||||
self.addCleanup(patcher.stop)
|
||||
|
||||
self.all_bind_ports_for_node = \
|
||||
self.mock_cache_class().all_bind_ports_for_node
|
||||
self.ports = (6006, 6007)
|
||||
self.all_bind_ports_for_node.return_value = set(self.ports)
|
||||
|
||||
self.strategy = wsgi.ServersPerPortStrategy(self.conf, self.logger,
|
||||
self.servers_per_port)
|
||||
|
||||
def test_loop_timeout(self):
|
||||
# This strategy should loop every ring_check_interval seconds, even if
|
||||
# no workers exit.
|
||||
self.assertEqual(76, self.strategy.loop_timeout())
|
||||
|
||||
# Check the default
|
||||
del self.conf['ring_check_interval']
|
||||
self.strategy = wsgi.ServersPerPortStrategy(self.conf, self.logger,
|
||||
self.servers_per_port)
|
||||
|
||||
self.assertEqual(15, self.strategy.loop_timeout())
|
||||
|
||||
def test_bind_ports(self):
|
||||
self.strategy.bind_ports()
|
||||
|
||||
self.assertEqual(set((6006, 6007)), self.strategy.bind_ports)
|
||||
self.assertEqual([
|
||||
mock.call({'workers': 100, # ignored
|
||||
'user': 'bob',
|
||||
'swift_dir': '/jim/cricket',
|
||||
'ring_check_interval': '76',
|
||||
'bind_ip': '2.3.4.5',
|
||||
'bind_port': 6006}),
|
||||
mock.call({'workers': 100, # ignored
|
||||
'user': 'bob',
|
||||
'swift_dir': '/jim/cricket',
|
||||
'ring_check_interval': '76',
|
||||
'bind_ip': '2.3.4.5',
|
||||
'bind_port': 6007}),
|
||||
], self.mock_get_socket.mock_calls)
|
||||
self.assertEqual(
|
||||
6006, self.strategy.port_pid_state.port_for_sock(self.s1))
|
||||
self.assertEqual(
|
||||
6007, self.strategy.port_pid_state.port_for_sock(self.s2))
|
||||
self.assertEqual([mock.call()], self.mock_setsid.mock_calls)
|
||||
self.assertEqual([mock.call('/')], self.mock_chdir.mock_calls)
|
||||
self.assertEqual([mock.call(0o22)], self.mock_umask.mock_calls)
|
||||
|
||||
def test_bind_ports_ignores_setsid_errors(self):
|
||||
self.mock_setsid.side_effect = OSError()
|
||||
self.strategy.bind_ports()
|
||||
|
||||
self.assertEqual(set((6006, 6007)), self.strategy.bind_ports)
|
||||
self.assertEqual([
|
||||
mock.call({'workers': 100, # ignored
|
||||
'user': 'bob',
|
||||
'swift_dir': '/jim/cricket',
|
||||
'ring_check_interval': '76',
|
||||
'bind_ip': '2.3.4.5',
|
||||
'bind_port': 6006}),
|
||||
mock.call({'workers': 100, # ignored
|
||||
'user': 'bob',
|
||||
'swift_dir': '/jim/cricket',
|
||||
'ring_check_interval': '76',
|
||||
'bind_ip': '2.3.4.5',
|
||||
'bind_port': 6007}),
|
||||
], self.mock_get_socket.mock_calls)
|
||||
self.assertEqual(
|
||||
6006, self.strategy.port_pid_state.port_for_sock(self.s1))
|
||||
self.assertEqual(
|
||||
6007, self.strategy.port_pid_state.port_for_sock(self.s2))
|
||||
self.assertEqual([mock.call()], self.mock_setsid.mock_calls)
|
||||
self.assertEqual([mock.call('/')], self.mock_chdir.mock_calls)
|
||||
self.assertEqual([mock.call(0o22)], self.mock_umask.mock_calls)
|
||||
|
||||
def test_no_fork_sock(self):
|
||||
self.assertEqual(None, self.strategy.no_fork_sock())
|
||||
|
||||
def test_new_worker_socks(self):
|
||||
self.strategy.bind_ports()
|
||||
self.all_bind_ports_for_node.reset_mock()
|
||||
|
||||
pid = 88
|
||||
got_si = []
|
||||
for s, i in self.strategy.new_worker_socks():
|
||||
got_si.append((s, i))
|
||||
self.strategy.register_worker_start(s, i, pid)
|
||||
pid += 1
|
||||
|
||||
self.assertEqual([
|
||||
(self.s1, 0), (self.s1, 1), (self.s1, 2),
|
||||
(self.s2, 0), (self.s2, 1), (self.s2, 2),
|
||||
], got_si)
|
||||
self.assertEqual([
|
||||
'Started child %d (PID %d) for port %d' % (0, 88, 6006),
|
||||
'Started child %d (PID %d) for port %d' % (1, 89, 6006),
|
||||
'Started child %d (PID %d) for port %d' % (2, 90, 6006),
|
||||
'Started child %d (PID %d) for port %d' % (0, 91, 6007),
|
||||
'Started child %d (PID %d) for port %d' % (1, 92, 6007),
|
||||
'Started child %d (PID %d) for port %d' % (2, 93, 6007),
|
||||
], self.logger.get_lines_for_level('notice'))
|
||||
self.logger._clear()
|
||||
|
||||
# Steady-state...
|
||||
self.assertEqual([], list(self.strategy.new_worker_socks()))
|
||||
self.all_bind_ports_for_node.reset_mock()
|
||||
|
||||
# Get rid of servers for ports which disappear from the ring
|
||||
self.ports = (6007,)
|
||||
self.all_bind_ports_for_node.return_value = set(self.ports)
|
||||
self.s1.reset_mock()
|
||||
self.s2.reset_mock()
|
||||
|
||||
with mock.patch('swift.common.wsgi.greenio') as mock_greenio:
|
||||
self.assertEqual([], list(self.strategy.new_worker_socks()))
|
||||
|
||||
self.assertEqual([
|
||||
mock.call(), # ring_check_interval has passed...
|
||||
], self.all_bind_ports_for_node.mock_calls)
|
||||
self.assertEqual([
|
||||
mock.call.shutdown_safe(self.s1),
|
||||
], mock_greenio.mock_calls)
|
||||
self.assertEqual([
|
||||
mock.call.close(),
|
||||
], self.s1.mock_calls)
|
||||
self.assertEqual([], self.s2.mock_calls) # not closed
|
||||
self.assertEqual([
|
||||
'Closing unnecessary sock for port %d' % 6006,
|
||||
], self.logger.get_lines_for_level('notice'))
|
||||
self.logger._clear()
|
||||
|
||||
# Create new socket & workers for new ports that appear in ring
|
||||
self.ports = (6007, 6009)
|
||||
self.all_bind_ports_for_node.return_value = set(self.ports)
|
||||
self.s1.reset_mock()
|
||||
self.s2.reset_mock()
|
||||
s3 = mock.MagicMock()
|
||||
self.mock_get_socket.side_effect = Exception('ack')
|
||||
|
||||
# But first make sure we handle failure to bind to the requested port!
|
||||
got_si = []
|
||||
for s, i in self.strategy.new_worker_socks():
|
||||
got_si.append((s, i))
|
||||
self.strategy.register_worker_start(s, i, pid)
|
||||
pid += 1
|
||||
|
||||
self.assertEqual([], got_si)
|
||||
self.assertEqual([
|
||||
'Unable to bind to port %d: %s' % (6009, Exception('ack')),
|
||||
'Unable to bind to port %d: %s' % (6009, Exception('ack')),
|
||||
'Unable to bind to port %d: %s' % (6009, Exception('ack')),
|
||||
], self.logger.get_lines_for_level('critical'))
|
||||
self.logger._clear()
|
||||
|
||||
# Will keep trying, so let it succeed again
|
||||
self.mock_get_socket.side_effect = [s3]
|
||||
|
||||
got_si = []
|
||||
for s, i in self.strategy.new_worker_socks():
|
||||
got_si.append((s, i))
|
||||
self.strategy.register_worker_start(s, i, pid)
|
||||
pid += 1
|
||||
|
||||
self.assertEqual([
|
||||
(s3, 0), (s3, 1), (s3, 2),
|
||||
], got_si)
|
||||
self.assertEqual([
|
||||
'Started child %d (PID %d) for port %d' % (0, 94, 6009),
|
||||
'Started child %d (PID %d) for port %d' % (1, 95, 6009),
|
||||
'Started child %d (PID %d) for port %d' % (2, 96, 6009),
|
||||
], self.logger.get_lines_for_level('notice'))
|
||||
self.logger._clear()
|
||||
|
||||
# Steady-state...
|
||||
self.assertEqual([], list(self.strategy.new_worker_socks()))
|
||||
self.all_bind_ports_for_node.reset_mock()
|
||||
|
||||
# Restart a guy who died on us
|
||||
self.strategy.register_worker_exit(95) # server_idx == 1
|
||||
|
||||
got_si = []
|
||||
for s, i in self.strategy.new_worker_socks():
|
||||
got_si.append((s, i))
|
||||
self.strategy.register_worker_start(s, i, pid)
|
||||
pid += 1
|
||||
|
||||
self.assertEqual([
|
||||
(s3, 1),
|
||||
], got_si)
|
||||
self.assertEqual([
|
||||
'Started child %d (PID %d) for port %d' % (1, 97, 6009),
|
||||
], self.logger.get_lines_for_level('notice'))
|
||||
self.logger._clear()
|
||||
|
||||
# Check log_sock_exit
|
||||
self.strategy.log_sock_exit(self.s2, 2)
|
||||
self.assertEqual([
|
||||
'Child %d (PID %d, port %d) exiting normally' % (
|
||||
2, os.getpid(), 6007),
|
||||
], self.logger.get_lines_for_level('notice'))
|
||||
|
||||
# It's ok to register_worker_exit for a PID that's already had its
|
||||
# socket closed due to orphaning.
|
||||
# This is one of the workers for port 6006 that already got reaped.
|
||||
self.assertEqual(None, self.strategy.register_worker_exit(89))
|
||||
|
||||
def test_post_fork_hook(self):
|
||||
self.strategy.post_fork_hook()
|
||||
|
||||
self.assertEqual([
|
||||
mock.call('bob', call_setsid=False),
|
||||
], self.mock_drop_privileges.mock_calls)
|
||||
|
||||
def test_shutdown_sockets(self):
|
||||
self.strategy.bind_ports()
|
||||
|
||||
with mock.patch('swift.common.wsgi.greenio') as mock_greenio:
|
||||
self.strategy.shutdown_sockets()
|
||||
|
||||
self.assertEqual([
|
||||
mock.call.shutdown_safe(self.s1),
|
||||
mock.call.shutdown_safe(self.s2),
|
||||
], mock_greenio.mock_calls)
|
||||
self.assertEqual([
|
||||
mock.call.close(),
|
||||
], self.s1.mock_calls)
|
||||
self.assertEqual([
|
||||
mock.call.close(),
|
||||
], self.s2.mock_calls)
|
||||
|
||||
|
||||
class TestWorkersStrategy(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.logger = FakeLogger()
|
||||
self.conf = {
|
||||
'workers': 2,
|
||||
'user': 'bob',
|
||||
}
|
||||
self.strategy = wsgi.WorkersStrategy(self.conf, self.logger)
|
||||
patcher = mock.patch('swift.common.wsgi.get_socket',
|
||||
return_value='abc')
|
||||
self.mock_get_socket = patcher.start()
|
||||
self.addCleanup(patcher.stop)
|
||||
patcher = mock.patch('swift.common.wsgi.drop_privileges')
|
||||
self.mock_drop_privileges = patcher.start()
|
||||
self.addCleanup(patcher.stop)
|
||||
|
||||
def test_loop_timeout(self):
|
||||
# This strategy should block in the green.os.wait() until a worker
|
||||
# process exits.
|
||||
self.assertEqual(None, self.strategy.loop_timeout())
|
||||
|
||||
def test_binding(self):
|
||||
self.assertEqual(None, self.strategy.bind_ports())
|
||||
|
||||
self.assertEqual('abc', self.strategy.sock)
|
||||
self.assertEqual([
|
||||
mock.call(self.conf),
|
||||
], self.mock_get_socket.mock_calls)
|
||||
self.assertEqual([
|
||||
mock.call('bob'),
|
||||
], self.mock_drop_privileges.mock_calls)
|
||||
|
||||
self.mock_get_socket.side_effect = wsgi.ConfigFilePortError()
|
||||
|
||||
self.assertEqual(
|
||||
'bind_port wasn\'t properly set in the config file. '
|
||||
'It must be explicitly set to a valid port number.',
|
||||
self.strategy.bind_ports())
|
||||
|
||||
def test_no_fork_sock(self):
|
||||
self.strategy.bind_ports()
|
||||
self.assertEqual(None, self.strategy.no_fork_sock())
|
||||
|
||||
self.conf['workers'] = 0
|
||||
self.strategy = wsgi.WorkersStrategy(self.conf, self.logger)
|
||||
self.strategy.bind_ports()
|
||||
|
||||
self.assertEqual('abc', self.strategy.no_fork_sock())
|
||||
|
||||
def test_new_worker_socks(self):
|
||||
self.strategy.bind_ports()
|
||||
pid = 88
|
||||
sock_count = 0
|
||||
for s, i in self.strategy.new_worker_socks():
|
||||
self.assertEqual('abc', s)
|
||||
self.assertEqual(None, i) # unused for this strategy
|
||||
self.strategy.register_worker_start(s, 'unused', pid)
|
||||
pid += 1
|
||||
sock_count += 1
|
||||
|
||||
self.assertEqual([
|
||||
'Started child %s' % 88,
|
||||
'Started child %s' % 89,
|
||||
], self.logger.get_lines_for_level('notice'))
|
||||
|
||||
self.assertEqual(2, sock_count)
|
||||
self.assertEqual([], list(self.strategy.new_worker_socks()))
|
||||
|
||||
sock_count = 0
|
||||
self.strategy.register_worker_exit(88)
|
||||
|
||||
self.assertEqual([
|
||||
'Removing dead child %s' % 88,
|
||||
], self.logger.get_lines_for_level('error'))
|
||||
|
||||
for s, i in self.strategy.new_worker_socks():
|
||||
self.assertEqual('abc', s)
|
||||
self.assertEqual(None, i) # unused for this strategy
|
||||
self.strategy.register_worker_start(s, 'unused', pid)
|
||||
pid += 1
|
||||
sock_count += 1
|
||||
|
||||
self.assertEqual(1, sock_count)
|
||||
self.assertEqual([
|
||||
'Started child %s' % 88,
|
||||
'Started child %s' % 89,
|
||||
'Started child %s' % 90,
|
||||
], self.logger.get_lines_for_level('notice'))
|
||||
|
||||
def test_post_fork_hook(self):
|
||||
# Just don't crash or do something stupid
|
||||
self.assertEqual(None, self.strategy.post_fork_hook())
|
||||
|
||||
def test_shutdown_sockets(self):
|
||||
self.mock_get_socket.return_value = mock.MagicMock()
|
||||
self.strategy.bind_ports()
|
||||
with mock.patch('swift.common.wsgi.greenio') as mock_greenio:
|
||||
self.strategy.shutdown_sockets()
|
||||
self.assertEqual([
|
||||
mock.call.shutdown_safe(self.mock_get_socket.return_value),
|
||||
], mock_greenio.mock_calls)
|
||||
self.assertEqual([
|
||||
mock.call.close(),
|
||||
], self.mock_get_socket.return_value.mock_calls)
|
||||
|
||||
def test_log_sock_exit(self):
|
||||
self.strategy.log_sock_exit('blahblah', 'blahblah')
|
||||
my_pid = os.getpid()
|
||||
self.assertEqual([
|
||||
'Child %d exiting normally' % my_pid,
|
||||
], self.logger.get_lines_for_level('notice'))
|
||||
|
||||
|
||||
class TestWSGIContext(unittest.TestCase):
|
||||
|
||||
def test_app_call(self):
|
||||
|
@ -289,7 +289,11 @@ class TestContainerSync(unittest.TestCase):
|
||||
# those.
|
||||
cring = FakeRing()
|
||||
with mock.patch('swift.container.sync.InternalClient'):
|
||||
cs = sync.ContainerSync({}, container_ring=cring)
|
||||
cs = sync.ContainerSync({
|
||||
'bind_ip': '10.0.0.0',
|
||||
}, container_ring=cring)
|
||||
# Plumbing test for bind_ip and whataremyips()
|
||||
self.assertEqual(['10.0.0.0'], cs._myips)
|
||||
orig_ContainerBroker = sync.ContainerBroker
|
||||
try:
|
||||
sync.ContainerBroker = lambda p: FakeContainerBroker(
|
||||
|
@ -73,16 +73,10 @@ def make_ec_archive_bodies(policy, test_body):
|
||||
fragment_payloads.append(fragments)
|
||||
|
||||
# join up the fragment payloads per node
|
||||
ec_archive_bodies = [''.join(fragments)
|
||||
for fragments in zip(*fragment_payloads)]
|
||||
ec_archive_bodies = [''.join(frags) for frags in zip(*fragment_payloads)]
|
||||
return ec_archive_bodies
|
||||
|
||||
|
||||
def _ips():
|
||||
return ['127.0.0.1']
|
||||
object_reconstructor.whataremyips = _ips
|
||||
|
||||
|
||||
def _create_test_rings(path):
|
||||
testgz = os.path.join(path, 'object.ring.gz')
|
||||
intended_replica2part2dev_id = [
|
||||
@ -582,7 +576,7 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
|
||||
except AssertionError as e:
|
||||
extra_info = \
|
||||
'\n\n... for %r in part num %s job %r' % (
|
||||
k, part_num, job_key)
|
||||
k, part_num, job_key)
|
||||
raise AssertionError(str(e) + extra_info)
|
||||
else:
|
||||
self.fail(
|
||||
@ -1001,6 +995,7 @@ class TestObjectReconstructor(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.policy = POLICIES.default
|
||||
self.policy.object_ring._rtime = time.time() + 3600
|
||||
self.testdir = tempfile.mkdtemp()
|
||||
self.devices = os.path.join(self.testdir, 'devices')
|
||||
self.local_dev = self.policy.object_ring.devs[0]
|
||||
@ -1009,6 +1004,7 @@ class TestObjectReconstructor(unittest.TestCase):
|
||||
self.conf = {
|
||||
'devices': self.devices,
|
||||
'mount_check': False,
|
||||
'bind_ip': self.ip,
|
||||
'bind_port': self.port,
|
||||
}
|
||||
self.logger = debug_logger('object-reconstructor')
|
||||
@ -1042,9 +1038,7 @@ class TestObjectReconstructor(unittest.TestCase):
|
||||
utils.mkdirs(os.path.join(
|
||||
self.devices, self.local_dev['device'],
|
||||
datadir, str(part)))
|
||||
with mock.patch('swift.obj.reconstructor.whataremyips',
|
||||
return_value=[self.ip]):
|
||||
part_infos = list(self.reconstructor.collect_parts())
|
||||
part_infos = list(self.reconstructor.collect_parts())
|
||||
found_parts = sorted(int(p['partition']) for p in part_infos)
|
||||
self.assertEqual(found_parts, sorted(stub_parts))
|
||||
for part_info in part_infos:
|
||||
@ -1056,10 +1050,112 @@ class TestObjectReconstructor(unittest.TestCase):
|
||||
diskfile.get_data_dir(self.policy),
|
||||
str(part_info['partition'])))
|
||||
|
||||
def test_collect_parts_skips_non_local_devs_servers_per_port(self):
|
||||
self._configure_reconstructor(devices=self.devices, mount_check=False,
|
||||
bind_ip=self.ip, bind_port=self.port,
|
||||
servers_per_port=2)
|
||||
|
||||
device_parts = {
|
||||
'sda': (374,),
|
||||
'sdb': (179, 807), # w/one-serv-per-port, same IP alone is local
|
||||
'sdc': (363, 468, 843),
|
||||
'sdd': (912,), # "not local" via different IP
|
||||
}
|
||||
for policy in POLICIES:
|
||||
datadir = diskfile.get_data_dir(policy)
|
||||
for dev, parts in device_parts.items():
|
||||
for part in parts:
|
||||
utils.mkdirs(os.path.join(
|
||||
self.devices, dev,
|
||||
datadir, str(part)))
|
||||
|
||||
# we're only going to add sda and sdc into the ring
|
||||
local_devs = ('sda', 'sdb', 'sdc')
|
||||
stub_ring_devs = [{
|
||||
'device': dev,
|
||||
'replication_ip': self.ip,
|
||||
'replication_port': self.port + 1 if dev == 'sdb' else self.port,
|
||||
} for dev in local_devs]
|
||||
stub_ring_devs.append({
|
||||
'device': 'sdd',
|
||||
'replication_ip': '127.0.0.88', # not local via IP
|
||||
'replication_port': self.port,
|
||||
})
|
||||
self.reconstructor.bind_ip = '0.0.0.0' # use whataremyips
|
||||
with nested(mock.patch('swift.obj.reconstructor.whataremyips',
|
||||
return_value=[self.ip]),
|
||||
mock.patch.object(self.policy.object_ring, '_devs',
|
||||
new=stub_ring_devs)):
|
||||
part_infos = list(self.reconstructor.collect_parts())
|
||||
found_parts = sorted(int(p['partition']) for p in part_infos)
|
||||
expected_parts = sorted(itertools.chain(
|
||||
*(device_parts[d] for d in local_devs)))
|
||||
self.assertEqual(found_parts, expected_parts)
|
||||
for part_info in part_infos:
|
||||
self.assertEqual(part_info['policy'], self.policy)
|
||||
self.assertTrue(part_info['local_dev'] in stub_ring_devs)
|
||||
dev = part_info['local_dev']
|
||||
self.assertEqual(part_info['part_path'],
|
||||
os.path.join(self.devices,
|
||||
dev['device'],
|
||||
diskfile.get_data_dir(self.policy),
|
||||
str(part_info['partition'])))
|
||||
|
||||
def test_collect_parts_multi_device_skips_non_non_local_devs(self):
|
||||
device_parts = {
|
||||
'sda': (374,),
|
||||
'sdb': (179, 807), # "not local" via different port
|
||||
'sdc': (363, 468, 843),
|
||||
'sdd': (912,), # "not local" via different IP
|
||||
}
|
||||
for policy in POLICIES:
|
||||
datadir = diskfile.get_data_dir(policy)
|
||||
for dev, parts in device_parts.items():
|
||||
for part in parts:
|
||||
utils.mkdirs(os.path.join(
|
||||
self.devices, dev,
|
||||
datadir, str(part)))
|
||||
|
||||
# we're only going to add sda and sdc into the ring
|
||||
local_devs = ('sda', 'sdc')
|
||||
stub_ring_devs = [{
|
||||
'device': dev,
|
||||
'replication_ip': self.ip,
|
||||
'replication_port': self.port,
|
||||
} for dev in local_devs]
|
||||
stub_ring_devs.append({
|
||||
'device': 'sdb',
|
||||
'replication_ip': self.ip,
|
||||
'replication_port': self.port + 1, # not local via port
|
||||
})
|
||||
stub_ring_devs.append({
|
||||
'device': 'sdd',
|
||||
'replication_ip': '127.0.0.88', # not local via IP
|
||||
'replication_port': self.port,
|
||||
})
|
||||
self.reconstructor.bind_ip = '0.0.0.0' # use whataremyips
|
||||
with nested(mock.patch('swift.obj.reconstructor.whataremyips',
|
||||
return_value=[self.ip]),
|
||||
mock.patch.object(self.policy.object_ring, '_devs',
|
||||
new=stub_ring_devs)):
|
||||
part_infos = list(self.reconstructor.collect_parts())
|
||||
found_parts = sorted(int(p['partition']) for p in part_infos)
|
||||
expected_parts = sorted(itertools.chain(
|
||||
*(device_parts[d] for d in local_devs)))
|
||||
self.assertEqual(found_parts, expected_parts)
|
||||
for part_info in part_infos:
|
||||
self.assertEqual(part_info['policy'], self.policy)
|
||||
self.assertTrue(part_info['local_dev'] in stub_ring_devs)
|
||||
dev = part_info['local_dev']
|
||||
self.assertEqual(part_info['part_path'],
|
||||
os.path.join(self.devices,
|
||||
dev['device'],
|
||||
diskfile.get_data_dir(self.policy),
|
||||
str(part_info['partition'])))
|
||||
|
||||
def test_collect_parts_multi_device_skips_non_ring_devices(self):
|
||||
device_parts = {
|
||||
'sda': (374,),
|
||||
'sdb': (179, 807),
|
||||
'sdc': (363, 468, 843),
|
||||
}
|
||||
for policy in POLICIES:
|
||||
@ -1075,8 +1171,9 @@ class TestObjectReconstructor(unittest.TestCase):
|
||||
stub_ring_devs = [{
|
||||
'device': dev,
|
||||
'replication_ip': self.ip,
|
||||
'replication_port': self.port
|
||||
'replication_port': self.port,
|
||||
} for dev in local_devs]
|
||||
self.reconstructor.bind_ip = '0.0.0.0' # use whataremyips
|
||||
with nested(mock.patch('swift.obj.reconstructor.whataremyips',
|
||||
return_value=[self.ip]),
|
||||
mock.patch.object(self.policy.object_ring, '_devs',
|
||||
|
@ -36,9 +36,8 @@ from swift.obj import diskfile, replicator as object_replicator
|
||||
from swift.common.storage_policy import StoragePolicy, POLICIES
|
||||
|
||||
|
||||
def _ips():
|
||||
def _ips(*args, **kwargs):
|
||||
return ['127.0.0.0']
|
||||
object_replicator.whataremyips = _ips
|
||||
|
||||
|
||||
def mock_http_connect(status):
|
||||
@ -171,34 +170,46 @@ class TestObjectReplicator(unittest.TestCase):
|
||||
rmtree(self.testdir, ignore_errors=1)
|
||||
os.mkdir(self.testdir)
|
||||
os.mkdir(self.devices)
|
||||
os.mkdir(os.path.join(self.devices, 'sda'))
|
||||
self.objects = os.path.join(self.devices, 'sda',
|
||||
diskfile.get_data_dir(POLICIES[0]))
|
||||
self.objects_1 = os.path.join(self.devices, 'sda',
|
||||
diskfile.get_data_dir(POLICIES[1]))
|
||||
os.mkdir(self.objects)
|
||||
os.mkdir(self.objects_1)
|
||||
self.parts = {}
|
||||
self.parts_1 = {}
|
||||
for part in ['0', '1', '2', '3']:
|
||||
self.parts[part] = os.path.join(self.objects, part)
|
||||
os.mkdir(self.parts[part])
|
||||
self.parts_1[part] = os.path.join(self.objects_1, part)
|
||||
os.mkdir(self.parts_1[part])
|
||||
|
||||
self.objects, self.objects_1, self.parts, self.parts_1 = \
|
||||
self._write_disk_data('sda')
|
||||
_create_test_rings(self.testdir)
|
||||
self.logger = debug_logger('test-replicator')
|
||||
self.conf = dict(
|
||||
bind_ip=_ips()[0], bind_port=6000,
|
||||
swift_dir=self.testdir, devices=self.devices, mount_check='false',
|
||||
timeout='300', stats_interval='1', sync_method='rsync')
|
||||
self.replicator = object_replicator.ObjectReplicator(self.conf)
|
||||
self.logger = self.replicator.logger = debug_logger('test-replicator')
|
||||
self.df_mgr = diskfile.DiskFileManager(self.conf,
|
||||
self.replicator.logger)
|
||||
self._create_replicator()
|
||||
|
||||
def tearDown(self):
|
||||
rmtree(self.testdir, ignore_errors=1)
|
||||
|
||||
def _write_disk_data(self, disk_name):
|
||||
os.mkdir(os.path.join(self.devices, disk_name))
|
||||
objects = os.path.join(self.devices, disk_name,
|
||||
diskfile.get_data_dir(POLICIES[0]))
|
||||
objects_1 = os.path.join(self.devices, disk_name,
|
||||
diskfile.get_data_dir(POLICIES[1]))
|
||||
os.mkdir(objects)
|
||||
os.mkdir(objects_1)
|
||||
parts = {}
|
||||
parts_1 = {}
|
||||
for part in ['0', '1', '2', '3']:
|
||||
parts[part] = os.path.join(objects, part)
|
||||
os.mkdir(parts[part])
|
||||
parts_1[part] = os.path.join(objects_1, part)
|
||||
os.mkdir(parts_1[part])
|
||||
|
||||
return objects, objects_1, parts, parts_1
|
||||
|
||||
def _create_replicator(self):
|
||||
self.replicator = object_replicator.ObjectReplicator(self.conf)
|
||||
self.replicator.logger = self.logger
|
||||
self.df_mgr = diskfile.DiskFileManager(self.conf, self.logger)
|
||||
|
||||
def test_run_once(self):
|
||||
conf = dict(swift_dir=self.testdir, devices=self.devices,
|
||||
bind_ip=_ips()[0],
|
||||
mount_check='false', timeout='300', stats_interval='1')
|
||||
replicator = object_replicator.ObjectReplicator(conf)
|
||||
was_connector = object_replicator.http_connect
|
||||
@ -260,7 +271,9 @@ class TestObjectReplicator(unittest.TestCase):
|
||||
process_arg_checker.append(
|
||||
(0, '', ['rsync', whole_path_from, rsync_mods]))
|
||||
with _mock_process(process_arg_checker):
|
||||
replicator.run_once()
|
||||
with mock.patch('swift.obj.replicator.whataremyips',
|
||||
side_effect=_ips):
|
||||
replicator.run_once()
|
||||
self.assertFalse(process_errors)
|
||||
object_replicator.http_connect = was_connector
|
||||
|
||||
@ -321,17 +334,306 @@ class TestObjectReplicator(unittest.TestCase):
|
||||
[node['id'] for node in jobs_by_pol_part['12']['nodes']], [2, 3])
|
||||
self.assertEquals(
|
||||
[node['id'] for node in jobs_by_pol_part['13']['nodes']], [3, 1])
|
||||
for part in ['00', '01', '02', '03', ]:
|
||||
for part in ['00', '01', '02', '03']:
|
||||
for node in jobs_by_pol_part[part]['nodes']:
|
||||
self.assertEquals(node['device'], 'sda')
|
||||
self.assertEquals(jobs_by_pol_part[part]['path'],
|
||||
os.path.join(self.objects, part[1:]))
|
||||
for part in ['10', '11', '12', '13', ]:
|
||||
for part in ['10', '11', '12', '13']:
|
||||
for node in jobs_by_pol_part[part]['nodes']:
|
||||
self.assertEquals(node['device'], 'sda')
|
||||
self.assertEquals(jobs_by_pol_part[part]['path'],
|
||||
os.path.join(self.objects_1, part[1:]))
|
||||
|
||||
@mock.patch('swift.obj.replicator.random.shuffle', side_effect=lambda l: l)
|
||||
def test_collect_jobs_multi_disk(self, mock_shuffle):
|
||||
devs = [
|
||||
# Two disks on same IP/port
|
||||
{'id': 0, 'device': 'sda', 'zone': 0,
|
||||
'region': 1, 'ip': '1.1.1.1', 'port': 1111,
|
||||
'replication_ip': '127.0.0.0', 'replication_port': 6000},
|
||||
{'id': 1, 'device': 'sdb', 'zone': 1,
|
||||
'region': 1, 'ip': '1.1.1.1', 'port': 1111,
|
||||
'replication_ip': '127.0.0.0', 'replication_port': 6000},
|
||||
# Two disks on same server, different ports
|
||||
{'id': 2, 'device': 'sdc', 'zone': 2,
|
||||
'region': 2, 'ip': '1.1.1.2', 'port': 1112,
|
||||
'replication_ip': '127.0.0.1', 'replication_port': 6000},
|
||||
{'id': 3, 'device': 'sdd', 'zone': 4,
|
||||
'region': 2, 'ip': '1.1.1.2', 'port': 1112,
|
||||
'replication_ip': '127.0.0.1', 'replication_port': 6001},
|
||||
]
|
||||
objects_sdb, objects_1_sdb, _, _ = self._write_disk_data('sdb')
|
||||
objects_sdc, objects_1_sdc, _, _ = self._write_disk_data('sdc')
|
||||
objects_sdd, objects_1_sdd, _, _ = self._write_disk_data('sdd')
|
||||
_create_test_rings(self.testdir, devs)
|
||||
|
||||
jobs = self.replicator.collect_jobs()
|
||||
|
||||
self.assertEqual([mock.call(jobs)], mock_shuffle.mock_calls)
|
||||
|
||||
jobs_to_delete = [j for j in jobs if j['delete']]
|
||||
self.assertEquals(len(jobs_to_delete), 4)
|
||||
self.assertEqual([
|
||||
'1', '2', # policy 0; 1 not on sda, 2 not on sdb
|
||||
'1', '2', # policy 1; 1 not on sda, 2 not on sdb
|
||||
], [j['partition'] for j in jobs_to_delete])
|
||||
|
||||
jobs_by_pol_part_dev = {}
|
||||
for job in jobs:
|
||||
# There should be no jobs with a device not in just sda & sdb
|
||||
self.assertTrue(job['device'] in ('sda', 'sdb'))
|
||||
jobs_by_pol_part_dev[
|
||||
str(int(job['policy'])) + job['partition'] + job['device']
|
||||
] = job
|
||||
|
||||
self.assertEquals([node['id']
|
||||
for node in jobs_by_pol_part_dev['00sda']['nodes']],
|
||||
[1, 2])
|
||||
self.assertEquals([node['id']
|
||||
for node in jobs_by_pol_part_dev['00sdb']['nodes']],
|
||||
[0, 2])
|
||||
self.assertEquals([node['id']
|
||||
for node in jobs_by_pol_part_dev['01sda']['nodes']],
|
||||
[1, 2, 3])
|
||||
self.assertEquals([node['id']
|
||||
for node in jobs_by_pol_part_dev['01sdb']['nodes']],
|
||||
[2, 3])
|
||||
self.assertEquals([node['id']
|
||||
for node in jobs_by_pol_part_dev['02sda']['nodes']],
|
||||
[2, 3])
|
||||
self.assertEquals([node['id']
|
||||
for node in jobs_by_pol_part_dev['02sdb']['nodes']],
|
||||
[2, 3, 0])
|
||||
self.assertEquals([node['id']
|
||||
for node in jobs_by_pol_part_dev['03sda']['nodes']],
|
||||
[3, 1])
|
||||
self.assertEquals([node['id']
|
||||
for node in jobs_by_pol_part_dev['03sdb']['nodes']],
|
||||
[3, 0])
|
||||
self.assertEquals([node['id']
|
||||
for node in jobs_by_pol_part_dev['10sda']['nodes']],
|
||||
[1, 2])
|
||||
self.assertEquals([node['id']
|
||||
for node in jobs_by_pol_part_dev['10sdb']['nodes']],
|
||||
[0, 2])
|
||||
self.assertEquals([node['id']
|
||||
for node in jobs_by_pol_part_dev['11sda']['nodes']],
|
||||
[1, 2, 3])
|
||||
self.assertEquals([node['id']
|
||||
for node in jobs_by_pol_part_dev['11sdb']['nodes']],
|
||||
[2, 3])
|
||||
self.assertEquals([node['id']
|
||||
for node in jobs_by_pol_part_dev['12sda']['nodes']],
|
||||
[2, 3])
|
||||
self.assertEquals([node['id']
|
||||
for node in jobs_by_pol_part_dev['12sdb']['nodes']],
|
||||
[2, 3, 0])
|
||||
self.assertEquals([node['id']
|
||||
for node in jobs_by_pol_part_dev['13sda']['nodes']],
|
||||
[3, 1])
|
||||
self.assertEquals([node['id']
|
||||
for node in jobs_by_pol_part_dev['13sdb']['nodes']],
|
||||
[3, 0])
|
||||
for part in ['00', '01', '02', '03']:
|
||||
self.assertEquals(jobs_by_pol_part_dev[part + 'sda']['path'],
|
||||
os.path.join(self.objects, part[1:]))
|
||||
self.assertEquals(jobs_by_pol_part_dev[part + 'sdb']['path'],
|
||||
os.path.join(objects_sdb, part[1:]))
|
||||
for part in ['10', '11', '12', '13']:
|
||||
self.assertEquals(jobs_by_pol_part_dev[part + 'sda']['path'],
|
||||
os.path.join(self.objects_1, part[1:]))
|
||||
self.assertEquals(jobs_by_pol_part_dev[part + 'sdb']['path'],
|
||||
os.path.join(objects_1_sdb, part[1:]))
|
||||
|
||||
@mock.patch('swift.obj.replicator.random.shuffle', side_effect=lambda l: l)
|
||||
def test_collect_jobs_multi_disk_diff_ports_normal(self, mock_shuffle):
|
||||
# Normally (servers_per_port=0), replication_ip AND replication_port
|
||||
# are used to determine local ring device entries. Here we show that
|
||||
# with bind_ip='127.0.0.1', bind_port=6000, only "sdc" is local.
|
||||
devs = [
|
||||
# Two disks on same IP/port
|
||||
{'id': 0, 'device': 'sda', 'zone': 0,
|
||||
'region': 1, 'ip': '1.1.1.1', 'port': 1111,
|
||||
'replication_ip': '127.0.0.0', 'replication_port': 6000},
|
||||
{'id': 1, 'device': 'sdb', 'zone': 1,
|
||||
'region': 1, 'ip': '1.1.1.1', 'port': 1111,
|
||||
'replication_ip': '127.0.0.0', 'replication_port': 6000},
|
||||
# Two disks on same server, different ports
|
||||
{'id': 2, 'device': 'sdc', 'zone': 2,
|
||||
'region': 2, 'ip': '1.1.1.2', 'port': 1112,
|
||||
'replication_ip': '127.0.0.1', 'replication_port': 6000},
|
||||
{'id': 3, 'device': 'sdd', 'zone': 4,
|
||||
'region': 2, 'ip': '1.1.1.2', 'port': 1112,
|
||||
'replication_ip': '127.0.0.1', 'replication_port': 6001},
|
||||
]
|
||||
objects_sdb, objects_1_sdb, _, _ = self._write_disk_data('sdb')
|
||||
objects_sdc, objects_1_sdc, _, _ = self._write_disk_data('sdc')
|
||||
objects_sdd, objects_1_sdd, _, _ = self._write_disk_data('sdd')
|
||||
_create_test_rings(self.testdir, devs)
|
||||
|
||||
self.conf['bind_ip'] = '127.0.0.1'
|
||||
self._create_replicator()
|
||||
|
||||
jobs = self.replicator.collect_jobs()
|
||||
|
||||
self.assertEqual([mock.call(jobs)], mock_shuffle.mock_calls)
|
||||
|
||||
jobs_to_delete = [j for j in jobs if j['delete']]
|
||||
self.assertEquals(len(jobs_to_delete), 2)
|
||||
self.assertEqual([
|
||||
'3', # policy 0; 3 not on sdc
|
||||
'3', # policy 1; 3 not on sdc
|
||||
], [j['partition'] for j in jobs_to_delete])
|
||||
|
||||
jobs_by_pol_part_dev = {}
|
||||
for job in jobs:
|
||||
# There should be no jobs with a device not sdc
|
||||
self.assertEqual(job['device'], 'sdc')
|
||||
jobs_by_pol_part_dev[
|
||||
str(int(job['policy'])) + job['partition'] + job['device']
|
||||
] = job
|
||||
|
||||
self.assertEquals([node['id']
|
||||
for node in jobs_by_pol_part_dev['00sdc']['nodes']],
|
||||
[0, 1])
|
||||
self.assertEquals([node['id']
|
||||
for node in jobs_by_pol_part_dev['01sdc']['nodes']],
|
||||
[1, 3])
|
||||
self.assertEquals([node['id']
|
||||
for node in jobs_by_pol_part_dev['02sdc']['nodes']],
|
||||
[3, 0])
|
||||
self.assertEquals([node['id']
|
||||
for node in jobs_by_pol_part_dev['03sdc']['nodes']],
|
||||
[3, 0, 1])
|
||||
self.assertEquals([node['id']
|
||||
for node in jobs_by_pol_part_dev['10sdc']['nodes']],
|
||||
[0, 1])
|
||||
self.assertEquals([node['id']
|
||||
for node in jobs_by_pol_part_dev['11sdc']['nodes']],
|
||||
[1, 3])
|
||||
self.assertEquals([node['id']
|
||||
for node in jobs_by_pol_part_dev['12sdc']['nodes']],
|
||||
[3, 0])
|
||||
self.assertEquals([node['id']
|
||||
for node in jobs_by_pol_part_dev['13sdc']['nodes']],
|
||||
[3, 0, 1])
|
||||
for part in ['00', '01', '02', '03']:
|
||||
self.assertEquals(jobs_by_pol_part_dev[part + 'sdc']['path'],
|
||||
os.path.join(objects_sdc, part[1:]))
|
||||
for part in ['10', '11', '12', '13']:
|
||||
self.assertEquals(jobs_by_pol_part_dev[part + 'sdc']['path'],
|
||||
os.path.join(objects_1_sdc, part[1:]))
|
||||
|
||||
@mock.patch('swift.obj.replicator.random.shuffle', side_effect=lambda l: l)
|
||||
def test_collect_jobs_multi_disk_servers_per_port(self, mock_shuffle):
|
||||
# Normally (servers_per_port=0), replication_ip AND replication_port
|
||||
# are used to determine local ring device entries. Here we show that
|
||||
# with servers_per_port > 0 and bind_ip='127.0.0.1', bind_port=6000,
|
||||
# then both "sdc" and "sdd" are local.
|
||||
devs = [
|
||||
# Two disks on same IP/port
|
||||
{'id': 0, 'device': 'sda', 'zone': 0,
|
||||
'region': 1, 'ip': '1.1.1.1', 'port': 1111,
|
||||
'replication_ip': '127.0.0.0', 'replication_port': 6000},
|
||||
{'id': 1, 'device': 'sdb', 'zone': 1,
|
||||
'region': 1, 'ip': '1.1.1.1', 'port': 1111,
|
||||
'replication_ip': '127.0.0.0', 'replication_port': 6000},
|
||||
# Two disks on same server, different ports
|
||||
{'id': 2, 'device': 'sdc', 'zone': 2,
|
||||
'region': 2, 'ip': '1.1.1.2', 'port': 1112,
|
||||
'replication_ip': '127.0.0.1', 'replication_port': 6000},
|
||||
{'id': 3, 'device': 'sdd', 'zone': 4,
|
||||
'region': 2, 'ip': '1.1.1.2', 'port': 1112,
|
||||
'replication_ip': '127.0.0.1', 'replication_port': 6001},
|
||||
]
|
||||
objects_sdb, objects_1_sdb, _, _ = self._write_disk_data('sdb')
|
||||
objects_sdc, objects_1_sdc, _, _ = self._write_disk_data('sdc')
|
||||
objects_sdd, objects_1_sdd, _, _ = self._write_disk_data('sdd')
|
||||
_create_test_rings(self.testdir, devs)
|
||||
|
||||
self.conf['bind_ip'] = '127.0.0.1'
|
||||
self.conf['servers_per_port'] = 1 # diff port ok
|
||||
self._create_replicator()
|
||||
|
||||
jobs = self.replicator.collect_jobs()
|
||||
|
||||
self.assertEqual([mock.call(jobs)], mock_shuffle.mock_calls)
|
||||
|
||||
jobs_to_delete = [j for j in jobs if j['delete']]
|
||||
self.assertEquals(len(jobs_to_delete), 4)
|
||||
self.assertEqual([
|
||||
'3', '0', # policy 0; 3 not on sdc, 0 not on sdd
|
||||
'3', '0', # policy 1; 3 not on sdc, 0 not on sdd
|
||||
], [j['partition'] for j in jobs_to_delete])
|
||||
|
||||
jobs_by_pol_part_dev = {}
|
||||
for job in jobs:
|
||||
# There should be no jobs with a device not in just sdc & sdd
|
||||
self.assertTrue(job['device'] in ('sdc', 'sdd'))
|
||||
jobs_by_pol_part_dev[
|
||||
str(int(job['policy'])) + job['partition'] + job['device']
|
||||
] = job
|
||||
|
||||
self.assertEquals([node['id']
|
||||
for node in jobs_by_pol_part_dev['00sdc']['nodes']],
|
||||
[0, 1])
|
||||
self.assertEquals([node['id']
|
||||
for node in jobs_by_pol_part_dev['00sdd']['nodes']],
|
||||
[0, 1, 2])
|
||||
self.assertEquals([node['id']
|
||||
for node in jobs_by_pol_part_dev['01sdc']['nodes']],
|
||||
[1, 3])
|
||||
self.assertEquals([node['id']
|
||||
for node in jobs_by_pol_part_dev['01sdd']['nodes']],
|
||||
[1, 2])
|
||||
self.assertEquals([node['id']
|
||||
for node in jobs_by_pol_part_dev['02sdc']['nodes']],
|
||||
[3, 0])
|
||||
self.assertEquals([node['id']
|
||||
for node in jobs_by_pol_part_dev['02sdd']['nodes']],
|
||||
[2, 0])
|
||||
self.assertEquals([node['id']
|
||||
for node in jobs_by_pol_part_dev['03sdc']['nodes']],
|
||||
[3, 0, 1])
|
||||
self.assertEquals([node['id']
|
||||
for node in jobs_by_pol_part_dev['03sdd']['nodes']],
|
||||
[0, 1])
|
||||
self.assertEquals([node['id']
|
||||
for node in jobs_by_pol_part_dev['10sdc']['nodes']],
|
||||
[0, 1])
|
||||
self.assertEquals([node['id']
|
||||
for node in jobs_by_pol_part_dev['10sdd']['nodes']],
|
||||
[0, 1, 2])
|
||||
self.assertEquals([node['id']
|
||||
for node in jobs_by_pol_part_dev['11sdc']['nodes']],
|
||||
[1, 3])
|
||||
self.assertEquals([node['id']
|
||||
for node in jobs_by_pol_part_dev['11sdd']['nodes']],
|
||||
[1, 2])
|
||||
self.assertEquals([node['id']
|
||||
for node in jobs_by_pol_part_dev['12sdc']['nodes']],
|
||||
[3, 0])
|
||||
self.assertEquals([node['id']
|
||||
for node in jobs_by_pol_part_dev['12sdd']['nodes']],
|
||||
[2, 0])
|
||||
self.assertEquals([node['id']
|
||||
for node in jobs_by_pol_part_dev['13sdc']['nodes']],
|
||||
[3, 0, 1])
|
||||
self.assertEquals([node['id']
|
||||
for node in jobs_by_pol_part_dev['13sdd']['nodes']],
|
||||
[0, 1])
|
||||
for part in ['00', '01', '02', '03']:
|
||||
self.assertEquals(jobs_by_pol_part_dev[part + 'sdc']['path'],
|
||||
os.path.join(objects_sdc, part[1:]))
|
||||
self.assertEquals(jobs_by_pol_part_dev[part + 'sdd']['path'],
|
||||
os.path.join(objects_sdd, part[1:]))
|
||||
for part in ['10', '11', '12', '13']:
|
||||
self.assertEquals(jobs_by_pol_part_dev[part + 'sdc']['path'],
|
||||
os.path.join(objects_1_sdc, part[1:]))
|
||||
self.assertEquals(jobs_by_pol_part_dev[part + 'sdd']['path'],
|
||||
os.path.join(objects_1_sdd, part[1:]))
|
||||
|
||||
def test_collect_jobs_handoffs_first(self):
|
||||
self.replicator.handoffs_first = True
|
||||
jobs = self.replicator.collect_jobs()
|
||||
@ -929,6 +1231,7 @@ class TestObjectReplicator(unittest.TestCase):
|
||||
|
||||
def test_run_once_recover_from_failure(self):
|
||||
conf = dict(swift_dir=self.testdir, devices=self.devices,
|
||||
bind_ip=_ips()[0],
|
||||
mount_check='false', timeout='300', stats_interval='1')
|
||||
replicator = object_replicator.ObjectReplicator(conf)
|
||||
was_connector = object_replicator.http_connect
|
||||
@ -975,6 +1278,7 @@ class TestObjectReplicator(unittest.TestCase):
|
||||
|
||||
def test_run_once_recover_from_timeout(self):
|
||||
conf = dict(swift_dir=self.testdir, devices=self.devices,
|
||||
bind_ips=_ips()[0],
|
||||
mount_check='false', timeout='300', stats_interval='1')
|
||||
replicator = object_replicator.ObjectReplicator(conf)
|
||||
was_connector = object_replicator.http_connect
|
||||
|
Loading…
Reference in New Issue
Block a user