stacktach/worker.py
2012-02-27 21:16:59 -06:00

147 lines
4.8 KiB
Python
Executable File

# Copyright 2012 - Dark Secret Software Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# This is the worker you run in your OpenStack environment. You need
# to set TENANT_ID and URL to point to your StackTach web server.
import daemon
import json
import kombu
import kombu.connection
import kombu.entity
import kombu.mixins
import threading
import urllib
import urllib2
# CHANGE THESE FOR YOUR INSTALLATION ...
# Each entry describes one OpenStack deployment to monitor: the StackTach
# web server to POST events to (tenant_id + url) and the Rabbit broker to
# consume notifications from.
DEPLOYMENTS = [
    dict(
        tenant_id=1,
        url='http://example.com',
        rabbit_host="localhost",
        rabbit_port=5672,
        rabbit_userid="guest",
        rabbit_password="guest",
        rabbit_virtual_host="/"),
]

# Allow a site-local worker_conf.py to override DEPLOYMENTS (and anything
# else above); silently fall back to the defaults when it doesn't exist.
try:
    from worker_conf import *
except ImportError:
    pass
# For now we'll just grab all the fanout messages from compute to scheduler ...
# NOTE(review): exclusive=True on an exchange is unusual (exclusivity is
# normally a queue property) — confirm the installed kombu version honors it.
scheduler_exchange = kombu.entity.Exchange("scheduler_fanout", type="fanout",
                                           durable=False, auto_delete=True,
                                           exclusive=True)

scheduler_queues = [
    # The Queue name has to be unique or we we'll end up with Round Robin
    # behavior from Rabbit, even though it's a Fanout queue. In Nova the
    # queues have UUID's tacked on the end.
    kombu.Queue("scheduler.xxx", scheduler_exchange, durable=False,
                auto_delete=False),
]

# Nova's main topic exchange; we bind a "monitor" queue on 'monitor.*'
# routing keys to receive notification messages.
nova_exchange = kombu.entity.Exchange("nova", type="topic",
                                      durable=True, auto_delete=False,
                                      exclusive=False)

nova_queues = [
    kombu.Queue("monitor", nova_exchange, durable=False, auto_delete=False,
                exclusive=False, routing_key='monitor.*'),
]
class SchedulerFanoutConsumer(kombu.mixins.ConsumerMixin):
def __init__(self, connection, url):
self.connection = connection
self.url = url
def get_consumers(self, Consumer, channel):
return [Consumer(queues=scheduler_queues,
callbacks=[self.on_scheduler]),
Consumer(queues=nova_queues, callbacks=[self.on_nova])]
def _process(self, body, message):
routing_key = message.delivery_info['routing_key']
payload = (routing_key, body)
jvalues = json.dumps(payload)
try:
raw_data = dict(args=jvalues)
cooked_data = urllib.urlencode(raw_data)
req = urllib2.Request(self.url, cooked_data)
response = urllib2.urlopen(req)
page = response.read()
print page
except urllib2.HTTPError, e:
if e.code == 401:
print "Unauthorized. Correct URL?", self.url
print e
page = e.read()
print page
raise
def on_scheduler(self, body, message):
# Uncomment if you want periodic compute node status updates.
#self._process(body, message)
message.ack()
def on_nova(self, body, message):
self._process(body, message)
message.ack()
class Monitor(threading.Thread):
def __init__(self, deployment):
super(Monitor, self).__init__()
self.deployment = deployment
def run(self):
tenant_id = self.deployment.get('tenant_id', 1)
url = self.deployment.get('url', 'http://www.example.com')
url = "%s/%d/data/" % (url, tenant_id)
host = self.deployment.get('rabbit_host', 'localhost')
port = self.deployment.get('rabbit_port', 5672)
user_id = self.deployment.get('rabbit_userid', 'rabbit')
password = self.deployment.get('rabbit_password', 'rabbit')
virtual_host = self.deployment.get('rabbit_virtual_host', '/')
print "StackTach", url
print "Rabbit:", host, port, user_id, virtual_host
params = dict(hostname=host,
port=port,
userid=user_id,
password=password,
virtual_host=virtual_host)
with kombu.connection.BrokerConnection(**params) as conn:
consumer = SchedulerFanoutConsumer(conn, url)
consumer.run()
# Daemonize, then run one Monitor thread per configured deployment.
# Threads must be created *inside* the daemon context (daemonizing forks).
with daemon.DaemonContext():
    monitors = [Monitor(cfg) for cfg in DEPLOYMENTS]
    for mon in monitors:
        mon.start()
    # Block forever: each consumer loop runs until the process is killed.
    for mon in monitors:
        mon.join()