From c3caf83129b09ddad10c68daf2758a404074a8e3 Mon Sep 17 00:00:00 2001 From: Clark Boylan Date: Wed, 3 Sep 2014 16:02:42 -0700 Subject: [PATCH] Handle log processing subprocess cleanup better We are leaking file descriptors in our log worker processes because we are are not catch all possible errors leaving some actions left behind to do. More aggressively catch errors so that all cleanup happens Change-Id: I7a73a36c6fc42d4eba636cf36c8cfffcea48a318 --- .../log_processor/files/log-gearman-worker.py | 67 +++++++++++-------- 1 file changed, 39 insertions(+), 28 deletions(-) diff --git a/modules/log_processor/files/log-gearman-worker.py b/modules/log_processor/files/log-gearman-worker.py index 0a418d701c..66007cb770 100644 --- a/modules/log_processor/files/log-gearman-worker.py +++ b/modules/log_processor/files/log-gearman-worker.py @@ -92,13 +92,21 @@ class CRM114Filter(object): r = r.strip() data['error_pr'] = float(r) + def _catchOSError(self, method): + try: + method() + except OSError: + logging.exception("Subprocess cleanup failed.") + def close(self): if not self.p: return - self.p.stdin.close() - self.p.stdout.read() - self.p.stderr.read() - self.p.wait() + # CRM114 should die when its stdinput is closed. Close that + # fd along with stdout and stderr then return. + self._catchOSError(self.p.stdin.close) + self._catchOSError(self.p.stdout.close) + self._catchOSError(self.p.stderr.close) + self._catchOSError(self.p.wait) class CRM114FilterFactory(object): @@ -143,31 +151,34 @@ class LogRetriever(threading.Thread): # discarded by zuul. log_lines = self._retrieve_log(source_url, retry) - filters = [] - for f in self.filters: - logging.debug("Adding filter: %s" % f.name) - filters.append(f.create(fields)) - all_filters = filters + try: + all_filters = [] + for f in self.filters: + logging.debug("Adding filter: %s" % f.name) + all_filters.append(f.create(fields)) + filters = all_filters - logging.debug("Pushing " + str(len(log_lines)) + " log lines.") - base_event = {} - base_event.update(fields) - base_event["tags"] = tags - for line in log_lines: - out_event = base_event.copy() - out_event["message"] = line - new_filters = [] - for f in filters: - try: - f.process(out_event) - new_filters.append(f) - except FilterException: - logging.exception("Exception filtering event: " - "%s" % line.encode("utf-8")) - filters = new_filters - self.logq.put(out_event) - for f in all_filters: - f.close() + logging.debug("Pushing " + str(len(log_lines)) + + " log lines.") + base_event = {} + base_event.update(fields) + base_event["tags"] = tags + for line in log_lines: + out_event = base_event.copy() + out_event["message"] = line + new_filters = [] + for f in filters: + try: + f.process(out_event) + new_filters.append(f) + except FilterException: + logging.exception("Exception filtering event: " + "%s" % line.encode("utf-8")) + filters = new_filters + self.logq.put(out_event) + finally: + for f in all_filters: + f.close() job.sendWorkComplete() except Exception as e: logging.exception("Exception handling log event.")