From 4fd6ab051695ec4b45cf92f61e30d4ad59369716 Mon Sep 17 00:00:00 2001 From: gordon chung Date: Wed, 15 Apr 2015 10:07:06 -0400 Subject: [PATCH] cleanup problem events logic in event db storage when an error occurs in event storage, we capture the errors and related events but do nothing with it. this patch logs corresponding errors and raises it if the error is unknown so that dispatcher can requeue payload. Change-Id: Id2493b8073646cad9dece626937c8e8af96e1e43 --- .../event/storage/impl_elasticsearch.py | 18 ++++++++----- ceilometer/event/storage/impl_hbase.py | 16 +++++------ ceilometer/event/storage/impl_sqlalchemy.py | 27 ++++++------------- ceilometer/event/storage/pymongo_base.py | 21 +++++---------- ceilometer/storage/__init__.py | 4 +++ .../tests/storage/test_impl_sqlalchemy.py | 17 ------------ .../tests/storage/test_storage_scenarios.py | 26 ++++++++++++------ 7 files changed, 55 insertions(+), 74 deletions(-) diff --git a/ceilometer/event/storage/impl_elasticsearch.py b/ceilometer/event/storage/impl_elasticsearch.py index b97c62250..1124e0b73 100644 --- a/ceilometer/event/storage/impl_elasticsearch.py +++ b/ceilometer/event/storage/impl_elasticsearch.py @@ -22,8 +22,13 @@ import six from ceilometer.event.storage import base from ceilometer.event.storage import models +from ceilometer.i18n import _LE, _LI +from ceilometer.openstack.common import log +from ceilometer import storage from ceilometer import utils +LOG = log.getLogger(__name__) + AVAILABLE_CAPABILITIES = { 'events': {'query': {'simple': True}}, @@ -93,23 +98,24 @@ class Connection(base.Connection): 'traits': traits, 'raw': ev.raw}} - problem_events = [] + error = None for ok, result in helpers.streaming_bulk( self.conn, _build_bulk_index(events)): if not ok: __, result = result.popitem() if result['status'] == 409: - problem_events.append((models.Event.DUPLICATE, - result['_id'])) + LOG.info(_LI('Duplicate event detected, skipping it: %s') + % result) else: - problem_events.append((models.Event.UNKNOWN_PROBLEM, - result['_id'])) + LOG.exception(_LE('Failed to record event: %s') % result) + error = storage.StorageUnknownWriteError(result) if self._refresh_on_write: self.conn.indices.refresh(index='%s_*' % self.index_name) while self.conn.cluster.pending_tasks(local=True)['tasks']: pass - return problem_events + if error: + raise error def _make_dsl_from_filter(self, indices, ev_filter): q_args = {} diff --git a/ceilometer/event/storage/impl_hbase.py b/ceilometer/event/storage/impl_hbase.py index 2f832cfe4..c9e15690d 100644 --- a/ceilometer/event/storage/impl_hbase.py +++ b/ceilometer/event/storage/impl_hbase.py @@ -15,7 +15,7 @@ import operator from ceilometer.event.storage import base from ceilometer.event.storage import models -from ceilometer.i18n import _ +from ceilometer.i18n import _, _LE from ceilometer.openstack.common import log from ceilometer.storage.hbase import base as hbase_base from ceilometer.storage.hbase import utils as hbase_utils @@ -92,12 +92,8 @@ class Connection(hbase_base.Connection, base.Connection): """Write the events to Hbase. :param event_models: a list of models.Event objects. - :return problem_events: a list of events that could not be saved in a - (reason, event) tuple. From the reasons that are enumerated in - storage.models.Event only the UNKNOWN_PROBLEM is applicable here. """ - problem_events = [] - + error = None with self.conn_pool.connection() as conn: events_table = conn.table(self.EVENT_TABLE) for event_model in event_models: @@ -121,10 +117,10 @@ class Connection(hbase_base.Connection, base.Connection): try: events_table.put(row, record) except Exception as ex: - LOG.debug(_("Failed to record event: %s") % ex) - problem_events.append((models.Event.UNKNOWN_PROBLEM, - event_model)) - return problem_events + LOG.exception(_LE("Failed to record event: %s") % ex) + error = ex + if error: + raise error def get_events(self, event_filter): """Return an iter of models.Event objects. diff --git a/ceilometer/event/storage/impl_sqlalchemy.py b/ceilometer/event/storage/impl_sqlalchemy.py index 64fdc5865..19ea94fde 100644 --- a/ceilometer/event/storage/impl_sqlalchemy.py +++ b/ceilometer/event/storage/impl_sqlalchemy.py @@ -25,7 +25,7 @@ import sqlalchemy as sa from ceilometer.event.storage import base from ceilometer.event.storage import models as api_models -from ceilometer.i18n import _, _LI +from ceilometer.i18n import _LE, _LI from ceilometer.openstack.common import log from ceilometer.storage.sqlalchemy import models from ceilometer import utils @@ -166,16 +166,9 @@ class Connection(base.Connection): """Write the events to SQL database via sqlalchemy. :param event_models: a list of model.Event objects. - - Returns a list of events that could not be saved in a - (reason, event) tuple. Reasons are enumerated in - storage.model.Event - - Flush when they're all added, unless new EventTypes or - TraitTypes are added along the way. """ session = self._engine_facade.get_session() - problem_events = [] + error = None for event_model in event_models: event = None try: @@ -202,18 +195,14 @@ class Connection(base.Connection): session.execute(model.__table__.insert(), trait_map[dtype]) except dbexc.DBDuplicateEntry as e: - LOG.exception(_("Failed to record duplicated event: %s") % e) - problem_events.append((api_models.Event.DUPLICATE, - event_model)) + LOG.info(_LI("Duplicate event detected, skipping it: %s") % e) except KeyError as e: - LOG.exception(_('Failed to record event: %s') % e) - problem_events.append((api_models.Event.INCOMPATIBLE_TRAIT, - event_model)) + LOG.exception(_LE('Failed to record event: %s') % e) except Exception as e: - LOG.exception(_('Failed to record event: %s') % e) - problem_events.append((api_models.Event.UNKNOWN_PROBLEM, - event_model)) - return problem_events + LOG.exception(_LE('Failed to record event: %s') % e) + error = e + if error: + raise error def get_events(self, event_filter): """Return an iterable of model.Event objects. diff --git a/ceilometer/event/storage/pymongo_base.py b/ceilometer/event/storage/pymongo_base.py index 808f26550..9c4e6b51c 100644 --- a/ceilometer/event/storage/pymongo_base.py +++ b/ceilometer/event/storage/pymongo_base.py @@ -16,7 +16,7 @@ import pymongo from ceilometer.event.storage import base from ceilometer.event.storage import models -from ceilometer.i18n import _ +from ceilometer.i18n import _LE, _LI from ceilometer.openstack.common import log from ceilometer.storage.mongo import utils as pymongo_utils from ceilometer import utils @@ -47,14 +47,9 @@ class Connection(base.Connection): def record_events(self, event_models): """Write the events to database. - Return a list of events of type models.Event.DUPLICATE in case of - trying to write an already existing event to the database, or - models.Event.UNKONW_PROBLEM in case of any failures with recording the - event in the database. - :param event_models: a list of models.Event objects. """ - problem_events = [] + error = None for event_model in event_models: traits = [] if event_model.traits: @@ -69,14 +64,12 @@ class Connection(base.Connection): 'timestamp': event_model.generated, 'traits': traits, 'raw': event_model.raw}) except pymongo.errors.DuplicateKeyError as ex: - LOG.exception(_("Failed to record duplicated event: %s") % ex) - problem_events.append((models.Event.DUPLICATE, - event_model)) + LOG.info(_LI("Duplicate event detected, skipping it: %s") % ex) except Exception as ex: - LOG.exception(_("Failed to record event: %s") % ex) - problem_events.append((models.Event.UNKNOWN_PROBLEM, - event_model)) - return problem_events + LOG.exception(_LE("Failed to record event: %s") % ex) + error = ex + if error: + raise error def get_events(self, event_filter): """Return an iter of models.Event objects. diff --git a/ceilometer/storage/__init__.py b/ceilometer/storage/__init__.py index 84d8bfd2a..8249365c8 100644 --- a/ceilometer/storage/__init__.py +++ b/ceilometer/storage/__init__.py @@ -93,6 +93,10 @@ cfg.CONF.register_cli_opts(CLI_OPTS) db_options.set_defaults(cfg.CONF) +class StorageUnknownWriteError(Exception): + """Error raised when an unknown error occurs while recording.""" + + class StorageBadVersion(Exception): """Error raised when the storage backend version is not good enough.""" diff --git a/ceilometer/tests/storage/test_impl_sqlalchemy.py b/ceilometer/tests/storage/test_impl_sqlalchemy.py index 71259e433..9507dbece 100644 --- a/ceilometer/tests/storage/test_impl_sqlalchemy.py +++ b/ceilometer/tests/storage/test_impl_sqlalchemy.py @@ -65,10 +65,6 @@ class EventTypeTest(tests_db.TestBase): self.assertTrue(repr.repr(et2)) -class MyException(Exception): - pass - - @tests_db.run_with('sqlite', 'mysql', 'pgsql') class EventTest(tests_db.TestBase): def _verify_data(self, trait, trait_table): @@ -101,19 +97,6 @@ class EventTest(tests_db.TestBase): model = models.Trait("Foo", models.Trait.DATETIME_TYPE, now) self._verify_data(model, sql_models.TraitDatetime) - def test_bad_event(self): - now = datetime.datetime.utcnow() - m = [models.Event("1", "Foo", now, [], {}), - models.Event("2", "Zoo", now, [], {})] - - with mock.patch.object(self.event_conn, - "_get_or_create_event_type") as mock_save: - mock_save.side_effect = MyException("Boom") - problem_events = self.event_conn.record_events(m) - self.assertEqual(2, len(problem_events)) - for bad, event in problem_events: - self.assertEqual(bad, models.Event.UNKNOWN_PROBLEM) - def test_event_repr(self): ev = sql_models.Event('msg_id', None, False, {}) ev.id = 100 diff --git a/ceilometer/tests/storage/test_storage_scenarios.py b/ceilometer/tests/storage/test_storage_scenarios.py index b0862f924..17f0954f6 100644 --- a/ceilometer/tests/storage/test_storage_scenarios.py +++ b/ceilometer/tests/storage/test_storage_scenarios.py @@ -3325,10 +3325,22 @@ class EventTest(EventTestBase): now = datetime.datetime.utcnow() m = [event_models.Event("1", "Foo", now, None, {}), event_models.Event("1", "Zoo", now, [], {})] - problem_events = self.event_conn.record_events(m) - self.assertEqual(1, len(problem_events)) - bad = problem_events[0] - self.assertEqual(event_models.Event.DUPLICATE, bad[0]) + with mock.patch('%s.LOG' % + self.event_conn.record_events.__module__) as log: + self.event_conn.record_events(m) + self.assertEqual(1, log.info.call_count) + + def test_bad_event(self): + now = datetime.datetime.utcnow() + broken_event = event_models.Event("1", "Foo", now, None, {}) + del(broken_event.__dict__['raw']) + m = [broken_event, broken_event] + with mock.patch('%s.LOG' % + self.event_conn.record_events.__module__) as log: + self.assertRaises(AttributeError, self.event_conn.record_events, m) + # ensure that record_events does not break on first error but + # delays exception and tries to record each event. + self.assertEqual(2, log.exception.call_count) class GetEventTest(EventTestBase): @@ -3634,10 +3646,9 @@ class GetEventTest(EventTestBase): def test_simple_get_event_no_traits(self): new_events = [event_models.Event("id_notraits", "NoTraits", self.start, [], {})] - bad_events = self.event_conn.record_events(new_events) + self.event_conn.record_events(new_events) event_filter = storage.EventFilter(self.start, self.end, "NoTraits") events = [event for event in self.event_conn.get_events(event_filter)] - self.assertEqual(0, len(bad_events)) self.assertEqual(1, len(events)) self.assertEqual("id_notraits", events[0].message_id) self.assertEqual("NoTraits", events[0].event_type) @@ -3654,10 +3665,9 @@ class GetEventTest(EventTestBase): self.start, [], {})] - bad_events = self.event_conn.record_events(new_events) + self.event_conn.record_events(new_events) event_filter = storage.EventFilter(message_id="id_testid") events = [event for event in self.event_conn.get_events(event_filter)] - self.assertEqual(0, len(bad_events)) self.assertEqual(1, len(events)) event = events[0] self.assertEqual("id_testid", event.message_id)