Merge "Process logs with CRM114"
This commit is contained in:
commit
83f84a7cc5
90
modules/openstack_project/files/logstash/classify-log.crm
Executable file
90
modules/openstack_project/files/logstash/classify-log.crm
Executable file
@ -0,0 +1,90 @@
|
|||||||
|
#! /usr/bin/crm
|
||||||
|
#
|
||||||
|
# Copyright 2013 OpenStack Foundation
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
|
||||||
|
# This script trains an OSB (Orthogonal Sparse Bigram) bayesian filter
|
||||||
|
# with log lines from test runs and classifies each line according to
|
||||||
|
# the likelihood it indicates an error. Very little experimentation
|
||||||
|
# has been done to determine the best classifier and training method;
|
||||||
|
# further experimentation may be useful.
|
||||||
|
|
||||||
|
# The training method is TET -- Train Every Thing. This is not
|
||||||
|
# normally advised as a training method for Bayesian filters. In
|
||||||
|
# experiments, it identified about twice as many lines as being
|
||||||
|
# associated with errors as were indicated by a TOE (Train On Error)
|
||||||
|
# method. Some of them were false positives, but many were not, and
|
||||||
|
# of those, it had a much higher (pR ~= 37) confidence in them than
|
||||||
|
# TOE. TET seems to give qualitatively better results when filtering
|
||||||
|
# for higher pR values.
|
||||||
|
|
||||||
|
# Set unbuffered IO
|
||||||
|
window
|
||||||
|
|
||||||
|
# Base component of path to data files
|
||||||
|
isolate (:prefix:) /:*:_arg2:/
|
||||||
|
|
||||||
|
# Whether this run is for a SUCCESS or FAILURE result
|
||||||
|
isolate (:target:) /:*:_arg3:/
|
||||||
|
|
||||||
|
# Train each file on a newline just to make sure it exists
|
||||||
|
learn [:_nl:] <osb unique microgroom> (:*:prefix:/SUCCESS.css)
|
||||||
|
learn [:_nl:] <osb unique microgroom> (:*:prefix:/FAILURE.css)
|
||||||
|
{
|
||||||
|
# Iterate over each line
|
||||||
|
window <bychar> /\n/ /\n/
|
||||||
|
{
|
||||||
|
isolate (:stats:)
|
||||||
|
isolate (:result:)
|
||||||
|
isolate (:prob:)
|
||||||
|
isolate (:pr:)
|
||||||
|
# Save a copy of this line
|
||||||
|
isolate (:line:) /:*:_dw:/
|
||||||
|
{
|
||||||
|
{
|
||||||
|
# Remove things that look like timestamps from the beginning of the line
|
||||||
|
match (:timestamp:) /^[-.0-9 |:]+/
|
||||||
|
alter (:timestamp:) //
|
||||||
|
}
|
||||||
|
# Train on the line
|
||||||
|
learn <osb unique microgroom> (:*:prefix:/:*:target:.css)
|
||||||
|
# Classify the line to see if it looks more like a SUCCESS or FAILURE line
|
||||||
|
classify <osb unique microgroom> (:*:prefix:/SUCCESS.css :*:prefix:/FAILURE.css) (:stats:)
|
||||||
|
{
|
||||||
|
# The stats variable looks like:
|
||||||
|
# CLASSIFY succeeds; success probability: 1.0000 pR: 304.6527
|
||||||
|
# Best match to file #0 (/tmp/crm114/console_html/SUCCESS.css) prob: 0.9933 pR: 2.1720
|
||||||
|
# Total features in input file: 20
|
||||||
|
# #0 (/tmp/crm114/console_html/SUCCESS.css): features: 3544235, hits: 901854, prob: 9.93e-01, pR: 2.17
|
||||||
|
# #1 (/tmp/crm114/console_html/FAILURE.css): features: 1, hits: 0, prob: 6.69e-03, pR: -2.17
|
||||||
|
# Pull out the filename, probability, and pR (a kind of logarithmic probability, see CRM docs)
|
||||||
|
match [:stats:] <nomultiline> /^Best match to .*\/([A-Za-z]+).css\) prob: ([-.0-9]+) pR: ([-.0-9]+)/ ( :: :result: :prob: :pr: )
|
||||||
|
{
|
||||||
|
# If this line is classified as FAILURE, negate
|
||||||
|
# the pR value (which will always be positive).
|
||||||
|
# Do this by prepending a '-' or the empty string.
|
||||||
|
{
|
||||||
|
match [:result:] /FAILURE/
|
||||||
|
alter (:result:) /-/
|
||||||
|
} alius {
|
||||||
|
alter (:result:) //
|
||||||
|
}
|
||||||
|
}
|
||||||
|
# Output the sign and pR value for this line.
|
||||||
|
output /:*:result::*:pr:\n/
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
liaf
|
||||||
|
}
|
@ -3,3 +3,5 @@ gearman-port: 4730
|
|||||||
output-host: localhost
|
output-host: localhost
|
||||||
output-port: 9999
|
output-port: 9999
|
||||||
output-mode: tcp
|
output-mode: tcp
|
||||||
|
crm114-script: /usr/local/bin/classify-log.crm
|
||||||
|
crm114-data: /var/lib/crm114
|
||||||
|
@ -21,8 +21,12 @@ import gear
|
|||||||
import gzip
|
import gzip
|
||||||
import json
|
import json
|
||||||
import logging
|
import logging
|
||||||
|
import os
|
||||||
import Queue
|
import Queue
|
||||||
|
import re
|
||||||
|
import select
|
||||||
import socket
|
import socket
|
||||||
|
import subprocess
|
||||||
import sys
|
import sys
|
||||||
import threading
|
import threading
|
||||||
import time
|
import time
|
||||||
@ -48,10 +52,68 @@ def semi_busy_wait(seconds):
|
|||||||
return
|
return
|
||||||
|
|
||||||
|
|
||||||
|
class CRM114Filter(object):
    """Score log lines by piping them through a CRM114 script.

    One instance wraps one child process started as
    ``script path build_status``.  Each call to :meth:`process` writes a
    single log line to the child's stdin and reads a single number back
    from its stdout, which is stored on the event as ``error_pr``.

    The filter is inert (``self.p`` is ``None`` and every method is a
    no-op) when ``build_status`` is neither ``'SUCCESS'`` nor
    ``'FAILURE'``, and again after :meth:`close` or after a failure in
    :meth:`process` has torn the child down.
    """

    # Seconds to wait for the child to answer a single line.
    TIMEOUT = 20

    def __init__(self, script, path, build_status):
        """Start the classifier child process when the status is usable.

        :param script: path of the executable CRM114 script to run.
        :param path: directory handed to the script for its data files;
            created if it does not exist yet.
        :param build_status: result of the build the log belongs to; only
            ``'SUCCESS'`` and ``'FAILURE'`` start a child process.
        :raises OSError: if ``path`` cannot be created or the script
            cannot be executed.
        """
        self.p = None
        self.script = script
        self.path = path
        self.build_status = build_status
        if build_status not in ('SUCCESS', 'FAILURE'):
            return
        # Create-then-check instead of check-then-create: another worker
        # may create the same directory between the two steps.
        try:
            os.makedirs(path)
        except OSError:
            if not os.path.isdir(path):
                raise
        self.p = subprocess.Popen([script, path, build_status],
                                  stdout=subprocess.PIPE,
                                  stderr=subprocess.PIPE,
                                  stdin=subprocess.PIPE)

    def _abort(self):
        """Kill the child, release its pipes and reap it (best effort)."""
        proc, self.p = self.p, None
        try:
            proc.kill()
        except OSError:
            # The child has already exited; nothing left to kill.
            pass
        for pipe in (proc.stdin, proc.stdout, proc.stderr):
            try:
                pipe.close()
            except (IOError, OSError):
                # Closing stdin may try to flush into a dead pipe.
                pass
        # Reap the child so it does not linger as a zombie.
        proc.wait()

    def process(self, data):
        """Classify ``data['message']`` and store the score on the event.

        :param data: event dict; ``data['message']`` is the log line.  On
            success ``data['error_pr']`` is set to the float printed by
            the child process.
        :raises Exception: if the child does not answer within
            ``TIMEOUT`` seconds, or closes its stdout early (the message
            is the child's stderr output when there is any).
        """
        if not self.p:
            return
        self.p.stdin.write((data['message'] + '\n').encode('utf-8'))
        # Pipes are only unbuffered by default on Python 2; flush so the
        # child always sees the line before we start waiting for a reply.
        self.p.stdin.flush()
        (r, w, x) = select.select([self.p.stdout], [],
                                  [self.p.stdin, self.p.stdout],
                                  self.TIMEOUT)
        if not r:
            self._abort()
            raise Exception('Timeout reading from CRM114')
        line = self.p.stdout.readline()
        if not line:
            # EOF on stdout: collect whatever the child said on stderr
            # before its pipes are closed.
            err = self.p.stderr.read()
            self._abort()
            if err:
                raise Exception(err)
            raise Exception('Early EOF from CRM114')
        data['error_pr'] = float(line.strip())

    def close(self):
        """Signal end of input to the child and wait for it to exit."""
        if not self.p:
            return
        proc, self.p = self.p, None
        # communicate() closes stdin and drains stdout and stderr
        # together; reading them one after the other can deadlock when
        # the child fills the pipe that is not being read.
        proc.communicate()
|
||||||
|
|
||||||
|
|
||||||
|
class CRM114FilterFactory(object):
    """Build a :class:`CRM114Filter` for each log file being processed.

    Every distinct log file name gets its own data directory below
    ``basepath``, so each kind of log file has separate classifier data.
    """

    name = "CRM114"

    def __init__(self, script, basepath):
        """Remember the classifier script and the data root directory.

        :param script: path of the CRM114 script to execute.
        :param basepath: directory under which per-file data directories
            are placed.
        """
        self.script = script
        self.basepath = basepath

    def _data_path(self, fields):
        """Return the data directory for the log file named in ``fields``.

        Dots in ``fields['filename']`` are replaced by underscores, e.g.
        ``console.html`` becomes ``<basepath>/console_html``.
        """
        # A literal replacement: no regular expression is needed.
        dirname = fields['filename'].replace('.', '_')
        return os.path.join(self.basepath, dirname)

    def create(self, fields):
        """Create the filter for one log file.

        :param fields: event fields; ``filename`` and ``build_status``
            are read.
        :returns: a new :class:`CRM114Filter`.
        """
        return CRM114Filter(self.script, self._data_path(fields),
                            fields['build_status'])
|
||||||
|
|
||||||
|
|
||||||
class LogRetriever(threading.Thread):
|
class LogRetriever(threading.Thread):
|
||||||
def __init__(self, gearman_worker, logq):
|
def __init__(self, gearman_worker, filters, logq):
|
||||||
threading.Thread.__init__(self)
|
threading.Thread.__init__(self)
|
||||||
self.gearman_worker = gearman_worker
|
self.gearman_worker = gearman_worker
|
||||||
|
self.filters = filters
|
||||||
self.logq = logq
|
self.logq = logq
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
@ -76,6 +138,11 @@ class LogRetriever(threading.Thread):
|
|||||||
# discarded by zuul.
|
# discarded by zuul.
|
||||||
log_lines = self._retrieve_log(source_url, retry)
|
log_lines = self._retrieve_log(source_url, retry)
|
||||||
|
|
||||||
|
filters = []
|
||||||
|
for f in self.filters:
|
||||||
|
logging.debug("Adding filter: %s" % f.name)
|
||||||
|
filters.append(f.create(fields))
|
||||||
|
|
||||||
logging.debug("Pushing " + str(len(log_lines)) + " log lines.")
|
logging.debug("Pushing " + str(len(log_lines)) + " log lines.")
|
||||||
base_event = {}
|
base_event = {}
|
||||||
base_event.update(fields)
|
base_event.update(fields)
|
||||||
@ -83,7 +150,11 @@ class LogRetriever(threading.Thread):
|
|||||||
for line in log_lines:
|
for line in log_lines:
|
||||||
out_event = base_event.copy()
|
out_event = base_event.copy()
|
||||||
out_event["message"] = line
|
out_event["message"] = line
|
||||||
|
for f in filters:
|
||||||
|
f.process(out_event)
|
||||||
self.logq.put(out_event)
|
self.logq.put(out_event)
|
||||||
|
for f in filters:
|
||||||
|
f.close()
|
||||||
job.sendWorkComplete()
|
job.sendWorkComplete()
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logging.exception("Exception handling log event.")
|
logging.exception("Exception handling log event.")
|
||||||
@ -248,6 +319,12 @@ class Server(object):
|
|||||||
self.retriever = None
|
self.retriever = None
|
||||||
self.logqueue = Queue.Queue(131072)
|
self.logqueue = Queue.Queue(131072)
|
||||||
self.processor = None
|
self.processor = None
|
||||||
|
self.filter_factories = []
|
||||||
|
crmscript = self.config.get('crm114-script')
|
||||||
|
crmdata = self.config.get('crm114-data')
|
||||||
|
if crmscript and crmdata:
|
||||||
|
self.filter_factories.append(
|
||||||
|
CRM114FilterFactory(crmscript, crmdata))
|
||||||
|
|
||||||
def setup_logging(self):
|
def setup_logging(self):
|
||||||
if self.debuglog:
|
if self.debuglog:
|
||||||
@ -264,7 +341,8 @@ class Server(object):
|
|||||||
gearman_worker.addServer(self.gearman_host,
|
gearman_worker.addServer(self.gearman_host,
|
||||||
self.gearman_port)
|
self.gearman_port)
|
||||||
gearman_worker.registerFunction(b'push-log')
|
gearman_worker.registerFunction(b'push-log')
|
||||||
self.retriever = LogRetriever(gearman_worker, self.logqueue)
|
self.retriever = LogRetriever(gearman_worker, self.filter_factories,
|
||||||
|
self.logqueue)
|
||||||
|
|
||||||
def setup_processor(self):
|
def setup_processor(self):
|
||||||
if self.output_mode == "tcp":
|
if self.output_mode == "tcp":
|
||||||
|
@ -43,6 +43,10 @@ class openstack_project::logstash_worker (
|
|||||||
ensure => present,
|
ensure => present,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
package { 'crm114':
|
||||||
|
ensure => present,
|
||||||
|
}
|
||||||
|
|
||||||
include pip
|
include pip
|
||||||
package { 'gear':
|
package { 'gear':
|
||||||
ensure => latest,
|
ensure => latest,
|
||||||
@ -50,6 +54,24 @@ class openstack_project::logstash_worker (
|
|||||||
require => Class['pip'],
|
require => Class['pip'],
|
||||||
}
|
}
|
||||||
|
|
||||||
|
file { '/var/lib/crm114':
|
||||||
|
ensure => directory,
|
||||||
|
owner => 'logstash',
|
||||||
|
group => 'logstash',
|
||||||
|
require => User['logstash'],
|
||||||
|
}
|
||||||
|
|
||||||
|
file { '/usr/local/bin/classify-log.crm':
|
||||||
|
ensure => present,
|
||||||
|
owner => 'root',
|
||||||
|
group => 'root',
|
||||||
|
mode => '0755',
|
||||||
|
source => 'puppet:///modules/openstack_project/logstash/classify-log.crm',
|
||||||
|
require => [
|
||||||
|
Package['crm114'],
|
||||||
|
],
|
||||||
|
}
|
||||||
|
|
||||||
file { '/usr/local/bin/log-gearman-worker.py':
|
file { '/usr/local/bin/log-gearman-worker.py':
|
||||||
ensure => present,
|
ensure => present,
|
||||||
owner => 'root',
|
owner => 'root',
|
||||||
|
Loading…
x
Reference in New Issue
Block a user