Handle log processing subprocess cleanup better

We are leaking file descriptors in our log worker processes because we
are are not catch all possible errors leaving some actions left behind
to do. More aggressively catch errors so that all cleanup happens

Change-Id: I7a73a36c6fc42d4eba636cf36c8cfffcea48a318
This commit is contained in:
Clark Boylan 2014-09-03 16:02:42 -07:00
parent 9a29e4ca4d
commit c3caf83129

View File

@ -92,13 +92,21 @@ class CRM114Filter(object):
r = r.strip() r = r.strip()
data['error_pr'] = float(r) data['error_pr'] = float(r)
def _catchOSError(self, method):
try:
method()
except OSError:
logging.exception("Subprocess cleanup failed.")
def close(self): def close(self):
if not self.p: if not self.p:
return return
self.p.stdin.close() # CRM114 should die when its stdinput is closed. Close that
self.p.stdout.read() # fd along with stdout and stderr then return.
self.p.stderr.read() self._catchOSError(self.p.stdin.close)
self.p.wait() self._catchOSError(self.p.stdout.close)
self._catchOSError(self.p.stderr.close)
self._catchOSError(self.p.wait)
class CRM114FilterFactory(object): class CRM114FilterFactory(object):
@ -143,31 +151,34 @@ class LogRetriever(threading.Thread):
# discarded by zuul. # discarded by zuul.
log_lines = self._retrieve_log(source_url, retry) log_lines = self._retrieve_log(source_url, retry)
filters = [] try:
for f in self.filters: all_filters = []
logging.debug("Adding filter: %s" % f.name) for f in self.filters:
filters.append(f.create(fields)) logging.debug("Adding filter: %s" % f.name)
all_filters = filters all_filters.append(f.create(fields))
filters = all_filters
logging.debug("Pushing " + str(len(log_lines)) + " log lines.") logging.debug("Pushing " + str(len(log_lines)) +
base_event = {} " log lines.")
base_event.update(fields) base_event = {}
base_event["tags"] = tags base_event.update(fields)
for line in log_lines: base_event["tags"] = tags
out_event = base_event.copy() for line in log_lines:
out_event["message"] = line out_event = base_event.copy()
new_filters = [] out_event["message"] = line
for f in filters: new_filters = []
try: for f in filters:
f.process(out_event) try:
new_filters.append(f) f.process(out_event)
except FilterException: new_filters.append(f)
logging.exception("Exception filtering event: " except FilterException:
"%s" % line.encode("utf-8")) logging.exception("Exception filtering event: "
filters = new_filters "%s" % line.encode("utf-8"))
self.logq.put(out_event) filters = new_filters
for f in all_filters: self.logq.put(out_event)
f.close() finally:
for f in all_filters:
f.close()
job.sendWorkComplete() job.sendWorkComplete()
except Exception as e: except Exception as e:
logging.exception("Exception handling log event.") logging.exception("Exception handling log event.")