665c9ad328
Allows you to trim events older than a configurable time from the events database. (Remerging to fix timex parse error) Change-Id: Iaa290705815d1c3ac23c2ca7370a5d705f1f834c
454 lines
18 KiB
Python
454 lines
18 KiB
Python
# Copyright (c) 2014 Dark Secret Software Inc.
|
|
# Copyright (c) 2015 Rackspace
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
|
# implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
import logging
|
|
import random
|
|
import simport
|
|
import six
|
|
import time
|
|
import timex
|
|
|
|
from winchester.config import ConfigItem
|
|
from winchester.config import ConfigManager
|
|
from winchester.db import DatabaseConnectionError
|
|
from winchester.db import DBInterface
|
|
from winchester.db import LockError
|
|
from winchester.definition import TriggerDefinition
|
|
from winchester.models import StreamState
|
|
from winchester import time_sync as ts
|
|
from winchester.trigger_manager import TriggerManager
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class PipelineError(Exception):
    """Base class for the pipeline exceptions defined in this module."""
|
|
|
|
|
|
class PipelineExecutionError(PipelineError):
    """Pipeline failure that wraps the exception which triggered it.

    The exception message has the form ``"<msg>: caused by <repr(cause)>"``
    and the wrapped exception is kept on the ``cause`` attribute.
    """

    def __init__(self, msg="", cause=None):
        # %r formats the cause with repr(), so the wrapped exception's
        # type is visible in the message.
        message = "%s: caused by %r" % (msg, cause)
        super(PipelineExecutionError, self).__init__(message)
        self.cause = cause
|
|
|
|
|
|
class PipelineConfigError(PipelineError):
    """Raised when a pipeline handler configuration is not usable."""
|
|
|
|
|
|
class Pipeline(object):
    """An ordered chain of handler instances applied to a stream's events.

    The handlers are instantiated from their configuration when the
    pipeline is created. ``handle_events`` feeds the events through each
    handler in turn, then commits every handler; if any handler raises,
    every handler is rolled back instead.
    """

    @classmethod
    def check_handler_config(cls, conf, handler_map):
        """Normalize and validate one handler configuration entry.

        :param conf: either a handler name (string), or a mapping with a
                     'name' key and an optional 'params' key.
        :param handler_map: mapping whose keys are the known handler names.
        :returns: a mapping with both 'name' and 'params' keys. A mapping
                  passed in is filled in (in place) when 'params' is
                  missing.
        :raises PipelineConfigError: if 'name' is missing, or is not a key
                                     of ``handler_map``.
        """
        if isinstance(conf, six.string_types):
            # Shorthand form: only the handler name, no parameters.
            conf = {'name': conf, 'params': {}}
        if 'name' not in conf:
            raise PipelineConfigError(
                "Handler name not in config! %s" % str(conf))
        conf.setdefault('params', {})
        handler_name = conf['name']
        if handler_name not in handler_map:
            raise PipelineConfigError(
                "Unknown handler in pipeline config %s" % handler_name)
        return conf

    def __init__(self, name, config, handler_map):
        """Instantiate every handler listed in ``config``.

        :param name: pipeline name, used in log messages.
        :param config: iterable of handler configs, each with 'name' and
                       'params' keys.
        :param handler_map: mapping of handler name to handler class.
        :raises PipelineExecutionError: if a handler class raises while
                                        being instantiated.
        """
        self.name = name
        self.handlers = []
        # Scratch dict handed to every handler's handle_events call.
        self.env = {}
        for entry in config:
            handler_name = entry['name']
            handler_kwargs = entry['params']
            handler_cls = handler_map[handler_name]
            try:
                instance = handler_cls(**handler_kwargs)
            except Exception as exc:
                logger.exception(
                    "Error initalizing handler %s for pipeline %s" %
                    (handler_cls, self.name))
                raise PipelineExecutionError("Error loading pipeline", exc)
            self.handlers.append(instance)

    def handle_events(self, events, stream, debugger):
        """Run ``events`` through every handler, then commit.

        :param events: list of event dicts, each with a 'message_id' key.
        :param stream: object providing ``id`` and ``name``; both are
                       published to the handlers through ``self.env``.
        :param debugger: object with a ``bump_counter(name)`` method.
        :returns: the events coming out of the last handler whose
                  'message_id' was not among the input events.
        :raises PipelineExecutionError: if a handler raises; all handlers
                                        are rolled back first.
        """
        self.env['stream_id'] = stream.id
        self.env['stream_name'] = stream.name
        # Remember what came in so handler-generated events can be told
        # apart afterwards.
        original_ids = {evt['message_id'] for evt in events}
        try:
            for handler in self.handlers:
                events = handler.handle_events(events, self.env)
            debugger.bump_counter("Pre-commit successful")
        except Exception as err:
            logger.exception("Error processing pipeline %s" % self.name)
            debugger.bump_counter("Pipeline error")
            self.rollback(debugger)
            raise PipelineExecutionError("Error in pipeline", err)
        generated = [evt for evt in events
                     if evt['message_id'] not in original_ids]
        self.commit(debugger)
        return generated

    def commit(self, debugger):
        """Commit every handler; a failing handler is logged and skipped."""
        for handler in self.handlers:
            try:
                handler.commit()
            except Exception:
                debugger.bump_counter("Commit error")
                logger.exception(
                    "Commit error on handler in pipeline %s" % self.name)
            else:
                debugger.bump_counter("Commit successful")

    def rollback(self, debugger):
        """Roll back every handler; a failing handler is logged and skipped.
        """
        for handler in self.handlers:
            try:
                handler.rollback()
            except Exception:
                debugger.bump_counter("Rollback error")
                logger.exception(
                    "Rollback error on handler in pipeline %s" % self.name)
            else:
                debugger.bump_counter("Rollback successful")
|
|
|
|
|
|
class PipelineManager(object):
    """Worker that pushes ready streams through their pipelines.

    A manager loads the handler plugins and the per-pipeline handler
    configuration, then (in ``run``) repeatedly fires and expires ready
    streams from the database and, when enabled, trims old events.
    """

    @classmethod
    def config_description(cls):
        """Return the config items understood by the pipeline worker.

        Starts from ``TriggerManager.config_description()`` and adds the
        pipeline-specific settings on top of it.
        """
        configs = TriggerManager.config_description()
        configs.update(dict(
            pipeline_handlers=ConfigItem(
                required=True,
                help="dictionary of pipeline handlers to load "
                     "Classes specified with simport syntax. "
                     "simport docs for more info"),
            pipeline_worker_batch_size=ConfigItem(
                help="Number of streams for pipeline "
                     "worker(s) to load at a time",
                default=1000),
            pipeline_worker_delay=ConfigItem(
                help="Number of seconds for pipeline worker "
                     "to sleep when it finds no streams to "
                     "process", default=10),
            pipeline_config=ConfigItem(required=True,
                                       help="Name of pipeline config file "
                                            "defining the handlers for each "
                                            "pipeline."),
            purge_completed_streams=ConfigItem(
                help="Delete successfully proccessed "
                     "streams when finished?",
                default=True),
            trim_events=ConfigItem(
                help="Delete events older than a configurable time.",
                default=False),
            trim_events_age=ConfigItem(
                help="Delete events older than this (timex expr).",
                default="$timestamp - 14d"),
            trim_events_batch_size=ConfigItem(
                help="Maximum number of events for pipeline "
                     "worker(s) to trim at a time",
                default=100),
        ))
        return configs

    def __init__(self, config, db=None, pipeline_handlers=None,
                 pipeline_config=None, trigger_defs=None, time_sync=None,
                 proc_name='pipeline_worker'):
        """Set up the worker from ``config``.

        :param config: configuration; wrapped with ``ConfigManager.wrap``
                       against ``config_description()`` and checked.
        :param db: database interface to use; when None a ``DBInterface``
                   is built from ``config['database']``.
        :param pipeline_handlers: mapping of handler name to handler
                   class; when None the handlers are loaded from
                   ``config['pipeline_handlers']``.
        :param pipeline_config: mapping of pipeline name to a list of
                   handler configs; when None it is loaded from the file
                   named by ``config['pipeline_config']``.
        :param trigger_defs: trigger definitions to use; when None they
                   are loaded from the optional ``trigger_definitions``
                   config file.
        :param time_sync: time source; defaults to ``ts.TimeSync()``.
        :param proc_name: name used to distinguish worker processes in
                   logs.
        :raises PipelineConfigError: if a handler config entry has no
                   name or names an unknown handler.
        """
        # name used to distinguish worker processes in logs
        self.proc_name = proc_name

        logger.debug("PipelineManager(%s): Using config: %s"
                     % (self.proc_name, str(config)))
        config = ConfigManager.wrap(config, self.config_description())
        self.config = config
        self.trigger_definitions = []
        config.check_config()
        config.add_config_path(*config['config_path'])
        if time_sync is None:
            time_sync = ts.TimeSync()
        self.time_sync = time_sync

        if db is not None:
            self.db = db
        else:
            self.db = DBInterface(config['database'])

        if pipeline_handlers is not None:
            self.pipeline_handlers = pipeline_handlers
        else:
            self.pipeline_handlers = self._load_plugins(
                config['pipeline_handlers'])
        logger.debug("Pipeline handlers: %s" % str(self.pipeline_handlers))

        if pipeline_config is not None:
            self.pipeline_config = pipeline_config
        else:
            self.pipeline_config = config.load_file(config['pipeline_config'])

        logger.debug("Pipeline config: %s" % str(self.pipeline_config))
        # Normalize every handler entry up front so a bad pipeline config
        # fails here rather than when a stream is processed.
        for pipeline, handler_configs in self.pipeline_config.items():
            self.pipeline_config[pipeline] = [
                Pipeline.check_handler_config(conf,
                                              self.pipeline_handlers)
                for conf in handler_configs]

        if trigger_defs is not None:
            self.trigger_definitions = trigger_defs
        else:
            # trigger_definition config file is optional
            if config.contains('trigger_definitions'):
                defs = config.load_file(config['trigger_definitions'])
                logger.debug("Loaded trigger definitions %s" % str(defs))
                self.trigger_definitions = [
                    TriggerDefinition(conf, None) for conf in defs]

        self.trigger_manager = TriggerManager(
            self.config, db=self.db,
            trigger_defs=self.trigger_definitions,
            time_sync=time_sync)

        self.pipeline_worker_batch_size = config['pipeline_worker_batch_size']
        self.pipeline_worker_delay = config['pipeline_worker_delay']
        # Not declared in this class; presumably supplied by
        # TriggerManager.config_description() -- TODO confirm.
        self.statistics_period = config['statistics_period']
        self.purge_completed_streams = config['purge_completed_streams']
        self.trim_events = config['trim_events']
        self.trim_events_batch_size = config['trim_events_batch_size']
        try:
            self.trim_events_age = timex.parse(str(config['trim_events_age']))
        except timex.TimexError:
            # An unparseable age expression turns trimming off rather
            # than preventing the worker from starting.
            logger.error("Invalid trim event expression: %s Event trimming "
                         "disabled." % config['trim_events_age'])
            self.trim_events_age = None
            self.trim_events = False
        self.streams_fired = 0
        self.streams_expired = 0
        self.streams_loaded = 0
        self.last_status = self.current_time()

    @classmethod
    def _load_plugins(cls, plug_map, defaults=None):
        """Load plugin classes named in ``plug_map`` with simport.

        :param plug_map: mapping of plugin name to simport class string.
        :param defaults: optional mapping of plugins to start from;
                         entries from ``plug_map`` override it.
        :returns: dict of plugin name to loaded object. Plugins that fail
                  to load are logged and left out.
        """
        plugins = dict()
        if defaults is not None:
            plugins.update(defaults)
        for name, cls_string in plug_map.items():
            try:
                plugins[name] = simport.load(cls_string)
            except simport.ImportFailed as e:
                logger.error("Could not load plugin %s: Import failed. %s" % (
                    name, e))
            except (simport.MissingMethodOrFunction,
                    simport.MissingModule,
                    simport.BadDirectory) as e:
                logger.error("Could not load plugin %s: Not found. %s" % (
                    name, e))
        return plugins

    def current_time(self):
        """Return the current time as reported by ``self.time_sync``."""
        # here so it's easily overridden.
        return self.time_sync.current_time()

    def _statistics_due(self):
        """Return True when more than statistics_period seconds have
        elapsed since the statistics were last logged.
        """
        # total_seconds() rather than .seconds: the latter is only the
        # sub-day component of the timedelta and wraps every 24 hours.
        elapsed = self.current_time() - self.last_status
        return elapsed.total_seconds() > self.statistics_period

    def _log_statistics(self):
        """Log the stream counters, reset them and dump the debuggers."""
        logger.info("Loaded %s streams. Fired %s, Expired %s." % (
            self.streams_loaded, self.streams_fired, self.streams_expired))
        self.streams_fired = 0
        self.streams_expired = 0
        self.streams_loaded = 0
        self.last_status = self.current_time()

        self.trigger_manager.debug_manager.dump_debuggers()

    def add_new_events(self, events):
        """Hand each event in ``events`` to the trigger manager."""
        for event in events:
            self.trigger_manager.add_event(event)

    def _run_pipeline(self, stream, trigger_def, pipeline_name,
                      pipeline_config):
        """Run one pipeline over the events of ``stream``.

        :param stream: stream whose events are loaded from the database.
        :param trigger_def: trigger definition; supplies the debugger.
        :param pipeline_name: name of the pipeline (used for logging).
        :param pipeline_config: list of handler configs for the pipeline.
        :returns: False if the pipeline raised PipelineExecutionError,
                  True otherwise. Events generated by the pipeline are
                  passed to ``add_new_events``.
        """
        events = self.db.get_stream_events(stream)
        debugger = trigger_def.debugger
        try:
            pipeline = Pipeline(pipeline_name, pipeline_config,
                                self.pipeline_handlers)
            new_events = pipeline.handle_events(events, stream, debugger)
        except PipelineExecutionError:
            logger.error("Exception in pipeline %s handling stream %s" % (
                pipeline_name, stream.id))
            return False
        if new_events:
            self.add_new_events(new_events)
        return True

    def _complete_stream(self, stream):
        """Purge the stream, or mark it completed when purging is off."""
        if self.purge_completed_streams:
            self.db.purge_stream(stream)
        else:
            try:
                self.db.set_stream_state(stream, StreamState.completed)
            except LockError:
                logger.error(
                    "Stream %s locked while trying to set 'complete' state! "
                    "This should not happen." % stream.id)

    def _error_stream(self, stream):
        """Put the stream in the ``error`` state (lock errors are logged).
        """
        try:
            self.db.set_stream_state(stream, StreamState.error)
        except LockError:
            logger.error("Stream %s locked while trying to set 'error' state! "
                         "This should not happen." % stream.id)

    def _expire_error_stream(self, stream):
        """Put the stream in the ``expire_error`` state (lock errors are
        logged).
        """
        try:
            self.db.set_stream_state(stream, StreamState.expire_error)
        except LockError:
            logger.error(
                "Stream %s locked while trying to set 'expire_error' state! "
                "This should not happen." % stream.id)

    def safe_get_debugger(self, trigger_def):
        """Return the trigger definition's debugger, or the debug
        manager's debugger for ``None`` when there is no definition.
        """
        return trigger_def.debugger if trigger_def is not None else \
            self.trigger_manager.debug_manager.get_debugger(None)

    def add_trigger_definition(self, list_of_triggerdefs):
        """Forward new trigger definitions to the trigger manager."""
        self.trigger_manager.add_trigger_definition(list_of_triggerdefs)

    def delete_trigger_definition(self, trigger_def_name):
        """Ask the trigger manager to delete the named definition."""
        self.trigger_manager.delete_trigger_definition(trigger_def_name)

    def fire_stream(self, stream):
        """Run the fire pipeline for ``stream``.

        :returns: True when the stream was processed and completed; False
                  when it was locked, its trigger definition or pipeline
                  is unknown, or the pipeline failed. In the last three
                  cases the stream is put in the ``error`` state.
        """
        trigger_def = self.trigger_manager.trigger_map.get(stream.name)
        debugger = self.safe_get_debugger(trigger_def)
        try:
            stream = self.db.set_stream_state(stream, StreamState.firing)
        except LockError:
            logger.debug("Stream %s locked. Moving on..." % stream.id)
            debugger.bump_counter("Locked")
            return False
        logger.debug("Firing Stream %s." % stream.id)
        if trigger_def is None:
            debugger.bump_counter("Unknown trigger def '%s'" % stream.name)
            logger.error("Stream %s has unknown trigger definition %s" % (
                stream.id, stream.name))
            self._error_stream(stream)
            return False
        pipeline = trigger_def.fire_pipeline
        if pipeline is not None:
            pipe_config = self.pipeline_config.get(pipeline)
            if pipe_config is None:
                debugger.bump_counter("Unknown pipeline '%s'" % pipeline)
                logger.error("Trigger %s for stream %s has unknown "
                             "pipeline %s" % (stream.name, stream.id,
                                              pipeline))
                self._error_stream(stream)
                # Stop here: running a pipeline with no handler list
                # would raise an uncaught TypeError and kill the worker.
                return False
            if not self._run_pipeline(stream, trigger_def, pipeline,
                                      pipe_config):
                self._error_stream(stream)
                return False
        else:
            logger.debug("No fire pipeline for stream %s. Nothing to do." % (
                stream.id))
            debugger.bump_counter("No fire pipeline for '%s'" % stream.name)
        self._complete_stream(stream)
        debugger.bump_counter("Streams fired")
        self.streams_fired += 1
        return True

    def expire_stream(self, stream):
        """Run the expire pipeline for ``stream``.

        :returns: True when the stream was processed and completed; False
                  when it was locked, its trigger definition or pipeline
                  is unknown, or the pipeline failed. In the last three
                  cases the stream is put in the ``expire_error`` state.
        """
        trigger_def = self.trigger_manager.trigger_map.get(stream.name)
        debugger = self.safe_get_debugger(trigger_def)
        try:
            stream = self.db.set_stream_state(stream, StreamState.expiring)
        except LockError:
            debugger.bump_counter("Locked")
            logger.debug("Stream %s locked. Moving on..." % stream.id)
            return False
        logger.debug("Expiring Stream %s." % stream.id)
        if trigger_def is None:
            debugger.bump_counter("Unknown trigger def '%s'" % stream.name)
            logger.error("Stream %s has unknown trigger definition %s" % (
                stream.id, stream.name))
            self._expire_error_stream(stream)
            return False
        pipeline = trigger_def.expire_pipeline
        if pipeline is not None:
            pipe_config = self.pipeline_config.get(pipeline)
            if pipe_config is None:
                debugger.bump_counter("Unknown pipeline '%s'" % pipeline)
                logger.error(
                    "Trigger %s for stream %s has unknown pipeline %s" % (
                        stream.name, stream.id, pipeline))
                self._expire_error_stream(stream)
                # Stop here: running a pipeline with no handler list
                # would raise an uncaught TypeError and kill the worker.
                return False
            if not self._run_pipeline(stream, trigger_def, pipeline,
                                      pipe_config):
                self._expire_error_stream(stream)
                return False
        else:
            logger.debug("No expire pipeline for stream %s. Nothing to do." % (
                stream.id))
            debugger.bump_counter("No expire pipeline for '%s'" % stream.name)
        self._complete_stream(stream)
        debugger.bump_counter("Streams expired")
        self.streams_expired += 1
        return True

    def process_ready_streams(self, batch_size, expire=False):
        """Load up to ``batch_size`` ready streams and fire or expire them.

        :param batch_size: maximum number of streams to load.
        :param expire: expire the streams when True, fire them otherwise.
        :returns: the number of streams loaded (not the number that were
                  processed successfully).
        """
        streams = self.db.get_ready_streams(batch_size, self.current_time(),
                                            expire=expire)
        stream_ct = len(streams)
        if expire:
            logger.debug("Loaded %s streams to expire." % stream_ct)
        else:
            logger.debug("Loaded %s streams to fire." % stream_ct)

        # Process in random order; presumably to reduce lock contention
        # between concurrent workers -- TODO confirm.
        random.shuffle(streams)
        for stream in streams:
            if expire:
                self.expire_stream(stream)
            else:
                self.fire_stream(stream)
        self.streams_loaded += stream_ct
        return stream_ct

    def process_trim_events(self):
        """Purge one batch of old events from the database.

        Evaluates the parsed ``trim_events_age`` expression, looks up at
        most ``trim_events_batch_size`` events with
        ``db.find_older_events`` and purges them.

        :returns: the number of event ids that were purged.
        """
        trim_date = self.trim_events_age().timestamp
        event_ids = self.db.find_older_events(trim_date,
                                              self.trim_events_batch_size)
        logger.debug("Trimming %s old events" % len(event_ids))
        self.db.purge_events(event_ids)
        return len(event_ids)

    def run(self):
        """Process streams forever.

        Each pass fires ready streams, expires ready streams, optionally
        trims old events and periodically logs statistics. The worker
        sleeps ``pipeline_worker_delay`` seconds when a pass found nothing
        to do. Database connection errors are retried after a short
        pause; any other exception is logged and re-raised.
        """
        while True:
            try:
                fire_ct = self.process_ready_streams(
                    self.pipeline_worker_batch_size)
                expire_ct = self.process_ready_streams(
                    self.pipeline_worker_batch_size,
                    expire=True)

                trim_ct = 0
                if self.trim_events:
                    trim_ct = self.process_trim_events()

                if self._statistics_due():
                    self._log_statistics()

                if not fire_ct and not expire_ct and not trim_ct:
                    logger.debug("No streams to fire or expire. Sleeping...")
                    time.sleep(self.pipeline_worker_delay)
            except DatabaseConnectionError:
                logger.warning(
                    "Database Connection went away. Reconnecting...")
                time.sleep(5)
                # DB layer will reconnect automatically. We just need to
                # retry the operation. (mdragon)
            except Exception:
                logger.exception("Unknown Error in pipeline worker!")
                raise
|