Merge "add option to store raw notification"

This commit is contained in:
Jenkins 2015-03-09 20:45:52 +00:00 committed by Gerrit Code Review
commit 4d92fca455
23 changed files with 178 additions and 44 deletions

View File

@ -21,6 +21,7 @@
import ast
import functools
import inspect
import json
from oslo_utils import strutils
from oslo_utils import timeutils
@ -254,3 +255,16 @@ class AlarmRule(Base):
@staticmethod
def update_hook(alarm):
pass
class JsonType(wtypes.UserType):
"""A simple JSON type."""
basetype = wtypes.text
name = 'json'
@staticmethod
def validate(value):
# check that value can be serialised
json.dumps(value)
return value

View File

@ -135,6 +135,9 @@ class Event(base.Base):
generated = datetime.datetime
"The time the event occurred"
raw = base.JsonType()
"The raw copy of notification"
@classmethod
def sample(cls):
return cls(
@ -148,7 +151,8 @@ class Event(base.Base):
value='conductor.tem-devstack-01'),
Trait(name='tenant_id',
value='7f13f2b17917463b9ee21aa92c4b36d6')
}
},
raw={'status': {'nested': 'started'}}
)
@ -243,7 +247,8 @@ class EventsController(rest.RestController):
return [Event(message_id=event.message_id,
event_type=event.event_type,
generated=event.generated,
traits=event.traits)
traits=event.traits,
raw=event.raw)
for event in
pecan.request.event_storage_conn.get_events(event_filter)]
@ -269,4 +274,5 @@ class EventsController(rest.RestController):
return Event(message_id=event.message_id,
event_type=event.event_type,
generated=event.generated,
traits=event.traits)
traits=event.traits,
raw=event.raw)

View File

@ -201,7 +201,8 @@ class EventEndpoint(CollectorEndpoint):
traits=[models.Trait(
name, dtype,
models.Trait.convert_value(dtype, value))
for name, dtype, value in ev['traits']])
for name, dtype, value in ev['traits']],
raw=ev.get('raw', {}))
)
except Exception:
LOG.exception(_LE("Error processing event and it will be "

View File

@ -35,7 +35,11 @@ OPTS = [
default=False,
help='Drop notifications if no event definition matches. '
'(Otherwise, we convert them with just the default traits)'),
cfg.MultiStrOpt('store_raw',
default=[],
help='Store the raw notification for select priority '
'levels (info and/or error). By default, raw details are '
'not captured.')
]
cfg.CONF.register_opts(OPTS, group='event')
@ -157,6 +161,7 @@ class EventDefinition(object):
self._excluded_types = []
self.traits = dict()
self.cfg = definition_cfg
self.raw_levels = [level.lower() for level in cfg.CONF.event.store_raw]
try:
event_type = definition_cfg['event_type']
@ -232,7 +237,9 @@ class EventDefinition(object):
for t in self.traits)
# Only accept non-None value traits ...
traits = [trait for trait in traits if trait is not None]
event = models.Event(message_id, event_type, when, traits)
raw = (notification_body
if notification_body.get('priority') in self.raw_levels else {})
event = models.Event(message_id, event_type, when, traits, raw)
return event

View File

@ -90,7 +90,8 @@ class Connection(base.Connection):
'_type': ev.event_type,
'_id': ev.message_id,
'_source': {'timestamp': ev.generated.isoformat(),
'traits': traits}}
'traits': traits,
'raw': ev.raw}}
problem_events = []
for ok, result in helpers.streaming_bulk(
@ -196,7 +197,8 @@ class Connection(base.Connection):
generated=gen_ts,
traits=sorted(
trait_list,
key=operator.attrgetter('dtype')))
key=operator.attrgetter('dtype')),
raw=record['_source']['raw'])
def get_event_types(self):
iclient = es.client.IndicesClient(self.conn)

View File

