diff --git a/docker-compose-production.yml b/docker-compose-production.yml index 9840e0a..3b867f7 100644 --- a/docker-compose-production.yml +++ b/docker-compose-production.yml @@ -36,6 +36,7 @@ rabbitconsumer: #SURVEIL_OS_PASSWORD: "password" #SURVEIL_TENANT_NAME: "admin" #SURVEIL_DEFAULT_TAGS: "linux-openstackceilometer" + #SURVEIL_NETWORK_LABEL: "surveil" command: bash -c "cd /opt/surveil && ./setup.sh && /opt/surveil/env/bin/python setup.py develop && surveil-rabbitMQ-consumer" alignak: diff --git a/etc/surveil/surveil_rabbitMQ_consumer.cfg b/etc/surveil/surveil_rabbitMQ_consumer.cfg index e62552d..48914ca 100644 --- a/etc/surveil/surveil_rabbitMQ_consumer.cfg +++ b/etc/surveil/surveil_rabbitMQ_consumer.cfg @@ -7,6 +7,7 @@ SURVEIL_OS_USERNAME=admin SURVEIL_OS_PASSWORD=password SURVEIL_OS_TENANT_NAME=admin SURVEIL_DEFAULT_TAGS=linux-openstackceilometer +SURVEIL_NETWORK_LABEL=surveil RABBIT_HOST=192.168.49.239 RABBIT_PORT=5672 QUEUE=test diff --git a/surveil/cmd/rabbitMQ_consumer.py b/surveil/cmd/rabbitMQ_consumer.py index a4d1ff0..4f666ac 100644 --- a/surveil/cmd/rabbitMQ_consumer.py +++ b/surveil/cmd/rabbitMQ_consumer.py @@ -14,8 +14,11 @@ """Starter script for the RabbitMQ receiver""" +from __future__ import print_function + import json import os +import sys import threading import time @@ -81,22 +84,34 @@ def main(): "SURVEIL_DEFAULT_TAGS": os.environ.get( 'SURVEIL_DEFAULT_TAGS', config.get("rabbitconsumer", "SURVEIL_DEFAULT_TAGS") - ) + ), + "SURVEIL_NETWORK_LABEL": os.environ.get( + 'SURVEIL_NETWORK_LABEL', + config.get("rabbitconsumer", "SURVEIL_NETWORK_LABEL") + ), } if (daemon_config["RABBIT_USER"] is not None and daemon_config["RABBIT_PASSWORD"] is not None): - id = pika.credentials.PlainCredentials(daemon_config["RABBIT_USER"], - daemon_config["RABBIT_PASSWORD"] - ) + id = pika.credentials.PlainCredentials( + daemon_config["RABBIT_USER"], + daemon_config["RABBIT_PASSWORD"] + ) + connection = pika.BlockingConnection( - pika.ConnectionParameters(host=daemon_config["RABBIT_HOST"], - port=daemon_config["RABBIT_PORT"], - credentials=id)) + pika.ConnectionParameters( + host=daemon_config["RABBIT_HOST"], + port=daemon_config["RABBIT_PORT"], + credentials=id + ) + ) else: connection = pika.BlockingConnection( - pika.ConnectionParameters(host=daemon_config["RABBIT_HOST"], - port=daemon_config["RABBIT_PORT"])) + pika.ConnectionParameters( + host=daemon_config["RABBIT_HOST"], + port=daemon_config["RABBIT_PORT"] + ) + ) channel = connection.channel() channel.queue_declare(queue=daemon_config["QUEUE"]) @@ -133,12 +148,7 @@ def main(): reload_config_thread.daemon = True reload_config_thread.start() - def process_event(body): - c = client.Client(daemon_config["SURVEIL_API_URL"], - auth_url=daemon_config["SURVEIL_AUTH_URL"], - version=daemon_config["SURVEIL_VERSION"]) - event = json.loads(body) - if event['event_type'] == 'compute.instance.create.start': + def process_instance_create_start(event, sclient): custom_fields = { "_OS_AUTH_URL": daemon_config["SURVEIL_OS_AUTH_URL"], "_OS_TENANT_NAME": daemon_config["SURVEIL_OS_TENANT_NAME"], @@ -151,6 +161,8 @@ def main(): 'surveil_custom_fields', None ) + + # Custom fields if surveil_metadata_custom_fields is not None: try: custom_fields.update( @@ -158,8 +170,10 @@ def main(): ) except ValueError: print("Could not load json %s" % - surveil_metadata_custom_fields) + surveil_metadata_custom_fields, + file=sys.stderr) + # Tags instance_tags = daemon_config["SURVEIL_DEFAULT_TAGS"] surveil_metadata_tags = event['payload']['metadata'].get( 'surveil_tags', @@ -168,14 +182,46 @@ def main(): if surveil_metadata_tags is not None: instance_tags += ',' + surveil_metadata_tags - c.config.hosts.create( + sclient.config.hosts.create( host_name=event['payload']['hostname'], address=event['payload']['hostname'], use=instance_tags, custom_fields=custom_fields ) - elif event['event_type'] == 'compute.instance.delete.end': - c.config.hosts.delete(event['payload']['hostname']) + + def process_instance_create_end(event, sclient): + # Get the ip address + addresses = event['payload'].get('fixed_ips', []) + for addr in addresses: + if (addr.get('label', '') == + daemon_config['SURVEIL_NETWORK_LABEL']): + sclient.config.hosts.update( + event['payload']['hostname'], + address=addr['address'] + ) + break + + def process_instance_delete_end(sclient, event): + sclient.config.hosts.delete(event['payload']['hostname']) + + def process_event(body): + sclient = client.Client(daemon_config["SURVEIL_API_URL"], + auth_url=daemon_config["SURVEIL_AUTH_URL"], + version=daemon_config["SURVEIL_VERSION"]) + + # Load the event + event = json.loads(body) + + # Process the event + try: + if event['event_type'] == 'compute.instance.create.start': + process_instance_create_start(event, sclient) + elif event['event_type'] == 'compute.instance.create.end': + process_instance_create_end(event, sclient) + elif event['event_type'] == 'compute.instance.delete.end': + process_instance_delete_end(sclient, event) + except Exception as e: + print("Could not process event %s" % e, file=sys.stderr) def callback(ch, method, properties, body): t = threading.Thread(target=process_event, args=(body,))