diff --git a/modules/openstack_project/files/logstash/jenkins-log-pusher.init b/modules/openstack_project/files/logstash/jenkins-log-pusher.init index d413b4126e..2a0497e41c 100755 --- a/modules/openstack_project/files/logstash/jenkins-log-pusher.init +++ b/modules/openstack_project/files/logstash/jenkins-log-pusher.init @@ -16,7 +16,7 @@ PATH=/sbin:/usr/sbin:/bin:/usr/bin DESC="Jenkins Log Pusher" NAME=jenkins-log-pusher DAEMON=/usr/local/bin/log-pusher.py -DAEMON_ARGS='-r -z tcp://jenkins.openstack.org:8888 -l http://logs.openstack.org -f console.html -d /var/log/logstash/pusher-debug.log -t' +DAEMON_ARGS='-c /etc/logstash/jenkins-log-pusher.yaml -d /var/log/logstash/pusher-debug.log' #PIDFILE=/var/run/$NAME/$NAME.pid SCRIPTNAME=/etc/init.d/$NAME USER=logstash diff --git a/modules/openstack_project/files/logstash/jenkins-log-pusher.yaml b/modules/openstack_project/files/logstash/jenkins-log-pusher.yaml new file mode 100644 index 0000000000..fbb1a57a24 --- /dev/null +++ b/modules/openstack_project/files/logstash/jenkins-log-pusher.yaml @@ -0,0 +1,16 @@ +# Defaults +source-defaults: + source-url: http://logs.openstack.org + output-host: localhost + output-port: 9999 + output-mode: tcp + retry-get: False + +# List of zmq event inputs. +zmq-publishers: + - tcp://jenkins.openstack.org:8888 + +# List of files to source logs from. +source-files: + - name: console.html + retry-get: True diff --git a/modules/openstack_project/files/logstash/log-pusher.py b/modules/openstack_project/files/logstash/log-pusher.py index c071d3138c..d7d82400a4 100644 --- a/modules/openstack_project/files/logstash/log-pusher.py +++ b/modules/openstack_project/files/logstash/log-pusher.py @@ -21,17 +21,19 @@ import logging import threading import time import queue +import re import socket import sys import urllib.error import urllib.request +import yaml import zmq class EventCatcher(threading.Thread): - def __init__(self, eventq, zmq_address): + def __init__(self, eventqs, zmq_address): threading.Thread.__init__(self) - self.eventq = eventq + self.eventqs = eventqs self.zmq_address = zmq_address self._connect_zmq() @@ -57,7 +59,8 @@ class EventCatcher(threading.Thread): string = self.socket.recv().decode('utf-8') event = json.loads(string.split(None, 1)[1]) logging.debug("Jenkins event received: " + json.dumps(event)) - self.eventq.put(event) + for eventq in self.eventqs: + eventq.put(event) class LogRetriever(threading.Thread): @@ -75,13 +78,16 @@ class LogRetriever(threading.Thread): 'UNKNOWN': "/periodic/{build_name}/{build_number}/", } - def __init__(self, eventq, logq, log_address, filename, retry=False): + def __init__(self, eventq, logq, log_address, + filename, retry=False, job_filter=''): threading.Thread.__init__(self) self.eventq = eventq self.logq = logq self.retry = retry self.log_address = log_address self.filename = filename + self.job_filter = job_filter + self.tag = [self.filename] def run(self): while True: @@ -94,7 +100,8 @@ class LogRetriever(threading.Thread): event = self.eventq.get() logging.debug("Handling event: " + json.dumps(event)) fields = self._parse_fields(event) - if fields['build_status'] != 'ABORTED': + matches = re.search(self.job_filter, fields['build_name']) + if fields['build_status'] != 'ABORTED' and matches: # Handle events ignoring aborted builds. These builds are # discarded by zuul. log_lines = self._retrieve_log(fields) @@ -103,6 +110,7 @@ class LogRetriever(threading.Thread): for line in log_lines: out_event = {} out_event["@fields"] = fields + out_event["@tags"] = self.tag out_event["event_message"] = line self.logq.put(out_event) @@ -223,7 +231,7 @@ class StdOutLogProcessor(object): class INETLogProcessor(object): socket_type = None - def __init__(self, logq, host='localhost', port=9999): + def __init__(self, logq, host, port): self.logq = logq self.host = host self.port = port @@ -253,23 +261,11 @@ class TCPLogProcessor(INETLogProcessor): def main(): parser = argparse.ArgumentParser() - parser.add_argument("-z", "--zmqaddress", required=True, - help="Address to use as source for zmq events.") - parser.add_argument("-l", "--logaddress", required=True, - help="Http(s) address to use as source for log files.") - parser.add_argument("-f", "--filename", required=True, - help="Name of log file to retrieve from log server.") - parser.add_argument("-p", "--pretty", action="store_true", - help="Print pretty json.") - parser.add_argument("-r", "--retry", action="store_true", - help="Retry until full console log is retrieved.") + parser.add_argument("-c", "--config", required=True, + help="Path to yaml config file.") parser.add_argument("-d", "--debuglog", help="Enable debug log. " "Specifies file to write log to.") - parser.add_argument("-u", "--udp", action="store_true", - help="Output to UDP destination.") - parser.add_argument("-t", "--tcp", action="store_true", - help="Output to TCP destination.") args = parser.parse_args() if args.debuglog: @@ -279,22 +275,54 @@ def main(): # Prevent leakage into the logstash log stream. logging.basicConfig(level=logging.CRITICAL) logging.debug("Log pusher starting.") - eventqueue = queue.Queue() - logqueue = queue.Queue() - catcher = EventCatcher(eventqueue, args.zmqaddress) - retriever = LogRetriever(eventqueue, logqueue, args.logaddress, - args.filename, retry=args.retry) - if args.tcp: - processor = TCPLogProcessor(logqueue) - elif args.udp: - processor = UDPLogProcessor(logqueue) - else: - processor = StdOutLogProcessor(logqueue, pretty_print=args.pretty) - catcher.daemon = True - catcher.start() - retriever.daemon = True - retriever.start() + config_stream = open(args.config, 'r') + config = yaml.load(config_stream) + defaults = config['source-defaults'] + default_source_url = defaults['source-url'] + default_output_host = defaults['output-host'] + default_output_port = defaults['output-port'] + default_output_mode = defaults['output-mode'] + default_retry = defaults['retry-get'] + + event_queues = [] + # TODO(clarkb) support multiple outputs + logqueue = queue.Queue() + retrievers = [] + for source_file in config['source-files']: + eventqueue = queue.Queue() + event_queues.append(eventqueue) + retriever = LogRetriever(eventqueue, logqueue, + source_file.get('source-url', + default_source_url), + source_file['name'], + retry=source_file.get('retry-get', + default_retry), + job_filter=source_file.get('filter', + '')) + retrievers.append(retriever) + + catchers = [] + for zmq_publisher in config['zmq-publishers']: + catcher = EventCatcher(event_queues, zmq_publisher) + catchers.append(catcher) + + if default_output_mode == "tcp": + processor = TCPLogProcessor(logqueue, + default_output_host, default_output_port) + elif default_output_mode == "udp": + processor = UDPLogProcessor(logqueue, + default_output_host, default_output_port) + else: + processor = StdOutLogProcessor(logqueue) + + for catcher in catchers: + catcher.daemon = True + catcher.start() + for retriever in retrievers: + retriever.daemon = True + retriever.start() + while True: try: processor.handle_log_event() diff --git a/modules/openstack_project/manifests/logstash.pp b/modules/openstack_project/manifests/logstash.pp index 12109fa870..db9813dd7b 100644 --- a/modules/openstack_project/manifests/logstash.pp +++ b/modules/openstack_project/manifests/logstash.pp @@ -42,6 +42,10 @@ class openstack_project::logstash ( ensure => 'present', } + package { 'python3-yaml': + ensure => 'present', + } + file { '/usr/local/bin/log-pusher.py': ensure => present, owner => 'root', @@ -51,13 +55,25 @@ class openstack_project::logstash ( require => Package['python3'], } + file { '/etc/logstash/jenkins-log-pusher.yaml': + ensure => present, + owner => 'root', + group => 'root', + mode => '0555', + source => 'puppet:///modules/openstack_project/logstash/jenkins-log-pusher.yaml', + require => Class['logstash::indexer'], + } + file { '/etc/init.d/jenkins-log-pusher': ensure => present, owner => 'root', group => 'root', mode => '0555', source => 'puppet:///modules/openstack_project/logstash/jenkins-log-pusher.init', - require => File['/usr/local/bin/log-pusher.py'], + require => [ + File['/usr/local/bin/log-pusher.py'], + File['/etc/logstash/jenkins-log-pusher.yaml'], + ], } service { 'jenkins-log-pusher':