Merge "New proxy logging field for wire status"
This commit is contained in:
commit
127bf9707c
@ -93,6 +93,9 @@ container The container part extracted from the path of the request.
|
||||
object The object part extracted from the path of the request.
|
||||
(anonymizable)
|
||||
pid PID of the process emitting the log line.
|
||||
wire_status_int The status sent to the client, which may be different than
|
||||
the logged response code if there was an error during the
|
||||
body of the request or a disconnect.
|
||||
=================== ==========================================================
|
||||
|
||||
In one log line, all of the above fields are space-separated and url-encoded.
|
||||
|
@ -74,9 +74,10 @@ bandwidth usage will want to only sum up logs with no swift.source.
|
||||
import os
|
||||
import time
|
||||
|
||||
from swift.common.middleware.catch_errors import enforce_byte_count
|
||||
from swift.common.swob import Request
|
||||
from swift.common.utils import (get_logger, get_remote_client,
|
||||
config_true_value,
|
||||
config_true_value, reiterate,
|
||||
InputProxy, list_from_csv, get_policy_index,
|
||||
split_path, StrAnonymizer, StrFormatTime,
|
||||
LogStringFormatter)
|
||||
@ -176,7 +177,8 @@ class ProxyLoggingMiddleware(object):
|
||||
'log_info': '',
|
||||
'policy_index': '',
|
||||
'ttfb': '0.05',
|
||||
'pid': '42'
|
||||
'pid': '42',
|
||||
'wire_status_int': '200',
|
||||
}
|
||||
try:
|
||||
self.log_formatter.format(self.log_msg_template, **replacements)
|
||||
@ -198,7 +200,8 @@ class ProxyLoggingMiddleware(object):
|
||||
return value
|
||||
|
||||
def log_request(self, req, status_int, bytes_received, bytes_sent,
|
||||
start_time, end_time, resp_headers=None, ttfb=0):
|
||||
start_time, end_time, resp_headers=None, ttfb=0,
|
||||
wire_status_int=None):
|
||||
"""
|
||||
Log a request.
|
||||
|
||||
@ -209,6 +212,7 @@ class ProxyLoggingMiddleware(object):
|
||||
:param start_time: timestamp request started
|
||||
:param end_time: timestamp request completed
|
||||
:param resp_headers: dict of the response headers
|
||||
:param wire_status_int: the on the wire status int
|
||||
"""
|
||||
resp_headers = resp_headers or {}
|
||||
logged_headers = None
|
||||
@ -277,6 +281,7 @@ class ProxyLoggingMiddleware(object):
|
||||
'policy_index': policy_index,
|
||||
'ttfb': ttfb,
|
||||
'pid': self.pid,
|
||||
'wire_status_int': wire_status_int or status_int,
|
||||
}
|
||||
self.access_logger.info(
|
||||
self.log_formatter.format(self.log_msg_template,
|
||||
@ -352,47 +357,46 @@ class ProxyLoggingMiddleware(object):
|
||||
def my_start_response(status, headers, exc_info=None):
|
||||
start_response_args[0] = (status, list(headers), exc_info)
|
||||
|
||||
def status_int_for_logging(client_disconnect=False, start_status=None):
|
||||
def status_int_for_logging(start_status, client_disconnect=False):
|
||||
# log disconnected clients as '499' status code
|
||||
if client_disconnect or input_proxy.client_disconnect:
|
||||
ret_status_int = 499
|
||||
elif start_status is None:
|
||||
ret_status_int = int(
|
||||
start_response_args[0][0].split(' ', 1)[0])
|
||||
else:
|
||||
ret_status_int = start_status
|
||||
return ret_status_int
|
||||
return 499
|
||||
return start_status
|
||||
|
||||
def iter_response(iterable):
|
||||
iterator = iter(iterable)
|
||||
try:
|
||||
chunk = next(iterator)
|
||||
while not chunk:
|
||||
chunk = next(iterator)
|
||||
except StopIteration:
|
||||
chunk = b''
|
||||
iterator = reiterate(iterable)
|
||||
content_length = None
|
||||
for h, v in start_response_args[0][1]:
|
||||
if h.lower() in ('content-length', 'transfer-encoding'):
|
||||
if h.lower() == 'content-length':
|
||||
content_length = int(v)
|
||||
break
|
||||
elif h.lower() == 'transfer-encoding':
|
||||
break
|
||||
else:
|
||||
if not chunk:
|
||||
start_response_args[0][1].append(('Content-Length', '0'))
|
||||
elif isinstance(iterable, list):
|
||||
if isinstance(iterator, list):
|
||||
content_length = sum(len(i) for i in iterator)
|
||||
start_response_args[0][1].append(
|
||||
('Content-Length', str(sum(len(i) for i in iterable))))
|
||||
('Content-Length', str(content_length)))
|
||||
|
||||
req = Request(env)
|
||||
method = self.method_from_req(req)
|
||||
if method == 'HEAD':
|
||||
content_length = 0
|
||||
if content_length is not None:
|
||||
iterator = enforce_byte_count(iterator, content_length)
|
||||
|
||||
wire_status_int = int(start_response_args[0][0].split(' ', 1)[0])
|
||||
resp_headers = dict(start_response_args[0][1])
|
||||
start_response(*start_response_args[0])
|
||||
req = Request(env)
|
||||
|
||||
# Log timing information for time-to-first-byte (GET requests only)
|
||||
method = self.method_from_req(req)
|
||||
ttfb = 0.0
|
||||
if method == 'GET':
|
||||
status_int = status_int_for_logging()
|
||||
policy_index = get_policy_index(req.headers, resp_headers)
|
||||
metric_name = self.statsd_metric_name(req, status_int, method)
|
||||
metric_name = self.statsd_metric_name(
|
||||
req, wire_status_int, method)
|
||||
metric_name_policy = self.statsd_metric_name_policy(
|
||||
req, status_int, method, policy_index)
|
||||
req, wire_status_int, method, policy_index)
|
||||
ttfb = time.time() - start_time
|
||||
if metric_name:
|
||||
self.access_logger.timing(
|
||||
@ -403,31 +407,33 @@ class ProxyLoggingMiddleware(object):
|
||||
|
||||
bytes_sent = 0
|
||||
client_disconnect = False
|
||||
start_status = wire_status_int
|
||||
try:
|
||||
while chunk:
|
||||
for chunk in iterator:
|
||||
bytes_sent += len(chunk)
|
||||
yield chunk
|
||||
chunk = next(iterator)
|
||||
except StopIteration: # iterator was depleted
|
||||
return
|
||||
except GeneratorExit: # generator was closed before we finished
|
||||
client_disconnect = True
|
||||
raise
|
||||
except Exception:
|
||||
start_status = 500
|
||||
raise
|
||||
finally:
|
||||
status_int = status_int_for_logging(client_disconnect)
|
||||
status_int = status_int_for_logging(
|
||||
start_status, client_disconnect)
|
||||
self.log_request(
|
||||
req, status_int, input_proxy.bytes_received, bytes_sent,
|
||||
start_time, time.time(), resp_headers=resp_headers,
|
||||
ttfb=ttfb)
|
||||
close_method = getattr(iterable, 'close', None)
|
||||
if callable(close_method):
|
||||
close_method()
|
||||
ttfb=ttfb, wire_status_int=wire_status_int)
|
||||
iterator.close()
|
||||
|
||||
try:
|
||||
iterable = self.app(env, my_start_response)
|
||||
except Exception:
|
||||
req = Request(env)
|
||||
status_int = status_int_for_logging(start_status=500)
|
||||
status_int = status_int_for_logging(500)
|
||||
self.log_request(
|
||||
req, status_int, input_proxy.bytes_received, 0, start_time,
|
||||
time.time())
|
||||
|
@ -329,16 +329,17 @@ class ObjectContext(ObjectVersioningContext):
|
||||
|
||||
# do the write
|
||||
put_resp = put_req.get_response(self.app)
|
||||
drain_and_close(put_resp)
|
||||
close_if_possible(put_req.environ['wsgi.input'])
|
||||
|
||||
if put_resp.status_int == HTTP_NOT_FOUND:
|
||||
drain_and_close(put_resp)
|
||||
raise HTTPInternalServerError(
|
||||
request=req, content_type='text/plain',
|
||||
body=b'The versions container does not exist. You may '
|
||||
b'want to re-enable object versioning.')
|
||||
|
||||
self._check_response_error(req, put_resp)
|
||||
drain_and_close(put_resp)
|
||||
put_bytes = byte_counter.bytes_read
|
||||
# N.B. this is essentially the same hack that symlink does in
|
||||
# _validate_etag_and_update_sysmeta to deal with SLO
|
||||
@ -392,12 +393,13 @@ class ObjectContext(ObjectVersioningContext):
|
||||
"""
|
||||
if is_success(resp.status_int):
|
||||
return
|
||||
body = resp.body
|
||||
drain_and_close(resp)
|
||||
if is_client_error(resp.status_int):
|
||||
# missing container or bad permissions
|
||||
if resp.status_int == 404:
|
||||
raise HTTPPreconditionFailed(request=req)
|
||||
raise HTTPException(body=resp.body, status=resp.status,
|
||||
raise HTTPException(body=body, status=resp.status,
|
||||
headers=resp.headers)
|
||||
# could not version the data, bail
|
||||
raise HTTPServiceUnavailable(request=req)
|
||||
|
@ -3981,23 +3981,26 @@ class CloseableChain(object):
|
||||
"""
|
||||
def __init__(self, *iterables):
|
||||
self.iterables = iterables
|
||||
self.chained_iter = itertools.chain(*self.iterables)
|
||||
|
||||
def __iter__(self):
|
||||
return iter(itertools.chain(*(self.iterables)))
|
||||
return self
|
||||
|
||||
def __next__(self):
|
||||
return next(self.chained_iter)
|
||||
|
||||
next = __next__ # py2
|
||||
|
||||
def close(self):
|
||||
for it in self.iterables:
|
||||
close_method = getattr(it, 'close', None)
|
||||
if close_method:
|
||||
close_method()
|
||||
close_if_possible(it)
|
||||
|
||||
|
||||
def reiterate(iterable):
|
||||
"""
|
||||
Consume the first item from an iterator, then re-chain it to the rest of
|
||||
the iterator. This is useful when you want to make sure the prologue to
|
||||
downstream generators have been executed before continuing.
|
||||
|
||||
Consume the first truthy item from an iterator, then re-chain it to the
|
||||
rest of the iterator. This is useful when you want to make sure the
|
||||
prologue to downstream generators have been executed before continuing.
|
||||
:param iterable: an iterable object
|
||||
"""
|
||||
if isinstance(iterable, (list, tuple)):
|
||||
@ -4005,12 +4008,13 @@ def reiterate(iterable):
|
||||
else:
|
||||
iterator = iter(iterable)
|
||||
try:
|
||||
chunk = ''
|
||||
chunk = next(iterator)
|
||||
while not chunk:
|
||||
chunk = next(iterator)
|
||||
return CloseableChain([chunk], iterator)
|
||||
except StopIteration:
|
||||
return []
|
||||
close_if_possible(iterable)
|
||||
return iter([])
|
||||
|
||||
|
||||
class InputProxy(object):
|
||||
@ -4311,6 +4315,8 @@ def drain_and_close(response_or_app_iter):
|
||||
don't care about the body of an error.
|
||||
"""
|
||||
app_iter = getattr(response_or_app_iter, 'app_iter', response_or_app_iter)
|
||||
if app_iter is None: # for example, if we used the Response.body property
|
||||
return
|
||||
for _chunk in app_iter:
|
||||
pass
|
||||
close_if_possible(app_iter)
|
||||
|
@ -1063,7 +1063,7 @@ class ECAppIter(object):
|
||||
# cleanup the frag queue feeding coros that may be currently
|
||||
# executing the internal_parts_iters.
|
||||
if self.stashed_iter:
|
||||
self.stashed_iter.close()
|
||||
close_if_possible(self.stashed_iter)
|
||||
sleep() # Give the per-frag threads a chance to clean up
|
||||
for it in self.internal_parts_iters:
|
||||
close_if_possible(it)
|
||||
@ -1200,10 +1200,15 @@ class ECAppIter(object):
|
||||
|
||||
def __iter__(self):
|
||||
if self.stashed_iter is not None:
|
||||
return iter(self.stashed_iter)
|
||||
return self
|
||||
else:
|
||||
raise ValueError("Failed to call kickoff() before __iter__()")
|
||||
|
||||
def __next__(self):
|
||||
return next(self.stashed_iter)
|
||||
|
||||
next = __next__ # py2
|
||||
|
||||
def _real_iter(self, req, resp_headers):
|
||||
if not self.range_specs:
|
||||
client_asked_for_range = False
|
||||
|
@ -32,15 +32,22 @@ class LeakTrackingIter(object):
|
||||
def __init__(self, inner_iter, mark_closed, mark_read, key):
|
||||
if isinstance(inner_iter, bytes):
|
||||
inner_iter = (inner_iter, )
|
||||
self.inner_iter = inner_iter
|
||||
self.inner_iter = iter(inner_iter)
|
||||
self.mark_closed = mark_closed
|
||||
self.mark_read = mark_read
|
||||
self.key = key
|
||||
|
||||
def __iter__(self):
|
||||
for x in self.inner_iter:
|
||||
yield x
|
||||
self.mark_read(self.key)
|
||||
return self
|
||||
|
||||
def __next__(self):
|
||||
try:
|
||||
return next(self.inner_iter)
|
||||
except StopIteration:
|
||||
self.mark_read(self.key)
|
||||
raise
|
||||
|
||||
next = __next__ # for py2
|
||||
|
||||
def close(self):
|
||||
self.mark_closed(self.key)
|
||||
|
@ -51,8 +51,10 @@ class FakeApp(object):
|
||||
except ValueError:
|
||||
is_container_or_object_req = False
|
||||
|
||||
headers = [('Content-Type', 'text/plain'),
|
||||
('Content-Length', str(sum(map(len, self.body))))]
|
||||
headers = [('Content-Type', 'text/plain')]
|
||||
if not hasattr(self.body, 'close'):
|
||||
content_length = sum(map(len, self.body))
|
||||
headers.append(('Content-Length', str(content_length)))
|
||||
if is_container_or_object_req and self.policy_idx is not None:
|
||||
headers.append(('X-Backend-Storage-Policy-Index',
|
||||
str(self.policy_idx)))
|
||||
@ -612,13 +614,22 @@ class TestProxyLogging(unittest.TestCase):
|
||||
|
||||
class CloseableBody(object):
|
||||
def __init__(self):
|
||||
self.msg = b"CloseableBody"
|
||||
self.closed = False
|
||||
|
||||
def close(self):
|
||||
self.closed = True
|
||||
|
||||
def __iter__(self):
|
||||
return iter(["CloseableBody"])
|
||||
return self
|
||||
|
||||
def __next__(self):
|
||||
if not self.msg:
|
||||
raise StopIteration
|
||||
result, self.msg = self.msg, b''
|
||||
return result
|
||||
|
||||
next = __next__ # py2
|
||||
|
||||
body = CloseableBody()
|
||||
app = proxy_logging.ProxyLoggingMiddleware(FakeApp(body), {})
|
||||
@ -682,6 +693,27 @@ class TestProxyLogging(unittest.TestCase):
|
||||
self.assertEqual(log_parts[6], '499')
|
||||
self.assertEqual(log_parts[11], '4') # write length
|
||||
|
||||
def test_exploding_body(self):
|
||||
|
||||
def exploding_body():
|
||||
yield 'some'
|
||||
yield 'stuff'
|
||||
raise Exception('kaboom!')
|
||||
|
||||
app = proxy_logging.ProxyLoggingMiddleware(
|
||||
FakeApp(exploding_body()), {
|
||||
'log_msg_template': '{method} {path} '
|
||||
'{status_int} {wire_status_int}',
|
||||
})
|
||||
app.access_logger = FakeLogger()
|
||||
req = Request.blank('/', environ={'REQUEST_METHOD': 'GET'})
|
||||
resp = req.get_response(app)
|
||||
with self.assertRaises(Exception) as ctx:
|
||||
resp.body
|
||||
self.assertEqual('kaboom!', str(ctx.exception))
|
||||
log_parts = self._log_parts(app)
|
||||
self.assertEqual(log_parts, ['GET', '/', '500', '200'])
|
||||
|
||||
def test_disconnect_on_readline(self):
|
||||
app = proxy_logging.ProxyLoggingMiddleware(FakeAppReadline(), {})
|
||||
app.access_logger = FakeLogger()
|
||||
@ -748,7 +780,7 @@ class TestProxyLogging(unittest.TestCase):
|
||||
app = proxy_logging.ProxyLoggingMiddleware(
|
||||
FakeAppNoContentLengthNoTransferEncoding(
|
||||
# test the "while not chunk: chunk = next(iterator)"
|
||||
body=['', '', ''],
|
||||
body=[b'', b'', b''],
|
||||
), {})
|
||||
app.access_logger = FakeLogger()
|
||||
req = Request.blank('/', environ={'REQUEST_METHOD': 'GET'})
|
||||
|
@ -1247,6 +1247,23 @@ class TestUtils(unittest.TestCase):
|
||||
else:
|
||||
os.environ.pop('TZ')
|
||||
|
||||
def test_drain_and_close(self):
|
||||
utils.drain_and_close([])
|
||||
utils.drain_and_close(iter([]))
|
||||
drained = [False]
|
||||
|
||||
def gen():
|
||||
yield 'x'
|
||||
yield 'y'
|
||||
drained[0] = True
|
||||
|
||||
utils.drain_and_close(gen())
|
||||
self.assertTrue(drained[0])
|
||||
utils.drain_and_close(Response(status=200, body=b'Some body'))
|
||||
drained = [False]
|
||||
utils.drain_and_close(Response(status=200, app_iter=gen()))
|
||||
self.assertTrue(drained[0])
|
||||
|
||||
def test_backwards(self):
|
||||
# Test swift.common.utils.backward
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user