diff --git a/tests/test_pipeline_manager.py b/tests/test_pipeline_manager.py index 3ba657a..33bb8f0 100644 --- a/tests/test_pipeline_manager.py +++ b/tests/test_pipeline_manager.py @@ -327,7 +327,7 @@ class TestPipelineManager(unittest.TestCase): pm.db.set_stream_state.return_value = stream trigger_def = mock.MagicMock(name='trigger_def') trigger_def.fire_pipeline = 'test_fire_pipeline' - pm.trigger_map = dict(test=trigger_def) + pm.trigger_manager.trigger_map = dict(test=trigger_def) pipeline_config = mock.MagicMock(name='pipeline_config') pm.pipeline_config = dict(test_fire_pipeline=pipeline_config) pm._error_stream = mock.MagicMock(name='_error_stream') @@ -354,7 +354,7 @@ class TestPipelineManager(unittest.TestCase): pm.db.set_stream_state.side_effect = winch_db.LockError('locked!') trigger_def = mock.MagicMock(name='trigger_def') trigger_def.fire_pipeline = 'test_fire_pipeline' - pm.trigger_map = dict(test=trigger_def) + pm.trigger_manager.trigger_map = dict(test=trigger_def) pipeline_config = mock.MagicMock(name='pipeline_config') pm.pipeline_config = dict(test_fire_pipeline=pipeline_config) pm._error_stream = mock.MagicMock(name='_error_stream') @@ -379,7 +379,7 @@ class TestPipelineManager(unittest.TestCase): pm.db.set_stream_state.return_value = stream trigger_def = mock.MagicMock(name='trigger_def') trigger_def.fire_pipeline = None - pm.trigger_map = dict(test=trigger_def) + pm.trigger_manager.trigger_map = dict(test=trigger_def) pm._error_stream = mock.MagicMock(name='_error_stream') pm._complete_stream = mock.MagicMock(name='_complete_stream') pm._run_pipeline = mock.MagicMock(name='_run_pipeline') @@ -402,7 +402,7 @@ class TestPipelineManager(unittest.TestCase): pm.db.set_stream_state.return_value = stream trigger_def = mock.MagicMock(name='trigger_def') trigger_def.fire_pipeline = 'test_fire_pipeline' - pm.trigger_map = dict(test=trigger_def) + pm.trigger_manager.trigger_map = dict(test=trigger_def) pipeline_config = mock.MagicMock(name='pipeline_config') pm.pipeline_config = dict(test_fire_pipeline=pipeline_config) pm._error_stream = mock.MagicMock(name='_error_stream') @@ -429,7 +429,7 @@ class TestPipelineManager(unittest.TestCase): pm.db.set_stream_state.return_value = stream trigger_def = mock.MagicMock(name='trigger_def') trigger_def.expire_pipeline = 'test_fire_pipeline' - pm.trigger_map = dict(test=trigger_def) + pm.trigger_manager.trigger_map = dict(test=trigger_def) pipeline_config = mock.MagicMock(name='pipeline_config') pm.pipeline_config = dict(test_fire_pipeline=pipeline_config) pm._error_stream = mock.MagicMock(name='_error_stream') @@ -456,7 +456,7 @@ class TestPipelineManager(unittest.TestCase): pm.db.set_stream_state.side_effect = winch_db.LockError('locked!') trigger_def = mock.MagicMock(name='trigger_def') trigger_def.expire_pipeline = 'test_fire_pipeline' - pm.trigger_map = dict(test=trigger_def) + pm.trigger_manager.trigger_map = dict(test=trigger_def) pipeline_config = mock.MagicMock(name='pipeline_config') pm.pipeline_config = dict(test_fire_pipeline=pipeline_config) pm._expire_error_stream = mock.MagicMock(name='_expire_error_stream') @@ -481,7 +481,7 @@ class TestPipelineManager(unittest.TestCase): pm.db.set_stream_state.return_value = stream trigger_def = mock.MagicMock(name='trigger_def') trigger_def.expire_pipeline = None - pm.trigger_map = dict(test=trigger_def) + pm.trigger_manager.trigger_map = dict(test=trigger_def) pm._expire_error_stream = mock.MagicMock(name='_expire_error_stream') pm._complete_stream = mock.MagicMock(name='_complete_stream') pm._run_pipeline = mock.MagicMock(name='_run_pipeline') @@ -504,7 +504,7 @@ class TestPipelineManager(unittest.TestCase): pm.db.set_stream_state.return_value = stream trigger_def = mock.MagicMock(name='trigger_def') trigger_def.expire_pipeline = 'test_fire_pipeline' - pm.trigger_map = dict(test=trigger_def) + pm.trigger_manager.trigger_map = dict(test=trigger_def) pipeline_config = mock.MagicMock(name='pipeline_config') pm.pipeline_config = dict(test_fire_pipeline=pipeline_config) pm._expire_error_stream = mock.MagicMock(name='_expire_error_stream') @@ -529,7 +529,7 @@ class TestPipelineManager(unittest.TestCase): stream = mock.MagicMock(name='stream') stream.name = "my_stream" tdef = mock.MagicMock(name='tdef') - pm.trigger_map['my_stream'] = tdef + pm.trigger_manager.trigger_map['my_stream'] = tdef pm.expire_stream = mock.MagicMock(name='expire_stream') pm.fire_stream = mock.MagicMock(name='fire_stream') pm.current_time = mock.MagicMock(name='current_time') diff --git a/tests/test_trigger_manager.py b/tests/test_trigger_manager.py index 61fb694..efb359e 100644 --- a/tests/test_trigger_manager.py +++ b/tests/test_trigger_manager.py @@ -298,3 +298,20 @@ class TestTriggerManager(unittest.TestCase): self.assertFalse(tm._add_or_create_stream.called) self.assertFalse(tm.db.get_stream_events.called) self.assertFalse(tm._ready_to_fire.called) + + @mock.patch.object(trigger_manager.ConfigManager, 'wrap') + def test_add__del_trigger_definition(self, mock_config_wrap): + tm = trigger_manager.TriggerManager('test') + tm.db = mock.MagicMock(spec=tm.db) + td1 = dict( + name='test_trigger1', + expiration='$last + 1d', + fire_pipeline='test_pipeline', + fire_criteria=[dict(event_type='test.thing')], + match_criteria=[dict(event_type='test.*')]) + tdlist = list() + tdlist.append(td1) + tm.add_trigger_definition(tdlist) + self.assertTrue('test_trigger1' in tm.trigger_map) + tm.delete_trigger_definition('test_trigger1') + self.assertFalse('test_trigger1' in tm.trigger_map) diff --git a/winchester/config.py b/winchester/config.py index d1a2815..88bcbf1 100644 --- a/winchester/config.py +++ b/winchester/config.py @@ -104,6 +104,9 @@ class ConfigManager(collections.Mapping): return self._defaults[key] raise KeyError(key) + def contains(self, key): + return key in self._configs + def add_config_path(self, *args): for path in args: if path not in self.config_paths: diff --git a/winchester/pipeline_manager.py b/winchester/pipeline_manager.py index 5a779dd..d038824 100644 --- a/winchester/pipeline_manager.py +++ b/winchester/pipeline_manager.py @@ -82,6 +82,7 @@ class Pipeline(object): def handle_events(self, events, stream, debugger): self.env['stream_id'] = stream.id + self.env['stream_name'] = stream.name event_ids = set(e['message_id'] for e in events) try: for handler in self.handlers: @@ -156,6 +157,7 @@ class PipelineManager(object): % (self.proc_name, str(config))) config = ConfigManager.wrap(config, self.config_description()) self.config = config + self.trigger_definitions = [] config.check_config() config.add_config_path(*config['config_path']) if time_sync is None: @@ -189,12 +191,12 @@ class PipelineManager(object): if trigger_defs is not None: self.trigger_definitions = trigger_defs else: - defs = config.load_file(config['trigger_definitions']) - logger.debug("Loaded trigger definitions %s" % str(defs)) - self.trigger_definitions = [TriggerDefinition(conf, None) for conf - in defs] - self.trigger_map = dict( - (tdef.name, tdef) for tdef in self.trigger_definitions) + # trigger_definition config file is optional + if config.contains('trigger_definitions'): + defs = config.load_file(config['trigger_definitions']) + logger.debug("Loaded trigger definitions %s" % str(defs)) + self.trigger_definitions = [ + TriggerDefinition(conf, None) for conf in defs] self.trigger_manager = TriggerManager( self.config, db=self.db, @@ -292,8 +294,14 @@ class PipelineManager(object): return trigger_def.debugger if trigger_def is not None else \ self.trigger_manager.debug_manager.get_debugger(None) + def add_trigger_definition(self, list_of_triggerdefs): + self.trigger_manager.add_trigger_definition(list_of_triggerdefs) + + def delete_trigger_definition(self, trigger_def_name): + self.trigger_manager.delete_trigger_definition(trigger_def_name) + def fire_stream(self, stream): - trigger_def = self.trigger_map.get(stream.name) + trigger_def = self.trigger_manager.trigger_map.get(stream.name) debugger = self.safe_get_debugger(trigger_def) try: stream = self.db.set_stream_state(stream, StreamState.firing) @@ -331,7 +339,7 @@ class PipelineManager(object): return True def expire_stream(self, stream): - trigger_def = self.trigger_map.get(stream.name) + trigger_def = self.trigger_manager.trigger_map.get(stream.name) debugger = self.safe_get_debugger(trigger_def) try: stream = self.db.set_stream_state(stream, StreamState.expiring) diff --git a/winchester/trigger_manager.py b/winchester/trigger_manager.py index 1edd344..6ba0910 100644 --- a/winchester/trigger_manager.py +++ b/winchester/trigger_manager.py @@ -94,7 +94,7 @@ class TriggerManager(object): help="Path(s) to find additional config files", multiple=True, default='.'), distiller_config=ConfigItem( - required=True, + required=False, help="Name of distiller config file " "describing what to extract from the " "notifications"), @@ -118,7 +118,7 @@ class TriggerManager(object): help="Database connection info.", config_description=DBInterface.config_description()), trigger_definitions=ConfigItem( - required=True, + required=False, help="Name of trigger definitions file " "defining trigger conditions and what events to " "process for each stream"), @@ -129,6 +129,7 @@ class TriggerManager(object): config = ConfigManager.wrap(config, self.config_description()) self.config = config self.debug_manager = debugging.DebugManager() + self.trigger_definitions = [] config.check_config() config.add_config_path(*config['config_path']) if time_sync is None: @@ -142,22 +143,29 @@ class TriggerManager(object): if stackdistiller is not None: self.distiller = stackdistiller else: - dist_config = config.load_file(config['distiller_config']) - plugmap = self._load_plugins(config['distiller_trait_plugins'], - distiller.DEFAULT_PLUGINMAP) - self.distiller = distiller.Distiller( - dist_config, - trait_plugin_map=plugmap, - catchall=config['catch_all_notifications']) + # distiller_config is optional + if config.contains('distiller_config'): + dist_config = config.load_file(config['distiller_config']) + plugmap = self._load_plugins(config['distiller_trait_plugins'], + distiller.DEFAULT_PLUGINMAP) + self.distiller = distiller.Distiller( + dist_config, + trait_plugin_map=plugmap, + catchall=config['catch_all_notifications']) if trigger_defs is not None: self.trigger_definitions = trigger_defs for t in self.trigger_definitions: t.set_debugger(self.debug_manager) else: - defs = config.load_file(config['trigger_definitions']) - self.trigger_definitions = [TriggerDefinition(conf, - self.debug_manager) - for conf in defs] + # trigger_definition config file is optional + if config.contains('trigger_definitions'): + defs = config.load_file(config['trigger_definitions']) + self.trigger_definitions = [ + TriggerDefinition(conf, self.debug_manager) + for conf in defs] + # trigger_map is used to quickly access existing trigger_defs + self.trigger_map = dict( + (tdef.name, tdef) for tdef in self.trigger_definitions) self.saved_events = 0 self.received = 0 self.last_status = self.current_time() @@ -246,8 +254,23 @@ class TriggerManager(object): timestamp = trigger_def.get_fire_timestamp(self.current_time()) self.db.stream_ready_to_fire(stream, timestamp) trigger_def.debugger.bump_counter("Ready to fire") - logger.debug("Stream %s ready to fire at %s" % ( - stream.id, timestamp)) + logger.debug("Stream %s ready to fire at %s" % (stream.id, timestamp)) + + def add_trigger_definition(self, list_of_triggerdefs, debugger=None): + if debugger is None: + debugger = self.debug_manager + for td in list_of_triggerdefs: + if (td['name'] in self.trigger_map) is False: + # Only add if name is unique + tdef = TriggerDefinition(td, debugger) + self.trigger_definitions.append(tdef) + self.trigger_map[td['name']] = tdef + + def delete_trigger_definition(self, trigger_def_name): + if trigger_def_name in self.trigger_map: + self.trigger_definitions.remove( + self.trigger_map.get(trigger_def_name)) + del self.trigger_map[trigger_def_name] def add_event(self, event): if self.save_event(event):