Merge pull request #2 from SandyWalsh/daemon

Daemon
This commit is contained in:
Sandy Walsh 2012-02-27 19:18:15 -08:00
commit e1ee83f49e

74
worker.py Normal file → Executable file
View File

@ -16,6 +16,7 @@
# This is the worker you run in your OpenStack environment. You need
# to set TENANT_ID and URL to point to your StackTach web server.
import daemon
import json
import kombu
import kombu.connection
@ -26,13 +27,16 @@ import urllib
import urllib2
# CHANGE THESE FOR YOUR INSTALLATION ...
TENANT_ID = 1
URL = 'http://darksecretsoftware.com/stacktach/%d/data/' % TENANT_ID
RABBIT_HOST = "localhost"
RABBIT_PORT = 5672
RABBIT_USERID = "guest"
RABBIT_PASSWORD = "guest"
RABBIT_VIRTUAL_HOST = "/"
DEPLOYMENTS = [
dict(
tenant_id=1,
url='http://example.com',
rabbit_host="localhost",
rabbit_port=5672,
rabbit_userid="guest",
rabbit_password="guest",
rabbit_virtual_host="/"),
]
try:
from worker_conf import *
@ -63,8 +67,9 @@ nova_queues = [
class SchedulerFanoutConsumer(kombu.mixins.ConsumerMixin):
def __init__(self, connection):
def __init__(self, connection, url):
self.connection = connection
self.url = url
def get_consumers(self, Consumer, channel):
return [Consumer(queues=scheduler_queues,
@ -79,13 +84,13 @@ class SchedulerFanoutConsumer(kombu.mixins.ConsumerMixin):
try:
raw_data = dict(args=jvalues)
cooked_data = urllib.urlencode(raw_data)
req = urllib2.Request(URL, cooked_data)
req = urllib2.Request(self.url, cooked_data)
response = urllib2.urlopen(req)
page = response.read()
print page
except urllib2.HTTPError, e:
if e.code == 401:
print "Unauthorized. Correct tenant id of %d?" % TENANT_ID
print "Unauthorized. Correct URL?", self.url
print e
page = e.read()
print page
@ -101,20 +106,41 @@ class SchedulerFanoutConsumer(kombu.mixins.ConsumerMixin):
message.ack()
if __name__ == "__main__":
print "StackTach", URL
print "Rabbit", RABBIT_HOST, RABBIT_PORT, RABBIT_USERID, RABBIT_VIRTUAL_HOST
class Monitor(threading.Thread):
def __init__(self, deployment):
super(Monitor, self).__init__()
self.deployment = deployment
params = dict(hostname=RABBIT_HOST,
port=RABBIT_PORT,
userid=RABBIT_USERID,
password=RABBIT_PASSWORD,
virtual_host=RABBIT_VIRTUAL_HOST)
def run(self):
tenant_id = self.deployment.get('tenant_id', 1)
url = self.deployment.get('url', 'http://www.example.com')
url = "%s/%d/data/" % (url, tenant_id)
host = self.deployment.get('rabbit_host', 'localhost')
port = self.deployment.get('rabbit_port', 5672)
user_id = self.deployment.get('rabbit_userid', 'rabbit')
password = self.deployment.get('rabbit_password', 'rabbit')
virtual_host = self.deployment.get('rabbit_virtual_host', '/')
with kombu.connection.BrokerConnection(**params) as conn:
consumer = SchedulerFanoutConsumer(conn)
try:
print "Listening"
print "StackTach", url
print "Rabbit:", host, port, user_id, virtual_host
params = dict(hostname=host,
port=port,
userid=user_id,
password=password,
virtual_host=virtual_host)
with kombu.connection.BrokerConnection(**params) as conn:
consumer = SchedulerFanoutConsumer(conn, url)
consumer.run()
except KeyboardInterrupt:
print("bye bye")
with daemon.DaemonContext():
workers = []
for deployment in DEPLOYMENTS:
monitor = Monitor(deployment)
workers.append(monitor)
monitor.start()
for worker in workers:
worker.join()