infrastructure changes for synchronizers and consistency
Change-Id: I6854feca155893356e9509d47021e42322074538
This commit is contained in:
parent
f2f9879d0a
commit
76ac86993d
@ -70,3 +70,4 @@ class EventAction(object):
|
||||
CREATE = 'create'
|
||||
DELETE = 'delete'
|
||||
UPDATE = 'update'
|
||||
END_MESSAGE = 'end_message'
|
||||
|
@ -13,9 +13,9 @@
|
||||
# under the License.
|
||||
|
||||
import os
|
||||
import yaml
|
||||
|
||||
from oslo_log import log
|
||||
import yaml
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
@ -35,15 +35,21 @@ def load_yaml_files(dir_path, with_exception=False):
|
||||
yaml_files = []
|
||||
for file in files:
|
||||
full_path = dir_path + '/' + file
|
||||
with open(full_path, 'r') as stream:
|
||||
try:
|
||||
config = yaml.load(stream, Loader=yaml.BaseLoader)
|
||||
except Exception as e:
|
||||
if with_exception:
|
||||
raise e
|
||||
else:
|
||||
LOG.error("Fails to parse file: %s. %s" % (full_path, e))
|
||||
|
||||
config = load_yaml_file(full_path, with_exception)
|
||||
if config:
|
||||
yaml_files.append(config)
|
||||
|
||||
return yaml_files
|
||||
|
||||
|
||||
def load_yaml_file(full_path, with_exception=False):
|
||||
with open(full_path, 'r') as stream:
|
||||
try:
|
||||
return yaml.load(stream, Loader=yaml.BaseLoader)
|
||||
except Exception as e:
|
||||
if with_exception:
|
||||
raise e
|
||||
else:
|
||||
LOG.error("Fails to parse file: %s. %s" % (full_path, e))
|
||||
return None
|
||||
|
@ -14,6 +14,7 @@
|
||||
|
||||
import json
|
||||
|
||||
import eventlet
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log
|
||||
import oslo_messaging
|
||||
@ -28,6 +29,8 @@ from vitrage.graph import Direction
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
eventlet.monkey_patch()
|
||||
|
||||
|
||||
class VitrageApiHandlerService(os_service.Service):
|
||||
|
||||
@ -61,7 +64,7 @@ class VitrageApiHandlerService(os_service.Service):
|
||||
|
||||
# TODO(Dany) use eventlet instead of threading
|
||||
server = oslo_messaging.get_rpc_server(transport, target,
|
||||
endpoints, executor='threading')
|
||||
endpoints, executor='eventlet')
|
||||
|
||||
server.start()
|
||||
|
||||
|
@ -13,6 +13,7 @@
|
||||
# under the License.
|
||||
|
||||
import abc
|
||||
|
||||
import six
|
||||
|
||||
|
||||
@ -37,3 +38,7 @@ class ProcessorBase(object):
|
||||
@abc.abstractmethod
|
||||
def delete_entity(self, deleted_vertex, neighbors):
|
||||
pass
|
||||
|
||||
@abc.abstractmethod
|
||||
def handle_end_message(self, event):
|
||||
pass
|
||||
|
@ -125,6 +125,9 @@ class Processor(processor.ProcessorBase):
|
||||
LOG.info("Delete event arrived on invalid resource: %s",
|
||||
deleted_vertex)
|
||||
|
||||
def handle_end_message(self, vertex, neighbors):
|
||||
return
|
||||
|
||||
def transform_entity(self, event):
|
||||
return self.transformer.transform(event)
|
||||
|
||||
@ -146,8 +149,8 @@ class Processor(processor.ProcessorBase):
|
||||
neighbors, valid_edges)
|
||||
for (vertex, edge) in neighbors:
|
||||
graph_vertex = self.entity_graph.get_vertex(vertex.vertex_id)
|
||||
if (not graph_vertex) or self.entity_graph.check_update_validation(
|
||||
graph_vertex, vertex):
|
||||
if not graph_vertex or \
|
||||
not self.entity_graph.is_vertex_deleted(graph_vertex):
|
||||
if self.entity_graph.can_update_vertex(graph_vertex, vertex):
|
||||
LOG.debug("Updates vertex: %s", vertex)
|
||||
self.entity_graph.update_vertex(vertex)
|
||||
@ -215,5 +218,6 @@ class Processor(processor.ProcessorBase):
|
||||
self.actions = {
|
||||
EventAction.CREATE: self.create_entity,
|
||||
EventAction.UPDATE: self.update_entity,
|
||||
EventAction.DELETE: self.delete_entity
|
||||
EventAction.DELETE: self.delete_entity,
|
||||
EventAction.END_MESSAGE: self.handle_end_message
|
||||
}
|
||||
|
@ -16,7 +16,9 @@ from oslo_log import log as logging
|
||||
from oslo_utils import importutils
|
||||
|
||||
from vitrage.common.constants import EntityType
|
||||
from vitrage.common.constants import EventAction
|
||||
from vitrage.common.constants import SynchronizerProperties as SyncProps
|
||||
from vitrage.common.constants import SyncMode
|
||||
from vitrage.common.exception import VitrageTransformerError
|
||||
from vitrage.synchronizer.plugins.nagios.transformer import NagiosTransformer
|
||||
from vitrage.synchronizer.plugins.nova.host.transformer import HostTransformer
|
||||
@ -24,8 +26,8 @@ from vitrage.synchronizer.plugins.nova.instance.transformer import \
|
||||
InstanceTransformer
|
||||
from vitrage.synchronizer.plugins.nova.zone.transformer import ZoneTransformer
|
||||
from vitrage.synchronizer.plugins.static_physical.transformer import \
|
||||
StaticPhysical
|
||||
|
||||
StaticPhysicalTransformer
|
||||
from vitrage.synchronizer.plugins import transformer_base
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
@ -54,7 +56,8 @@ class TransformerManager(object):
|
||||
transformers)
|
||||
|
||||
transformers[EntityType.SWITCH] = importutils.import_object(
|
||||
"%s.%s" % (StaticPhysical.__module__, StaticPhysical.__name__),
|
||||
"%s.%s" % (StaticPhysicalTransformer.__module__,
|
||||
StaticPhysicalTransformer.__name__),
|
||||
transformers)
|
||||
|
||||
transformers[EntityType.NAGIOS] = importutils.import_object(
|
||||
@ -75,14 +78,18 @@ class TransformerManager(object):
|
||||
return transformer
|
||||
|
||||
def transform(self, entity_event):
|
||||
if not self._is_end_message(entity_event):
|
||||
try:
|
||||
sync_type = entity_event[SyncProps.SYNC_TYPE]
|
||||
except KeyError:
|
||||
raise VitrageTransformerError(
|
||||
'Entity Event must contains sync_type field.')
|
||||
|
||||
try:
|
||||
sync_type = entity_event[SyncProps.SYNC_TYPE]
|
||||
except KeyError:
|
||||
raise VitrageTransformerError(
|
||||
'Entity Event must contains sync_type field.')
|
||||
|
||||
return self.get_transformer(sync_type).transform(entity_event)
|
||||
return self.get_transformer(sync_type).transform(entity_event)
|
||||
else:
|
||||
return transformer_base.EntityWrapper(None,
|
||||
None,
|
||||
EventAction.END_MESSAGE)
|
||||
|
||||
def extract_key(self, entity_event):
|
||||
|
||||
@ -93,3 +100,9 @@ class TransformerManager(object):
|
||||
'Entity Event must contains sync_type field.')
|
||||
|
||||
return self.get_transformer(sync_type).extract_key()
|
||||
|
||||
@staticmethod
|
||||
def _is_end_message(entity_event):
|
||||
return entity_event[SyncProps.SYNC_MODE] == SyncMode.INIT_SNAPSHOT and\
|
||||
SyncProps.EVENT_TYPE in entity_event and \
|
||||
entity_event[SyncProps.EVENT_TYPE] == EventAction.END_MESSAGE
|
||||
|
@ -19,4 +19,13 @@ OPTS = [
|
||||
default=600,
|
||||
min=10,
|
||||
help='interval between full snapshots'),
|
||||
cfg.IntOpt('nagios_changes_interval',
|
||||
default=30,
|
||||
min=30,
|
||||
help='interval between checking changes in nagios plugin'),
|
||||
cfg.IntOpt('static_physical_changes_interval',
|
||||
default=30,
|
||||
min=30,
|
||||
help='interval between checking changes in the configuration '
|
||||
'files of the physical topology plugin'),
|
||||
]
|
||||
|
@ -1,4 +1,5 @@
|
||||
# Copyright 2016 - Alcatel-Lucent
|
||||
# Copyright 2016 - Nokia
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
@ -15,6 +16,7 @@
|
||||
from oslo_log import log
|
||||
from oslo_service import service as os_service
|
||||
|
||||
from services import ChangesService
|
||||
from services import SnapshotsService
|
||||
from vitrage.synchronizer.plugins.nagios.synchronizer import NagiosSynchronizer
|
||||
from vitrage.synchronizer.plugins.nova.host.synchronizer import \
|
||||
@ -24,8 +26,7 @@ from vitrage.synchronizer.plugins.nova.instance.synchronizer import \
|
||||
from vitrage.synchronizer.plugins.nova.zone.synchronizer import \
|
||||
ZoneSynchronizer
|
||||
from vitrage.synchronizer.plugins.static_physical.synchronizer import \
|
||||
StaticPhysical
|
||||
|
||||
StaticPhysicalSynchronizer
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
@ -40,11 +41,10 @@ def create_send_to_queue_callback(queue):
|
||||
class Launcher(object):
|
||||
|
||||
def __init__(self, conf, callback):
|
||||
|
||||
self.conf = conf
|
||||
self.callback = callback
|
||||
self.plugins = self._init_registered_plugins()
|
||||
self.services = [SnapshotsService(conf, self.plugins)]
|
||||
self.snapshot_plugins = self._register_snapshot_plugins()
|
||||
self.services = self._register_services()
|
||||
|
||||
def launch(self):
|
||||
launcher = os_service.ProcessLauncher(self.conf)
|
||||
@ -52,7 +52,7 @@ class Launcher(object):
|
||||
service.set_callback(self.callback)
|
||||
launcher.launch_service(service, 1)
|
||||
|
||||
def _init_registered_plugins(self):
|
||||
def _register_snapshot_plugins(self):
|
||||
version = 2.0
|
||||
user = 'admin'
|
||||
password = 'password'
|
||||
@ -63,6 +63,19 @@ class Launcher(object):
|
||||
HostSynchronizer(version, user, password, project, auth_url),
|
||||
InstanceSynchronizer(version, user, password, project, auth_url),
|
||||
NagiosSynchronizer(self.conf),
|
||||
StaticPhysical(self.conf)
|
||||
]
|
||||
StaticPhysicalSynchronizer(self.conf)]
|
||||
return registered_plugins
|
||||
|
||||
def _register_services(self):
|
||||
nagios_changes_interval = self.conf.synchronizer.\
|
||||
nagios_changes_interval
|
||||
static_physical_changes_interval = self.conf.synchronizer.\
|
||||
static_physical_changes_interval
|
||||
|
||||
return [SnapshotsService(self.conf, self.snapshot_plugins),
|
||||
ChangesService(self.conf,
|
||||
[NagiosSynchronizer(self.conf)],
|
||||
nagios_changes_interval),
|
||||
ChangesService(self.conf,
|
||||
[StaticPhysicalSynchronizer(self.conf)],
|
||||
static_physical_changes_interval)]
|
||||
|
@ -11,7 +11,9 @@
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from collections import namedtuple
|
||||
|
||||
from oslo_log import log
|
||||
import requests
|
||||
|
||||
@ -19,11 +21,11 @@ from vitrage.common.constants import EntityType
|
||||
from vitrage.common.constants import SynchronizerProperties as SyncProps
|
||||
from vitrage.i18n import _LE
|
||||
from vitrage.i18n import _LW
|
||||
from vitrage.synchronizer.base import SynchronizerBase
|
||||
from vitrage.synchronizer.plugins.nagios.parser import NagiosParser
|
||||
from vitrage.synchronizer.plugins.nagios.properties import NagiosProperties \
|
||||
as NagiosProps
|
||||
from vitrage.synchronizer.plugins.nagios.properties import NagiosStatus
|
||||
from vitrage.synchronizer.plugins.synchronizer_base import SynchronizerBase
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
@ -36,8 +38,13 @@ class NagiosSynchronizer(SynchronizerBase):
|
||||
self.conf = conf
|
||||
self.cache = dict()
|
||||
|
||||
def get_all(self):
|
||||
return self.make_pickleable(self._get_services(), EntityType.NAGIOS)
|
||||
def get_all(self, sync_mode):
|
||||
return self.make_pickleable(self._get_services(),
|
||||
EntityType.NAGIOS,
|
||||
sync_mode)
|
||||
|
||||
def get_changes(self, sync_mode):
|
||||
return []
|
||||
|
||||
def _get_services(self):
|
||||
nagios_user = self.conf.synchronizer_plugins.nagios_user
|
||||
@ -71,7 +78,8 @@ class NagiosSynchronizer(SynchronizerBase):
|
||||
response.status_code)
|
||||
return []
|
||||
|
||||
def _enrich_services(self, nagios_services):
|
||||
@staticmethod
|
||||
def _enrich_services(nagios_services):
|
||||
for service in nagios_services:
|
||||
# TODO(ifat_afek) - add a configuration file for resource types
|
||||
service[NagiosProps.RESOURCE_TYPE] = EntityType.NOVA_HOST
|
||||
|
@ -15,7 +15,7 @@
|
||||
|
||||
from novaclient import client
|
||||
|
||||
from vitrage.synchronizer.base import SynchronizerBase
|
||||
from vitrage.synchronizer.plugins.synchronizer_base import SynchronizerBase
|
||||
|
||||
|
||||
class NovaBase(SynchronizerBase):
|
||||
|
@ -34,6 +34,12 @@ class HostSynchronizer(NovaBase):
|
||||
compute_hosts.append(host_dict)
|
||||
return compute_hosts
|
||||
|
||||
def get_all(self):
|
||||
return self.make_pickleable(self.filter_none_compute_hosts(
|
||||
self.client.hosts.list()), EntityType.NOVA_HOST, ['manager'])
|
||||
def get_all(self, sync_mode):
|
||||
return self.make_pickleable(
|
||||
self.filter_none_compute_hosts(self.client.hosts.list()),
|
||||
EntityType.NOVA_HOST,
|
||||
sync_mode,
|
||||
['manager'])
|
||||
|
||||
def get_changes(self, sync_mode):
|
||||
pass
|
||||
|
@ -31,7 +31,12 @@ class InstanceSynchronizer(NovaBase):
|
||||
instances_res.append(instance.__dict__)
|
||||
return instances_res
|
||||
|
||||
def get_all(self):
|
||||
def get_all(self, sync_mode):
|
||||
return self.make_pickleable(
|
||||
self.filter_instances(self.client.servers.list()),
|
||||
EntityType.NOVA_INSTANCE, ['manager'])
|
||||
EntityType.NOVA_INSTANCE,
|
||||
sync_mode,
|
||||
['manager'])
|
||||
|
||||
def get_changes(self, sync_mode):
|
||||
pass
|
||||
|
@ -33,7 +33,12 @@ class ZoneSynchronizer(NovaBase):
|
||||
zones_res.append(zone_dict)
|
||||
return zones_res
|
||||
|
||||
def get_all(self):
|
||||
def get_all(self, sync_mode):
|
||||
return self.make_pickleable(self.filter_internal_zone(
|
||||
self.client.availability_zones.list()),
|
||||
EntityType.NOVA_ZONE, ['manager'])
|
||||
EntityType.NOVA_ZONE,
|
||||
sync_mode,
|
||||
['manager'])
|
||||
|
||||
def get_changes(self, sync_mode):
|
||||
pass
|
||||
|
@ -14,26 +14,120 @@
|
||||
|
||||
import os
|
||||
|
||||
from vitrage.common.constants import EventAction
|
||||
|
||||
from vitrage.common.constants import SynchronizerProperties as SyncProps
|
||||
from vitrage.common.constants import VertexProperties as VProps
|
||||
from vitrage.common import file_utils
|
||||
from vitrage.synchronizer.base import SynchronizerBase
|
||||
from vitrage.synchronizer.plugins.synchronizer_base import SynchronizerBase
|
||||
|
||||
|
||||
class StaticPhysical(SynchronizerBase):
|
||||
class StaticPhysicalSynchronizer(SynchronizerBase):
|
||||
STATIC_PHYSICAL = 'static_physical'
|
||||
ENTITIES_SECTION = 'entities'
|
||||
|
||||
def __init__(self, conf):
|
||||
super(StaticPhysical, self).__init__()
|
||||
super(StaticPhysicalSynchronizer, self).__init__()
|
||||
self.cfg = conf
|
||||
self.cache = {}
|
||||
|
||||
def get_all(self):
|
||||
return self.make_pickleable(self.get_instances(), None, [])
|
||||
def get_all(self, sync_mode):
|
||||
return self.make_pickleable(self._get_all_entities(),
|
||||
self.STATIC_PHYSICAL,
|
||||
sync_mode)
|
||||
|
||||
def get_instances(self):
|
||||
def get_changes(self, sync_mode):
|
||||
return self.make_pickleable(self._get_changes_entities(),
|
||||
self.STATIC_PHYSICAL,
|
||||
sync_mode)
|
||||
|
||||
def _get_all_entities(self):
|
||||
static_entities = []
|
||||
if os.path.isdir(self.cfg.synchronizer_plugins.static_plugins_dir):
|
||||
static_plugin_configs = file_utils.load_yaml_files(
|
||||
self.cfg.synchronizer_plugins.static_plugins_dir)
|
||||
|
||||
for config in static_plugin_configs:
|
||||
for entity in config['entities']:
|
||||
static_entities.append(entity)
|
||||
if os.path.isdir(self.cfg.synchronizer_plugins.static_plugins_dir):
|
||||
files = file_utils.load_files(
|
||||
self.cfg.synchronizer_plugins.static_plugins_dir, '.yaml')
|
||||
|
||||
for file in files:
|
||||
full_path = self.cfg.synchronizer_plugins.static_plugins_dir \
|
||||
+ '/' + file
|
||||
static_entities += self._get_entities_from_file(file,
|
||||
full_path)
|
||||
|
||||
return static_entities
|
||||
|
||||
def _get_entities_from_file(self, file, path):
|
||||
static_entities = []
|
||||
config = file_utils.load_yaml_file(path)
|
||||
|
||||
for entity in config[self.ENTITIES_SECTION]:
|
||||
static_entities.append(entity.copy())
|
||||
|
||||
self.cache[file] = config
|
||||
|
||||
return static_entities
|
||||
|
||||
def _get_changes_entities(self):
|
||||
entities_updates = []
|
||||
|
||||
if os.path.isdir(self.cfg.synchronizer_plugins.static_plugins_dir):
|
||||
entities_updates = []
|
||||
files = file_utils.load_files(
|
||||
self.cfg.synchronizer_plugins.static_plugins_dir, '.yaml')
|
||||
|
||||
for file in files:
|
||||
full_path = self.cfg.synchronizer_plugins.static_plugins_dir +\
|
||||
'/' + file
|
||||
config = file_utils.load_yaml_file(full_path)
|
||||
if config:
|
||||
if file in self.cache:
|
||||
if str(config) != str(self.cache[file]):
|
||||
# TODO(alexey_weyl): need also to remove deleted
|
||||
# files from cache
|
||||
|
||||
self._update_on_existing_entities(
|
||||
self.cache[file][self.ENTITIES_SECTION],
|
||||
config[self.ENTITIES_SECTION],
|
||||
entities_updates)
|
||||
|
||||
self._update_on_new_entities(
|
||||
config[self.ENTITIES_SECTION],
|
||||
self.cache[file][self.ENTITIES_SECTION],
|
||||
entities_updates)
|
||||
|
||||
self.cache[file] = config
|
||||
else:
|
||||
self.cache[file] = config
|
||||
entities_updates += \
|
||||
self._get_entities_from_file(file, full_path)
|
||||
|
||||
return entities_updates
|
||||
|
||||
def _update_on_existing_entities(self, old_entities,
|
||||
new_entities, updates):
|
||||
for old_entity in old_entities:
|
||||
if old_entity not in new_entities:
|
||||
new_entity = self._find_entity(old_entity, new_entities)
|
||||
if not new_entity:
|
||||
self._delete_event(old_entity)
|
||||
updates.append(old_entity.copy())
|
||||
else:
|
||||
updates.append(new_entity.copy())
|
||||
|
||||
@staticmethod
|
||||
def _find_entity(new_entity, entities):
|
||||
for entity in entities:
|
||||
if entity[SyncProps.SYNC_TYPE] == new_entity[SyncProps.SYNC_TYPE] \
|
||||
and entity[VProps.ID] == new_entity[VProps.ID]:
|
||||
return entity
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def _update_on_new_entities(new_entities, old_entities, updates):
|
||||
for entity in new_entities:
|
||||
if entity not in updates and entity not in old_entities:
|
||||
updates.append(entity.copy())
|
||||
|
||||
@staticmethod
|
||||
def _delete_event(entity):
|
||||
entity[SyncProps.EVENT_TYPE] = EventAction.DELETE
|
||||
|
@ -24,9 +24,10 @@ from vitrage.synchronizer.plugins import transformer_base
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class StaticPhysical(transformer_base.TransformerBase):
|
||||
class StaticPhysicalTransformer(transformer_base.TransformerBase):
|
||||
|
||||
RELATION_TYPE = 'relation_type'
|
||||
RELATIONSHIPS_SECTION = 'relationships'
|
||||
|
||||
def __init__(self, transformers):
|
||||
self.transformers = transformers
|
||||
@ -55,7 +56,7 @@ class StaticPhysical(transformer_base.TransformerBase):
|
||||
entity_key = self.extract_key(entity_event)
|
||||
timestamp = entity_event[SyncProps.SAMPLE_DATE]
|
||||
|
||||
for neighbor_details in entity_event['relationships']:
|
||||
for neighbor_details in entity_event[self.RELATIONSHIPS_SECTION]:
|
||||
# TODO(alexey): need to decide what to do if one of the entities
|
||||
# fails
|
||||
neighbor = self._create_neighbor(neighbor_details, entity_type,
|
||||
|
@ -16,7 +16,10 @@ import abc
|
||||
|
||||
import six
|
||||
|
||||
from vitrage.common.constants import EventAction
|
||||
|
||||
from vitrage.common.constants import SynchronizerProperties as SyncProps
|
||||
from vitrage.common.constants import SyncMode
|
||||
import vitrage.common.datetime_utils
|
||||
|
||||
|
||||
@ -27,11 +30,24 @@ class SynchronizerBase(object):
|
||||
pass
|
||||
|
||||
@abc.abstractmethod
|
||||
def get_all(self):
|
||||
def get_all(self, sync_mode):
|
||||
pass
|
||||
|
||||
def make_pickleable(self, entities, sync_type, fields_to_remove=[]):
|
||||
@staticmethod
|
||||
def _get_end_message(sync_type):
|
||||
end_message = {
|
||||
SyncProps.SYNC_TYPE: sync_type,
|
||||
SyncProps.SYNC_MODE: SyncMode.INIT_SNAPSHOT,
|
||||
SyncProps.EVENT_TYPE: EventAction.END_MESSAGE
|
||||
}
|
||||
return end_message
|
||||
|
||||
@abc.abstractmethod
|
||||
def get_changes(self, sync_mode):
|
||||
pass
|
||||
|
||||
def make_pickleable(self, entities, sync_type,
|
||||
sync_mode, fields_to_remove=[]):
|
||||
pickleable_entities = []
|
||||
|
||||
for entity in entities:
|
||||
@ -39,17 +55,25 @@ class SynchronizerBase(object):
|
||||
entity.pop(field)
|
||||
|
||||
self._add_sync_type(entity, sync_type)
|
||||
self._add_sync_mode(entity, sync_mode)
|
||||
self._add_sampling_time(entity)
|
||||
pickleable_entities.append(entity)
|
||||
|
||||
if sync_mode == SyncMode.INIT_SNAPSHOT:
|
||||
pickleable_entities.append(self._get_end_message(sync_type))
|
||||
|
||||
return pickleable_entities
|
||||
|
||||
@staticmethod
|
||||
def _add_sync_type(entity, sync_type):
|
||||
if sync_type:
|
||||
if SyncProps.SYNC_TYPE not in entity:
|
||||
entity[SyncProps.SYNC_TYPE] = sync_type
|
||||
|
||||
@staticmethod
|
||||
def _add_sampling_time(entity):
|
||||
entity[SyncProps.SAMPLE_DATE] = str(
|
||||
vitrage.common.datetime_utils.utcnow())
|
||||
|
||||
@staticmethod
|
||||
def _add_sync_mode(entity, sync_mode):
|
||||
entity[SyncProps.SYNC_MODE] = sync_mode
|
@ -1,4 +1,5 @@
|
||||
# Copyright 2016 - Alcatel-Lucent
|
||||
# Copyright 2016 - Nokia
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
@ -15,7 +16,6 @@
|
||||
from oslo_log import log
|
||||
from oslo_service import service as os_service
|
||||
|
||||
from vitrage.common.constants import SynchronizerProperties as SyncProps
|
||||
from vitrage.common.constants import SyncMode
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
@ -40,17 +40,19 @@ class SnapshotsService(SynchronizerService):
|
||||
|
||||
def start(self):
|
||||
LOG.info("Start VitrageSnapshotsService")
|
||||
|
||||
super(SnapshotsService, self).start()
|
||||
interval = self.conf.synchronizer.snapshots_interval
|
||||
self.tg.add_timer(interval, self._get_all)
|
||||
|
||||
LOG.info("Finish start VitrageSnapshotsService")
|
||||
|
||||
def stop(self):
|
||||
LOG.info("Stop VitrageSynchronizerService")
|
||||
LOG.info("Stop VitrageSnapshotsService")
|
||||
|
||||
super(SnapshotsService, self).stop()
|
||||
|
||||
LOG.info("Finish stop VitrageSynchronizerService")
|
||||
LOG.info("Finish stop VitrageSnapshotsService")
|
||||
|
||||
def _get_all(self):
|
||||
sync_mode = SyncMode.INIT_SNAPSHOT \
|
||||
@ -58,15 +60,44 @@ class SnapshotsService(SynchronizerService):
|
||||
LOG.debug("start get all with sync mode %s" % sync_mode)
|
||||
|
||||
for plugin in self.registered_plugins:
|
||||
entities_dictionaries = \
|
||||
self._mark_snapshot_entities(plugin.get_all(), sync_mode)
|
||||
entities_dictionaries = plugin.get_all(sync_mode)
|
||||
for entity in entities_dictionaries:
|
||||
self.callback_function(entity)
|
||||
|
||||
LOG.debug("end get all with sync mode %s" % sync_mode)
|
||||
self.first_time = False
|
||||
|
||||
@staticmethod
|
||||
def _mark_snapshot_entities(dicts, sync_mode):
|
||||
[x.setdefault(SyncProps.SYNC_MODE, sync_mode) for x in dicts]
|
||||
return dicts
|
||||
|
||||
class ChangesService(SynchronizerService):
|
||||
|
||||
def __init__(self, conf, registered_plugins, changes_interval):
|
||||
super(ChangesService, self).__init__(conf, registered_plugins)
|
||||
self.changes_interval = changes_interval
|
||||
|
||||
def start(self):
|
||||
LOG.info("Start VitrageChangesService - %s",
|
||||
self.registered_plugins[0].__class__.__name__)
|
||||
|
||||
super(ChangesService, self).start()
|
||||
self.tg.add_timer(self.changes_interval, self._get_changes)
|
||||
|
||||
LOG.info("Finish start VitrageChangesService - %s",
|
||||
self.registered_plugins[0].__class__.__name__)
|
||||
|
||||
def stop(self):
|
||||
LOG.info("Stop VitrageChangesService - %s",
|
||||
self.registered_plugins[0].__class__.__name__)
|
||||
|
||||
super(ChangesService, self).stop()
|
||||
|
||||
LOG.info("Finish stop VitrageChangesService - %s",
|
||||
self.registered_plugins[0].__class__.__name__)
|
||||
|
||||
def _get_changes(self):
|
||||
LOG.debug("start get changes")
|
||||
|
||||
for plugin in self.registered_plugins:
|
||||
for entity in plugin.get_changes(SyncMode.UPDATE):
|
||||
self.callback_function(entity)
|
||||
|
||||
LOG.debug("end get changes")
|
||||
|
@ -0,0 +1,41 @@
|
||||
entities:
|
||||
- name: switch-1
|
||||
id: 12345
|
||||
sync_type: switch
|
||||
state: available
|
||||
relationships:
|
||||
- sync_type: nova.host
|
||||
name: host-1
|
||||
id: 1
|
||||
relation_type: contains
|
||||
- sync_type: nova.host
|
||||
name: host-2
|
||||
id: 2
|
||||
relation_type: contains
|
||||
- name: switch-3
|
||||
id: 34567
|
||||
sync_type: switch
|
||||
state: available
|
||||
relationships:
|
||||
- sync_type: nova.host
|
||||
name: host-4
|
||||
id: 4
|
||||
relation_type: contains
|
||||
- name: switch-4
|
||||
id: 45678
|
||||
sync_type: switch
|
||||
state: error
|
||||
relationships:
|
||||
- sync_type: nova.host
|
||||
name: host-2
|
||||
id: 2
|
||||
relation_type: contains
|
||||
- name: switch-5
|
||||
id: 56789
|
||||
sync_type: switch
|
||||
state: error
|
||||
relationships:
|
||||
- sync_type: nova.host
|
||||
name: host-3
|
||||
id: 3
|
||||
relation_type: contains
|
@ -0,0 +1,19 @@
|
||||
entities:
|
||||
- name: switch-1
|
||||
id: 12345
|
||||
sync_type: switch
|
||||
state: available
|
||||
relationships:
|
||||
- sync_type: switch
|
||||
name: switch-2
|
||||
id: 23456
|
||||
relation_type: contains
|
||||
- name: switch-2
|
||||
id: 23456
|
||||
sync_type: switch
|
||||
state: available
|
||||
relationships:
|
||||
- sync_type: switch
|
||||
name: switch-3
|
||||
id: 34567
|
||||
relation_type: contains
|
@ -0,0 +1,120 @@
|
||||
# Copyright 2016 - Nokia
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import os
|
||||
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
|
||||
from vitrage.common.constants import EntityType
|
||||
from vitrage.common.constants import EventAction
|
||||
from vitrage.common.constants import SynchronizerProperties as SyncProps
|
||||
from vitrage.common.constants import SyncMode
|
||||
from vitrage.common.constants import VertexProperties as VProps
|
||||
from vitrage.common import file_utils
|
||||
from vitrage.synchronizer.plugins.static_physical import synchronizer
|
||||
from vitrage.tests import base
|
||||
from vitrage.tests.mocks import utils
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class TestStaticPhysicalSynchronizer(base.BaseTest):
|
||||
|
||||
OPTS = [
|
||||
cfg.StrOpt('static_plugins_dir',
|
||||
default=utils.get_resources_dir() + '/static_plugins',
|
||||
),
|
||||
]
|
||||
|
||||
CHANGES_OPTS = [
|
||||
cfg.StrOpt('static_plugins_dir',
|
||||
default=utils.get_resources_dir() + '/static_plugins/'
|
||||
'changes_plugins',
|
||||
),
|
||||
]
|
||||
|
||||
def setUp(self):
|
||||
super(TestStaticPhysicalSynchronizer, self).setUp()
|
||||
self.conf = cfg.ConfigOpts()
|
||||
self.conf.register_opts(self.OPTS, group='synchronizer_plugins')
|
||||
self.static_physical_synchronizer = \
|
||||
synchronizer.StaticPhysicalSynchronizer(self.conf)
|
||||
|
||||
def test_static_plugins_loader(self):
|
||||
# Setup
|
||||
total_static_plugins = \
|
||||
os.listdir(self.conf.synchronizer_plugins.static_plugins_dir)
|
||||
|
||||
# Action
|
||||
static_configs = file_utils.load_yaml_files(
|
||||
self.conf.synchronizer_plugins.static_plugins_dir)
|
||||
|
||||
# Test assertions
|
||||
# -1 is because there are 2 files and a folder in static_plugins_dir
|
||||
self.assertEqual(len(total_static_plugins) - 1, len(static_configs))
|
||||
|
||||
def test_get_all(self):
|
||||
# Action
|
||||
static_entities = self.static_physical_synchronizer.get_all(
|
||||
SyncMode.UPDATE)
|
||||
|
||||
# Test assertions
|
||||
self.assertEqual(5, len(static_entities))
|
||||
|
||||
def test_get_changes(self):
|
||||
# Setup
|
||||
entities = self.static_physical_synchronizer.get_all(SyncMode.UPDATE)
|
||||
self.assertEqual(5, len(entities))
|
||||
|
||||
self.conf = cfg.ConfigOpts()
|
||||
self.conf.register_opts(self.CHANGES_OPTS,
|
||||
group='synchronizer_plugins')
|
||||
self.static_physical_synchronizer.cfg = self.conf
|
||||
|
||||
# Action
|
||||
changes = self.static_physical_synchronizer.get_changes(
|
||||
EventAction.UPDATE)
|
||||
|
||||
# Test Assertions
|
||||
status = any(change[SyncProps.SYNC_TYPE] == EntityType.SWITCH and
|
||||
change[VProps.ID] == '12345' for change in changes)
|
||||
self.assertEqual(False, status)
|
||||
|
||||
status = any(change[SyncProps.SYNC_TYPE] == EntityType.SWITCH and
|
||||
change[VProps.ID] == '23456' and
|
||||
change[SyncProps.EVENT_TYPE] == 'delete'
|
||||
for change in changes)
|
||||
self.assertEqual(True, status)
|
||||
|
||||
status = any(change[SyncProps.SYNC_TYPE] == EntityType.SWITCH and
|
||||
change[VProps.ID] == '34567' for change in changes)
|
||||
self.assertEqual(True, status)
|
||||
|
||||
status = any(change[SyncProps.SYNC_TYPE] == EntityType.SWITCH and
|
||||
change[VProps.ID] == '45678' for change in changes)
|
||||
self.assertEqual(True, status)
|
||||
status = any(change[SyncProps.SYNC_TYPE] == EntityType.SWITCH and
|
||||
change[VProps.ID] == '56789' for change in changes)
|
||||
self.assertEqual(True, status)
|
||||
|
||||
self.assertEqual(4, len(changes))
|
||||
|
||||
# Action
|
||||
changes = self.static_physical_synchronizer.get_changes(
|
||||
EventAction.UPDATE)
|
||||
|
||||
# Test Assertions
|
||||
self.assertEqual(0, len(changes))
|
@ -23,7 +23,7 @@ from vitrage.common.constants import SynchronizerProperties as SyncProps
|
||||
from vitrage.common.constants import VertexProperties as VProps
|
||||
from vitrage.synchronizer.plugins.nova.host.transformer import HostTransformer
|
||||
from vitrage.synchronizer.plugins.static_physical.transformer \
|
||||
import StaticPhysical
|
||||
import StaticPhysicalTransformer
|
||||
from vitrage.synchronizer.plugins.transformer_base import TransformerBase
|
||||
from vitrage.tests import base
|
||||
from vitrage.tests.mocks import mock_syncronizer as mock_sync
|
||||
@ -38,7 +38,7 @@ class TestStaticPhysicalTransformer(base.BaseTest):
|
||||
|
||||
self.transformers = {}
|
||||
host_transformer = HostTransformer(self.transformers)
|
||||
static_transformer = StaticPhysical(self.transformers)
|
||||
static_transformer = StaticPhysicalTransformer(self.transformers)
|
||||
self.transformers[EntityType.NOVA_HOST] = host_transformer
|
||||
self.transformers[EntityType.SWITCH] = static_transformer
|
||||
|
||||
@ -51,7 +51,7 @@ class TestStaticPhysicalTransformer(base.BaseTest):
|
||||
switch_type = EntityType.SWITCH
|
||||
switch_name = 'switch-1'
|
||||
timestamp = datetime.datetime.utcnow()
|
||||
static_transformer = StaticPhysical(self.transformers)
|
||||
static_transformer = StaticPhysicalTransformer(self.transformers)
|
||||
|
||||
# Test action
|
||||
properties = {
|
||||
@ -65,7 +65,7 @@ class TestStaticPhysicalTransformer(base.BaseTest):
|
||||
observed_id_values = placeholder.vertex_id.split(
|
||||
TransformerBase.KEY_SEPARATOR)
|
||||
expected_id_values = \
|
||||
StaticPhysical(self.transformers).key_values(
|
||||
StaticPhysicalTransformer(self.transformers).key_values(
|
||||
[switch_type, switch_name])
|
||||
self.assertEqual(observed_id_values, expected_id_values)
|
||||
|
||||
@ -90,7 +90,7 @@ class TestStaticPhysicalTransformer(base.BaseTest):
|
||||
# Test setup
|
||||
switch_type = EntityType.SWITCH
|
||||
switch_name = 'switch-1'
|
||||
static_transformer = StaticPhysical(self.transformers)
|
||||
static_transformer = StaticPhysicalTransformer(self.transformers)
|
||||
|
||||
# Test action
|
||||
observed_key_fields = static_transformer.key_values(
|
||||
@ -112,7 +112,8 @@ class TestStaticPhysicalTransformer(base.BaseTest):
|
||||
|
||||
for event in static_events:
|
||||
# Test action
|
||||
wrapper = StaticPhysical(self.transformers).transform(event)
|
||||
wrapper = StaticPhysicalTransformer(self.transformers).\
|
||||
transform(event)
|
||||
|
||||
# Test assertions
|
||||
vertex = wrapper.vertex
|
||||
|
@ -1,63 +0,0 @@
|
||||
# Copyright 2016 - Nokia
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import os
|
||||
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
|
||||
from vitrage.common import file_utils
|
||||
from vitrage.tests import base
|
||||
from vitrage.tests.mocks import utils
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class TestStaticPlugin(base.BaseTest):
|
||||
|
||||
OPTS = [
|
||||
cfg.StrOpt('static_plugins_dir',
|
||||
default=utils.get_resources_dir() + '/static_plugins',
|
||||
),
|
||||
]
|
||||
|
||||
def setUp(self):
|
||||
super(TestStaticPlugin, self).setUp()
|
||||
self.static_dir_path = utils.get_resources_dir() + '/static_plugins'
|
||||
self.conf = cfg.ConfigOpts()
|
||||
self.conf.register_opts(self.OPTS, group='synchronizer_plugins')
|
||||
|
||||
def test_static_plugins_loader(self):
|
||||
# Setup
|
||||
total_static_plugins = os.listdir(self.static_dir_path)
|
||||
|
||||
# Action
|
||||
static_configs = file_utils.load_yaml_files(
|
||||
self.conf.synchronizer_plugins.static_plugins_dir)
|
||||
|
||||
# Test assertions
|
||||
self.assertEqual(len(total_static_plugins), len(static_configs))
|
||||
|
||||
def test_number_of_entities(self):
|
||||
static_entities = []
|
||||
static_plugin_configs = file_utils.load_yaml_files(
|
||||
self.conf.synchronizer_plugins.static_plugins_dir)
|
||||
|
||||
for config in static_plugin_configs:
|
||||
for entity in config['entities']:
|
||||
static_entities.append(entity)
|
||||
|
||||
# Test assertions
|
||||
self.assertEqual(5, len(static_entities))
|
Loading…
x
Reference in New Issue
Block a user