Merge "create collector service"

This commit is contained in:
Jenkins 2017-06-21 13:12:08 +00:00 committed by Gerrit Code Review
commit f6f6f7ee84
30 changed files with 294 additions and 151 deletions

View File

@ -231,6 +231,7 @@ function start_vitrage {
fi fi
fi fi
run_process vitrage-collector "$VITRAGE_BIN_DIR/vitrage-collector --config-file $VITRAGE_CONF"
run_process vitrage-graph "$VITRAGE_BIN_DIR/vitrage-graph --config-file $VITRAGE_CONF" run_process vitrage-graph "$VITRAGE_BIN_DIR/vitrage-graph --config-file $VITRAGE_CONF"
run_process vitrage-notifier "$VITRAGE_BIN_DIR/vitrage-notifier --config-file $VITRAGE_CONF" run_process vitrage-notifier "$VITRAGE_BIN_DIR/vitrage-notifier --config-file $VITRAGE_CONF"
} }
@ -242,7 +243,7 @@ function stop_vitrage {
restart_apache_server restart_apache_server
fi fi
# Kill the vitrage screen windows # Kill the vitrage screen windows
for serv in vitrage-api vitrage-graph vitrage-notifier; do for serv in vitrage-api vitrage-collector vitrage-graph vitrage-notifier; do
stop_process $serv stop_process $serv
done done
} }

View File

@ -6,6 +6,9 @@ enable_service vitrage-graph
# Notifier # Notifier
enable_service vitrage-notifier enable_service vitrage-notifier
# Notifier
enable_service vitrage-collector
# Default directories # Default directories
VITRAGE_DIR=$DEST/vitrage VITRAGE_DIR=$DEST/vitrage

View File

@ -28,6 +28,7 @@ console_scripts =
vitrage-api = vitrage.cmd.api:main vitrage-api = vitrage.cmd.api:main
vitrage-graph = vitrage.cmd.graph:main vitrage-graph = vitrage.cmd.graph:main
vitrage-notifier = vitrage.cmd.notifier:main vitrage-notifier = vitrage.cmd.notifier:main
vitrage-collector = vitrage.cmd.collector:main
vitrage.entity_graph = vitrage.entity_graph =
networkx = vitrage.graph.driver.networkx_graph:NXGraph networkx = vitrage.graph.driver.networkx_graph:NXGraph

44
vitrage/cmd/collector.py Normal file
View File

@ -0,0 +1,44 @@
# Copyright 2017 - Nokia
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import sys
from oslo_service import service as os_service
from vitrage.datasources.listener_service import ListenerService
from vitrage.datasources.collector_notifier import CollectorNotifier
from vitrage.datasources import launcher as datasource_launcher
from vitrage.entity_graph import utils
from vitrage import service
def main():
"""Starts all the datasources drivers services"""
conf = service.prepare_service()
launcher = os_service.ServiceLauncher(conf)
rabbitmq = CollectorNotifier(conf)
callback = datasource_launcher.create_send_to_queue_callback(rabbitmq)
launcher.launch_service(ListenerService(conf,
utils.get_drivers(conf),
callback))
datasources = datasource_launcher.Launcher(conf, callback)
datasources.launch()
launcher.wait()
if __name__ == "__main__":
sys.exit(main())

View File

@ -13,7 +13,6 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import multiprocessing
from six.moves import queue from six.moves import queue
import sys import sys
@ -21,7 +20,6 @@ from oslo_service import service as os_service
from vitrage.api_handler import service as api_handler_svc from vitrage.api_handler import service as api_handler_svc
from vitrage.common.constants import EntityCategory from vitrage.common.constants import EntityCategory
from vitrage.datasources import launcher as datasource_launcher
from vitrage.datasources import OPENSTACK_CLUSTER from vitrage.datasources import OPENSTACK_CLUSTER
from vitrage.datasources.transformer_base import CLUSTER_ID from vitrage.datasources.transformer_base import CLUSTER_ID
from vitrage import entity_graph from vitrage import entity_graph
@ -44,20 +42,15 @@ def main():
conf = service.prepare_service() conf = service.prepare_service()
init_status = InitializationStatus() init_status = InitializationStatus()
mp_queue, evaluator_queue, evaluator, e_graph = init(conf) evaluator_queue, evaluator, e_graph = init(conf)
launcher = os_service.ServiceLauncher(conf) launcher = os_service.ServiceLauncher(conf)
datasources = datasource_launcher.Launcher(
conf,
datasource_launcher.create_send_to_queue_callback(mp_queue))
launcher.launch_service(entity_graph_svc.VitrageGraphService( launcher.launch_service(entity_graph_svc.VitrageGraphService(
conf, mp_queue, evaluator_queue, evaluator, e_graph, init_status)) conf, evaluator_queue, evaluator, e_graph, init_status))
launcher.launch_service(api_handler_svc.VitrageApiHandlerService( launcher.launch_service(api_handler_svc.VitrageApiHandlerService(
conf, e_graph, evaluator.scenario_repo)) conf, e_graph, evaluator.scenario_repo))
datasources.launch()
launcher.launch_service(consistency_svc.VitrageGraphConsistencyService( launcher.launch_service(consistency_svc.VitrageGraphConsistencyService(
conf, evaluator_queue, evaluator, e_graph, init_status)) conf, evaluator_queue, evaluator, e_graph, init_status))
@ -65,7 +58,6 @@ def main():
def init(conf): def init(conf):
mp_queue = multiprocessing.Queue()
evaluator_q = queue.Queue() evaluator_q = queue.Queue()
e_graph = entity_graph.get_graph_driver(conf)( e_graph = entity_graph.get_graph_driver(conf)(
'Entity Graph', 'Entity Graph',
@ -75,7 +67,7 @@ def init(conf):
evaluator = ScenarioEvaluator(conf, e_graph, scenario_repo, evaluator_q) evaluator = ScenarioEvaluator(conf, e_graph, scenario_repo, evaluator_q)
return mp_queue, evaluator_q, evaluator, e_graph return evaluator_q, evaluator, e_graph
if __name__ == "__main__": if __name__ == "__main__":

View File

@ -47,5 +47,9 @@ OPTS = [
' in case of fault'), ' in case of fault'),
cfg.StrOpt('notification_topic', cfg.StrOpt('notification_topic',
default='vitrage_notifications', default='vitrage_notifications',
help='Vitrage configured notifications topic') help='Vitrage configured notifications topic'),
cfg.StrOpt('notification_topic_collector',
default='collector_event_notification',
help='The topic on which event will be sent from the '
'datasources to the graph processor')
] ]

