diff --git a/modules/log_processor/files/log-gearman-worker.py b/modules/log_processor/files/log-gearman-worker.py index 64250991e6..23ca4dc21b 100644 --- a/modules/log_processor/files/log-gearman-worker.py +++ b/modules/log_processor/files/log-gearman-worker.py @@ -52,6 +52,10 @@ def semi_busy_wait(seconds): return +class FilterException(Exception): + pass + + class CRM114Filter(object): def __init__(self, script, path, build_status): self.p = None @@ -77,14 +81,14 @@ class CRM114Filter(object): [self.p.stdin, self.p.stdout], 20) if not r: self.p.kill() - raise Exception('Timeout reading from CRM114') + raise FilterException('Timeout reading from CRM114') r = self.p.stdout.readline() if not r: err = self.p.stderr.read() if err: - raise Exception(err) + raise FilterException(err) else: - raise Exception('Early EOF from CRM114') + raise FilterException('Early EOF from CRM114') r = r.strip() data['error_pr'] = float(r) @@ -143,6 +147,7 @@ class LogRetriever(threading.Thread): for f in self.filters: logging.debug("Adding filter: %s" % f.name) filters.append(f.create(fields)) + all_filters = filters logging.debug("Pushing " + str(len(log_lines)) + " log lines.") base_event = {} @@ -151,10 +156,17 @@ class LogRetriever(threading.Thread): for line in log_lines: out_event = base_event.copy() out_event["message"] = line + new_filters = [] for f in filters: - f.process(out_event) + try: + f.process(out_event) + new_filters.append(f) + except FilterException: + logging.exception("Exception filtering event: " + "%s" % line.encode("utf-8")) + filters = new_filters self.logq.put(out_event) - for f in filters: + for f in all_filters: f.close() job.sendWorkComplete() except Exception as e: