create collector service
Change-Id: Ic9f84132a41aac3215725208c3ea7fa5cb905cc6
This commit is contained in:
parent
a503147045
commit
bdfa317e09
@ -231,6 +231,7 @@ function start_vitrage {
|
||||
fi
|
||||
fi
|
||||
|
||||
run_process vitrage-collector "$VITRAGE_BIN_DIR/vitrage-collector --config-file $VITRAGE_CONF"
|
||||
run_process vitrage-graph "$VITRAGE_BIN_DIR/vitrage-graph --config-file $VITRAGE_CONF"
|
||||
run_process vitrage-notifier "$VITRAGE_BIN_DIR/vitrage-notifier --config-file $VITRAGE_CONF"
|
||||
}
|
||||
@ -242,7 +243,7 @@ function stop_vitrage {
|
||||
restart_apache_server
|
||||
fi
|
||||
# Kill the vitrage screen windows
|
||||
for serv in vitrage-api vitrage-graph vitrage-notifier; do
|
||||
for serv in vitrage-api vitrage-collector vitrage-graph vitrage-notifier; do
|
||||
stop_process $serv
|
||||
done
|
||||
}
|
||||
|
@ -6,6 +6,9 @@ enable_service vitrage-graph
|
||||
# Notifier
|
||||
enable_service vitrage-notifier
|
||||
|
||||
# Notifier
|
||||
enable_service vitrage-collector
|
||||
|
||||
|
||||
# Default directories
|
||||
VITRAGE_DIR=$DEST/vitrage
|
||||
|
@ -28,6 +28,7 @@ console_scripts =
|
||||
vitrage-api = vitrage.cmd.api:main
|
||||
vitrage-graph = vitrage.cmd.graph:main
|
||||
vitrage-notifier = vitrage.cmd.notifier:main
|
||||
vitrage-collector = vitrage.cmd.collector:main
|
||||
|
||||
vitrage.entity_graph =
|
||||
networkx = vitrage.graph.driver.networkx_graph:NXGraph
|
||||
|
44
vitrage/cmd/collector.py
Normal file
44
vitrage/cmd/collector.py
Normal file
@ -0,0 +1,44 @@
|
||||
# Copyright 2017 - Nokia
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import sys
|
||||
|
||||
from oslo_service import service as os_service
|
||||
from vitrage.datasources.listener_service import ListenerService
|
||||
|
||||
from vitrage.datasources.collector_notifier import CollectorNotifier
|
||||
from vitrage.datasources import launcher as datasource_launcher
|
||||
from vitrage.entity_graph import utils
|
||||
from vitrage import service
|
||||
|
||||
|
||||
def main():
|
||||
"""Starts all the datasources drivers services"""
|
||||
|
||||
conf = service.prepare_service()
|
||||
launcher = os_service.ServiceLauncher(conf)
|
||||
rabbitmq = CollectorNotifier(conf)
|
||||
callback = datasource_launcher.create_send_to_queue_callback(rabbitmq)
|
||||
launcher.launch_service(ListenerService(conf,
|
||||
utils.get_drivers(conf),
|
||||
callback))
|
||||
|
||||
datasources = datasource_launcher.Launcher(conf, callback)
|
||||
datasources.launch()
|
||||
|
||||
launcher.wait()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main())
|
@ -13,7 +13,6 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import multiprocessing
|
||||
from six.moves import queue
|
||||
import sys
|
||||
|
||||
@ -21,7 +20,6 @@ from oslo_service import service as os_service
|
||||
|
||||
from vitrage.api_handler import service as api_handler_svc
|
||||
from vitrage.common.constants import EntityCategory
|
||||
from vitrage.datasources import launcher as datasource_launcher
|
||||
from vitrage.datasources import OPENSTACK_CLUSTER
|
||||
from vitrage.datasources.transformer_base import CLUSTER_ID
|
||||
from vitrage import entity_graph
|
||||
@ -44,20 +42,15 @@ def main():
|
||||
|
||||
conf = service.prepare_service()
|
||||
init_status = InitializationStatus()
|
||||
mp_queue, evaluator_queue, evaluator, e_graph = init(conf)
|
||||
evaluator_queue, evaluator, e_graph = init(conf)
|
||||
launcher = os_service.ServiceLauncher(conf)
|
||||
datasources = datasource_launcher.Launcher(
|
||||
conf,
|
||||
datasource_launcher.create_send_to_queue_callback(mp_queue))
|
||||
|
||||
launcher.launch_service(entity_graph_svc.VitrageGraphService(
|
||||
conf, mp_queue, evaluator_queue, evaluator, e_graph, init_status))
|
||||
conf, evaluator_queue, evaluator, e_graph, init_status))
|
||||
|
||||
launcher.launch_service(api_handler_svc.VitrageApiHandlerService(
|
||||
conf, e_graph, evaluator.scenario_repo))
|
||||
|
||||
datasources.launch()
|
||||
|
||||
launcher.launch_service(consistency_svc.VitrageGraphConsistencyService(
|
||||
conf, evaluator_queue, evaluator, e_graph, init_status))
|
||||
|
||||
@ -65,7 +58,6 @@ def main():
|
||||
|
||||
|
||||
def init(conf):
|
||||
mp_queue = multiprocessing.Queue()
|
||||
evaluator_q = queue.Queue()
|
||||
e_graph = entity_graph.get_graph_driver(conf)(
|
||||
'Entity Graph',
|
||||
@ -75,7 +67,7 @@ def init(conf):
|
||||
|
||||
evaluator = ScenarioEvaluator(conf, e_graph, scenario_repo, evaluator_q)
|
||||
|
||||
return mp_queue, evaluator_q, evaluator, e_graph
|
||||
return evaluator_q, evaluator, e_graph
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
@ -47,5 +47,9 @@ OPTS = [
|
||||
' in case of fault'),
|
||||
cfg.StrOpt('notification_topic',
|
||||
default='vitrage_notifications',
|
||||
help='Vitrage configured notifications topic')
|
||||
help='Vitrage configured notifications topic'),
|
||||
cfg.StrOpt('notification_topic_collector',
|
||||
default='collector_event_notification',
|
||||
help='The topic on which event will be sent from the '
|
||||
'datasources to the graph processor')
|
||||
]
|
||||
|
@ -42,7 +42,7 @@ class CinderVolumeDriver(DriverBase):
|
||||
search_opts={'all_tenants': 1})),
|
||||
CINDER_VOLUME_DATASOURCE,
|
||||
datasource_action,
|
||||
'manager')
|
||||
*self.properties_to_filter_out())
|
||||
|
||||
def enrich_event(self, event, event_type):
|
||||
event[DSProps.EVENT_TYPE] = event_type
|
||||
@ -51,6 +51,10 @@ class CinderVolumeDriver(DriverBase):
|
||||
CINDER_VOLUME_DATASOURCE,
|
||||
DatasourceAction.UPDATE)[0]
|
||||
|
||||
@staticmethod
|
||||
def properties_to_filter_out():
|
||||
return ['manager']
|
||||
|
||||
@staticmethod
|
||||
def get_event_types():
|
||||
return ['volume.create.start',
|
||||
|
53
vitrage/datasources/collector_notifier.py
Normal file
53
vitrage/datasources/collector_notifier.py
Normal file
@ -0,0 +1,53 @@
|
||||
# Copyright 2017 - Nokia
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from oslo_log import log
|
||||
import oslo_messaging
|
||||
|
||||
from vitrage.messaging import get_transport
|
||||
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
class CollectorNotifier(object):
|
||||
"""Allows writing to message bus"""
|
||||
def __init__(self, conf):
|
||||
self.oslo_notifier = None
|
||||
try:
|
||||
topic = conf.datasources.notification_topic_collector
|
||||
self.oslo_notifier = oslo_messaging.Notifier(
|
||||
get_transport(conf),
|
||||
driver='messagingv2',
|
||||
publisher_id='datasources.events',
|
||||
topics=[topic])
|
||||
except Exception as e:
|
||||
LOG.info('Collector notifier - missing configuration %s'
|
||||
% str(e))
|
||||
|
||||
@property
|
||||
def enabled(self):
|
||||
return self.oslo_notifier is not None
|
||||
|
||||
def notify_when_applicable(self, enriched_event):
|
||||
"""Callback subscribed to driver.graph updates
|
||||
|
||||
:param enriched_event: the event with enriched data added by the driver
|
||||
"""
|
||||
|
||||
try:
|
||||
self.oslo_notifier.info({}, '', enriched_event)
|
||||
except Exception as e:
|
||||
LOG.exception('Datasource event cannot be notified - %s\n'
|
||||
'Error - %s', enriched_event, e)
|
@ -114,3 +114,8 @@ class DriverBase(object):
|
||||
"""
|
||||
|
||||
return []
|
||||
|
||||
@staticmethod
|
||||
def properties_to_filter_out():
|
||||
"""Return a list of properties to be removed from the event"""
|
||||
return []
|
||||
|
@ -15,10 +15,14 @@
|
||||
from vitrage.common.constants import DatasourceAction
|
||||
from vitrage.common.constants import DatasourceProperties as DSProps
|
||||
from vitrage.datasources.cinder.volume import CINDER_VOLUME_DATASOURCE
|
||||
from vitrage.datasources.cinder.volume.driver import CinderVolumeDriver
|
||||
from vitrage.datasources.driver_base import DriverBase
|
||||
from vitrage.datasources.heat.stack import HEAT_STACK_DATASOURCE
|
||||
from vitrage.datasources.neutron.network.driver import NetworkDriver
|
||||
from vitrage.datasources.neutron.network import NEUTRON_NETWORK_DATASOURCE
|
||||
from vitrage.datasources.neutron.port.driver import PortDriver
|
||||
from vitrage.datasources.neutron.port import NEUTRON_PORT_DATASOURCE
|
||||
from vitrage.datasources.nova.instance.driver import InstanceDriver
|
||||
from vitrage.datasources.nova.instance import NOVA_INSTANCE_DATASOURCE
|
||||
from vitrage import os_clients
|
||||
|
||||
@ -28,13 +32,20 @@ class HeatStackDriver(DriverBase):
|
||||
_client = None
|
||||
conf = None
|
||||
|
||||
RESOURCE_TYPE_CONVERSION = {
|
||||
RESOURCE_TYPE = {
|
||||
'OS::Nova::Server': NOVA_INSTANCE_DATASOURCE,
|
||||
'OS::Cinder::Volume': CINDER_VOLUME_DATASOURCE,
|
||||
'OS::Neutron::Net': NEUTRON_NETWORK_DATASOURCE,
|
||||
'OS::Neutron::Port': NEUTRON_PORT_DATASOURCE
|
||||
}
|
||||
|
||||
RESOURCE_DRIVERS = {
|
||||
'OS::Nova::Server': InstanceDriver,
|
||||
'OS::Cinder::Volume': CinderVolumeDriver,
|
||||
'OS::Neutron::Net': NetworkDriver,
|
||||
'OS::Neutron::Port': PortDriver
|
||||
}
|
||||
|
||||
def __init__(self, conf):
|
||||
super(HeatStackDriver, self).__init__()
|
||||
HeatStackDriver.conf = conf
|
||||
@ -72,19 +83,21 @@ class HeatStackDriver(DriverBase):
|
||||
event = HeatStackDriver._retrieve_stack_resources(
|
||||
event, event['stack_identity'])
|
||||
|
||||
return HeatStackDriver.make_pickleable([event],
|
||||
HEAT_STACK_DATASOURCE,
|
||||
DatasourceAction.UPDATE)[0]
|
||||
return HeatStackDriver.make_pickleable(
|
||||
[event],
|
||||
HEAT_STACK_DATASOURCE,
|
||||
DatasourceAction.UPDATE,
|
||||
*self.properties_to_filter_out())[0]
|
||||
|
||||
def _filter_resource_types(self):
|
||||
types = self.conf.datasources.types
|
||||
tmp_dict = {}
|
||||
|
||||
for key, value in HeatStackDriver.RESOURCE_TYPE_CONVERSION.items():
|
||||
for key, value in HeatStackDriver.RESOURCE_TYPE.items():
|
||||
if value in types:
|
||||
tmp_dict[key] = value
|
||||
|
||||
HeatStackDriver.RESOURCE_TYPE_CONVERSION = tmp_dict
|
||||
HeatStackDriver.RESOURCE_TYPE = tmp_dict
|
||||
|
||||
def _make_stacks_list(self, stacks):
|
||||
return [stack.__dict__ for stack in stacks]
|
||||
@ -93,14 +106,28 @@ class HeatStackDriver(DriverBase):
|
||||
return [self._retrieve_stack_resources(stack, stack['id'])
|
||||
for stack in stacks]
|
||||
|
||||
@staticmethod
|
||||
def properties_to_filter_out():
|
||||
return ['manager', '_info']
|
||||
|
||||
@staticmethod
|
||||
def _retrieve_stack_resources(stack, stack_id):
|
||||
resources = HeatStackDriver.client().resources.list(stack_id)
|
||||
stack['resources'] = [resource.__dict__ for resource in resources
|
||||
if resource.__dict__['resource_type'] in
|
||||
HeatStackDriver.RESOURCE_TYPE_CONVERSION]
|
||||
HeatStackDriver.RESOURCE_TYPE]
|
||||
HeatStackDriver._filter_stack_resources(stack)
|
||||
return stack
|
||||
|
||||
@staticmethod
|
||||
def _filter_stack_resources(stack):
|
||||
for resource in stack['resources']:
|
||||
props = HeatStackDriver.RESOURCE_DRIVERS[
|
||||
resource['resource_type']].properties_to_filter_out()
|
||||
for prop in props:
|
||||
if prop in resource:
|
||||
del resource[prop]
|
||||
|
||||
def get_all(self, datasource_action):
|
||||
stacks = HeatStackDriver.client().stacks.list(global_tenant=True)
|
||||
stacks_list = self._make_stacks_list(stacks)
|
||||
@ -108,4 +135,4 @@ class HeatStackDriver(DriverBase):
|
||||
return self.make_pickleable(stacks_with_resources,
|
||||
HEAT_STACK_DATASOURCE,
|
||||
datasource_action,
|
||||
'manager')
|
||||
*self.properties_to_filter_out())
|
||||
|
@ -32,7 +32,7 @@ import vitrage.graph.utils as graph_utils
|
||||
|
||||
class HeatStackTransformer(ResourceTransformerBase):
|
||||
|
||||
RESOURCE_TYPE_CONVERSION = {
|
||||
RESOURCE_TYPE = {
|
||||
'OS::Nova::Server': NOVA_INSTANCE_DATASOURCE,
|
||||
'OS::Cinder::Volume': CINDER_VOLUME_DATASOURCE,
|
||||
'OS::Neutron::Net': NEUTRON_NETWORK_DATASOURCE,
|
||||
@ -124,7 +124,7 @@ class HeatStackTransformer(ResourceTransformerBase):
|
||||
for neighbor in entity_event['resources']:
|
||||
neighbor_id = neighbor['physical_resource_id']
|
||||
neighbor_datasource_type = \
|
||||
self.RESOURCE_TYPE_CONVERSION[neighbor['resource_type']]
|
||||
self.RESOURCE_TYPE[neighbor['resource_type']]
|
||||
neighbors.append(self._create_neighbor(entity_event,
|
||||
neighbor_id,
|
||||
neighbor_datasource_type,
|
||||
|
@ -16,19 +16,15 @@
|
||||
import itertools
|
||||
|
||||
from oslo_service import service as os_service
|
||||
from oslo_utils import importutils as utils
|
||||
|
||||
from vitrage.common.constants import DatasourceOpts as DSOpts
|
||||
from vitrage.common.constants import UpdateMethod
|
||||
from vitrage.datasources.listener_service import ListenerService
|
||||
from vitrage.datasources.services import ChangesService
|
||||
from vitrage.datasources.services import SnapshotsService
|
||||
from vitrage.utils import opt_exists
|
||||
from vitrage.entity_graph import utils
|
||||
|
||||
|
||||
def create_send_to_queue_callback(queue):
|
||||
def create_send_to_queue_callback(rabbitq):
|
||||
def send_to_queue_callback(event):
|
||||
queue.put(event)
|
||||
rabbitq.notify_when_applicable(event)
|
||||
|
||||
return send_to_queue_callback
|
||||
|
||||
@ -37,43 +33,26 @@ class Launcher(object):
|
||||
def __init__(self, conf, callback):
|
||||
self.conf = conf
|
||||
self.callback = callback
|
||||
self.snapshot_datasources = self._register_snapshot_datasources(conf)
|
||||
self.drivers = utils.get_drivers(conf)
|
||||
self.services = self._register_services()
|
||||
|
||||
def launch(self):
|
||||
# launcher = os_service.ServiceLauncher(self.conf) # For Debugging
|
||||
launcher = os_service.ProcessLauncher(self.conf)
|
||||
for service in self.services:
|
||||
launcher.launch_service(service, 1)
|
||||
|
||||
@staticmethod
|
||||
def _register_snapshot_datasources(conf):
|
||||
return {datasource: utils.import_object(conf[datasource].driver, conf)
|
||||
for datasource in conf.datasources.types}
|
||||
|
||||
def _register_services(self):
|
||||
pull_datasources = self._get_pull_datasources(self.conf)
|
||||
pull_datasources = utils.get_pull_datasources(self.conf)
|
||||
changes_services = \
|
||||
(ChangesService(self.conf,
|
||||
[self.snapshot_datasources[datasource]],
|
||||
[self.drivers[datasource]],
|
||||
self.conf[datasource].changes_interval,
|
||||
self.callback)
|
||||
for datasource in pull_datasources)
|
||||
|
||||
snapshot_service = (SnapshotsService(self.conf,
|
||||
self.snapshot_datasources,
|
||||
self.drivers,
|
||||
self.callback),)
|
||||
|
||||
listener_service = (ListenerService(self.conf,
|
||||
self.snapshot_datasources,
|
||||
self.callback),)
|
||||
|
||||
return itertools.chain(changes_services,
|
||||
snapshot_service,
|
||||
listener_service)
|
||||
|
||||
@staticmethod
|
||||
def _get_pull_datasources(conf):
|
||||
return (datasource for datasource in conf.datasources.types
|
||||
if conf[datasource].update_method.lower() == UpdateMethod.PULL
|
||||
and opt_exists(conf[datasource], DSOpts.CHANGES_INTERVAL))
|
||||
snapshot_service)
|
||||
|
@ -18,7 +18,7 @@ from oslo_log import log
|
||||
import oslo_messaging
|
||||
from oslo_service import service as os_service
|
||||
|
||||
from vitrage.common.constants import UpdateMethod
|
||||
from vitrage.entity_graph import utils
|
||||
from vitrage import messaging
|
||||
|
||||
|
||||
@ -47,6 +47,9 @@ class ListenerService(os_service.Service):
|
||||
def stop(self, graceful=False):
|
||||
LOG.info("Vitrage data source Listener Service - Stopping...")
|
||||
|
||||
# Should it be here?
|
||||
# self.listener.stop()
|
||||
# self.listener.wait()
|
||||
super(ListenerService, self).stop(graceful)
|
||||
|
||||
LOG.info("Vitrage data source Listener Service - Stopped!")
|
||||
@ -54,7 +57,7 @@ class ListenerService(os_service.Service):
|
||||
@classmethod
|
||||
def _create_callbacks_by_events_dict(cls, drivers, conf):
|
||||
ret = defaultdict(list)
|
||||
push_drivers = cls._get_push_drivers(drivers, conf)
|
||||
push_drivers = utils.get_push_datasources(drivers, conf)
|
||||
|
||||
for driver in push_drivers:
|
||||
for event in driver.get_event_types():
|
||||
@ -62,11 +65,6 @@ class ListenerService(os_service.Service):
|
||||
|
||||
return ret
|
||||
|
||||
@staticmethod
|
||||
def _get_push_drivers(drivers, conf):
|
||||
return (driver_cls for datasource, driver_cls in drivers.items()
|
||||
if conf[datasource].update_method.lower() == UpdateMethod.PUSH)
|
||||
|
||||
def _get_topic_listener(self, conf, topic, callback):
|
||||
# Create a listener for each topic
|
||||
transport = messaging.get_transport(conf)
|
||||
|
@ -32,4 +32,8 @@ class HostDriver(NovaDriverBase):
|
||||
self.filter_none_compute_hosts(self.client.hosts.list()),
|
||||
NOVA_HOST_DATASOURCE,
|
||||
datasource_action,
|
||||
'manager')
|
||||
*self.properties_to_filter_out())
|
||||
|
||||
@staticmethod
|
||||
def properties_to_filter_out():
|
||||
return ['manager']
|
||||
|
@ -30,9 +30,7 @@ class InstanceDriver(NovaDriverBase):
|
||||
search_opts={'all_tenants': 1})),
|
||||
NOVA_INSTANCE_DATASOURCE,
|
||||
datasource_action,
|
||||
'manager',
|
||||
'OS-EXT-SRV-ATTR:user_data',
|
||||
'_info')
|
||||
*self.properties_to_filter_out())
|
||||
|
||||
def enrich_event(self, event, event_type):
|
||||
event[DSProps.EVENT_TYPE] = event_type
|
||||
@ -41,6 +39,10 @@ class InstanceDriver(NovaDriverBase):
|
||||
NOVA_INSTANCE_DATASOURCE,
|
||||
DatasourceAction.UPDATE)[0]
|
||||
|
||||
@staticmethod
|
||||
def properties_to_filter_out():
|
||||
return ['manager', 'OS-EXT-SRV-ATTR:user_data', '_info']
|
||||
|
||||
@staticmethod
|
||||
def get_event_types():
|
||||
# Add event_types to receive notifications about
|
||||
|
@ -32,5 +32,8 @@ class ZoneDriver(NovaDriverBase):
|
||||
self.client.availability_zones.list()),
|
||||
NOVA_ZONE_DATASOURCE,
|
||||
datasource_action,
|
||||
'manager',
|
||||
'_info')
|
||||
*self.properties_to_filter_out())
|
||||
|
||||
@staticmethod
|
||||
def properties_to_filter_out():
|
||||
return ['manager', '_info']
|
||||
|
@ -15,9 +15,11 @@
|
||||
import datetime
|
||||
|
||||
from oslo_log import log
|
||||
import oslo_messaging
|
||||
from oslo_service import service as os_service
|
||||
|
||||
from vitrage.entity_graph.processor import processor as proc
|
||||
from vitrage import messaging
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
@ -26,31 +28,33 @@ class VitrageGraphService(os_service.Service):
|
||||
|
||||
def __init__(self,
|
||||
conf,
|
||||
event_queue,
|
||||
evaluator_queue,
|
||||
evaluator,
|
||||
entity_graph,
|
||||
initialization_status):
|
||||
super(VitrageGraphService, self).__init__()
|
||||
self.queue = event_queue
|
||||
self.conf = conf
|
||||
self.evaluator = evaluator
|
||||
self.processor = proc.Processor(self.conf,
|
||||
initialization_status,
|
||||
e_graph=entity_graph)
|
||||
self.evaluator_queue = evaluator_queue
|
||||
self.listener = self._create_datasources_event_listener(conf)
|
||||
|
||||
def start(self):
|
||||
LOG.info("Vitrage Graph Service - Starting...")
|
||||
|
||||
super(VitrageGraphService, self).start()
|
||||
self.tg.add_timer(0.1, self._process_event_non_blocking)
|
||||
self.listener.start()
|
||||
|
||||
LOG.info("Vitrage Graph Service - Started!")
|
||||
|
||||
def stop(self, graceful=False):
|
||||
LOG.info("Vitrage Graph Service - Stopping...")
|
||||
|
||||
self.listener.stop()
|
||||
self.listener.wait()
|
||||
super(VitrageGraphService, self).stop(graceful)
|
||||
|
||||
LOG.info("Vitrage Graph Service - Stopped!")
|
||||
@ -68,14 +72,12 @@ class VitrageGraphService(os_service.Service):
|
||||
the queue they are done when timer returns.
|
||||
"""
|
||||
start_time = datetime.datetime.now()
|
||||
while not self.evaluator_queue.empty() or not self.queue.empty():
|
||||
while not self.evaluator_queue.empty():
|
||||
time_delta = datetime.datetime.now() - start_time
|
||||
if time_delta.total_seconds() >= 2:
|
||||
break
|
||||
if not self.evaluator_queue.empty():
|
||||
self.do_process(self.evaluator_queue)
|
||||
elif not self.queue.empty():
|
||||
self.do_process(self.queue)
|
||||
|
||||
def do_process(self, queue):
|
||||
try:
|
||||
@ -83,3 +85,24 @@ class VitrageGraphService(os_service.Service):
|
||||
self.processor.process_event(event)
|
||||
except Exception as e:
|
||||
LOG.exception("Exception: %s", e)
|
||||
|
||||
def _create_datasources_event_listener(self, conf):
|
||||
topic = conf.datasources.notification_topic_collector
|
||||
transport = messaging.get_transport(conf)
|
||||
targets = [oslo_messaging.Target(topic=topic)]
|
||||
return messaging.get_notification_listener(
|
||||
transport, targets,
|
||||
[PushNotificationsEndpoint(self.processor.process_event)],
|
||||
allow_requeue=True)
|
||||
|
||||
|
||||
class PushNotificationsEndpoint(object):
|
||||
|
||||
def __init__(self, process_event_callback):
|
||||
self.process_event_callback = process_event_callback
|
||||
|
||||
def info(self, ctxt, publisher_id, event_type, payload, metadata):
|
||||
try:
|
||||
self.process_event_callback(payload)
|
||||
except Exception as e:
|
||||
LOG.exception(e)
|
||||
|
35
vitrage/entity_graph/utils.py
Normal file
35
vitrage/entity_graph/utils.py
Normal file
@ -0,0 +1,35 @@
|
||||
# Copyright 2017 - Nokia
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from oslo_utils import importutils as utils
|
||||
|
||||
from vitrage.common.constants import DatasourceOpts as DSOpts
|
||||
from vitrage.common.constants import UpdateMethod
|
||||
from vitrage.utils import opt_exists
|
||||
|
||||
|
||||
def get_drivers(conf):
|
||||
return {datasource: utils.import_object(conf[datasource].driver, conf)
|
||||
for datasource in conf.datasources.types}
|
||||
|
||||
|
||||
def get_pull_datasources(conf):
|
||||
return (datasource for datasource in conf.datasources.types
|
||||
if conf[datasource].update_method.lower() == UpdateMethod.PULL
|
||||
and opt_exists(conf[datasource], DSOpts.CHANGES_INTERVAL))
|
||||
|
||||
|
||||
def get_push_datasources(drivers, conf):
|
||||
return (driver_cls for datasource, driver_cls in drivers.items()
|
||||
if conf[datasource].update_method.lower() == UpdateMethod.PUSH)
|
@ -17,12 +17,11 @@ from oslo_utils import importutils as utils
|
||||
|
||||
from vitrage.common.constants import DatasourceOpts as DSOpts
|
||||
from vitrage.common.constants import UpdateMethod
|
||||
from vitrage.datasources.launcher import Launcher
|
||||
from vitrage.datasources.listener_service import ListenerService
|
||||
from vitrage.datasources.nagios import NAGIOS_DATASOURCE
|
||||
from vitrage.datasources.nova.host import NOVA_HOST_DATASOURCE
|
||||
from vitrage.datasources.nova.instance import NOVA_INSTANCE_DATASOURCE
|
||||
from vitrage.datasources.zabbix import ZABBIX_DATASOURCE
|
||||
from vitrage.entity_graph import utils as graph_utils
|
||||
from vitrage.tests import base
|
||||
|
||||
|
||||
@ -174,20 +173,20 @@ class DatasourceUpdateMethod(base.BaseTest):
|
||||
def test_datasource_update_method_push(self):
|
||||
drivers = {driver: utils.import_class(self.conf[driver].driver)
|
||||
for driver in self.conf.datasources.types}
|
||||
push_drivers = ListenerService._get_push_drivers(
|
||||
drivers=drivers, conf=self.conf)
|
||||
push_drivers = graph_utils.get_push_datasources(drivers=drivers,
|
||||
conf=self.conf)
|
||||
self.assertSequenceEqual(set(push_drivers), {utils.import_class(
|
||||
self.conf[NOVA_INSTANCE_DATASOURCE].driver), utils.import_class(
|
||||
self.conf[ZABBIX_DATASOURCE_PUSH].driver)})
|
||||
|
||||
def test_datasource_update_method_pull(self):
|
||||
pull_drivers = tuple(Launcher._get_pull_datasources(self.conf))
|
||||
pull_drivers = tuple(graph_utils.get_pull_datasources(self.conf))
|
||||
self.assertSequenceEqual(pull_drivers,
|
||||
(NAGIOS_DATASOURCE,
|
||||
ZABBIX_DATASOURCE_PULL))
|
||||
|
||||
def test_datasource_update_method_pull_with_no_changes_interval(self):
|
||||
pull_drivers = tuple(Launcher._get_pull_datasources(self.conf))
|
||||
pull_drivers = tuple(graph_utils.get_pull_datasources(self.conf))
|
||||
self.assertNotIn(ZABBIX_DATASOURCE_PULL_NO_INTERVAL, pull_drivers)
|
||||
|
||||
def test_datasources_notification_topic(self):
|
||||
|
@ -13,7 +13,6 @@
|
||||
# under the License.
|
||||
|
||||
import json
|
||||
import traceback
|
||||
|
||||
from oslo_log import log as logging
|
||||
|
||||
@ -49,8 +48,7 @@ class TestAlarms(BaseAlarmsTest):
|
||||
api_alarms, cli_alarms, AODH_DATASOURCE,
|
||||
utils.uni2str(instances[0].id))
|
||||
except Exception as e:
|
||||
LOG.exception(e)
|
||||
traceback.print_exc()
|
||||
self._handle_exception(e)
|
||||
raise
|
||||
finally:
|
||||
self._delete_ceilometer_alarms()
|
||||
|
@ -13,6 +13,7 @@
|
||||
# under the License.
|
||||
|
||||
import time
|
||||
import traceback
|
||||
|
||||
from oslo_log import log as logging
|
||||
from oslotest import base
|
||||
@ -379,3 +380,13 @@ class BaseApiTest(base.BaseTestCase):
|
||||
if not public_nets:
|
||||
return None
|
||||
return public_nets[0]
|
||||
|
||||
def _print_entity_graph(self):
|
||||
api_graph = self.vitrage_client.topology.get(all_tenants=True)
|
||||
graph = self._create_graph_from_graph_dictionary(api_graph)
|
||||
LOG.info('Entity Graph: \n%s', graph.json_output_graph())
|
||||
|
||||
def _handle_exception(self, exception):
|
||||
traceback.print_exc()
|
||||
LOG.exception(exception)
|
||||
self._print_entity_graph()
|
||||
|
@ -12,8 +12,6 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import traceback
|
||||
|
||||
from oslo_log import log as logging
|
||||
from vitrage_tempest_tests.tests import utils
|
||||
|
||||
@ -59,8 +57,7 @@ class TestAodhAlarm(BaseAlarmsTest):
|
||||
num_edges,
|
||||
entities)
|
||||
except Exception as e:
|
||||
traceback.print_exc()
|
||||
LOG.exception(e)
|
||||
self._handle_exception(e)
|
||||
raise
|
||||
finally:
|
||||
self._delete_ceilometer_alarms()
|
||||
@ -90,8 +87,7 @@ class TestAodhAlarm(BaseAlarmsTest):
|
||||
num_edges,
|
||||
entities)
|
||||
except Exception as e:
|
||||
traceback.print_exc()
|
||||
LOG.exception(e)
|
||||
self._handle_exception(e)
|
||||
raise
|
||||
finally:
|
||||
self._delete_ceilometer_alarms()
|
||||
|
@ -12,8 +12,6 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import traceback
|
||||
|
||||
from oslo_log import log as logging
|
||||
from vitrage_tempest_tests.tests.api.topology.base import BaseTopologyTest
|
||||
from vitrage_tempest_tests.tests import utils
|
||||
@ -58,8 +56,7 @@ class TestCinderVolume(BaseTopologyTest):
|
||||
num_edges,
|
||||
entities)
|
||||
except Exception as e:
|
||||
traceback.print_exc()
|
||||
LOG.exception(e)
|
||||
self._handle_exception(e)
|
||||
raise
|
||||
finally:
|
||||
self._rollback_to_default()
|
||||
|
@ -13,7 +13,6 @@
|
||||
# under the License.
|
||||
|
||||
import time
|
||||
import traceback
|
||||
|
||||
from oslo_log import log as logging
|
||||
from vitrage_tempest_tests.tests import utils
|
||||
@ -66,8 +65,7 @@ class TestHeatStack(BaseTopologyTest):
|
||||
num_edges,
|
||||
entities)
|
||||
except Exception as e:
|
||||
traceback.print_exc()
|
||||
LOG.exception(e)
|
||||
self._handle_exception(e)
|
||||
raise
|
||||
finally:
|
||||
self._delete_stacks()
|
||||
|
@ -12,8 +12,6 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import traceback
|
||||
|
||||
from oslo_log import log as logging
|
||||
|
||||
from vitrage.common.constants import VertexProperties as VProps
|
||||
@ -66,8 +64,7 @@ class TestNeutron(BaseTopologyTest):
|
||||
num_edges,
|
||||
entities)
|
||||
except Exception as e:
|
||||
traceback.print_exc()
|
||||
LOG.exception(e)
|
||||
self._handle_exception(e)
|
||||
raise
|
||||
finally:
|
||||
self._delete_instances()
|
||||
|
@ -12,8 +12,6 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import traceback
|
||||
|
||||
from oslo_log import log as logging
|
||||
from vitrage_tempest_tests.tests.api.topology.base import BaseTopologyTest
|
||||
from vitrage_tempest_tests.tests import utils
|
||||
@ -54,8 +52,7 @@ class TestNova(BaseTopologyTest):
|
||||
num_edges,
|
||||
entities)
|
||||
except Exception as e:
|
||||
traceback.print_exc()
|
||||
LOG.exception(e)
|
||||
self._handle_exception(e)
|
||||
raise
|
||||
finally:
|
||||
self._rollback_to_default()
|
||||
|
@ -15,7 +15,6 @@
|
||||
import os
|
||||
import socket
|
||||
import time
|
||||
import traceback
|
||||
|
||||
from oslo_log import log as logging
|
||||
from vitrage_tempest_tests.tests.api.base import BaseApiTest
|
||||
@ -57,8 +56,7 @@ class TestStaticPhysical(BaseApiTest):
|
||||
num_edges,
|
||||
entities)
|
||||
except Exception as e:
|
||||
traceback.print_exc()
|
||||
LOG.exception(e)
|
||||
self._handle_exception(e)
|
||||
raise
|
||||
finally:
|
||||
self._delete_switches()
|
||||
|
@ -12,8 +12,6 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import traceback
|
||||
|
||||
from oslo_log import log as logging
|
||||
|
||||
from vitrage.common.constants import VertexProperties as VProps
|
||||
@ -54,8 +52,7 @@ class TestRca(BaseRcaTest):
|
||||
|
||||
self._compare_rca(api_rca, cli_rca)
|
||||
except Exception as e:
|
||||
traceback.print_exc()
|
||||
LOG.exception(e)
|
||||
self._handle_exception(e)
|
||||
raise
|
||||
finally:
|
||||
self._clean_all()
|
||||
@ -84,8 +81,7 @@ class TestRca(BaseRcaTest):
|
||||
self._validate_relationship(links=api_rca['links'],
|
||||
alarms=api_rca['nodes'])
|
||||
except Exception as e:
|
||||
traceback.print_exc()
|
||||
LOG.exception(e)
|
||||
self._handle_exception(e)
|
||||
raise
|
||||
finally:
|
||||
self._clean_all()
|
||||
@ -108,8 +104,7 @@ class TestRca(BaseRcaTest):
|
||||
self._validate_deduce_alarms(alarms=api_alarms,
|
||||
instances=instances)
|
||||
except Exception as e:
|
||||
traceback.print_exc()
|
||||
LOG.exception(e)
|
||||
self._handle_exception(e)
|
||||
raise
|
||||
finally:
|
||||
self._clean_all()
|
||||
@ -133,8 +128,7 @@ class TestRca(BaseRcaTest):
|
||||
self._validate_set_state(topology=topology['nodes'],
|
||||
instances=instances)
|
||||
except Exception as e:
|
||||
traceback.print_exc()
|
||||
LOG.exception(e)
|
||||
self._handle_exception(e)
|
||||
raise
|
||||
finally:
|
||||
self._clean_all()
|
||||
@ -159,8 +153,7 @@ class TestRca(BaseRcaTest):
|
||||
self._validate_notifier(alarms=ceilometer_alarms,
|
||||
vitrage_alarms=vitrage_alarms)
|
||||
except Exception as e:
|
||||
traceback.print_exc()
|
||||
LOG.exception(e)
|
||||
self._handle_exception(e)
|
||||
raise
|
||||
finally:
|
||||
self._clean_all()
|
||||
|
@ -13,7 +13,6 @@
|
||||
# under the License.
|
||||
|
||||
import json
|
||||
import traceback
|
||||
|
||||
from oslo_log import log as logging
|
||||
|
||||
@ -53,8 +52,7 @@ class TestResource(BaseApiTest):
|
||||
|
||||
self._compare_resources(api_resources, cli_resources)
|
||||
except Exception as e:
|
||||
LOG.exception(e)
|
||||
traceback.print_exc()
|
||||
self._handle_exception(e)
|
||||
raise
|
||||
finally:
|
||||
self._delete_instances()
|
||||
@ -72,8 +70,7 @@ class TestResource(BaseApiTest):
|
||||
resources = self.vitrage_client.resource.list()
|
||||
self.assertEqual(4, len(resources))
|
||||
except Exception as e:
|
||||
LOG.exception(e)
|
||||
traceback.print_exc()
|
||||
self._handle_exception(e)
|
||||
raise
|
||||
finally:
|
||||
self._delete_instances()
|
||||
@ -92,8 +89,7 @@ class TestResource(BaseApiTest):
|
||||
resources = self.vitrage_client.resource.list(all_tenants=True)
|
||||
self.assertEqual(4, len(resources))
|
||||
except Exception as e:
|
||||
LOG.exception(e)
|
||||
traceback.print_exc()
|
||||
self._handle_exception(e)
|
||||
raise
|
||||
finally:
|
||||
self._delete_instances()
|
||||
@ -113,8 +109,7 @@ class TestResource(BaseApiTest):
|
||||
all_tenants=False)
|
||||
self.assertEqual(1, len(resources))
|
||||
except Exception as e:
|
||||
LOG.exception(e)
|
||||
traceback.print_exc()
|
||||
self._handle_exception(e)
|
||||
raise
|
||||
finally:
|
||||
self._delete_instances()
|
||||
@ -131,8 +126,7 @@ class TestResource(BaseApiTest):
|
||||
all_tenants=False)
|
||||
self.assertEqual(0, len(resources))
|
||||
except Exception as e:
|
||||
LOG.exception(e)
|
||||
traceback.print_exc()
|
||||
self._handle_exception(e)
|
||||
raise
|
||||
finally:
|
||||
self._delete_instances()
|
||||
@ -158,8 +152,7 @@ class TestResource(BaseApiTest):
|
||||
self.vitrage_client.resource.show('test_for_no_existing')
|
||||
self.assertIsNone(resource)
|
||||
except Exception as e:
|
||||
LOG.exception(e)
|
||||
traceback.print_exc()
|
||||
self._handle_exception(e)
|
||||
raise
|
||||
finally:
|
||||
self._delete_instances()
|
||||
|
@ -12,8 +12,6 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import traceback
|
||||
|
||||
from oslo_log import log as logging
|
||||
|
||||
from vitrage_tempest_tests.tests.api.topology.base import BaseTopologyTest
|
||||
@ -90,8 +88,7 @@ class TestTopology(BaseTopologyTest):
|
||||
num_edges,
|
||||
entities)
|
||||
except Exception as e:
|
||||
traceback.print_exc()
|
||||
LOG.exception(e)
|
||||
self._handle_exception(e)
|
||||
raise
|
||||
finally:
|
||||
self._rollback_to_default()
|
||||
@ -127,8 +124,7 @@ class TestTopology(BaseTopologyTest):
|
||||
num_edges,
|
||||
entities)
|
||||
except Exception as e:
|
||||
traceback.print_exc()
|
||||
LOG.exception(e)
|
||||
self._handle_exception(e)
|
||||
raise
|
||||
finally:
|
||||
self._rollback_to_default()
|
||||
@ -162,8 +158,7 @@ class TestTopology(BaseTopologyTest):
|
||||
num_edges,
|
||||
entities)
|
||||
except Exception as e:
|
||||
traceback.print_exc()
|
||||
LOG.exception(e)
|
||||
self._handle_exception(e)
|
||||
raise
|
||||
finally:
|
||||
self._rollback_to_default()
|
||||
@ -192,8 +187,7 @@ class TestTopology(BaseTopologyTest):
|
||||
self.num_default_edges,
|
||||
entities)
|
||||
except Exception as e:
|
||||
traceback.print_exc()
|
||||
LOG.exception(e)
|
||||
self._handle_exception(e)
|
||||
raise
|
||||
finally:
|
||||
self._rollback_to_default()
|
||||
@ -222,8 +216,7 @@ class TestTopology(BaseTopologyTest):
|
||||
self.num_default_edges,
|
||||
entities)
|
||||
except Exception as e:
|
||||
traceback.print_exc()
|
||||
LOG.exception(e)
|
||||
self._handle_exception(e)
|
||||
raise
|
||||
finally:
|
||||
self._rollback_to_default()
|
||||
@ -257,8 +250,7 @@ class TestTopology(BaseTopologyTest):
|
||||
num_edges,
|
||||
entities)
|
||||
except Exception as e:
|
||||
traceback.print_exc()
|
||||
LOG.exception(e)
|
||||
self._handle_exception(e)
|
||||
raise
|
||||
finally:
|
||||
self._rollback_to_default()
|
||||
@ -290,8 +282,7 @@ class TestTopology(BaseTopologyTest):
|
||||
self.num_default_edges,
|
||||
entities)
|
||||
except Exception as e:
|
||||
traceback.print_exc()
|
||||
LOG.exception(e)
|
||||
self._handle_exception(e)
|
||||
raise
|
||||
finally:
|
||||
self._rollback_to_default()
|
||||
@ -328,8 +319,7 @@ class TestTopology(BaseTopologyTest):
|
||||
num_edges,
|
||||
entities)
|
||||
except Exception as e:
|
||||
traceback.print_exc()
|
||||
LOG.exception(e)
|
||||
self._handle_exception(e)
|
||||
raise
|
||||
finally:
|
||||
self._rollback_to_default()
|
||||
@ -383,8 +373,7 @@ class TestTopology(BaseTopologyTest):
|
||||
0,
|
||||
len(api_graph['links']), 'num of edges')
|
||||
except Exception as e:
|
||||
traceback.print_exc()
|
||||
LOG.exception(e)
|
||||
self._handle_exception(e)
|
||||
raise
|
||||
finally:
|
||||
self._rollback_to_default()
|
||||
@ -409,8 +398,7 @@ class TestTopology(BaseTopologyTest):
|
||||
# Test Assertions
|
||||
self.assertEqual({}, api_graph)
|
||||
except Exception as e:
|
||||
traceback.print_exc()
|
||||
LOG.exception(e)
|
||||
self._handle_exception(e)
|
||||
raise
|
||||
finally:
|
||||
self._rollback_to_default()
|
||||
|
Loading…
x
Reference in New Issue
Block a user