View File

@ -42,7 +42,7 @@ class CinderVolumeDriver(DriverBase):
search_opts={'all_tenants': 1})), search_opts={'all_tenants': 1})),
CINDER_VOLUME_DATASOURCE, CINDER_VOLUME_DATASOURCE,
datasource_action, datasource_action,
'manager') *self.properties_to_filter_out())
def enrich_event(self, event, event_type): def enrich_event(self, event, event_type):
event[DSProps.EVENT_TYPE] = event_type event[DSProps.EVENT_TYPE] = event_type
@ -51,6 +51,10 @@ class CinderVolumeDriver(DriverBase):
CINDER_VOLUME_DATASOURCE, CINDER_VOLUME_DATASOURCE,
DatasourceAction.UPDATE)[0] DatasourceAction.UPDATE)[0]
@staticmethod
def properties_to_filter_out():
return ['manager']
@staticmethod @staticmethod
def get_event_types(): def get_event_types():
return ['volume.create.start', return ['volume.create.start',

View File

@ -0,0 +1,53 @@
# Copyright 2017 - Nokia
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_log import log
import oslo_messaging
from vitrage.messaging import get_transport
LOG = log.getLogger(__name__)
class CollectorNotifier(object):
"""Allows writing to message bus"""
def __init__(self, conf):
self.oslo_notifier = None
try:
topic = conf.datasources.notification_topic_collector
self.oslo_notifier = oslo_messaging.Notifier(
get_transport(conf),
driver='messagingv2',
publisher_id='datasources.events',
topics=[topic])
except Exception as e:
LOG.info('Collector notifier - missing configuration %s'
% str(e))
@property
def enabled(self):
return self.oslo_notifier is not None
def notify_when_applicable(self, enriched_event):
"""Callback subscribed to driver.graph updates
:param enriched_event: the event with enriched data added by the driver
"""
try:
self.oslo_notifier.info({}, '', enriched_event)
except Exception as e:
LOG.exception('Datasource event cannot be notified - %s\n'
'Error - %s', enriched_event, e)

View File

@ -114,3 +114,8 @@ class DriverBase(object):
""" """
return [] return []
@staticmethod
def properties_to_filter_out():
"""Return a list of properties to be removed from the event"""
return []

View File

@ -15,10 +15,14 @@
from vitrage.common.constants import DatasourceAction from vitrage.common.constants import DatasourceAction
from vitrage.common.constants import DatasourceProperties as DSProps from vitrage.common.constants import DatasourceProperties as DSProps
from vitrage.datasources.cinder.volume import CINDER_VOLUME_DATASOURCE from vitrage.datasources.cinder.volume import CINDER_VOLUME_DATASOURCE
from vitrage.datasources.cinder.volume.driver import CinderVolumeDriver
from vitrage.datasources.driver_base import DriverBase from vitrage.datasources.driver_base import DriverBase
from vitrage.datasources.heat.stack import HEAT_STACK_DATASOURCE from vitrage.datasources.heat.stack import HEAT_STACK_DATASOURCE
from vitrage.datasources.neutron.network.driver import NetworkDriver
from vitrage.datasources.neutron.network import NEUTRON_NETWORK_DATASOURCE from vitrage.datasources.neutron.network import NEUTRON_NETWORK_DATASOURCE
from vitrage.datasources.neutron.port.driver import PortDriver
from vitrage.datasources.neutron.port import NEUTRON_PORT_DATASOURCE from vitrage.datasources.neutron.port import NEUTRON_PORT_DATASOURCE
from vitrage.datasources.nova.instance.driver import InstanceDriver
from vitrage.datasources.nova.instance import NOVA_INSTANCE_DATASOURCE from vitrage.datasources.nova.instance import NOVA_INSTANCE_DATASOURCE
from vitrage import os_clients from vitrage import os_clients
@ -28,13 +32,20 @@ class HeatStackDriver(DriverBase):
_client = None _client = None
conf = None conf = None
RESOURCE_TYPE_CONVERSION = { RESOURCE_TYPE = {
'OS::Nova::Server': NOVA_INSTANCE_DATASOURCE, 'OS::Nova::Server': NOVA_INSTANCE_DATASOURCE,
'OS::Cinder::Volume': CINDER_VOLUME_DATASOURCE, 'OS::Cinder::Volume': CINDER_VOLUME_DATASOURCE,
'OS::Neutron::Net': NEUTRON_NETWORK_DATASOURCE, 'OS::Neutron::Net': NEUTRON_NETWORK_DATASOURCE,
'OS::Neutron::Port': NEUTRON_PORT_DATASOURCE 'OS::Neutron::Port': NEUTRON_PORT_DATASOURCE
} }
RESOURCE_DRIVERS = {
'OS::Nova::Server': InstanceDriver,
'OS::Cinder::Volume': CinderVolumeDriver,
'OS::Neutron::Net': NetworkDriver,
'OS::Neutron::Port': PortDriver
}
def __init__(self, conf): def __init__(self, conf):
super(HeatStackDriver, self).__init__() super(HeatStackDriver, self).__init__()
HeatStackDriver.conf = conf HeatStackDriver.conf = conf
@ -72,19 +83,21 @@ class HeatStackDriver(DriverBase):
event = HeatStackDriver._retrieve_stack_resources( event = HeatStackDriver._retrieve_stack_resources(
event, event['stack_identity']) event, event['stack_identity'])
return HeatStackDriver.make_pickleable([event], return HeatStackDriver.make_pickleable(
[event],
HEAT_STACK_DATASOURCE, HEAT_STACK_DATASOURCE,
DatasourceAction.UPDATE)[0] DatasourceAction.UPDATE,
*self.properties_to_filter_out())[0]
def _filter_resource_types(self): def _filter_resource_types(self):
types = self.conf.datasources.types types = self.conf.datasources.types
tmp_dict = {} tmp_dict = {}
for key, value in HeatStackDriver.RESOURCE_TYPE_CONVERSION.items(): for key, value in HeatStackDriver.RESOURCE_TYPE.items():
if value in types: if value in types:
tmp_dict[key] = value tmp_dict[key] = value
HeatStackDriver.RESOURCE_TYPE_CONVERSION = tmp_dict HeatStackDriver.RESOURCE_TYPE = tmp_dict
def _make_stacks_list(self, stacks): def _make_stacks_list(self, stacks):
return [stack.__dict__ for stack in stacks] return [stack.__dict__ for stack in stacks]
@ -93,14 +106,28 @@ class HeatStackDriver(DriverBase):
return [self._retrieve_stack_resources(stack, stack['id']) return [self._retrieve_stack_resources(stack, stack['id'])
for stack in stacks] for stack in stacks]
@staticmethod
def properties_to_filter_out():
return ['manager', '_info']
@staticmethod @staticmethod
def _retrieve_stack_resources(stack, stack_id): def _retrieve_stack_resources(stack, stack_id):
resources = HeatStackDriver.client().resources.list(stack_id) resources = HeatStackDriver.client().resources.list(stack_id)
stack['resources'] = [resource.__dict__ for resource in resources stack['resources'] = [resource.__dict__ for resource in resources
if resource.__dict__['resource_type'] in if resource.__dict__['resource_type'] in
HeatStackDriver.RESOURCE_TYPE_CONVERSION] HeatStackDriver.RESOURCE_TYPE]
HeatStackDriver._filter_stack_resources(stack)
return stack return stack
@staticmethod
def _filter_stack_resources(stack):
for resource in stack['resources']:
props = HeatStackDriver.RESOURCE_DRIVERS[
resource['resource_type']].properties_to_filter_out()
for prop in props:
if prop in resource:
del resource[prop]
def get_all(self, datasource_action): def get_all(self, datasource_action):
stacks = HeatStackDriver.client().stacks.list(global_tenant=True) stacks = HeatStackDriver.client().stacks.list(global_tenant=True)
stacks_list = self._make_stacks_list(stacks) stacks_list = self._make_stacks_list(stacks)
@ -108,4 +135,4 @@ class HeatStackDriver(DriverBase):
return self.make_pickleable(stacks_with_resources, return self.make_pickleable(stacks_with_resources,
HEAT_STACK_DATASOURCE, HEAT_STACK_DATASOURCE,
datasource_action, datasource_action,
'manager') *self.properties_to_filter_out())

View File

@ -32,7 +32,7 @@ import vitrage.graph.utils as graph_utils
class HeatStackTransformer(ResourceTransformerBase): class HeatStackTransformer(ResourceTransformerBase):
RESOURCE_TYPE_CONVERSION = { RESOURCE_TYPE = {
'OS::Nova::Server': NOVA_INSTANCE_DATASOURCE, 'OS::Nova::Server': NOVA_INSTANCE_DATASOURCE,
'OS::Cinder::Volume': CINDER_VOLUME_DATASOURCE, 'OS::Cinder::Volume': CINDER_VOLUME_DATASOURCE,
'OS::Neutron::Net': NEUTRON_NETWORK_DATASOURCE, 'OS::Neutron::Net': NEUTRON_NETWORK_DATASOURCE,
@ -124,7 +124,7 @@ class HeatStackTransformer(ResourceTransformerBase):
for neighbor in entity_event['resources']: for neighbor in entity_event['resources']:
neighbor_id = neighbor['physical_resource_id'] neighbor_id = neighbor['physical_resource_id']
neighbor_datasource_type = \ neighbor_datasource_type = \
self.RESOURCE_TYPE_CONVERSION[neighbor['resource_type']] self.RESOURCE_TYPE[neighbor['resource_type']]
neighbors.append(self._create_neighbor(entity_event, neighbors.append(self._create_neighbor(entity_event,
neighbor_id, neighbor_id,
neighbor_datasource_type, neighbor_datasource_type,

View File

@ -16,19 +16,15 @@
import itertools import itertools
from oslo_service import service as os_service from oslo_service import service as os_service
from oslo_utils import importutils as utils
from vitrage.common.constants import DatasourceOpts as DSOpts
from vitrage.common.constants import UpdateMethod
from vitrage.datasources.listener_service import ListenerService
from vitrage.datasources.services import ChangesService from vitrage.datasources.services import ChangesService
from vitrage.datasources.services import SnapshotsService from vitrage.datasources.services import SnapshotsService
from vitrage.utils import opt_exists from vitrage.entity_graph import utils
def create_send_to_queue_callback(queue): def create_send_to_queue_callback(rabbitq):
def send_to_queue_callback(event): def send_to_queue_callback(event):
queue.put(event) rabbitq.notify_when_applicable(event)
return send_to_queue_callback return send_to_queue_callback
@ -37,43 +33,26 @@ class Launcher(object):
def __init__(self, conf, callback): def __init__(self, conf, callback):
self.conf = conf self.conf = conf
self.callback = callback self.callback = callback
self.snapshot_datasources = self._register_snapshot_datasources(conf) self.drivers = utils.get_drivers(conf)
self.services = self._register_services() self.services = self._register_services()
def launch(self): def launch(self):
# launcher = os_service.ServiceLauncher(self.conf) # For Debugging
launcher = os_service.ProcessLauncher(self.conf) launcher = os_service.ProcessLauncher(self.conf)
for service in self.services: for service in self.services:
launcher.launch_service(service, 1) launcher.launch_service(service, 1)
@staticmethod
def _register_snapshot_datasources(conf):
return {datasource: utils.import_object(conf[datasource].driver, conf)
for datasource in conf.datasources.types}
def _register_services(self): def _register_services(self):
pull_datasources = self._get_pull_datasources(self.conf) pull_datasources = utils.get_pull_datasources(self.conf)
changes_services = \ changes_services = \
(ChangesService(self.conf, (ChangesService(self.conf,
[self.snapshot_datasources[datasource]], [self.drivers[datasource]],
self.conf[datasource].changes_interval, self.conf[datasource].changes_interval,
self.callback) self.callback)
for datasource in pull_datasources) for datasource in pull_datasources)
snapshot_service = (SnapshotsService(self.conf, snapshot_service = (SnapshotsService(self.conf,
self.snapshot_datasources, self.drivers,
self.callback),)
listener_service = (ListenerService(self.conf,
self.snapshot_datasources,
self.callback),) self.callback),)
return itertools.chain(changes_services, return itertools.chain(changes_services,
snapshot_service, snapshot_service)
listener_service)
@staticmethod
def _get_pull_datasources(conf):
return (datasource for datasource in conf.datasources.types
if conf[datasource].update_method.lower() == UpdateMethod.PULL
and opt_exists(conf[datasource], DSOpts.CHANGES_INTERVAL))

View File

@ -18,7 +18,7 @@ from oslo_log import log
import oslo_messaging import oslo_messaging
from oslo_service import service as os_service from oslo_service import service as os_service
from vitrage.common.constants import UpdateMethod from vitrage.entity_graph import utils
from vitrage import messaging from vitrage import messaging
@ -47,6 +47,9 @@ class ListenerService(os_service.Service):
def stop(self, graceful=False): def stop(self, graceful=False):
LOG.info("Vitrage data source Listener Service - Stopping...") LOG.info("Vitrage data source Listener Service - Stopping...")
# Should it be here?
# self.listener.stop()
# self.listener.wait()
super(ListenerService, self).stop(graceful) super(ListenerService, self).stop(graceful)
LOG.info("Vitrage data source Listener Service - Stopped!") LOG.info("Vitrage data source Listener Service - Stopped!")
@ -54,7 +57,7 @@ class ListenerService(os_service.Service):
@classmethod @classmethod
def _create_callbacks_by_events_dict(cls, drivers, conf): def _create_callbacks_by_events_dict(cls, drivers, conf):
ret = defaultdict(list) ret = defaultdict(list)
push_drivers = cls._get_push_drivers(drivers, conf) push_drivers = utils.get_push_datasources(drivers, conf)
for driver in push_drivers: for driver in push_drivers:
for event in driver.get_event_types(): for event in driver.get_event_types():
@ -62,11 +65,6 @@ class ListenerService(os_service.Service):
return ret return ret
@staticmethod
def _get_push_drivers(drivers, conf):
return (driver_cls for datasource, driver_cls in drivers.items()
if conf[datasource].update_method.lower() == UpdateMethod.PUSH)
def _get_topic_listener(self, conf, topic, callback): def _get_topic_listener(self, conf, topic, callback):
# Create a listener for each topic # Create a listener for each topic
transport = messaging.get_transport(conf) transport = messaging.get_transport(conf)

View File

@ -32,4 +32,8 @@ class HostDriver(NovaDriverBase):
self.filter_none_compute_hosts(self.client.hosts.list()), self.filter_none_compute_hosts(self.client.hosts.list()),
NOVA_HOST_DATASOURCE, NOVA_HOST_DATASOURCE,
datasource_action, datasource_action,
'manager') *self.properties_to_filter_out())
@staticmethod
def properties_to_filter_out():
return ['manager']

View File

@ -30,9 +30,7 @@ class InstanceDriver(NovaDriverBase):
search_opts={'all_tenants': 1})), search_opts={'all_tenants': 1})),
NOVA_INSTANCE_DATASOURCE, NOVA_INSTANCE_DATASOURCE,
datasource_action, datasource_action,
'manager', *self.properties_to_filter_out())
'OS-EXT-SRV-ATTR:user_data',
'_info')
def enrich_event(self, event, event_type): def enrich_event(self, event, event_type):
event[DSProps.EVENT_TYPE] = event_type event[DSProps.EVENT_TYPE] = event_type
@ -41,6 +39,10 @@ class InstanceDriver(NovaDriverBase):
NOVA_INSTANCE_DATASOURCE, NOVA_INSTANCE_DATASOURCE,
DatasourceAction.UPDATE)[0] DatasourceAction.UPDATE)[0]
@staticmethod
def properties_to_filter_out():
return ['manager', 'OS-EXT-SRV-ATTR:user_data', '_info']
@staticmethod @staticmethod
def get_event_types(): def get_event_types():
# Add event_types to receive notifications about # Add event_types to receive notifications about

