Add optional database trimming for old events.
Allows you to trim events older than a configurable time from the events database. (Remerging to fix timex parse error)
Change-Id: Iaa290705815d1c3ac23c2ca7370a5d705f1f834c
This commit is contained in:
parent
a15b95dbf3
commit
665c9ad328
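
The change is driven by three new ConfigItem settings on PipelineManager: trim_events (off by default), trim_events_age (a timex expression, default "$timestamp - 14d"), and trim_events_batch_size (default 100). A minimal sketch of what enabling trimming might look like, written as the equivalent Python dict handed to the pipeline worker; the key names and defaults come from the hunks below, but the surrounding configuration (file format, other required keys) is assumed:

# Hypothetical excerpt of the pipeline-worker configuration.
# Only the trimming-related keys are shown; everything else is omitted.
config = {
    'trim_events': True,                    # default is False (trimming disabled)
    'trim_events_age': '$timestamp - 14d',  # timex expression: trim events older than 14 days
    'trim_events_batch_size': 100,          # max events purged per trim pass
}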
@@ -1,7 +1,7 @@
 [metadata]
 description-file = README.md
 name = winchester
-version = 0.56
+version = 0.57
 author = Monsyne Dragon
 author_email = mdragon@rackspace.com
 summary = An OpenStack notification event processing library.
@@ -531,6 +531,21 @@ class TestDB(unittest.TestCase):
         self.assertEqual(streams[0]['id'], 3)
         self.assertEqual(streams[1]['id'], 4)
 
+    def test_purge_events(self):
+        self.db.purge_events([1])
+        events = self.db.find_events()
+        self.assertEqual(3, len(events))
+
+    def test_find_older_events(self):
+        d1 = datetime.datetime(2014, 8, 1, 2, 10, 12, 1)
+        d2 = datetime.datetime(2014, 8, 1, 4, 57, 55, 43)
+        event_ids = self.db.find_older_events(d1, 2)
+        self.assertEqual(event_ids, [3])
+        event_ids = self.db.find_older_events(d2, 2)
+        self.assertEqual(event_ids, [3, 4])
+        event_ids = self.db.find_older_events(d2, 1)
+        self.assertEqual(event_ids, [3])
+
     def test_find_events(self):
         events = self.db.find_events()
         self.assertEqual(4, len(events))
@@ -24,6 +24,7 @@ from sqlalchemy.exc import IntegrityError
 from sqlalchemy.exc import OperationalError
 from sqlalchemy.orm.exc import NoResultFound
 from sqlalchemy.orm import sessionmaker
+from sqlalchemy.sql import select
 from winchester.config import ConfigItem
 from winchester.config import ConfigManager
 from winchester import models
@@ -155,6 +156,33 @@ class DBInterface(object):
                 "No event found with message_id %s!" % message_id)
         return e.as_dict
 
+    @sessioned
+    def find_older_events(self, purge_date, batchsize, session=None):
+        # For speed, we do this below the ORM layer. (mdragon)
+        conn = session.connection()
+        event_table = models.Event.__table__
+        q = select([event_table.c.id])
+        q = q.where(event_table.c.generated < purge_date)
+        q = q.order_by(event_table.c.generated.asc())
+        q = q.limit(batchsize)
+        return [r[0] for r in conn.execute(q).fetchall()]
+
+    @sessioned
+    def purge_events(self, event_ids, session=None):
+        # For speed, we do this below the ORM layer. (mdragon)
+        conn = session.connection()
+        dq = models.stream_event_table.delete()
+        dq = dq.where(models.stream_event_table.c.event_id.in_(event_ids))
+        conn.execute(dq)
+        trait_table = models.Trait.__table__
+        dq = trait_table.delete()
+        dq = dq.where(trait_table.c.event_id.in_(event_ids))
+        conn.execute(dq)
+        event_table = models.Event.__table__
+        dq = event_table.delete()
+        dq = dq.where(event_table.c.id.in_(event_ids))
+        conn.execute(dq)
+
     @sessioned
     def find_events(self, from_datetime=None, to_datetime=None,
                     event_name=None, traits=None, mark=None, limit=None,
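
The two new DBInterface methods are meant to be used as a pair: find_older_events() returns up to a batch of event ids generated before a cutoff date, and purge_events() bulk-deletes those events along with their trait rows and stream-event links, staying below the ORM layer for speed. A minimal sketch of the calling pattern (it mirrors process_trim_events() later in this commit; the loop wrapper, the cutoff value, and the db variable are illustrative assumptions):

import datetime

# Hypothetical driver: db is a connected winchester DBInterface instance.
cutoff = datetime.datetime.utcnow() - datetime.timedelta(days=14)
while True:
    event_ids = db.find_older_events(cutoff, 100)  # ids of up to 100 events older than cutoff
    if not event_ids:
        break                                      # nothing old enough remains
    db.purge_events(event_ids)                     # delete stream links, traits, then events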
@@ -19,6 +19,7 @@ import random
 import simport
 import six
 import time
+import timex
 
 from winchester.config import ConfigItem
 from winchester.config import ConfigManager
@@ -145,6 +146,16 @@ class PipelineManager(object):
                 help="Delete successfully proccessed "
                      "streams when finished?",
                 default=True),
+            trim_events=ConfigItem(
+                help="Delete events older than a configurable time.",
+                default=False),
+            trim_events_age=ConfigItem(
+                help="Delete events older than this (timex expr).",
+                default="$timestamp - 14d"),
+            trim_events_batch_size=ConfigItem(
+                help="Maximum number of events for pipeline "
+                     "worker(s) to trim at a time",
+                default=100),
         ))
         return configs
 
@@ -208,6 +219,15 @@ class PipelineManager(object):
         self.pipeline_worker_delay = config['pipeline_worker_delay']
         self.statistics_period = config['statistics_period']
         self.purge_completed_streams = config['purge_completed_streams']
+        self.trim_events = config['trim_events']
+        self.trim_events_batch_size = config['trim_events_batch_size']
+        try:
+            self.trim_events_age = timex.parse(str(config['trim_events_age']))
+        except timex.TimexError:
+            logger.error("Invalid trim event expression: %s Event trimming "
+                         "disabled." % config['trim_events_age'])
+            self.trim_events_age = None
+            self.trim_events = False
         self.streams_fired = 0
         self.streams_expired = 0
         self.streams_loaded = 0
@@ -395,6 +415,14 @@ class PipelineManager(object):
         self.streams_loaded += stream_ct
         return stream_ct
 
+    def process_trim_events(self):
+        trim_date = self.trim_events_age().timestamp
+        event_ids = self.db.find_older_events(trim_date,
+                                              self.trim_events_batch_size)
+        logger.debug("Trimming %s old events" % len(event_ids))
+        self.db.purge_events(event_ids)
+        return len(event_ids)
+
     def run(self):
         while True:
             try:
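
The timex expression is parsed once in __init__ (and trimming is disabled on a parse error rather than crashing the worker), then evaluated on every trim pass to produce a concrete cutoff. A rough sketch of that flow, using only the calls visible in this diff; the exact objects timex returns are not shown here and are assumed to behave as process_trim_events() implies:

import timex

expr = timex.parse("$timestamp - 14d")  # done once at startup; may raise timex.TimexError
cutoff = expr().timestamp               # resolved per pass, as in process_trim_events()
event_ids = db.find_older_events(cutoff, batch_size)  # db and batch_size assumed from context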
@@ -404,11 +432,15 @@ class PipelineManager(object):
                     self.pipeline_worker_batch_size,
                     expire=True)
 
+                trim_ct = 0
+                if self.trim_events:
+                    trim_ct = self.process_trim_events()
+
                 if ((self.current_time() - self.last_status).seconds
                         > self.statistics_period):
                     self._log_statistics()
 
-                if not fire_ct and not expire_ct:
+                if not fire_ct and not expire_ct and not trim_ct:
                     logger.debug("No streams to fire or expire. Sleeping...")
                     time.sleep(self.pipeline_worker_delay)
             except DatabaseConnectionError: