Remove end messages
Datasource end messages previously used to notify the processor that get_all finished successfully. Are no longer used and are removed. Story: 2005042 Task: 29539 Change-Id: Icb93de7faa92678e2107373d0ffe24ae2970a1af
This commit is contained in:
parent
7bed18c5b5
commit
552cad2049
@ -0,0 +1,9 @@
|
|||||||
|
---
|
||||||
|
features:
|
||||||
|
- Datasource ``end messages`` previously used to notify the processor
|
||||||
|
that get_all finished successfully. Are no longer used and are removed.
|
||||||
|
deprecations:
|
||||||
|
- Config option ``initialization_interval`` is deprecated and no longer
|
||||||
|
used, due to the removal of datasource ``end messages``.
|
||||||
|
- Config option ``initialization_max_retries`` is deprecated and no longer
|
||||||
|
used, due to the removal of datasource ``end messages``.
|
@ -114,7 +114,6 @@ class GraphAction(object):
|
|||||||
DELETE_RELATIONSHIP = 'delete_relationship'
|
DELETE_RELATIONSHIP = 'delete_relationship'
|
||||||
UPDATE_RELATIONSHIP = 'update_relationship'
|
UPDATE_RELATIONSHIP = 'update_relationship'
|
||||||
REMOVE_DELETED_ENTITY = 'remove_deleted_entity'
|
REMOVE_DELETED_ENTITY = 'remove_deleted_entity'
|
||||||
END_MESSAGE = 'end_message'
|
|
||||||
|
|
||||||
|
|
||||||
class NotifierEventTypes(object):
|
class NotifierEventTypes(object):
|
||||||
|
@ -17,9 +17,7 @@ import six
|
|||||||
|
|
||||||
from oslo_log import log
|
from oslo_log import log
|
||||||
|
|
||||||
from vitrage.common.constants import DatasourceAction
|
|
||||||
from vitrage.common.constants import DatasourceProperties as DSProps
|
from vitrage.common.constants import DatasourceProperties as DSProps
|
||||||
from vitrage.common.constants import GraphAction
|
|
||||||
from vitrage.common.constants import VertexProperties as VProps
|
from vitrage.common.constants import VertexProperties as VProps
|
||||||
from vitrage.utils import datetime as datetime_utils
|
from vitrage.utils import datetime as datetime_utils
|
||||||
|
|
||||||
@ -41,28 +39,14 @@ class DriverBase(object):
|
|||||||
def callback_on_fault(self, exception):
|
def callback_on_fault(self, exception):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def _get_end_message(entity_type):
|
|
||||||
end_message = {
|
|
||||||
DSProps.ENTITY_TYPE: entity_type,
|
|
||||||
DSProps.DATASOURCE_ACTION: DatasourceAction.INIT_SNAPSHOT,
|
|
||||||
DSProps.EVENT_TYPE: GraphAction.END_MESSAGE
|
|
||||||
}
|
|
||||||
return end_message
|
|
||||||
|
|
||||||
def get_changes(self, datasource_action):
|
def get_changes(self, datasource_action):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def make_pickleable(cls, entities, entity_type, datasource_action, *args):
|
def make_pickleable(cls, entities, entity_type, datasource_action, *args):
|
||||||
pickleable_entities = cls.make_pickleable_without_end_msg(
|
return cls.make_pickleable_without_end_msg(
|
||||||
entities, entity_type, datasource_action, *args)
|
entities, entity_type, datasource_action, *args)
|
||||||
|
|
||||||
if datasource_action == DatasourceAction.INIT_SNAPSHOT:
|
|
||||||
pickleable_entities.append(cls._get_end_message(entity_type))
|
|
||||||
|
|
||||||
return pickleable_entities
|
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def make_pickleable_without_end_msg(cls, entities, entity_type,
|
def make_pickleable_without_end_msg(cls, entities, entity_type,
|
||||||
datasource_action, *args):
|
datasource_action, *args):
|
||||||
|
@ -127,19 +127,14 @@ class TransformerBase(object):
|
|||||||
:rtype:EntityWrapper
|
:rtype:EntityWrapper
|
||||||
"""
|
"""
|
||||||
|
|
||||||
if not self._is_end_message(entity_event):
|
entity_vertex = self._create_entity_vertex(entity_event)
|
||||||
entity_vertex = self._create_entity_vertex(entity_event)
|
neighbors = self._create_neighbors(entity_event)
|
||||||
neighbors = self._create_neighbors(entity_event)
|
action = self._extract_graph_action(entity_event)
|
||||||
action = self._extract_graph_action(entity_event)
|
|
||||||
|
|
||||||
if action == GraphAction.DELETE_ENTITY:
|
if action == GraphAction.DELETE_ENTITY:
|
||||||
self._delete_id_from_cache(entity_vertex.vertex_id)
|
self._delete_id_from_cache(entity_vertex.vertex_id)
|
||||||
|
|
||||||
return EntityWrapper(entity_vertex, neighbors, action)
|
return EntityWrapper(entity_vertex, neighbors, action)
|
||||||
else:
|
|
||||||
return EntityWrapper(self._create_end_vertex(entity_event),
|
|
||||||
None,
|
|
||||||
GraphAction.END_MESSAGE)
|
|
||||||
|
|
||||||
def _create_entity_vertex(self, entity_event):
|
def _create_entity_vertex(self, entity_event):
|
||||||
if is_update_event(entity_event) and \
|
if is_update_event(entity_event) and \
|
||||||
@ -343,20 +338,6 @@ class TransformerBase(object):
|
|||||||
raise VitrageTransformerError(
|
raise VitrageTransformerError(
|
||||||
'Invalid action type: (%s)' % datasource_action)
|
'Invalid action type: (%s)' % datasource_action)
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def _create_end_vertex(entity_event):
|
|
||||||
entity_type = entity_event[DSProps.ENTITY_TYPE]
|
|
||||||
return graph_utils.create_vertex('END_MESSAGE:' + entity_type,
|
|
||||||
vitrage_type=entity_type)
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def _is_end_message(entity_event):
|
|
||||||
|
|
||||||
ds_action = entity_event[DSProps.DATASOURCE_ACTION]
|
|
||||||
is_snapshot_event = ds_action == DatasourceAction.INIT_SNAPSHOT
|
|
||||||
event_type = entity_event.get(DSProps.EVENT_TYPE, None)
|
|
||||||
return is_snapshot_event and event_type == GraphAction.END_MESSAGE
|
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _format_update_timestamp(update_timestamp, sample_timestamp):
|
def _format_update_timestamp(update_timestamp, sample_timestamp):
|
||||||
update_timestamp = update_timestamp if update_timestamp \
|
update_timestamp = update_timestamp if update_timestamp \
|
||||||
|
@ -23,12 +23,18 @@ OPTS = [
|
|||||||
cfg.IntOpt('initialization_interval',
|
cfg.IntOpt('initialization_interval',
|
||||||
default=1,
|
default=1,
|
||||||
min=1,
|
min=1,
|
||||||
|
deprecated_for_removal=True,
|
||||||
|
deprecated_since='Stein',
|
||||||
|
deprecated_reason='This config option is no longer used.',
|
||||||
help='interval between consistency initialization checks for '
|
help='interval between consistency initialization checks for '
|
||||||
'finding if all end messages from datasources were '
|
'finding if all end messages from datasources were '
|
||||||
'received (in seconds)'),
|
'received (in seconds)'),
|
||||||
cfg.IntOpt('initialization_max_retries',
|
cfg.IntOpt('initialization_max_retries',
|
||||||
default=30,
|
default=30,
|
||||||
min=1,
|
min=1,
|
||||||
|
deprecated_for_removal=True,
|
||||||
|
deprecated_since='Stein',
|
||||||
|
deprecated_reason='This config option is no longer used.',
|
||||||
help='maximum retries for consistency initialization '
|
help='maximum retries for consistency initialization '
|
||||||
'for finding if all end messages from datasources were '
|
'for finding if all end messages from datasources were '
|
||||||
'received (in seconds)'),
|
'received (in seconds)'),
|
||||||
|
@ -13,7 +13,6 @@
|
|||||||
# under the License.
|
# under the License.
|
||||||
|
|
||||||
from datetime import timedelta
|
from datetime import timedelta
|
||||||
import time
|
|
||||||
|
|
||||||
from oslo_log import log
|
from oslo_log import log
|
||||||
|
|
||||||
@ -136,19 +135,6 @@ class ConsistencyEnforcer(object):
|
|||||||
(vertex.get(VProps.VITRAGE_DATASOURCE_NAME) in
|
(vertex.get(VProps.VITRAGE_DATASOURCE_NAME) in
|
||||||
self.datasources_to_mark_deleted)
|
self.datasources_to_mark_deleted)
|
||||||
|
|
||||||
def _wait_for_action(self, function):
|
|
||||||
count_retries = 0
|
|
||||||
while True:
|
|
||||||
if count_retries >= \
|
|
||||||
self.conf.consistency.initialization_max_retries:
|
|
||||||
return False
|
|
||||||
|
|
||||||
if function():
|
|
||||||
return True
|
|
||||||
|
|
||||||
count_retries += 1
|
|
||||||
time.sleep(self.conf.consistency.initialization_interval)
|
|
||||||
|
|
||||||
def _init_datasources_to_mark_deleted(self):
|
def _init_datasources_to_mark_deleted(self):
|
||||||
self.datasources_to_mark_deleted = []
|
self.datasources_to_mark_deleted = []
|
||||||
|
|
||||||
|
@ -55,8 +55,8 @@ class Processor(processor.ProcessorBase):
|
|||||||
self._enrich_event(event)
|
self._enrich_event(event)
|
||||||
entity = self.transformer_manager.transform(event)
|
entity = self.transformer_manager.transform(event)
|
||||||
|
|
||||||
if entity.action not in self.actions.keys():
|
if entity.action not in self.actions:
|
||||||
LOG.debug('deprecated or unknown entity %s ignored', str(entity))
|
LOG.warning('Deprecated or unknown entity %s ignored', str(entity))
|
||||||
return
|
return
|
||||||
|
|
||||||
self._calculate_vitrage_aggregated_values(entity.vertex, entity.action)
|
self._calculate_vitrage_aggregated_values(entity.vertex, entity.action)
|
||||||
|
@ -21,10 +21,7 @@ from testtools import matchers
|
|||||||
|
|
||||||
from vitrage.common.constants import EntityCategory
|
from vitrage.common.constants import EntityCategory
|
||||||
from vitrage.common.constants import VertexProperties as VProps
|
from vitrage.common.constants import VertexProperties as VProps
|
||||||
from vitrage.datasources.nagios import NAGIOS_DATASOURCE
|
|
||||||
from vitrage.datasources.nova.host import NOVA_HOST_DATASOURCE
|
|
||||||
from vitrage.datasources.nova.instance import NOVA_INSTANCE_DATASOURCE
|
from vitrage.datasources.nova.instance import NOVA_INSTANCE_DATASOURCE
|
||||||
from vitrage.datasources.nova.zone import NOVA_ZONE_DATASOURCE
|
|
||||||
from vitrage.entity_graph.consistency.consistency_enforcer \
|
from vitrage.entity_graph.consistency.consistency_enforcer \
|
||||||
import ConsistencyEnforcer
|
import ConsistencyEnforcer
|
||||||
from vitrage.entity_graph.processor.processor import Processor
|
from vitrage.entity_graph.processor.processor import Processor
|
||||||
@ -44,11 +41,6 @@ class TestConsistencyFunctional(TestFunctionalBase, TestConfiguration):
|
|||||||
cfg.IntOpt('min_time_to_delete',
|
cfg.IntOpt('min_time_to_delete',
|
||||||
default=1,
|
default=1,
|
||||||
min=1),
|
min=1),
|
||||||
cfg.IntOpt('initialization_interval',
|
|
||||||
default=1,
|
|
||||||
min=1),
|
|
||||||
cfg.IntOpt('initialization_max_retries',
|
|
||||||
default=10),
|
|
||||||
]
|
]
|
||||||
|
|
||||||
EVALUATOR_OPTS = [
|
EVALUATOR_OPTS = [
|
||||||
@ -252,13 +244,6 @@ class TestConsistencyFunctional(TestFunctionalBase, TestConfiguration):
|
|||||||
datasource_name='cinder.volume',
|
datasource_name='cinder.volume',
|
||||||
resource_type='cinder.volume')
|
resource_type='cinder.volume')
|
||||||
|
|
||||||
def _set_end_messages(self):
|
|
||||||
self.initialization_status.end_messages[NOVA_ZONE_DATASOURCE] = True
|
|
||||||
self.initialization_status.end_messages[NOVA_HOST_DATASOURCE] = True
|
|
||||||
self.initialization_status.end_messages[NOVA_INSTANCE_DATASOURCE] = \
|
|
||||||
True
|
|
||||||
self.initialization_status.end_messages[NAGIOS_DATASOURCE] = True
|
|
||||||
|
|
||||||
def _update_timestamp(self, lst, timestamp):
|
def _update_timestamp(self, lst, timestamp):
|
||||||
for vertex in lst:
|
for vertex in lst:
|
||||||
vertex[VProps.VITRAGE_SAMPLE_TIMESTAMP] = str(timestamp)
|
vertex[VProps.VITRAGE_SAMPLE_TIMESTAMP] = str(timestamp)
|
||||||
|
@ -43,9 +43,9 @@ class TestStaticDriver(base.BaseTest):
|
|||||||
DatasourceAction.INIT_SNAPSHOT)
|
DatasourceAction.INIT_SNAPSHOT)
|
||||||
|
|
||||||
# Test assertions
|
# Test assertions
|
||||||
self.assertThat(static_entities, matchers.HasLength(9))
|
self.assertThat(static_entities, matchers.HasLength(8))
|
||||||
|
|
||||||
for entity in static_entities[:-1]: # exclude end message
|
for entity in static_entities:
|
||||||
self._validate_static_entity(entity)
|
self._validate_static_entity(entity)
|
||||||
|
|
||||||
# noinspection PyAttributeOutsideInit
|
# noinspection PyAttributeOutsideInit
|
||||||
|
Loading…
x
Reference in New Issue
Block a user