View File

@ -32,5 +32,8 @@ class ZoneDriver(NovaDriverBase):
self.client.availability_zones.list()), self.client.availability_zones.list()),
NOVA_ZONE_DATASOURCE, NOVA_ZONE_DATASOURCE,
datasource_action, datasource_action,
'manager', *self.properties_to_filter_out())
'_info')
@staticmethod
def properties_to_filter_out():
return ['manager', '_info']

View File

@ -15,9 +15,11 @@
import datetime import datetime
from oslo_log import log from oslo_log import log
import oslo_messaging
from oslo_service import service as os_service from oslo_service import service as os_service
from vitrage.entity_graph.processor import processor as proc from vitrage.entity_graph.processor import processor as proc
from vitrage import messaging
LOG = log.getLogger(__name__) LOG = log.getLogger(__name__)
@ -26,31 +28,33 @@ class VitrageGraphService(os_service.Service):
def __init__(self, def __init__(self,
conf, conf,
event_queue,
evaluator_queue, evaluator_queue,
evaluator, evaluator,
entity_graph, entity_graph,
initialization_status): initialization_status):
super(VitrageGraphService, self).__init__() super(VitrageGraphService, self).__init__()
self.queue = event_queue
self.conf = conf self.conf = conf
self.evaluator = evaluator self.evaluator = evaluator
self.processor = proc.Processor(self.conf, self.processor = proc.Processor(self.conf,
initialization_status, initialization_status,
e_graph=entity_graph) e_graph=entity_graph)
self.evaluator_queue = evaluator_queue self.evaluator_queue = evaluator_queue
self.listener = self._create_datasources_event_listener(conf)
def start(self): def start(self):
LOG.info("Vitrage Graph Service - Starting...") LOG.info("Vitrage Graph Service - Starting...")
super(VitrageGraphService, self).start() super(VitrageGraphService, self).start()
self.tg.add_timer(0.1, self._process_event_non_blocking) self.tg.add_timer(0.1, self._process_event_non_blocking)
self.listener.start()
LOG.info("Vitrage Graph Service - Started!") LOG.info("Vitrage Graph Service - Started!")
def stop(self, graceful=False): def stop(self, graceful=False):
LOG.info("Vitrage Graph Service - Stopping...") LOG.info("Vitrage Graph Service - Stopping...")
self.listener.stop()
self.listener.wait()
super(VitrageGraphService, self).stop(graceful) super(VitrageGraphService, self).stop(graceful)
LOG.info("Vitrage Graph Service - Stopped!") LOG.info("Vitrage Graph Service - Stopped!")
@ -68,14 +72,12 @@ class VitrageGraphService(os_service.Service):
the queue they are done when timer returns. the queue they are done when timer returns.
""" """
start_time = datetime.datetime.now() start_time = datetime.datetime.now()
while not self.evaluator_queue.empty() or not self.queue.empty(): while not self.evaluator_queue.empty():
time_delta = datetime.datetime.now() - start_time time_delta = datetime.datetime.now() - start_time
if time_delta.total_seconds() >= 2: if time_delta.total_seconds() >= 2:
break break
if not self.evaluator_queue.empty(): if not self.evaluator_queue.empty():
self.do_process(self.evaluator_queue) self.do_process(self.evaluator_queue)
elif not self.queue.empty():
self.do_process(self.queue)
def do_process(self, queue): def do_process(self, queue):
try: try:
@ -83,3 +85,24 @@ class VitrageGraphService(os_service.Service):
self.processor.process_event(event) self.processor.process_event(event)
except Exception as e: except Exception as e:
LOG.exception("Exception: %s", e) LOG.exception("Exception: %s", e)
def _create_datasources_event_listener(self, conf):
topic = conf.datasources.notification_topic_collector
transport = messaging.get_transport(conf)
targets = [oslo_messaging.Target(topic=topic)]
return messaging.get_notification_listener(
transport, targets,
[PushNotificationsEndpoint(self.processor.process_event)],
allow_requeue=True)
class PushNotificationsEndpoint(object):
def __init__(self, process_event_callback):
self.process_event_callback = process_event_callback
def info(self, ctxt, publisher_id, event_type, payload, metadata):
try:
self.process_event_callback(payload)
except Exception as e:
LOG.exception(e)

View File

@ -0,0 +1,35 @@
# Copyright 2017 - Nokia
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_utils import importutils as utils
from vitrage.common.constants import DatasourceOpts as DSOpts
from vitrage.common.constants import UpdateMethod
from vitrage.utils import opt_exists
def get_drivers(conf):
return {datasource: utils.import_object(conf[datasource].driver, conf)
for datasource in conf.datasources.types}
def get_pull_datasources(conf):
return (datasource for datasource in conf.datasources.types
if conf[datasource].update_method.lower() == UpdateMethod.PULL
and opt_exists(conf[datasource], DSOpts.CHANGES_INTERVAL))
def get_push_datasources(drivers, conf):
return (driver_cls for datasource, driver_cls in drivers.items()
if conf[datasource].update_method.lower() == UpdateMethod.PUSH)

View File

@ -17,12 +17,11 @@ from oslo_utils import importutils as utils
from vitrage.common.constants import DatasourceOpts as DSOpts from vitrage.common.constants import DatasourceOpts as DSOpts
from vitrage.common.constants import UpdateMethod from vitrage.common.constants import UpdateMethod
from vitrage.datasources.launcher import Launcher
from vitrage.datasources.listener_service import ListenerService
from vitrage.datasources.nagios import NAGIOS_DATASOURCE from vitrage.datasources.nagios import NAGIOS_DATASOURCE
from vitrage.datasources.nova.host import NOVA_HOST_DATASOURCE from vitrage.datasources.nova.host import NOVA_HOST_DATASOURCE
from vitrage.datasources.nova.instance import NOVA_INSTANCE_DATASOURCE from vitrage.datasources.nova.instance import NOVA_INSTANCE_DATASOURCE
from vitrage.datasources.zabbix import ZABBIX_DATASOURCE from vitrage.datasources.zabbix import ZABBIX_DATASOURCE
from vitrage.entity_graph import utils as graph_utils
from vitrage.tests import base from vitrage.tests import base
@ -174,20 +173,20 @@ class DatasourceUpdateMethod(base.BaseTest):
def test_datasource_update_method_push(self): def test_datasource_update_method_push(self):
drivers = {driver: utils.import_class(self.conf[driver].driver) drivers = {driver: utils.import_class(self.conf[driver].driver)
for driver in self.conf.datasources.types} for driver in self.conf.datasources.types}
push_drivers = ListenerService._get_push_drivers( push_drivers = graph_utils.get_push_datasources(drivers=drivers,
drivers=drivers, conf=self.conf) conf=self.conf)
self.assertSequenceEqual(set(push_drivers), {utils.import_class( self.assertSequenceEqual(set(push_drivers), {utils.import_class(
self.conf[NOVA_INSTANCE_DATASOURCE].driver), utils.import_class( self.conf[NOVA_INSTANCE_DATASOURCE].driver), utils.import_class(
self.conf[ZABBIX_DATASOURCE_PUSH].driver)}) self.conf[ZABBIX_DATASOURCE_PUSH].driver)})
def test_datasource_update_method_pull(self): def test_datasource_update_method_pull(self):
pull_drivers = tuple(Launcher._get_pull_datasources(self.conf)) pull_drivers = tuple(graph_utils.get_pull_datasources(self.conf))
self.assertSequenceEqual(pull_drivers, self.assertSequenceEqual(pull_drivers,
(NAGIOS_DATASOURCE, (NAGIOS_DATASOURCE,
ZABBIX_DATASOURCE_PULL)) ZABBIX_DATASOURCE_PULL))
def test_datasource_update_method_pull_with_no_changes_interval(self): def test_datasource_update_method_pull_with_no_changes_interval(self):
pull_drivers = tuple(Launcher._get_pull_datasources(self.conf)) pull_drivers = tuple(graph_utils.get_pull_datasources(self.conf))
self.assertNotIn(ZABBIX_DATASOURCE_PULL_NO_INTERVAL, pull_drivers) self.assertNotIn(ZABBIX_DATASOURCE_PULL_NO_INTERVAL, pull_drivers)
def test_datasources_notification_topic(self): def test_datasources_notification_topic(self):

View File

@ -13,7 +13,6 @@
# under the License. # under the License.
import json import json
import traceback
from oslo_log import log as logging from oslo_log import log as logging
@ -49,8 +48,7 @@ class TestAlarms(BaseAlarmsTest):
api_alarms, cli_alarms, AODH_DATASOURCE, api_alarms, cli_alarms, AODH_DATASOURCE,
utils.uni2str(instances[0].id)) utils.uni2str(instances[0].id))
except Exception as e: except Exception as e:
LOG.exception(e) self._handle_exception(e)
traceback.print_exc()
raise raise
finally: finally:
self._delete_ceilometer_alarms() self._delete_ceilometer_alarms()

View File

@ -13,6 +13,7 @@
# under the License. # under the License.
import time import time
import traceback
from oslo_log import log as logging from oslo_log import log as logging
from oslotest import base from oslotest import base
@ -379,3 +380,13 @@ class BaseApiTest(base.BaseTestCase):
if not public_nets: if not public_nets:
return None return None
return public_nets[0] return public_nets[0]
def _print_entity_graph(self):
api_graph = self.vitrage_client.topology.get(all_tenants=True)
graph = self._create_graph_from_graph_dictionary(api_graph)
LOG.info('Entity Graph: \n%s', graph.json_output_graph())
def _handle_exception(self, exception):
traceback.print_exc()
LOG.exception(exception)
self._print_entity_graph()

