From c708a6784ecf2318e6c7e453506237101f089043 Mon Sep 17 00:00:00 2001 From: Muhamad Najjar Date: Sun, 19 Nov 2017 10:00:42 +0000 Subject: [PATCH] Vitrage Persistor Service Change-Id: I4c70157ef8380825bcef844b572b1747ce95b262 --- devstack/plugin.sh | 3 +- devstack/settings | 3 +- setup.cfg | 1 + vitrage/cli/persistor.py | 37 +++++ vitrage/datasources/collector_notifier.py | 9 +- vitrage/opts.py | 2 + vitrage/persistor/__init__.py | 25 ++++ vitrage/persistor/service.py | 78 ++++++++++ vitrage/storage/base.py | 43 +++++- vitrage/storage/impl_sqlalchemy.py | 69 ++++++++- vitrage/storage/sqlalchemy/models.py | 30 +++- .../tests/database/__init__.py | 14 ++ .../tests/database/test_persistor.py | 133 ++++++++++++++++++ 13 files changed, 437 insertions(+), 10 deletions(-) create mode 100644 vitrage/cli/persistor.py create mode 100644 vitrage/persistor/__init__.py create mode 100644 vitrage/persistor/service.py create mode 100644 vitrage_tempest_tests/tests/database/__init__.py create mode 100644 vitrage_tempest_tests/tests/database/test_persistor.py diff --git a/devstack/plugin.sh b/devstack/plugin.sh index 9fcbcb0b0..f08c4ef97 100644 --- a/devstack/plugin.sh +++ b/devstack/plugin.sh @@ -305,6 +305,7 @@ function start_vitrage { run_process vitrage-graph "$VITRAGE_BIN_DIR/vitrage-graph --config-file $VITRAGE_CONF" run_process vitrage-notifier "$VITRAGE_BIN_DIR/vitrage-notifier --config-file $VITRAGE_CONF" run_process vitrage-ml "$VITRAGE_BIN_DIR/vitrage-ml --config-file $VITRAGE_CONF" + run_process vitrage-persistor "$VITRAGE_BIN_DIR/vitrage-persistor --config-file $VITRAGE_CONF" write_systemd_dependency vitrage-graph vitrage-collector @@ -334,7 +335,7 @@ function stop_vitrage { disable_apache_site vitrage restart_apache_server fi - for serv in vitrage-api vitrage-collector vitrage-graph vitrage-notifier; do + for serv in vitrage-api vitrage-collector vitrage-graph vitrage-notifier vitrage-persistor; do stop_process $serv done } diff --git a/devstack/settings b/devstack/settings index 0a6bddd10..2a997d23b 100644 --- a/devstack/settings +++ b/devstack/settings @@ -9,7 +9,8 @@ enable_service vitrage-notifier enable_service vitrage-collector # machine_learning enable_service vitrage-ml - +# Persistor +enable_service vitrage-persistor # Default directories VITRAGE_DIR=$DEST/vitrage diff --git a/setup.cfg b/setup.cfg index a0d409aaa..0f85ed907 100644 --- a/setup.cfg +++ b/setup.cfg @@ -29,6 +29,7 @@ console_scripts = vitrage-graph = vitrage.cli.graph:main vitrage-notifier = vitrage.cli.notifier:main vitrage-collector = vitrage.cli.collector:main + vitrage-persistor = vitrage.cli.persistor:main vitrage-ml = vitrage.cli.machine_learning:main vitrage-dbsync = vitrage.cli.storage:dbsync diff --git a/vitrage/cli/persistor.py b/vitrage/cli/persistor.py new file mode 100644 index 000000000..c22f278e7 --- /dev/null +++ b/vitrage/cli/persistor.py @@ -0,0 +1,37 @@ +# Copyright 2017 - Nokia +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import sys + +from oslo_log import log +from oslo_service import service as os_service +from vitrage.cli import VITRAGE_TITLE +from vitrage.persistor.service import PersistorService +from vitrage import service +from vitrage import storage + +LOG = log.getLogger(__name__) + + +def main(): + print(VITRAGE_TITLE) + conf = service.prepare_service() + db_connection = storage.get_connection_from_config(conf) + launcher = os_service.ServiceLauncher(conf) + launcher.launch_service(PersistorService(conf, db_connection)) + launcher.wait() + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/vitrage/datasources/collector_notifier.py b/vitrage/datasources/collector_notifier.py index d90217da1..3cb99fc17 100644 --- a/vitrage/datasources/collector_notifier.py +++ b/vitrage/datasources/collector_notifier.py @@ -26,12 +26,17 @@ class CollectorNotifier(object): def __init__(self, conf): self.oslo_notifier = None try: - topic = conf.datasources.notification_topic_collector + topics = [conf.datasources.notification_topic_collector] + if conf.persistor.persist_events: + topics.append(conf.persistor.persistor_topic) + else: + LOG.warning("Not persisting events") + self.oslo_notifier = oslo_messaging.Notifier( get_transport(conf), driver='messagingv2', publisher_id='datasources.events', - topics=[topic]) + topics=topics) except Exception as e: LOG.info('Collector notifier - missing configuration %s' % str(e)) diff --git a/vitrage/opts.py b/vitrage/opts.py index ee84f887a..de6f22796 100644 --- a/vitrage/opts.py +++ b/vitrage/opts.py @@ -28,6 +28,7 @@ import vitrage.machine_learning.plugins.jaccard_correlation import vitrage.notifier import vitrage.notifier.plugins.snmp import vitrage.os_clients +import vitrage.persistor import vitrage.rpc import vitrage.storage @@ -46,6 +47,7 @@ def list_opts(): ('evaluator', vitrage.evaluator.OPTS), ('consistency', vitrage.entity_graph.consistency.OPTS), ('database', vitrage.storage.OPTS), + ('persistor', vitrage.persistor.OPTS), ('entity_graph', vitrage.entity_graph.OPTS), ('service_credentials', vitrage.keystone_client.OPTS), ('machine_learning', diff --git a/vitrage/persistor/__init__.py b/vitrage/persistor/__init__.py new file mode 100644 index 000000000..1311ed77e --- /dev/null +++ b/vitrage/persistor/__init__.py @@ -0,0 +1,25 @@ +# Copyright 2017 - Nokia +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from oslo_config import cfg + +OPTS = [ + cfg.StrOpt('persistor_topic', + default='vitrage_persistor', + help='The topic on which event will be sent from the ' + 'datasources to the persistor'), + cfg.BoolOpt('persist_events', + default=False, + help='Whether or not persistor is persisting the events'), + ] diff --git a/vitrage/persistor/service.py b/vitrage/persistor/service.py new file mode 100644 index 000000000..5223f1002 --- /dev/null +++ b/vitrage/persistor/service.py @@ -0,0 +1,78 @@ +# Copyright 2017 - Nokia +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from __future__ import print_function + +import dateutil.parser +import oslo_messaging as oslo_m + +from oslo_log import log +from oslo_serialization import jsonutils +from oslo_service import service as os_service +from vitrage.common.constants import DatasourceProperties as DSProps +from vitrage.common.constants import GraphAction +from vitrage import messaging +from vitrage.storage.sqlalchemy import models + + +LOG = log.getLogger(__name__) + + +class PersistorService(os_service.Service): + def __init__(self, conf, db_connection): + super(PersistorService, self).__init__() + self.conf = conf + self.db_connection = db_connection + transport = messaging.get_transport(conf) + target = \ + oslo_m.Target(topic=conf.persistor.persistor_topic) + self.listener = messaging.get_notification_listener( + transport, [target], + [VitragePersistorEndpoint(self.db_connection)]) + + def start(self): + LOG.info("Vitrage Persistor Service - Starting...") + + super(PersistorService, self).start() + self.listener.start() + + LOG.info("Vitrage Persistor Service - Started!") + + def stop(self, graceful=False): + LOG.info("Vitrage Persistor Service - Stopping...") + + self.listener.stop() + self.listener.wait() + super(PersistorService, self).stop(graceful) + + LOG.info("Vitrage Persistor Service - Stopped!") + + +class VitragePersistorEndpoint(object): + def __init__(self, db_connection): + self.db_connection = db_connection + + def info(self, ctxt, publisher_id, event_type, payload, metadata): + LOG.debug('Vitrage Event Info: payload %s', payload) + self.process_event(payload) + + def process_event(self, data): + """:param data: Serialized to a JSON formatted ``str`` """ + if data.get(DSProps.EVENT_TYPE) == GraphAction.END_MESSAGE: + return + collector_timestamp = \ + dateutil.parser.parse(data.get(DSProps.SAMPLE_DATE)) + event_row = models.Event(payload=jsonutils.dumps(data), + collector_timestamp=collector_timestamp) + self.db_connection.events.create(event_row) diff --git a/vitrage/storage/base.py b/vitrage/storage/base.py index 810441b7a..27851c0e0 100644 --- a/vitrage/storage/base.py +++ b/vitrage/storage/base.py @@ -27,6 +27,10 @@ class Connection(object): def active_actions(self): return None + @property + def events(self): + return None + @abc.abstractmethod def upgrade(self, nocreate=False): raise NotImplementedError('upgrade not implemented') @@ -42,7 +46,6 @@ class Connection(object): @six.add_metaclass(abc.ABCMeta) class ActiveActionsConnection(object): - @abc.abstractmethod def create(self, active_action): """Create a new action. @@ -87,3 +90,41 @@ class ActiveActionsConnection(object): ): """Delete all active actions that match the filters.""" raise NotImplementedError('delete active actions not implemented') + + +@six.add_metaclass(abc.ABCMeta) +class EventsConnection(object): + def create(self, event): + """Create a new event. + + :type event: vitrage.storage.sqlalchemy.models.Event + """ + raise NotImplementedError('create event not implemented') + + def update(self, event): + """Update an existing event. + + :type event: vitrage.storage.sqlalchemy.models.Event + """ + raise NotImplementedError('update event not implemented') + + def query(self, + event_id=None, + collector_timestamp=None, + payload=None, + gt_collector_timestamp=None, + lt_collector_timestamp=None): + """Yields a lists of events that match filters. + + :rtype: list of vitrage.storage.sqlalchemy.models.Event + """ + raise NotImplementedError('query events not implemented') + + def delete(self, + event_id=None, + collector_timestamp=None, + payload=None, + gt_collector_timestamp=None, + lt_collector_timestamp=None): + """Delete all events that match the filters.""" + raise NotImplementedError('delete events not implemented') diff --git a/vitrage/storage/impl_sqlalchemy.py b/vitrage/storage/impl_sqlalchemy.py index 12e68f6bc..cb740207b 100644 --- a/vitrage/storage/impl_sqlalchemy.py +++ b/vitrage/storage/impl_sqlalchemy.py @@ -27,7 +27,6 @@ LOG = log.getLogger(__name__) class Connection(base.Connection): - def __init__(self, conf, url): options = dict(conf.database.items()) # set retries to 0 , since reconnection is already implemented @@ -40,11 +39,16 @@ class Connection(base.Connection): **options) self.conf = conf self._active_actions = ActiveActionsConnection(self._engine_facade) + self._events = EventsConnection(self._engine_facade) @property def active_actions(self): return self._active_actions + @property + def events(self): + return self._events + @staticmethod def _dress_url(url): # If no explicit driver has been set, we default to pymysql @@ -135,3 +139,66 @@ class ActiveActionsConnection(base.ActiveActionsConnection, BaseTableConn): score=score, trigger=trigger) return query.delete() + + +class EventsConnection(base.EventsConnection, BaseTableConn): + def __init__(self, engine_facade): + super(EventsConnection, self).__init__(engine_facade) + + def create(self, event): + session = self._engine_facade.get_session() + with session.begin(): + session.add(event) + + def update(self, event): + session = self._engine_facade.get_session() + with session.begin(): + session.merge(event) + + def query(self, + event_id=None, + collector_timestamp=None, + payload=None, + gt_collector_timestamp=None, + lt_collector_timestamp=None): + query = self.query_filter( + models.Event, + event_id=event_id, + collector_timestamp=collector_timestamp, + payload=payload) + + query = self._update_query_gt_lt(gt_collector_timestamp, + lt_collector_timestamp, + query) + + return query.all() + + @staticmethod + def _update_query_gt_lt(gt_collector_timestamp, + lt_collector_timestamp, + query): + if gt_collector_timestamp is not None: + query = query.filter(models.Event.collector_timestamp >= + gt_collector_timestamp) + if lt_collector_timestamp is not None: + query = query.filter(models.Event.collector_timestamp <= + lt_collector_timestamp) + return query + + def delete(self, + event_id=None, + collector_timestamp=None, + payload=None, + gt_collector_timestamp=None, + lt_collector_timestamp=None): + query = self.query_filter( + models.Event, + event_id=event_id, + collector_timestamp=collector_timestamp, + payload=payload) + + query = self._update_query_gt_lt(gt_collector_timestamp, + lt_collector_timestamp, + query) + + query.delete() diff --git a/vitrage/storage/sqlalchemy/models.py b/vitrage/storage/sqlalchemy/models.py index 69cb98af7..c93b17fc9 100644 --- a/vitrage/storage/sqlalchemy/models.py +++ b/vitrage/storage/sqlalchemy/models.py @@ -13,12 +13,12 @@ # under the License. from oslo_db.sqlalchemy import models - -from sqlalchemy import Column, String, SmallInteger, BigInteger, Index +from sqlalchemy import Column, DateTime, INTEGER, String, \ + SmallInteger, BigInteger, Index, Text from sqlalchemy.ext.declarative import declarative_base -class VitrageBase(models.TimestampMixin, models.ModelBase): +class VitrageBase(models.ModelBase): """Base class for Vitrage Models.""" __table_args__ = {'mysql_charset': "utf8", 'mysql_engine': "InnoDB"} @@ -39,7 +39,29 @@ class VitrageBase(models.TimestampMixin, models.ModelBase): Base = declarative_base(cls=VitrageBase) -class ActiveAction(Base): +class Event(Base): + + __tablename__ = 'events' + + event_id = Column("id", INTEGER, primary_key=True, nullable=False, + autoincrement=True) + collector_timestamp = Column(DateTime, index=True, nullable=False) + payload = Column(Text, nullable=False) + + def __repr__(self): + return \ + "" %\ + ( + self.event_id, + self.collector_timestamp, + self.payload + ) + + +class ActiveAction(Base, models.TimestampMixin): __tablename__ = 'active_actions' __table_args__ = ( # Index 'ix_active_action' on fields: diff --git a/vitrage_tempest_tests/tests/database/__init__.py b/vitrage_tempest_tests/tests/database/__init__.py new file mode 100644 index 000000000..ebb36c63d --- /dev/null +++ b/vitrage_tempest_tests/tests/database/__init__.py @@ -0,0 +1,14 @@ +# Copyright 2017 - Nokia +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +__author__ = 'stack' diff --git a/vitrage_tempest_tests/tests/database/test_persistor.py b/vitrage_tempest_tests/tests/database/test_persistor.py new file mode 100644 index 000000000..78e95a0be --- /dev/null +++ b/vitrage_tempest_tests/tests/database/test_persistor.py @@ -0,0 +1,133 @@ +# Copyright 2017 Nokia +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import datetime +import six + +from oslo_log import log as logging +from oslo_serialization import jsonutils +from vitrage.common.constants import DatasourceProperties as DSProps +from vitrage.datasources import NEUTRON_PORT_DATASOURCE +from vitrage.datasources import NOVA_INSTANCE_DATASOURCE +from vitrage import storage +from vitrage_tempest_tests.tests.base import BaseVitrageTempest +from vitrage_tempest_tests.tests.common import nova_utils + +LOG = logging.getLogger(__name__) + + +INSTANCE_NAME = 'test-persistor-vm' + +INSTANCE_CREATE_EVENT = { + DSProps.ENTITY_TYPE: NOVA_INSTANCE_DATASOURCE, + DSProps.EVENT_TYPE: 'compute.instance.create.end', + 'hostname': INSTANCE_NAME + '-0' +} + +PORT_CREATE_EVENT = { + DSProps.ENTITY_TYPE: NEUTRON_PORT_DATASOURCE, + DSProps.EVENT_TYPE: 'port.create.end', +} + +PORT_UPDATE_EVENT = { + DSProps.ENTITY_TYPE: NEUTRON_PORT_DATASOURCE, + DSProps.EVENT_TYPE: 'port.update.end', +} + + +def get_first_match(events, event): + for curr_event in events: + if six.viewitems(event) <= six.viewitems(curr_event.payload): + return curr_event + + +class TestEvents(BaseVitrageTempest): + """Test class for Vitrage persisror service""" + + # noinspection PyPep8Naming + @classmethod + def setUpClass(cls): + super(TestEvents, cls).setUpClass() + cls.db_connection = storage.get_connection_from_config(cls.conf) + + def test_create_instance(self): + """This function validates creating instance events. + + Create instance generates three ordered events. + 1. neutron port is created. + 2. the port is updated to the created instance. + 3. nova instance is created with the given hostname. + """ + try: + + # Action + time_before_action = datetime.datetime.utcnow() + nova_utils.create_instances(num_instances=1, + name=INSTANCE_NAME) + + writen_events = self._load_db_events(time_before_action) + + port_create_event = get_first_match(writen_events, + PORT_CREATE_EVENT) + + self.assertIsNotNone(port_create_event, + "port.create.end event is not writen to db") + + port_update_event = get_first_match(writen_events, + PORT_UPDATE_EVENT) + + self.assertIsNotNone(port_update_event, + "port.update.end event is not writen to db") + + instance_create_event = get_first_match(writen_events, + INSTANCE_CREATE_EVENT) + + self.assertIsNotNone(instance_create_event, + "compute.instance.create.end event is not " + "writen to db") + + # Check correct timestamp order + events_timestamp_list = \ + [port_create_event.collector_timestamp, + port_update_event.collector_timestamp, + instance_create_event.collector_timestamp] + + self.assertEqual(sorted(events_timestamp_list), + events_timestamp_list, + "Events Timestamp order is wrong") + + # Check correct event_id order + events_id_list = \ + [port_create_event.event_id, + port_update_event.event_id, + instance_create_event.event_id] + + self.assertEqual(sorted(events_id_list), + events_id_list, + "Events id order is wrong") + + except Exception as e: + self._handle_exception(e) + raise + + finally: + nova_utils.delete_all_instances() + + def _load_db_events(self, time_before_action): + writen_events = self.db_connection.events.query( + gt_collector_timestamp=time_before_action) + + for event in writen_events: + event.payload = jsonutils.loads(event.payload) + return writen_events