Merge "Improving statistics sent to Graphite."
This commit is contained in:
commit
63ab40db9a
@ -1065,7 +1065,7 @@ proxy-server controller responsible for the request: "account", "container",
|
||||
middleware. The `<verb>` portion will be one of "GET", "HEAD", "POST", "PUT",
|
||||
"DELETE", "COPY", "OPTIONS", or "BAD_METHOD". The list of valid HTTP methods
|
||||
is configurable via the `log_statsd_valid_http_methods` config variable and
|
||||
the default setting yields the above behavior.
|
||||
the default setting yields the above behavior):
|
||||
|
||||
.. _Swift Origin Server: https://github.com/dpgoetz/sos
|
||||
|
||||
@ -1087,6 +1087,21 @@ Metric Name Description
|
||||
like the main timing metric.
|
||||
==================================================== ============================================
|
||||
|
||||
The `proxy-logging` middleware also groups these metrics by policy. The
|
||||
`<policy-index>` portion represents a policy index):
|
||||
|
||||
========================================================================== =====================================
|
||||
Metric Name Description
|
||||
-------------------------------------------------------------------------- -------------------------------------
|
||||
`proxy-server.object.policy.<policy-index>.<verb>.<status>.timing` Timing data for requests, aggregated
|
||||
by policy index.
|
||||
`proxy-server.object.policy.<policy-index>.GET.<status>.first-byte.timing` Timing data up to completion of
|
||||
sending the response headers,
|
||||
aggregated by policy index.
|
||||
`proxy-server.object.policy.<policy-index>.<verb>.<status>.xfer` Sum of bytes transferred in and out,
|
||||
aggregated by policy index.
|
||||
========================================================================== =====================================
|
||||
|
||||
Metrics for `tempauth` middleware (in the table, `<reseller_prefix>` represents
|
||||
the actual configured reseller_prefix or "`NONE`" if the reseller_prefix is the
|
||||
empty string):
|
||||
|
@ -80,6 +80,8 @@ from swift.common.utils import (get_logger, get_remote_client,
|
||||
get_valid_utf8_str, config_true_value,
|
||||
InputProxy, list_from_csv, get_policy_index)
|
||||
|
||||
from swift.common.storage_policy import POLICIES
|
||||
|
||||
QUOTE_SAFE = '/:'
|
||||
|
||||
|
||||
@ -195,17 +197,27 @@ class ProxyLoggingMiddleware(object):
|
||||
end_time_str,
|
||||
policy_index
|
||||
)))
|
||||
|
||||
# Log timing and bytes-transferred data to StatsD
|
||||
metric_name = self.statsd_metric_name(req, status_int, method)
|
||||
metric_name_policy = self.statsd_metric_name_policy(req, status_int,
|
||||
method,
|
||||
policy_index)
|
||||
# Only log data for valid controllers (or SOS) to keep the metric count
|
||||
# down (egregious errors will get logged by the proxy server itself).
|
||||
|
||||
if metric_name:
|
||||
self.access_logger.timing(metric_name + '.timing',
|
||||
(end_time - start_time) * 1000)
|
||||
self.access_logger.update_stats(metric_name + '.xfer',
|
||||
bytes_received + bytes_sent)
|
||||
if metric_name_policy:
|
||||
self.access_logger.timing(metric_name_policy + '.timing',
|
||||
(end_time - start_time) * 1000)
|
||||
self.access_logger.update_stats(metric_name_policy + '.xfer',
|
||||
bytes_received + bytes_sent)
|
||||
|
||||
def statsd_metric_name(self, req, status_int, method):
|
||||
def get_metric_name_type(self, req):
|
||||
if req.path.startswith('/v1/'):
|
||||
try:
|
||||
stat_type = [None, 'account', 'container',
|
||||
@ -214,12 +226,33 @@ class ProxyLoggingMiddleware(object):
|
||||
stat_type = 'object'
|
||||
else:
|
||||
stat_type = req.environ.get('swift.source')
|
||||
return stat_type
|
||||
|
||||
def statsd_metric_name(self, req, status_int, method):
|
||||
stat_type = self.get_metric_name_type(req)
|
||||
if stat_type is None:
|
||||
return None
|
||||
stat_method = method if method in self.valid_methods \
|
||||
else 'BAD_METHOD'
|
||||
return '.'.join((stat_type, stat_method, str(status_int)))
|
||||
|
||||
def statsd_metric_name_policy(self, req, status_int, method, policy_index):
|
||||
if policy_index is None:
|
||||
return None
|
||||
stat_type = self.get_metric_name_type(req)
|
||||
if stat_type == 'object':
|
||||
stat_method = method if method in self.valid_methods \
|
||||
else 'BAD_METHOD'
|
||||
# The policy may not exist
|
||||
policy = POLICIES.get_by_index(policy_index)
|
||||
if policy:
|
||||
return '.'.join((stat_type, 'policy', str(policy_index),
|
||||
stat_method, str(status_int)))
|
||||
else:
|
||||
return None
|
||||
else:
|
||||
return None
|
||||
|
||||
def __call__(self, env, start_response):
|
||||
if self.req_already_logged(env):
|
||||
return self.app(env, start_response)
|
||||
@ -270,10 +303,16 @@ class ProxyLoggingMiddleware(object):
|
||||
method = self.method_from_req(req)
|
||||
if method == 'GET':
|
||||
status_int = status_int_for_logging()
|
||||
policy_index = get_policy_index(req.headers, resp_headers)
|
||||
metric_name = self.statsd_metric_name(req, status_int, method)
|
||||
metric_name_policy = self.statsd_metric_name_policy(
|
||||
req, status_int, method, policy_index)
|
||||
if metric_name:
|
||||
self.access_logger.timing_since(
|
||||
metric_name + '.first-byte.timing', start_time)
|
||||
if metric_name_policy:
|
||||
self.access_logger.timing_since(
|
||||
metric_name_policy + '.first-byte.timing', start_time)
|
||||
|
||||
bytes_sent = 0
|
||||
client_disconnect = False
|
||||
|
@ -21,25 +21,39 @@ import mock
|
||||
from six import BytesIO
|
||||
|
||||
from test.unit import FakeLogger
|
||||
from swift.common.utils import get_logger
|
||||
from swift.common.utils import get_logger, split_path
|
||||
from swift.common.middleware import proxy_logging
|
||||
from swift.common.swob import Request, Response
|
||||
from swift.common import constraints
|
||||
from swift.common.storage_policy import StoragePolicy
|
||||
from test.unit import patch_policies
|
||||
|
||||
|
||||
class FakeApp(object):
|
||||
|
||||
def __init__(self, body=None, response_str='200 OK'):
|
||||
def __init__(self, body=None, response_str='200 OK', policy_idx='0'):
|
||||
if body is None:
|
||||
body = ['FAKE APP']
|
||||
|
||||
self.body = body
|
||||
self.response_str = response_str
|
||||
self.policy_idx = policy_idx
|
||||
|
||||
def __call__(self, env, start_response):
|
||||
start_response(self.response_str,
|
||||
[('Content-Type', 'text/plain'),
|
||||
('Content-Length', str(sum(map(len, self.body))))])
|
||||
try:
|
||||
# /v1/a/c or /v1/a/c/o
|
||||
split_path(env['PATH_INFO'], 3, 4, True)
|
||||
is_container_or_object_req = True
|
||||
except ValueError:
|
||||
is_container_or_object_req = False
|
||||
|
||||
headers = [('Content-Type', 'text/plain'),
|
||||
('Content-Length', str(sum(map(len, self.body))))]
|
||||
if is_container_or_object_req and self.policy_idx is not None:
|
||||
headers.append(('X-Backend-Storage-Policy-Index',
|
||||
str(self.policy_idx)))
|
||||
|
||||
start_response(self.response_str, headers)
|
||||
while env['wsgi.input'].read(5):
|
||||
pass
|
||||
return self.body
|
||||
@ -91,8 +105,12 @@ def start_response(*args):
|
||||
pass
|
||||
|
||||
|
||||
@patch_policies([StoragePolicy(0, 'zero', False)])
|
||||
class TestProxyLogging(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
pass
|
||||
|
||||
def _log_parts(self, app, should_be_empty=False):
|
||||
info_calls = app.access_logger.log_dict['info']
|
||||
if should_be_empty:
|
||||
@ -136,11 +154,14 @@ class TestProxyLogging(unittest.TestCase):
|
||||
for timing_call in timing_calls:
|
||||
self.assertNotEqual(not_exp_metric, timing_call[0][0])
|
||||
|
||||
def assertUpdateStats(self, exp_metric, exp_bytes, app):
|
||||
update_stats_calls = app.access_logger.log_dict['update_stats']
|
||||
self.assertEquals(1, len(update_stats_calls))
|
||||
self.assertEquals({}, update_stats_calls[0][1])
|
||||
self.assertEquals((exp_metric, exp_bytes), update_stats_calls[0][0])
|
||||
def assertUpdateStats(self, exp_metrics_and_values, app):
|
||||
update_stats_calls = sorted(app.access_logger.log_dict['update_stats'])
|
||||
got_metrics_values_and_kwargs = [(usc[0][0], usc[0][1], usc[1])
|
||||
for usc in update_stats_calls]
|
||||
exp_metrics_values_and_kwargs = [(emv[0], emv[1], {})
|
||||
for emv in exp_metrics_and_values]
|
||||
self.assertEqual(got_metrics_values_and_kwargs,
|
||||
exp_metrics_values_and_kwargs)
|
||||
|
||||
def test_log_request_statsd_invalid_stats_types(self):
|
||||
app = proxy_logging.ProxyLoggingMiddleware(FakeApp(), {})
|
||||
@ -198,14 +219,51 @@ class TestProxyLogging(unittest.TestCase):
|
||||
'wsgi.input': BytesIO(b'4321')})
|
||||
stub_times = [18.0, 20.71828182846]
|
||||
iter_response = app(req.environ, lambda *_: None)
|
||||
|
||||
self.assertEqual('7654321', ''.join(iter_response))
|
||||
self.assertTiming('%s.GET.321.timing' % exp_type, app,
|
||||
exp_timing=2.71828182846 * 1000)
|
||||
self.assertTimingSince(
|
||||
'%s.GET.321.first-byte.timing' % exp_type, app,
|
||||
exp_start=18.0)
|
||||
self.assertUpdateStats('%s.GET.321.xfer' % exp_type,
|
||||
4 + 7, app)
|
||||
if exp_type == 'object':
|
||||
# Object operations also return stats by policy
|
||||
# In this case, the value needs to match the timing for GET
|
||||
self.assertTiming('%s.policy.0.GET.321.timing' % exp_type,
|
||||
app, exp_timing=2.71828182846 * 1000)
|
||||
self.assertUpdateStats([('%s.GET.321.xfer' % exp_type,
|
||||
4 + 7),
|
||||
('object.policy.0.GET.321.xfer',
|
||||
4 + 7)],
|
||||
app)
|
||||
else:
|
||||
self.assertUpdateStats([('%s.GET.321.xfer' % exp_type,
|
||||
4 + 7)],
|
||||
app)
|
||||
|
||||
# GET Repeat the test above, but with a non-existent policy
|
||||
# Do this only for object types
|
||||
if exp_type == 'object':
|
||||
app = proxy_logging.ProxyLoggingMiddleware(
|
||||
FakeApp(body='7654321', response_str='321 Fubar',
|
||||
policy_idx='-1'), {})
|
||||
app.access_logger = FakeLogger()
|
||||
req = Request.blank(path, environ={
|
||||
'REQUEST_METHOD': 'GET',
|
||||
'wsgi.input': BytesIO(b'4321')})
|
||||
stub_times = [18.0, 20.71828182846]
|
||||
iter_response = app(req.environ, lambda *_: None)
|
||||
|
||||
self.assertEqual('7654321', ''.join(iter_response))
|
||||
self.assertTiming('%s.GET.321.timing' % exp_type, app,
|
||||
exp_timing=2.71828182846 * 1000)
|
||||
self.assertTimingSince(
|
||||
'%s.GET.321.first-byte.timing' % exp_type, app,
|
||||
exp_start=18.0)
|
||||
# No results returned for the non-existent policy
|
||||
self.assertUpdateStats([('%s.GET.321.xfer' % exp_type,
|
||||
4 + 7)],
|
||||
app)
|
||||
|
||||
# GET with swift.proxy_access_log_made already set
|
||||
app = proxy_logging.ProxyLoggingMiddleware(
|
||||
@ -241,8 +299,42 @@ class TestProxyLogging(unittest.TestCase):
|
||||
'%s.GET.314.first-byte.timing' % exp_type, app)
|
||||
self.assertNotTiming(
|
||||
'%s.PUT.314.first-byte.timing' % exp_type, app)
|
||||
self.assertUpdateStats(
|
||||
'%s.PUT.314.xfer' % exp_type, 6 + 8, app)
|
||||
if exp_type == 'object':
|
||||
# Object operations also return stats by policy In this
|
||||
# case, the value needs to match the timing for PUT.
|
||||
self.assertTiming('%s.policy.0.PUT.314.timing' %
|
||||
exp_type, app,
|
||||
exp_timing=7.3321 * 1000)
|
||||
self.assertUpdateStats(
|
||||
[('object.PUT.314.xfer', 6 + 8),
|
||||
('object.policy.0.PUT.314.xfer', 6 + 8)], app)
|
||||
else:
|
||||
self.assertUpdateStats(
|
||||
[('%s.PUT.314.xfer' % exp_type, 6 + 8)], app)
|
||||
|
||||
# PUT Repeat the test above, but with a non-existent policy
|
||||
# Do this only for object types
|
||||
if exp_type == 'object':
|
||||
app = proxy_logging.ProxyLoggingMiddleware(
|
||||
FakeApp(body='87654321', response_str='314 PiTown',
|
||||
policy_idx='-1'), {})
|
||||
app.access_logger = FakeLogger()
|
||||
req = Request.blank(path, environ={
|
||||
'REQUEST_METHOD': 'PUT',
|
||||
'wsgi.input': BytesIO(b'654321')})
|
||||
# (it's not a GET, so time() doesn't have a 2nd call)
|
||||
stub_times = [58.2, 58.2 + 7.3321]
|
||||
iter_response = app(req.environ, lambda *_: None)
|
||||
self.assertEqual('87654321', ''.join(iter_response))
|
||||
self.assertTiming('%s.PUT.314.timing' % exp_type, app,
|
||||
exp_timing=7.3321 * 1000)
|
||||
self.assertNotTiming(
|
||||
'%s.GET.314.first-byte.timing' % exp_type, app)
|
||||
self.assertNotTiming(
|
||||
'%s.PUT.314.first-byte.timing' % exp_type, app)
|
||||
# No results returned for the non-existent policy
|
||||
self.assertUpdateStats([('object.PUT.314.xfer', 6 + 8)],
|
||||
app)
|
||||
|
||||
def test_log_request_stat_method_filtering_default(self):
|
||||
method_map = {
|
||||
@ -266,8 +358,8 @@ class TestProxyLogging(unittest.TestCase):
|
||||
app.log_request(req, 299, 11, 3, now, now + 1.17)
|
||||
self.assertTiming('account.%s.299.timing' % exp_method, app,
|
||||
exp_timing=1.17 * 1000)
|
||||
self.assertUpdateStats('account.%s.299.xfer' % exp_method,
|
||||
11 + 3, app)
|
||||
self.assertUpdateStats([('account.%s.299.xfer' % exp_method,
|
||||
11 + 3)], app)
|
||||
|
||||
def test_log_request_stat_method_filtering_custom(self):
|
||||
method_map = {
|
||||
@ -293,8 +385,8 @@ class TestProxyLogging(unittest.TestCase):
|
||||
app.log_request(req, 911, 4, 43, now, now + 1.01)
|
||||
self.assertTiming('container.%s.911.timing' % exp_method, app,
|
||||
exp_timing=1.01 * 1000)
|
||||
self.assertUpdateStats('container.%s.911.xfer' % exp_method,
|
||||
4 + 43, app)
|
||||
self.assertUpdateStats([('container.%s.911.xfer' % exp_method,
|
||||
4 + 43)], app)
|
||||
|
||||
def test_basic_req(self):
|
||||
app = proxy_logging.ProxyLoggingMiddleware(FakeApp(), {})
|
||||
@ -336,7 +428,7 @@ class TestProxyLogging(unittest.TestCase):
|
||||
self.assertEquals(log_parts[6], '200')
|
||||
self.assertEquals(resp_body, 'somechunksof data')
|
||||
self.assertEquals(log_parts[11], str(len(resp_body)))
|
||||
self.assertUpdateStats('SOS.GET.200.xfer', len(resp_body), app)
|
||||
self.assertUpdateStats([('SOS.GET.200.xfer', len(resp_body))], app)
|
||||
|
||||
def test_log_headers(self):
|
||||
for conf_key in ['access_log_headers', 'log_headers']:
|
||||
@ -372,6 +464,7 @@ class TestProxyLogging(unittest.TestCase):
|
||||
self.assertTrue('Host: localhost:80' not in headers)
|
||||
|
||||
def test_upload_size(self):
|
||||
# Using default policy
|
||||
app = proxy_logging.ProxyLoggingMiddleware(FakeApp(),
|
||||
{'log_headers': 'yes'})
|
||||
app.access_logger = FakeLogger()
|
||||
@ -385,8 +478,46 @@ class TestProxyLogging(unittest.TestCase):
|
||||
log_parts = self._log_parts(app)
|
||||
self.assertEquals(log_parts[11], str(len('FAKE APP')))
|
||||
self.assertEquals(log_parts[10], str(len('some stuff')))
|
||||
self.assertUpdateStats('object.PUT.200.xfer',
|
||||
len('some stuff') + len('FAKE APP'),
|
||||
self.assertUpdateStats([('object.PUT.200.xfer',
|
||||
len('some stuff') + len('FAKE APP')),
|
||||
('object.policy.0.PUT.200.xfer',
|
||||
len('some stuff') + len('FAKE APP'))],
|
||||
app)
|
||||
|
||||
# Using a non-existent policy
|
||||
app = proxy_logging.ProxyLoggingMiddleware(FakeApp(policy_idx='-1'),
|
||||
{'log_headers': 'yes'})
|
||||
app.access_logger = FakeLogger()
|
||||
req = Request.blank(
|
||||
'/v1/a/c/o/foo',
|
||||
environ={'REQUEST_METHOD': 'PUT',
|
||||
'wsgi.input': BytesIO(b'some stuff')})
|
||||
resp = app(req.environ, start_response)
|
||||
# exhaust generator
|
||||
[x for x in resp]
|
||||
log_parts = self._log_parts(app)
|
||||
self.assertEquals(log_parts[11], str(len('FAKE APP')))
|
||||
self.assertEquals(log_parts[10], str(len('some stuff')))
|
||||
self.assertUpdateStats([('object.PUT.200.xfer',
|
||||
len('some stuff') + len('FAKE APP'))],
|
||||
app)
|
||||
|
||||
def test_upload_size_no_policy(self):
|
||||
app = proxy_logging.ProxyLoggingMiddleware(FakeApp(policy_idx=None),
|
||||
{'log_headers': 'yes'})
|
||||
app.access_logger = FakeLogger()
|
||||
req = Request.blank(
|
||||
'/v1/a/c/o/foo',
|
||||
environ={'REQUEST_METHOD': 'PUT',
|
||||
'wsgi.input': BytesIO(b'some stuff')})
|
||||
resp = app(req.environ, start_response)
|
||||
# exhaust generator
|
||||
[x for x in resp]
|
||||
log_parts = self._log_parts(app)
|
||||
self.assertEquals(log_parts[11], str(len('FAKE APP')))
|
||||
self.assertEquals(log_parts[10], str(len('some stuff')))
|
||||
self.assertUpdateStats([('object.PUT.200.xfer',
|
||||
len('some stuff') + len('FAKE APP'))],
|
||||
app)
|
||||
|
||||
def test_upload_line(self):
|
||||
@ -403,8 +534,8 @@ class TestProxyLogging(unittest.TestCase):
|
||||
log_parts = self._log_parts(app)
|
||||
self.assertEquals(log_parts[11], str(len('FAKE APP')))
|
||||
self.assertEquals(log_parts[10], str(len('some stuff\n')))
|
||||
self.assertUpdateStats('container.POST.200.xfer',
|
||||
len('some stuff\n') + len('FAKE APP'),
|
||||
self.assertUpdateStats([('container.POST.200.xfer',
|
||||
len('some stuff\n') + len('FAKE APP'))],
|
||||
app)
|
||||
|
||||
def test_log_query_string(self):
|
||||
@ -881,10 +1012,9 @@ class TestProxyLogging(unittest.TestCase):
|
||||
def test_policy_index(self):
|
||||
# Policy index can be specified by X-Backend-Storage-Policy-Index
|
||||
# in the request header for object API
|
||||
app = proxy_logging.ProxyLoggingMiddleware(FakeApp(), {})
|
||||
app = proxy_logging.ProxyLoggingMiddleware(FakeApp(policy_idx='1'), {})
|
||||
app.access_logger = FakeLogger()
|
||||
req = Request.blank('/v1/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
|
||||
headers={'X-Backend-Storage-Policy-Index': '1'})
|
||||
req = Request.blank('/v1/a/c/o', environ={'REQUEST_METHOD': 'PUT'})
|
||||
resp = app(req.environ, start_response)
|
||||
''.join(resp)
|
||||
log_parts = self._log_parts(app)
|
||||
|
Loading…
Reference in New Issue
Block a user