Reject duplicate events

When ack_on_error=False, there is a possibility that we
could receieve the same message more than once. Reject those events.

Change-Id: I3814a4222298d2fbc56a25e6e4540d01066ee42f
This commit is contained in:
Sandy Walsh 2013-08-06 17:00:56 -03:00
parent 62886e7e1e
commit 5f35319dfa
12 changed files with 202 additions and 58 deletions

View File

@ -64,8 +64,7 @@ class DatabaseDispatcher(dispatcher.Base):
meter['timestamp'] = timeutils.normalize_time(ts)
self.storage_conn.record_metering_data(meter)
except Exception as err:
LOG.error('Failed to record metering data: %s', err)
LOG.exception(err)
LOG.exception('Failed to record metering data: %s', err)
else:
LOG.warning(
'message signature invalid, discarding message: %r',
@ -74,4 +73,5 @@ class DatabaseDispatcher(dispatcher.Base):
def record_events(self, events):
if not isinstance(events, list):
events = [events]
self.storage_conn.record_events(events)
return self.storage_conn.record_events(events)

View File

@ -83,3 +83,4 @@ class FileDispatcher(dispatcher.Base):
def record_events(self, events):
if self.log:
self.log.info(events)
return []

View File

@ -21,7 +21,6 @@ from oslo.config import cfg
import socket
from stevedore import extension
from stevedore import named
import sys
from ceilometer.service import prepare_service
from ceilometer.openstack.common import context
@ -108,6 +107,15 @@ def udp_collector():
os_service.launch(UDPCollectorService()).wait()
class UnableToSaveEventException(Exception):
"""Thrown when we want to requeue an event.
Any exception is fine, but this one should make debugging
a little easier.
"""
pass
class CollectorService(rpc_service.Service):
COLLECTOR_NAMESPACE = 'ceilometer.collector'
@ -213,41 +221,32 @@ class CollectorService(rpc_service.Service):
delivery_info, which is critical to determining the
source of the notification. This will have to get added back later.
"""
message_id = body.get('message_id')
event_name = body['event_type']
when = self._extract_when(body)
LOG.debug('Saving event "%s"', event_name)
message_id = body.get('message_id')
# TODO(sandy) - check we have not already saved this notification.
# (possible on retries) Use message_id to spot dups.
publisher = body.get('publisher_id')
request_id = body.get('_context_request_id')
tenant_id = body.get('_context_tenant')
text = models.Trait.TEXT_TYPE
all_traits = [models.Trait('message_id', text, message_id),
models.Trait('service', text, publisher),
all_traits = [models.Trait('service', text, publisher),
models.Trait('request_id', text, request_id),
models.Trait('tenant_id', text, tenant_id),
]
# Only store non-None value traits ...
traits = [trait for trait in all_traits if trait.value is not None]
event = models.Event(event_name, when, traits)
event = models.Event(message_id, event_name, when, traits)
exc_info = None
problem_events = []
for dispatcher in self.dispatcher_manager:
try:
dispatcher.obj.record_events(event)
except Exception:
LOG.exception('Error while saving events with dispatcher %s',
dispatcher)
exc_info = sys.exc_info()
# Don't ack the message if any of the dispatchers fail
if exc_info:
raise exc_info[1], None, exc_info[2]
problem_events.extend(dispatcher.obj.record_events(event))
if models.Event.UNKNOWN_PROBLEM in [x[0] for x in problem_events]:
# Don't ack the message, raise to requeue it
# if ack_on_error = False
raise UnableToSaveEventException()
@staticmethod
def _record_metering_data_for_ext(ext, context, data):

View File

@ -26,10 +26,11 @@ from sqlalchemy import func
from sqlalchemy import desc
from sqlalchemy.orm import aliased
from ceilometer.openstack.common.db import exception as dbexc
import ceilometer.openstack.common.db.sqlalchemy.session as sqlalchemy_session
from ceilometer.openstack.common.gettextutils import _
from ceilometer.openstack.common import log
from ceilometer.openstack.common import timeutils
import ceilometer.openstack.common.db.sqlalchemy.session as sqlalchemy_session
from ceilometer.storage import base
from ceilometer.storage import models as api_models
from ceilometer.storage.sqlalchemy import migration
@ -685,7 +686,7 @@ class Connection(base.Connection):
session=session)
generated = utils.dt_to_decimal(event_model.generated)
event = Event(unique, generated)
event = Event(event_model.message_id, unique, generated)
session.add(event)
new_traits = []
@ -704,23 +705,40 @@ class Connection(base.Connection):
:param event_models: a list of model.Event objects.
Flush when they're all added, unless new UniqueNames are
added along the way.
Returns a list of events that could not be saved in a
(reason, event) tuple. Reasons are enumerated in
storage.model.Event
"""
session = sqlalchemy_session.get_session()
events = []
problem_events = []
for event_model in event_models:
event = None
try:
with session.begin():
events = [self._record_event(session, event_model)
for event_model in event_models]
event = self._record_event(session, event_model)
session.flush()
except dbexc.DBDuplicateEntry:
problem_events.append((api_models.Event.DUPLICATE,
event_model))
except Exception as e:
LOG.exception('Failed to record event: %s', e)
problem_events.append((api_models.Event.UNKNOWN_PROBLEM,
event_model))
events.append(event)
# Update the models with the underlying DB ID.
for model, actual in zip(event_models, events):
if not actual:
continue
actual_event, actual_traits = actual
model.id = actual_event.id
if model.traits and actual_traits:
for trait, actual_trait in zip(model.traits, actual_traits):
trait.id = actual_trait.id
return problem_events
def get_events(self, event_filter):
"""Return an iterable of model.Event objects.
@ -765,7 +783,8 @@ class Connection(base.Connection):
event = event_models_dict.get(trait.event_id)
if not event:
generated = utils.decimal_to_dt(trait.event.generated)
event = api_models.Event(trait.event.unique_name.key,
event = api_models.Event(trait.event.message_id,
trait.event.unique_name.key,
generated, [])
event_models_dict[trait.event_id] = event
value = trait.get_value()

View File

@ -48,23 +48,34 @@ class Event(Model):
Metrics will be derived from one or more Events.
"""
def __init__(self, event_name, generated, traits):
DUPLICATE = 1
UNKNOWN_PROBLEM = 2
def __init__(self, message_id, event_name, generated, traits):
"""Create a new event.
:param message_id: Unique ID for the message this event
stemmed from. This is different than
the Event ID, which comes from the
underlying storage system.
:param event_name: Name of the event.
:param generated: UTC time for when the event occured.
:param traits: list of Traits on this Event.
"""
Model.__init__(self, event_name=event_name, generated=generated,
traits=traits)
Model.__init__(self, message_id=message_id, event_name=event_name,
generated=generated, traits=traits)
def append_trait(self, trait_model):
self.traits.append(trait_model)
def __repr__(self):
trait_list = []
if self.traits:
trait_list = [str(trait) for trait in self.traits]
return "<Event: %s, %s %s>" % \
(self.event_name, self.generated, " ".join(trait_list))
return "<Event: %s, %s, %s, %s>" % \
(self.message_id, self.event_name, self.generated,
" ".join(trait_list))
class Trait(Model):

View File

@ -0,0 +1,62 @@
# -*- encoding: utf-8 -*-
#
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import sqlalchemy
from migrate.changeset.constraint import UniqueConstraint
from ceilometer.storage.sqlalchemy import models
def upgrade(migrate_engine):
meta = sqlalchemy.MetaData(bind=migrate_engine)
event = sqlalchemy.Table('event', meta, autoload=True)
message_id = sqlalchemy.Column('message_id', sqlalchemy.String(50))
event.create_column(message_id)
cons = UniqueConstraint('message_id', table=event)
cons.create()
index = sqlalchemy.Index('idx_event_message_id', models.Event.message_id)
index.create(bind=migrate_engine)
# Populate the new column ...
trait = sqlalchemy.Table('trait', meta, autoload=True)
unique_name = sqlalchemy.Table('unique_name', meta, autoload=True)
join = trait.join(unique_name, unique_name.c.id == trait.c.name_id)
traits = sqlalchemy.select([trait.c.event_id, trait.c.t_string],
whereclause=(unique_name.c.key == 'message_id'),
from_obj=join)
for event_id, value in traits.execute():
event.update().\
where(event.c.id == event_id).\
values(message_id=value).\
execute()
# Leave the Trait, makes the rollback easier and won't really hurt anyone.
def downgrade(migrate_engine):
meta = sqlalchemy.MetaData(bind=migrate_engine)
event = sqlalchemy.Table('event', meta, autoload=True)
message_id = sqlalchemy.Column('message_id', sqlalchemy.String(50))
event.drop_column(message_id)
cons = UniqueConstraint('message_id', table=event)
cons.drop()
index = sqlalchemy.Index('idx_event_message_id', models.Event.message_id)
index.drop(bind=migrate_engine)

View File

@ -226,22 +226,25 @@ class Event(Base):
__tablename__ = 'event'
__table_args__ = (
Index('unique_name_id', 'unique_name_id'),
Index('ix_event_message_id', 'message_id'),
Index('ix_event_generated', 'generated'),
)
id = Column(Integer, primary_key=True)
message_id = Column(String(50), unique=True)
generated = Column(Float(asdecimal=True))
unique_name_id = Column(Integer, ForeignKey('unique_name.id'))
unique_name = relationship("UniqueName", backref=backref('unique_name',
order_by=id))
def __init__(self, event, generated):
def __init__(self, message_id, event, generated):
self.message_id = message_id
self.unique_name = event
self.generated = generated
def __repr__(self):
return "<Event %d('Event: %s, Generated: %s')>" % \
(self.id, self.unique_name, self.generated)
return "<Event %d('Event: %s %s, Generated: %s')>" % \
(self.id, self.message_id, self.unique_name, self.generated)
class Trait(Base):

View File

@ -17,13 +17,13 @@
# under the License.
"""Tests for ceilometer/collector/dispatcher/database.py
"""
from oslo.config import cfg
from datetime import datetime
from oslo.config import cfg
from ceilometer.collector.dispatcher import database
from ceilometer.publisher import rpc
from ceilometer.tests import base as tests_base
from ceilometer.storage import base
from ceilometer.tests import base as tests_base
class TestDispatcherDB(tests_base.TestCase):

View File

@ -28,12 +28,13 @@ from oslo.config import cfg
from stevedore import extension
from stevedore.tests import manager as test_manager
from ceilometer import sample
from ceilometer.openstack.common import timeutils
from ceilometer.collector import service
from ceilometer.storage import base
from ceilometer.tests import base as tests_base
from ceilometer.compute import notifications
from ceilometer.openstack.common import timeutils
from ceilometer import sample
from ceilometer.storage import base
from ceilometer.storage import models
from ceilometer.tests import base as tests_base
TEST_NOTICE = {
@ -224,7 +225,9 @@ class TestCollectorService(TestCollector):
def test_message_to_event_missing_keys(self):
now = timeutils.utcnow()
timeutils.set_time_override(now)
message = {'event_type': "foo", 'message_id': "abc"}
message = {'event_type': "foo",
'message_id': "abc",
'publisher_id': "1"}
mock_dispatcher = MagicMock()
self.srv.dispatcher_manager = test_manager.TestExtensionManager(
@ -245,7 +248,7 @@ class TestCollectorService(TestCollector):
self.assertEqual(now, event.generated)
self.assertEqual(1, len(event.traits))
def test_message_to_event_bad_save(self):
def test_message_to_event_duplicate(self):
cfg.CONF.set_override("store_events", True, group="collector")
mock_dispatcher = MagicMock()
self.srv.dispatcher_manager = test_manager.TestExtensionManager(
@ -255,13 +258,26 @@ class TestCollectorService(TestCollector):
mock_dispatcher
),
])
mock_dispatcher.record_events.side_effect = MyException("Boom")
mock_dispatcher.record_events.return_value = [
(models.Event.DUPLICATE, object())]
message = {'event_type': "foo", 'message_id': "abc"}
try:
self.srv._message_to_event(message)
self.fail("failing save should raise")
except Exception:
pass
self.srv._message_to_event(message) # Should return silently.
def test_message_to_event_bad_event(self):
cfg.CONF.set_override("store_events", True, group="collector")
mock_dispatcher = MagicMock()
self.srv.dispatcher_manager = test_manager.TestExtensionManager(
[extension.Extension('test',
None,
None,
mock_dispatcher
),
])
mock_dispatcher.record_events.return_value = [
(models.Event.UNKNOWN_PROBLEM, object())]
message = {'event_type': "foo", 'message_id': "abc"}
self.assertRaises(service.UnableToSaveEventException,
self.srv._message_to_event, message)
def test_extract_when(self):
now = timeutils.utcnow()

View File

@ -24,6 +24,7 @@
"""
import datetime
from mock import patch
from ceilometer.storage import models
from ceilometer.storage.sqlalchemy.models import table_args
@ -56,6 +57,10 @@ class UniqueNameTest(EventTestBase):
self.assertNotEqual(u1.key, u2.key)
class MyException(Exception):
pass
class EventTest(EventTestBase):
def test_string_traits(self):
model = models.Trait("Foo", models.Trait.TEXT_TYPE, "my_text")
@ -98,6 +103,18 @@ class EventTest(EventTestBase):
self.assertEqual(trait.t_datetime, utils.dt_to_decimal(now))
self.assertIsNotNone(trait.name)
def test_bad_event(self):
now = datetime.datetime.utcnow()
m = [models.Event("1", "Foo", now, []),
models.Event("2", "Zoo", now, [])]
with patch.object(self.conn, "_record_event") as mock_save:
mock_save.side_effect = MyException("Boom")
problem_events = self.conn.record_events(m)
self.assertEquals(2, len(problem_events))
for bad, event in problem_events:
self.assertEquals(models.Event.UNKNOWN_PROBLEM, bad)
class ModelTest(tests_db.TestBase):
database_connection = 'mysql://localhost'

View File

@ -50,3 +50,7 @@ class ModelTest(base.TestCase):
self.assertEqual(d, {'arg1': 1,
'arg2': [{'arg1': 'a',
'arg2': 'b'}]})
def test_event_repr_no_traits(self):
x = models.Event("1", "name", "now", None)
self.assertEquals("<Event: 1, name, now, >", repr(x))

View File

@ -28,9 +28,9 @@ from ceilometer.publisher import rpc
from ceilometer.openstack.common import timeutils
from ceilometer import sample
from ceilometer import storage
from ceilometer.tests import db as tests_db
from ceilometer.storage import models
from ceilometer.storage.base import Pagination
from ceilometer.tests import db as tests_db
load_tests = testscenarios.load_tests_apply_scenarios
@ -1673,9 +1673,19 @@ class EventTestBase(tests_db.TestBase,
class EventTest(EventTestBase):
def test_duplicate_message_id(self):
now = datetime.datetime.utcnow()
m = [models.Event("1", "Foo", now, None),
models.Event("1", "Zoo", now, [])]
problem_events = self.conn.record_events(m)
self.assertEquals(1, len(problem_events))
bad = problem_events[0]
self.assertEquals(models.Event.DUPLICATE, bad[0])
def test_save_events_no_traits(self):
now = datetime.datetime.utcnow()
m = [models.Event("Foo", now, None), models.Event("Zoo", now, [])]
m = [models.Event("1", "Foo", now, None),
models.Event("2", "Zoo", now, [])]
self.conn.record_events(m)
for model in m:
self.assertTrue(model.id >= 0)
@ -1693,7 +1703,8 @@ class EventTest(EventTestBase):
('trait_C', models.Trait.FLOAT_TYPE, 1.23456),
('trait_D', models.Trait.DATETIME_TYPE, now)]]
event_models.append(
models.Event(event_name, now, trait_models))
models.Event("id_%s" % event_name,
event_name, now, trait_models))
self.conn.record_events(event_models)
for model in event_models:
@ -1719,7 +1730,8 @@ class GetEventTest(EventTestBase):
float(base) + 0.123456),
('trait_D', models.Trait.DATETIME_TYPE, now)]]
event_models.append(
models.Event(event_name, now, trait_models))
models.Event("id_%s" % event_name,
event_name, now, trait_models))
base += 100
now = now + datetime.timedelta(hours=1)
self.end = now