@ -116,7 +116,8 @@ class Connection(hbase_base.Connection, base.Connection):
traits[key] = trait.value
record = hbase_utils.serialize_entry(traits,
event_type=event_type,
timestamp=ts)
timestamp=ts,
raw=event_model.raw)
try:
events_table.put(row, record)
except Exception as ex:
@ -154,7 +155,8 @@ class Connection(hbase_base.Connection, base.Connection):
event_type=events_dict['event_type'],
generated=events_dict['timestamp'],
traits=sorted(traits,
key=operator.attrgetter('dtype'))
key=operator.attrgetter('dtype')),
raw=events_dict['raw']
)
def get_event_types(self):

View File

@ -183,7 +183,8 @@ class Connection(base.Connection):
event_type = self._get_or_create_event_type(
event_model.event_type, session=session)
event = models.Event(event_model.message_id, event_type,
event_model.generated)
event_model.generated,
event_model.raw)
session.add(event)
session.flush()
@ -277,12 +278,12 @@ class Connection(base.Connection):
event_list = {}
# get a list of all events that match filters
for (id_, generated, message_id,
desc) in query.add_columns(
desc, raw) in query.add_columns(
models.Event.generated, models.Event.message_id,
models.EventType.desc).order_by(
models.EventType.desc, models.Event.raw).order_by(
models.Event.generated).all():
event_list[id_] = api_models.Event(message_id, desc,
generated, [])
generated, [], raw)
# Query all traits related to events.
# NOTE (gordc): cast is done because pgsql defaults to TEXT when
# handling unknown values such as null.

View File

@ -33,7 +33,7 @@ class Event(base.Model):
UNKNOWN_PROBLEM = 2
INCOMPATIBLE_TRAIT = 3
def __init__(self, message_id, event_type, generated, traits):
def __init__(self, message_id, event_type, generated, traits, raw):
"""Create a new event.
:param message_id: Unique ID for the message this event
@ -43,9 +43,10 @@ class Event(base.Model):
:param event_type: The type of the event.
:param generated: UTC time for when the event occurred.
:param traits: list of Traits on this Event.
:param raw: Unindexed raw notification details.
"""
base.Model.__init__(self, message_id=message_id, event_type=event_type,
generated=generated, traits=traits)
generated=generated, traits=traits, raw=raw)
def append_trait(self, trait_model):
self.traits.append(trait_model)
@ -62,7 +63,8 @@ class Event(base.Model):
return {'message_id': self.message_id,
'event_type': self.event_type,
'generated': serialize_dt(self.generated),
'traits': [trait.serialize() for trait in self.traits]}
'traits': [trait.serialize() for trait in self.traits],
'raw': self.raw}
class Trait(base.Model):

View File

@ -67,7 +67,7 @@ class Connection(base.Connection):
{'_id': event_model.message_id,
'event_type': event_model.event_type,
'timestamp': event_model.generated,
'traits': traits})
'traits': traits, 'raw': event_model.raw})
except pymongo.errors.DuplicateKeyError as ex:
LOG.exception(_("Failed to record duplicated event: %s") % ex)
problem_events.append((models.Event.DUPLICATE,
@ -94,7 +94,7 @@ class Connection(base.Connection):
yield models.Event(message_id=event['_id'],
event_type=event['event_type'],
generated=event['timestamp'],
traits=traits)
traits=traits, raw=event.get('raw'))
def get_event_types(self):
"""Return all event types as an iter of strings."""

View File

@ -99,7 +99,8 @@ class EventPipelineEndpoint(PipelineEndpoint):
timeutils.parse_isotime(ev['generated'])),
traits=[models.Trait(name, dtype,
models.Trait.convert_value(dtype, value))
for name, dtype, value in ev['traits']])
for name, dtype, value in ev['traits']],
raw=ev.get('raw', {}))
for ev in payload
]
with self.publish_context as p:

View File

@ -0,0 +1,28 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import sqlalchemy as sa
def upgrade(migrate_engine):
meta = sa.MetaData(bind=migrate_engine)
event = sa.Table('event', meta, autoload=True)
raw = sa.Column('raw', sa.Text)
event.create_column(raw)
def downgrade(migrate_engine):
meta = sa.MetaData(bind=migrate_engine)
event = sa.Table('event', meta, autoload=True)
raw = sa.Column('raw', sa.Text)
event.drop_column(raw)