View File

@ -12,8 +12,6 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import traceback
from oslo_log import log as logging from oslo_log import log as logging
from vitrage_tempest_tests.tests import utils from vitrage_tempest_tests.tests import utils
@ -59,8 +57,7 @@ class TestAodhAlarm(BaseAlarmsTest):
num_edges, num_edges,
entities) entities)
except Exception as e: except Exception as e:
traceback.print_exc() self._handle_exception(e)
LOG.exception(e)
raise raise
finally: finally:
self._delete_ceilometer_alarms() self._delete_ceilometer_alarms()
@ -90,8 +87,7 @@ class TestAodhAlarm(BaseAlarmsTest):
num_edges, num_edges,
entities) entities)
except Exception as e: except Exception as e:
traceback.print_exc() self._handle_exception(e)
LOG.exception(e)
raise raise
finally: finally:
self._delete_ceilometer_alarms() self._delete_ceilometer_alarms()

View File

@ -12,8 +12,6 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import traceback
from oslo_log import log as logging from oslo_log import log as logging
from vitrage_tempest_tests.tests.api.topology.base import BaseTopologyTest from vitrage_tempest_tests.tests.api.topology.base import BaseTopologyTest
from vitrage_tempest_tests.tests import utils from vitrage_tempest_tests.tests import utils
@ -58,8 +56,7 @@ class TestCinderVolume(BaseTopologyTest):
num_edges, num_edges,
entities) entities)
except Exception as e: except Exception as e:
traceback.print_exc() self._handle_exception(e)
LOG.exception(e)
raise raise
finally: finally:
self._rollback_to_default() self._rollback_to_default()

