Merge "Add or Delete Trigger Definitions to an existing TriggerManager and PipelineManager"
This commit is contained in:
commit
b6dc37aad3
@ -327,7 +327,7 @@ class TestPipelineManager(unittest.TestCase):
|
||||
pm.db.set_stream_state.return_value = stream
|
||||
trigger_def = mock.MagicMock(name='trigger_def')
|
||||
trigger_def.fire_pipeline = 'test_fire_pipeline'
|
||||
pm.trigger_map = dict(test=trigger_def)
|
||||
pm.trigger_manager.trigger_map = dict(test=trigger_def)
|
||||
pipeline_config = mock.MagicMock(name='pipeline_config')
|
||||
pm.pipeline_config = dict(test_fire_pipeline=pipeline_config)
|
||||
pm._error_stream = mock.MagicMock(name='_error_stream')
|
||||
@ -354,7 +354,7 @@ class TestPipelineManager(unittest.TestCase):
|
||||
pm.db.set_stream_state.side_effect = winch_db.LockError('locked!')
|
||||
trigger_def = mock.MagicMock(name='trigger_def')
|
||||
trigger_def.fire_pipeline = 'test_fire_pipeline'
|
||||
pm.trigger_map = dict(test=trigger_def)
|
||||
pm.trigger_manager.trigger_map = dict(test=trigger_def)
|
||||
pipeline_config = mock.MagicMock(name='pipeline_config')
|
||||
pm.pipeline_config = dict(test_fire_pipeline=pipeline_config)
|
||||
pm._error_stream = mock.MagicMock(name='_error_stream')
|
||||
@ -379,7 +379,7 @@ class TestPipelineManager(unittest.TestCase):
|
||||
pm.db.set_stream_state.return_value = stream
|
||||
trigger_def = mock.MagicMock(name='trigger_def')
|
||||
trigger_def.fire_pipeline = None
|
||||
pm.trigger_map = dict(test=trigger_def)
|
||||
pm.trigger_manager.trigger_map = dict(test=trigger_def)
|
||||
pm._error_stream = mock.MagicMock(name='_error_stream')
|
||||
pm._complete_stream = mock.MagicMock(name='_complete_stream')
|
||||
pm._run_pipeline = mock.MagicMock(name='_run_pipeline')
|
||||
@ -402,7 +402,7 @@ class TestPipelineManager(unittest.TestCase):
|
||||
pm.db.set_stream_state.return_value = stream
|
||||
trigger_def = mock.MagicMock(name='trigger_def')
|
||||
trigger_def.fire_pipeline = 'test_fire_pipeline'
|
||||
pm.trigger_map = dict(test=trigger_def)
|
||||
pm.trigger_manager.trigger_map = dict(test=trigger_def)
|
||||
pipeline_config = mock.MagicMock(name='pipeline_config')
|
||||
pm.pipeline_config = dict(test_fire_pipeline=pipeline_config)
|
||||
pm._error_stream = mock.MagicMock(name='_error_stream')
|
||||
@ -429,7 +429,7 @@ class TestPipelineManager(unittest.TestCase):
|
||||
pm.db.set_stream_state.return_value = stream
|
||||
trigger_def = mock.MagicMock(name='trigger_def')
|
||||
trigger_def.expire_pipeline = 'test_fire_pipeline'
|
||||
pm.trigger_map = dict(test=trigger_def)
|
||||
pm.trigger_manager.trigger_map = dict(test=trigger_def)
|
||||
pipeline_config = mock.MagicMock(name='pipeline_config')
|
||||
pm.pipeline_config = dict(test_fire_pipeline=pipeline_config)
|
||||
pm._error_stream = mock.MagicMock(name='_error_stream')
|
||||
@ -456,7 +456,7 @@ class TestPipelineManager(unittest.TestCase):
|
||||
pm.db.set_stream_state.side_effect = winch_db.LockError('locked!')
|
||||
trigger_def = mock.MagicMock(name='trigger_def')
|
||||
trigger_def.expire_pipeline = 'test_fire_pipeline'
|
||||
pm.trigger_map = dict(test=trigger_def)
|
||||
pm.trigger_manager.trigger_map = dict(test=trigger_def)
|
||||
pipeline_config = mock.MagicMock(name='pipeline_config')
|
||||
pm.pipeline_config = dict(test_fire_pipeline=pipeline_config)
|
||||
pm._expire_error_stream = mock.MagicMock(name='_expire_error_stream')
|
||||
@ -481,7 +481,7 @@ class TestPipelineManager(unittest.TestCase):
|
||||
pm.db.set_stream_state.return_value = stream
|
||||
trigger_def = mock.MagicMock(name='trigger_def')
|
||||
trigger_def.expire_pipeline = None
|
||||
pm.trigger_map = dict(test=trigger_def)
|
||||
pm.trigger_manager.trigger_map = dict(test=trigger_def)
|
||||
pm._expire_error_stream = mock.MagicMock(name='_expire_error_stream')
|
||||
pm._complete_stream = mock.MagicMock(name='_complete_stream')
|
||||
pm._run_pipeline = mock.MagicMock(name='_run_pipeline')
|
||||
@ -504,7 +504,7 @@ class TestPipelineManager(unittest.TestCase):
|
||||
pm.db.set_stream_state.return_value = stream
|
||||
trigger_def = mock.MagicMock(name='trigger_def')
|
||||
trigger_def.expire_pipeline = 'test_fire_pipeline'
|
||||
pm.trigger_map = dict(test=trigger_def)
|
||||
pm.trigger_manager.trigger_map = dict(test=trigger_def)
|
||||
pipeline_config = mock.MagicMock(name='pipeline_config')
|
||||
pm.pipeline_config = dict(test_fire_pipeline=pipeline_config)
|
||||
pm._expire_error_stream = mock.MagicMock(name='_expire_error_stream')
|
||||
@ -529,7 +529,7 @@ class TestPipelineManager(unittest.TestCase):
|
||||
stream = mock.MagicMock(name='stream')
|
||||
stream.name = "my_stream"
|
||||
tdef = mock.MagicMock(name='tdef')
|
||||
pm.trigger_map['my_stream'] = tdef
|
||||
pm.trigger_manager.trigger_map['my_stream'] = tdef
|
||||
pm.expire_stream = mock.MagicMock(name='expire_stream')
|
||||
pm.fire_stream = mock.MagicMock(name='fire_stream')
|
||||
pm.current_time = mock.MagicMock(name='current_time')
|
||||
|
@ -298,3 +298,20 @@ class TestTriggerManager(unittest.TestCase):
|
||||
self.assertFalse(tm._add_or_create_stream.called)
|
||||
self.assertFalse(tm.db.get_stream_events.called)
|
||||
self.assertFalse(tm._ready_to_fire.called)
|
||||
|
||||
@mock.patch.object(trigger_manager.ConfigManager, 'wrap')
|
||||
def test_add__del_trigger_definition(self, mock_config_wrap):
|
||||
tm = trigger_manager.TriggerManager('test')
|
||||
tm.db = mock.MagicMock(spec=tm.db)
|
||||
td1 = dict(
|
||||
name='test_trigger1',
|
||||
expiration='$last + 1d',
|
||||
fire_pipeline='test_pipeline',
|
||||
fire_criteria=[dict(event_type='test.thing')],
|
||||
match_criteria=[dict(event_type='test.*')])
|
||||
tdlist = list()
|
||||
tdlist.append(td1)
|
||||
tm.add_trigger_definition(tdlist)
|
||||
self.assertTrue('test_trigger1' in tm.trigger_map)
|
||||
tm.delete_trigger_definition('test_trigger1')
|
||||
self.assertFalse('test_trigger1' in tm.trigger_map)
|
||||
|
@ -104,6 +104,9 @@ class ConfigManager(collections.Mapping):
|
||||
return self._defaults[key]
|
||||
raise KeyError(key)
|
||||
|
||||
def contains(self, key):
|
||||
return key in self._configs
|
||||
|
||||
def add_config_path(self, *args):
|
||||
for path in args:
|
||||
if path not in self.config_paths:
|
||||
|
@ -82,6 +82,7 @@ class Pipeline(object):
|
||||
|
||||
def handle_events(self, events, stream, debugger):
|
||||
self.env['stream_id'] = stream.id
|
||||
self.env['stream_name'] = stream.name
|
||||
event_ids = set(e['message_id'] for e in events)
|
||||
try:
|
||||
for handler in self.handlers:
|
||||
@ -156,6 +157,7 @@ class PipelineManager(object):
|
||||
% (self.proc_name, str(config)))
|
||||
config = ConfigManager.wrap(config, self.config_description())
|
||||
self.config = config
|
||||
self.trigger_definitions = []
|
||||
config.check_config()
|
||||
config.add_config_path(*config['config_path'])
|
||||
if time_sync is None:
|
||||
@ -189,12 +191,12 @@ class PipelineManager(object):
|
||||
if trigger_defs is not None:
|
||||
self.trigger_definitions = trigger_defs
|
||||
else:
|
||||
defs = config.load_file(config['trigger_definitions'])
|
||||
logger.debug("Loaded trigger definitions %s" % str(defs))
|
||||
self.trigger_definitions = [TriggerDefinition(conf, None) for conf
|
||||
in defs]
|
||||
self.trigger_map = dict(
|
||||
(tdef.name, tdef) for tdef in self.trigger_definitions)
|
||||
# trigger_definition config file is optional
|
||||
if config.contains('trigger_definitions'):
|
||||
defs = config.load_file(config['trigger_definitions'])
|
||||
logger.debug("Loaded trigger definitions %s" % str(defs))
|
||||
self.trigger_definitions = [
|
||||
TriggerDefinition(conf, None) for conf in defs]
|
||||
|
||||
self.trigger_manager = TriggerManager(
|
||||
self.config, db=self.db,
|
||||
@ -292,8 +294,14 @@ class PipelineManager(object):
|
||||
return trigger_def.debugger if trigger_def is not None else \
|
||||
self.trigger_manager.debug_manager.get_debugger(None)
|
||||
|
||||
def add_trigger_definition(self, list_of_triggerdefs):
|
||||
self.trigger_manager.add_trigger_definition(list_of_triggerdefs)
|
||||
|
||||
def delete_trigger_definition(self, trigger_def_name):
|
||||
self.trigger_manager.delete_trigger_definition(trigger_def_name)
|
||||
|
||||
def fire_stream(self, stream):
|
||||
trigger_def = self.trigger_map.get(stream.name)
|
||||
trigger_def = self.trigger_manager.trigger_map.get(stream.name)
|
||||
debugger = self.safe_get_debugger(trigger_def)
|
||||
try:
|
||||
stream = self.db.set_stream_state(stream, StreamState.firing)
|
||||
@ -331,7 +339,7 @@ class PipelineManager(object):
|
||||
return True
|
||||
|
||||
def expire_stream(self, stream):
|
||||
trigger_def = self.trigger_map.get(stream.name)
|
||||
trigger_def = self.trigger_manager.trigger_map.get(stream.name)
|
||||
debugger = self.safe_get_debugger(trigger_def)
|
||||
try:
|
||||
stream = self.db.set_stream_state(stream, StreamState.expiring)
|
||||
|
@ -94,7 +94,7 @@ class TriggerManager(object):
|
||||
help="Path(s) to find additional config files",
|
||||
multiple=True, default='.'),
|
||||
distiller_config=ConfigItem(
|
||||
required=True,
|
||||
required=False,
|
||||
help="Name of distiller config file "
|
||||
"describing what to extract from the "
|
||||
"notifications"),
|
||||
@ -118,7 +118,7 @@ class TriggerManager(object):
|
||||
help="Database connection info.",
|
||||
config_description=DBInterface.config_description()),
|
||||
trigger_definitions=ConfigItem(
|
||||
required=True,
|
||||
required=False,
|
||||
help="Name of trigger definitions file "
|
||||
"defining trigger conditions and what events to "
|
||||
"process for each stream"),
|
||||
@ -129,6 +129,7 @@ class TriggerManager(object):
|
||||
config = ConfigManager.wrap(config, self.config_description())
|
||||
self.config = config
|
||||
self.debug_manager = debugging.DebugManager()
|
||||
self.trigger_definitions = []
|
||||
config.check_config()
|
||||
config.add_config_path(*config['config_path'])
|
||||
if time_sync is None:
|
||||
@ -142,22 +143,29 @@ class TriggerManager(object):
|
||||
if stackdistiller is not None:
|
||||
self.distiller = stackdistiller
|
||||
else:
|
||||
dist_config = config.load_file(config['distiller_config'])
|
||||
plugmap = self._load_plugins(config['distiller_trait_plugins'],
|
||||
distiller.DEFAULT_PLUGINMAP)
|
||||
self.distiller = distiller.Distiller(
|
||||
dist_config,
|
||||
trait_plugin_map=plugmap,
|
||||
catchall=config['catch_all_notifications'])
|
||||
# distiller_config is optional
|
||||
if config.contains('distiller_config'):
|
||||
dist_config = config.load_file(config['distiller_config'])
|
||||
plugmap = self._load_plugins(config['distiller_trait_plugins'],
|
||||
distiller.DEFAULT_PLUGINMAP)
|
||||
self.distiller = distiller.Distiller(
|
||||
dist_config,
|
||||
trait_plugin_map=plugmap,
|
||||
catchall=config['catch_all_notifications'])
|
||||
if trigger_defs is not None:
|
||||
self.trigger_definitions = trigger_defs
|
||||
for t in self.trigger_definitions:
|
||||
t.set_debugger(self.debug_manager)
|
||||
else:
|
||||
defs = config.load_file(config['trigger_definitions'])
|
||||
self.trigger_definitions = [TriggerDefinition(conf,
|
||||
self.debug_manager)
|
||||
for conf in defs]
|
||||
# trigger_definition config file is optional
|
||||
if config.contains('trigger_definitions'):
|
||||
defs = config.load_file(config['trigger_definitions'])
|
||||
self.trigger_definitions = [
|
||||
TriggerDefinition(conf, self.debug_manager)
|
||||
for conf in defs]
|
||||
# trigger_map is used to quickly access existing trigger_defs
|
||||
self.trigger_map = dict(
|
||||
(tdef.name, tdef) for tdef in self.trigger_definitions)
|
||||
self.saved_events = 0
|
||||
self.received = 0
|
||||
self.last_status = self.current_time()
|
||||
@ -246,8 +254,23 @@ class TriggerManager(object):
|
||||
timestamp = trigger_def.get_fire_timestamp(self.current_time())
|
||||
self.db.stream_ready_to_fire(stream, timestamp)
|
||||
trigger_def.debugger.bump_counter("Ready to fire")
|
||||
logger.debug("Stream %s ready to fire at %s" % (
|
||||
stream.id, timestamp))
|
||||
logger.debug("Stream %s ready to fire at %s" % (stream.id, timestamp))
|
||||
|
||||
def add_trigger_definition(self, list_of_triggerdefs, debugger=None):
|
||||
if debugger is None:
|
||||
debugger = self.debug_manager
|
||||
for td in list_of_triggerdefs:
|
||||
if (td['name'] in self.trigger_map) is False:
|
||||
# Only add if name is unique
|
||||
tdef = TriggerDefinition(td, debugger)
|
||||
self.trigger_definitions.append(tdef)
|
||||
self.trigger_map[td['name']] = tdef
|
||||
|
||||
def delete_trigger_definition(self, trigger_def_name):
|
||||
if trigger_def_name in self.trigger_map:
|
||||
self.trigger_definitions.remove(
|
||||
self.trigger_map.get(trigger_def_name))
|
||||
del self.trigger_map[trigger_def_name]
|
||||
|
||||
def add_event(self, event):
|
||||
if self.save_event(event):
|
||||
|
Loading…
Reference in New Issue
Block a user