Event Storage Layer

This change adds the storage layer interfaces
necessary to implement the Event API. It also
updates the get_events call to allow multiple
trait filters.

implements bp specify-event-api

Change-Id: I7aef4a8fc9b88528479f6c0a9feab2c18945bae3
This commit is contained in:
John Herndon 2013-11-21 10:40:20 -06:00
parent ec06be3c6a
commit a5989ab7ad
9 changed files with 447 additions and 99 deletions

View File

@ -116,23 +116,28 @@ class SampleFilter(object):
class EventFilter(object):
"""Properties for building an Event query.
:param start: UTC start datetime (mandatory)
:param end: UTC end datetime (mandatory)
:param start_time: UTC start datetime (mandatory)
:param end_time: UTC end datetime (mandatory)
:param event_type: the name of the event. None for all.
:param traits: the trait filter dict, all of which are optional
:param message_id: the message_id of the event. None for all.
:param traits_filter: the trait filter dicts, all of which are optional.
This parameter is a list of dictionaries that specify
trait values:
{'key': <key>,
't_string': <value>,
't_int': <value>,
't_datetime': <value>
't_float': <value>}
currently, only one trait dict is supported.
't_float': <value>,
'op': <eq, lt, le, ne, gt or ge> }
"""
def __init__(self, start, end, event_type=None, traits={}):
self.start = utils.sanitize_timestamp(start)
self.end = utils.sanitize_timestamp(end)
def __init__(self, start_time=None, end_time=None, event_type=None,
message_id=None, traits_filter=[]):
self.start_time = utils.sanitize_timestamp(start_time)
self.end_time = utils.sanitize_timestamp(end_time)
self.message_id = message_id
self.event_type = event_type
self.traits = traits
self.traits_filter = traits_filter
def dbsync():

View File

@ -278,3 +278,26 @@ class Connection(object):
def get_events(self, event_filter):
"""Return an iterable of model.Event objects.
"""
@abc.abstractmethod
def get_event_types(self):
"""Return all event types as an iterable of strings.
"""
@abc.abstractmethod
def get_trait_types(self, event_type):
"""Return a dictionary containing the name and data type of
the trait type. Only trait types for the provided event_type are
returned.
:param event_type: the type of the Event
"""
@abc.abstractmethod
def get_traits(self, event_type, trait_type=None):
"""Return all trait instances associated with an event_type. If
trait_type is specified, only return instances of that trait type.
:param event_type: the type of the Event to filter by
:param trait_type: the name of the Trait to filter by
"""

View File