View File

@ -13,7 +13,6 @@
# under the License. # under the License.
import time import time
import traceback
from oslo_log import log as logging from oslo_log import log as logging
from vitrage_tempest_tests.tests import utils from vitrage_tempest_tests.tests import utils
@ -66,8 +65,7 @@ class TestHeatStack(BaseTopologyTest):
num_edges, num_edges,
entities) entities)
except Exception as e: except Exception as e:
traceback.print_exc() self._handle_exception(e)
LOG.exception(e)
raise raise
finally: finally:
self._delete_stacks() self._delete_stacks()

View File

@ -12,8 +12,6 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import traceback
from oslo_log import log as logging from oslo_log import log as logging
from vitrage.common.constants import VertexProperties as VProps from vitrage.common.constants import VertexProperties as VProps
@ -66,8 +64,7 @@ class TestNeutron(BaseTopologyTest):
num_edges, num_edges,
entities) entities)
except Exception as e: except Exception as e:
traceback.print_exc() self._handle_exception(e)
LOG.exception(e)
raise raise
finally: finally:
self._delete_instances() self._delete_instances()

View File

@ -12,8 +12,6 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import traceback
from oslo_log import log as logging from oslo_log import log as logging
from vitrage_tempest_tests.tests.api.topology.base import BaseTopologyTest from vitrage_tempest_tests.tests.api.topology.base import BaseTopologyTest
from vitrage_tempest_tests.tests import utils from vitrage_tempest_tests.tests import utils
@ -54,8 +52,7 @@ class TestNova(BaseTopologyTest):
num_edges, num_edges,
entities) entities)
except Exception as e: except Exception as e:
traceback.print_exc() self._handle_exception(e)
LOG.exception(e)
raise raise
finally: finally:
self._rollback_to_default() self._rollback_to_default()

View File

@ -15,7 +15,6 @@
import os import os
import socket import socket
import time import time
import traceback
from oslo_log import log as logging from oslo_log import log as logging
from vitrage_tempest_tests.tests.api.base import BaseApiTest from vitrage_tempest_tests.tests.api.base import BaseApiTest
@ -57,8 +56,7 @@ class TestStaticPhysical(BaseApiTest):
num_edges, num_edges,
entities) entities)
except Exception as e: except Exception as e:
traceback.print_exc() self._handle_exception(e)
LOG.exception(e)
raise raise
finally: finally:
self._delete_switches() self._delete_switches()

View File

@ -12,8 +12,6 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import traceback
from oslo_log import log as logging from oslo_log import log as logging
from vitrage.common.constants import VertexProperties as VProps from vitrage.common.constants import VertexProperties as VProps
@ -54,8 +52,7 @@ class TestRca(BaseRcaTest):
self._compare_rca(api_rca, cli_rca) self._compare_rca(api_rca, cli_rca)
except Exception as e: except Exception as e:
traceback.print_exc() self._handle_exception(e)
LOG.exception(e)
raise raise
finally: finally:
self._clean_all() self._clean_all()
@ -84,8 +81,7 @@ class TestRca(BaseRcaTest):
self._validate_relationship(links=api_rca['links'], self._validate_relationship(links=api_rca['links'],
alarms=api_rca['nodes']) alarms=api_rca['nodes'])
except Exception as e: except Exception as e:
traceback.print_exc() self._handle_exception(e)
LOG.exception(e)
raise raise
finally: finally:
self._clean_all() self._clean_all()
@ -108,8 +104,7 @@ class TestRca(BaseRcaTest):
self._validate_deduce_alarms(alarms=api_alarms, self._validate_deduce_alarms(alarms=api_alarms,
instances=instances) instances=instances)
except Exception as e: except Exception as e:
traceback.print_exc() self._handle_exception(e)
LOG.exception(e)
raise raise
finally: finally:
self._clean_all() self._clean_all()
@ -133,8 +128,7 @@ class TestRca(BaseRcaTest):
self._validate_set_state(topology=topology['nodes'], self._validate_set_state(topology=topology['nodes'],
instances=instances) instances=instances)
except Exception as e: except Exception as e:
traceback.print_exc() self._handle_exception(e)
LOG.exception(e)
raise raise
finally: finally:
self._clean_all() self._clean_all()
@ -159,8 +153,7 @@ class TestRca(BaseRcaTest):
self._validate_notifier(alarms=ceilometer_alarms, self._validate_notifier(alarms=ceilometer_alarms,
vitrage_alarms=vitrage_alarms) vitrage_alarms=vitrage_alarms)
except Exception as e: except Exception as e:
traceback.print_exc() self._handle_exception(e)
LOG.exception(e)
raise raise
finally: finally:
self._clean_all() self._clean_all()

View File

