From 842f9d6cea1ed85bc4a0f8c30ae9cb4a22765420 Mon Sep 17 00:00:00 2001 From: Idan Hefetz Date: Thu, 15 Nov 2018 10:44:00 +0000 Subject: [PATCH] Complete removal of vitrage-collector service. - vitrage-graph will execute the drivers. - Simplify large data transfer from drivers to processor by obsoleting rpc. - Drivers don't need to create the complete list by taking advantage of python generators in order to conserve memory. - Lowering the total signiture of vitrage processes. - LockByDriver will enforce a driver does not run get_changes and get_all in parallel. Story: 2004384 Change-Id: Ie713456b2df96e24d0b15d2362a666162bfb4300 --- devstack/plugin.sh | 18 +-- devstack/settings | 2 - devstack/upgrade/settings | 4 +- devstack/upgrade/shutdhown.sh | 2 +- devstack/upgrade/upgrade.sh | 2 +- doc/source/contributor/high-scale.rst | 5 +- doc/source/install/get_started.rst | 3 +- doc/source/install/install-rdo.rst | 1 - .../collector-removal-fd805c6298d66eb0.yaml | 7 + setup.cfg | 1 - vitrage/cli/collector.py | 53 ------- vitrage/datasources/__init__.py | 4 - vitrage/datasources/listener_service.py | 103 -------------- vitrage/datasources/rpc_service.py | 93 ------------ vitrage/entity_graph/datasource_rpc.py | 70 --------- vitrage/entity_graph/driver_exec.py | 134 ++++++++++++++++++ vitrage/entity_graph/graph_init.py | 38 +++-- vitrage/entity_graph/scheduler.py | 39 ++--- vitrage/evaluator/scenario_evaluator.py | 2 +- vitrage/rpc.py | 12 -- .../listener_service/test_listener_service.py | 11 +- .../entity_graph/test_processor_service.py | 2 +- 22 files changed, 187 insertions(+), 419 deletions(-) create mode 100644 releasenotes/notes/collector-removal-fd805c6298d66eb0.yaml delete mode 100644 vitrage/cli/collector.py delete mode 100644 vitrage/datasources/listener_service.py delete mode 100644 vitrage/datasources/rpc_service.py delete mode 100644 vitrage/entity_graph/datasource_rpc.py create mode 100644 vitrage/entity_graph/driver_exec.py diff --git a/devstack/plugin.sh b/devstack/plugin.sh index 3d7d5037d..070c56fbf 100644 --- a/devstack/plugin.sh +++ b/devstack/plugin.sh @@ -300,33 +300,17 @@ function start_vitrage { fi fi - run_process vitrage-collector "$VITRAGE_BIN_DIR/vitrage-collector --config-file $VITRAGE_CONF" run_process vitrage-graph "$VITRAGE_BIN_DIR/vitrage-graph --config-file $VITRAGE_CONF" run_process vitrage-notifier "$VITRAGE_BIN_DIR/vitrage-notifier --config-file $VITRAGE_CONF" run_process vitrage-ml "$VITRAGE_BIN_DIR/vitrage-ml --config-file $VITRAGE_CONF" run_process vitrage-persistor "$VITRAGE_BIN_DIR/vitrage-persistor --config-file $VITRAGE_CONF" run_process vitrage-snmp-parsing "$VITRAGE_BIN_DIR/vitrage-snmp-parsing --config-file $VITRAGE_CONF" - write_systemd_dependency vitrage-graph vitrage-collector change_systemd_kill_mode vitrage-graph - change_systemd_kill_mode vitrage-collector } -function write_systemd_dependency { - local service_after=$1 - local service_before=$2 - local systemd_service_after="devstack@$service_after.service" - local systemd_service_before="devstack@$service_before.service" - - local unitfile_after="$SYSTEMD_DIR/$systemd_service_after" - - iniset -sudo $unitfile_after "Unit" "After" "$systemd_service_before" - - $SYSTEMCTL daemon-reload -} - function change_systemd_kill_mode { local service=$1 local systemd_service="devstack@$service.service" @@ -343,7 +327,7 @@ function stop_vitrage { disable_apache_site vitrage restart_apache_server fi - for serv in vitrage-api vitrage-collector vitrage-graph vitrage-notifier vitrage-persistor vitrage-ml vitrage-snmp-parsing; do + for serv in vitrage-api vitrage-graph vitrage-notifier vitrage-persistor vitrage-ml vitrage-snmp-parsing; do stop_process $serv done } diff --git a/devstack/settings b/devstack/settings index 000ba22b2..c6ce20d35 100644 --- a/devstack/settings +++ b/devstack/settings @@ -5,8 +5,6 @@ enable_service vitrage-api enable_service vitrage-graph # Notifier enable_service vitrage-notifier -# Collector -enable_service vitrage-collector # machine_learning enable_service vitrage-ml # Persistor diff --git a/devstack/upgrade/settings b/devstack/upgrade/settings index ee17b79c5..094005eb3 100644 --- a/devstack/upgrade/settings +++ b/devstack/upgrade/settings @@ -7,7 +7,7 @@ VITRAGE_BASE_DEVSTACK_DIR=$TOP_DIR/../../old/vitrage/devstack VITRAGE_DIR=$TOP_DIR/../../new/vitrage source $VITRAGE_BASE_DEVSTACK_DIR/settings -devstack_localrc base enable_service vitrage-api vitrage-collector vitrage-graph vitrage-notifier vitrage-ml vitrage-persistor vitrage-snmp-parsing -devstack_localrc target enable_service vitrage-api vitrage-collector vitrage-graph vitrage-notifier vitrage-ml vitrage-persistor vitrage-snmp-parsing +devstack_localrc base enable_service vitrage-api vitrage-graph vitrage-notifier vitrage-ml vitrage-persistor vitrage-snmp-parsing +devstack_localrc target enable_service vitrage-api vitrage-graph vitrage-notifier vitrage-ml vitrage-persistor vitrage-snmp-parsing BASE_RUN_SMOKE=False TARGET_RUN_SMOKE=False diff --git a/devstack/upgrade/shutdhown.sh b/devstack/upgrade/shutdhown.sh index 06fb1b4f1..75cf1585d 100755 --- a/devstack/upgrade/shutdhown.sh +++ b/devstack/upgrade/shutdhown.sh @@ -29,7 +29,7 @@ set -o xtrace stop_vitrage -SERVICES_DOWN="vitrage-api vitrage-collector vitrage-graph vitrage-notifier vitrage-ml vitrage-persistor vitrage-snmp-parsing" +SERVICES_DOWN="vitrage-api vitrage-graph vitrage-notifier vitrage-ml vitrage-persistor vitrage-snmp-parsing" # sanity check that services are actually down ensure_services_stopped $SERVICES_DOWN diff --git a/devstack/upgrade/upgrade.sh b/devstack/upgrade/upgrade.sh index 50facb00e..0db4850ff 100755 --- a/devstack/upgrade/upgrade.sh +++ b/devstack/upgrade/upgrade.sh @@ -84,7 +84,7 @@ start_vitrage # Don't succeed unless the services come up # Truncating some service names to 11 characters -ensure_services_started vitrage-api vitrage-collector vitrage-graph vitrage-notifier vitrage-ml vitrage-persistor vitrage-snmp-parsing +ensure_services_started vitrage-api vitrage-graph vitrage-notifier vitrage-ml vitrage-persistor vitrage-snmp-parsing set +o xtrace echo "*********************************************************************" diff --git a/doc/source/contributor/high-scale.rst b/doc/source/contributor/high-scale.rst index 69b4fa1db..5423d207a 100644 --- a/doc/source/contributor/high-scale.rst +++ b/doc/source/contributor/high-scale.rst @@ -7,7 +7,8 @@ In a production environment with > 50,000 entities, the following configuration Tune RPC -------- -Vitrage-graph uses RPC to request data from vitrage-collector, these requests take longer, and there is a need to increase the timeout. +Vitrage-api uses RPC to request data from vitrage-graph, these requests take longer, and there may be a need to +increase the timeout. The following should be set in ``/etc/vitrage/vitrage.conf``, under ``[DEFAULT]`` section: +----------------------+---------------------------------------------------------+-----------------+-----------------+ @@ -20,8 +21,6 @@ To apply, restart these: ``sudo service vitrage-graph restart`` -``sudo service vitrage-collector restart`` - Restart the Vitrage api (either vitrage-api or apache) diff --git a/doc/source/install/get_started.rst b/doc/source/install/get_started.rst index fae045627..15fcba8b0 100644 --- a/doc/source/install/get_started.rst +++ b/doc/source/install/get_started.rst @@ -8,13 +8,12 @@ The Root Cause Analysis service consists of the following components: ``vitrage-graph`` service The main process. It includes the in-memory entity graph and the template evaluator. + Also responsible for retrieving data from the different datasources ``vitrage-notifier`` service Used for notifying external systems about Vitrage alarms/state changes. It only calls Nova force-down API and Simple Network Management Protocol (SNMP) in the Ocata release. ``vitrage-api`` service The API layer for Vitrage. -``vitrage-collector`` service - Responsible for retrieving data from the different datasources. ``vitrage-ml`` service Performs alarm analysis using Machine Learning methods. ``vitrage-persistor`` service diff --git a/doc/source/install/install-rdo.rst b/doc/source/install/install-rdo.rst index a09060416..168088d74 100644 --- a/doc/source/install/install-rdo.rst +++ b/doc/source/install/install-rdo.rst @@ -174,7 +174,6 @@ Run the following commands: .. code:: bash - vitrage-collector vitrage-graph vitrage-api vitrage-notifier diff --git a/releasenotes/notes/collector-removal-fd805c6298d66eb0.yaml b/releasenotes/notes/collector-removal-fd805c6298d66eb0.yaml new file mode 100644 index 000000000..cefff11b6 --- /dev/null +++ b/releasenotes/notes/collector-removal-fd805c6298d66eb0.yaml @@ -0,0 +1,7 @@ +--- +features: + - Collector service removal to simplify and enhance scale performance. + vitrage-collector service was removed and vitrage-graph is responsible + to execute the drivers. + Allowing drivers to take advantage of python yield generators and conserve + memory. diff --git a/setup.cfg b/setup.cfg index d8d9e3bc1..f26f64cc4 100644 --- a/setup.cfg +++ b/setup.cfg @@ -28,7 +28,6 @@ console_scripts = vitrage-api = vitrage.cli.api:main vitrage-graph = vitrage.cli.graph:main vitrage-notifier = vitrage.cli.notifier:main - vitrage-collector = vitrage.cli.collector:main vitrage-persistor = vitrage.cli.persistor:main vitrage-ml = vitrage.cli.machine_learning:main vitrage-dbsync = vitrage.cli.storage:dbsync diff --git a/vitrage/cli/collector.py b/vitrage/cli/collector.py deleted file mode 100644 index b0b0bceca..000000000 --- a/vitrage/cli/collector.py +++ /dev/null @@ -1,53 +0,0 @@ -# Copyright 2017 - Nokia -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import cotyledon -import sys - - -from vitrage.cli import VITRAGE_TITLE -from vitrage.common import utils -from vitrage.datasources.listener_service import ListenerService -from vitrage.datasources.rpc_service import CollectorRpcHandlerService -from vitrage import service - - -class CollectorService(cotyledon.Service): - - def __init__(self, worker_id, conf): - super(CollectorService, self).__init__(worker_id) - self.csvc = CollectorRpcHandlerService(conf) - utils.spawn(self.csvc.start) - self.lsvc = ListenerService(conf) - utils.spawn(self.lsvc.start) - - def terminate(self): - super(CollectorService, self).terminate() - self.lsvc.stop() - self.csvc.stop() - - -def main(): - - """Starts all the datasources drivers services""" - - print(VITRAGE_TITLE) - conf = service.prepare_service() - sm = cotyledon.ServiceManager() - sm.add(CollectorService, args=(conf,)) - sm.run() - - -if __name__ == "__main__": - sys.exit(main()) diff --git a/vitrage/datasources/__init__.py b/vitrage/datasources/__init__.py index 07e60a6ac..946ecba6b 100644 --- a/vitrage/datasources/__init__.py +++ b/vitrage/datasources/__init__.py @@ -53,8 +53,4 @@ OPTS = [ cfg.StrOpt('notification_exchange', required=False, help='Exchange that is used for notifications.'), - cfg.StrOpt('notification_topic_collector', - default='vitrage_collector_notifications', - help='The topic on which event will be sent from the ' - 'datasources to the graph processor') ] diff --git a/vitrage/datasources/listener_service.py b/vitrage/datasources/listener_service.py deleted file mode 100644 index 798f4f0ea..000000000 --- a/vitrage/datasources/listener_service.py +++ /dev/null @@ -1,103 +0,0 @@ -# Copyright 2016 - Nokia -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -from collections import defaultdict - -from oslo_log import log -import oslo_messaging - -from vitrage.datasources import utils -from vitrage import messaging -from vitrage.messaging import VitrageNotifier - -LOG = log.getLogger(__name__) - - -class ListenerService(object): - - def __init__(self, conf): - super(ListenerService, self).__init__() - self.enrich_callbacks_by_events = \ - self._create_callbacks_by_events_dict(conf) - - topics = [conf.datasources.notification_topic_collector] - notifier = VitrageNotifier(conf, 'driver.events', topics) - self.listener = self._get_topics_listener(conf, notifier.notify) - - def start(self): - LOG.info("Vitrage data source Listener Service - Starting...") - - self.listener.start() - - LOG.info("Vitrage data source Listener Service - Started!") - - def stop(self): - LOG.info("Vitrage data source Listener Service - Stopping...") - - # Should it be here? - # self.listener.stop() - # self.listener.wait() - - LOG.info("Vitrage data source Listener Service - Stopped!") - - @classmethod - def _create_callbacks_by_events_dict(cls, conf): - ret = defaultdict(list) - driver_names = utils.get_push_drivers_names(conf) - push_drivers = utils.get_drivers_by_name(conf, driver_names) - - for driver in push_drivers: - for event in driver.get_event_types(): - ret[event].append(driver.enrich_event) - - return ret - - def _get_topics_listener(self, conf, callback): - topics = conf.datasources.notification_topics - exchange = conf.datasources.notification_exchange - transport = messaging.get_transport(conf) - targets = [oslo_messaging.Target(exchange=exchange, topic=topic) - for topic in topics] - - return messaging.get_notification_listener( - transport, - targets, - [NotificationsEndpoint(self.enrich_callbacks_by_events, callback)]) - - -class NotificationsEndpoint(object): - - def __init__(self, enrich_callback_by_events, enqueue_callback): - self.enrich_callbacks_by_events = enrich_callback_by_events - self.enqueue_callback = enqueue_callback - - def info(self, ctxt, publisher_id, event_type, payload, metadata): - for event_string in self.enrich_callbacks_by_events: - if str(event_type) == event_string: - - callbacks = self.enrich_callbacks_by_events[event_string] - enriched_events = [] - for callback in callbacks: - result = callback(payload, event_type) - if isinstance(result, list): - enriched_events += result - else: - enriched_events.append(result) - self._enqueue_events(enriched_events) - - def _enqueue_events(self, enriched_events): - for event in enriched_events: - if event is not None: - self.enqueue_callback(event_type='', data=event) - LOG.debug('EVENT ENQUEUED: \n' + str(event)) diff --git a/vitrage/datasources/rpc_service.py b/vitrage/datasources/rpc_service.py deleted file mode 100644 index 5ca142288..000000000 --- a/vitrage/datasources/rpc_service.py +++ /dev/null @@ -1,93 +0,0 @@ -# Copyright 2018 - Nokia -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -import base64 -from concurrent import futures -from six.moves import cPickle -import time -import zlib - -from oslo_log import log - -from vitrage.common.constants import DatasourceAction -from vitrage.datasources import utils -from vitrage import rpc as vitrage_rpc - -LOG = log.getLogger(__name__) - - -class CollectorRpcHandlerService(object): - - def __init__(self, conf): - self.conf = conf - self.server = vitrage_rpc.get_default_server( - conf, - conf.rpc_topic_collector, - [DriversEndpoint(conf)]) - - def start(self): - LOG.info("Collector Rpc Handler Service - Starting...") - self.server.start() - LOG.info("Collector Rpc Handler Service - Started!") - - def stop(self): - LOG.info("Collector Rpc Handler Service - Stopping...") - self.server.stop() - LOG.info("Collector Rpc Handler Service - Stopped!") - - -def compress_events(events): - str_data = cPickle.dumps(events, cPickle.HIGHEST_PROTOCOL) - return base64.b64encode(zlib.compress(str_data)) - - -class DriversEndpoint(object): - - def __init__(self, conf): - self.conf = conf - self.pool = futures.ThreadPoolExecutor( - max_workers=len(self.conf.datasources.types)) - - def driver_get_all(self, ctx, driver_names, action, retry_on_fault=False): - """Call get_all for specified drivers""" - LOG.debug("run drivers get_all: %s %s", driver_names, action) - drivers = utils.get_drivers_by_name(self.conf, driver_names) - fault_interval = self.conf.datasources.snapshot_interval_on_fault - - def run_driver(driver): - try: - return True, driver.get_all(action) - except Exception: - LOG.exception('Driver failed') - return False, driver - - result = list(self.pool.map(run_driver, drivers)) - failed_drivers = [driver for success, driver in result if not success] - if failed_drivers and retry_on_fault: - LOG.info('retrying failed drivers in %s seconds', fault_interval) - time.sleep(fault_interval) - result.extend(list(self.pool.map(run_driver, failed_drivers))) - - events = compress_events([e for success, events in result if success - for e in events]) - LOG.debug("run drivers get_all done.") - return events - - def driver_get_changes(self, ctx, driver_name): - """Call get_changes for a specific driver""" - LOG.debug("run driver get_changes: %s", driver_name) - drivers = utils.get_drivers_by_name(self.conf, [driver_name]) - events = drivers[0].get_changes(DatasourceAction.UPDATE) - events = compress_events([e for e in events]) - LOG.debug("run driver get_changes: %s done.", driver_name) - return events diff --git a/vitrage/entity_graph/datasource_rpc.py b/vitrage/entity_graph/datasource_rpc.py deleted file mode 100644 index b3b330e75..000000000 --- a/vitrage/entity_graph/datasource_rpc.py +++ /dev/null @@ -1,70 +0,0 @@ -# Copyright 2018 - Nokia -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -from base64 import standard_b64decode -from six.moves import cPickle -import time -import zlib - -from oslo_log import log -import oslo_messaging - -from vitrage import messaging -from vitrage import rpc as vitrage_rpc - -LOG = log.getLogger(__name__) - - -def create_rpc_client_instance(conf): - transport = messaging.get_rpc_transport(conf) - target = oslo_messaging.Target(topic=conf.rpc_topic_collector) - client = vitrage_rpc.get_client(transport, target) - return client - - -def get_all(rpc_client, events_coordination, driver_names, action, - retry_on_fault=False): - LOG.info('get_all starting for %s', driver_names) - t1 = time.time() - - def _call(): - result = rpc_client.call( - {}, - 'driver_get_all', - driver_names=driver_names, - action=action, - retry_on_fault=retry_on_fault) - events = cPickle.loads(zlib.decompress(standard_b64decode(result))) - for e in events: - yield e - - try: - events = _call() - except oslo_messaging.MessagingTimeout: - LOG.exception('Got MessagingTimeout') - events = _call() if retry_on_fault else [] - t2 = time.time() - count = events_coordination.handle_multiple_low_priority(events) - t3 = time.time() - LOG.info('get_all took %s, processing took %s for %s events', - t2 - t1, t3 - t2, count) - - -def get_changes(rpc_client, events_coordination, driver_name): - LOG.info('get_changes starting %s', driver_name) - result = rpc_client.call( - {}, - 'driver_get_changes', - driver_name=driver_name) - events = cPickle.loads(zlib.decompress(standard_b64decode(result))) - events_coordination.handle_multiple_low_priority(events) diff --git a/vitrage/entity_graph/driver_exec.py b/vitrage/entity_graph/driver_exec.py new file mode 100644 index 000000000..1806dde79 --- /dev/null +++ b/vitrage/entity_graph/driver_exec.py @@ -0,0 +1,134 @@ +# Copyright 2018 - Nokia +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +from collections import defaultdict +import threading +import time + +from oslo_log import log +import oslo_messaging + +from vitrage.common.constants import DatasourceAction +from vitrage.datasources import utils +from vitrage import messaging + +LOG = log.getLogger(__name__) + + +class DriverExec(object): + + def __init__(self, conf, process_output_func, persist): + self.conf = conf + self.process_output_func = process_output_func + self.persist = persist + + def snapshot_get_all(self, action=DatasourceAction.INIT_SNAPSHOT): + driver_names = self.conf.datasources.types + LOG.info('get_all starting for %s', driver_names) + t1 = time.time() + events_count = 0 + for d in driver_names: + events_count += self.get_all(d, action) + LOG.info('get_all and processing took %s for %s events', + time.time() - t1, events_count) + self.persist.store_graph() + + def get_all(self, driver_name, action): + try: + LOCK_BY_DRIVER.acquire(driver_name) + driver = utils.get_drivers_by_name(self.conf, [driver_name])[0] + LOG.info("run driver get_all: %s", driver_name) + events = driver.get_all(action) + count = self.process_output_func(events) + LOG.info("run driver get_all: %s done (%s events)", + driver_name, count) + return count + except Exception: + LOG.exception("run driver get_all: %s Failed", driver_name) + finally: + LOCK_BY_DRIVER.release(driver_name) + return 0 + + def get_changes(self, driver_name): + if not LOCK_BY_DRIVER.acquire(driver_name, blocking=False): + LOG.info("%s get_changes canceled during get_all execution", + driver_name) + return 0 + try: + driver = utils.get_drivers_by_name(self.conf, [driver_name])[0] + LOG.info("run driver get_changes: %s", driver_name) + events = driver.get_changes(DatasourceAction.UPDATE) + count = self.process_output_func(events) + LOG.info("run driver get_changes: %s done (%s events)", + driver_name, count) + return count + except Exception: + LOG.exception("run driver get_changes: %s Failed", driver_name) + finally: + LOCK_BY_DRIVER.release(driver_name) + return 0 + + +class DriversNotificationEndpoint(object): + + def __init__(self, conf, processor_func): + self._conf = conf + self._processor_func = processor_func + self._enrich_event_methods = defaultdict(list) + + def init(self): + driver_names = utils.get_push_drivers_names(self._conf) + push_drivers = utils.get_drivers_by_name(self._conf, driver_names) + for driver in push_drivers: + for event in driver.get_event_types(): + self._enrich_event_methods[event].append(driver.enrich_event) + return self + + def get_listener(self): + topics = self._conf.datasources.notification_topics + exchange = self._conf.datasources.notification_exchange + transport = messaging.get_transport(self._conf) + targets = [oslo_messaging.Target(exchange=exchange, topic=topic) + for topic in topics] + + return messaging.get_notification_listener(transport, targets, [self]) + + def info(self, ctxt, publisher_id, event_type, payload, metadata): + funcs = self._enrich_event_methods[str(event_type)] + events = [] + for func in funcs: + result = func(payload, event_type) + if isinstance(result, list): + events += result + else: + events.append(result) + events = [x for x in events if x is not None] + LOG.info('EVENTS ENQUEUED: \n' + str(events)) + self._processor_func(events) + + +class LockByDriver(object): + + def __init__(self): + self.lock_by_driver = dict() + + def acquire(self, driver_name, blocking=True): + if not self.lock_by_driver.get(driver_name): + self.lock_by_driver[driver_name] = threading.Lock() + return self.lock_by_driver[driver_name].acquire(blocking) + + def release(self, driver_name): + self.lock_by_driver[driver_name].release() + + +LOCK_BY_DRIVER = LockByDriver() diff --git a/vitrage/entity_graph/graph_init.py b/vitrage/entity_graph/graph_init.py index eb2c2531c..f8f8f6abc 100644 --- a/vitrage/entity_graph/graph_init.py +++ b/vitrage/entity_graph/graph_init.py @@ -17,11 +17,10 @@ import time from oslo_log import log import oslo_messaging -from vitrage.common.constants import DatasourceAction from vitrage.common.constants import VertexProperties as VProps from vitrage.common.utils import spawn from vitrage.datasources.transformer_base import TransformerBase -from vitrage.entity_graph import datasource_rpc as ds_rpc +from vitrage.entity_graph import driver_exec from vitrage.entity_graph import EVALUATOR_TOPIC from vitrage.entity_graph.graph_persistency import GraphPersistency from vitrage.entity_graph.processor.notifier import GraphNotifier @@ -42,14 +41,13 @@ class VitrageGraphInit(object): self.graph = graph self.db = db_connection self.workers = GraphWorkersManager(conf, graph, db_connection) - self.events_coordination = EventsCoordination( - conf, - self.process_event, - conf.datasources.notification_topic_collector, - EVALUATOR_TOPIC) + self.events_coordination = EventsCoordination(conf, self.process_event) self.persist = GraphPersistency(conf, db_connection, graph) - self.scheduler = Scheduler(conf, graph, self.events_coordination, - self.persist) + self.driver_exec = driver_exec.DriverExec( + self.conf, + self.events_coordination.handle_multiple_low_priority, + self.persist) + self.scheduler = Scheduler(conf, graph, self.driver_exec, self.persist) self.processor = Processor(conf, graph) def run(self): @@ -78,14 +76,8 @@ class VitrageGraphInit(object): LOG.info('Disabling previously active alarms') self.db.history_facade.disable_alarms_in_history() self.subscribe_presist_notifier() - ds_rpc.get_all( - ds_rpc.create_rpc_client_instance(self.conf), - self.events_coordination, - self.conf.datasources.types, - action=DatasourceAction.INIT_SNAPSHOT, - retry_on_fault=True) + self.driver_exec.snapshot_get_all() LOG.info("%s vertices loaded", self.graph.num_vertices()) - self.persist.store_graph() spawn(self._start_all_workers, is_snapshot=False) def _start_all_workers(self, is_snapshot): @@ -130,7 +122,7 @@ PRIORITY_DELAY = 0.05 class EventsCoordination(object): - def __init__(self, conf, do_work_func, topic_low, topic_high): + def __init__(self, conf, do_work_func): self._conf = conf self._lock = threading.Lock() self._high_event_finish_time = 0 @@ -143,12 +135,16 @@ class EventsCoordination(object): self._do_work_func = do_work - self._low_pri_listener = self._init_listener( - topic_low, self._do_low_priority_work) - self._high_pri_listener = self._init_listener( - topic_high, self._do_high_priority_work) + self._low_pri_listener = None + self._high_pri_listener = None def start(self): + self._low_pri_listener = driver_exec.DriversNotificationEndpoint( + self._conf, + self.handle_multiple_low_priority).init().get_listener() + self._high_pri_listener = self._init_listener( + EVALUATOR_TOPIC, + self._do_high_priority_work) LOG.info('Listening on %s', self._high_pri_listener.targets[0].topic) LOG.info('Listening on %s', self._low_pri_listener.targets[0].topic) self._high_pri_listener.start() diff --git a/vitrage/entity_graph/scheduler.py b/vitrage/entity_graph/scheduler.py index cfd839ffa..2df5afe1c 100644 --- a/vitrage/entity_graph/scheduler.py +++ b/vitrage/entity_graph/scheduler.py @@ -22,31 +22,33 @@ from vitrage.common.utils import spawn from vitrage.entity_graph.consistency.consistency_enforcer import\ ConsistencyEnforcer -from vitrage.entity_graph import datasource_rpc as ds_rpc LOG = log.getLogger(__name__) class Scheduler(object): - def __init__(self, conf, graph, events_coordination, persist): + def __init__(self, conf, graph, driver_exec, persist): super(Scheduler, self).__init__() self.conf = conf self.graph = graph - self.events_coordination = events_coordination + self.driver_exec = driver_exec self.persist = persist self.consistency = ConsistencyEnforcer(conf, graph) self.periodic = None def start_periodic_tasks(self): + thread_num = len(utils.get_pull_drivers_names(self.conf)) + thread_num += 2 # for consistency and get_all self.periodic = periodics.PeriodicWorker.create( - [], executor_factory=lambda: ThreadPoolExecutor(max_workers=10)) + [], executor_factory=lambda: ThreadPoolExecutor( + max_workers=thread_num)) - self.add_consistency_timer() - self.add_rpc_datasources_timers() + self._add_consistency_timer() + self._add_datasource_timers() spawn(self.periodic.start) - def add_consistency_timer(self): + def _add_consistency_timer(self): spacing = self.conf.datasources.snapshots_interval @periodics.periodic(spacing=spacing) @@ -59,20 +61,12 @@ class Scheduler(object): self.periodic.add(consistency_periodic) LOG.info("added consistency_periodic (spacing=%s)", spacing) - def add_rpc_datasources_timers(self): + def _add_datasource_timers(self): spacing = self.conf.datasources.snapshots_interval - rpc_client = ds_rpc.create_rpc_client_instance(self.conf) @periodics.periodic(spacing=spacing) def get_all_periodic(): - try: - ds_rpc.get_all(rpc_client, - self.events_coordination, - self.conf.datasources.types, - DatasourceAction.SNAPSHOT) - self.persist.store_graph() - except Exception: - LOG.exception('get_all_periodic failed.') + self.driver_exec.snapshot_get_all(DatasourceAction.SNAPSHOT) self.periodic.add(get_all_periodic) LOG.info("added get_all_periodic (spacing=%s)", spacing) @@ -80,17 +74,10 @@ class Scheduler(object): driver_names = utils.get_pull_drivers_names(self.conf) for d_name in driver_names: spacing = self.conf[d_name].changes_interval - rpc_client = ds_rpc.create_rpc_client_instance(self.conf) @periodics.periodic(spacing=spacing) - def get_changes_periodic(driver_name=d_name): - try: - ds_rpc.get_changes(rpc_client, - self.events_coordination, - driver_name) - except Exception: - LOG.exception('get_changes_periodic "%s" failed.', - driver_name) + def get_changes_periodic(): + self.driver_exec.get_changes(d_name) self.periodic.add(get_changes_periodic) LOG.info("added get_changes_periodic %s (spacing=%s)", diff --git a/vitrage/evaluator/scenario_evaluator.py b/vitrage/evaluator/scenario_evaluator.py index 03c91296f..c96b32973 100644 --- a/vitrage/evaluator/scenario_evaluator.py +++ b/vitrage/evaluator/scenario_evaluator.py @@ -11,6 +11,7 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. +from collections import defaultdict from collections import namedtuple from collections import OrderedDict import copy @@ -23,7 +24,6 @@ from vitrage.common.constants import EdgeProperties as EProps from vitrage.common.constants import VertexProperties as VProps from vitrage.common.utils import md5 from vitrage.common.utils import recursive_keypairs -from vitrage.datasources.listener_service import defaultdict from vitrage.entity_graph.mappings.datasource_info_mapper \ import DatasourceInfoMapper from vitrage.evaluator.actions.action_executor import ActionExecutor diff --git a/vitrage/rpc.py b/vitrage/rpc.py index 1daccbcb8..86f13d48b 100644 --- a/vitrage/rpc.py +++ b/vitrage/rpc.py @@ -18,7 +18,6 @@ from oslo_config import cfg from oslo_log import log import oslo_messaging as messaging from oslo_messaging.rpc import dispatcher -from oslo_utils import uuidutils from osprofiler import profiler @@ -26,9 +25,6 @@ OPTS = [ cfg.StrOpt('rpc_topic', default='rpcapiv1', help='The topic vitrage listens on'), - cfg.StrOpt('rpc_topic_collector', - default='rpc-collector', - help='The topic vitrage-collector listens on'), ] LOG = log.getLogger(__name__) @@ -90,14 +86,6 @@ def get_client(transport, target, version_cap=None, serializer=None): serializer=serializer) -def get_default_server(conf, topic, endpoints): - transport = messaging.get_rpc_transport(conf) - target = messaging.Target( - topic=topic, - server=uuidutils.generate_uuid()) - return get_server(target, endpoints, transport) - - def get_server(target, endpoints, transport, serializer=None): assert transport is not None diff --git a/vitrage/tests/functional/datasources/listener_service/test_listener_service.py b/vitrage/tests/functional/datasources/listener_service/test_listener_service.py index 0fc2c447e..899b59cf1 100644 --- a/vitrage/tests/functional/datasources/listener_service/test_listener_service.py +++ b/vitrage/tests/functional/datasources/listener_service/test_listener_service.py @@ -11,9 +11,9 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. +from vitrage.entity_graph.driver_exec import DriversNotificationEndpoint from vitrage.datasources.driver_base import DriverBase -from vitrage.datasources.listener_service import NotificationsEndpoint from vitrage.tests import base from vitrage.tests.mocks import mock_driver @@ -39,8 +39,8 @@ class TestListenerService(base.BaseTest): def setUpClass(cls): super(TestListenerService, cls).setUpClass() - def _add_event_to_actual_events(self, event_type, data): - self.actual_events.append(data) + def _add_event_to_actual_events(self, events): + self.actual_events.extend(events) def _set_excepted_events(self, events): self.excepted_events = events @@ -61,9 +61,10 @@ class TestListenerService(base.BaseTest): my_test_driver = MyTestDriver() enrich_callbacks_by_events = {"mock": [my_test_driver.enrich_event]} - endpoint = NotificationsEndpoint( - enrich_callbacks_by_events, + endpoint = DriversNotificationEndpoint( + None, self._add_event_to_actual_events) + endpoint._enrich_event_methods = enrich_callbacks_by_events # test handling one event events = self._generate_events(1) diff --git a/vitrage/tests/unit/entity_graph/test_processor_service.py b/vitrage/tests/unit/entity_graph/test_processor_service.py index 68de7e269..56685dc8a 100644 --- a/vitrage/tests/unit/entity_graph/test_processor_service.py +++ b/vitrage/tests/unit/entity_graph/test_processor_service.py @@ -39,7 +39,7 @@ class EventsCoordinationTest(base.BaseTest): the result should be the number of low priority calls. 0*(2^n) + 1*n """ - priority_listener = EventsCoordination(None, self.do_work, None, None) + priority_listener = EventsCoordination(None, self.do_work) def write_high(): for i in range(10000):