@ -748,12 +748,12 @@ class Connection(base.Connection):
:param end_timestamp: Optional modified timestamp end range
:param end_timestamp_op: Optional timestamp end range operation
"""
raise NotImplementedError('Alarm history not implemented')
raise NotImplementedError(_('Alarm history not implemented'))
def record_alarm_change(self, alarm_change):
"""Record alarm change event.
"""
raise NotImplementedError('Alarm history not implemented')
raise NotImplementedError(_('Alarm history not implemented'))
@staticmethod
def record_events(events):
@ -761,7 +761,7 @@ class Connection(base.Connection):
:param events: a list of model.Event objects.
"""
raise NotImplementedError('Events not implemented.')
raise NotImplementedError(_('Events not implemented.'))
@staticmethod
def get_events(event_filter):
@ -769,4 +769,32 @@ class Connection(base.Connection):
:param event_filter: EventFilter instance
"""
raise NotImplementedError('Events not implemented.')
raise NotImplementedError(_('Events not implemented.'))
@staticmethod
def get_event_types():
"""Return all event types as an iterable of strings.
"""
raise NotImplementedError(_('Events not implemented.'))
@staticmethod
def get_trait_types(event_type):
"""Return a dictionary containing the name and data type of
the trait type. Only trait types for the provided event_type are
returned.
:param event_type: the type of the Event
"""
raise NotImplementedError(_('Events not implemented.'))
@staticmethod
def get_traits(event_type, trait_type=None):
"""Return all trait instances associated with an event_type. If
trait_type is specified, only return instances of that trait type.
:param event_type: the type of the Event to filter by
:param trait_type: the name of the Trait to filter by
"""
raise NotImplementedError(_('Events not implemented.'))

View File

@ -597,17 +597,17 @@ class Connection(base.Connection):
"""Yields a lists of alarms that match filters
raise NotImplementedError('metaquery not implemented')
"""
raise NotImplementedError('Alarms not implemented')
raise NotImplementedError(_('Alarms not implemented'))
def create_alarm(self, alarm):
"""update alarm
"""
raise NotImplementedError('Alarms not implemented')
raise NotImplementedError(_('Alarms not implemented'))
def update_alarm(self, alarm):
"""update alarm
"""
raise NotImplementedError('Alarms not implemented')
raise NotImplementedError(_('Alarms not implemented'))
def get_alarm_changes(self, alarm_id, on_behalf_of,
user=None, project=None, type=None,
@ -636,31 +636,56 @@ class Connection(base.Connection):
:param end_timestamp: Optional modified timestamp end range
:param end_timestamp_op: Optional timestamp end range operation
"""
raise NotImplementedError('Alarm history not implemented')
raise NotImplementedError(_('Alarm history not implemented'))
def record_alarm_change(self, alarm_change):
"""Record alarm change event.
"""
raise NotImplementedError('Alarm history not implemented')
raise NotImplementedError(_('Alarm history not implemented'))
def delete_alarm(self, alarm_id):
"""Delete a alarm
"""
raise NotImplementedError('Alarms not implemented')
raise NotImplementedError(_('Alarms not implemented'))
def record_events(self, events):
"""Write the events.
:param events: a list of model.Event objects.
"""
raise NotImplementedError('Events not implemented.')
raise NotImplementedError(_('Events not implemented.'))
def get_events(self, event_filter):
"""Return an iterable of model.Event objects.
:param event_filter: EventFilter instance
"""
raise NotImplementedError('Events not implemented.')
raise NotImplementedError(_('Events not implemented.'))
def get_event_types(self):
"""Return all event types as an iterable of strings.
"""
raise NotImplementedError(_('Events not implemented.'))
def get_trait_types(self, event_type):
"""Return a dictionary containing the name and data type of
the trait type. Only trait types for the provided event_type are
returned.
:param event_type: the type of the Event
"""
raise NotImplementedError(_('Events not implemented.'))
def get_traits(self, event_type, trait_type=None):
"""Return all trait instances associated with an event_type. If
trait_type is specified, only return instances of that trait type.
:param event_type: the type of the Event to filter by
:param trait_type: the name of the Trait to filter by
"""
raise NotImplementedError(_('Events not implemented.'))
###############

View File