@ -13,7 +13,6 @@
# under the License. # under the License.
import json import json
import traceback
from oslo_log import log as logging from oslo_log import log as logging
@ -53,8 +52,7 @@ class TestResource(BaseApiTest):
self._compare_resources(api_resources, cli_resources) self._compare_resources(api_resources, cli_resources)
except Exception as e: except Exception as e:
LOG.exception(e) self._handle_exception(e)
traceback.print_exc()
raise raise
finally: finally:
self._delete_instances() self._delete_instances()
@ -72,8 +70,7 @@ class TestResource(BaseApiTest):
resources = self.vitrage_client.resource.list() resources = self.vitrage_client.resource.list()
self.assertEqual(4, len(resources)) self.assertEqual(4, len(resources))
except Exception as e: except Exception as e:
LOG.exception(e) self._handle_exception(e)
traceback.print_exc()
raise raise
finally: finally:
self._delete_instances() self._delete_instances()
@ -92,8 +89,7 @@ class TestResource(BaseApiTest):
resources = self.vitrage_client.resource.list(all_tenants=True) resources = self.vitrage_client.resource.list(all_tenants=True)
self.assertEqual(4, len(resources)) self.assertEqual(4, len(resources))
except Exception as e: except Exception as e:
LOG.exception(e) self._handle_exception(e)
traceback.print_exc()
raise raise
finally: finally:
self._delete_instances() self._delete_instances()
@ -113,8 +109,7 @@ class TestResource(BaseApiTest):
all_tenants=False) all_tenants=False)
self.assertEqual(1, len(resources)) self.assertEqual(1, len(resources))
except Exception as e: except Exception as e:
LOG.exception(e) self._handle_exception(e)
traceback.print_exc()
raise raise
finally: finally:
self._delete_instances() self._delete_instances()
@ -131,8 +126,7 @@ class TestResource(BaseApiTest):
all_tenants=False) all_tenants=False)
self.assertEqual(0, len(resources)) self.assertEqual(0, len(resources))
except Exception as e: except Exception as e:
LOG.exception(e) self._handle_exception(e)
traceback.print_exc()
raise raise
finally: finally:
self._delete_instances() self._delete_instances()
@ -158,8 +152,7 @@ class TestResource(BaseApiTest):
self.vitrage_client.resource.show('test_for_no_existing') self.vitrage_client.resource.show('test_for_no_existing')
self.assertIsNone(resource) self.assertIsNone(resource)
except Exception as e: except Exception as e:
LOG.exception(e) self._handle_exception(e)
traceback.print_exc()
raise raise
finally: finally:
self._delete_instances() self._delete_instances()

View File

@ -12,8 +12,6 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import traceback
from oslo_log import log as logging from oslo_log import log as logging
from vitrage_tempest_tests.tests.api.topology.base import BaseTopologyTest from vitrage_tempest_tests.tests.api.topology.base import BaseTopologyTest
@ -90,8 +88,7 @@ class TestTopology(BaseTopologyTest):
num_edges, num_edges,
entities) entities)
except Exception as e: except Exception as e:
traceback.print_exc() self._handle_exception(e)
LOG.exception(e)
raise raise
finally: finally:
self._rollback_to_default() self._rollback_to_default()
@ -127,8 +124,7 @@ class TestTopology(BaseTopologyTest):
num_edges, num_edges,
entities) entities)
except Exception as e: except Exception as e:
traceback.print_exc() self._handle_exception(e)
LOG.exception(e)
raise raise
finally: finally:
self._rollback_to_default() self._rollback_to_default()
@ -162,8 +158,7 @@ class TestTopology(BaseTopologyTest):
num_edges, num_edges,
entities) entities)
except Exception as e: except Exception as e:
traceback.print_exc() self._handle_exception(e)
LOG.exception(e)
raise raise
finally: finally:
self._rollback_to_default() self._rollback_to_default()
@ -192,8 +187,7 @@ class TestTopology(BaseTopologyTest):
self.num_default_edges, self.num_default_edges,
entities) entities)
except Exception as e: except Exception as e:
traceback.print_exc() self._handle_exception(e)
LOG.exception(e)
raise raise
finally: finally:
self._rollback_to_default() self._rollback_to_default()
@ -222,8 +216,7 @@ class TestTopology(BaseTopologyTest):
self.num_default_edges, self.num_default_edges,
entities) entities)
except Exception as e: except Exception as e:
traceback.print_exc() self._handle_exception(e)
LOG.exception(e)
raise raise
finally: finally:
self._rollback_to_default() self._rollback_to_default()
@ -257,8 +250,7 @@ class TestTopology(BaseTopologyTest):
num_edges, num_edges,
entities) entities)
except Exception as e: except Exception as e:
traceback.print_exc() self._handle_exception(e)
LOG.exception(e)
raise raise
finally: finally:
self._rollback_to_default() self._rollback_to_default()
@ -290,8 +282,7 @@ class TestTopology(BaseTopologyTest):
self.num_default_edges, self.num_default_edges,
entities) entities)
except Exception as e: except Exception as e:
traceback.print_exc() self._handle_exception(e)
LOG.exception(e)
raise raise
finally: finally:
self._rollback_to_default() self._rollback_to_default()
@ -328,8 +319,7 @@ class TestTopology(BaseTopologyTest):
num_edges, num_edges,
entities) entities)
except Exception as e: except Exception as e:
traceback.print_exc() self._handle_exception(e)
LOG.exception(e)
raise raise
finally: finally:
self._rollback_to_default() self._rollback_to_default()
@ -383,8 +373,7 @@ class TestTopology(BaseTopologyTest):
0, 0,
len(api_graph['links']), 'num of edges') len(api_graph['links']), 'num of edges')
except Exception as e: except Exception as e:
traceback.print_exc() self._handle_exception(e)
LOG.exception(e)
raise raise
finally: finally:
self._rollback_to_default() self._rollback_to_default()
@ -409,8 +398,7 @@ class TestTopology(BaseTopologyTest):
# Test Assertions # Test Assertions
self.assertEqual({}, api_graph) self.assertEqual({}, api_graph)
except Exception as e: except Exception as e:
traceback.print_exc() self._handle_exception(e)
LOG.exception(e)
raise raise
finally: finally:
self._rollback_to_default() self._rollback_to_default()