diff --git a/ceilometer/event/storage/impl_elasticsearch.py b/ceilometer/event/storage/impl_elasticsearch.py index b97c62250..1124e0b73 100644 --- a/ceilometer/event/storage/impl_elasticsearch.py +++ b/ceilometer/event/storage/impl_elasticsearch.py @@ -22,8 +22,13 @@ import six from ceilometer.event.storage import base from ceilometer.event.storage import models +from ceilometer.i18n import _LE, _LI +from ceilometer.openstack.common import log +from ceilometer import storage from ceilometer import utils +LOG = log.getLogger(__name__) + AVAILABLE_CAPABILITIES = { 'events': {'query': {'simple': True}}, @@ -93,23 +98,24 @@ class Connection(base.Connection): 'traits': traits, 'raw': ev.raw}} - problem_events = [] + error = None for ok, result in helpers.streaming_bulk( self.conn, _build_bulk_index(events)): if not ok: __, result = result.popitem() if result['status'] == 409: - problem_events.append((models.Event.DUPLICATE, - result['_id'])) + LOG.info(_LI('Duplicate event detected, skipping it: %s') + % result) else: - problem_events.append((models.Event.UNKNOWN_PROBLEM, - result['_id'])) + LOG.exception(_LE('Failed to record event: %s') % result) + error = storage.StorageUnknownWriteError(result) if self._refresh_on_write: self.conn.indices.refresh(index='%s_*' % self.index_name) while self.conn.cluster.pending_tasks(local=True)['tasks']: pass - return problem_events + if error: + raise error def _make_dsl_from_filter(self, indices, ev_filter): q_args = {} diff --git a/ceilometer/event/storage/impl_hbase.py b/ceilometer/event/storage/impl_hbase.py index 2f832cfe4..c9e15690d 100644 --- a/ceilometer/event/storage/impl_hbase.py +++ b/ceilometer/event/storage/impl_hbase.py @@ -15,7 +15,7 @@ import operator from ceilometer.event.storage import base from ceilometer.event.storage import models -from ceilometer.i18n import _ +from ceilometer.i18n import _, _LE from ceilometer.openstack.common import log from ceilometer.storage.hbase import base as hbase_base from ceilometer.storage.hbase import utils as hbase_utils @@ -92,12 +92,8 @@ class Connection(hbase_base.Connection, base.Connection): """Write the events to Hbase. :param event_models: a list of models.Event objects. - :return problem_events: a list of events that could not be saved in a - (reason, event) tuple. From the reasons that are enumerated in - storage.models.Event only the UNKNOWN_PROBLEM is applicable here. """ - problem_events = [] - + error = None with self.conn_pool.connection() as conn: events_table = conn.table(self.EVENT_TABLE) for event_model in event_models: @@ -121,10 +117,10 @@ class Connection(hbase_base.Connection, base.Connection): try: events_table.put(row, record) except Exception as ex: - LOG.debug(_("Failed to record event: %s") % ex) - problem_events.append((models.Event.UNKNOWN_PROBLEM, - event_model)) - return problem_events + LOG.exception(_LE("Failed to record event: %s") % ex) + error = ex + if error: + raise error def get_events(self, event_filter): """Return an iter of models.Event objects. diff --git a/ceilometer/event/storage/impl_sqlalchemy.py b/ceilometer/event/storage/impl_sqlalchemy.py index 64fdc5865..19ea94fde 100644 --- a/ceilometer/event/storage/impl_sqlalchemy.py +++ b/ceilometer/event/storage/impl_sqlalchemy.py @@ -25,7 +25,7 @@ import sqlalchemy as sa from ceilometer.event.storage import base from ceilometer.event.storage import models as api_models -from ceilometer.i18n import _, _LI +from ceilometer.i18n import _LE, _LI from ceilometer.openstack.common import log from ceilometer.storage.sqlalchemy import models from ceilometer import utils @@ -166,16 +166,9 @@ class Connection(base.Connection): """Write the events to SQL database via sqlalchemy. :param event_models: a list of model.Event objects. - - Returns a list of events that could not be saved in a - (reason, event) tuple. Reasons are enumerated in - storage.model.Event - - Flush when they're all added, unless new EventTypes or - TraitTypes are added along the way. """ session = self._engine_facade.get_session() - problem_events = [] + error = None for event_model in event_models: event = None try: @@ -202,18 +195,14 @@ class Connection(base.Connection): session.execute(model.__table__.insert(), trait_map[dtype]) except dbexc.DBDuplicateEntry as e: - LOG.exception(_("Failed to record duplicated event: %s") % e) - problem_events.append((api_models.Event.DUPLICATE, - event_model)) + LOG.info(_LI("Duplicate event detected, skipping it: %s") % e) except KeyError as e: - LOG.exception(_('Failed to record event: %s') % e) - problem_events.append((api_models.Event.INCOMPATIBLE_TRAIT, - event_model)) + LOG.exception(_LE('Failed to record event: %s') % e) except Exception as e: - LOG.exception(_('Failed to record event: %s') % e) - problem_events.append((api_models.Event.UNKNOWN_PROBLEM, - event_model)) - return problem_events + LOG.exception(_LE('Failed to record event: %s') % e) + error = e + if error: + raise error def get_events(self, event_filter): """Return an iterable of model.Event objects. diff --git a/ceilometer/event/storage/pymongo_base.py b/ceilometer/event/storage/pymongo_base.py index 808f26550..9c4e6b51c 100644 --- a/ceilometer/event/storage/pymongo_base.py +++ b/ceilometer/event/storage/pymongo_base.py @@ -16,7 +16,7 @@ import pymongo from ceilometer.event.storage import base from ceilometer.event.storage import models -from ceilometer.i18n import _ +from ceilometer.i18n import _LE, _LI from ceilometer.openstack.common import log from ceilometer.storage.mongo import utils as pymongo_utils from ceilometer import utils @@ -47,14 +47,9 @@ class Connection(base.Connection): def record_events(self, event_models): """Write the events to database. - Return a list of events of type models.Event.DUPLICATE in case of - trying to write an already existing event to the database, or - models.Event.UNKONW_PROBLEM in case of any failures with recording the - event in the database. - :param event_models: a list of models.Event objects. """ - problem_events = [] + error = None for event_model in event_models: traits = [] if event_model.traits: @@ -69,14 +64,12 @@ class Connection(base.Connection): 'timestamp': event_model.generated, 'traits': traits, 'raw': event_model.raw}) except pymongo.errors.DuplicateKeyError as ex: - LOG.exception(_("Failed to record duplicated event: %s") % ex) - problem_events.append((models.Event.DUPLICATE, - event_model)) + LOG.info(_LI("Duplicate event detected, skipping it: %s") % ex) except Exception as ex: - LOG.exception(_("Failed to record event: %s") % ex) - problem_events.append((models.Event.UNKNOWN_PROBLEM, - event_model)) - return problem_events + LOG.exception(_LE("Failed to record event: %s") % ex) + error = ex + if error: + raise error def get_events(self, event_filter): """Return an iter of models.Event objects. diff --git a/ceilometer/storage/__init__.py b/ceilometer/storage/__init__.py index 84d8bfd2a..8249365c8 100644 --- a/ceilometer/storage/__init__.py +++ b/ceilometer/storage/__init__.py @@ -93,6 +93,10 @@ cfg.CONF.register_cli_opts(CLI_OPTS) db_options.set_defaults(cfg.CONF) +class StorageUnknownWriteError(Exception): + """Error raised when an unknown error occurs while recording.""" + + class StorageBadVersion(Exception): """Error raised when the storage backend version is not good enough.""" diff --git a/ceilometer/tests/storage/test_impl_sqlalchemy.py b/ceilometer/tests/storage/test_impl_sqlalchemy.py index 71259e433..9507dbece 100644 --- a/ceilometer/tests/storage/test_impl_sqlalchemy.py +++ b/ceilometer/tests/storage/test_impl_sqlalchemy.py @@ -65,10 +65,6 @@ class EventTypeTest(tests_db.TestBase): self.assertTrue(repr.repr(et2)) -class MyException(Exception): - pass - - @tests_db.run_with('sqlite', 'mysql', 'pgsql') class EventTest(tests_db.TestBase): def _verify_data(self, trait, trait_table): @@ -101,19 +97,6 @@ class EventTest(tests_db.TestBase): model = models.Trait("Foo", models.Trait.DATETIME_TYPE, now) self._verify_data(model, sql_models.TraitDatetime) - def test_bad_event(self): - now = datetime.datetime.utcnow() - m = [models.Event("1", "Foo", now, [], {}), - models.Event("2", "Zoo", now, [], {})] - - with mock.patch.object(self.event_conn, - "_get_or_create_event_type") as mock_save: - mock_save.side_effect = MyException("Boom") - problem_events = self.event_conn.record_events(m) - self.assertEqual(2, len(problem_events)) - for bad, event in problem_events: - self.assertEqual(bad, models.Event.UNKNOWN_PROBLEM) - def test_event_repr(self): ev = sql_models.Event('msg_id', None, False, {}) ev.id = 100 diff --git a/ceilometer/tests/storage/test_storage_scenarios.py b/ceilometer/tests/storage/test_storage_scenarios.py index b0862f924..17f0954f6 100644 --- a/ceilometer/tests/storage/test_storage_scenarios.py +++ b/ceilometer/tests/storage/test_storage_scenarios.py @@ -3325,10 +3325,22 @@ class EventTest(EventTestBase): now = datetime.datetime.utcnow() m = [event_models.Event("1", "Foo", now, None, {}), event_models.Event("1", "Zoo", now, [], {})] - problem_events = self.event_conn.record_events(m) - self.assertEqual(1, len(problem_events)) - bad = problem_events[0] - self.assertEqual(event_models.Event.DUPLICATE, bad[0]) + with mock.patch('%s.LOG' % + self.event_conn.record_events.__module__) as log: + self.event_conn.record_events(m) + self.assertEqual(1, log.info.call_count) + + def test_bad_event(self): + now = datetime.datetime.utcnow() + broken_event = event_models.Event("1", "Foo", now, None, {}) + del(broken_event.__dict__['raw']) + m = [broken_event, broken_event] + with mock.patch('%s.LOG' % + self.event_conn.record_events.__module__) as log: + self.assertRaises(AttributeError, self.event_conn.record_events, m) + # ensure that record_events does not break on first error but + # delays exception and tries to record each event. + self.assertEqual(2, log.exception.call_count) class GetEventTest(EventTestBase): @@ -3634,10 +3646,9 @@ class GetEventTest(EventTestBase): def test_simple_get_event_no_traits(self): new_events = [event_models.Event("id_notraits", "NoTraits", self.start, [], {})] - bad_events = self.event_conn.record_events(new_events) + self.event_conn.record_events(new_events) event_filter = storage.EventFilter(self.start, self.end, "NoTraits") events = [event for event in self.event_conn.get_events(event_filter)] - self.assertEqual(0, len(bad_events)) self.assertEqual(1, len(events)) self.assertEqual("id_notraits", events[0].message_id) self.assertEqual("NoTraits", events[0].event_type) @@ -3654,10 +3665,9 @@ class GetEventTest(EventTestBase): self.start, [], {})] - bad_events = self.event_conn.record_events(new_events) + self.event_conn.record_events(new_events) event_filter = storage.EventFilter(message_id="id_testid") events = [event for event in self.event_conn.get_events(event_filter)] - self.assertEqual(0, len(bad_events)) self.assertEqual(1, len(events)) event = events[0] self.assertEqual("id_testid", event.message_id)