Merge "Execute an external engine from Vitrage"

This commit is contained in:
Jenkins 2017-07-19 12:45:46 +00:00 committed by Gerrit Code Review
commit b8e20918e9
7 changed files with 133 additions and 19 deletions

View File

@ -114,6 +114,7 @@ class NotifierEventTypes(object):
DEACTIVATE_DEDUCED_ALARM_EVENT = 'vitrage.deduced_alarm.deactivate' DEACTIVATE_DEDUCED_ALARM_EVENT = 'vitrage.deduced_alarm.deactivate'
ACTIVATE_MARK_DOWN_EVENT = 'vitrage.mark_down.activate' ACTIVATE_MARK_DOWN_EVENT = 'vitrage.mark_down.activate'
DEACTIVATE_MARK_DOWN_EVENT = 'vitrage.mark_down.deactivate' DEACTIVATE_MARK_DOWN_EVENT = 'vitrage.mark_down.deactivate'
EXECUTE_EXTERNAL_ACTION = 'vitrage.execute_external_action'
class TemplateTopologyFields(object): class TemplateTopologyFields(object):

View File

@ -14,6 +14,7 @@
import copy import copy
from oslo_log import log
from oslo_utils import importutils from oslo_utils import importutils
from vitrage.common.constants import DatasourceAction as AType from vitrage.common.constants import DatasourceAction as AType
@ -23,9 +24,11 @@ from vitrage.evaluator.actions.base import ActionMode
from vitrage.evaluator.actions.base import ActionType from vitrage.evaluator.actions.base import ActionType
from vitrage.evaluator.actions.evaluator_event_transformer \ from vitrage.evaluator.actions.evaluator_event_transformer \
import VITRAGE_DATASOURCE import VITRAGE_DATASOURCE
from vitrage.evaluator.actions.notifier import EvaluatorNotifier
from vitrage.evaluator.actions.recipes.action_steps import ADD_EDGE from vitrage.evaluator.actions.recipes.action_steps import ADD_EDGE
from vitrage.evaluator.actions.recipes.action_steps import ADD_VERTEX from vitrage.evaluator.actions.recipes.action_steps import ADD_VERTEX
from vitrage.evaluator.actions.recipes.action_steps import EXECUTE_EXTERNAL from vitrage.evaluator.actions.recipes.action_steps import EXECUTE_EXTERNAL
from vitrage.evaluator.actions.recipes.action_steps import EXECUTION_ENGINE
from vitrage.evaluator.actions.recipes.action_steps import REMOVE_EDGE from vitrage.evaluator.actions.recipes.action_steps import REMOVE_EDGE
from vitrage.evaluator.actions.recipes.action_steps import REMOVE_VERTEX from vitrage.evaluator.actions.recipes.action_steps import REMOVE_VERTEX
from vitrage.evaluator.actions.recipes.action_steps import UPDATE_VERTEX from vitrage.evaluator.actions.recipes.action_steps import UPDATE_VERTEX
@ -38,11 +41,14 @@ from vitrage.evaluator.actions.recipes.raise_alarm import RaiseAlarm
from vitrage.evaluator.actions.recipes.set_state import SetState from vitrage.evaluator.actions.recipes.set_state import SetState
from vitrage.utils import datetime as datetime_utils from vitrage.utils import datetime as datetime_utils
LOG = log.getLogger(__name__)
class ActionExecutor(object): class ActionExecutor(object):
def __init__(self, event_queue): def __init__(self, conf, event_queue):
self.event_queue = event_queue self.event_queue = event_queue
self.notifier = EvaluatorNotifier(conf)
self.action_recipes = ActionExecutor._register_action_recipes() self.action_recipes = ActionExecutor._register_action_recipes()
self.action_step_defs = { self.action_step_defs = {
@ -106,9 +112,12 @@ class ActionExecutor(object):
def _execute_external(self, params): def _execute_external(self, params):
# TODO(ifat_afek): send to a dedicated queue # Send a notification to the external engine
# external_engine = params[EXECUTION_ENGINE] external_engine = params[EXECUTION_ENGINE]
pass LOG.debug('Notifying external engine %s. Properties: %s',
external_engine,
str(params))
self.notifier.notify(external_engine, params)
@staticmethod @staticmethod
def _add_default_properties(event): def _add_default_properties(event):

View File

@ -0,0 +1,75 @@
# Copyright 2017 - Nokia
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_log import log
import oslo_messaging
from vitrage.common.constants import NotifierEventTypes
from vitrage.messaging import get_transport
LOG = log.getLogger(__name__)
class EvaluatorNotifier(object):
"""Allows writing to message bus"""
def __init__(self, conf):
self.oslo_notifiers = {}
try:
notifier_plugins = conf.notifiers
LOG.debug('notifier_plugins: %s', notifier_plugins)
if not notifier_plugins:
LOG.info('Evaluator Notifier is disabled')
return
for notifier in notifier_plugins:
LOG.debug('Adding evaluator notifier %s', notifier)
self.oslo_notifiers[notifier] = oslo_messaging.Notifier(
get_transport(conf),
driver='messagingv2',
publisher_id='vitrage.evaluator',
topics=[notifier])
except Exception as e:
LOG.info('Evaluator Notifier - missing configuration %s' % str(e))
@property
def enabled(self):
return len(self.oslo_notifiers) > 0
def notify(self, external_engine, properties):
"""Send a message to the wanted notifier
:param external_engine: the external engine that should handle the
notification and execute an action
:param properties: Properties to be processed by the external engine
"""
LOG.debug('external_engine: %s, properties: %s',
external_engine,
str(properties))
try:
if external_engine in self.oslo_notifiers:
LOG.debug('Notifying %s', external_engine)
self.oslo_notifiers[external_engine].info(
{},
NotifierEventTypes.EXECUTE_EXTERNAL_ACTION,
properties)
except Exception as e:
LOG.exception('Cannot notify - %s - %s',
NotifierEventTypes.EXECUTE_EXTERNAL_ACTION,
e)

View File

@ -56,7 +56,7 @@ class ScenarioEvaluator(object):
self.conf = conf self.conf = conf
self._scenario_repo = scenario_repo self._scenario_repo = scenario_repo
self._entity_graph = entity_graph self._entity_graph = entity_graph
self._action_executor = ActionExecutor(event_queue) self._action_executor = ActionExecutor(conf, event_queue)
self._entity_graph.subscribe(self.process_event) self._entity_graph.subscribe(self.process_event)
self._action_tracker = ActionTracker(DatasourceInfoMapper(self.conf)) self._action_tracker = ActionTracker(DatasourceInfoMapper(self.conf))
self.enabled = enabled self.enabled = enabled

View File

@ -30,3 +30,12 @@ class NotifierBase(object):
@abc.abstractmethod @abc.abstractmethod
def get_notifier_name(): def get_notifier_name():
pass pass
@staticmethod
def use_private_topic():
return False
@staticmethod
def info(self, ctxt, publisher_id, event_type, payload, metadata):
"""An endpoint for notifiers that use a private topic"""
pass

View File

@ -29,25 +29,24 @@ class VitrageNotifierService(os_service.Service):
super(VitrageNotifierService, self).__init__() super(VitrageNotifierService, self).__init__()
self.conf = conf self.conf = conf
self.notifiers = self.get_notifier_plugins(conf) self.notifiers = self.get_notifier_plugins(conf)
transport = messaging.get_transport(conf) self._init_listeners(self.conf)
target = oslo_messaging.Target(topic=conf.entity_graph.notifier_topic)
self.listener = messaging.get_notification_listener(
transport, [target],
[VitrageEventEndpoint(self.notifiers)])
def start(self): def start(self):
LOG.info("Vitrage Notifier Service - Starting...") LOG.info("Vitrage Notifier Service - Starting...")
super(VitrageNotifierService, self).start() super(VitrageNotifierService, self).start()
self.listener.start() for listener in self.listeners:
listener.start()
LOG.info("Vitrage Notifier Service - Started!") LOG.info("Vitrage Notifier Service - Started!")
def stop(self, graceful=False): def stop(self, graceful=False):
LOG.info("Vitrage Notifier Service - Stopping...") LOG.info("Vitrage Notifier Service - Stopping...")
self.listener.stop() for listener in self.listeners:
self.listener.wait() listener.stop()
listener.wait()
super(VitrageNotifierService, self).stop(graceful) super(VitrageNotifierService, self).stop(graceful)
LOG.info("Vitrage Notifier Service - Stopped!") LOG.info("Vitrage Notifier Service - Stopped!")
@ -67,8 +66,29 @@ class VitrageNotifierService(os_service.Service):
conf)) conf))
return notifiers return notifiers
def _init_listeners(self, conf):
self.listeners = []
transport = messaging.get_transport(conf)
class VitrageEventEndpoint(object): self._init_notifier(transport=transport,
topic=conf.entity_graph.notifier_topic,
endpoint=VitrageDefaultEventEndpoint(
self.notifiers))
for notifier in self.notifiers:
if notifier.use_private_topic():
self._init_notifier(transport=transport,
topic=notifier.get_notifier_name(),
endpoint=notifier)
def _init_notifier(self, transport, topic, endpoint):
LOG.debug('Initializing notifier with topic %s', topic)
self.listeners.append(messaging.get_notification_listener(
transport, [oslo_messaging.Target(topic=topic)], [endpoint]))
class VitrageDefaultEventEndpoint(object):
def __init__(self, notifiers): def __init__(self, notifiers):
self.notifiers = notifiers self.notifiers = notifiers

