Improving statistics sent to Graphite.
Currently, statistics are organized by command. However, it would also be useful to display statistics organized by policy. Different policies may be based on different storage properties (ie, faster disks). With this change, all the statistics for object timers will be sent per policy as well. Policy statistics reporting will use policy index and the name in Graphite will show as proxy-server.object.policy.<policy-index>.<verb>, etc. Updated unit tests for per-policy stat reporting and added new unit tests for invalid cases. Updated documentation in the Administrator's Guide to reflect this new aggregation. Change-Id: Id70491e4833791a3fb8ff385953d69018514cd9c
This commit is contained in:
parent
a2c0e6f687
commit
4765189ef3
@ -1060,7 +1060,7 @@ proxy-server controller responsible for the request: "account", "container",
|
||||
middleware. The `<verb>` portion will be one of "GET", "HEAD", "POST", "PUT",
|
||||
"DELETE", "COPY", "OPTIONS", or "BAD_METHOD". The list of valid HTTP methods
|
||||
is configurable via the `log_statsd_valid_http_methods` config variable and
|
||||
the default setting yields the above behavior.
|
||||
the default setting yields the above behavior):
|
||||
|
||||
.. _Swift Origin Server: https://github.com/dpgoetz/sos
|
||||
|
||||
@ -1082,6 +1082,21 @@ Metric Name Description
|
||||
like the main timing metric.
|
||||
==================================================== ============================================
|
||||
|
||||
The `proxy-logging` middleware also groups these metrics by policy. The
|
||||
`<policy-index>` portion represents a policy index):
|
||||
|
||||
========================================================================== =====================================
|
||||
Metric Name Description
|
||||
-------------------------------------------------------------------------- -------------------------------------
|
||||
`proxy-server.object.policy.<policy-index>.<verb>.<status>.timing` Timing data for requests, aggregated
|
||||
by policy index.
|
||||
`proxy-server.object.policy.<policy-index>.GET.<status>.first-byte.timing` Timing data up to completion of
|
||||
sending the response headers,
|
||||
aggregated by policy index.
|
||||
`proxy-server.object.policy.<policy-index>.<verb>.<status>.xfer` Sum of bytes transferred in and out,
|
||||
aggregated by policy index.
|
||||
========================================================================== =====================================
|
||||
|
||||
Metrics for `tempauth` middleware (in the table, `<reseller_prefix>` represents
|
||||
the actual configured reseller_prefix or "`NONE`" if the reseller_prefix is the
|
||||
empty string):
|
||||
|
@ -80,6 +80,8 @@ from swift.common.utils import (get_logger, get_remote_client,
|
||||
get_valid_utf8_str, config_true_value,
|
||||
InputProxy, list_from_csv, get_policy_index)
|
||||
|
||||
from swift.common.storage_policy import POLICIES
|
||||
|
||||
QUOTE_SAFE = '/:'
|
||||
|
||||
|
||||
@ -195,17 +197,27 @@ class ProxyLoggingMiddleware(object):
|
||||
end_time_str,
|
||||
policy_index
|
||||
)))
|
||||
|
||||
# Log timing and bytes-transferred data to StatsD
|
||||
metric_name = self.statsd_metric_name(req, status_int, method)
|
||||
metric_name_policy = self.statsd_metric_name_policy(req, status_int,
|
||||
method,
|
||||
policy_index)
|
||||
# Only log data for valid controllers (or SOS) to keep the metric count
|
||||
# down (egregious errors will get logged by the proxy server itself).
|
||||
|
||||
if metric_name:
|
||||
self.access_logger.timing(metric_name + '.timing',
|
||||
(end_time - start_time) * 1000)
|
||||
self.access_logger.update_stats(metric_name + '.xfer',
|
||||
bytes_received + bytes_sent)
|
||||
if metric_name_policy:
|
||||
self.access_logger.timing(metric_name_policy + '.timing',
|
||||
(end_time - start_time) * 1000)
|
||||
self.access_logger.update_stats(metric_name_policy + '.xfer',
|
||||
bytes_received + bytes_sent)
|
||||
|
||||
def statsd_metric_name(self, req, status_int, method):
|
||||
def get_metric_name_type(self, req):
|
||||
if req.path.startswith('/v1/'):
|
||||
try:
|
||||
stat_type = [None, 'account', 'container',
|
||||
@ -214,12 +226,33 @@ class ProxyLoggingMiddleware(object):
|
||||
stat_type = 'object'
|
||||
else:
|
||||
stat_type = req.environ.get('swift.source')
|
||||
return stat_type
|
||||
|
||||
def statsd_metric_name(self, req, status_int, method):
|
||||
stat_type = self.get_metric_name_type(req)
|
||||
if stat_type is None:
|
||||
return None
|
||||
stat_method = method if method in self.valid_methods \
|
||||
else 'BAD_METHOD'
|
||||
return '.'.join((stat_type, stat_method, str(status_int)))
|
||||
|
||||
def statsd_metric_name_policy(self, req, status_int, method, policy_index):
|
||||
if policy_index is None:
|
||||
return None
|
||||
stat_type = self.get_metric_name_type(req)
|
||||
if stat_type == 'object':
|
||||
stat_method = method if method in self.valid_methods \
|
||||
else 'BAD_METHOD'
|
||||
# The policy may not exist
|
||||
policy = POLICIES.get_by_index(policy_index)
|
||||
if policy:
|
||||
return '.'.join((stat_type, 'policy', str(policy_index),
|
||||
stat_method, str(status_int)))
|
||||
else:
|
||||
return None
|
||||
else:
|
||||
return None
|
||||
|
||||
def __call__(self, env, start_response):
|
||||
if self.req_already_logged(env):
|
||||
return self.app(env, start_response)
|
||||
@ -270,10 +303,16 @@ class ProxyLoggingMiddleware(object):
|
||||
method = self.method_from_req(req)
|
||||
if method == 'GET':
|
||||
status_int = status_int_for_logging()
|
||||
policy_index = get_policy_index(req.headers, resp_headers)
|
||||
metric_name = self.statsd_metric_name(req, status_int, method)
|
||||
metric_name_policy = self.statsd_metric_name_policy(
|
||||
req, status_int, method, policy_index)
|
||||
if metric_name:
|
||||
self.access_logger.timing_since(
|
||||
metric_name + '.first-byte.timing', start_time)
|
||||
if metric_name_policy:
|
||||
self.access_logger.timing_since(
|
||||
metric_name_policy + '.first-byte.timing', start_time)
|
||||
|
||||
bytes_sent = 0
|
||||
client_disconnect = False
|
||||
|
@ -21,25 +21,39 @@ import mock
|
||||
from six import BytesIO
|
||||
|
||||
from test.unit import FakeLogger
|
||||
from swift.common.utils import get_logger
|
||||
from swift.common.utils import get_logger, split_path
|
||||
from swift.common.middleware import proxy_logging
|
||||
from swift.common.swob import Request, Response
|
||||
from swift.common import constraints
|
||||
from swift.common.storage_policy import StoragePolicy
|
||||
from test.unit import patch_policies
|
||||
|
||||
|
||||
class FakeApp(object):
|
||||
|
||||
def __init__(self, body=None, response_str='200 OK'):
|
||||
def __init__(self, body=None, response_str='200 OK', policy_idx='0'):
|
||||
if body is None:
|
||||
body = ['FAKE APP']
|
||||
|
||||
self.body = body
|
||||
self.response_str = response_str
|
||||
self.policy_idx = policy_idx
|
||||
|
||||
def __call__(self, env, start_response):
|
||||
start_response(self.response_str,
|
||||
[('Content-Type', 'text/plain'),
|
||||
('Content-Length', str(sum(map(len, self.body))))])
|
||||
try:
|
||||
# /v1/a/c or /v1/a/c/o
|
||||
split_path(env['PATH_INFO'], 3, 4, True)
|
||||
is_container_or_object_req = True
|
||||
except ValueError:
|
||||
is_container_or_object_req = False
|
||||
|
||||
headers = [('Content-Type', 'text/plain'),
|
||||
('Content-Length', str(sum(map(len, self.body))))]
|
||||
if is_container_or_object_req and self.policy_idx is not None:
|
||||
headers.append(('X-Backend-Storage-Policy-Index',
|
||||
str(self.policy_idx)))
|
||||
|
||||
start_response(self.response_str, headers)
|
||||
while env['wsgi.input'].read(5):
|
||||
pass
|
||||
return self.body
|
||||
@ -91,8 +105,12 @@ def start_response(*args):
|
||||
pass
|
||||
|
||||
|
||||
@patch_policies([StoragePolicy(0, 'zero', False)])
|
||||
class TestProxyLogging(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
pass
|
||||
|
||||
def _log_parts(self, app, should_be_empty=False):
|
||||
info_calls = app.access_logger.log_dict['info']
|
||||
if should_be_empty:
|
||||
@ -136,11 +154,14 @@ class TestProxyLogging(unittest.TestCase):
|
||||
for timing_call in timing_calls:
|
||||
self.assertNotEqual(not_exp_metric, timing_call[0][0])
|
||||
|
||||
def assertUpdateStats(self, exp_metric, exp_bytes, app):
|
||||
update_stats_calls = app.access_logger.log_dict['update_stats']
|
||||
self.assertEquals(1, len(update_stats_calls))
|
||||
self.assertEquals({}, update_stats_calls[0][1])
|
||||
self.assertEquals((exp_metric, exp_bytes), update_stats_calls[0][0])
|
||||
def assertUpdateStats(self, exp_metrics_and_values, app):
|
||||
update_stats_calls = sorted(app.access_logger.log_dict['update_stats'])
|
||||
got_metrics_values_and_kwargs = [(usc[0][0], usc[0][1], usc[1])
|
||||
for usc in update_stats_calls]
|
||||
exp_metrics_values_and_kwargs = [(emv[0], emv[1], {})
|
||||
for emv in exp_metrics_and_values]
|
||||
self.assertEqual(got_metrics_values_and_kwargs,
|
||||
exp_metrics_values_and_kwargs)
|
||||
|
||||
def test_log_request_statsd_invalid_stats_types(self):
|
||||
app = proxy_logging.ProxyLoggingMiddleware(FakeApp(), {})
|
||||
@ -198,14 +219,51 @@ class TestProxyLogging(unittest.TestCase):
|
||||
'wsgi.input': BytesIO(b'4321')})
|
||||
stub_times = [18.0, 20.71828182846]
|
||||
iter_response = app(req.environ, lambda *_: None)
|
||||
|
||||
self.assertEqual('7654321', ''.join(iter_response))
|
||||
self.assertTiming('%s.GET.321.timing' % exp_type, app,
|
||||
exp_timing=2.71828182846 * 1000)
|
||||
self.assertTimingSince(
|
||||
'%s.GET.321.first-byte.timing' % exp_type, app,
|
||||
exp_start=18.0)
|
||||
self.assertUpdateStats('%s.GET.321.xfer' % exp_type,
|
||||
4 + 7, app)
|
||||
if exp_type == 'object':
|
||||
# Object operations also return stats by policy
|
||||
# In this case, the value needs to match the timing for GET
|
||||
self.assertTiming('%s.policy.0.GET.321.timing' % exp_type,
|
||||
app, exp_timing=2.71828182846 * 1000)
|
||||
self.assertUpdateStats([('%s.GET.321.xfer' % exp_type,
|
||||
4 + 7),
|
||||
('object.policy.0.GET.321.xfer',
|
||||
4 + 7)],
|
||||
app)
|
||||
else:
|
||||
self.assertUpdateStats([('%s.GET.321.xfer' % exp_type,
|
||||
4 + 7)],
|
||||
app)
|
||||
|
||||
# GET Repeat the test above, but with a non-existent policy
|
||||
# Do this only for object types
|
||||
if exp_type == 'object':
|
||||
app = proxy_logging.ProxyLoggingMiddleware(
|
||||
FakeApp(body='7654321', response_str='321 Fubar',
|
||||
policy_idx='-1'), {})
|
||||
app.access_logger = FakeLogger()
|
||||
req = Request.blank(path, environ={
|
||||
'REQUEST_METHOD': 'GET',
|
||||
'wsgi.input': BytesIO(b'4321')})
|
||||
stub_times = [18.0, 20.71828182846]
|
||||
iter_response = app(req.environ, lambda *_: None)
|
||||
|
||||
self.assertEqual('7654321', ''.join(iter_response))
|
||||
self.assertTiming('%s.GET.321.timing' % exp_type, app,
|
||||
exp_timing=2.71828182846 * 1000)
|
||||
self.assertTimingSince(
|
||||
'%s.GET.321.first-byte.timing' % exp_type, app,
|
||||
exp_start=18.0)
|
||||
# No results returned for the non-existent policy
|
||||
self.assertUpdateStats([('%s.GET.321.xfer' % exp_type,
|
||||
4 + 7)],
|
||||
app)
|
||||
|
||||
# GET with swift.proxy_access_log_made already set
|
||||
app = proxy_logging.ProxyLoggingMiddleware(
|
||||
@ -241,8 +299,42 @@ class TestProxyLogging(unittest.TestCase):
|
||||
'%s.GET.314.first-byte.timing' % exp_type, app)
|
||||
self.assertNotTiming(
|
||||
'%s.PUT.314.first-byte.timing' % exp_type, app)
|
||||
if exp_type == 'object':
|
||||
# Object operations also return stats by policy In this
|
||||
# case, the value needs to match the timing for PUT.
|
||||
self.assertTiming('%s.policy.0.PUT.314.timing' %
|
||||
exp_type, app,
|
||||
exp_timing=7.3321 * 1000)
|
||||
self.assertUpdateStats(
|
||||
'%s.PUT.314.xfer' % exp_type, 6 + 8, app)
|
||||
[('object.PUT.314.xfer', 6 + 8),
|
||||
('object.policy.0.PUT.314.xfer', 6 + 8)], app)
|
||||
else:
|
||||
self.assertUpdateStats(
|
||||
[('%s.PUT.314.xfer' % exp_type, 6 + 8)], app)
|
||||
|
||||
# PUT Repeat the test above, but with a non-existent policy
|
||||
# Do this only for object types
|
||||
if exp_type == 'object':
|
||||
app = proxy_logging.ProxyLoggingMiddleware(
|
||||
FakeApp(body='87654321', response_str='314 PiTown',
|
||||
policy_idx='-1'), {})
|
||||
app.access_logger = FakeLogger()
|
||||
req = Request.blank(path, environ={
|
||||
'REQUEST_METHOD': 'PUT',
|
||||
'wsgi.input': BytesIO(b'654321')})
|
||||
# (it's not a GET, so time() doesn't have a 2nd call)
|
||||
stub_times = [58.2, 58.2 + 7.3321]
|
||||
iter_response = app(req.environ, lambda *_: None)
|
||||
self.assertEqual('87654321', ''.join(iter_response))
|
||||
self.assertTiming('%s.PUT.314.timing' % exp_type, app,
|
||||
exp_timing=7.3321 * 1000)
|
||||
self.assertNotTiming(
|
||||
'%s.GET.314.first-byte.timing' % exp_type, app)
|
||||
self.assertNotTiming(
|
||||
'%s.PUT.314.first-byte.timing' % exp_type, app)
|
||||
# No results returned for the non-existent policy
|
||||
self.assertUpdateStats([('object.PUT.314.xfer', 6 + 8)],
|
||||
app)
|
||||
|
||||
def test_log_request_stat_method_filtering_default(self):
|
||||
method_map = {
|
||||
@ -266,8 +358,8 @@ class TestProxyLogging(unittest.TestCase):
|
||||
app.log_request(req, 299, 11, 3, now, now + 1.17)
|
||||
self.assertTiming('account.%s.299.timing' % exp_method, app,
|
||||
exp_timing=1.17 * 1000)
|
||||
self.assertUpdateStats('account.%s.299.xfer' % exp_method,
|
||||
11 + 3, app)
|
||||
self.assertUpdateStats([('account.%s.299.xfer' % exp_method,
|
||||
11 + 3)], app)
|
||||
|
||||
def test_log_request_stat_method_filtering_custom(self):
|
||||
method_map = {
|
||||
@ -293,8 +385,8 @@ class TestProxyLogging(unittest.TestCase):
|
||||
app.log_request(req, 911, 4, 43, now, now + 1.01)
|
||||
self.assertTiming('container.%s.911.timing' % exp_method, app,
|
||||
exp_timing=1.01 * 1000)
|
||||
self.assertUpdateStats('container.%s.911.xfer' % exp_method,
|
||||
4 + 43, app)
|
||||
self.assertUpdateStats([('container.%s.911.xfer' % exp_method,
|
||||
4 + 43)], app)
|
||||
|
||||
def test_basic_req(self):
|
||||
app = proxy_logging.ProxyLoggingMiddleware(FakeApp(), {})
|
||||
@ -336,7 +428,7 @@ class TestProxyLogging(unittest.TestCase):
|
||||
self.assertEquals(log_parts[6], '200')
|
||||
self.assertEquals(resp_body, 'somechunksof data')
|
||||
self.assertEquals(log_parts[11], str(len(resp_body)))
|
||||
self.assertUpdateStats('SOS.GET.200.xfer', len(resp_body), app)
|
||||
self.assertUpdateStats([('SOS.GET.200.xfer', len(resp_body))], app)
|
||||
|
||||
def test_log_headers(self):
|
||||
for conf_key in ['access_log_headers', 'log_headers']:
|
||||
@ -372,6 +464,7 @@ class TestProxyLogging(unittest.TestCase):
|
||||
self.assertTrue('Host: localhost:80' not in headers)
|
||||
|
||||
def test_upload_size(self):
|
||||
# Using default policy
|
||||
app = proxy_logging.ProxyLoggingMiddleware(FakeApp(),
|
||||
{'log_headers': 'yes'})
|
||||
app.access_logger = FakeLogger()
|
||||
@ -385,8 +478,46 @@ class TestProxyLogging(unittest.TestCase):
|
||||
log_parts = self._log_parts(app)
|
||||
self.assertEquals(log_parts[11], str(len('FAKE APP')))
|
||||
self.assertEquals(log_parts[10], str(len('some stuff')))
|
||||
self.assertUpdateStats('object.PUT.200.xfer',
|
||||
len('some stuff') + len('FAKE APP'),
|
||||
self.assertUpdateStats([('object.PUT.200.xfer',
|
||||
len('some stuff') + len('FAKE APP')),
|
||||
('object.policy.0.PUT.200.xfer',
|
||||
len('some stuff') + len('FAKE APP'))],
|
||||
app)
|
||||
|
||||
# Using a non-existent policy
|
||||
app = proxy_logging.ProxyLoggingMiddleware(FakeApp(policy_idx='-1'),
|
||||
{'log_headers': 'yes'})
|
||||
app.access_logger = FakeLogger()
|
||||
req = Request.blank(
|
||||
'/v1/a/c/o/foo',
|
||||
environ={'REQUEST_METHOD': 'PUT',
|
||||
'wsgi.input': BytesIO(b'some stuff')})
|
||||
resp = app(req.environ, start_response)
|
||||
# exhaust generator
|
||||
[x for x in resp]
|
||||
log_parts = self._log_parts(app)
|
||||
self.assertEquals(log_parts[11], str(len('FAKE APP')))
|
||||
self.assertEquals(log_parts[10], str(len('some stuff')))
|
||||
self.assertUpdateStats([('object.PUT.200.xfer',
|
||||
len('some stuff') + len('FAKE APP'))],
|
||||
app)
|
||||
|
||||
def test_upload_size_no_policy(self):
|
||||
app = proxy_logging.ProxyLoggingMiddleware(FakeApp(policy_idx=None),
|
||||
{'log_headers': 'yes'})
|
||||
app.access_logger = FakeLogger()
|
||||
req = Request.blank(
|
||||
'/v1/a/c/o/foo',
|
||||
environ={'REQUEST_METHOD': 'PUT',
|
||||
'wsgi.input': BytesIO(b'some stuff')})
|
||||
resp = app(req.environ, start_response)
|
||||
# exhaust generator
|
||||
[x for x in resp]
|
||||
log_parts = self._log_parts(app)
|
||||
self.assertEquals(log_parts[11], str(len('FAKE APP')))
|
||||
self.assertEquals(log_parts[10], str(len('some stuff')))
|
||||
self.assertUpdateStats([('object.PUT.200.xfer',
|
||||
len('some stuff') + len('FAKE APP'))],
|
||||
app)
|
||||
|
||||
def test_upload_line(self):
|
||||
@ -403,8 +534,8 @@ class TestProxyLogging(unittest.TestCase):
|
||||
log_parts = self._log_parts(app)
|
||||
self.assertEquals(log_parts[11], str(len('FAKE APP')))
|
||||
self.assertEquals(log_parts[10], str(len('some stuff\n')))
|
||||
self.assertUpdateStats('container.POST.200.xfer',
|
||||
len('some stuff\n') + len('FAKE APP'),
|
||||
self.assertUpdateStats([('container.POST.200.xfer',
|
||||
len('some stuff\n') + len('FAKE APP'))],
|
||||
app)
|
||||
|
||||
def test_log_query_string(self):
|
||||
@ -881,10 +1012,9 @@ class TestProxyLogging(unittest.TestCase):
|
||||
def test_policy_index(self):
|
||||
# Policy index can be specified by X-Backend-Storage-Policy-Index
|
||||
# in the request header for object API
|
||||
app = proxy_logging.ProxyLoggingMiddleware(FakeApp(), {})
|
||||
app = proxy_logging.ProxyLoggingMiddleware(FakeApp(policy_idx='1'), {})
|
||||
app.access_logger = FakeLogger()
|
||||
req = Request.blank('/v1/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
|
||||
headers={'X-Backend-Storage-Policy-Index': '1'})
|
||||
req = Request.blank('/v1/a/c/o', environ={'REQUEST_METHOD': 'PUT'})
|
||||
resp = app(req.environ, start_response)
|
||||
''.join(resp)
|
||||
log_parts = self._log_parts(app)
|
||||
|
Loading…
Reference in New Issue
Block a user