From 5f35319dfaa463d8d08b8ac1733b80a474cb86ab Mon Sep 17 00:00:00 2001 From: Sandy Walsh Date: Tue, 6 Aug 2013 17:00:56 -0300 Subject: [PATCH] Reject duplicate events When ack_on_error=False, there is a possibility that we could receive the same message more than once. Reject those events. Change-Id: I3814a4222298d2fbc56a25e6e4540d01066ee42f --- ceilometer/collector/dispatcher/database.py | 6 +- ceilometer/collector/dispatcher/file.py | 1 + ceilometer/collector/service.py | 37 ++++++----- ceilometer/storage/impl_sqlalchemy.py | 37 ++++++++--- ceilometer/storage/models.py | 23 +++++-- .../versions/014_add_event_message_id.py | 62 +++++++++++++++++++ ceilometer/storage/sqlalchemy/models.py | 9 ++- tests/collector/dispatcher/test_db.py | 4 +- tests/collector/test_service.py | 40 ++++++++---- tests/storage/test_impl_sqlalchemy.py | 17 +++++ tests/storage/test_models.py | 4 ++ tests/storage/test_storage_scenarios.py | 20 ++++-- 12 files changed, 202 insertions(+), 58 deletions(-) create mode 100644 ceilometer/storage/sqlalchemy/migrate_repo/versions/014_add_event_message_id.py diff --git a/ceilometer/collector/dispatcher/database.py b/ceilometer/collector/dispatcher/database.py index 767393ea6..0837b1bbd 100644 --- a/ceilometer/collector/dispatcher/database.py +++ b/ceilometer/collector/dispatcher/database.py @@ -64,8 +64,7 @@ class DatabaseDispatcher(dispatcher.Base): meter['timestamp'] = timeutils.normalize_time(ts) self.storage_conn.record_metering_data(meter) except Exception as err: - LOG.error('Failed to record metering data: %s', err) - LOG.exception(err) + LOG.exception('Failed to record metering data: %s', err) else: LOG.warning( 'message signature invalid, discarding message: %r', @@ -74,4 +73,5 @@ class DatabaseDispatcher(dispatcher.Base): def record_events(self, events): if not isinstance(events, list): events = [events] - self.storage_conn.record_events(events) + + return self.storage_conn.record_events(events) diff --git 
a/ceilometer/collector/dispatcher/file.py b/ceilometer/collector/dispatcher/file.py index 295f88e5e..e4d6144f4 100644 --- a/ceilometer/collector/dispatcher/file.py +++ b/ceilometer/collector/dispatcher/file.py @@ -83,3 +83,4 @@ class FileDispatcher(dispatcher.Base): def record_events(self, events): if self.log: self.log.info(events) + return [] diff --git a/ceilometer/collector/service.py b/ceilometer/collector/service.py index f4a49f40b..033f1f0a2 100644 --- a/ceilometer/collector/service.py +++ b/ceilometer/collector/service.py @@ -21,7 +21,6 @@ from oslo.config import cfg import socket from stevedore import extension from stevedore import named -import sys from ceilometer.service import prepare_service from ceilometer.openstack.common import context @@ -108,6 +107,15 @@ def udp_collector(): os_service.launch(UDPCollectorService()).wait() +class UnableToSaveEventException(Exception): + """Thrown when we want to requeue an event. + + Any exception is fine, but this one should make debugging + a little easier. + """ + pass + + class CollectorService(rpc_service.Service): COLLECTOR_NAMESPACE = 'ceilometer.collector' @@ -213,41 +221,32 @@ class CollectorService(rpc_service.Service): delivery_info, which is critical to determining the source of the notification. This will have to get added back later. """ + message_id = body.get('message_id') event_name = body['event_type'] when = self._extract_when(body) - LOG.debug('Saving event "%s"', event_name) - message_id = body.get('message_id') - - # TODO(sandy) - check we have not already saved this notification. - # (possible on retries) Use message_id to spot dups. 
publisher = body.get('publisher_id') request_id = body.get('_context_request_id') tenant_id = body.get('_context_tenant') text = models.Trait.TEXT_TYPE - all_traits = [models.Trait('message_id', text, message_id), - models.Trait('service', text, publisher), + all_traits = [models.Trait('service', text, publisher), models.Trait('request_id', text, request_id), models.Trait('tenant_id', text, tenant_id), ] # Only store non-None value traits ... traits = [trait for trait in all_traits if trait.value is not None] - event = models.Event(event_name, when, traits) + event = models.Event(message_id, event_name, when, traits) - exc_info = None + problem_events = [] for dispatcher in self.dispatcher_manager: - try: - dispatcher.obj.record_events(event) - except Exception: - LOG.exception('Error while saving events with dispatcher %s', - dispatcher) - exc_info = sys.exc_info() - # Don't ack the message if any of the dispatchers fail - if exc_info: - raise exc_info[1], None, exc_info[2] + problem_events.extend(dispatcher.obj.record_events(event)) + if models.Event.UNKNOWN_PROBLEM in [x[0] for x in problem_events]: + # Don't ack the message, raise to requeue it + # if ack_on_error = False + raise UnableToSaveEventException() @staticmethod def _record_metering_data_for_ext(ext, context, data): diff --git a/ceilometer/storage/impl_sqlalchemy.py b/ceilometer/storage/impl_sqlalchemy.py index b2db5be89..96cb2241c 100644 --- a/ceilometer/storage/impl_sqlalchemy.py +++ b/ceilometer/storage/impl_sqlalchemy.py @@ -26,10 +26,11 @@ from sqlalchemy import func from sqlalchemy import desc from sqlalchemy.orm import aliased +from ceilometer.openstack.common.db import exception as dbexc +import ceilometer.openstack.common.db.sqlalchemy.session as sqlalchemy_session from ceilometer.openstack.common.gettextutils import _ from ceilometer.openstack.common import log from ceilometer.openstack.common import timeutils -import ceilometer.openstack.common.db.sqlalchemy.session as sqlalchemy_session 
from ceilometer.storage import base from ceilometer.storage import models as api_models from ceilometer.storage.sqlalchemy import migration @@ -685,7 +686,7 @@ class Connection(base.Connection): session=session) generated = utils.dt_to_decimal(event_model.generated) - event = Event(unique, generated) + event = Event(event_model.message_id, unique, generated) session.add(event) new_traits = [] @@ -704,23 +705,40 @@ class Connection(base.Connection): :param event_models: a list of model.Event objects. - Flush when they're all added, unless new UniqueNames are - added along the way. + Returns a list of events that could not be saved in a + (reason, event) tuple. Reasons are enumerated in + storage.model.Event """ session = sqlalchemy_session.get_session() - with session.begin(): - events = [self._record_event(session, event_model) - for event_model in event_models] - session.flush() + events = [] + problem_events = [] + for event_model in event_models: + event = None + try: + with session.begin(): + event = self._record_event(session, event_model) + session.flush() + except dbexc.DBDuplicateEntry: + problem_events.append((api_models.Event.DUPLICATE, + event_model)) + except Exception as e: + LOG.exception('Failed to record event: %s', e) + problem_events.append((api_models.Event.UNKNOWN_PROBLEM, + event_model)) + events.append(event) # Update the models with the underlying DB ID. for model, actual in zip(event_models, events): + if not actual: + continue actual_event, actual_traits = actual model.id = actual_event.id if model.traits and actual_traits: for trait, actual_trait in zip(model.traits, actual_traits): trait.id = actual_trait.id + return problem_events + def get_events(self, event_filter): """Return an iterable of model.Event objects. 
@@ -765,7 +783,8 @@ class Connection(base.Connection): event = event_models_dict.get(trait.event_id) if not event: generated = utils.decimal_to_dt(trait.event.generated) - event = api_models.Event(trait.event.unique_name.key, + event = api_models.Event(trait.event.message_id, + trait.event.unique_name.key, generated, []) event_models_dict[trait.event_id] = event value = trait.get_value() diff --git a/ceilometer/storage/models.py b/ceilometer/storage/models.py index 77f6936e0..47f39a963 100644 --- a/ceilometer/storage/models.py +++ b/ceilometer/storage/models.py @@ -48,23 +48,34 @@ class Event(Model): Metrics will be derived from one or more Events. """ - def __init__(self, event_name, generated, traits): + + DUPLICATE = 1 + UNKNOWN_PROBLEM = 2 + + def __init__(self, message_id, event_name, generated, traits): """Create a new event. + :param message_id: Unique ID for the message this event + stemmed from. This is different than + the Event ID, which comes from the + underlying storage system. :param event_name: Name of the event. :param generated: UTC time for when the event occured. :param traits: list of Traits on this Event. 
""" - Model.__init__(self, event_name=event_name, generated=generated, - traits=traits) + Model.__init__(self, message_id=message_id, event_name=event_name, + generated=generated, traits=traits) def append_trait(self, trait_model): self.traits.append(trait_model) def __repr__(self): - trait_list = [str(trait) for trait in self.traits] - return "" % \ - (self.event_name, self.generated, " ".join(trait_list)) + trait_list = [] + if self.traits: + trait_list = [str(trait) for trait in self.traits] + return "" % \ + (self.message_id, self.event_name, self.generated, + " ".join(trait_list)) class Trait(Model): diff --git a/ceilometer/storage/sqlalchemy/migrate_repo/versions/014_add_event_message_id.py b/ceilometer/storage/sqlalchemy/migrate_repo/versions/014_add_event_message_id.py new file mode 100644 index 000000000..e243fc2e2 --- /dev/null +++ b/ceilometer/storage/sqlalchemy/migrate_repo/versions/014_add_event_message_id.py @@ -0,0 +1,62 @@ +# -*- encoding: utf-8 -*- +# +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. 
+ +import sqlalchemy +from migrate.changeset.constraint import UniqueConstraint + +from ceilometer.storage.sqlalchemy import models + + +def upgrade(migrate_engine): + meta = sqlalchemy.MetaData(bind=migrate_engine) + + event = sqlalchemy.Table('event', meta, autoload=True) + message_id = sqlalchemy.Column('message_id', sqlalchemy.String(50)) + event.create_column(message_id) + + cons = UniqueConstraint('message_id', table=event) + cons.create() + + index = sqlalchemy.Index('idx_event_message_id', models.Event.message_id) + index.create(bind=migrate_engine) + + # Populate the new column ... + trait = sqlalchemy.Table('trait', meta, autoload=True) + unique_name = sqlalchemy.Table('unique_name', meta, autoload=True) + join = trait.join(unique_name, unique_name.c.id == trait.c.name_id) + traits = sqlalchemy.select([trait.c.event_id, trait.c.t_string], + whereclause=(unique_name.c.key == 'message_id'), + from_obj=join) + + for event_id, value in traits.execute(): + event.update().\ + where(event.c.id == event_id).\ + values(message_id=value).\ + execute() + + # Leave the Trait, makes the rollback easier and won't really hurt anyone. 
+ + +def downgrade(migrate_engine): + meta = sqlalchemy.MetaData(bind=migrate_engine) + event = sqlalchemy.Table('event', meta, autoload=True) + message_id = sqlalchemy.Column('message_id', sqlalchemy.String(50)) + event.drop_column(message_id) + + cons = UniqueConstraint('message_id', table=event) + cons.drop() + + index = sqlalchemy.Index('idx_event_message_id', models.Event.message_id) + index.drop(bind=migrate_engine) diff --git a/ceilometer/storage/sqlalchemy/models.py b/ceilometer/storage/sqlalchemy/models.py index e07ac6d7e..55cbe734d 100644 --- a/ceilometer/storage/sqlalchemy/models.py +++ b/ceilometer/storage/sqlalchemy/models.py @@ -226,22 +226,25 @@ class Event(Base): __tablename__ = 'event' __table_args__ = ( Index('unique_name_id', 'unique_name_id'), + Index('ix_event_message_id', 'message_id'), Index('ix_event_generated', 'generated'), ) id = Column(Integer, primary_key=True) + message_id = Column(String(50), unique=True) generated = Column(Float(asdecimal=True)) unique_name_id = Column(Integer, ForeignKey('unique_name.id')) unique_name = relationship("UniqueName", backref=backref('unique_name', order_by=id)) - def __init__(self, event, generated): + def __init__(self, message_id, event, generated): + self.message_id = message_id self.unique_name = event self.generated = generated def __repr__(self): - return "" % \ - (self.id, self.unique_name, self.generated) + return "" % \ + (self.id, self.message_id, self.unique_name, self.generated) class Trait(Base): diff --git a/tests/collector/dispatcher/test_db.py b/tests/collector/dispatcher/test_db.py index 16e1d47a5..a538a5df9 100644 --- a/tests/collector/dispatcher/test_db.py +++ b/tests/collector/dispatcher/test_db.py @@ -17,13 +17,13 @@ # under the License. 
"""Tests for ceilometer/collector/dispatcher/database.py """ -from oslo.config import cfg from datetime import datetime +from oslo.config import cfg from ceilometer.collector.dispatcher import database from ceilometer.publisher import rpc -from ceilometer.tests import base as tests_base from ceilometer.storage import base +from ceilometer.tests import base as tests_base class TestDispatcherDB(tests_base.TestCase): diff --git a/tests/collector/test_service.py b/tests/collector/test_service.py index 949d9b921..d877fe59a 100644 --- a/tests/collector/test_service.py +++ b/tests/collector/test_service.py @@ -28,12 +28,13 @@ from oslo.config import cfg from stevedore import extension from stevedore.tests import manager as test_manager -from ceilometer import sample -from ceilometer.openstack.common import timeutils from ceilometer.collector import service -from ceilometer.storage import base -from ceilometer.tests import base as tests_base from ceilometer.compute import notifications +from ceilometer.openstack.common import timeutils +from ceilometer import sample +from ceilometer.storage import base +from ceilometer.storage import models +from ceilometer.tests import base as tests_base TEST_NOTICE = { @@ -224,7 +225,9 @@ class TestCollectorService(TestCollector): def test_message_to_event_missing_keys(self): now = timeutils.utcnow() timeutils.set_time_override(now) - message = {'event_type': "foo", 'message_id': "abc"} + message = {'event_type': "foo", + 'message_id': "abc", + 'publisher_id': "1"} mock_dispatcher = MagicMock() self.srv.dispatcher_manager = test_manager.TestExtensionManager( @@ -245,7 +248,7 @@ class TestCollectorService(TestCollector): self.assertEqual(now, event.generated) self.assertEqual(1, len(event.traits)) - def test_message_to_event_bad_save(self): + def test_message_to_event_duplicate(self): cfg.CONF.set_override("store_events", True, group="collector") mock_dispatcher = MagicMock() self.srv.dispatcher_manager = 
test_manager.TestExtensionManager( @@ -255,13 +258,26 @@ class TestCollectorService(TestCollector): mock_dispatcher ), ]) - mock_dispatcher.record_events.side_effect = MyException("Boom") + mock_dispatcher.record_events.return_value = [ + (models.Event.DUPLICATE, object())] message = {'event_type': "foo", 'message_id': "abc"} - try: - self.srv._message_to_event(message) - self.fail("failing save should raise") - except Exception: - pass + self.srv._message_to_event(message) # Should return silently. + + def test_message_to_event_bad_event(self): + cfg.CONF.set_override("store_events", True, group="collector") + mock_dispatcher = MagicMock() + self.srv.dispatcher_manager = test_manager.TestExtensionManager( + [extension.Extension('test', + None, + None, + mock_dispatcher + ), + ]) + mock_dispatcher.record_events.return_value = [ + (models.Event.UNKNOWN_PROBLEM, object())] + message = {'event_type': "foo", 'message_id': "abc"} + self.assertRaises(service.UnableToSaveEventException, + self.srv._message_to_event, message) def test_extract_when(self): now = timeutils.utcnow() diff --git a/tests/storage/test_impl_sqlalchemy.py b/tests/storage/test_impl_sqlalchemy.py index 17218ab1b..6b78d95e7 100644 --- a/tests/storage/test_impl_sqlalchemy.py +++ b/tests/storage/test_impl_sqlalchemy.py @@ -24,6 +24,7 @@ """ import datetime +from mock import patch from ceilometer.storage import models from ceilometer.storage.sqlalchemy.models import table_args @@ -56,6 +57,10 @@ class UniqueNameTest(EventTestBase): self.assertNotEqual(u1.key, u2.key) +class MyException(Exception): + pass + + class EventTest(EventTestBase): def test_string_traits(self): model = models.Trait("Foo", models.Trait.TEXT_TYPE, "my_text") @@ -98,6 +103,18 @@ class EventTest(EventTestBase): self.assertEqual(trait.t_datetime, utils.dt_to_decimal(now)) self.assertIsNotNone(trait.name) + def test_bad_event(self): + now = datetime.datetime.utcnow() + m = [models.Event("1", "Foo", now, []), + models.Event("2", "Zoo", 
now, [])] + + with patch.object(self.conn, "_record_event") as mock_save: + mock_save.side_effect = MyException("Boom") + problem_events = self.conn.record_events(m) + self.assertEquals(2, len(problem_events)) + for bad, event in problem_events: + self.assertEquals(models.Event.UNKNOWN_PROBLEM, bad) + class ModelTest(tests_db.TestBase): database_connection = 'mysql://localhost' diff --git a/tests/storage/test_models.py b/tests/storage/test_models.py index 752c1141e..ab53455a9 100644 --- a/tests/storage/test_models.py +++ b/tests/storage/test_models.py @@ -50,3 +50,7 @@ class ModelTest(base.TestCase): self.assertEqual(d, {'arg1': 1, 'arg2': [{'arg1': 'a', 'arg2': 'b'}]}) + + def test_event_repr_no_traits(self): + x = models.Event("1", "name", "now", None) + self.assertEquals("", repr(x)) diff --git a/tests/storage/test_storage_scenarios.py b/tests/storage/test_storage_scenarios.py index 5a36566a8..fee0022bd 100644 --- a/tests/storage/test_storage_scenarios.py +++ b/tests/storage/test_storage_scenarios.py @@ -28,9 +28,9 @@ from ceilometer.publisher import rpc from ceilometer.openstack.common import timeutils from ceilometer import sample from ceilometer import storage -from ceilometer.tests import db as tests_db from ceilometer.storage import models from ceilometer.storage.base import Pagination +from ceilometer.tests import db as tests_db load_tests = testscenarios.load_tests_apply_scenarios @@ -1673,9 +1673,19 @@ class EventTestBase(tests_db.TestBase, class EventTest(EventTestBase): + def test_duplicate_message_id(self): + now = datetime.datetime.utcnow() + m = [models.Event("1", "Foo", now, None), + models.Event("1", "Zoo", now, [])] + problem_events = self.conn.record_events(m) + self.assertEquals(1, len(problem_events)) + bad = problem_events[0] + self.assertEquals(models.Event.DUPLICATE, bad[0]) + def test_save_events_no_traits(self): now = datetime.datetime.utcnow() - m = [models.Event("Foo", now, None), models.Event("Zoo", now, [])] + m = [models.Event("1", 
"Foo", now, None), + models.Event("2", "Zoo", now, [])] self.conn.record_events(m) for model in m: self.assertTrue(model.id >= 0) @@ -1693,7 +1703,8 @@ class EventTest(EventTestBase): ('trait_C', models.Trait.FLOAT_TYPE, 1.23456), ('trait_D', models.Trait.DATETIME_TYPE, now)]] event_models.append( - models.Event(event_name, now, trait_models)) + models.Event("id_%s" % event_name, + event_name, now, trait_models)) self.conn.record_events(event_models) for model in event_models: @@ -1719,7 +1730,8 @@ class GetEventTest(EventTestBase): float(base) + 0.123456), ('trait_D', models.Trait.DATETIME_TYPE, now)]] event_models.append( - models.Event(event_name, now, trait_models)) + models.Event("id_%s" % event_name, + event_name, now, trait_models)) base += 100 now = now + datetime.timedelta(hours=1) self.end = now