Merge "[Mongodb] Implement events on Mongodb and DB2"
This commit is contained in:
commit
f7a0820f47
@ -36,6 +36,10 @@ cfg.CONF.import_opt('max_retries', 'ceilometer.openstack.common.db.options',
|
||||
cfg.CONF.import_opt('retry_interval', 'ceilometer.openstack.common.db.options',
|
||||
group="database")
|
||||
|
||||
EVENT_TRAIT_TYPES = {'none': 0, 'string': 1, 'integer': 2, 'float': 3,
|
||||
'datetime': 4}
|
||||
OP_SIGN = {'lt': '$lt', 'le': '$lte', 'ne': '$ne', 'gt': '$gt', 'ge': '$gte'}
|
||||
|
||||
|
||||
def make_timestamp_range(start, end,
|
||||
start_timestamp_op=None, end_timestamp_op=None):
|
||||
@ -63,6 +67,51 @@ def make_timestamp_range(start, end,
|
||||
return ts_range
|
||||
|
||||
|
||||
def make_events_query_from_filter(event_filter):
|
||||
"""Return start and stop row for filtering and a query.
|
||||
|
||||
Query is based on the selected parameter.
|
||||
|
||||
:param event_filter: storage.EventFilter object.
|
||||
"""
|
||||
q = {}
|
||||
ts_range = make_timestamp_range(event_filter.start_time,
|
||||
event_filter.end_time)
|
||||
if ts_range:
|
||||
q['timestamp'] = ts_range
|
||||
if event_filter.event_type:
|
||||
q['event_type'] = event_filter.event_type
|
||||
if event_filter.message_id:
|
||||
q['_id'] = event_filter.message_id
|
||||
|
||||
if event_filter.traits_filter:
|
||||
q.setdefault('traits')
|
||||
for trait_filter in event_filter.traits_filter:
|
||||
op = trait_filter.pop('op', 'eq')
|
||||
dict_query = {}
|
||||
for k, v in trait_filter.iteritems():
|
||||
if v is not None:
|
||||
# All parameters in EventFilter['traits'] are optional, so
|
||||
# we need to check if they are in the query or no.
|
||||
if k == 'key':
|
||||
dict_query.setdefault('trait_name', v)
|
||||
elif k in ['string', 'integer', 'datetime', 'float']:
|
||||
dict_query.setdefault('trait_type',
|
||||
EVENT_TRAIT_TYPES[k])
|
||||
dict_query.setdefault('trait_value',
|
||||
v if op == 'eq'
|
||||
else {OP_SIGN[op]: v})
|
||||
dict_query = {'$elemMatch': dict_query}
|
||||
if q['traits'] is None:
|
||||
q['traits'] = dict_query
|
||||
elif q.get('$and') is None:
|
||||
q.setdefault('$and', [{'traits': q.pop('traits')},
|
||||
{'traits': dict_query}])
|
||||
else:
|
||||
q['$and'].append({'traits': dict_query})
|
||||
return q
|
||||
|
||||
|
||||
def make_query_from_filter(sample_filter, require_meter=True):
|
||||
"""Return a query dictionary based on the settings in the filter.
|
||||
|
||||
|
@ -19,9 +19,12 @@
|
||||
"""
|
||||
|
||||
|
||||
import operator
|
||||
|
||||
import pymongo
|
||||
|
||||
from ceilometer.alarm.storage import models as alarm_models
|
||||
from ceilometer.openstack.common.gettextutils import _
|
||||
from ceilometer.openstack.common import log
|
||||
from ceilometer.storage import base
|
||||
from ceilometer.storage import models
|
||||
@ -41,6 +44,7 @@ COMMON_AVAILABLE_CAPABILITIES = {
|
||||
'complex': True},
|
||||
'history': {'query': {'simple': True,
|
||||
'complex': True}}},
|
||||
'events': {'query': {'simple': True}},
|
||||
}
|
||||
|
||||
|
||||
@ -224,6 +228,119 @@ class Connection(base.Connection):
|
||||
pymongo.DESCENDING)],
|
||||
None)
|
||||
|
||||
def record_events(self, event_models):
|
||||
"""Write the events to database.
|
||||
|
||||
Return a list of events of type models.Event.DUPLICATE in case of
|
||||
trying to write an already existing event to the database, or
|
||||
models.Event.UNKONW_PROBLEM in case of any failures with recording the
|
||||
event in the database.
|
||||
|
||||
:param event_models: a list of models.Event objects.
|
||||
"""
|
||||
problem_events = []
|
||||
for event_model in event_models:
|
||||
traits = []
|
||||
if event_model.traits:
|
||||
for trait in event_model.traits:
|
||||
traits.append({'trait_name': trait.name,
|
||||
'trait_type': trait.dtype,
|
||||
'trait_value': trait.value})
|
||||
try:
|
||||
self.db.event.insert(
|
||||
{'_id': event_model.message_id,
|
||||
'event_type': event_model.event_type,
|
||||
'timestamp': event_model.generated,
|
||||
'traits': traits})
|
||||
except pymongo.errors.DuplicateKeyError:
|
||||
problem_events.append((models.Event.DUPLICATE,
|
||||
event_model))
|
||||
except Exception as ex:
|
||||
LOG.exception(_("Failed to record event: %s") % ex)
|
||||
problem_events.append((models.Event.UNKNOWN_PROBLEM,
|
||||
event_model))
|
||||
return problem_events
|
||||
|
||||
def get_events(self, event_filter):
|
||||
"""Return a list of models.Event objects.
|
||||
|
||||
:param event_filter: storage.EventFilter object, consists of filters
|
||||
for events that are stored in database.
|
||||
"""
|
||||
q = pymongo_utils.make_events_query_from_filter(event_filter)
|
||||
res_events = []
|
||||
for event in self.db.event.find(q):
|
||||
traits = []
|
||||
for trait in event['traits']:
|
||||
traits.append(models.Trait(name=trait['trait_name'],
|
||||
dtype=int(trait['trait_type']),
|
||||
value=trait['trait_value']))
|
||||
res_events.append(models.Event(message_id=event['_id'],
|
||||
event_type=event['event_type'],
|
||||
generated=event['timestamp'],
|
||||
traits=traits))
|
||||
return res_events
|
||||
|
||||
def get_event_types(self):
|
||||
"""Return all event types as an iter of strings."""
|
||||
event_types = set()
|
||||
events = self.db.event.find()
|
||||
|
||||
for event in events:
|
||||
event_type = event['event_type']
|
||||
if event_type not in event_types:
|
||||
event_types.add(event_type)
|
||||
yield event_type
|
||||
|
||||
def get_trait_types(self, event_type):
|
||||
"""Return a dictionary containing the name and data type of the trait.
|
||||
|
||||
Only trait types for the provided event_type are returned.
|
||||
|
||||
:param event_type: the type of the Event.
|
||||
"""
|
||||
trait_names = set()
|
||||
events = self.db.event.find({'event_type': event_type})
|
||||
|
||||
for event in events:
|
||||
for trait in event['traits']:
|
||||
trait_name = trait['trait_name']
|
||||
if trait_name not in trait_names:
|
||||
# Here we check that our method return only unique
|
||||
# trait types. Method will return only one trait type. It
|
||||
# is proposed that certain trait name could have only one
|
||||
# trait type.
|
||||
trait_names.add(trait_name)
|
||||
yield {'name': trait_name,
|
||||
'data_type': trait['trait_type']}
|
||||
|
||||
def get_traits(self, event_type, trait_name=None):
|
||||
"""Return all trait instances associated with an event_type.
|
||||
|
||||
If trait_type is specified, only return instances of that trait type.
|
||||
|
||||
:param event_type: the type of the Event to filter by
|
||||
:param trait_name: the name of the Trait to filter by
|
||||
"""
|
||||
if not trait_name:
|
||||
events = self.db.event.find({'event_type': event_type})
|
||||
else:
|
||||
# We choose events that simultaneously have event_type and certain
|
||||
# trait_name, and retrieve events contains only mentioned traits.
|
||||
events = self.db.event.find({'$and': [{'event_type': event_type},
|
||||
{'traits.trait_name': trait_name}]},
|
||||
{'traits': {'$elemMatch':
|
||||
{'trait_name': trait_name}}
|
||||
})
|
||||
traits = []
|
||||
for event in events:
|
||||
for trait in event['traits']:
|
||||
traits.append(models.Trait(name=trait['trait_name'],
|
||||
dtype=trait['trait_type'],
|
||||
value=trait['trait_value']))
|
||||
for trait in sorted(traits, key=operator.attrgetter('dtype')):
|
||||
yield trait
|
||||
|
||||
def query_samples(self, filter_expr=None, orderby=None, limit=None):
|
||||
return self._retrieve_data(filter_expr, orderby, limit, models.Meter)
|
||||
|
||||
|
@ -65,7 +65,7 @@ class CapabilitiesTest(test_base.BaseTestCase):
|
||||
'complex': True},
|
||||
'history': {'query': {'simple': True,
|
||||
'complex': True}}},
|
||||
'events': {'query': {'simple': False}}
|
||||
'events': {'query': {'simple': True}}
|
||||
}
|
||||
|
||||
actual_capabilities = impl_db2.Connection.get_capabilities()
|
||||
|
@ -178,7 +178,7 @@ class CapabilitiesTest(test_base.BaseTestCase):
|
||||
'complex': True},
|
||||
'history': {'query': {'simple': True,
|
||||
'complex': True}}},
|
||||
'events': {'query': {'simple': False}}
|
||||
'events': {'query': {'simple': True}}
|
||||
}
|
||||
|
||||
actual_capabilities = impl_mongodb.Connection.get_capabilities()
|
||||
|
Loading…
x
Reference in New Issue
Block a user