Merge "Handle log processing subprocess cleanup better"
This commit is contained in:
commit
e86cd862c8
@ -92,13 +92,21 @@ class CRM114Filter(object):
|
||||
r = r.strip()
|
||||
data['error_pr'] = float(r)
|
||||
|
||||
def _catchOSError(self, method):
|
||||
try:
|
||||
method()
|
||||
except OSError:
|
||||
logging.exception("Subprocess cleanup failed.")
|
||||
|
||||
def close(self):
|
||||
if not self.p:
|
||||
return
|
||||
self.p.stdin.close()
|
||||
self.p.stdout.read()
|
||||
self.p.stderr.read()
|
||||
self.p.wait()
|
||||
# CRM114 should die when its stdinput is closed. Close that
|
||||
# fd along with stdout and stderr then return.
|
||||
self._catchOSError(self.p.stdin.close)
|
||||
self._catchOSError(self.p.stdout.close)
|
||||
self._catchOSError(self.p.stderr.close)
|
||||
self._catchOSError(self.p.wait)
|
||||
|
||||
|
||||
class CRM114FilterFactory(object):
|
||||
@ -143,31 +151,34 @@ class LogRetriever(threading.Thread):
|
||||
# discarded by zuul.
|
||||
log_lines = self._retrieve_log(source_url, retry)
|
||||
|
||||
filters = []
|
||||
for f in self.filters:
|
||||
logging.debug("Adding filter: %s" % f.name)
|
||||
filters.append(f.create(fields))
|
||||
all_filters = filters
|
||||
try:
|
||||
all_filters = []
|
||||
for f in self.filters:
|
||||
logging.debug("Adding filter: %s" % f.name)
|
||||
all_filters.append(f.create(fields))
|
||||
filters = all_filters
|
||||
|
||||
logging.debug("Pushing " + str(len(log_lines)) + " log lines.")
|
||||
base_event = {}
|
||||
base_event.update(fields)
|
||||
base_event["tags"] = tags
|
||||
for line in log_lines:
|
||||
out_event = base_event.copy()
|
||||
out_event["message"] = line
|
||||
new_filters = []
|
||||
for f in filters:
|
||||
try:
|
||||
f.process(out_event)
|
||||
new_filters.append(f)
|
||||
except FilterException:
|
||||
logging.exception("Exception filtering event: "
|
||||
"%s" % line.encode("utf-8"))
|
||||
filters = new_filters
|
||||
self.logq.put(out_event)
|
||||
for f in all_filters:
|
||||
f.close()
|
||||
logging.debug("Pushing " + str(len(log_lines)) +
|
||||
" log lines.")
|
||||
base_event = {}
|
||||
base_event.update(fields)
|
||||
base_event["tags"] = tags
|
||||
for line in log_lines:
|
||||
out_event = base_event.copy()
|
||||
out_event["message"] = line
|
||||
new_filters = []
|
||||
for f in filters:
|
||||
try:
|
||||
f.process(out_event)
|
||||
new_filters.append(f)
|
||||
except FilterException:
|
||||
logging.exception("Exception filtering event: "
|
||||
"%s" % line.encode("utf-8"))
|
||||
filters = new_filters
|
||||
self.logq.put(out_event)
|
||||
finally:
|
||||
for f in all_filters:
|
||||
f.close()
|
||||
job.sendWorkComplete()
|
||||
except Exception as e:
|
||||
logging.exception("Exception handling log event.")
|
||||
|
Loading…
x
Reference in New Issue
Block a user