Merge "Logging improvements: handoffs and thread locals"
This commit is contained in:
commit
f20d5fdfc2
@ -538,6 +538,9 @@ set log_facility LOG_LOCAL0 Syslog log facility
|
||||
set log_level INFO Log level
|
||||
set log_headers True If True, log headers in each
|
||||
request
|
||||
set log_handoffs True If True, the proxy will log
|
||||
whenever it has to failover to a
|
||||
handoff node
|
||||
recheck_account_existence 60 Cache timeout in seconds to
|
||||
send memcached for account
|
||||
existence
|
||||
|
@ -35,6 +35,7 @@ use = egg:swift#proxy
|
||||
# set access_log_facility = LOG_LOCAL0
|
||||
# set access_log_level = INFO
|
||||
# set log_headers = False
|
||||
# set log_handoffs = True
|
||||
# recheck_account_existence = 60
|
||||
# recheck_container_existence = 60
|
||||
# object_chunk_size = 8192
|
||||
|
@ -422,6 +422,14 @@ class LogAdapter(logging.LoggerAdapter, object):
|
||||
def client_ip(self, value):
|
||||
self._cls_thread_local.client_ip = value
|
||||
|
||||
@property
|
||||
def thread_locals(self):
|
||||
return (self.txn_id, self.client_ip)
|
||||
|
||||
@thread_locals.setter
|
||||
def thread_locals(self, value):
|
||||
self.txn_id, self.client_ip = value
|
||||
|
||||
def getEffectiveLevel(self):
|
||||
return self.logger.getEffectiveLevel()
|
||||
|
||||
|
@ -551,11 +551,21 @@ class Controller(object):
|
||||
for node in nodes:
|
||||
if not self.error_limited(node):
|
||||
yield node
|
||||
handoffs = 0
|
||||
for node in ring.get_more_nodes(partition):
|
||||
if not self.error_limited(node):
|
||||
handoffs += 1
|
||||
if self.app.log_handoffs:
|
||||
self.app.logger.increment('handoff_count')
|
||||
self.app.logger.warning(
|
||||
'Handoff requested (%d)' % handoffs)
|
||||
if handoffs == len(nodes):
|
||||
self.app.logger.increment('handoff_all_count')
|
||||
yield node
|
||||
|
||||
def _make_request(self, nodes, part, method, path, headers, query):
|
||||
def _make_request(self, nodes, part, method, path, headers, query,
|
||||
logger_thread_locals):
|
||||
self.app.logger.thread_locals = logger_thread_locals
|
||||
for node in nodes:
|
||||
try:
|
||||
with ConnectionTimeout(self.app.conn_timeout):
|
||||
@ -591,7 +601,7 @@ class Controller(object):
|
||||
pile = GreenPile(len(start_nodes))
|
||||
for head in headers:
|
||||
pile.spawn(self._make_request, nodes, part, method, path,
|
||||
head, query_string)
|
||||
head, query_string, self.app.logger.thread_locals)
|
||||
response = [resp for resp in pile if resp]
|
||||
while len(response) < len(start_nodes):
|
||||
response.append((HTTP_SERVICE_UNAVAILABLE, '', ''))
|
||||
@ -642,7 +652,7 @@ class Controller(object):
|
||||
"""Handler for HTTP HEAD requests."""
|
||||
return self.GETorHEAD(req, stats_type='HEAD')
|
||||
|
||||
def _make_app_iter_reader(self, node, source, queue):
|
||||
def _make_app_iter_reader(self, node, source, queue, logger_thread_locals):
|
||||
"""
|
||||
Reads from the source and places data in the queue. It expects
|
||||
something else be reading from the queue and, if nothing does within
|
||||
@ -652,7 +662,11 @@ class Controller(object):
|
||||
logging/error-limiting purposes.
|
||||
:param source: The httplib.Response object to read from.
|
||||
:param queue: The eventlet.queue.Queue to place read source data into.
|
||||
:param logger_thread_locals: The thread local values to be set on the
|
||||
self.app.logger to retain transaction
|
||||
logging information.
|
||||
"""
|
||||
self.app.logger.thread_locals = logger_thread_locals
|
||||
try:
|
||||
try:
|
||||
while True:
|
||||
@ -708,7 +722,8 @@ class Controller(object):
|
||||
# We then drop any reference to the source or node, for garbage
|
||||
# collection purposes.
|
||||
queue = Queue(1)
|
||||
spawn_n(self._make_app_iter_reader, node, source, queue)
|
||||
spawn_n(self._make_app_iter_reader, node, source, queue,
|
||||
self.app.logger.thread_locals)
|
||||
source = node = None
|
||||
while True:
|
||||
chunk = queue.get(timeout=self.app.node_timeout)
|
||||
@ -1101,8 +1116,10 @@ class ObjectController(Controller):
|
||||
_('Trying to write to %s') % path)
|
||||
conn.queue.task_done()
|
||||
|
||||
def _connect_put_node(self, nodes, part, path, headers):
|
||||
def _connect_put_node(self, nodes, part, path, headers,
|
||||
logger_thread_locals):
|
||||
"""Method for a file PUT connect"""
|
||||
self.app.logger.thread_locals = logger_thread_locals
|
||||
for node in nodes:
|
||||
try:
|
||||
with ConnectionTimeout(self.app.conn_timeout):
|
||||
@ -1312,7 +1329,7 @@ class ObjectController(Controller):
|
||||
nheaders['X-Delete-At-Partition'] = delete_at_part
|
||||
nheaders['X-Delete-At-Device'] = node['device']
|
||||
pile.spawn(self._connect_put_node, node_iter, partition,
|
||||
req.path_info, nheaders)
|
||||
req.path_info, nheaders, self.app.logger.thread_locals)
|
||||
conns = [conn for conn in pile if conn]
|
||||
if len(conns) <= len(nodes) / 2:
|
||||
self.app.logger.error(
|
||||
@ -1934,6 +1951,8 @@ class BaseApplication(object):
|
||||
int(conf.get('rate_limit_after_segment', 10))
|
||||
self.rate_limit_segments_per_sec = \
|
||||
int(conf.get('rate_limit_segments_per_sec', 1))
|
||||
self.log_handoffs = \
|
||||
conf.get('log_handoffs', 'true').lower() in TRUE_VALUES
|
||||
|
||||
def get_controller(self, path):
|
||||
"""
|
||||
|
@ -96,10 +96,12 @@ class MockSys():
|
||||
def reset_loggers():
|
||||
if hasattr(utils.get_logger, 'handler4logger'):
|
||||
for logger, handler in utils.get_logger.handler4logger.items():
|
||||
logger.thread_locals = (None, None)
|
||||
logger.removeHandler(handler)
|
||||
delattr(utils.get_logger, 'handler4logger')
|
||||
if hasattr(utils.get_logger, 'console_handler4logger'):
|
||||
for logger, h in utils.get_logger.console_handler4logger.items():
|
||||
logger.thread_locals = (None, None)
|
||||
logger.removeHandler(h)
|
||||
delattr(utils.get_logger, 'console_handler4logger')
|
||||
|
||||
@ -1160,6 +1162,20 @@ class TestStatsdLoggingDelegation(unittest.TestCase):
|
||||
self.assertEquals('\xef\xbf\xbd\xef\xbf\xbd\xec\xbc\x9d\xef\xbf\xbd',
|
||||
utils.get_valid_utf8_str(invalid_utf8_str))
|
||||
|
||||
def test_thread_locals(self):
|
||||
logger = utils.get_logger(None)
|
||||
orig_thread_locals = logger.thread_locals
|
||||
try:
|
||||
self.assertEquals(logger.thread_locals, (None, None))
|
||||
logger.txn_id = '1234'
|
||||
logger.client_ip = '1.2.3.4'
|
||||
self.assertEquals(logger.thread_locals, ('1234', '1.2.3.4'))
|
||||
logger.txn_id = '5678'
|
||||
logger.client_ip = '5.6.7.8'
|
||||
self.assertEquals(logger.thread_locals, ('5678', '5.6.7.8'))
|
||||
finally:
|
||||
logger.thread_locals = orig_thread_locals
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
@ -354,8 +354,9 @@ class TestController(unittest.TestCase):
|
||||
self.controller.account_info(self.account)
|
||||
proxy_server.http_connect = fake_http_connect(201,
|
||||
raise_timeout_exc=True)
|
||||
self.controller._make_request(nodes, partition, 'POST',
|
||||
'/', '', '')
|
||||
self.controller._make_request(
|
||||
nodes, partition, 'POST', '/', '', '',
|
||||
self.controller.app.logger.thread_locals)
|
||||
|
||||
# tests if 200 is cached and used
|
||||
def test_account_info_200(self):
|
||||
@ -1264,6 +1265,37 @@ class TestObjectController(unittest.TestCase):
|
||||
self.app.object_ring):
|
||||
collected_nodes.append(node)
|
||||
self.assertEquals(len(collected_nodes), 9)
|
||||
|
||||
self.app.log_handoffs = True
|
||||
self.app.logger = FakeLogger()
|
||||
self.app.object_ring.max_more_nodes = 2
|
||||
controller = proxy_server.ObjectController(self.app, 'account',
|
||||
'container', 'object')
|
||||
partition, nodes = self.app.object_ring.get_nodes('account',
|
||||
'container', 'object')
|
||||
collected_nodes = []
|
||||
for node in controller.iter_nodes(partition, nodes,
|
||||
self.app.object_ring):
|
||||
collected_nodes.append(node)
|
||||
self.assertEquals(len(collected_nodes), 5)
|
||||
self.assertEquals(
|
||||
self.app.logger.log_dict['warning'],
|
||||
[(('Handoff requested (1)',), {}),
|
||||
(('Handoff requested (2)',), {})])
|
||||
|
||||
self.app.log_handoffs = False
|
||||
self.app.logger = FakeLogger()
|
||||
self.app.object_ring.max_more_nodes = 2
|
||||
controller = proxy_server.ObjectController(self.app, 'account',
|
||||
'container', 'object')
|
||||
partition, nodes = self.app.object_ring.get_nodes('account',
|
||||
'container', 'object')
|
||||
collected_nodes = []
|
||||
for node in controller.iter_nodes(partition, nodes,
|
||||
self.app.object_ring):
|
||||
collected_nodes.append(node)
|
||||
self.assertEquals(len(collected_nodes), 5)
|
||||
self.assertEquals(self.app.logger.log_dict['warning'], [])
|
||||
finally:
|
||||
self.app.object_ring.max_more_nodes = 0
|
||||
|
||||
@ -3197,7 +3229,8 @@ class TestObjectController(unittest.TestCase):
|
||||
with save_globals():
|
||||
given_headers = {}
|
||||
|
||||
def fake_connect_put_node(nodes, part, path, headers):
|
||||
def fake_connect_put_node(nodes, part, path, headers,
|
||||
logger_thread_locals):
|
||||
given_headers.update(headers)
|
||||
|
||||
controller = proxy_server.ObjectController(self.app, 'account',
|
||||
|
Loading…
Reference in New Issue
Block a user