vitrage/vitrage_tempest_tests/tests/base_mock.py
Idan Hefetz e2b964183c Separation of Evaluator and InitializationStatus from ConsistencyEnforcer
Refactoring before parallel evaluator, evaluator is currently held by consistency to be used during the initialization step.
Initialization code moved to InitializationStatus and init thread is now started from the VitrageGraphService.

Change-Id: I551b5d863da0e875b08d3f45754d4be64707fc0a
2017-07-18 13:17:40 +00:00

65 lines
2.4 KiB
Python

# Copyright 2016 Nokia
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# noinspection PyPackageRequirements
import testtools
from oslo_config import cfg
from vitrage.common.constants import DatasourceAction
from vitrage.common.constants import DatasourceProperties
from vitrage.entity_graph.processor import processor as proc
from vitrage.entity_graph.vitrage_init import VitrageInit
from vitrage.graph.driver.networkx_graph import NXGraph
from vitrage.tests.mocks import mock_driver as mock_sync
from vitrage.tests.mocks import utils
class BaseMock(testtools.TestCase):
"""Base test class for Vitrage API tests."""
PROCESSOR_OPTS = [
cfg.StrOpt('datasources_values_dir',
default=utils.get_resources_dir() + '/datasources_values'),
]
def create_processor_with_graph(self):
conf = cfg.ConfigOpts()
conf.register_opts(self.PROCESSOR_OPTS, group='entity_graph')
events = self._create_mock_events()
e_graph = NXGraph("Entity Graph", uuid=False)
init = VitrageInit(conf)
processor = proc.Processor(conf, init, e_graph)
for event in events:
processor.process_event(event)
return processor
@staticmethod
def _create_mock_events():
gen_list = mock_sync.simple_zone_generators(
2, 4, snapshot_events=2,
snap_vals={DatasourceProperties.DATASOURCE_ACTION:
DatasourceAction.INIT_SNAPSHOT})
gen_list += mock_sync.simple_host_generators(
2, 4, 4,
snap_vals={DatasourceProperties.DATASOURCE_ACTION:
DatasourceAction.INIT_SNAPSHOT})
gen_list += mock_sync.simple_instance_generators(
4, 15, 15,
snap_vals={DatasourceProperties.DATASOURCE_ACTION:
DatasourceAction.INIT_SNAPSHOT})
return mock_sync.generate_sequential_events_list(gen_list)