Use gerritlib for event-stream consumption.

Gerritlib is now available for Gerrit event stream consumption. Use
it. This should fix the bug where events are missed due to the
connection dying and the resulting wait for five seconds.

Change-Id: I63e7c3064a42379c985f1a0a723d08c15a1be003
This commit is contained in:
Clark Boylan 2012-07-31 09:35:19 -07:00
parent b2be72e69d
commit d15abaa25f
2 changed files with 32 additions and 69 deletions

View File

@ -29,17 +29,14 @@ openstack-dev:
- master
"""
import irc.bot
import time
import subprocess
import threading
import select
import json
import sys
import os
import ConfigParser
import daemon
import traceback
import irc.bot
import logging.config
import os
import sys
import threading
import time
import yaml
try:
@ -80,40 +77,14 @@ class GerritBot(irc.bot.SingleServerIRCBot):
class Gerrit(threading.Thread):
def __init__(self, ircbot, channel_config,
username, keyfile, server, port=29418):
def __init__(self, ircbot, channel_config, server,
username, port=29418, keyfile=None):
threading.Thread.__init__(self)
self.ircbot = ircbot
self.channel_config = channel_config
self.username = username
self.keyfile = keyfile
self.server = server
self.port = port
self.proc = None
self.poll = select.poll()
def _open(self):
self.proc = subprocess.Popen(['/usr/bin/ssh', '-p', str(self.port),
'-i', self.keyfile,
'-l', self.username, self.server,
'gerrit', 'stream-events'],
bufsize=1,
stdin=None,
stdout=subprocess.PIPE,
stderr=None,
)
self.poll.register(self.proc.stdout)
def _close(self):
try:
self.poll.unregister(self.proc.stdout)
except:
pass
try:
self.proc.kill()
except:
pass
self.proc = None
# Import here because it needs to happen after daemonization
import gerritlib.gerrit
self.gerrit = gerritlib.gerrit.Gerrit(server, username, port, keyfile)
def patchset_created(self, channel, data):
msg = '%s proposed a change to %s: %s %s' % (
@ -174,9 +145,7 @@ class Gerrit(threading.Thread):
data['change']['url'])
self.ircbot.send(channel, msg)
def _read(self):
l = self.proc.stdout.readline()
data = json.loads(l)
def _read(self, data):
channel_set = (self.channel_config.projects.get(
data['change']['project'], set()) &
self.channel_config.events.get(
@ -191,30 +160,11 @@ class Gerrit(threading.Thread):
elif data['type'] == 'change-merged':
self.change_merged(channel, data)
def _listen(self):
while True:
ret = self.poll.poll()
for (fd, event) in ret:
if fd == self.proc.stdout.fileno():
if event == select.POLLIN:
self._read()
else:
raise Exception("event on ssh connection")
def _run(self):
try:
if not self.proc:
self._open()
self._listen()
except:
traceback.print_exc()
self._close()
time.sleep(5)
def run(self):
time.sleep(5)
self.gerrit.startWatching()
while True:
self._run()
event = self.gerrit.getEvent()
self._read(event)
class ChannelConfig(object):
@ -246,6 +196,7 @@ class ChannelConfig(object):
def _main():
config = ConfigParser.ConfigParser()
config.read(sys.argv[1])
setup_logging(config)
fp = config.get('ircbot', 'channel_config')
if fp:
@ -264,10 +215,10 @@ def _main():
config.getint('ircbot', 'port'))
g = Gerrit(bot,
channel_config,
config.get('gerrit', 'user'),
config.get('gerrit', 'key'),
config.get('gerrit', 'host'),
config.getint('gerrit', 'port'))
config.get('gerrit', 'user'),
config.getint('gerrit', 'port'),
config.get('gerrit', 'key'))
g.start()
bot.start()
@ -283,5 +234,16 @@ def main():
_main()
def setup_logging(config):
if config.has_option('gerrit', 'log_config'):
log_config = config.get('gerrit', 'log_config')
fp = os.path.expanduser(log_config)
if not os.path.exists(fp):
raise Exception("Unable to read logging config file at %s" % fp)
logging.config.fileConfig(fp)
else:
logging.basicConfig(level=logging.DEBUG)
if __name__ == "__main__":
main()

View File

@ -1,3 +1,4 @@
gerritlib
irc
pyyaml
python-daemon
irc