diff --git a/modules/log_processor/files/log-gearman-worker.py b/modules/log_processor/files/log-gearman-worker.py index 0a418d701c..66007cb770 100644 --- a/modules/log_processor/files/log-gearman-worker.py +++ b/modules/log_processor/files/log-gearman-worker.py @@ -92,13 +92,21 @@ class CRM114Filter(object): r = r.strip() data['error_pr'] = float(r) + def _catchOSError(self, method): + try: + method() + except OSError: + logging.exception("Subprocess cleanup failed.") + def close(self): if not self.p: return - self.p.stdin.close() - self.p.stdout.read() - self.p.stderr.read() - self.p.wait() + # CRM114 should die when its stdinput is closed. Close that + # fd along with stdout and stderr then return. + self._catchOSError(self.p.stdin.close) + self._catchOSError(self.p.stdout.close) + self._catchOSError(self.p.stderr.close) + self._catchOSError(self.p.wait) class CRM114FilterFactory(object): @@ -143,31 +151,34 @@ class LogRetriever(threading.Thread): # discarded by zuul. log_lines = self._retrieve_log(source_url, retry) - filters = [] - for f in self.filters: - logging.debug("Adding filter: %s" % f.name) - filters.append(f.create(fields)) - all_filters = filters + try: + all_filters = [] + for f in self.filters: + logging.debug("Adding filter: %s" % f.name) + all_filters.append(f.create(fields)) + filters = all_filters - logging.debug("Pushing " + str(len(log_lines)) + " log lines.") - base_event = {} - base_event.update(fields) - base_event["tags"] = tags - for line in log_lines: - out_event = base_event.copy() - out_event["message"] = line - new_filters = [] - for f in filters: - try: - f.process(out_event) - new_filters.append(f) - except FilterException: - logging.exception("Exception filtering event: " - "%s" % line.encode("utf-8")) - filters = new_filters - self.logq.put(out_event) - for f in all_filters: - f.close() + logging.debug("Pushing " + str(len(log_lines)) + + " log lines.") + base_event = {} + base_event.update(fields) + base_event["tags"] = tags + for line in log_lines: + out_event = base_event.copy() + out_event["message"] = line + new_filters = [] + for f in filters: + try: + f.process(out_event) + new_filters.append(f) + except FilterException: + logging.exception("Exception filtering event: " + "%s" % line.encode("utf-8")) + filters = new_filters + self.logq.put(out_event) + finally: + for f in all_filters: + f.close() job.sendWorkComplete() except Exception as e: logging.exception("Exception handling log event.")