do container listing updates in another (green)thread

The actual server-side changes are simple. The tests are a different
matter. Many changes were needed to the object server tests to
handle the now-async calls to the container server. In an effort to
test this properly, some drive-by changes were made to improve the
tests.

I tested this patch by doing zero-byte object writes to one container
as fast as possible. Then I did it again while also saturating two of
the container replicas' disks. The results are linked below.

https://gist.github.com/notmyname/2bb85acfd8fbc7fc312a
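For reference, the write load was just a tight loop of zero-byte PUTs from a
single client. A rough sketch of an equivalent driver using python-swiftclient
follows; the auth endpoint, credentials, and container name are placeholders,
not the values behind the numbers in the gist.

# Hypothetical load generator: zero-byte object writes to one container,
# issued as fast as a single client can manage.
from swiftclient.client import Connection

conn = Connection(authurl='http://127.0.0.1:8080/auth/v1.0',  # placeholder
                  user='test:tester', key='testing')          # placeholder
conn.put_container('bench')                                   # placeholder name
for i in range(100000):
    # An empty body keeps data transfer negligible, so the request time is
    # dominated by the container update this patch makes asynchronous.
    conn.put_object('bench', 'obj-%08d' % i, contents='')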

DocImpact

Change-Id: I737bd0af3f124a4ce3e0862a155e97c1f0ac3e52
John Dickinson 2015-05-23 15:40:03 -07:00 committed by Clay Gerrard
parent c3cc98b2c9
commit 2289137164
5 changed files with 360 additions and 179 deletions

View File

@@ -129,6 +129,8 @@ Logging address. The default is /dev/log.
 Request timeout to external services. The default is 3 seconds.
 .IP \fBconn_timeout\fR
 Connection timeout to external services. The default is 0.5 seconds.
+.IP \fBcontainer_update_timeout\fR
+Request timeout to do a container update on an object update. The default is 1 second.
 .RE
 .PD

View File

@@ -405,11 +405,12 @@ The following configuration options are available:

 [DEFAULT]

-=================== ========== =============================================
+======================== ========== ==========================================
 Option                   Default    Description
------------------- ---------- ---------------------------------------------
+------------------------ ---------- ------------------------------------------
 swift_dir                /etc/swift Swift configuration directory
-devices             /srv/node  Parent directory of where devices are mounted
+devices                  /srv/node  Parent directory of where devices are
+                                    mounted
 mount_check              true       Whether or not check if the devices are
                                     mounted to prevent accidentally writing
                                     to the root device
@@ -418,32 +419,35 @@ bind_port 6000 Port for server to bind to
 bind_timeout             30         Seconds to attempt bind before giving up
 workers                  auto       Override the number of pre-forked workers
                                     that will accept connections. If set it
-                               should be an integer, zero means no fork. If
-                               unset, it will try to default to the number
-                               of effective cpu cores and fallback to one.
-                               Increasing the number of workers helps slow
-                               filesystem operations in one request from
-                               negatively impacting other requests, but only
-                               the :ref:`servers_per_port
-                               <server-per-port-configuration>`
-                               option provides complete I/O isolation with
-                               no measurable overhead.
-servers_per_port    0          If each disk in each storage policy ring has
-                               unique port numbers for its "ip" value, you
-                               can use this setting to have each
-                               object-server worker only service requests
-                               for the single disk matching the port in the
-                               ring. The value of this setting determines
-                               how many worker processes run for each port
-                               (disk) in the ring. If you have 24 disks
-                               per server, and this setting is 4, then
-                               each storage node will have 1 + (24 * 4) =
-                               97 total object-server processes running.
-                               This gives complete I/O isolation, drastically
-                               reducing the impact of slow disks on storage
-                               node performance. The object-replicator and
-                               object-reconstructor need to see this setting
-                               too, so it must be in the [DEFAULT] section.
+                                    should be an integer, zero means no fork.
+                                    If unset, it will try to default to the
+                                    number of effective cpu cores and fallback
+                                    to one. Increasing the number of workers
+                                    helps slow filesystem operations in one
+                                    request from negatively impacting other
+                                    requests, but only the
+                                    :ref:`servers_per_port
+                                    <server-per-port-configuration>` option
+                                    provides complete I/O isolation with no
+                                    measurable overhead.
+servers_per_port         0          If each disk in each storage policy ring
+                                    has unique port numbers for its "ip"
+                                    value, you can use this setting to have
+                                    each object-server worker only service
+                                    requests for the single disk matching the
+                                    port in the ring. The value of this
+                                    setting determines how many worker
+                                    processes run for each port (disk) in the
+                                    ring. If you have 24 disks per server, and
+                                    this setting is 4, then each storage node
+                                    will have 1 + (24 * 4) = 97 total
+                                    object-server processes running. This
+                                    gives complete I/O isolation, drastically
+                                    reducing the impact of slow disks on
+                                    storage node performance. The
+                                    object-replicator and object-reconstructor
+                                    need to see this setting too, so it must
+                                    be in the [DEFAULT] section.
                                     See :ref:`server-per-port-configuration`.
 max_clients              1024       Maximum number of clients one worker can
                                     process simultaneously (it will actually
@@ -451,30 +455,36 @@ max_clients 1024 Maximum number of clients one worker can
                                     will only handle one request at a time,
                                     without accepting another request
                                     concurrently.
-disable_fallocate   false      Disable "fast fail" fallocate checks if the
-                               underlying filesystem does not support it.
+disable_fallocate        false      Disable "fast fail" fallocate checks if
+                                    the underlying filesystem does not support
+                                    it.
 log_max_line_length      0          Caps the length of log lines to the
                                     value given; no limit if set to 0, the
                                     default.
 log_custom_handlers      None       Comma-separated list of functions to call
                                     to setup custom log handlers.
-eventlet_debug      false      If true, turn on debug logging for eventlet
-fallocate_reserve   0          You can set fallocate_reserve to the number of
-                               bytes you'd like fallocate to reserve, whether
-                               there is space for the given file size or not.
-                               This is useful for systems that behave badly
-                               when they completely run out of space; you can
-                               make the services pretend they're out of space
-                               early.
-conn_timeout        0.5        Time to wait while attempting to connect to
-                               another backend node.
-node_timeout        3          Time to wait while sending each chunk of data
+eventlet_debug           false      If true, turn on debug logging for
+                                    eventlet
+fallocate_reserve        0          You can set fallocate_reserve to the
+                                    number of bytes you'd like fallocate to
+                                    reserve, whether there is space for the
+                                    given file size or not. This is useful for
+                                    systems that behave badly when they
+                                    completely run out of space; you can
+                                    make the services pretend they're out of
+                                    space early.
+conn_timeout             0.5        Time to wait while attempting to connect
+                                    to another backend node.
+node_timeout             3          Time to wait while sending each chunk of
+                                    data to another backend node.
 client_timeout           60         Time to wait while receiving each chunk of
-                               data from a client or another backend node.
-network_chunk_size  65536      Size of chunks to read/write over the network
+                                    data from a client or another backend node
+network_chunk_size       65536      Size of chunks to read/write over the
+                                    network
 disk_chunk_size          65536      Size of chunks to read/write to disk
-=================== ========== =============================================
+container_update_timeout 1          Time to wait while sending a container
+                                    update on object update.
+======================== ========== ==========================================

 .. _object-server-options:

View File

@@ -60,6 +60,8 @@ bind_port = 6000
 # conn_timeout = 0.5
 # Time to wait while sending each chunk of data to another backend node.
 # node_timeout = 3
+# Time to wait while sending a container update on object update.
+# container_update_timeout = 1.0
 # Time to wait while receiving each chunk of data from a client or another
 # backend node.
 # client_timeout = 60

View File

@@ -28,6 +28,7 @@ from swift import gettext_ as _
 from hashlib import md5

 from eventlet import sleep, wsgi, Timeout
+from eventlet.greenthread import spawn

 from swift.common.utils import public, get_logger, \
     config_true_value, timing_stats, replication, \
@@ -108,7 +109,9 @@ class ObjectController(BaseStorageServer):
         """
         super(ObjectController, self).__init__(conf)
         self.logger = logger or get_logger(conf, log_route='object-server')
-        self.node_timeout = int(conf.get('node_timeout', 3))
+        self.node_timeout = float(conf.get('node_timeout', 3))
+        self.container_update_timeout = float(
+            conf.get('container_update_timeout', 1))
         self.conn_timeout = float(conf.get('conn_timeout', 0.5))
         self.client_timeout = int(conf.get('client_timeout', 60))
         self.disk_chunk_size = int(conf.get('disk_chunk_size', 65536))
@@ -198,7 +201,8 @@ class ObjectController(BaseStorageServer):
             device, partition, account, container, obj, policy, **kwargs)

     def async_update(self, op, account, container, obj, host, partition,
-                     contdevice, headers_out, objdevice, policy):
+                     contdevice, headers_out, objdevice, policy,
+                     logger_thread_locals=None):
         """
         Sends or saves an async update.
@@ -213,7 +217,12 @@
                             request
         :param objdevice: device name that the object is in
         :param policy: the associated BaseStoragePolicy instance
+        :param logger_thread_locals: The thread local values to be set on the
+                                     self.logger to retain transaction
+                                     logging information.
         """
+        if logger_thread_locals:
+            self.logger.thread_locals = logger_thread_locals
         headers_out['user-agent'] = 'object-server %s' % os.getpid()
         full_path = '/%s/%s/%s' % (account, container, obj)
         if all([host, partition, contdevice]):
@@ -285,10 +294,28 @@
         headers_out['x-trans-id'] = headers_in.get('x-trans-id', '-')
         headers_out['referer'] = request.as_referer()
         headers_out['X-Backend-Storage-Policy-Index'] = int(policy)
+        update_greenthreads = []
         for conthost, contdevice in updates:
-            self.async_update(op, account, container, obj, conthost,
-                              contpartition, contdevice, headers_out,
-                              objdevice, policy)
+            gt = spawn(self.async_update, op, account, container, obj,
+                       conthost, contpartition, contdevice, headers_out,
+                       objdevice, policy,
+                       logger_thread_locals=self.logger.thread_locals)
+            update_greenthreads.append(gt)
+        # Wait a little bit to see if the container updates are successful.
+        # If we immediately return after firing off the greenthread above, then
+        # we're more likely to confuse the end-user who does a listing right
+        # after getting a successful response to the object create. The
+        # `container_update_timeout` bounds the length of time we wait so that
+        # one slow container server doesn't make the entire request lag.
+        try:
+            with Timeout(self.container_update_timeout):
+                for gt in update_greenthreads:
+                    gt.wait()
+        except Timeout:
+            # updates didn't go through, log it and return
+            self.logger.debug(
+                'Container update timeout (%.4fs) waiting for %s',
+                self.container_update_timeout, updates)

     def delete_at_update(self, op, delete_at, account, container, obj,
                          request, objdevice, policy):

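The heart of the change above is: fire each container update in its own
greenthread, then wait on all of them under a single Timeout. A minimal
standalone sketch of that pattern with plain eventlet is below;
fake_container_update() and the host names are illustrative stand-ins, not
Swift code.

# Sketch of the wait-with-timeout pattern used by container_update() above.
import eventlet
from eventlet import Timeout
from eventlet.greenthread import spawn

def fake_container_update(host, delay):
    eventlet.sleep(delay)  # stand-in for the HTTP request to one replica
    print('updated %s' % host)

greenthreads = [spawn(fake_container_update, 'fast-node', 0.01),
                spawn(fake_container_update, 'slow-node', 5.0)]
try:
    with Timeout(0.25):
        for gt in greenthreads:
            gt.wait()
except Timeout:
    # Give up waiting, as the object server does, but don't kill the
    # greenthreads; in Swift a failed update is saved as an async_pending
    # and retried later by the object-updater.
    print('gave up waiting; the object request can still succeed')

Note that Timeout only bounds the wait: the spawned greenthreads are not
cancelled, which is why the object server can return success while a slow
container update is still in flight.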
View File

@@ -33,8 +33,9 @@ from tempfile import mkdtemp
 from hashlib import md5
 import itertools
 import tempfile
+from contextlib import contextmanager

-from eventlet import sleep, spawn, wsgi, listen, Timeout, tpool
+from eventlet import sleep, spawn, wsgi, listen, Timeout, tpool, greenthread
 from eventlet.green import httplib
 from nose import SkipTest
@@ -67,6 +68,35 @@ test_policies = [
 ]


+@contextmanager
+def fake_spawn():
+    """
+    Spawn and capture the result so we can later wait on it. This means we can
+    test code executing in a greenthread but still wait() on the result to
+    ensure that the method has completed.
+    """
+
+    orig = object_server.spawn
+    greenlets = []
+
+    def _inner_fake_spawn(func, *a, **kw):
+        gt = greenthread.spawn(func, *a, **kw)
+        greenlets.append(gt)
+        return gt
+
+    object_server.spawn = _inner_fake_spawn
+    try:
+        yield
+    finally:
+        for gt in greenlets:
+            try:
+                gt.wait()
+            except:  # noqa
+                pass  # real spawn won't do anything but pollute logs
+        object_server.spawn = orig
+
+
 @patch_policies(test_policies)
 class TestObjectController(unittest.TestCase):
     """Test swift.obj.server.ObjectController"""
@@ -371,8 +401,6 @@ class TestObjectController(unittest.TestCase):
             return lambda *args, **kwargs: FakeConn(response, with_exc)

-        old_http_connect = object_server.http_connect
-        try:
         ts = time()
         timestamp = normalize_timestamp(ts)
         req = Request.blank(
@@ -391,7 +419,8 @@
                      'X-Container-Device': 'sda1',
                      'X-Container-Timestamp': '1',
                      'Content-Type': 'application/new1'})
-        object_server.http_connect = mock_http_connect(202)
+        with mock.patch.object(object_server, 'http_connect',
+                               mock_http_connect(202)):
             resp = req.get_response(self.object_controller)
         self.assertEquals(resp.status_int, 202)
         req = Request.blank(
@@ -403,7 +432,8 @@
                      'X-Container-Device': 'sda1',
                      'X-Container-Timestamp': '1',
                      'Content-Type': 'application/new1'})
-        object_server.http_connect = mock_http_connect(202, with_exc=True)
+        with mock.patch.object(object_server, 'http_connect',
+                               mock_http_connect(202, with_exc=True)):
             resp = req.get_response(self.object_controller)
         self.assertEquals(resp.status_int, 202)
         req = Request.blank(
@@ -415,11 +445,10 @@
                      'X-Container-Device': 'sda1',
                      'X-Container-Timestamp': '1',
                      'Content-Type': 'application/new2'})
-        object_server.http_connect = mock_http_connect(500)
+        with mock.patch.object(object_server, 'http_connect',
+                               mock_http_connect(500)):
             resp = req.get_response(self.object_controller)
         self.assertEquals(resp.status_int, 202)
-        finally:
-            object_server.http_connect = old_http_connect

     def test_POST_quarantine_zbyte(self):
         timestamp = normalize_timestamp(time())
@@ -1219,8 +1248,6 @@
             return lambda *args, **kwargs: FakeConn(response, with_exc)

-        old_http_connect = object_server.http_connect
-        try:
         timestamp = normalize_timestamp(time())
         req = Request.blank(
             '/sda1/p/a/c/o',
@@ -1232,7 +1259,9 @@
                      'X-Container-Timestamp': '1',
                      'Content-Type': 'application/new1',
                      'Content-Length': '0'})
-        object_server.http_connect = mock_http_connect(201)
+        with mock.patch.object(object_server, 'http_connect',
+                               mock_http_connect(201)):
+            with fake_spawn():
                 resp = req.get_response(self.object_controller)
         self.assertEquals(resp.status_int, 201)
         timestamp = normalize_timestamp(time())
@@ -1246,7 +1275,9 @@
                      'X-Container-Timestamp': '1',
                      'Content-Type': 'application/new1',
                      'Content-Length': '0'})
-        object_server.http_connect = mock_http_connect(500)
+        with mock.patch.object(object_server, 'http_connect',
+                               mock_http_connect(500)):
+            with fake_spawn():
                 resp = req.get_response(self.object_controller)
         self.assertEquals(resp.status_int, 201)
         timestamp = normalize_timestamp(time())
@@ -1260,11 +1291,11 @@
                      'X-Container-Timestamp': '1',
                      'Content-Type': 'application/new1',
                      'Content-Length': '0'})
-        object_server.http_connect = mock_http_connect(500, with_exc=True)
+        with mock.patch.object(object_server, 'http_connect',
+                               mock_http_connect(500, with_exc=True)):
+            with fake_spawn():
                 resp = req.get_response(self.object_controller)
         self.assertEquals(resp.status_int, 201)
-        finally:
-            object_server.http_connect = old_http_connect

     def test_PUT_ssync_multi_frag(self):
         timestamp = utils.Timestamp(time()).internal
@@ -2407,6 +2438,7 @@
                      'Content-Type': 'text/plain'})
         with mocked_http_conn(
                 200, give_connect=capture_updates) as fake_conn:
+            with fake_spawn():
                 resp = req.get_response(self.object_controller)
         self.assertRaises(StopIteration, fake_conn.code_iter.next)
         self.assertEqual(resp.status_int, 201)
@@ -2446,6 +2478,7 @@
                      'Content-Type': 'text/html'})
         with mocked_http_conn(
                 200, give_connect=capture_updates) as fake_conn:
+            with fake_spawn():
                 resp = req.get_response(self.object_controller)
         self.assertRaises(StopIteration, fake_conn.code_iter.next)
         self.assertEqual(resp.status_int, 201)
@@ -2484,6 +2517,7 @@
                      'Content-Type': 'text/enriched'})
         with mocked_http_conn(
                 200, give_connect=capture_updates) as fake_conn:
+            with fake_spawn():
                 resp = req.get_response(self.object_controller)
         self.assertRaises(StopIteration, fake_conn.code_iter.next)
         self.assertEqual(resp.status_int, 201)
@@ -2522,6 +2556,7 @@
                      'X-Container-Partition': 'p'})
         with mocked_http_conn(
                 200, give_connect=capture_updates) as fake_conn:
+            with fake_spawn():
                 resp = req.get_response(self.object_controller)
         self.assertRaises(StopIteration, fake_conn.code_iter.next)
         self.assertEqual(resp.status_int, 204)
@@ -2553,6 +2588,7 @@
                      'X-Container-Partition': 'p'})
         with mocked_http_conn(
                 200, give_connect=capture_updates) as fake_conn:
+            with fake_spawn():
                 resp = req.get_response(self.object_controller)
         self.assertRaises(StopIteration, fake_conn.code_iter.next)
         self.assertEqual(resp.status_int, 404)
@@ -3022,6 +3058,7 @@
         with mock.patch.object(object_server, 'http_connect',
                                fake_http_connect):
+            with fake_spawn():
                 resp = req.get_response(self.object_controller)

         self.assertEqual(resp.status_int, 201)
@@ -3135,6 +3172,7 @@
         with mock.patch.object(object_server, 'http_connect',
                                fake_http_connect):
+            with fake_spawn():
                 req.get_response(self.object_controller)

         http_connect_args.sort(key=operator.itemgetter('ipaddr'))
@@ -3212,6 +3250,7 @@
             '/sda1/p/a/c/o', method='PUT', body='', headers=headers)
         with mocked_http_conn(
                 500, 500, give_connect=capture_updates) as fake_conn:
+            with fake_spawn():
                 resp = req.get_response(self.object_controller)
         self.assertRaises(StopIteration, fake_conn.code_iter.next)
         self.assertEqual(resp.status_int, 201)
@@ -3448,6 +3487,7 @@
                      'Content-Type': 'text/plain'}, body='')
         with mocked_http_conn(
                 200, give_connect=capture_updates) as fake_conn:
+            with fake_spawn():
                 resp = req.get_response(self.object_controller)
         self.assertRaises(StopIteration, fake_conn.code_iter.next)
         self.assertEqual(resp.status_int, 201)
@@ -3489,6 +3529,7 @@
             headers=headers, body='')
         with mocked_http_conn(
                 200, give_connect=capture_updates) as fake_conn:
+            with fake_spawn():
                 resp = req.get_response(self.object_controller)
         self.assertRaises(StopIteration, fake_conn.code_iter.next)
         self.assertEqual(resp.status_int, 201)
@@ -3529,6 +3570,7 @@
         diskfile_mgr = self.object_controller._diskfile_router[policy]
         diskfile_mgr.pickle_async_update = fake_pickle_async_update
         with mocked_http_conn(500) as fake_conn:
+            with fake_spawn():
                 resp = req.get_response(self.object_controller)
         self.assertRaises(StopIteration, fake_conn.code_iter.next)
         self.assertEqual(resp.status_int, 201)
@@ -3556,6 +3598,104 @@
                          'container': 'c',
                          'op': 'PUT'})

+    def test_container_update_as_greenthread(self):
+        greenthreads = []
+        saved_spawn_calls = []
+        called_async_update_args = []
+
+        def local_fake_spawn(func, *a, **kw):
+            saved_spawn_calls.append((func, a, kw))
+            return mock.MagicMock()
+
+        def local_fake_async_update(*a, **kw):
+            # just capture the args to see that we would have called
+            called_async_update_args.append([a, kw])
+
+        req = Request.blank(
+            '/sda1/p/a/c/o',
+            environ={'REQUEST_METHOD': 'PUT'},
+            headers={'X-Timestamp': '12345',
+                     'Content-Type': 'application/burrito',
+                     'Content-Length': '0',
+                     'X-Backend-Storage-Policy-Index': 0,
+                     'X-Container-Partition': '20',
+                     'X-Container-Host': '1.2.3.4:5',
+                     'X-Container-Device': 'sdb1'})
+        with mock.patch.object(object_server, 'spawn',
+                               local_fake_spawn):
+            with mock.patch.object(self.object_controller,
+                                   'async_update',
+                                   local_fake_async_update):
+                resp = req.get_response(self.object_controller)
+        # check the response is completed and successful
+        self.assertEqual(resp.status_int, 201)
+        # check that async_update hasn't been called
+        self.assertFalse(len(called_async_update_args))
+        # now do the work in greenthreads
+        for func, a, kw in saved_spawn_calls:
+            gt = spawn(func, *a, **kw)
+            greenthreads.append(gt)
+        # wait for the greenthreads to finish
+        for gt in greenthreads:
+            gt.wait()
+        # check that the calls to async_update have happened
+        headers_out = {'X-Size': '0',
+                       'X-Content-Type': 'application/burrito',
+                       'X-Timestamp': '0000012345.00000',
+                       'X-Trans-Id': '-',
+                       'Referer': 'PUT http://localhost/sda1/p/a/c/o',
+                       'X-Backend-Storage-Policy-Index': '0',
+                       'X-Etag': 'd41d8cd98f00b204e9800998ecf8427e'}
+        expected = [('PUT', 'a', 'c', 'o', '1.2.3.4:5', '20', 'sdb1',
+                     headers_out, 'sda1', POLICIES[0]),
+                    {'logger_thread_locals': (None, None)}]
+        self.assertEqual(called_async_update_args, [expected])
+
+    def test_container_update_as_greenthread_with_timeout(self):
+        '''
+        give it one container to update (for only one greenthread)
+        fake the greenthread so it will raise a timeout
+        test that the right message is logged and the method returns None
+        '''
+        called_async_update_args = []
+
+        def local_fake_spawn(func, *a, **kw):
+            m = mock.MagicMock()
+
+            def wait_with_error():
+                raise Timeout()
+            m.wait = wait_with_error  # because raise can't be in a lambda
+            return m
+
+        def local_fake_async_update(*a, **kw):
+            # just capture the args to see that we would have called
+            called_async_update_args.append([a, kw])
+
+        req = Request.blank(
+            '/sda1/p/a/c/o',
+            environ={'REQUEST_METHOD': 'PUT'},
+            headers={'X-Timestamp': '12345',
+                     'Content-Type': 'application/burrito',
+                     'Content-Length': '0',
+                     'X-Backend-Storage-Policy-Index': 0,
+                     'X-Container-Partition': '20',
+                     'X-Container-Host': '1.2.3.4:5',
+                     'X-Container-Device': 'sdb1'})
+        with mock.patch.object(object_server, 'spawn',
+                               local_fake_spawn):
+            with mock.patch.object(self.object_controller,
+                                   'container_update_timeout',
+                                   1.414213562):
+                resp = req.get_response(self.object_controller)
+        # check the response is completed and successful
+        self.assertEqual(resp.status_int, 201)
+        # check that the timeout was logged
+        expected_logged_error = "Container update timeout (1.4142s) " \
+            "waiting for [('1.2.3.4:5', 'sdb1')]"
+        self.assertTrue(
+            expected_logged_error in
+            self.object_controller.logger.get_lines_for_level('debug'))

     def test_container_update_bad_args(self):
         policy = random.choice(list(POLICIES))
         given_args = []