Use gearman to distribute logstash log pushing.

This change reorganizes the logstash log pushing so that a central
gearman server listens for Jenkins ZMQ events, converts them into
per-log-file gearman jobs, and hands those jobs off to gearman workers
for processing. The central gearman server will live on logstash.o.o
and the existing logstash-worker hosts will be converted to gearman
log pusher workers.

This commit includes the relevant documentation changes.

Change-Id: I45f7185c2479c54b090d223408dff268e1e8d7db
Reviewed-on: https://review.openstack.org/32455
Reviewed-by: Jeremy Stanley <fungi@yuggoth.org>
Approved: Clark Boylan <clark.boylan@gmail.com>
Reviewed-by: Clark Boylan <clark.boylan@gmail.com>
Tested-by: Jenkins
Clark Boylan 2013-06-10 12:06:58 -07:00 committed by Jenkins
parent b00077f351
commit 4aba312807
14 changed files with 880 additions and 649 deletions


@@ -46,11 +46,14 @@ System Architecture
There are four major layers in our Logstash setup.
1. Log Pusher Script.
1. Log Pusher Scripts.
Subscribes to the Jenkins ZeroMQ Event Publisher listening for build
finished events. When a build finishes this script fetches the logs
generated by that build, chops them up, annotates them with Jenkins
build info and finally sends them to a Logstash indexer process.
finished events. When a build finishes, an event is received from
Jenkins which is then converted into Gearman jobs specific to that
event for each log file we care about. These jobs trigger Gearman
workers that then fetch the logs generated by that build, chop them
up, annotate them with Jenkins build info and finally send them to a
Logstash indexer process.
2. Logstash Indexer.
Reads these log events from the log pusher, filters them to remove
unwanted lines, collapses multiline events together, and parses
@@ -72,34 +75,47 @@ bottleneck very quickly. This looks something like:
::
              _ logstash-worker1 _
             /                     \
    jenkins -- logstash-worker2 -- elasticsearch -- kibana
             \_                   _/
                logstash-worker3

          jenkins
             |
             |
       gearman-client
          /  |  \
         /   |   \
   gearman  gearman  gearman
   worker1  worker2  worker3
      |        |        |
   logstash logstash logstash
   indexer1 indexer2 indexer3
       \       |       /
        \      |      /
          elasticsearch
               |
               |
             kibana
Log Pusher
----------
This is a simple Python script that is given a list of log files to push
to Logstash when Jenkins builds complete.
This is a pair of simple Python scripts. The first listens for Jenkins
build events and converts them into Gearman jobs, and the second performs
those Gearman jobs to push log files into Logstash.
Log pushing looks like this:
* Jenkins publishes build complete notifications.
* Log pusher receives the notification from Jenkins.
* Using info in the notification log files are retrieved.
* Receive the notification from Jenkins and convert it to Gearman jobs.
* Using info in the Gearman job, log files are retrieved.
* Log files are processed then shipped to Logstash.
In the near future this script will be modified to act as a Gearman
worker so that we can add an arbitrary number of them without needing
to partition the log files that each worker handles by hand. Instead
each worker will be able to fetch and push any log file and will do
so as directed by Gearman.
Using Gearman allows us to scale the number of log pushers
horizontally. It is as simple as adding another process that talks to
the Gearman server.
If you are interested in technical details The source of this script
If you are interested in technical details, the source of these scripts
can be found at
:file:`modules/openstack_project/files/logstash/log-pusher.py`
* :file:`modules/openstack_project/files/logstash/log-gearman-client.py`
* :file:`modules/openstack_project/files/logstash/log-gearman-worker.py`
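To make the division of labor concrete, here is a rough sketch (not part of
this change) of how the two scripts talk to each other through the gear
library; the server host, worker name, and job payload values below are all
invented for illustration::

    import json

    import gear

    # Client side: submit one 'push-log' job per interesting log file.
    client = gear.Client()
    client.addServer('logstash.openstack.org')
    client.waitForServer()
    args = {'source_url': 'http://logs.openstack.org/example/console.html',
            'retry': True,
            'event': {'@fields': {'build_name': 'example-job'},
                      '@tags': ['console.html']}}
    client.submitJob(gear.Job(b'push-log', json.dumps(args).encode('utf8')))

    # Worker side: register for 'push-log' jobs and handle them one at a time.
    worker = gear.Worker(b'example-pusher')
    worker.addServer('logstash.openstack.org', 4730)
    worker.registerFunction(b'push-log')
    job = worker.getJob()
    print(json.loads(job.arguments.decode('utf-8'))['source_url'])
    job.sendWorkComplete()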
Logstash
--------


@@ -75,11 +75,7 @@ node 'jenkins.openstack.org' {
ssl_key_file_contents => hiera('jenkins_ssl_key_file_contents'),
ssl_chain_file_contents => hiera('jenkins_ssl_chain_file_contents'),
sysadmins => hiera('sysadmins'),
zmq_event_receivers => [
'logstash-worker1.openstack.org',
'logstash-worker2.openstack.org',
'logstash-worker3.openstack.org',
],
zmq_event_receivers => ['logstash.openstack.org'],
}
}
@@ -207,6 +203,11 @@ node 'logstash.openstack.org' {
class { 'openstack_project::logstash':
sysadmins => hiera('sysadmins'),
elasticsearch_masters => ['elasticsearch.openstack.org'],
gearman_workers => [
'logstash-worker1.openstack.org',
'logstash-worker2.openstack.org',
'logstash-worker3.openstack.org',
],
}
}


@@ -1,11 +1,11 @@
#! /bin/sh
### BEGIN INIT INFO
# Provides: jenkins-log-pusher
# Provides: jenkins-log-client
# Required-Start: $remote_fs $syslog
# Required-Stop: $remote_fs $syslog
# Default-Start: 2 3 4 5
# Default-Stop: 0 1 6
# Short-Description: Jenkins Log Pusher
# Short-Description: Jenkins Log Client
# Description: Service to push Jenkins logs into logstash.
### END INIT INFO
@@ -13,10 +13,10 @@
# PATH should only include /usr/* if it runs after the mountnfs.sh script
PATH=/sbin:/usr/sbin:/bin:/usr/bin
DESC="Jenkins Log Pusher"
NAME=jenkins-log-pusher
DAEMON=/usr/local/bin/log-pusher.py
DAEMON_ARGS='-c /etc/logstash/jenkins-log-pusher.yaml -d /var/log/logstash/pusher-debug.log'
DESC="Jenkins Log Client"
NAME=jenkins-log-client
DAEMON=/usr/local/bin/log-gearman-client.py
DAEMON_ARGS='-c /etc/logstash/jenkins-log-client.yaml -d /var/log/logstash/log-client-debug.log'
PIDFILE=/var/run/$NAME/$NAME.pid
SCRIPTNAME=/etc/init.d/$NAME
USER=logstash
@@ -48,7 +48,6 @@ do_start()
chown $USER /var/run/$NAME
start-stop-daemon --start --quiet --pidfile $PIDFILE -c $USER --exec $DAEMON --test > /dev/null \
|| return 1
# Note using --background as log-pusher.py cannot daemonize itself yet.
start-stop-daemon --start --quiet --pidfile $PIDFILE -c $USER --exec $DAEMON -- \
$DAEMON_ARGS \
|| return 2


@@ -0,0 +1,98 @@
source-url: http://logs.openstack.org
# List of zmq event inputs.
zmq-publishers:
- tcp://jenkins.openstack.org:8888
# List of files to source logs from.
source-files:
  - name: console.html
    retry-get: True
  - name: logs/screen-c-api.txt
    tags:
      - screen
      - oslofmt
  - name: logs/screen-c-sch.txt
    tags:
      - screen
      - oslofmt
  - name: logs/screen-c-vol.txt
    tags:
      - screen
      - oslofmt
  - name: logs/screen-g-api.txt
    tags:
      - screen
      - oslofmt
  - name: logs/screen-g-reg.txt
    tags:
      - screen
      - oslofmt
  - name: logs/screen-key.txt
    tags:
      - screen
      - keystonefmt
  - name: logs/screen-n-api.txt
    tags:
      - screen
      - oslofmt
  - name: logs/screen-n-cond.txt
    tags:
      - screen
      - oslofmt
  - name: logs/screen-n-cpu.txt
    tags:
      - screen
      - oslofmt
  - name: logs/screen-n-crt.txt
    tags:
      - screen
      - oslofmt
  - name: logs/screen-n-net.txt
    tags:
      - screen
      - oslofmt
  - name: logs/screen-n-obj.txt
    tags:
      - screen
      - oslofmt
  - name: logs/screen-n-sch.txt
    tags:
      - screen
      - oslofmt
  - name: logs/screen-q-agt.txt
    tags:
      - screen
      - oslofmt
  - name: logs/screen-q-dhcp.txt
    tags:
      - screen
      - oslofmt
  - name: logs/screen-q-l3.txt
    tags:
      - screen
      - oslofmt
  - name: logs/screen-q-meta.txt
    tags:
      - screen
      - oslofmt
  - name: logs/screen-q-svc.txt
    tags:
      - screen
      - oslofmt
  - name: logs/screen-s-account.txt
    tags:
      - screen
      - apachecombined
  - name: logs/screen-s-container.txt
    tags:
      - screen
      - apachecombined
  - name: logs/screen-s-object.txt
    tags:
      - screen
      - apachecombined
  # TODO(clarkb) Add swift proxy logs here.
  - name: logs/syslog.txt
    tags:
      - syslog
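As a rough illustration (not part of the configuration file above), the
console.html entry combined with a hypothetical check-queue build event would
yield a 'push-log' job whose arguments look something like this Python dict;
every value is invented::

    # Hypothetical 'push-log' job arguments produced by log-gearman-client.py
    # for the console.html entry above (all values invented).
    job_args = {
        'source_url': 'http://logs.openstack.org/12345/3/check/'
                      'example-job/17/console.html',
        'retry': True,                  # from retry-get: True
        'event': {
            '@fields': {
                'filename': 'console.html',
                'build_name': 'example-job',
                'build_status': 'SUCCESS',
                'build_number': '17',
                'build_queue': 'check',
                'build_change': '12345',
                'build_patchset': '3',
            },
            '@tags': ['console.html'],  # filename plus any configured tags
        },
    }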


@@ -0,0 +1,158 @@
#! /bin/sh
### BEGIN INIT INFO
# Provides: jenkins-log-worker
# Required-Start: $remote_fs $syslog
# Required-Stop: $remote_fs $syslog
# Default-Start: 2 3 4 5
# Default-Stop: 0 1 6
# Short-Description: Jenkins Log Worker
# Description: Service to push Jenkins logs into logstash.
### END INIT INFO
# Do NOT "set -e"
# PATH should only include /usr/* if it runs after the mountnfs.sh script
PATH=/sbin:/usr/sbin:/bin:/usr/bin
DESC="Jenkins Log Worker"
NAME=jenkins-log-worker
DAEMON=/usr/local/bin/log-gearman-worker.py
DAEMON_ARGS='-c /etc/logstash/jenkins-log-worker.yaml -d /var/log/logstash/log-worker-debug.log'
PIDFILE=/var/run/$NAME/$NAME.pid
SCRIPTNAME=/etc/init.d/$NAME
USER=logstash
# Exit if the package is not installed
[ -x "$DAEMON" ] || exit 0
# Read configuration variable file if it is present
[ -r /etc/default/$NAME ] && . /etc/default/$NAME
# Load the VERBOSE setting and other rcS variables
. /lib/init/vars.sh
# Define LSB log_* functions.
# Depend on lsb-base (>= 3.0-6) to ensure that this file is present.
. /lib/lsb/init-functions
#
# Function that starts the daemon/service
#
do_start()
{
# Return
# 0 if daemon has been started
# 1 if daemon was already running
# 2 if daemon could not be started
mkdir -p /var/run/$NAME
chown $USER /var/run/$NAME
start-stop-daemon --start --quiet --pidfile $PIDFILE -c $USER --exec $DAEMON --test > /dev/null \
|| return 1
start-stop-daemon --start --quiet --pidfile $PIDFILE -c $USER --exec $DAEMON -- \
$DAEMON_ARGS \
|| return 2
# Add code here, if necessary, that waits for the process to be ready
# to handle requests from services started subsequently which depend
# on this one. As a last resort, sleep for some time.
}
#
# Function that stops the daemon/service
#
do_stop()
{
# Return
# 0 if daemon has been stopped
# 1 if daemon was already stopped
# 2 if daemon could not be stopped
# other if a failure occurred
start-stop-daemon --stop --signal 9 --pidfile $PIDFILE
RETVAL="$?"
[ "$RETVAL" = 2 ] && return 2
rm -f /var/run/$NAME/*
return "$RETVAL"
}
#
# Function that stops the daemon/service
#
#do_graceful_stop()
#{
# PID=`cat $PIDFILE`
# kill -USR1 $PID
#
# # wait until really stopped
# if [ -n "${PID:-}" ]; then
# i=0
# while kill -0 "${PID:-}" 2> /dev/null; do
# if [ $i -eq '0' ]; then
# echo -n " ... waiting "
# else
# echo -n "."
# fi
# i=$(($i+1))
# sleep 1
# done
# fi
#
# rm -f /var/run/$NAME/*
#}
#
# Function that sends a SIGHUP to the daemon/service
#
#do_reload() {
# #
# # If the daemon can reload its configuration without
# # restarting (for example, when it is sent a SIGHUP),
# # then implement that here.
# #
# start-stop-daemon --stop --signal 1 --quiet --pidfile $PIDFILE --name zuul-server
# return 0
#}
case "$1" in
start)
[ "$VERBOSE" != no ] && log_daemon_msg "Starting $DESC" "$NAME"
do_start
case "$?" in
0|1) [ "$VERBOSE" != no ] && log_end_msg 0 ;;
2) [ "$VERBOSE" != no ] && log_end_msg 1 ;;
esac
;;
stop)
[ "$VERBOSE" != no ] && log_daemon_msg "Stopping $DESC" "$NAME"
do_stop
case "$?" in
0|1) [ "$VERBOSE" != no ] && log_end_msg 0 ;;
2) [ "$VERBOSE" != no ] && log_end_msg 1 ;;
esac
;;
status)
status_of_proc "$DAEMON" "$NAME" && exit 0 || exit $?
;;
# reload)
# #
# # If do_reload() is not implemented then leave this commented out
# # and leave 'force-reload' as an alias for 'restart'.
# #
# log_daemon_msg "Reloading $DESC" "$NAME"
# do_reload
# log_end_msg $?
# ;;
restart|force-reload)
#
# If the "reload" option is implemented then remove the
# 'force-reload' alias
#
log_daemon_msg "Restarting $DESC" "$NAME"
do_stop
do_start
;;
*)
echo "Usage: $SCRIPTNAME {start|stop|status|restart|force-reload}" >&2
exit 3
;;
esac
:


@@ -0,0 +1,5 @@
gearman-host: logstash.openstack.org
gearman-port: 4730
output-host: localhost
output-port: 9999
output-mode: tcp
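To sanity check these settings without a running Logstash indexer, a
throwaway listener like the one below (illustration only, not part of this
change) can stand in on output-host/output-port when output-mode is tcp::

    # Minimal stand-in for the Logstash TCP input on localhost:9999; prints
    # the message of each JSON event the log worker ships (illustration only).
    import json
    import socket

    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.bind(('localhost', 9999))
    server.listen(1)
    conn, _ = server.accept()
    buf = b''
    while True:
        data = conn.recv(4096)
        if not data:
            break
        buf += data
        # The worker sends one JSON document per newline-terminated line.
        while b'\n' in buf:
            line, buf = buf.split(b'\n', 1)
            print(json.loads(line.decode('utf-8')).get('event_message'))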


@@ -0,0 +1,187 @@
#!/usr/bin/python2
#
# Copyright 2013 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import argparse
import daemon
import gear
import json
import logging
import signal
import threading
import yaml
import zmq
try:
import daemon.pidlockfile as pidfile_mod
except ImportError:
import daemon.pidfile as pidfile_mod
class EventProcessor(threading.Thread):
def __init__(self, zmq_address, gearman_client, files, source_url):
threading.Thread.__init__(self)
self.files = files
self.source_url = source_url
self.gearman_client = gearman_client
self.zmq_address = zmq_address
self._connect_zmq()
def run(self):
while True:
try:
self._read_event()
except:
# Assume that an error reading data from zmq or deserializing
# data received from zmq indicates a zmq error and reconnect.
logging.exception("ZMQ exception.")
self._connect_zmq()
def _connect_zmq(self):
logging.debug("Connecting to zmq endpoint.")
self.context = zmq.Context()
self.socket = self.context.socket(zmq.SUB)
event_filter = b"onFinalized"
self.socket.setsockopt(zmq.SUBSCRIBE, event_filter)
self.socket.connect(self.zmq_address)
def _read_event(self):
string = self.socket.recv().decode('utf-8')
event = json.loads(string.split(None, 1)[1])
logging.debug("Jenkins event received: " + json.dumps(event))
for fileopts in self.files:
output = {}
source_url, out_event = self._parse_event(event, fileopts)
output['source_url'] = source_url
output['retry'] = fileopts.get('retry-get', False)
output['event'] = out_event
job = gear.Job(b'push-log', json.dumps(output).encode('utf8'))
try:
self.gearman_client.submitJob(job)
except:
logging.exception("Exception submitting job to Gearman.")
log_dirs = {
'check': "/{build_change}/{build_patchset}/{build_queue}/"
"{build_name}/{build_number}/",
'gate': "/{build_change}/{build_patchset}/{build_queue}/"
"{build_name}/{build_number}/",
'post': "/{build_shortref}/{build_queue}/{build_name}/"
"{build_number}/",
'pre-release': "/{build_shortref}/{build_queue}/{build_name}/"
"{build_number}/",
'release': "/{build_shortref}/{build_queue}/{build_name}/"
"{build_number}/",
'UNKNOWN': "/periodic/{build_name}/{build_number}/",
}
def _parse_fields(self, event, filename):
fields = {}
fields["filename"] = filename
fields["build_name"] = event.get("name", "UNKNOWN")
fields["build_status"] = event["build"].get("status", "UNKNOWN")
fields["build_number"] = event["build"].get("number", "UNKNOWN")
parameters = event["build"].get("parameters", {})
fields["build_queue"] = parameters.get("ZUUL_PIPELINE", "UNKNOWN")
if fields["build_queue"] in ["check", "gate"]:
fields["build_change"] = parameters.get("ZUUL_CHANGE", "UNKNOWN")
fields["build_patchset"] = parameters.get("ZUUL_PATCHSET",
"UNKNOWN")
elif fields["build_queue"] in ["post", "pre-release", "release"]:
fields["build_shortref"] = parameters.get("ZUUL_SHORT_NEWREV",
"UNKNOWN")
return fields
def _parse_event(self, event, fileopts):
fields = self._parse_fields(event, fileopts['name'])
log_dir = self.log_dirs.get(fields["build_queue"], "").format(**fields)
source_url = fileopts.get('source-url', self.source_url) + \
log_dir + fileopts['name']
out_event = {}
out_event["@fields"] = fields
out_event["@tags"] = [fileopts['name']] + fileopts.get('tags', [])
return source_url, out_event
class Server(object):
def __init__(self, config, debuglog):
# Config init.
self.config = config
self.source_url = self.config['source-url']
# Python logging output file.
self.debuglog = debuglog
self.processors = []
def setup_logging(self):
if self.debuglog:
logging.basicConfig(format='%(asctime)s %(message)s',
filename=self.debuglog, level=logging.DEBUG)
else:
# Prevent leakage into the logstash log stream.
logging.basicConfig(level=logging.CRITICAL)
logging.debug("Log pusher starting.")
def setup_processors(self):
for publisher in self.config['zmq-publishers']:
gearclient = gear.Client()
gearclient.addServer('localhost')
gearclient.waitForServer()
processor = EventProcessor(publisher, gearclient,
self.config['source-files'], self.source_url)
self.processors.append(processor)
def main(self):
self.gearserver = gear.Server()
self.setup_processors()
for processor in self.processors:
processor.daemon = True
processor.start()
while True:
signal.pause()
def main():
parser = argparse.ArgumentParser()
parser.add_argument("-c", "--config", required=True,
help="Path to yaml config file.")
parser.add_argument("-d", "--debuglog",
help="Enable debug log. "
"Specifies file to write log to.")
parser.add_argument("--foreground", action='store_true',
help="Run in the foreground.")
parser.add_argument("-p", "--pidfile",
default="/var/run/jenkins-log-pusher/"
"jenkins-log-gearman-client.pid",
help="PID file to lock during daemonization.")
args = parser.parse_args()
with open(args.config, 'r') as config_stream:
config = yaml.load(config_stream)
server = Server(config, args.debuglog)
if args.foreground:
server.setup_logging()
server.main()
else:
pidfile = pidfile_mod.TimeoutPIDLockFile(args.pidfile, 10)
with daemon.DaemonContext(pidfile=pidfile):
server.setup_logging()
server.main()
if __name__ == '__main__':
main()
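To make the URL construction above concrete, here is a small worked example
(field values invented) of how the check/gate log_dirs template turns parsed
event fields into a source_url::

    # Worked example of the check/gate template used by _parse_event above;
    # all field values are invented.
    fields = {
        'build_change': '12345',
        'build_patchset': '3',
        'build_queue': 'check',
        'build_name': 'example-job',
        'build_number': '17',
    }
    template = ("/{build_change}/{build_patchset}/{build_queue}/"
                "{build_name}/{build_number}/")
    source_url = ('http://logs.openstack.org' + template.format(**fields) +
                  'console.html')
    # -> http://logs.openstack.org/12345/3/check/example-job/17/console.html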


@@ -0,0 +1,301 @@
#!/usr/bin/python2
#
# Copyright 2013 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import argparse
import daemon
import gear
import gzip
import json
import logging
import Queue
import socket
import sys
import threading
import time
import urllib2
import yaml
try:
import daemon.pidlockfile as pidfile_mod
except ImportError:
import daemon.pidfile as pidfile_mod
def semi_busy_wait(seconds):
# time.sleep() may return early. If it does sleep() again and repeat
# until at least the number of seconds specified has elapsed.
start_time = time.time()
while True:
time.sleep(seconds)
cur_time = time.time()
seconds = seconds - (cur_time - start_time)
if seconds <= 0.0:
return
class LogRetriever(threading.Thread):
def __init__(self, gearman_worker, logq):
threading.Thread.__init__(self)
self.gearman_worker = gearman_worker
self.logq = logq
def run(self):
while True:
try:
self._handle_event()
except:
logging.exception("Exception retrieving log event.")
def _handle_event(self):
job = self.gearman_worker.getJob()
try:
arguments = json.loads(job.arguments.decode('utf-8'))
source_url = arguments['source_url']
retry = arguments['retry']
event = arguments['event']
logging.debug("Handling event: " + json.dumps(event))
fields = event['@fields']
tags = event['@tags']
if fields['build_status'] != 'ABORTED':
# Handle events ignoring aborted builds. These builds are
# discarded by zuul.
log_lines = self._retrieve_log(source_url, retry)
logging.debug("Pushing " + str(len(log_lines)) + " log lines.")
for line in log_lines:
out_event = {}
out_event["@fields"] = fields
out_event["@tags"] = tags
out_event["event_message"] = line
self.logq.put(out_event)
job.sendWorkComplete()
except:
job.sendWorkFail()
def _retrieve_log(self, source_url, retry):
# TODO (clarkb): This should check the content type instead of file
# extension for determining if gzip was used.
gzipped = False
raw_buf = b''
try:
gzipped, raw_buf = self._get_log_data(source_url, retry)
except:
# Silently drop fatal errors when retrieving logs.
# TODO (clarkb): Handle these errors.
# Perhaps simply add a log message to raw_buf?
logging.exception("Unable to get log data.")
if gzipped:
logging.debug("Decompressing gzipped source file.")
buf = gzip.decompress(raw_buf).decode('utf-8')
else:
logging.debug("Decoding source file.")
buf = raw_buf.decode('utf-8')
return buf.splitlines()
def _get_log_data(self, source_url, retry):
gzipped = False
try:
logging.debug("Retrieving: " + source_url)
r = urllib2.urlopen(source_url)
except urllib2.URLError:
try:
logging.debug("Retrieving: " + source_url + ".gz")
r = urllib2.urlopen(source_url + ".gz")
gzipped = True
except:
logging.exception("Unable to retrieve source file.")
raise
except:
logging.exception("Unable to retrieve source file.")
raise
raw_buf = r.read()
# Hack to read all of Jenkins console logs as they upload
# asynchronously. Make one attempt per second for up to 60 seconds to
# retrieve the entire file. Short circuit when the end of file string
# for console logs, '\n</pre>\n', is read.
if (retry and not gzipped and
raw_buf[-8:].decode('utf-8') != '\n</pre>\n'):
content_len = len(raw_buf)
for i in range(60):
# Try for up to 60 seconds to retrieve the complete log file.
try:
logging.debug(str(i) + " Retrying fetch of: " + source_url)
logging.debug("Fetching bytes=" + str(content_len) + '-')
req = urllib2.Request(source_url)
req.add_header('Range', 'bytes=' + str(content_len) + '-')
r = urllib2.urlopen(req)
raw_buf += r.read()
content_len = len(raw_buf)
except urllib2.HTTPError as e:
if e.code == 416:
logging.exception("Index out of range.")
else:
raise
finally:
if raw_buf[-8:].decode('utf-8') == '\n</pre>\n':
break
semi_busy_wait(1)
return gzipped, raw_buf
class StdOutLogProcessor(object):
def __init__(self, logq, pretty_print=False):
self.logq = logq
self.pretty_print = pretty_print
def handle_log_event(self):
log = self.logq.get()
if self.pretty_print:
print(json.dumps(log, sort_keys=True,
indent=4, separators=(',', ': ')))
else:
print(json.dumps(log))
# Push each log event through to keep logstash up to date.
sys.stdout.flush()
class INETLogProcessor(object):
socket_type = None
def __init__(self, logq, host, port):
self.logq = logq
self.host = host
self.port = port
self._connect_socket()
def _connect_socket(self):
logging.debug("Creating socket.")
self.socket = socket.socket(socket.AF_INET, self.socket_type)
self.socket.connect((self.host, self.port))
def handle_log_event(self):
log = self.logq.get()
try:
self.socket.sendall((json.dumps(log) + '\n').encode('utf-8'))
except:
logging.exception("Exception sending INET event.")
# Logstash seems to take about a minute to start again. Wait 90
# seconds before attempting to reconnect. If logstash is not
# available after 90 seconds we will throw another exception and
# die.
semi_busy_wait(90)
self._connect_socket()
self.socket.sendall((json.dumps(log) + '\n').encode('utf-8'))
class UDPLogProcessor(INETLogProcessor):
socket_type = socket.SOCK_DGRAM
class TCPLogProcessor(INETLogProcessor):
socket_type = socket.SOCK_STREAM
class Server(object):
def __init__(self, config, debuglog):
# Config init.
self.config = config
self.gearman_host = self.config['gearman-host']
self.gearman_port = self.config['gearman-port']
self.output_host = self.config['output-host']
self.output_port = self.config['output-port']
self.output_mode = self.config['output-mode']
# Python logging output file.
self.debuglog = debuglog
self.retriever = None
self.logqueue = Queue.Queue(16384)
self.processor = None
def setup_logging(self):
if self.debuglog:
logging.basicConfig(format='%(asctime)s %(message)s',
filename=self.debuglog, level=logging.DEBUG)
else:
# Prevent leakage into the logstash log stream.
logging.basicConfig(level=logging.CRITICAL)
logging.debug("Log pusher starting.")
def setup_retriever(self):
hostname = socket.gethostname()
gearman_worker = gear.Worker(hostname + b'-pusher')
gearman_worker.addServer(self.gearman_host,
self.gearman_port)
gearman_worker.registerFunction(b'push-log')
self.retriever = LogRetriever(gearman_worker, self.logqueue)
def setup_processor(self):
if self.output_mode == "tcp":
self.processor = TCPLogProcessor(self.logqueue,
self.output_host,
self.output_port)
elif self.output_mode == "udp":
self.processor = UDPLogProcessor(self.logqueue,
self.output_host,
self.output_port)
else:
# Note this processor will not work if the process is run as a
# daemon. You must use the --foreground option.
self.processor = StdOutLogProcessor(self.logqueue)
def main(self):
self.setup_retriever()
self.setup_processor()
self.retriever.daemon = True
self.retriever.start()
while True:
try:
self.processor.handle_log_event()
except:
logging.exception("Exception processing log event.")
raise
def main():
parser = argparse.ArgumentParser()
parser.add_argument("-c", "--config", required=True,
help="Path to yaml config file.")
parser.add_argument("-d", "--debuglog",
help="Enable debug log. "
"Specifies file to write log to.")
parser.add_argument("--foreground", action='store_true',
help="Run in the foreground.")
parser.add_argument("-p", "--pidfile",
default="/var/run/jenkins-log-pusher/"
"jenkins-log-gearman-worker.pid",
help="PID file to lock during daemonization.")
args = parser.parse_args()
with open(args.config, 'r') as config_stream:
config = yaml.load(config_stream)
server = Server(config, args.debuglog)
if args.foreground:
server.setup_logging()
server.main()
else:
pidfile = pidfile_mod.TimeoutPIDLockFile(args.pidfile, 10)
with daemon.DaemonContext(pidfile=pidfile):
server.setup_logging()
server.main()
if __name__ == '__main__':
main()
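For reference, every log line the worker pushes becomes one JSON document on
the output socket; a hypothetical event (all values invented) looks roughly
like::

    # Hypothetical single event as shipped to the Logstash indexer by the
    # INET processors above (all values invented).
    shipped_event = {
        '@fields': {
            'filename': 'logs/screen-n-api.txt',
            'build_name': 'example-job',
            'build_status': 'SUCCESS',
            'build_number': '17',
            'build_queue': 'check',
            'build_change': '12345',
            'build_patchset': '3',
        },
        '@tags': ['logs/screen-n-api.txt', 'screen', 'oslofmt'],
        'event_message': '2013-06-10 12:06:58 INFO nova.api [-] example line',
    }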


@@ -1,466 +0,0 @@
#!/usr/bin/python3
#
# Copyright 2013 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import argparse
import fcntl
import gzip
import json
import logging
import os
import queue
import re
import resource
import socket
import sys
import threading
import time
import urllib.error
import urllib.request
import yaml
import zmq
def semi_busy_wait(seconds):
# time.sleep() may return early. If it does sleep() again and repeat
# until at least the number of seconds specified has elapsed.
start_time = time.time()
while True:
time.sleep(seconds)
cur_time = time.time()
seconds = seconds - (cur_time - start_time)
if seconds <= 0.0:
return
class EventCatcher(threading.Thread):
def __init__(self, eventqs, zmq_address):
threading.Thread.__init__(self)
self.eventqs = eventqs
self.zmq_address = zmq_address
self._connect_zmq()
def run(self):
while True:
try:
self._read_event()
except:
# Assume that an error reading data from zmq or deserializing
# data received from zmq indicates a zmq error and reconnect.
logging.exception("ZMQ exception.")
self._connect_zmq()
def _connect_zmq(self):
logging.debug("Connecting to zmq endpoint.")
self.context = zmq.Context()
self.socket = self.context.socket(zmq.SUB)
event_filter = b"onFinalized"
self.socket.setsockopt(zmq.SUBSCRIBE, event_filter)
self.socket.connect(self.zmq_address)
def _read_event(self):
string = self.socket.recv().decode('utf-8')
event = json.loads(string.split(None, 1)[1])
logging.debug("Jenkins event received: " + json.dumps(event))
for eventq in self.eventqs:
eventq.put(event)
class LogRetriever(threading.Thread):
log_dirs = {
'check': "/{build_change}/{build_patchset}/{build_queue}/"
"{build_name}/{build_number}/",
'gate': "/{build_change}/{build_patchset}/{build_queue}/"
"{build_name}/{build_number}/",
'post': "/{build_shortref}/{build_queue}/{build_name}/"
"{build_number}/",
'pre-release': "/{build_shortref}/{build_queue}/{build_name}/"
"{build_number}/",
'release': "/{build_shortref}/{build_queue}/{build_name}/"
"{build_number}/",
'UNKNOWN': "/periodic/{build_name}/{build_number}/",
}
def __init__(self, eventq, logq, log_address,
filename, retry=False, job_filter='', tags=None):
threading.Thread.__init__(self)
self.eventq = eventq
self.logq = logq
self.retry = retry
self.log_address = log_address
self.filename = filename
self.job_filter = job_filter
self.tags = [self.filename]
if tags:
self.tags.extend(tags)
def run(self):
while True:
try:
self._handle_event()
except:
logging.exception("Exception retrieving log event.")
def _handle_event(self):
event = self.eventq.get()
logging.debug("Handling event: " + json.dumps(event))
fields = self._parse_fields(event)
matches = re.search(self.job_filter, fields['build_name'])
if fields['build_status'] != 'ABORTED' and matches:
# Handle events ignoring aborted builds. These builds are
# discarded by zuul.
log_lines = self._retrieve_log(fields)
logging.debug("Pushing " + str(len(log_lines)) + " log lines.")
for line in log_lines:
out_event = {}
out_event["@fields"] = fields
out_event["@tags"] = self.tags
out_event["event_message"] = line
self.logq.put(out_event)
def _parse_fields(self, event):
fields = {}
fields["filename"] = self.filename
fields["build_name"] = event.get("name", "UNKNOWN")
fields["build_status"] = event["build"].get("status", "UNKNOWN")
fields["build_number"] = event["build"].get("number", "UNKNOWN")
parameters = event["build"].get("parameters", {})
fields["build_queue"] = parameters.get("ZUUL_PIPELINE", "UNKNOWN")
if fields["build_queue"] in ["check", "gate"]:
fields["build_change"] = parameters.get("ZUUL_CHANGE", "UNKNOWN")
fields["build_patchset"] = parameters.get("ZUUL_PATCHSET",
"UNKNOWN")
elif fields["build_queue"] in ["post", "pre-release", "release"]:
fields["build_shortref"] = parameters.get("ZUUL_SHORT_NEWREV",
"UNKNOWN")
return fields
def _retrieve_log(self, fields):
# TODO (clarkb): This should check the content type instead of file
# extension for determining if gzip was used.
log_dir = self.log_dirs.get(fields["build_queue"], "").format(**fields)
gzipped = False
raw_buf = b''
try:
gzipped, raw_buf = self._get_log_data(self.log_address, log_dir,
self.filename)
except:
# Silently drop fatal errors when retrieving logs.
# TODO (clarkb): Handle these errors.
# Perhaps simply add a log message to raw_buf?
logging.exception("Unable to get log data.")
if gzipped:
logging.debug("Decompressing gzipped source file.")
buf = gzip.decompress(raw_buf).decode('utf-8')
else:
logging.debug("Decoding source file.")
buf = raw_buf.decode('utf-8')
return buf.splitlines()
def _get_log_data(self, log_address, log_dir, filename):
gzipped = False
source_url = log_address + log_dir + filename
try:
logging.debug("Retrieving: " + source_url)
r = urllib.request.urlopen(source_url)
except urllib.error.URLError:
try:
logging.debug("Retrieving: " + source_url + ".gz")
r = urllib.request.urlopen(source_url + ".gz")
gzipped = True
except:
logging.exception("Unable to retrieve source file.")
raise
except:
logging.exception("Unable to retrieve source file.")
raise
raw_buf = r.read()
# Hack to read all of Jenkins console logs as they upload
# asynchronously. Make one attempt per second for up to 60 seconds to
# retrieve the entire file. Short circuit when the end of file string
# for console logs, '\n</pre>\n', is read.
if (self.retry and not gzipped and
raw_buf[-8:].decode('utf-8') != '\n</pre>\n'):
content_len = len(raw_buf)
for i in range(60):
# Try for up to 60 seconds to retrieve the complete log file.
try:
logging.debug(str(i) + " Retrying fetch of: " + source_url)
logging.debug("Fetching bytes=" + str(content_len) + '-')
req = urllib.request.Request(source_url)
req.add_header('Range', 'bytes=' + str(content_len) + '-')
r = urllib.request.urlopen(req)
raw_buf += r.read()
content_len = len(raw_buf)
except urllib.error.HTTPError as e:
if e.code == 416:
logging.exception("Index out of range.")
else:
raise
finally:
if raw_buf[-8:].decode('utf-8') == '\n</pre>\n':
break
semi_busy_wait(1)
return gzipped, raw_buf
class StdOutLogProcessor(object):
def __init__(self, logq, pretty_print=False):
self.logq = logq
self.pretty_print = pretty_print
def handle_log_event(self):
log = self.logq.get()
if self.pretty_print:
print(json.dumps(log, sort_keys=True,
indent=4, separators=(',', ': ')))
else:
print(json.dumps(log))
# Push each log event through to keep logstash up to date.
sys.stdout.flush()
class INETLogProcessor(object):
socket_type = None
def __init__(self, logq, host, port):
self.logq = logq
self.host = host
self.port = port
self._connect_socket()
def _connect_socket(self):
logging.debug("Creating socket.")
self.socket = socket.socket(socket.AF_INET, self.socket_type)
self.socket.connect((self.host, self.port))
def handle_log_event(self):
log = self.logq.get()
try:
self.socket.sendall((json.dumps(log) + '\n').encode('utf-8'))
except:
logging.exception("Exception sending INET event.")
# Logstash seems to take about a minute to start again. Wait 90
# seconds before attempting to reconnect. If logstash is not
# available after 90 seconds we will throw another exception and
# die.
semi_busy_wait(90)
self._connect_socket()
self.socket.sendall((json.dumps(log) + '\n').encode('utf-8'))
class UDPLogProcessor(INETLogProcessor):
socket_type = socket.SOCK_DGRAM
class TCPLogProcessor(INETLogProcessor):
socket_type = socket.SOCK_STREAM
class Server(object):
def __init__(self, config, debuglog):
# Config init.
self.config = config
self.defaults = self.config['source-defaults']
self.default_source_url = self.defaults['source-url']
self.default_output_host = self.defaults['output-host']
self.default_output_port = self.defaults['output-port']
self.default_output_mode = self.defaults['output-mode']
self.default_retry = self.defaults['retry-get']
# Python logging output file.
self.debuglog = debuglog
# Input, retriever, output details
self.catchers = []
self.event_queues = []
self.retrievers = []
# TODO(clarkb) support multiple outputs
# Logstash queues are small. Set an upper bound to our queue
# so that this doesn't turn into an unbounded cache prone to
# OOMing. But set it to a reasonable size to provide some caching.
self.logqueue = queue.Queue(16384)
self.processor = None
def setup_logging(self):
if self.debuglog:
logging.basicConfig(format='%(asctime)s %(message)s',
filename=self.debuglog, level=logging.DEBUG)
else:
# Prevent leakage into the logstash log stream.
logging.basicConfig(level=logging.CRITICAL)
logging.debug("Log pusher starting.")
def setup_retrievers(self):
for source_file in self.config['source-files']:
eventqueue = queue.Queue()
self.event_queues.append(eventqueue)
retriever = LogRetriever(eventqueue, self.logqueue,
source_file.get('source-url',
self.default_source_url),
source_file['name'],
retry=source_file.get('retry-get',
self.default_retry),
job_filter=source_file.get('filter', ''),
tags=source_file.get('tags', []))
self.retrievers.append(retriever)
def setup_catchers(self):
for zmq_publisher in self.config['zmq-publishers']:
catcher = EventCatcher(self.event_queues, zmq_publisher)
self.catchers.append(catcher)
def setup_processor(self):
if self.default_output_mode == "tcp":
self.processor = TCPLogProcessor(self.logqueue,
self.default_output_host,
self.default_output_port)
elif self.default_output_mode == "udp":
self.processor = UDPLogProcessor(self.logqueue,
self.default_output_host,
self.default_output_port)
else:
# Note this processor will not work if the process is run as a
# daemon. You must use the --foreground option.
self.processor = StdOutLogProcessor(self.logqueue)
def main(self):
self.setup_retrievers()
self.setup_catchers()
self.setup_processor()
for catcher in self.catchers:
catcher.daemon = True
catcher.start()
for retriever in self.retrievers:
retriever.daemon = True
retriever.start()
while True:
try:
self.processor.handle_log_event()
except:
logging.exception("Exception processing log event.")
raise
class DaemonContext(object):
def __init__(self, pidfile_path):
self.pidfile_path = pidfile_path
self.pidfile = None
self.pidlocked = False
def __enter__(self):
# Perform Sys V daemonization steps as defined by
# http://www.freedesktop.org/software/systemd/man/daemon.html
# Close all open file descriptors but std*
_, max_fds = resource.getrlimit(resource.RLIMIT_NOFILE)
if max_fds == resource.RLIM_INFINITY:
max_fds = 4096
for fd in range(3, max_fds):
try:
os.close(fd)
except OSError:
# TODO(clarkb) check e.errno.
# fd not open.
pass
# TODO(clarkb) reset all signal handlers to their default
# TODO(clarkb) reset signal mask
# TODO(clarkb) sanitize environment block
# Fork to create background process
# TODO(clarkb) pass in read end of pipe and have parent wait for
# bytes on the pipe before exiting.
self._fork_exit_parent()
# setsid() to detach from terminal and create independent session.
os.setsid()
# Fork again to prevent reaquisition of terminal
self._fork_exit_parent()
# Hook std* to /dev/null.
devnull = os.open(os.devnull, os.O_RDWR)
os.dup2(devnull, 0)
os.dup2(devnull, 1)
os.dup2(devnull, 2)
# Set umask to 0
os.umask(0)
# chdir to root of filesystem.
os.chdir(os.sep)
# Lock pidfile.
self.pidfile = open(self.pidfile_path, 'a')
try:
fcntl.lockf(self.pidfile, fcntl.LOCK_EX | fcntl.LOCK_NB)
self.pidlocked = True
except IOError:
# another instance is running
sys.exit(0)
self.pidfile.truncate(0)
self.pidfile.write(str(os.getpid()))
self.pidfile.flush()
def __exit__(self, exc_type, exc_value, traceback):
# remove pidfile
if self.pidlocked:
os.unlink(self.pidfile_path)
if self.pidfile:
self.pidfile.close()
# TODO(clarkb) write to then close parent signal pipe if not
# already done.
def _fork_exit_parent(self, read_pipe=None):
if os.fork():
# Parent
if read_pipe:
os.fdopen(read_pipe).read()
sys.exit()
else:
# Child
return
def main():
parser = argparse.ArgumentParser()
parser.add_argument("-c", "--config", required=True,
help="Path to yaml config file.")
parser.add_argument("-d", "--debuglog",
help="Enable debug log. "
"Specifies file to write log to.")
parser.add_argument("--foreground", action='store_true',
help="Run in the foreground.")
parser.add_argument("-p", "--pidfile",
default="/var/run/jenkins-log-pusher/"
"jenkins-log-pusher.pid",
help="PID file to lock during daemonization.")
args = parser.parse_args()
with open(args.config, 'r') as config_stream:
config = yaml.load(config_stream)
server = Server(config, args.debuglog)
if args.foreground:
server.setup_logging()
server.main()
else:
with DaemonContext(args.pidfile):
server.setup_logging()
server.main()
if __name__ == '__main__':
main()


@@ -1,48 +0,0 @@
# Defaults
source-defaults:
source-url: http://logs.openstack.org
output-host: localhost
output-port: 9999
output-mode: tcp
retry-get: False
# List of zmq event inputs.
zmq-publishers:
- tcp://jenkins.openstack.org:8888
# List of files to source logs from.
source-files:
- name: console.html
retry-get: True
- name: logs/screen-c-api.txt
tags:
- screen
- oslofmt
- name: logs/screen-c-sch.txt
tags:
- screen
- oslofmt
- name: logs/screen-c-vol.txt
tags:
- screen
- oslofmt
- name: logs/screen-key.txt
tags:
- screen
- keystonefmt
- name: logs/screen-s-account.txt
tags:
- screen
- apachecombined
- name: logs/screen-s-container.txt
tags:
- screen
- apachecombined
- name: logs/screen-s-object.txt
tags:
- screen
- apachecombined
# TODO(clarkb) Add swift proxy logs here.
- name: logs/syslog.txt
tags:
- syslog


@@ -1,42 +0,0 @@
# Defaults
source-defaults:
source-url: http://logs.openstack.org
output-host: localhost
output-port: 9999
output-mode: tcp
retry-get: False
# List of zmq event inputs.
zmq-publishers:
- tcp://jenkins.openstack.org:8888
# List of files to source logs from.
source-files:
- name: logs/screen-n-api.txt
tags:
- screen
- oslofmt
- name: logs/screen-n-cond.txt
tags:
- screen
- oslofmt
- name: logs/screen-n-cpu.txt
tags:
- screen
- oslofmt
- name: logs/screen-n-crt.txt
tags:
- screen
- oslofmt
- name: logs/screen-n-net.txt
tags:
- screen
- oslofmt
- name: logs/screen-n-obj.txt
tags:
- screen
- oslofmt
- name: logs/screen-n-sch.txt
tags:
- screen
- oslofmt


@@ -1,42 +0,0 @@
# Defaults
source-defaults:
source-url: http://logs.openstack.org
output-host: localhost
output-port: 9999
output-mode: tcp
retry-get: False
# List of zmq event inputs.
zmq-publishers:
- tcp://jenkins.openstack.org:8888
# List of files to source logs from.
source-files:
- name: logs/screen-g-api.txt
tags:
- screen
- oslofmt
- name: logs/screen-g-reg.txt
tags:
- screen
- oslofmt
- name: logs/screen-q-agt.txt
tags:
- screen
- oslofmt
- name: logs/screen-q-dhcp.txt
tags:
- screen
- oslofmt
- name: logs/screen-q-l3.txt
tags:
- screen
- oslofmt
- name: logs/screen-q-meta.txt
tags:
- screen
- oslofmt
- name: logs/screen-q-svc.txt
tags:
- screen
- oslofmt


@@ -16,9 +16,12 @@
#
class openstack_project::logstash (
$elasticsearch_masters = [],
$gearman_workers = [],
$sysadmins = []
) {
$iptables_rule = regsubst ($elasticsearch_masters, '^(.*)$', '-m state --state NEW -m tcp -p tcp --dport 9200:9400 -s \1 -j ACCEPT')
$iptables_es_rule = regsubst ($elasticsearch_masters, '^(.*)$', '-m state --state NEW -m tcp -p tcp --dport 9200:9400 -s \1 -j ACCEPT')
$iptables_gm_rule = regsubst ($gearman_workers, '^(.*)$', '-m state --state NEW -m tcp -p tcp --dport 4730 -s \1 -j ACCEPT')
$iptables_rule = concat($iptables_es_rule, $iptables_gm_rule)
class { 'openstack_project::server':
iptables_public_tcp_ports => [22, 80],
iptables_rules6 => $iptables_rule,
@@ -30,4 +33,59 @@ class openstack_project::logstash (
frontend => 'kibana',
elasticsearch_host => 'elasticsearch.openstack.org',
}
package { 'python-daemon':
ensure => present,
}
package { 'python-zmq':
ensure => present,
}
package { 'python-yaml':
ensure => present,
}
include pip
package { 'gear':
ensure => latest,
provider => 'pip',
require => Class['pip'],
}
file { '/usr/local/bin/log-gearman-client.py':
ensure => present,
owner => 'root',
group => 'root',
mode => '0755',
source => 'puppet:///modules/openstack_project/logstash/log-gearman-client.py',
}
file { '/etc/logstash/jenkins-log-client.yaml':
ensure => present,
owner => 'root',
group => 'root',
mode => '0555',
source => 'puppet:///modules/openstack_project/logstash/jenkins-log-client.yaml',
require => Class['logstash::indexer'],
}
file { '/etc/init.d/jenkins-log-client':
ensure => present,
owner => 'root',
group => 'root',
mode => '0555',
source => 'puppet:///modules/openstack_project/logstash/jenkins-log-client.init',
require => [
File['/usr/local/bin/log-gearman-client.py'],
File['/etc/logstash/jenkins-log-client.yaml'],
],
}
service { 'jenkins-log-client':
enable => true,
hasrestart => true,
subscribe => File['/etc/logstash/jenkins-log-client.yaml'],
require => File['/etc/init.d/jenkins-log-client'],
}
}


@@ -30,52 +30,58 @@ class openstack_project::logstash_worker (
conf_template => 'openstack_project/logstash/indexer.conf.erb',
}
package { 'python3':
ensure => 'present',
package { 'python-daemon':
ensure => present,
}
package { 'python3-zmq':
ensure => 'present',
package { 'python-zmq':
ensure => present,
}
package { 'python3-yaml':
ensure => 'present',
package { 'python-yaml':
ensure => present,
}
file { '/usr/local/bin/log-pusher.py':
include pip
package { 'gear':
ensure => latest,
provider => 'pip',
require => Class['pip'],
}
file { '/usr/local/bin/log-gearman-worker.py':
ensure => present,
owner => 'root',
group => 'root',
mode => '0755',
source => 'puppet:///modules/openstack_project/logstash/log-pusher.py',
require => Package['python3'],
source => 'puppet:///modules/openstack_project/logstash/log-gearman-worker.py',
}
file { '/etc/logstash/jenkins-log-pusher.yaml':
file { '/etc/logstash/jenkins-log-worker.yaml':
ensure => present,
owner => 'root',
group => 'root',
mode => '0555',
source => "puppet:///modules/openstack_project/logstash/${::hostname}/jenkins-log-pusher.yaml",
source => 'puppet:///modules/openstack_project/logstash/jenkins-log-worker.yaml',
require => Class['logstash::indexer'],
}
file { '/etc/init.d/jenkins-log-pusher':
file { '/etc/init.d/jenkins-log-worker':
ensure => present,
owner => 'root',
group => 'root',
mode => '0555',
source => 'puppet:///modules/openstack_project/logstash/jenkins-log-pusher.init',
source => 'puppet:///modules/openstack_project/logstash/jenkins-log-worker.init',
require => [
File['/usr/local/bin/log-pusher.py'],
File['/etc/logstash/jenkins-log-pusher.yaml'],
File['/usr/local/bin/log-gearman-worker.py'],
File['/etc/logstash/jenkins-log-worker.yaml'],
],
}
service { 'jenkins-log-pusher':
service { 'jenkins-log-worker':
enable => true,
hasrestart => true,
subscribe => File['/etc/logstash/jenkins-log-pusher.yaml'],
require => File['/etc/init.d/jenkins-log-pusher'],
subscribe => File['/etc/logstash/jenkins-log-worker.yaml'],
require => File['/etc/init.d/jenkins-log-worker'],
}
}