@ -18,6 +18,7 @@
"""Simple logging storage backend.
"""
from ceilometer.openstack.common.gettextutils import _ # noqa
from ceilometer.openstack.common import log
from ceilometer.storage import base
@ -205,23 +206,48 @@ class Connection(base.Connection):
:param end_timestamp: Optional modified timestamp end range
:param end_timestamp_op: Optional timestamp end range operation
"""
raise NotImplementedError('Alarm history not implemented')
raise NotImplementedError(_('Alarm history not implemented'))
def record_alarm_change(self, alarm_change):
"""Record alarm change event.
"""
raise NotImplementedError('Alarm history not implemented')
raise NotImplementedError(_('Alarm history not implemented'))
def record_events(self, events):
"""Write the events.
:param events: a list of model.Event objects.
"""
raise NotImplementedError('Events not implemented.')
raise NotImplementedError(_('Events not implemented.'))
def get_events(self, event_filter):
"""Return an iterable of model.Event objects.
:param event_filter: EventFilter instance
"""
raise NotImplementedError('Events not implemented.')
raise NotImplementedError(_('Events not implemented.'))
def get_event_types(self):
"""Return all event types as an iterable of strings.
"""
raise NotImplementedError(_('Events not implemented.'))
def get_trait_types(self, event_type):
"""Return a dictionary containing the name and data type of
the trait type. Only trait types for the provided event_type are
returned.
:param event_type: the type of the Event
"""
raise NotImplementedError(_('Events not implemented.'))
def get_traits(self, event_type, trait_type=None):
"""Return all trait instances associated with an event_type. If
trait_type is specified, only return instances of that trait type.
:param event_type: the type of the Event to filter by
:param trait_type: the name of the Trait to filter by
"""
raise NotImplementedError(_('Events not implemented.'))

View File

@ -965,7 +965,7 @@ class Connection(base.Connection):
:param events: a list of model.Event objects.
"""
raise NotImplementedError('Events not implemented.')
raise NotImplementedError(_('Events not implemented.'))
@staticmethod
def get_events(event_filter):
@ -973,4 +973,32 @@ class Connection(base.Connection):
:param event_filter: EventFilter instance
"""
raise NotImplementedError('Events not implemented.')
raise NotImplementedError(_('Events not implemented.'))
@staticmethod
def get_event_types():
"""Return all event types as an iterable of strings.
"""
raise NotImplementedError(_('EventTypes not implemented.'))
@staticmethod
def get_trait_types(event_type):
"""Return a dictionary containing the name and data type of
the trait type. Only trait types for the provided event_type are
returned.
:param event_type: the type of the Event
"""
raise NotImplementedError(_('Events not implemented.'))
@staticmethod
def get_traits(event_type, trait_type=None):
"""Return all trait instances associated with an event_type. If
trait_type is specified, only return instances of that trait type.
:param event_type: the type of the Event to filter by
:param trait_type: the name of the Trait to filter by
"""
raise NotImplementedError(_('Events not implemented.'))

View File

@ -918,90 +918,182 @@ class Connection(base.Connection):
return problem_events
def get_events(self, event_filter):
"""Return an iterable of model.Event objects. The event model objects
have their Trait model objects available -- filtered by any traits
in the event_filter.
"""Return an iterable of model.Event objects.
:param event_filter: EventFilter instance
"""
start = utils.dt_to_decimal(event_filter.start)
end = utils.dt_to_decimal(event_filter.end)
start = utils.dt_to_decimal(event_filter.start_time)
end = utils.dt_to_decimal(event_filter.end_time)
session = sqlalchemy_session.get_session()
with session.begin():
sub_query = session.query(models.Event.id)\
.join(models.EventType,
models.Event.event_type_id == models.EventType.id)\
.join(models.Trait,
models.Trait.event_id == models.Event.id)\
.filter(models.Event.generated >= start,
models.Event.generated <= end)
event_query = session.query(models.Event)
# Build up the join conditions
event_join_conditions = [models.EventType.id ==
models.Event.event_type_id]
if event_filter.event_type:
event_type = event_filter.event_type
sub_query = sub_query\
.filter(models.EventType.desc == event_type)
event_join_conditions\
.append(models.EventType.desc == event_filter.event_type)
event_query = event_query.join(models.EventType,
and_(*event_join_conditions))
# Build up the where conditions
event_filter_conditions = []
if event_filter.message_id:
event_filter_conditions\
.append(models.Event.message_id == event_filter.message_id)
if start:
event_filter_conditions.append(models.Event.generated >= start)
if end:
event_filter_conditions.append(models.Event.generated <= end)
if event_filter_conditions:
event_query = event_query\
.filter(and_(*event_filter_conditions))
event_models_dict = {}
if event_filter.traits:
sub_query = sub_query.join(models.TraitType,
models.TraitType.id ==
models.Trait.trait_type_id)
for key, value in event_filter.traits.iteritems():
if key == 'key':
sub_query = sub_query.filter(models.TraitType.desc ==
value)
elif key == 't_string':
sub_query = sub_query.filter(
models.Trait.t_string == value)
elif key == 't_int':
sub_query = sub_query.filter(
models.Trait.t_int == value)
elif key == 't_datetime':
dt = utils.dt_to_decimal(value)
sub_query = sub_query.filter(
models.Trait.t_datetime == dt)
elif key == 't_float':
sub_query = sub_query.filter(
models.Trait.t_datetime == value)
if event_filter.traits_filter:
for trait_filter in event_filter.traits_filter:
# Build a sub query that joins Trait to TraitType
# where the trait name matches
trait_name = trait_filter.pop('key')
conditions = [models.Trait.trait_type_id ==
models.TraitType.id,
models.TraitType.desc == trait_name]
for key, value in trait_filter.iteritems():
if key == 't_string':
conditions.append(models.Trait.t_string == value)
elif key == 't_int':
conditions.append(models.Trait.t_int == value)
elif key == 't_datetime':
dt = utils.dt_to_decimal(value)
conditions.append(models.Trait.t_datetime == dt)
elif key == 't_float':
conditions.append(models.Trait.t_float == value)
trait_query = session.query(models.Trait.event_id)\
.join(models.TraitType, and_(*conditions)).subquery()
event_query = event_query\
.join(trait_query,
models.Event.id == trait_query.c.event_id)
else:
# Pre-populate event_models_dict to cover Events without traits
events = session.query(models.Event)\
.filter(models.Event.generated >= start)\
.filter(models.Event.generated <= end)
if event_filter.event_type:
events = events\
.join(models.EventType,
models.EventType.id ==
models.Event.event_type_id)\
.filter(models.EventType.desc ==
event_filter.event_type)
for db_event in events.all():
generated = utils.decimal_to_dt(db_event.generated)
api_event = api_models.Event(db_event.message_id,
db_event.event_type.desc,
generated, [])
event_models_dict[db_event.id] = api_event
# If there are no trait filters, grab the events from the db
query = session.query(models.Event.id,
models.Event.generated,
models.Event.message_id,
models.EventType.desc)\
.join(models.EventType,
and_(*event_join_conditions))
if event_filter_conditions:
query = query.filter(and_(*event_filter_conditions))
for (id, generated, message_id, desc) in query.all():
event_models_dict[id] = api_models.Event(message_id,
desc,
generated,
[])
sub_query = sub_query.subquery()
all_data = session.query(models.Trait)\
.join(sub_query, models.Trait.event_id == sub_query.c.id)
# Build event models for the events
event_query = event_query.subquery()
query = session.query(models.Trait)\
.join(models.TraitType,
models.Trait.trait_type_id == models.TraitType.id)\
.join(event_query, models.Trait.event_id == event_query.c.id)
# Now convert the sqlalchemy objects back into Models ...
for trait in all_data.all():
for trait in query.all():
event = event_models_dict.get(trait.event_id)
if not event:
generated = utils.decimal_to_dt(trait.event.generated)
event = api_models.Event(trait.event.message_id,
trait.event.event_type.desc,
generated, [])
event = api_models.Event(
trait.event.message_id,
trait.event.event_type.desc,
generated, [])
event_models_dict[trait.event_id] = event
value = trait.get_value()
trait_model = api_models.Trait(trait.trait_type.desc,
trait.trait_type.data_type,
value)
trait.get_value())
event.append_trait(trait_model)
event_models = event_models_dict.values()
return sorted(event_models, key=operator.attrgetter('generated'))
@staticmethod
def get_event_types():
"""Return all event types as an iterable of strings.
"""
session = sqlalchemy_session.get_session()
with session.begin():
query = session.query(models.EventType.desc)\
.order_by(models.EventType.desc)
for name in query.all():
# The query returns a tuple with one element.
yield name[0]
@staticmethod
def get_trait_types(event_type):
"""Return a dictionary containing the name and data type of
the trait type. Only trait types for the provided event_type are
returned.
:param event_type: the type of the Event
"""
session = sqlalchemy_session.get_session()
with session.begin():
query = (session.query(models.TraitType.desc,
models.TraitType.data_type)
.join(models.Trait,
models.Trait.trait_type_id ==
models.TraitType.id)
.join(models.Event,
models.Event.id ==
models.Trait.event_id)
.join(models.EventType,
and_(models.EventType.id ==
models.Event.id,
models.EventType.desc ==
event_type))
.group_by(models.TraitType.desc,
models.TraitType.data_type)
.distinct())
for desc, type in query.all():
yield {'name': desc, 'data_type': type}
@staticmethod
def get_traits(event_type, trait_type=None):
"""Return all trait instances associated with an event_type. If
trait_type is specified, only return instances of that trait type.
:param event_type: the type of the Event to filter by
:param trait_type: the name of the Trait to filter by
"""
session = sqlalchemy_session.get_session()
with session.begin():
trait_type_filters = [models.TraitType.id ==
models.Trait.trait_type_id]
if trait_type:
trait_type_filters.append(models.TraitType.desc == trait_type)
query = (session.query(models.Trait)
.join(models.TraitType, and_(*trait_type_filters))
.join(models.Event,
models.Event.id == models.Trait.event_id)
.join(models.EventType,
and_(models.EventType.id ==
models.Event.event_type_id,
models.EventType.desc == event_type)))
for trait in query.all():
type = trait.trait_type
yield api_models.Trait(name=type.desc,
dtype=type.data_type,
value=trait.get_value())

