diff --git a/doc/source/deployment_guide.rst b/doc/source/deployment_guide.rst index bf321cfd62..2e2c296d4d 100644 --- a/doc/source/deployment_guide.rst +++ b/doc/source/deployment_guide.rst @@ -538,6 +538,9 @@ set log_facility LOG_LOCAL0 Syslog log facility set log_level INFO Log level set log_headers True If True, log headers in each request +set log_handoffs True If True, the proxy will log + whenever it has to failover to a + handoff node recheck_account_existence 60 Cache timeout in seconds to send memcached for account existence diff --git a/etc/proxy-server.conf-sample b/etc/proxy-server.conf-sample index 8a3fe9b29e..7ca1fcbe14 100644 --- a/etc/proxy-server.conf-sample +++ b/etc/proxy-server.conf-sample @@ -35,6 +35,7 @@ use = egg:swift#proxy # set access_log_facility = LOG_LOCAL0 # set access_log_level = INFO # set log_headers = False +# set log_handoffs = True # recheck_account_existence = 60 # recheck_container_existence = 60 # object_chunk_size = 8192 diff --git a/swift/common/utils.py b/swift/common/utils.py index 7c19616493..65aeef5db9 100644 --- a/swift/common/utils.py +++ b/swift/common/utils.py @@ -422,6 +422,14 @@ class LogAdapter(logging.LoggerAdapter, object): def client_ip(self, value): self._cls_thread_local.client_ip = value + @property + def thread_locals(self): + return (self.txn_id, self.client_ip) + + @thread_locals.setter + def thread_locals(self, value): + self.txn_id, self.client_ip = value + def getEffectiveLevel(self): return self.logger.getEffectiveLevel() diff --git a/swift/proxy/server.py b/swift/proxy/server.py index beec39033a..29899d13f6 100644 --- a/swift/proxy/server.py +++ b/swift/proxy/server.py @@ -551,11 +551,21 @@ class Controller(object): for node in nodes: if not self.error_limited(node): yield node + handoffs = 0 for node in ring.get_more_nodes(partition): if not self.error_limited(node): + handoffs += 1 + if self.app.log_handoffs: + self.app.logger.increment('handoff_count') + self.app.logger.warning( + 'Handoff requested (%d)' % handoffs) + if handoffs == len(nodes): + self.app.logger.increment('handoff_all_count') yield node - def _make_request(self, nodes, part, method, path, headers, query): + def _make_request(self, nodes, part, method, path, headers, query, + logger_thread_locals): + self.app.logger.thread_locals = logger_thread_locals for node in nodes: try: with ConnectionTimeout(self.app.conn_timeout): @@ -591,7 +601,7 @@ class Controller(object): pile = GreenPile(len(start_nodes)) for head in headers: pile.spawn(self._make_request, nodes, part, method, path, - head, query_string) + head, query_string, self.app.logger.thread_locals) response = [resp for resp in pile if resp] while len(response) < len(start_nodes): response.append((HTTP_SERVICE_UNAVAILABLE, '', '')) @@ -642,7 +652,7 @@ class Controller(object): """Handler for HTTP HEAD requests.""" return self.GETorHEAD(req, stats_type='HEAD') - def _make_app_iter_reader(self, node, source, queue): + def _make_app_iter_reader(self, node, source, queue, logger_thread_locals): """ Reads from the source and places data in the queue. It expects something else be reading from the queue and, if nothing does within @@ -652,7 +662,11 @@ class Controller(object): logging/error-limiting purposes. :param source: The httplib.Response object to read from. :param queue: The eventlet.queue.Queue to place read source data into. + :param logger_thread_locals: The thread local values to be set on the + self.app.logger to retain transaction + logging information. """ + self.app.logger.thread_locals = logger_thread_locals try: try: while True: @@ -708,7 +722,8 @@ class Controller(object): # We then drop any reference to the source or node, for garbage # collection purposes. queue = Queue(1) - spawn_n(self._make_app_iter_reader, node, source, queue) + spawn_n(self._make_app_iter_reader, node, source, queue, + self.app.logger.thread_locals) source = node = None while True: chunk = queue.get(timeout=self.app.node_timeout) @@ -1101,8 +1116,10 @@ class ObjectController(Controller): _('Trying to write to %s') % path) conn.queue.task_done() - def _connect_put_node(self, nodes, part, path, headers): + def _connect_put_node(self, nodes, part, path, headers, + logger_thread_locals): """Method for a file PUT connect""" + self.app.logger.thread_locals = logger_thread_locals for node in nodes: try: with ConnectionTimeout(self.app.conn_timeout): @@ -1312,7 +1329,7 @@ class ObjectController(Controller): nheaders['X-Delete-At-Partition'] = delete_at_part nheaders['X-Delete-At-Device'] = node['device'] pile.spawn(self._connect_put_node, node_iter, partition, - req.path_info, nheaders) + req.path_info, nheaders, self.app.logger.thread_locals) conns = [conn for conn in pile if conn] if len(conns) <= len(nodes) / 2: self.app.logger.error( @@ -1934,6 +1951,8 @@ class BaseApplication(object): int(conf.get('rate_limit_after_segment', 10)) self.rate_limit_segments_per_sec = \ int(conf.get('rate_limit_segments_per_sec', 1)) + self.log_handoffs = \ + conf.get('log_handoffs', 'true').lower() in TRUE_VALUES def get_controller(self, path): """ diff --git a/test/unit/common/test_utils.py b/test/unit/common/test_utils.py index e7d17c8487..1d0b2f3fa6 100644 --- a/test/unit/common/test_utils.py +++ b/test/unit/common/test_utils.py @@ -96,10 +96,12 @@ class MockSys(): def reset_loggers(): if hasattr(utils.get_logger, 'handler4logger'): for logger, handler in utils.get_logger.handler4logger.items(): + logger.thread_locals = (None, None) logger.removeHandler(handler) delattr(utils.get_logger, 'handler4logger') if hasattr(utils.get_logger, 'console_handler4logger'): for logger, h in utils.get_logger.console_handler4logger.items(): + logger.thread_locals = (None, None) logger.removeHandler(h) delattr(utils.get_logger, 'console_handler4logger') @@ -1159,6 +1161,20 @@ class TestStatsdLoggingDelegation(unittest.TestCase): utils.get_valid_utf8_str(unicode_sample)) self.assertEquals('\xef\xbf\xbd\xef\xbf\xbd\xec\xbc\x9d\xef\xbf\xbd', utils.get_valid_utf8_str(invalid_utf8_str)) + + def test_thread_locals(self): + logger = utils.get_logger(None) + orig_thread_locals = logger.thread_locals + try: + self.assertEquals(logger.thread_locals, (None, None)) + logger.txn_id = '1234' + logger.client_ip = '1.2.3.4' + self.assertEquals(logger.thread_locals, ('1234', '1.2.3.4')) + logger.txn_id = '5678' + logger.client_ip = '5.6.7.8' + self.assertEquals(logger.thread_locals, ('5678', '5.6.7.8')) + finally: + logger.thread_locals = orig_thread_locals if __name__ == '__main__': diff --git a/test/unit/proxy/test_server.py b/test/unit/proxy/test_server.py index 55557e6506..329b4597ef 100644 --- a/test/unit/proxy/test_server.py +++ b/test/unit/proxy/test_server.py @@ -354,8 +354,9 @@ class TestController(unittest.TestCase): self.controller.account_info(self.account) proxy_server.http_connect = fake_http_connect(201, raise_timeout_exc=True) - self.controller._make_request(nodes, partition, 'POST', - '/', '', '') + self.controller._make_request( + nodes, partition, 'POST', '/', '', '', + self.controller.app.logger.thread_locals) # tests if 200 is cached and used def test_account_info_200(self): @@ -1264,6 +1265,37 @@ class TestObjectController(unittest.TestCase): self.app.object_ring): collected_nodes.append(node) self.assertEquals(len(collected_nodes), 9) + + self.app.log_handoffs = True + self.app.logger = FakeLogger() + self.app.object_ring.max_more_nodes = 2 + controller = proxy_server.ObjectController(self.app, 'account', + 'container', 'object') + partition, nodes = self.app.object_ring.get_nodes('account', + 'container', 'object') + collected_nodes = [] + for node in controller.iter_nodes(partition, nodes, + self.app.object_ring): + collected_nodes.append(node) + self.assertEquals(len(collected_nodes), 5) + self.assertEquals( + self.app.logger.log_dict['warning'], + [(('Handoff requested (1)',), {}), + (('Handoff requested (2)',), {})]) + + self.app.log_handoffs = False + self.app.logger = FakeLogger() + self.app.object_ring.max_more_nodes = 2 + controller = proxy_server.ObjectController(self.app, 'account', + 'container', 'object') + partition, nodes = self.app.object_ring.get_nodes('account', + 'container', 'object') + collected_nodes = [] + for node in controller.iter_nodes(partition, nodes, + self.app.object_ring): + collected_nodes.append(node) + self.assertEquals(len(collected_nodes), 5) + self.assertEquals(self.app.logger.log_dict['warning'], []) finally: self.app.object_ring.max_more_nodes = 0 @@ -3197,7 +3229,8 @@ class TestObjectController(unittest.TestCase): with save_globals(): given_headers = {} - def fake_connect_put_node(nodes, part, path, headers): + def fake_connect_put_node(nodes, part, path, headers, + logger_thread_locals): given_headers.update(headers) controller = proxy_server.ObjectController(self.app, 'account',