From cb6e4669f0508f264a6f7fde19631d59f8516549 Mon Sep 17 00:00:00 2001 From: Clark Boylan Date: Mon, 10 Feb 2014 10:20:12 -0800 Subject: [PATCH] Handle log filter exceptions more gracefully. If there is an exception filtering a log event handle that by removing the filter and continuing to process the remaining log events for the assocaited file. This prevents non filter data from being lost when the filters have an exception. Change-Id: I65141daf21a873096829c41fdc2c77cbeecde2e3 --- .../log_processor/files/log-gearman-worker.py | 22 ++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) diff --git a/modules/log_processor/files/log-gearman-worker.py b/modules/log_processor/files/log-gearman-worker.py index 64250991e6..23ca4dc21b 100644 --- a/modules/log_processor/files/log-gearman-worker.py +++ b/modules/log_processor/files/log-gearman-worker.py @@ -52,6 +52,10 @@ def semi_busy_wait(seconds): return +class FilterException(Exception): + pass + + class CRM114Filter(object): def __init__(self, script, path, build_status): self.p = None @@ -77,14 +81,14 @@ class CRM114Filter(object): [self.p.stdin, self.p.stdout], 20) if not r: self.p.kill() - raise Exception('Timeout reading from CRM114') + raise FilterException('Timeout reading from CRM114') r = self.p.stdout.readline() if not r: err = self.p.stderr.read() if err: - raise Exception(err) + raise FilterException(err) else: - raise Exception('Early EOF from CRM114') + raise FilterException('Early EOF from CRM114') r = r.strip() data['error_pr'] = float(r) @@ -143,6 +147,7 @@ class LogRetriever(threading.Thread): for f in self.filters: logging.debug("Adding filter: %s" % f.name) filters.append(f.create(fields)) + all_filters = filters logging.debug("Pushing " + str(len(log_lines)) + " log lines.") base_event = {} @@ -151,10 +156,17 @@ class LogRetriever(threading.Thread): for line in log_lines: out_event = base_event.copy() out_event["message"] = line + new_filters = [] for f in filters: - f.process(out_event) + try: + f.process(out_event) + new_filters.append(f) + except FilterException: + logging.exception("Exception filtering event: " + "%s" % line.encode("utf-8")) + filters = new_filters self.logq.put(out_event) - for f in filters: + for f in all_filters: f.close() job.sendWorkComplete() except Exception as e: