Merge "support time to live on event database for sql backend"
This commit is contained in:
commit
bb03376651
@ -14,16 +14,18 @@
|
||||
"""SQLAlchemy storage backend."""
|
||||
|
||||
from __future__ import absolute_import
|
||||
import datetime
|
||||
import os
|
||||
|
||||
from oslo.db import exception as dbexc
|
||||
from oslo.db.sqlalchemy import session as db_session
|
||||
from oslo_config import cfg
|
||||
from oslo_utils import timeutils
|
||||
import sqlalchemy as sa
|
||||
|
||||
from ceilometer.event.storage import base
|
||||
from ceilometer.event.storage import models as api_models
|
||||
from ceilometer.i18n import _
|
||||
from ceilometer.i18n import _, _LI
|
||||
from ceilometer.openstack.common import log
|
||||
from ceilometer.storage.sqlalchemy import models
|
||||
from ceilometer import utils
|
||||
@ -392,3 +394,32 @@ class Connection(base.Connection):
|
||||
yield api_models.Trait(name=k,
|
||||
dtype=dtype,
|
||||
value=v)
|
||||
|
||||
def clear_expired_event_data(self, ttl):
|
||||
"""Clear expired data from the backend storage system.
|
||||
|
||||
Clearing occurs according to the time-to-live.
|
||||
|
||||
:param ttl: Number of seconds to keep records for.
|
||||
"""
|
||||
session = self._engine_facade.get_session()
|
||||
with session.begin():
|
||||
end = timeutils.utcnow() - datetime.timedelta(seconds=ttl)
|
||||
event_q = (session.query(models.Event.id)
|
||||
.filter(models.Event.generated < end))
|
||||
|
||||
event_subq = event_q.subquery()
|
||||
for trait_model in [models.TraitText, models.TraitInt,
|
||||
models.TraitFloat, models.TraitDatetime]:
|
||||
(session.query(trait_model)
|
||||
.filter(trait_model.event_id.in_(event_subq))
|
||||
.delete(synchronize_session="fetch"))
|
||||
event_rows = event_q.delete()
|
||||
|
||||
# remove EventType and TraitType with no corresponding
|
||||
# matching events and traits
|
||||
(session.query(models.EventType)
|
||||
.filter(~models.EventType.events.any())
|
||||
.delete(synchronize_session="fetch"))
|
||||
LOG.info(_LI("%(event)d events are removed from database"),
|
||||
event_rows)
|
||||
|
@ -3148,24 +3148,6 @@ class EventTestBase(tests_db.TestBase,
|
||||
super(EventTestBase, self).setUp()
|
||||
self.prepare_data()
|
||||
|
||||
def prepare_data(self):
|
||||
# Add some data ...
|
||||
pass
|
||||
|
||||
|
||||
@tests_db.run_with('sqlite', 'mysql', 'pgsql', 'mongodb', 'db2')
|
||||
class EventTest(EventTestBase):
|
||||
def test_duplicate_message_id(self):
|
||||
now = datetime.datetime.utcnow()
|
||||
m = [event_models.Event("1", "Foo", now, None),
|
||||
event_models.Event("1", "Zoo", now, [])]
|
||||
problem_events = self.event_conn.record_events(m)
|
||||
self.assertEqual(1, len(problem_events))
|
||||
bad = problem_events[0]
|
||||
self.assertEqual(event_models.Event.DUPLICATE, bad[0])
|
||||
|
||||
|
||||
class GetEventTest(EventTestBase):
|
||||
def prepare_data(self):
|
||||
self.event_models = []
|
||||
base = 0
|
||||
@ -3191,6 +3173,40 @@ class GetEventTest(EventTestBase):
|
||||
|
||||
self.event_conn.record_events(self.event_models)
|
||||
|
||||
|
||||
@tests_db.run_with('sqlite', 'mysql', 'pgsql')
|
||||
class EventTTLTest(EventTestBase):
|
||||
|
||||
@mock.patch.object(timeutils, 'utcnow')
|
||||
def test_clear_expired_event_data(self, mock_utcnow):
|
||||
mock_utcnow.return_value = datetime.datetime(2013, 12, 31, 10, 0)
|
||||
self.event_conn.clear_expired_event_data(3600)
|
||||
|
||||
events = list(self.event_conn.get_events(storage.EventFilter()))
|
||||
self.assertEqual(2, len(events))
|
||||
event_types = list(self.event_conn.get_event_types())
|
||||
self.assertEqual(['Bar', 'Zoo'], event_types)
|
||||
for event_type in event_types:
|
||||
trait_types = list(self.event_conn.get_trait_types(event_type))
|
||||
self.assertEqual(4, len(trait_types))
|
||||
traits = list(self.event_conn.get_traits(event_type))
|
||||
self.assertEqual(4, len(traits))
|
||||
|
||||
|
||||
@tests_db.run_with('sqlite', 'mysql', 'pgsql', 'mongodb', 'db2')
|
||||
class EventTest(EventTestBase):
|
||||
def test_duplicate_message_id(self):
|
||||
now = datetime.datetime.utcnow()
|
||||
m = [event_models.Event("1", "Foo", now, None),
|
||||
event_models.Event("1", "Zoo", now, [])]
|
||||
problem_events = self.event_conn.record_events(m)
|
||||
self.assertEqual(1, len(problem_events))
|
||||
bad = problem_events[0]
|
||||
self.assertEqual(event_models.Event.DUPLICATE, bad[0])
|
||||
|
||||
|
||||
class GetEventTest(EventTestBase):
|
||||
|
||||
def test_generated_is_datetime(self):
|
||||
event_filter = storage.EventFilter(self.start, self.end)
|
||||
events = [event for event in self.event_conn.get_events(event_filter)]
|
||||
|
Loading…
Reference in New Issue
Block a user