Optionally store Events in Collector.

Blueprint collector-stores-events

This will store the raw event in the Events table if enabled.

There are a few caveats:
1. It seems something in the rpc layer is eating any exception
   raised from the notification handler, so the ack() handling
   code isn't being reached. This will need to be fixed in oslo.

2. notification delivery_info is being stripped from the notification
   before the handler is called. This means we can't get any routing
   info on the notification (which queue did it come in on, for example).
   Again, this will need to be fixed in oslo.

3. The raw json of the event is not currently stored. The model will need
   to be extended to support this. Next.

Change-Id: Id4687e075e04278d1db6e8acc805c3fed2bd07bb
This commit is contained in:
Sandy Walsh 2013-07-04 15:52:46 -03:00
parent ced2b691c1
commit eabb5624ec
3 changed files with 153 additions and 13 deletions

View File

@ -16,22 +16,25 @@
# License for the specific language governing permissions and limitations
# under the License.
from oslo.config import cfg
import msgpack
from oslo.config import cfg
import socket
from stevedore import extension
from ceilometer.publisher import rpc as publisher_rpc
from ceilometer.service import prepare_service
from ceilometer.openstack.common import context
from ceilometer.openstack.common.gettextutils import _
from ceilometer.openstack.common import log
from ceilometer.openstack.common import service as os_service
from ceilometer.openstack.common.rpc import dispatcher as rpc_dispatcher
from ceilometer.openstack.common.rpc import service as rpc_service
from ceilometer.openstack.common import timeutils
from ceilometer import pipeline
from ceilometer import storage
from ceilometer.storage import models
from ceilometer import transformer
OPTS = [
@ -42,6 +45,12 @@ OPTS = [
cfg.IntOpt('udp_port',
default=4952,
help='port to bind the UDP socket to'),
cfg.BoolOpt('ack_on_event_error',
default=True,
help='Acknowledge message when event persistence fails'),
cfg.BoolOpt('store_events',
default=False,
help='Save event details'),
]
cfg.CONF.register_opts(OPTS, group="collector")
@ -141,8 +150,11 @@ class CollectorService(rpc_service.Service):
def _setup_subscription(self, ext, *args, **kwds):
handler = ext.obj
LOG.debug('Event types from %s: %s',
ext.name, ', '.join(handler.get_event_types()))
ack_on_error = cfg.CONF.collector.ack_on_event_error
LOG.debug('Event types from %s: %s (ack_on_error=%s)',
ext.name, ', '.join(handler.get_event_types()),
ack_on_error)
for exchange_topic in handler.get_exchange_topics(cfg.CONF):
for topic in exchange_topic.topics:
try:
@ -151,7 +163,7 @@ class CollectorService(rpc_service.Service):
pool_name='ceilometer.notifications',
topic=topic,
exchange_name=exchange_topic.exchange,
)
ack_on_error=ack_on_error)
except Exception:
LOG.exception('Could not join consumer pool %s/%s' %
(topic, exchange_topic.exchange))
@ -160,8 +172,60 @@ class CollectorService(rpc_service.Service):
"""Make a notification processed by an handler."""
LOG.debug('notification %r', notification.get('event_type'))
self.notification_manager.map(self._process_notification_for_ext,
notification=notification,
)
notification=notification)
if cfg.CONF.collector.store_events:
self._message_to_event(notification)
@staticmethod
def _extract_when(body):
    """Return the datetime the notification was generated.

    Prefers the notification's own 'timestamp', falls back to the
    request context timestamp, and finally to the current time.
    """
    raw = body.get('timestamp')
    if raw is None:
        raw = body.get('_context_timestamp')
    if not raw:
        # Neither field present (or empty) -- use "now" as best effort.
        return timeutils.utcnow()
    return timeutils.normalize_time(timeutils.parse_isotime(raw))
def _message_to_event(self, body):
    """Convert a notification body to a Ceilometer Event and store it.

    Raises whatever the storage layer raises on failure, so the
    message is not ack()'d and can be redelivered.

    NOTE: this is currently based on the Nova notification format.
    We will need to make this driver-based to support other formats.

    NOTE: the rpc layer currently rips out the notification
    delivery_info, which is critical to determining the
    source of the notification. This will have to get added back later.
    """
    event_name = body['event_type']
    when = self._extract_when(body)
    LOG.debug('Saving event "%s"', event_name)

    # TODO(sandy) - check we have not already saved this notification.
    # (possible on retries) Use message_id to spot dups.
    text = models.Trait.TEXT_TYPE
    candidates = (('message_id', body.get('message_id')),
                  ('service', body.get('publisher_id')),
                  ('request_id', body.get('_context_request_id')),
                  ('tenant_id', body.get('_context_tenant')))
    # Only store non-None value traits ...
    traits = [models.Trait(name, text, value)
              for name, value in candidates
              if value is not None]

    event = models.Event(event_name, when, traits)
    try:
        self.storage_conn.record_events([event])
    except Exception as err:
        LOG.exception(_("Unable to store events: %s"), err)
        # By re-raising we avoid ack()'ing the message.
        raise
def _process_notification_for_ext(self, ext, notification):
handler = ext.obj

View File

@ -597,6 +597,13 @@
# port to bind the UDP socket to (integer value)
#udp_port=4952
# Acknowledge message when event persistence fails (boolean
# value)
#ack_on_event_error=true
# Save event details (boolean value)
#store_events=false
[matchmaker_ring]
@ -624,4 +631,4 @@
#password=<None>
# Total option count: 119
# Total option count: 121

View File