View File

@ -2101,7 +2101,7 @@ class GetEventTest(EventTestBase):
base = 0
self.start = datetime.datetime(2013, 12, 31, 5, 0)
now = self.start
for event_type in ['Foo', 'Bar', 'Zoo']:
for event_type in ['Foo', 'Bar', 'Zoo', 'Foo', 'Bar', 'Zoo']:
trait_models = \
[models.Trait(name, dtype, value)
for name, dtype, value in [
@ -2113,7 +2113,7 @@ class GetEventTest(EventTestBase):
float(base) + 0.123456),
('trait_D', models.Trait.DATETIME_TYPE, now)]]
event_models.append(
models.Event("id_%s" % event_type,
models.Event("id_%s_%d" % (event_type, base),
event_type, now, trait_models))
base += 100
now = now + datetime.timedelta(hours=1)
@ -2124,10 +2124,10 @@ class GetEventTest(EventTestBase):
def test_simple_get(self):
event_filter = storage.EventFilter(self.start, self.end)
events = self.conn.get_events(event_filter)
self.assertEqual(3, len(events))
self.assertEqual(6, len(events))
start_time = None
for i, name in enumerate(["Foo", "Bar", "Zoo"]):
self.assertEqual(events[i].event_type, name)
for i, type in enumerate(['Foo', 'Bar', 'Zoo']):
self.assertEqual(events[i].event_type, type)
self.assertEqual(4, len(events[i].traits))
# Ensure sorted results ...
if start_time is not None:
@ -2136,22 +2136,120 @@ class GetEventTest(EventTestBase):
start_time = events[i].generated
def test_simple_get_event_type(self):
expected_trait_values = {
'id_Bar_100': {
'trait_A': 'my_Bar_text',
'trait_B': 101,
'trait_C': 100.123456,
'trait_D': self.start + datetime.timedelta(hours=1)
},
'id_Bar_400': {
'trait_A': 'my_Bar_text',
'trait_B': 401,
'trait_C': 400.123456,
'trait_D': self.start + datetime.timedelta(hours=4)
}
}
event_filter = storage.EventFilter(self.start, self.end, "Bar")
events = self.conn.get_events(event_filter)
self.assertEqual(1, len(events))
self.assertEqual(2, len(events))
self.assertEqual(events[0].event_type, "Bar")
self.assertEqual(events[1].event_type, "Bar")
self.assertEqual(4, len(events[0].traits))
self.assertEqual(4, len(events[1].traits))
for event in events:
trait_values = expected_trait_values.get(event.message_id,
None)
if not trait_values:
self.fail("Unexpected event ID returned:" % event.message_id)
for trait in event.traits:
expected_val = trait_values.get(trait.name, None)
if not expected_val:
self.fail("Unexpected trait type: %s" % trait.dtype)
self.assertEqual(expected_val, trait.value)
def test_get_event_trait_filter(self):
trait_filters = {'key': 'trait_B', 't_int': 101}
trait_filters = [{'key': 'trait_B', 't_int': 101}]
event_filter = storage.EventFilter(self.start, self.end,
traits=trait_filters)
traits_filter=trait_filters)
events = self.conn.get_events(event_filter)
self.assertEqual(1, len(events))
self.assertEqual(events[0].event_type, "Bar")
self.assertEqual(4, len(events[0].traits))
def test_simple_get_no_traits(self):
def test_get_event_multiple_trait_filter(self):
trait_filters = [{'key': 'trait_B', 't_int': 1},
{'key': 'trait_A', 't_string': 'my_Foo_text'}]
event_filter = storage.EventFilter(self.start, self.end,
traits_filter=trait_filters)
events = self.conn.get_events(event_filter)
self.assertEqual(1, len(events))
self.assertEqual(events[0].event_type, "Foo")
self.assertEqual(4, len(events[0].traits))
def test_get_event_multiple_trait_filter_expect_none(self):
trait_filters = [{'key': 'trait_B', 't_int': 1},
{'key': 'trait_A', 't_string': 'my_Zoo_text'}]
event_filter = storage.EventFilter(self.start, self.end,
traits_filter=trait_filters)
events = self.conn.get_events(event_filter)
self.assertEqual(0, len(events))
def test_get_event_types(self):
event_types = [e for e in
self.conn.get_event_types()]
self.assertEqual(3, len(event_types))
self.assertTrue("Bar" in event_types)
self.assertTrue("Foo" in event_types)
self.assertTrue("Zoo" in event_types)
def test_get_trait_types(self):
trait_types = [tt for tt in
self.conn.get_trait_types("Foo")]
self.assertEqual(4, len(trait_types))
trait_type_names = map(lambda x: x['name'], trait_types)
self.assertIn("trait_A", trait_type_names)
self.assertIn("trait_B", trait_type_names)
self.assertIn("trait_C", trait_type_names)
self.assertIn("trait_D", trait_type_names)
def test_get_trait_types_unknown_event(self):
trait_types = [tt for tt in
self.conn.get_trait_types("Moo")]
self.assertEqual(0, len(trait_types))
def test_get_traits(self):
traits = self.conn.get_traits("Bar")
#format results in a way that makes them easier to
#work with
trait_dict = {}
for trait in traits:
trait_dict[trait.name] = trait.dtype
self.assertTrue("trait_A" in trait_dict)
self.assertEqual(models.Trait.TEXT_TYPE, trait_dict["trait_A"])
self.assertTrue("trait_B" in trait_dict)
self.assertEqual(models.Trait.INT_TYPE, trait_dict["trait_B"])
self.assertTrue("trait_C" in trait_dict)
self.assertEqual(models.Trait.FLOAT_TYPE, trait_dict["trait_C"])
self.assertTrue("trait_D" in trait_dict)
self.assertEqual(models.Trait.DATETIME_TYPE,
trait_dict["trait_D"])
def test_get_all_traits(self):
traits = self.conn.\
get_traits("Foo")
traits = [t for t in traits]
self.assertEqual(8, len(traits))
trait = traits[0]
self.assertEqual("trait_A", trait.name)
self.assertEqual(models.Trait.TEXT_TYPE, trait.dtype)
def test_simple_get_event_no_traits(self):
new_events = [models.Event("id_notraits", "NoTraits", self.start, [])]
bad_events = self.conn.record_events(new_events)
event_filter = storage.EventFilter(self.start, self.end, "NoTraits")
@ -2162,6 +2260,25 @@ class GetEventTest(EventTestBase):
self.assertEqual(events[0].event_type, "NoTraits")
self.assertEqual(0, len(events[0].traits))
def test_simple_get_no_filters(self):
event_filter = storage.EventFilter(None, None, None)
events = self.conn.get_events(event_filter)
self.assertEqual(6, len(events))
def test_get_by_message_id(self):
new_events = [models.Event("id_testid",
"MessageIDTest",
self.start,
[])]
bad_events = self.conn.record_events(new_events)
event_filter = storage.EventFilter(message_id="id_testid")
events = self.conn.get_events(event_filter)
self.assertEqual(0, len(bad_events))
self.assertEqual(1, len(events))
event = events[0]
self.assertEqual("id_testid", event.message_id)
class BigIntegerTest(tests_db.TestBase,
tests_db.MixinTestsWithBackendScenarios):

View File

@ -48,6 +48,9 @@ def dt_to_decimal(utc):
Some databases don't store microseconds in datetime
so we always store as Decimal unixtime.
"""
if utc is None:
return None
decimal.getcontext().prec = 30
return decimal.Decimal(str(calendar.timegm(utc.utctimetuple()))) + \
(decimal.Decimal(str(utc.microsecond)) /
@ -59,6 +62,7 @@ def decimal_to_dt(dec):
"""
if dec is None:
return None
integer = int(dec)
micro = (dec - decimal.Decimal(integer)) * decimal.Decimal(1000000)
daittyme = datetime.datetime.utcfromtimestamp(integer)