move db2 and mongo driver to event tree
this patch mves the event related code to the respective db2 and mongo drivers in event tree Change-Id: I1990f93143aae470fc9b70287c230260fef67b32 Partially-Implements: blueprint dedicated-event-db
This commit is contained in:
parent
4473a8c4c3
commit
fa205ac32c
@ -12,9 +12,53 @@
|
||||
# under the License.
|
||||
"""DB2 storage backend
|
||||
"""
|
||||
import pymongo
|
||||
|
||||
from ceilometer.event.storage import pymongo_base
|
||||
from ceilometer.storage import impl_db2
|
||||
from ceilometer import storage
|
||||
from ceilometer.storage.mongo import utils as pymongo_utils
|
||||
|
||||
|
||||
class Connection(impl_db2.Connection, pymongo_base.Connection):
|
||||
class Connection(pymongo_base.Connection):
|
||||
"""The db2 event storage for Ceilometer."""
|
||||
|
||||
CONNECTION_POOL = pymongo_utils.ConnectionPool()
|
||||
|
||||
def __init__(self, url):
|
||||
|
||||
# Since we are using pymongo, even though we are connecting to DB2
|
||||
# we still have to make sure that the scheme which used to distinguish
|
||||
# db2 driver from mongodb driver be replaced so that pymongo will not
|
||||
# produce an exception on the scheme.
|
||||
url = url.replace('db2:', 'mongodb:', 1)
|
||||
self.conn = self.CONNECTION_POOL.connect(url)
|
||||
|
||||
# Require MongoDB 2.2 to use aggregate(), since we are using mongodb
|
||||
# as backend for test, the following code is necessary to make sure
|
||||
# that the test wont try aggregate on older mongodb during the test.
|
||||
# For db2, the versionArray won't be part of the server_info, so there
|
||||
# will not be exception when real db2 gets used as backend.
|
||||
server_info = self.conn.server_info()
|
||||
if server_info.get('sysInfo'):
|
||||
self._using_mongodb = True
|
||||
else:
|
||||
self._using_mongodb = False
|
||||
|
||||
if self._using_mongodb and server_info.get('versionArray') < [2, 2]:
|
||||
raise storage.StorageBadVersion("Need at least MongoDB 2.2")
|
||||
|
||||
connection_options = pymongo.uri_parser.parse_uri(url)
|
||||
self.db = getattr(self.conn, connection_options['database'])
|
||||
if connection_options.get('username'):
|
||||
self.db.authenticate(connection_options['username'],
|
||||
connection_options['password'])
|
||||
|
||||
self.upgrade()
|
||||
|
||||
def clear(self):
|
||||
# drop_database command does nothing on db2 database since this has
|
||||
# not been implemented. However calling this method is important for
|
||||
# removal of all the empty dbs created during the test runs since
|
||||
# test run is against mongodb on Jenkins
|
||||
self.conn.drop_database(self.db)
|
||||
self.conn.close()
|
||||
|
@ -11,9 +11,42 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
"""MongoDB storage backend"""
|
||||
import pymongo
|
||||
|
||||
from ceilometer.event.storage import pymongo_base
|
||||
from ceilometer.storage import impl_mongodb
|
||||
from ceilometer import storage
|
||||
from ceilometer.storage.mongo import utils as pymongo_utils
|
||||
|
||||
|
||||
class Connection(impl_mongodb.Connection, pymongo_base.Connection):
|
||||
class Connection(pymongo_base.Connection):
|
||||
"""Put the event data into a MongoDB database."""
|
||||
|
||||
CONNECTION_POOL = pymongo_utils.ConnectionPool()
|
||||
|
||||
def __init__(self, url):
|
||||
|
||||
# NOTE(jd) Use our own connection pooling on top of the Pymongo one.
|
||||
# We need that otherwise we overflow the MongoDB instance with new
|
||||
# connection since we instanciate a Pymongo client each time someone
|
||||
# requires a new storage connection.
|
||||
self.conn = self.CONNECTION_POOL.connect(url)
|
||||
|
||||
# Require MongoDB 2.4 to use $setOnInsert
|
||||
if self.conn.server_info()['versionArray'] < [2, 4]:
|
||||
raise storage.StorageBadVersion("Need at least MongoDB 2.4")
|
||||
|
||||
connection_options = pymongo.uri_parser.parse_uri(url)
|
||||
self.db = getattr(self.conn, connection_options['database'])
|
||||
if connection_options.get('username'):
|
||||
self.db.authenticate(connection_options['username'],
|
||||
connection_options['password'])
|
||||
|
||||
# NOTE(jd) Upgrading is just about creating index, so let's do this
|
||||
# on connection to be sure at least the TTL is correcly updated if
|
||||
# needed.
|
||||
self.upgrade()
|
||||
|
||||
def clear(self):
|
||||
self.conn.drop_database(self.db)
|
||||
# Connection will be reopened automatically if needed
|
||||
self.conn.close()
|
||||
|
@ -12,10 +12,17 @@
|
||||
# under the License.
|
||||
"""Common functions for MongoDB and DB2 backends
|
||||
"""
|
||||
from ceilometer.event.storage import base as event_base
|
||||
from ceilometer.storage import pymongo_base as base
|
||||
import pymongo
|
||||
|
||||
from ceilometer.event.storage import base
|
||||
from ceilometer.event.storage import models
|
||||
from ceilometer.openstack.common.gettextutils import _
|
||||
from ceilometer.openstack.common import log
|
||||
from ceilometer.storage.mongo import utils as pymongo_utils
|
||||
from ceilometer import utils
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
COMMON_AVAILABLE_CAPABILITIES = {
|
||||
'events': {'query': {'simple': True}},
|
||||
@ -27,7 +34,7 @@ AVAILABLE_STORAGE_CAPABILITIES = {
|
||||
}
|
||||
|
||||
|
||||
class Connection(base.Connection, event_base.Connection):
|
||||
class Connection(base.Connection):
|
||||
"""Base event Connection class for MongoDB and DB2 drivers."""
|
||||
CAPABILITIES = utils.update_nested(base.Connection.CAPABILITIES,
|
||||
COMMON_AVAILABLE_CAPABILITIES)
|
||||
@ -36,3 +43,112 @@ class Connection(base.Connection, event_base.Connection):
|
||||
base.Connection.STORAGE_CAPABILITIES,
|
||||
AVAILABLE_STORAGE_CAPABILITIES,
|
||||
)
|
||||
|
||||
def record_events(self, event_models):
|
||||
"""Write the events to database.
|
||||
|
||||
Return a list of events of type models.Event.DUPLICATE in case of
|
||||
trying to write an already existing event to the database, or
|
||||
models.Event.UNKONW_PROBLEM in case of any failures with recording the
|
||||
event in the database.
|
||||
|
||||
:param event_models: a list of models.Event objects.
|
||||
"""
|
||||
problem_events = []
|
||||
for event_model in event_models:
|
||||
traits = []
|
||||
if event_model.traits:
|
||||
for trait in event_model.traits:
|
||||
traits.append({'trait_name': trait.name,
|
||||
'trait_type': trait.dtype,
|
||||
'trait_value': trait.value})
|
||||
try:
|
||||
self.db.event.insert(
|
||||
{'_id': event_model.message_id,
|
||||
'event_type': event_model.event_type,
|
||||
'timestamp': event_model.generated,
|
||||
'traits': traits})
|
||||
except pymongo.errors.DuplicateKeyError as ex:
|
||||
LOG.exception(_("Failed to record duplicated event: %s") % ex)
|
||||
problem_events.append((models.Event.DUPLICATE,
|
||||
event_model))
|
||||
except Exception as ex:
|
||||
LOG.exception(_("Failed to record event: %s") % ex)
|
||||
problem_events.append((models.Event.UNKNOWN_PROBLEM,
|
||||
event_model))
|
||||
return problem_events
|
||||
|
||||
def get_events(self, event_filter):
|
||||
"""Return an iter of models.Event objects.
|
||||
|
||||
:param event_filter: storage.EventFilter object, consists of filters
|
||||
for events that are stored in database.
|
||||
"""
|
||||
q = pymongo_utils.make_events_query_from_filter(event_filter)
|
||||
for event in self.db.event.find(q):
|
||||
traits = []
|
||||
for trait in event['traits']:
|
||||
traits.append(models.Trait(name=trait['trait_name'],
|
||||
dtype=int(trait['trait_type']),
|
||||
value=trait['trait_value']))
|
||||
yield models.Event(message_id=event['_id'],
|
||||
event_type=event['event_type'],
|
||||
generated=event['timestamp'],
|
||||
traits=traits)
|
||||
|
||||
def get_event_types(self):
|
||||
"""Return all event types as an iter of strings."""
|
||||
event_types = set()
|
||||
events = self.db.event.find()
|
||||
|
||||
for event in events:
|
||||
event_type = event['event_type']
|
||||
if event_type not in event_types:
|
||||
event_types.add(event_type)
|
||||
yield event_type
|
||||
|
||||
def get_trait_types(self, event_type):
|
||||
"""Return a dictionary containing the name and data type of the trait.
|
||||
|
||||
Only trait types for the provided event_type are returned.
|
||||
|
||||
:param event_type: the type of the Event.
|
||||
"""
|
||||
trait_names = set()
|
||||
events = self.db.event.find({'event_type': event_type})
|
||||
|
||||
for event in events:
|
||||
for trait in event['traits']:
|
||||
trait_name = trait['trait_name']
|
||||
if trait_name not in trait_names:
|
||||
# Here we check that our method return only unique
|
||||
# trait types. Method will return only one trait type. It
|
||||
# is proposed that certain trait name could have only one
|
||||
# trait type.
|
||||
trait_names.add(trait_name)
|
||||
yield {'name': trait_name,
|
||||
'data_type': trait['trait_type']}
|
||||
|
||||
def get_traits(self, event_type, trait_name=None):
|
||||
"""Return all trait instances associated with an event_type.
|
||||
|
||||
If trait_type is specified, only return instances of that trait type.
|
||||
|
||||
:param event_type: the type of the Event to filter by
|
||||
:param trait_name: the name of the Trait to filter by
|
||||
"""
|
||||
if not trait_name:
|
||||
events = self.db.event.find({'event_type': event_type})
|
||||
else:
|
||||
# We choose events that simultaneously have event_type and certain
|
||||
# trait_name, and retrieve events contains only mentioned traits.
|
||||
events = self.db.event.find({'$and': [{'event_type': event_type},
|
||||
{'traits.trait_name': trait_name}]},
|
||||
{'traits': {'$elemMatch':
|
||||
{'trait_name': trait_name}}
|
||||
})
|
||||
for event in events:
|
||||
for trait in event['traits']:
|
||||
yield models.Trait(name=trait['trait_name'],
|
||||
dtype=trait['trait_type'],
|
||||
value=trait['trait_value'])
|
||||
|
@ -20,16 +20,11 @@
|
||||
import pymongo
|
||||
|
||||
import ceilometer
|
||||
from ceilometer.event.storage import models as ev_models
|
||||
from ceilometer.openstack.common.gettextutils import _
|
||||
from ceilometer.openstack.common import log
|
||||
from ceilometer.storage import base
|
||||
from ceilometer.storage import models
|
||||
from ceilometer.storage.mongo import utils as pymongo_utils
|
||||
from ceilometer import utils
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
COMMON_AVAILABLE_CAPABILITIES = {
|
||||
'meters': {'query': {'simple': True,
|
||||
@ -37,7 +32,6 @@ COMMON_AVAILABLE_CAPABILITIES = {
|
||||
'samples': {'query': {'simple': True,
|
||||
'metadata': True,
|
||||
'complex': True}},
|
||||
'events': {'query': {'simple': True}},
|
||||
}
|
||||
|
||||
|
||||
@ -113,116 +107,6 @@ class Connection(base.Connection):
|
||||
[("timestamp", pymongo.DESCENDING)],
|
||||
limit)
|
||||
|
||||
def record_events(self, event_models):
|
||||
"""Write the events to database.
|
||||
|
||||
Return a list of events of type models.Event.DUPLICATE in case of
|
||||
trying to write an already existing event to the database, or
|
||||
models.Event.UNKONW_PROBLEM in case of any failures with recording the
|
||||
event in the database.
|
||||
|
||||
:param event_models: a list of models.Event objects.
|
||||
"""
|
||||
problem_events = []
|
||||
for event_model in event_models:
|
||||
traits = []
|
||||
if event_model.traits:
|
||||
for trait in event_model.traits:
|
||||
traits.append({'trait_name': trait.name,
|
||||
'trait_type': trait.dtype,
|
||||
'trait_value': trait.value})
|
||||
try:
|
||||
self.db.event.insert(
|
||||
{'_id': event_model.message_id,
|
||||
'event_type': event_model.event_type,
|
||||
'timestamp': event_model.generated,
|
||||
'traits': traits})
|
||||
except pymongo.errors.DuplicateKeyError as ex:
|
||||
LOG.exception(_("Failed to record duplicated event: %s") % ex)
|
||||
problem_events.append((ev_models.Event.DUPLICATE,
|
||||
event_model))
|
||||
except Exception as ex:
|
||||
LOG.exception(_("Failed to record event: %s") % ex)
|
||||
problem_events.append((ev_models.Event.UNKNOWN_PROBLEM,
|
||||
event_model))
|
||||
return problem_events
|
||||
|
||||
def get_events(self, event_filter):
|
||||
"""Return an iter of models.Event objects.
|
||||
|
||||
:param event_filter: storage.EventFilter object, consists of filters
|
||||
for events that are stored in database.
|
||||
"""
|
||||
q = pymongo_utils.make_events_query_from_filter(event_filter)
|
||||
for event in self.db.event.find(q):
|
||||
traits = []
|
||||
for trait in event['traits']:
|
||||
traits.append(
|
||||
ev_models.Trait(name=trait['trait_name'],
|
||||
dtype=int(trait['trait_type']),
|
||||
value=trait['trait_value']))
|
||||
yield ev_models.Event(message_id=event['_id'],
|
||||
event_type=event['event_type'],
|
||||
generated=event['timestamp'],
|
||||
traits=traits)
|
||||
|
||||
def get_event_types(self):
|
||||
"""Return all event types as an iter of strings."""
|
||||
event_types = set()
|
||||
events = self.db.event.find()
|
||||
|
||||
for event in events:
|
||||
event_type = event['event_type']
|
||||
if event_type not in event_types:
|
||||
event_types.add(event_type)
|
||||
yield event_type
|
||||
|
||||
def get_trait_types(self, event_type):
|
||||
"""Return a dictionary containing the name and data type of the trait.
|
||||
|
||||
Only trait types for the provided event_type are returned.
|
||||
|
||||
:param event_type: the type of the Event.
|
||||
"""
|
||||
trait_names = set()
|
||||
events = self.db.event.find({'event_type': event_type})
|
||||
|
||||
for event in events:
|
||||
for trait in event['traits']:
|
||||
trait_name = trait['trait_name']
|
||||
if trait_name not in trait_names:
|
||||
# Here we check that our method return only unique
|
||||
# trait types. Method will return only one trait type. It
|
||||
# is proposed that certain trait name could have only one
|
||||
# trait type.
|
||||
trait_names.add(trait_name)
|
||||
yield {'name': trait_name,
|
||||
'data_type': trait['trait_type']}
|
||||
|
||||
def get_traits(self, event_type, trait_name=None):
|
||||
"""Return all trait instances associated with an event_type.
|
||||
|
||||
If trait_type is specified, only return instances of that trait type.
|
||||
|
||||
:param event_type: the type of the Event to filter by
|
||||
:param trait_name: the name of the Trait to filter by
|
||||
"""
|
||||
if not trait_name:
|
||||
events = self.db.event.find({'event_type': event_type})
|
||||
else:
|
||||
# We choose events that simultaneously have event_type and certain
|
||||
# trait_name, and retrieve events contains only mentioned traits.
|
||||
events = self.db.event.find({'$and': [{'event_type': event_type},
|
||||
{'traits.trait_name': trait_name}]},
|
||||
{'traits': {'$elemMatch':
|
||||
{'trait_name': trait_name}}
|
||||
})
|
||||
for event in events:
|
||||
for trait in event['traits']:
|
||||
yield ev_models.Trait(name=trait['trait_name'],
|
||||
dtype=trait['trait_type'],
|
||||
value=trait['trait_value'])
|
||||
|
||||
def query_samples(self, filter_expr=None, orderby=None, limit=None):
|
||||
if limit == 0:
|
||||
return []
|
||||
|
@ -24,6 +24,7 @@
|
||||
"""
|
||||
|
||||
from ceilometer.alarm.storage import impl_db2 as impl_db2_alarm
|
||||
from ceilometer.event.storage import impl_db2 as impl_db2_event
|
||||
from ceilometer.storage import impl_db2
|
||||
from ceilometer.tests import base as test_base
|
||||
|
||||
@ -62,12 +63,19 @@ class CapabilitiesTest(test_base.BaseTestCase):
|
||||
'stddev': False,
|
||||
'cardinality': False}}
|
||||
},
|
||||
'events': {'query': {'simple': True}}
|
||||
'events': {'query': {'simple': False}}
|
||||
}
|
||||
|
||||
actual_capabilities = impl_db2.Connection.get_capabilities()
|
||||
self.assertEqual(expected_capabilities, actual_capabilities)
|
||||
|
||||
def test_event_capabilities(self):
|
||||
expected_capabilities = {
|
||||
'events': {'query': {'simple': True}},
|
||||
}
|
||||
actual_capabilities = impl_db2_event.Connection.get_capabilities()
|
||||
self.assertEqual(expected_capabilities, actual_capabilities)
|
||||
|
||||
def test_alarm_capabilities(self):
|
||||
expected_capabilities = {
|
||||
'alarms': {'query': {'simple': True,
|
||||
|
@ -24,6 +24,7 @@
|
||||
"""
|
||||
|
||||
from ceilometer.alarm.storage import impl_mongodb as impl_mongodb_alarm
|
||||
from ceilometer.event.storage import impl_mongodb as impl_mongodb_event
|
||||
from ceilometer.storage import base
|
||||
from ceilometer.storage import impl_mongodb
|
||||
from ceilometer.tests import base as test_base
|
||||
@ -175,12 +176,19 @@ class CapabilitiesTest(test_base.BaseTestCase):
|
||||
'stddev': True,
|
||||
'cardinality': True}}
|
||||
},
|
||||
'events': {'query': {'simple': True}}
|
||||
'events': {'query': {'simple': False}}
|
||||
}
|
||||
|
||||
actual_capabilities = impl_mongodb.Connection.get_capabilities()
|
||||
self.assertEqual(expected_capabilities, actual_capabilities)
|
||||
|
||||
def test_event_capabilities(self):
|
||||
expected_capabilities = {
|
||||
'events': {'query': {'simple': True}},
|
||||
}
|
||||
actual_capabilities = impl_mongodb_event.Connection.get_capabilities()
|
||||
self.assertEqual(expected_capabilities, actual_capabilities)
|
||||
|
||||
def test_alarm_capabilities(self):
|
||||
expected_capabilities = {
|
||||
'alarms': {'query': {'simple': True,
|
||||
|
Loading…
Reference in New Issue
Block a user