@ -15,10 +15,10 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Tests for ceilometer/agent/manager.py
"""Tests for ceilometer/agent/service.py
"""
from datetime import datetime
import datetime
import msgpack
import socket
@ -29,6 +29,7 @@ from stevedore import extension
from stevedore.tests import manager as test_manager
from ceilometer import counter
from ceilometer.openstack.common import timeutils
from ceilometer.publisher import rpc
from ceilometer.collector import service
from ceilometer.storage import base
@ -164,6 +165,10 @@ class TestUDPCollectorService(TestCollector):
self.srv.start()
class MyException(Exception):
pass
class TestCollectorService(TestCollector):
def setUp(self):
@ -174,7 +179,7 @@ class TestCollectorService(TestCollector):
@patch('ceilometer.pipeline.setup_pipeline', MagicMock())
def test_init_host(self):
# If we try to create a real RPC connection, init_host() never
# returns. Mock it out so we can establish the manager
# returns. Mock it out so we can establish the service
# configuration.
with patch('ceilometer.openstack.common.rpc.create_connection'):
self.srv.start()
@ -230,7 +235,7 @@ class TestCollectorService(TestCollector):
expected = {}
expected.update(msg)
expected['timestamp'] = datetime(2012, 7, 2, 13, 53, 40)
expected['timestamp'] = datetime.datetime(2012, 7, 2, 13, 53, 40)
self.srv.storage_conn = self.mox.CreateMock(base.Connection)
self.srv.storage_conn.record_metering_data(expected)
@ -251,7 +256,8 @@ class TestCollectorService(TestCollector):
expected = {}
expected.update(msg)
expected['timestamp'] = datetime(2012, 9, 30, 23, 31, 50, 262000)
expected['timestamp'] = datetime.datetime(2012, 9, 30,
23, 31, 50, 262000)
self.srv.storage_conn = self.mox.CreateMock(base.Connection)
self.srv.storage_conn.record_metering_data(expected)
@ -262,8 +268,9 @@ class TestCollectorService(TestCollector):
@patch('ceilometer.pipeline.setup_pipeline', MagicMock())
def test_process_notification(self):
# If we try to create a real RPC connection, init_host() never
# returns. Mock it out so we can establish the manager
# returns. Mock it out so we can establish the service
# configuration.
cfg.CONF.set_override("store_events", False, group="collector")
with patch('ceilometer.openstack.common.rpc.create_connection'):
self.srv.start()
self.srv.pipeline_manager.pipelines[0] = MagicMock()
@ -277,3 +284,65 @@ class TestCollectorService(TestCollector):
self.srv.process_notification(TEST_NOTICE)
self.assertTrue(
self.srv.pipeline_manager.publisher.called)
def test_process_notification_no_events(self):
    """With store_events disabled, no event conversion happens."""
    cfg.CONF.set_override("store_events", False, group="collector")
    self.srv.notification_manager = MagicMock()
    with patch.object(self.srv, '_message_to_event') as converter:
        self.srv.process_notification({})
    # The mock records calls even after the patch exits.
    self.assertFalse(converter.called)
def test_process_notification_with_events(self):
    """With store_events enabled, the notification is converted."""
    cfg.CONF.set_override("store_events", True, group="collector")
    self.srv.notification_manager = MagicMock()
    with patch.object(self.srv, '_message_to_event') as converter:
        self.srv.process_notification({})
    # The mock records calls even after the patch exits.
    self.assertTrue(converter.called)
def test_message_to_event_missing_keys(self):
    """A minimal notification still yields one event with one trait.

    Only 'event_type' and 'message_id' are supplied; the None-valued
    optional traits must be dropped and no error logged.
    """
    now = timeutils.utcnow()
    timeutils.set_time_override(now)
    message = {'event_type': "foo", 'message_id': "abc"}

    self.srv.storage_conn = MagicMock()
    with patch('ceilometer.collector.service.LOG') as mylog:
        self.srv._message_to_event(message)
        self.assertFalse(mylog.exception.called)

    events = self.srv.storage_conn.record_events.call_args[0]
    # assertEquals is a deprecated alias of assertEqual.
    self.assertEqual(1, len(events))
    event = events[0][0]
    self.assertEqual("foo", event.event_name)
    self.assertEqual(now, event.generated)
    self.assertEqual(1, len(event.traits))
def test_message_to_event_bad_save(self):
    """A failing record_events() must propagate the exception.

    Re-raising is what prevents the message from being ack()'d.
    assertRaises replaces the manual try/except/self.fail idiom.
    """
    cfg.CONF.set_override("store_events", True, group="collector")
    self.srv.storage_conn = MagicMock()
    self.srv.storage_conn.record_events.side_effect = MyException("Boom")
    message = {'event_type': "foo", 'message_id': "abc"}
    self.assertRaises(MyException, self.srv._message_to_event, message)
def test_extract_when(self):
    """_extract_when() prefers 'timestamp', then '_context_timestamp',
    then the current (overridden) time.
    """
    now = timeutils.utcnow()
    modified = now + datetime.timedelta(minutes=1)
    timeutils.set_time_override(now)

    # Explicit 'timestamp' is used directly.
    body = {"timestamp": str(modified)}
    # assertEquals is a deprecated alias of assertEqual.
    self.assertEqual(modified,
                     service.CollectorService._extract_when(body))

    # Fall back to the request context timestamp.
    body = {"_context_timestamp": str(modified)}
    self.assertEqual(modified,
                     service.CollectorService._extract_when(body))

    # 'timestamp' wins when both are present.
    then = now + datetime.timedelta(hours=1)
    body = {"timestamp": str(modified), "_context_timestamp": str(then)}
    self.assertEqual(modified,
                     service.CollectorService._extract_when(body))

    # Neither present: current (overridden) time is used.
    self.assertEqual(now, service.CollectorService._extract_when({}))