View File

@ -71,7 +71,7 @@ class TestActionExecutor(TestFunctionalBase):
action_spec = ActionSpecs(ActionType.SET_STATE, targets, props) action_spec = ActionSpecs(ActionType.SET_STATE, targets, props)
event_queue = queue.Queue() event_queue = queue.Queue()
action_executor = ActionExecutor(event_queue) action_executor = ActionExecutor(self.conf, event_queue)
# Test Action - do # Test Action - do
action_executor.execute(action_spec, ActionMode.DO) action_executor.execute(action_spec, ActionMode.DO)
@ -122,7 +122,7 @@ class TestActionExecutor(TestFunctionalBase):
action_spec = ActionSpecs(ActionType.MARK_DOWN, targets, props) action_spec = ActionSpecs(ActionType.MARK_DOWN, targets, props)
event_queue = queue.Queue() event_queue = queue.Queue()
action_executor = ActionExecutor(event_queue) action_executor = ActionExecutor(self.conf, event_queue)
# Test Action - do # Test Action - do
action_executor.execute(action_spec, ActionMode.DO) action_executor.execute(action_spec, ActionMode.DO)
@ -178,7 +178,7 @@ class TestActionExecutor(TestFunctionalBase):
{}) {})
event_queue = queue.Queue() event_queue = queue.Queue()
action_executor = ActionExecutor(event_queue) action_executor = ActionExecutor(self.conf, event_queue)
before_edge = processor.entity_graph.get_edge(alarm2.vertex_id, before_edge = processor.entity_graph.get_edge(alarm2.vertex_id,
alarm1.vertex_id, alarm1.vertex_id,
@ -221,7 +221,7 @@ class TestActionExecutor(TestFunctionalBase):
before_alarms = processor.entity_graph.get_vertices( before_alarms = processor.entity_graph.get_vertices(
vertex_attr_filter=alarm_vertex_attrs) vertex_attr_filter=alarm_vertex_attrs)
event_queue = queue.Queue() event_queue = queue.Queue()
action_executor = ActionExecutor(event_queue) action_executor = ActionExecutor(self.conf, event_queue)
# Test Action # Test Action
action_executor.execute(action_spec, ActionMode.DO) action_executor.execute(action_spec, ActionMode.DO)
@ -285,7 +285,7 @@ class TestActionExecutor(TestFunctionalBase):
vertex_attr_filter=alarm_vertex_attrs) vertex_attr_filter=alarm_vertex_attrs)
event_queue = queue.Queue() event_queue = queue.Queue()
action_executor = ActionExecutor(event_queue) action_executor = ActionExecutor(self.conf, event_queue)
# Test Action - undo # Test Action - undo
action_executor.execute(action_spec, ActionMode.UNDO) action_executor.execute(action_spec, ActionMode.UNDO)