rebuild event model only for database writes
we should only build event model when dispatching to database. we do not need to ensure type-correctness when writing to file or http. Change-Id: I553f9062299d7ddcfd0d694aad7a432692bdc953 Closes-Bug: #1438285
This commit is contained in:
parent
4fd6ab0516
commit
5521215dc8
@ -19,11 +19,9 @@ import msgpack
|
||||
from oslo_config import cfg
|
||||
import oslo_messaging
|
||||
from oslo_utils import netutils
|
||||
from oslo_utils import timeutils
|
||||
from oslo_utils import units
|
||||
|
||||
from ceilometer import dispatcher
|
||||
from ceilometer.event.storage import models
|
||||
from ceilometer import messaging
|
||||
from ceilometer.i18n import _, _LE
|
||||
from ceilometer.openstack.common import log
|
||||
@ -191,25 +189,3 @@ class EventEndpoint(CollectorEndpoint):
|
||||
super(EventEndpoint, self).__init__(
|
||||
dispatcher_manager,
|
||||
cfg.CONF.collector.requeue_event_on_dispatcher_error)
|
||||
|
||||
def sample(self, ctxt, publisher_id, event_type, payload, metadata):
|
||||
events = []
|
||||
for ev in payload:
|
||||
try:
|
||||
events.append(
|
||||
models.Event(
|
||||
message_id=ev['message_id'],
|
||||
event_type=ev['event_type'],
|
||||
generated=timeutils.normalize_time(
|
||||
timeutils.parse_isotime(ev['generated'])),
|
||||
traits=[models.Trait(
|
||||
name, dtype,
|
||||
models.Trait.convert_value(dtype, value))
|
||||
for name, dtype, value in ev['traits']],
|
||||
raw=ev.get('raw', {}))
|
||||
)
|
||||
except Exception:
|
||||
LOG.exception(_LE("Error processing event and it will be "
|
||||
"dropped: %s"), ev)
|
||||
return super(EventEndpoint, self).sample(
|
||||
ctxt, publisher_id, event_type, events, metadata)
|
||||
|
@ -15,7 +15,8 @@
|
||||
from oslo_utils import timeutils
|
||||
|
||||
from ceilometer import dispatcher
|
||||
from ceilometer.i18n import _
|
||||
from ceilometer.event.storage import models
|
||||
from ceilometer.i18n import _, _LE, _LW
|
||||
from ceilometer.openstack.common import log
|
||||
from ceilometer.publisher import utils as publisher_utils
|
||||
from ceilometer import storage
|
||||
@ -46,8 +47,8 @@ class DatabaseDispatcher(dispatcher.Base):
|
||||
return storage.get_connection_from_config(self.conf, purpose)
|
||||
except Exception as err:
|
||||
params = {"purpose": purpose, "err": err}
|
||||
LOG.exception(_("Failed to connect to db, purpose %(purpose)s "
|
||||
"re-try later: %(err)s") % params)
|
||||
LOG.exception(_LE("Failed to connect to db, purpose %(purpose)s "
|
||||
"re-try later: %(err)s") % params)
|
||||
if not ignore_exception:
|
||||
raise
|
||||
|
||||
@ -89,12 +90,12 @@ class DatabaseDispatcher(dispatcher.Base):
|
||||
meter['timestamp'] = timeutils.normalize_time(ts)
|
||||
self.meter_conn.record_metering_data(meter)
|
||||
except Exception as err:
|
||||
LOG.exception(_('Failed to record metering data: %s'),
|
||||
LOG.exception(_LE('Failed to record metering data: %s'),
|
||||
err)
|
||||
# raise the exception to propagate it up in the chain.
|
||||
raise
|
||||
else:
|
||||
LOG.warning(_(
|
||||
LOG.warning(_LW(
|
||||
'message signature invalid, discarding message: %r'),
|
||||
meter)
|
||||
|
||||
@ -102,4 +103,22 @@ class DatabaseDispatcher(dispatcher.Base):
|
||||
if not isinstance(events, list):
|
||||
events = [events]
|
||||
|
||||
return self.event_conn.record_events(events)
|
||||
event_list = []
|
||||
for ev in events:
|
||||
try:
|
||||
event_list.append(
|
||||
models.Event(
|
||||
message_id=ev['message_id'],
|
||||
event_type=ev['event_type'],
|
||||
generated=timeutils.normalize_time(
|
||||
timeutils.parse_isotime(ev['generated'])),
|
||||
traits=[models.Trait(
|
||||
name, dtype,
|
||||
models.Trait.convert_value(dtype, value))
|
||||
for name, dtype, value in ev['traits']],
|
||||
raw=ev.get('raw', {}))
|
||||
)
|
||||
except Exception:
|
||||
LOG.exception(_LE("Error processing event and it will be "
|
||||
"dropped: %s"), ev)
|
||||
self.event_conn.record_events(event_list)
|
||||
|
@ -81,4 +81,3 @@ class FileDispatcher(dispatcher.Base):
|
||||
def record_events(self, events):
|
||||
if self.log:
|
||||
self.log.info(events)
|
||||
return []
|
||||
|
@ -128,12 +128,11 @@ class HttpDispatcher(dispatcher.Base):
|
||||
for event in events:
|
||||
res = None
|
||||
try:
|
||||
res = requests.post(self.event_target, data=event.serialize(),
|
||||
res = requests.post(self.event_target, data=event,
|
||||
headers=self.headers, timeout=self.timeout)
|
||||
res.raise_for_status()
|
||||
except Exception:
|
||||
error_code = res.status_code if res else 'unknown'
|
||||
LOG.exception(_LE('Status Code: %{code}s. Failed to dispatch '
|
||||
'event: %{event}s'),
|
||||
{'code': error_code,
|
||||
'event': event.serialize()})
|
||||
{'code': error_code, 'event': event})
|
||||
|
@ -36,7 +36,7 @@ class TestDispatcherDB(base.BaseTestCase):
|
||||
def test_event_conn(self):
|
||||
event = event_models.Event(uuid.uuid4(), 'test',
|
||||
datetime.datetime(2012, 7, 2, 13, 53, 40),
|
||||
[], {})
|
||||
[], {}).serialize()
|
||||
with mock.patch.object(self.dispatcher.event_conn,
|
||||
'record_events') as record_events:
|
||||
self.dispatcher.record_events(event)
|
||||
|
@ -136,7 +136,7 @@ class TestEventDispatcherHttp(base.BaseTestCase):
|
||||
|
||||
event = event_models.Event(uuid.uuid4(), 'test',
|
||||
datetime.datetime(2012, 7, 2, 13, 53, 40),
|
||||
[], {})
|
||||
[], {}).serialize()
|
||||
|
||||
with mock.patch.object(requests, 'post') as post:
|
||||
dispatcher.record_events(event)
|
||||
@ -149,7 +149,7 @@ class TestEventDispatcherHttp(base.BaseTestCase):
|
||||
|
||||
event = event_models.Event(uuid.uuid4(), 'test',
|
||||
datetime.datetime(2012, 7, 2, 13, 53, 40),
|
||||
[], {})
|
||||
[], {}).serialize()
|
||||
|
||||
with mock.patch('ceilometer.dispatcher.http.LOG',
|
||||
mock.MagicMock()) as LOG:
|
||||
@ -162,7 +162,7 @@ class TestEventDispatcherHttp(base.BaseTestCase):
|
||||
|
||||
event = event_models.Event(uuid.uuid4(), 'test',
|
||||
datetime.datetime(2012, 7, 2, 13, 53, 40),
|
||||
[], {})
|
||||
[], {}).serialize()
|
||||
|
||||
with mock.patch.object(requests, 'post') as post:
|
||||
dispatcher.record_events(event)
|
||||
|
Loading…
x
Reference in New Issue
Block a user