View File

@ -317,14 +317,16 @@ class Event(Base):
id = Column(Integer, primary_key=True)
message_id = Column(String(50), unique=True)
generated = Column(PreciseTimestamp())
raw = deferred(Column(JSONEncodedDict()))
event_type_id = Column(Integer, ForeignKey('event_type.id'))
event_type = relationship("EventType", backref='events')
def __init__(self, message_id, event_type, generated):
def __init__(self, message_id, event_type, generated, raw):
self.message_id = message_id
self.event_type = event_type
self.generated = generated
self.raw = raw
def __repr__(self):
return "<Event %d('Event: %s %s, Generated: %s')>" % (self.id,

View File

@ -58,7 +58,8 @@ class EventTestBase(v2.FunctionalTest,
models.Event(message_id=str(base),
event_type=event_type,
generated=self.trait_time,
traits=trait_models))
traits=trait_models,
raw={'status': {'nested': 'started'}}))
base += 100
self.trait_time += datetime.timedelta(days=1)
self.event_conn.record_events(event_models)
@ -140,6 +141,7 @@ class TestEventAPI(EventTestBase):
expected_generated = trait_time.isoformat()
self.assertIn(event['event_type'], ['Foo', 'Bar', 'Zoo'])
self.assertEqual(4, len(event['traits']))
self.assertEqual({'status': {'nested': 'started'}}, event['raw']),
self.assertEqual(expected_generated, event['generated'])
for trait_name in ['trait_A', 'trait_B',
'trait_C', 'trait_D']:

View File

@ -36,7 +36,7 @@ class TestDispatcherDB(base.BaseTestCase):
def test_event_conn(self):
event = event_models.Event(uuid.uuid4(), 'test',
datetime.datetime(2012, 7, 2, 13, 53, 40),
[])
[], {})
with mock.patch.object(self.dispatcher.event_conn,
'record_events') as record_events:
self.dispatcher.record_events(event)

View File

