diff --git a/ceilometer/event/storage/impl_db2.py b/ceilometer/event/storage/impl_db2.py index 3e0d31fb9..8a1231dad 100644 --- a/ceilometer/event/storage/impl_db2.py +++ b/ceilometer/event/storage/impl_db2.py @@ -12,9 +12,53 @@ # under the License. """DB2 storage backend """ +import pymongo + from ceilometer.event.storage import pymongo_base -from ceilometer.storage import impl_db2 +from ceilometer import storage +from ceilometer.storage.mongo import utils as pymongo_utils -class Connection(impl_db2.Connection, pymongo_base.Connection): +class Connection(pymongo_base.Connection): """The db2 event storage for Ceilometer.""" + + CONNECTION_POOL = pymongo_utils.ConnectionPool() + + def __init__(self, url): + + # Since we are using pymongo, even though we are connecting to DB2 + # we still have to make sure that the scheme which used to distinguish + # db2 driver from mongodb driver be replaced so that pymongo will not + # produce an exception on the scheme. + url = url.replace('db2:', 'mongodb:', 1) + self.conn = self.CONNECTION_POOL.connect(url) + + # Require MongoDB 2.2 to use aggregate(), since we are using mongodb + # as backend for test, the following code is necessary to make sure + # that the test wont try aggregate on older mongodb during the test. + # For db2, the versionArray won't be part of the server_info, so there + # will not be exception when real db2 gets used as backend. + server_info = self.conn.server_info() + if server_info.get('sysInfo'): + self._using_mongodb = True + else: + self._using_mongodb = False + + if self._using_mongodb and server_info.get('versionArray') < [2, 2]: + raise storage.StorageBadVersion("Need at least MongoDB 2.2") + + connection_options = pymongo.uri_parser.parse_uri(url) + self.db = getattr(self.conn, connection_options['database']) + if connection_options.get('username'): + self.db.authenticate(connection_options['username'], + connection_options['password']) + + self.upgrade() + + def clear(self): + # drop_database command does nothing on db2 database since this has + # not been implemented. However calling this method is important for + # removal of all the empty dbs created during the test runs since + # test run is against mongodb on Jenkins + self.conn.drop_database(self.db) + self.conn.close() diff --git a/ceilometer/event/storage/impl_mongodb.py b/ceilometer/event/storage/impl_mongodb.py index 1b3551e9c..bba96f9d7 100644 --- a/ceilometer/event/storage/impl_mongodb.py +++ b/ceilometer/event/storage/impl_mongodb.py @@ -11,9 +11,42 @@ # License for the specific language governing permissions and limitations # under the License. """MongoDB storage backend""" +import pymongo + from ceilometer.event.storage import pymongo_base -from ceilometer.storage import impl_mongodb +from ceilometer import storage +from ceilometer.storage.mongo import utils as pymongo_utils -class Connection(impl_mongodb.Connection, pymongo_base.Connection): +class Connection(pymongo_base.Connection): """Put the event data into a MongoDB database.""" + + CONNECTION_POOL = pymongo_utils.ConnectionPool() + + def __init__(self, url): + + # NOTE(jd) Use our own connection pooling on top of the Pymongo one. + # We need that otherwise we overflow the MongoDB instance with new + # connection since we instanciate a Pymongo client each time someone + # requires a new storage connection. + self.conn = self.CONNECTION_POOL.connect(url) + + # Require MongoDB 2.4 to use $setOnInsert + if self.conn.server_info()['versionArray'] < [2, 4]: + raise storage.StorageBadVersion("Need at least MongoDB 2.4") + + connection_options = pymongo.uri_parser.parse_uri(url) + self.db = getattr(self.conn, connection_options['database']) + if connection_options.get('username'): + self.db.authenticate(connection_options['username'], + connection_options['password']) + + # NOTE(jd) Upgrading is just about creating index, so let's do this + # on connection to be sure at least the TTL is correcly updated if + # needed. + self.upgrade() + + def clear(self): + self.conn.drop_database(self.db) + # Connection will be reopened automatically if needed + self.conn.close() diff --git a/ceilometer/event/storage/pymongo_base.py b/ceilometer/event/storage/pymongo_base.py index e7fed7532..bac948b2c 100644 --- a/ceilometer/event/storage/pymongo_base.py +++ b/ceilometer/event/storage/pymongo_base.py @@ -12,10 +12,17 @@ # under the License. """Common functions for MongoDB and DB2 backends """ -from ceilometer.event.storage import base as event_base -from ceilometer.storage import pymongo_base as base +import pymongo + +from ceilometer.event.storage import base +from ceilometer.event.storage import models +from ceilometer.openstack.common.gettextutils import _ +from ceilometer.openstack.common import log +from ceilometer.storage.mongo import utils as pymongo_utils from ceilometer import utils +LOG = log.getLogger(__name__) + COMMON_AVAILABLE_CAPABILITIES = { 'events': {'query': {'simple': True}}, @@ -27,7 +34,7 @@ AVAILABLE_STORAGE_CAPABILITIES = { } -class Connection(base.Connection, event_base.Connection): +class Connection(base.Connection): """Base event Connection class for MongoDB and DB2 drivers.""" CAPABILITIES = utils.update_nested(base.Connection.CAPABILITIES, COMMON_AVAILABLE_CAPABILITIES) @@ -36,3 +43,112 @@ class Connection(base.Connection, event_base.Connection): base.Connection.STORAGE_CAPABILITIES, AVAILABLE_STORAGE_CAPABILITIES, ) + + def record_events(self, event_models): + """Write the events to database. + + Return a list of events of type models.Event.DUPLICATE in case of + trying to write an already existing event to the database, or + models.Event.UNKONW_PROBLEM in case of any failures with recording the + event in the database. + + :param event_models: a list of models.Event objects. + """ + problem_events = [] + for event_model in event_models: + traits = [] + if event_model.traits: + for trait in event_model.traits: + traits.append({'trait_name': trait.name, + 'trait_type': trait.dtype, + 'trait_value': trait.value}) + try: + self.db.event.insert( + {'_id': event_model.message_id, + 'event_type': event_model.event_type, + 'timestamp': event_model.generated, + 'traits': traits}) + except pymongo.errors.DuplicateKeyError as ex: + LOG.exception(_("Failed to record duplicated event: %s") % ex) + problem_events.append((models.Event.DUPLICATE, + event_model)) + except Exception as ex: + LOG.exception(_("Failed to record event: %s") % ex) + problem_events.append((models.Event.UNKNOWN_PROBLEM, + event_model)) + return problem_events + + def get_events(self, event_filter): + """Return an iter of models.Event objects. + + :param event_filter: storage.EventFilter object, consists of filters + for events that are stored in database. + """ + q = pymongo_utils.make_events_query_from_filter(event_filter) + for event in self.db.event.find(q): + traits = [] + for trait in event['traits']: + traits.append(models.Trait(name=trait['trait_name'], + dtype=int(trait['trait_type']), + value=trait['trait_value'])) + yield models.Event(message_id=event['_id'], + event_type=event['event_type'], + generated=event['timestamp'], + traits=traits) + + def get_event_types(self): + """Return all event types as an iter of strings.""" + event_types = set() + events = self.db.event.find() + + for event in events: + event_type = event['event_type'] + if event_type not in event_types: + event_types.add(event_type) + yield event_type + + def get_trait_types(self, event_type): + """Return a dictionary containing the name and data type of the trait. + + Only trait types for the provided event_type are returned. + + :param event_type: the type of the Event. + """ + trait_names = set() + events = self.db.event.find({'event_type': event_type}) + + for event in events: + for trait in event['traits']: + trait_name = trait['trait_name'] + if trait_name not in trait_names: + # Here we check that our method return only unique + # trait types. Method will return only one trait type. It + # is proposed that certain trait name could have only one + # trait type. + trait_names.add(trait_name) + yield {'name': trait_name, + 'data_type': trait['trait_type']} + + def get_traits(self, event_type, trait_name=None): + """Return all trait instances associated with an event_type. + + If trait_type is specified, only return instances of that trait type. + + :param event_type: the type of the Event to filter by + :param trait_name: the name of the Trait to filter by + """ + if not trait_name: + events = self.db.event.find({'event_type': event_type}) + else: + # We choose events that simultaneously have event_type and certain + # trait_name, and retrieve events contains only mentioned traits. + events = self.db.event.find({'$and': [{'event_type': event_type}, + {'traits.trait_name': trait_name}]}, + {'traits': {'$elemMatch': + {'trait_name': trait_name}} + }) + for event in events: + for trait in event['traits']: + yield models.Trait(name=trait['trait_name'], + dtype=trait['trait_type'], + value=trait['trait_value']) diff --git a/ceilometer/storage/pymongo_base.py b/ceilometer/storage/pymongo_base.py index edea56ce5..32613e5f3 100644 --- a/ceilometer/storage/pymongo_base.py +++ b/ceilometer/storage/pymongo_base.py @@ -20,16 +20,11 @@ import pymongo import ceilometer -from ceilometer.event.storage import models as ev_models -from ceilometer.openstack.common.gettextutils import _ -from ceilometer.openstack.common import log from ceilometer.storage import base from ceilometer.storage import models from ceilometer.storage.mongo import utils as pymongo_utils from ceilometer import utils -LOG = log.getLogger(__name__) - COMMON_AVAILABLE_CAPABILITIES = { 'meters': {'query': {'simple': True, @@ -37,7 +32,6 @@ COMMON_AVAILABLE_CAPABILITIES = { 'samples': {'query': {'simple': True, 'metadata': True, 'complex': True}}, - 'events': {'query': {'simple': True}}, } @@ -113,116 +107,6 @@ class Connection(base.Connection): [("timestamp", pymongo.DESCENDING)], limit) - def record_events(self, event_models): - """Write the events to database. - - Return a list of events of type models.Event.DUPLICATE in case of - trying to write an already existing event to the database, or - models.Event.UNKONW_PROBLEM in case of any failures with recording the - event in the database. - - :param event_models: a list of models.Event objects. - """ - problem_events = [] - for event_model in event_models: - traits = [] - if event_model.traits: - for trait in event_model.traits: - traits.append({'trait_name': trait.name, - 'trait_type': trait.dtype, - 'trait_value': trait.value}) - try: - self.db.event.insert( - {'_id': event_model.message_id, - 'event_type': event_model.event_type, - 'timestamp': event_model.generated, - 'traits': traits}) - except pymongo.errors.DuplicateKeyError as ex: - LOG.exception(_("Failed to record duplicated event: %s") % ex) - problem_events.append((ev_models.Event.DUPLICATE, - event_model)) - except Exception as ex: - LOG.exception(_("Failed to record event: %s") % ex) - problem_events.append((ev_models.Event.UNKNOWN_PROBLEM, - event_model)) - return problem_events - - def get_events(self, event_filter): - """Return an iter of models.Event objects. - - :param event_filter: storage.EventFilter object, consists of filters - for events that are stored in database. - """ - q = pymongo_utils.make_events_query_from_filter(event_filter) - for event in self.db.event.find(q): - traits = [] - for trait in event['traits']: - traits.append( - ev_models.Trait(name=trait['trait_name'], - dtype=int(trait['trait_type']), - value=trait['trait_value'])) - yield ev_models.Event(message_id=event['_id'], - event_type=event['event_type'], - generated=event['timestamp'], - traits=traits) - - def get_event_types(self): - """Return all event types as an iter of strings.""" - event_types = set() - events = self.db.event.find() - - for event in events: - event_type = event['event_type'] - if event_type not in event_types: - event_types.add(event_type) - yield event_type - - def get_trait_types(self, event_type): - """Return a dictionary containing the name and data type of the trait. - - Only trait types for the provided event_type are returned. - - :param event_type: the type of the Event. - """ - trait_names = set() - events = self.db.event.find({'event_type': event_type}) - - for event in events: - for trait in event['traits']: - trait_name = trait['trait_name'] - if trait_name not in trait_names: - # Here we check that our method return only unique - # trait types. Method will return only one trait type. It - # is proposed that certain trait name could have only one - # trait type. - trait_names.add(trait_name) - yield {'name': trait_name, - 'data_type': trait['trait_type']} - - def get_traits(self, event_type, trait_name=None): - """Return all trait instances associated with an event_type. - - If trait_type is specified, only return instances of that trait type. - - :param event_type: the type of the Event to filter by - :param trait_name: the name of the Trait to filter by - """ - if not trait_name: - events = self.db.event.find({'event_type': event_type}) - else: - # We choose events that simultaneously have event_type and certain - # trait_name, and retrieve events contains only mentioned traits. - events = self.db.event.find({'$and': [{'event_type': event_type}, - {'traits.trait_name': trait_name}]}, - {'traits': {'$elemMatch': - {'trait_name': trait_name}} - }) - for event in events: - for trait in event['traits']: - yield ev_models.Trait(name=trait['trait_name'], - dtype=trait['trait_type'], - value=trait['trait_value']) - def query_samples(self, filter_expr=None, orderby=None, limit=None): if limit == 0: return [] diff --git a/ceilometer/tests/storage/test_impl_db2.py b/ceilometer/tests/storage/test_impl_db2.py index b786c8b39..217d0de88 100644 --- a/ceilometer/tests/storage/test_impl_db2.py +++ b/ceilometer/tests/storage/test_impl_db2.py @@ -24,6 +24,7 @@ """ from ceilometer.alarm.storage import impl_db2 as impl_db2_alarm +from ceilometer.event.storage import impl_db2 as impl_db2_event from ceilometer.storage import impl_db2 from ceilometer.tests import base as test_base @@ -62,12 +63,19 @@ class CapabilitiesTest(test_base.BaseTestCase): 'stddev': False, 'cardinality': False}} }, - 'events': {'query': {'simple': True}} + 'events': {'query': {'simple': False}} } actual_capabilities = impl_db2.Connection.get_capabilities() self.assertEqual(expected_capabilities, actual_capabilities) + def test_event_capabilities(self): + expected_capabilities = { + 'events': {'query': {'simple': True}}, + } + actual_capabilities = impl_db2_event.Connection.get_capabilities() + self.assertEqual(expected_capabilities, actual_capabilities) + def test_alarm_capabilities(self): expected_capabilities = { 'alarms': {'query': {'simple': True, diff --git a/ceilometer/tests/storage/test_impl_mongodb.py b/ceilometer/tests/storage/test_impl_mongodb.py index 3666f3e47..cbe4f0182 100644 --- a/ceilometer/tests/storage/test_impl_mongodb.py +++ b/ceilometer/tests/storage/test_impl_mongodb.py @@ -24,6 +24,7 @@ """ from ceilometer.alarm.storage import impl_mongodb as impl_mongodb_alarm +from ceilometer.event.storage import impl_mongodb as impl_mongodb_event from ceilometer.storage import base from ceilometer.storage import impl_mongodb from ceilometer.tests import base as test_base @@ -175,12 +176,19 @@ class CapabilitiesTest(test_base.BaseTestCase): 'stddev': True, 'cardinality': True}} }, - 'events': {'query': {'simple': True}} + 'events': {'query': {'simple': False}} } actual_capabilities = impl_mongodb.Connection.get_capabilities() self.assertEqual(expected_capabilities, actual_capabilities) + def test_event_capabilities(self): + expected_capabilities = { + 'events': {'query': {'simple': True}}, + } + actual_capabilities = impl_mongodb_event.Connection.get_capabilities() + self.assertEqual(expected_capabilities, actual_capabilities) + def test_alarm_capabilities(self): expected_capabilities = { 'alarms': {'query': {'simple': True,