New proxy logging field for wire status

Capture the on-the-wire status code for logging, because we sometimes
change the logged status code (for example, to 499 on a client disconnect
or to 500 when the response body raises an error).

Closes-Bug: #1896518
Change-Id: I27feabe923a6520e983637a9c68a19ec7174a0df
This commit is contained in:
Clay Gerrard 2020-10-01 14:28:04 -05:00 committed by Tim Burke
parent 4236a6efa3
commit fb08d477eb
8 changed files with 136 additions and 58 deletions

View File

@ -93,6 +93,9 @@ container The container part extracted from the path of the request.
object The object part extracted from the path of the request. object The object part extracted from the path of the request.
(anonymizable) (anonymizable)
pid PID of the process emitting the log line. pid PID of the process emitting the log line.
wire_status_int The status sent to the client, which may differ from the
logged status code if an error occurred while sending the
response body or the client disconnected.
=================== ========================================================== =================== ==========================================================
In one log line, all of the above fields are space-separated and url-encoded. In one log line, all of the above fields are space-separated and url-encoded.

View File

@ -74,9 +74,10 @@ bandwidth usage will want to only sum up logs with no swift.source.
import os import os
import time import time
from swift.common.middleware.catch_errors import enforce_byte_count
from swift.common.swob import Request from swift.common.swob import Request
from swift.common.utils import (get_logger, get_remote_client, from swift.common.utils import (get_logger, get_remote_client,
config_true_value, config_true_value, reiterate,
InputProxy, list_from_csv, get_policy_index, InputProxy, list_from_csv, get_policy_index,
split_path, StrAnonymizer, StrFormatTime, split_path, StrAnonymizer, StrFormatTime,
LogStringFormatter) LogStringFormatter)
@ -176,7 +177,8 @@ class ProxyLoggingMiddleware(object):
'log_info': '', 'log_info': '',
'policy_index': '', 'policy_index': '',
'ttfb': '0.05', 'ttfb': '0.05',
'pid': '42' 'pid': '42',
'wire_status_int': '200',
} }
try: try:
self.log_formatter.format(self.log_msg_template, **replacements) self.log_formatter.format(self.log_msg_template, **replacements)
@ -198,7 +200,8 @@ class ProxyLoggingMiddleware(object):
return value return value
def log_request(self, req, status_int, bytes_received, bytes_sent, def log_request(self, req, status_int, bytes_received, bytes_sent,
start_time, end_time, resp_headers=None, ttfb=0): start_time, end_time, resp_headers=None, ttfb=0,
wire_status_int=None):
""" """
Log a request. Log a request.
@ -209,6 +212,7 @@ class ProxyLoggingMiddleware(object):
:param start_time: timestamp request started :param start_time: timestamp request started
:param end_time: timestamp request completed :param end_time: timestamp request completed
:param resp_headers: dict of the response headers :param resp_headers: dict of the response headers
:param wire_status_int: the on the wire status int
""" """
resp_headers = resp_headers or {} resp_headers = resp_headers or {}
logged_headers = None logged_headers = None
@ -277,6 +281,7 @@ class ProxyLoggingMiddleware(object):
'policy_index': policy_index, 'policy_index': policy_index,
'ttfb': ttfb, 'ttfb': ttfb,
'pid': self.pid, 'pid': self.pid,
'wire_status_int': wire_status_int or status_int,
} }
self.access_logger.info( self.access_logger.info(
self.log_formatter.format(self.log_msg_template, self.log_formatter.format(self.log_msg_template,
@ -352,47 +357,46 @@ class ProxyLoggingMiddleware(object):
def my_start_response(status, headers, exc_info=None): def my_start_response(status, headers, exc_info=None):
start_response_args[0] = (status, list(headers), exc_info) start_response_args[0] = (status, list(headers), exc_info)
def status_int_for_logging(client_disconnect=False, start_status=None): def status_int_for_logging(start_status, client_disconnect=False):
# log disconnected clients as '499' status code # log disconnected clients as '499' status code
if client_disconnect or input_proxy.client_disconnect: if client_disconnect or input_proxy.client_disconnect:
ret_status_int = 499 return 499
elif start_status is None: return start_status
ret_status_int = int(
start_response_args[0][0].split(' ', 1)[0])
else:
ret_status_int = start_status
return ret_status_int
def iter_response(iterable): def iter_response(iterable):
iterator = iter(iterable) iterator = reiterate(iterable)
try: content_length = None
chunk = next(iterator)
while not chunk:
chunk = next(iterator)
except StopIteration:
chunk = b''
for h, v in start_response_args[0][1]: for h, v in start_response_args[0][1]:
if h.lower() in ('content-length', 'transfer-encoding'): if h.lower() == 'content-length':
content_length = int(v)
break
elif h.lower() == 'transfer-encoding':
break break
else: else:
if not chunk: if isinstance(iterator, list):
start_response_args[0][1].append(('Content-Length', '0')) content_length = sum(len(i) for i in iterator)
elif isinstance(iterable, list):
start_response_args[0][1].append( start_response_args[0][1].append(
('Content-Length', str(sum(len(i) for i in iterable)))) ('Content-Length', str(content_length)))
req = Request(env)
method = self.method_from_req(req)
if method == 'HEAD':
content_length = 0
if content_length is not None:
iterator = enforce_byte_count(iterator, content_length)
wire_status_int = int(start_response_args[0][0].split(' ', 1)[0])
resp_headers = dict(start_response_args[0][1]) resp_headers = dict(start_response_args[0][1])
start_response(*start_response_args[0]) start_response(*start_response_args[0])
req = Request(env)
# Log timing information for time-to-first-byte (GET requests only) # Log timing information for time-to-first-byte (GET requests only)
method = self.method_from_req(req)
ttfb = 0.0 ttfb = 0.0
if method == 'GET': if method == 'GET':
status_int = status_int_for_logging()
policy_index = get_policy_index(req.headers, resp_headers) policy_index = get_policy_index(req.headers, resp_headers)
metric_name = self.statsd_metric_name(req, status_int, method) metric_name = self.statsd_metric_name(
req, wire_status_int, method)
metric_name_policy = self.statsd_metric_name_policy( metric_name_policy = self.statsd_metric_name_policy(
req, status_int, method, policy_index) req, wire_status_int, method, policy_index)
ttfb = time.time() - start_time ttfb = time.time() - start_time
if metric_name: if metric_name:
self.access_logger.timing( self.access_logger.timing(
@ -403,31 +407,33 @@ class ProxyLoggingMiddleware(object):
bytes_sent = 0 bytes_sent = 0
client_disconnect = False client_disconnect = False
start_status = wire_status_int
try: try:
while chunk: for chunk in iterator:
bytes_sent += len(chunk) bytes_sent += len(chunk)
yield chunk yield chunk
chunk = next(iterator)
except StopIteration: # iterator was depleted except StopIteration: # iterator was depleted
return return
except GeneratorExit: # generator was closed before we finished except GeneratorExit: # generator was closed before we finished
client_disconnect = True client_disconnect = True
raise raise
except Exception:
start_status = 500
raise
finally: finally:
status_int = status_int_for_logging(client_disconnect) status_int = status_int_for_logging(
start_status, client_disconnect)
self.log_request( self.log_request(
req, status_int, input_proxy.bytes_received, bytes_sent, req, status_int, input_proxy.bytes_received, bytes_sent,
start_time, time.time(), resp_headers=resp_headers, start_time, time.time(), resp_headers=resp_headers,
ttfb=ttfb) ttfb=ttfb, wire_status_int=wire_status_int)
close_method = getattr(iterable, 'close', None) iterator.close()
if callable(close_method):
close_method()
try: try:
iterable = self.app(env, my_start_response) iterable = self.app(env, my_start_response)
except Exception: except Exception:
req = Request(env) req = Request(env)
status_int = status_int_for_logging(start_status=500) status_int = status_int_for_logging(500)
self.log_request( self.log_request(
req, status_int, input_proxy.bytes_received, 0, start_time, req, status_int, input_proxy.bytes_received, 0, start_time,
time.time()) time.time())

View File

@ -329,16 +329,17 @@ class ObjectContext(ObjectVersioningContext):
# do the write # do the write
put_resp = put_req.get_response(self.app) put_resp = put_req.get_response(self.app)
drain_and_close(put_resp)
close_if_possible(put_req.environ['wsgi.input']) close_if_possible(put_req.environ['wsgi.input'])
if put_resp.status_int == HTTP_NOT_FOUND: if put_resp.status_int == HTTP_NOT_FOUND:
drain_and_close(put_resp)
raise HTTPInternalServerError( raise HTTPInternalServerError(
request=req, content_type='text/plain', request=req, content_type='text/plain',
body=b'The versions container does not exist. You may ' body=b'The versions container does not exist. You may '
b'want to re-enable object versioning.') b'want to re-enable object versioning.')
self._check_response_error(req, put_resp) self._check_response_error(req, put_resp)
drain_and_close(put_resp)
put_bytes = byte_counter.bytes_read put_bytes = byte_counter.bytes_read
# N.B. this is essentially the same hack that symlink does in # N.B. this is essentially the same hack that symlink does in
# _validate_etag_and_update_sysmeta to deal with SLO # _validate_etag_and_update_sysmeta to deal with SLO
@ -392,12 +393,13 @@ class ObjectContext(ObjectVersioningContext):
""" """
if is_success(resp.status_int): if is_success(resp.status_int):
return return
body = resp.body
drain_and_close(resp) drain_and_close(resp)
if is_client_error(resp.status_int): if is_client_error(resp.status_int):
# missing container or bad permissions # missing container or bad permissions
if resp.status_int == 404: if resp.status_int == 404:
raise HTTPPreconditionFailed(request=req) raise HTTPPreconditionFailed(request=req)
raise HTTPException(body=resp.body, status=resp.status, raise HTTPException(body=body, status=resp.status,
headers=resp.headers) headers=resp.headers)
# could not version the data, bail # could not version the data, bail
raise HTTPServiceUnavailable(request=req) raise HTTPServiceUnavailable(request=req)

View File

@ -3981,23 +3981,26 @@ class CloseableChain(object):
""" """
def __init__(self, *iterables): def __init__(self, *iterables):
self.iterables = iterables self.iterables = iterables
self.chained_iter = itertools.chain(*self.iterables)
def __iter__(self): def __iter__(self):
return iter(itertools.chain(*(self.iterables))) return self
def __next__(self):
return next(self.chained_iter)
next = __next__ # py2
def close(self): def close(self):
for it in self.iterables: for it in self.iterables:
close_method = getattr(it, 'close', None) close_if_possible(it)
if close_method:
close_method()
def reiterate(iterable): def reiterate(iterable):
""" """
Consume the first item from an iterator, then re-chain it to the rest of Consume the first truthy item from an iterator, then re-chain it to the
the iterator. This is useful when you want to make sure the prologue to rest of the iterator. This is useful when you want to make sure the
downstream generators have been executed before continuing. prologue to downstream generators have been executed before continuing.
:param iterable: an iterable object :param iterable: an iterable object
""" """
if isinstance(iterable, (list, tuple)): if isinstance(iterable, (list, tuple)):
@ -4005,12 +4008,13 @@ def reiterate(iterable):
else: else:
iterator = iter(iterable) iterator = iter(iterable)
try: try:
chunk = '' chunk = next(iterator)
while not chunk: while not chunk:
chunk = next(iterator) chunk = next(iterator)
return CloseableChain([chunk], iterator) return CloseableChain([chunk], iterator)
except StopIteration: except StopIteration:
return [] close_if_possible(iterable)
return iter([])
class InputProxy(object): class InputProxy(object):
@ -4311,6 +4315,8 @@ def drain_and_close(response_or_app_iter):
don't care about the body of an error. don't care about the body of an error.
""" """
app_iter = getattr(response_or_app_iter, 'app_iter', response_or_app_iter) app_iter = getattr(response_or_app_iter, 'app_iter', response_or_app_iter)
if app_iter is None: # for example, if we used the Response.body property
return
for _chunk in app_iter: for _chunk in app_iter:
pass pass
close_if_possible(app_iter) close_if_possible(app_iter)

View File

@ -1063,7 +1063,7 @@ class ECAppIter(object):
# cleanup the frag queue feeding coros that may be currently # cleanup the frag queue feeding coros that may be currently
# executing the internal_parts_iters. # executing the internal_parts_iters.
if self.stashed_iter: if self.stashed_iter:
self.stashed_iter.close() close_if_possible(self.stashed_iter)
sleep() # Give the per-frag threads a chance to clean up sleep() # Give the per-frag threads a chance to clean up
for it in self.internal_parts_iters: for it in self.internal_parts_iters:
close_if_possible(it) close_if_possible(it)
@ -1200,10 +1200,15 @@ class ECAppIter(object):
def __iter__(self): def __iter__(self):
if self.stashed_iter is not None: if self.stashed_iter is not None:
return iter(self.stashed_iter) return self
else: else:
raise ValueError("Failed to call kickoff() before __iter__()") raise ValueError("Failed to call kickoff() before __iter__()")
def __next__(self):
return next(self.stashed_iter)
next = __next__ # py2
def _real_iter(self, req, resp_headers): def _real_iter(self, req, resp_headers):
if not self.range_specs: if not self.range_specs:
client_asked_for_range = False client_asked_for_range = False

View File

@ -32,15 +32,22 @@ class LeakTrackingIter(object):
def __init__(self, inner_iter, mark_closed, mark_read, key): def __init__(self, inner_iter, mark_closed, mark_read, key):
if isinstance(inner_iter, bytes): if isinstance(inner_iter, bytes):
inner_iter = (inner_iter, ) inner_iter = (inner_iter, )
self.inner_iter = inner_iter self.inner_iter = iter(inner_iter)
self.mark_closed = mark_closed self.mark_closed = mark_closed
self.mark_read = mark_read self.mark_read = mark_read
self.key = key self.key = key
def __iter__(self): def __iter__(self):
for x in self.inner_iter: return self
yield x
def __next__(self):
try:
return next(self.inner_iter)
except StopIteration:
self.mark_read(self.key) self.mark_read(self.key)
raise
next = __next__ # for py2
def close(self): def close(self):
self.mark_closed(self.key) self.mark_closed(self.key)

View File

@ -51,8 +51,10 @@ class FakeApp(object):
except ValueError: except ValueError:
is_container_or_object_req = False is_container_or_object_req = False
headers = [('Content-Type', 'text/plain'), headers = [('Content-Type', 'text/plain')]
('Content-Length', str(sum(map(len, self.body))))] if not hasattr(self.body, 'close'):
content_length = sum(map(len, self.body))
headers.append(('Content-Length', str(content_length)))
if is_container_or_object_req and self.policy_idx is not None: if is_container_or_object_req and self.policy_idx is not None:
headers.append(('X-Backend-Storage-Policy-Index', headers.append(('X-Backend-Storage-Policy-Index',
str(self.policy_idx))) str(self.policy_idx)))
@ -612,13 +614,22 @@ class TestProxyLogging(unittest.TestCase):
class CloseableBody(object): class CloseableBody(object):
def __init__(self): def __init__(self):
self.msg = b"CloseableBody"
self.closed = False self.closed = False
def close(self): def close(self):
self.closed = True self.closed = True
def __iter__(self): def __iter__(self):
return iter(["CloseableBody"]) return self
def __next__(self):
if not self.msg:
raise StopIteration
result, self.msg = self.msg, b''
return result
next = __next__ # py2
body = CloseableBody() body = CloseableBody()
app = proxy_logging.ProxyLoggingMiddleware(FakeApp(body), {}) app = proxy_logging.ProxyLoggingMiddleware(FakeApp(body), {})
@ -682,6 +693,27 @@ class TestProxyLogging(unittest.TestCase):
self.assertEqual(log_parts[6], '499') self.assertEqual(log_parts[6], '499')
self.assertEqual(log_parts[11], '4') # write length self.assertEqual(log_parts[11], '4') # write length
def test_exploding_body(self):
def exploding_body():
yield 'some'
yield 'stuff'
raise Exception('kaboom!')
app = proxy_logging.ProxyLoggingMiddleware(
FakeApp(exploding_body()), {
'log_msg_template': '{method} {path} '
'{status_int} {wire_status_int}',
})
app.access_logger = FakeLogger()
req = Request.blank('/', environ={'REQUEST_METHOD': 'GET'})
resp = req.get_response(app)
with self.assertRaises(Exception) as ctx:
resp.body
self.assertEqual('kaboom!', str(ctx.exception))
log_parts = self._log_parts(app)
self.assertEqual(log_parts, ['GET', '/', '500', '200'])
def test_disconnect_on_readline(self): def test_disconnect_on_readline(self):
app = proxy_logging.ProxyLoggingMiddleware(FakeAppReadline(), {}) app = proxy_logging.ProxyLoggingMiddleware(FakeAppReadline(), {})
app.access_logger = FakeLogger() app.access_logger = FakeLogger()
@ -748,7 +780,7 @@ class TestProxyLogging(unittest.TestCase):
app = proxy_logging.ProxyLoggingMiddleware( app = proxy_logging.ProxyLoggingMiddleware(
FakeAppNoContentLengthNoTransferEncoding( FakeAppNoContentLengthNoTransferEncoding(
# test the "while not chunk: chunk = next(iterator)" # test the "while not chunk: chunk = next(iterator)"
body=['', '', ''], body=[b'', b'', b''],
), {}) ), {})
app.access_logger = FakeLogger() app.access_logger = FakeLogger()
req = Request.blank('/', environ={'REQUEST_METHOD': 'GET'}) req = Request.blank('/', environ={'REQUEST_METHOD': 'GET'})

View File

@ -1247,6 +1247,23 @@ class TestUtils(unittest.TestCase):
else: else:
os.environ.pop('TZ') os.environ.pop('TZ')
def test_drain_and_close(self):
utils.drain_and_close([])
utils.drain_and_close(iter([]))
drained = [False]
def gen():
yield 'x'
yield 'y'
drained[0] = True
utils.drain_and_close(gen())
self.assertTrue(drained[0])
utils.drain_and_close(Response(status=200, body=b'Some body'))
drained = [False]
utils.drain_and_close(Response(status=200, app_iter=gen()))
self.assertTrue(drained[0])
def test_backwards(self): def test_backwards(self):
# Test swift.common.utils.backward # Test swift.common.utils.backward