@ -17,7 +17,7 @@ import datetime
import jsonpath_rw
import mock
from oslo_config import cfg as oslo_cfg
from oslo_config import fixture as fixture_config
import six
from ceilometer.event import converter
@ -594,7 +594,7 @@ class TestNotificationConverter(ConverterBase):
def setUp(self):
super(TestNotificationConverter, self).setUp()
self.CONF = self.useFixture(fixture_config.Config()).conf
self.valid_event_def1 = [{
'event_type': 'compute.instance.create.*',
'traits': {
@ -704,6 +704,60 @@ class TestNotificationConverter(ConverterBase):
e = c.to_event(self.test_notification2)
self.assertIsNotValidEvent(e, self.test_notification2)
@staticmethod
def _convert_message(convert, level):
message = {'priority': level, 'event_type': "foo",
'message_id': "abc", 'publisher_id': "1"}
return convert.to_event(message)
def test_store_raw_all(self):
self.CONF.event.store_raw = ['info', 'error']
c = converter.NotificationEventsConverter(
[], self.fake_plugin_mgr)
self.assertTrue(self._convert_message(c, 'info').raw)
self.assertTrue(self._convert_message(c, 'error').raw)
def test_store_raw_info_only(self):
self.CONF.event.store_raw = ['info']
c = converter.NotificationEventsConverter(
[], self.fake_plugin_mgr)
self.assertTrue(self._convert_message(c, 'info').raw)
self.assertFalse(self._convert_message(c, 'error').raw)
def test_store_raw_error_only(self):
self.CONF.event.store_raw = ['error']
c = converter.NotificationEventsConverter(
[], self.fake_plugin_mgr)
self.assertFalse(self._convert_message(c, 'info').raw)
self.assertTrue(self._convert_message(c, 'error').raw)
def test_store_raw_skip_all(self):
c = converter.NotificationEventsConverter(
[], self.fake_plugin_mgr)
self.assertFalse(self._convert_message(c, 'info').raw)
self.assertFalse(self._convert_message(c, 'error').raw)
def test_store_raw_info_only_no_case(self):
self.CONF.event.store_raw = ['INFO']
c = converter.NotificationEventsConverter(
[], self.fake_plugin_mgr)
self.assertTrue(self._convert_message(c, 'info').raw)
self.assertFalse(self._convert_message(c, 'error').raw)
def test_store_raw_bad_skip_all(self):
self.CONF.event.store_raw = ['unknown']
c = converter.NotificationEventsConverter(
[], self.fake_plugin_mgr)
self.assertFalse(self._convert_message(c, 'info').raw)
self.assertFalse(self._convert_message(c, 'error').raw)
def test_store_raw_bad_and_good(self):
self.CONF.event.store_raw = ['info', 'unknown']
c = converter.NotificationEventsConverter(
[], self.fake_plugin_mgr)
self.assertTrue(self._convert_message(c, 'info').raw)
self.assertFalse(self._convert_message(c, 'error').raw)
def test_setup_events_default_config(self):
def mock_exists(path):
@ -715,7 +769,7 @@ class TestNotificationConverter(ConverterBase):
with mock.patch('ceilometer.event.converter.get_config_file',
mock_get_config_file):
oslo_cfg.CONF.set_override('drop_unmatched_notifications',
self.CONF.set_override('drop_unmatched_notifications',
False, group='event')
with mock.patch('os.path.exists', mock_exists):
@ -724,7 +778,7 @@ class TestNotificationConverter(ConverterBase):
self.assertEqual(1, len(c.definitions))
self.assertTrue(c.definitions[0].is_catchall)
oslo_cfg.CONF.set_override('drop_unmatched_notifications',
self.CONF.set_override('drop_unmatched_notifications',
True, group='event')
with mock.patch('os.path.exists', mock_exists):

View File

@ -138,7 +138,7 @@ class EventDataFixture(fixture.GabbiFixture):
event = models.Event(message_id,
'cookies_{}'.format(name),
timestamp,
traits)
traits, {'nested': {'inside': 'value'}})
events.append(event)
self.conn.record_events(events)

View File

@ -15,12 +15,15 @@ tests:
$.[0].event_type: cookies_chocolate.chip
$.[0].traits.[0].value: chocolate.chip
$.[0].traits.[1].value: '0'
$.[0].raw.nested.inside: value
$.[1].event_type: cookies_peanut.butter
$.[1].traits.[0].name: type
$.[1].traits.[1].name: ate
$.[1].raw.nested.inside: value
$.[2].event_type: cookies_sugar
$.[2].traits.[0].type: string
$.[2].traits.[1].type: integer
$.[2].raw.nested.inside: value
# this attempts to get all the events with invalid parameters and expects a 400
- name: get events with bad params

View File

@ -89,7 +89,7 @@ class TestEventDirectPublisher(tests_db.TestBase,
test_data = [event.Event(message_id=str(uuid.uuid4()),
event_type='event_%d' % i,
generated=datetime.datetime.utcnow(),
traits=[])
traits=[], raw={})
for i in range(0, 5)]
def test_direct_publisher(self, ):

View File

@ -37,7 +37,7 @@ class BasePublisherTestCase(tests_base.BaseTestCase):
event.Event(message_id=uuid.uuid4(),
event_type='event_%d' % i,
generated=datetime.datetime.utcnow(),
traits=[])
traits=[], raw={})
for i in range(0, 5)
]

View File

@ -73,7 +73,7 @@ class MyException(Exception):
class EventTest(tests_db.TestBase):
def _verify_data(self, trait, trait_table):
now = datetime.datetime.utcnow()
ev = models.Event('1', 'name', now, [trait])
ev = models.Event('1', 'name', now, [trait], {})
self.event_conn.record_events([ev])
session = self.event_conn._engine_facade.get_session()
t_tables = [sql_models.TraitText, sql_models.TraitFloat,
@ -103,8 +103,8 @@ class EventTest(tests_db.TestBase):
def test_bad_event(self):
now = datetime.datetime.utcnow()
m = [models.Event("1", "Foo", now, []),
models.Event("2", "Zoo", now, [])]
m = [models.Event("1", "Foo", now, [], {}),
models.Event("2", "Zoo", now, [], {})]
with mock.patch.object(self.event_conn,
"_get_or_create_event_type") as mock_save:
@ -115,7 +115,7 @@ class EventTest(tests_db.TestBase):
self.assertEqual(bad, models.Event.UNKNOWN_PROBLEM)
def test_event_repr(self):
ev = sql_models.Event('msg_id', None, False)
ev = sql_models.Event('msg_id', None, False, {})
ev.id = 100
self.assertTrue(repr.repr(ev))

View File

@ -58,7 +58,7 @@ class ModelTest(testbase.BaseTestCase):
d)
def test_event_repr_no_traits(self):
x = event_models.Event("1", "name", "now", None)
x = event_models.Event("1", "name", "now", None, {})
self.assertEqual("<Event: 1, name, now, >", repr(x))
def test_get_field_names_of_sample(self):

View File

@ -3178,7 +3178,8 @@ class EventTestBase(tests_db.TestBase,
now)]]
self.event_models.append(
event_models.Event("id_%s_%d" % (event_type, base),
event_type, now, trait_models))
event_type, now, trait_models,
{'status': {'nested': 'started'}}))
base += 100
now = now + datetime.timedelta(hours=1)
self.end = now
@ -3209,8 +3210,8 @@ class EventTTLTest(EventTestBase):
class EventTest(EventTestBase):
def test_duplicate_message_id(self):
now = datetime.datetime.utcnow()
m = [event_models.Event("1", "Foo", now, None),
event_models.Event("1", "Zoo", now, [])]
m = [event_models.Event("1", "Foo", now, None, {}),
event_models.Event("1", "Zoo", now, [], {})]
problem_events = self.event_conn.record_events(m)
self.assertEqual(1, len(problem_events))
bad = problem_events[0]
@ -3520,7 +3521,7 @@ class GetEventTest(EventTestBase):
def test_simple_get_event_no_traits(self):
new_events = [event_models.Event("id_notraits", "NoTraits",
self.start, [])]
self.start, [], {})]
bad_events = self.event_conn.record_events(new_events)
event_filter = storage.EventFilter(self.start, self.end, "NoTraits")
events = [event for event in self.event_conn.get_events(event_filter)]
@ -3539,7 +3540,7 @@ class GetEventTest(EventTestBase):
new_events = [event_models.Event("id_testid",
"MessageIDTest",
self.start,
[])]
[], {})]
bad_events = self.event_conn.record_events(new_events)
event_filter = storage.EventFilter(message_id="id_testid")
@ -3549,6 +3550,12 @@ class GetEventTest(EventTestBase):
event = events[0]
self.assertEqual("id_testid", event.message_id)
def test_simple_get_raw(self):
event_filter = storage.EventFilter()
events = [event for event in self.event_conn.get_events(event_filter)]
self.assertTrue(events)
self.assertEqual({'status': {'nested': 'started'}}, events[0].raw)
class BigIntegerTest(tests_db.TestBase,
tests_db.MixinTestsWithBackendScenarios):

View File

@ -53,7 +53,8 @@ class EventPipelineTestCase(base.BaseTestCase):
models.Trait('t_int', 2, 'int_trait'),
models.Trait('t_float', 3, 'float_trait'),
models.Trait('t_datetime', 4, 'datetime_trait')
]
],
raw={'status': 'started'}
)
self.test_event2 = models.Event(
@ -65,7 +66,8 @@ class EventPipelineTestCase(base.BaseTestCase):
models.Trait('t_int', 2, 'int_trait'),
models.Trait('t_float', 3, 'float_trait'),
models.Trait('t_datetime', 4, 'datetime_trait')
]
],
raw={'status': 'stopped'}
)
self.useFixture(mockpatch.PatchObject(