Merge "Remove code duplication Part 2"
This commit is contained in:
commit
f3b63a2dc7
@ -329,35 +329,6 @@ class Connection(pymongo_base.Connection):
|
||||
user_id=latest_meter['user_id'],
|
||||
metadata=latest_meter['resource_metadata'])
|
||||
|
||||
def get_samples(self, sample_filter, limit=None):
|
||||
"""Return an iterable of model.Sample instances.
|
||||
|
||||
:param sample_filter: Filter.
|
||||
:param limit: Maximum number of results to return.
|
||||
"""
|
||||
if limit == 0:
|
||||
return
|
||||
q = pymongo_base.make_query_from_filter(sample_filter,
|
||||
require_meter=False)
|
||||
|
||||
if limit:
|
||||
samples = self.db.meter.find(
|
||||
q, limit=limit, sort=[("timestamp", pymongo.DESCENDING)])
|
||||
else:
|
||||
samples = self.db.meter.find(
|
||||
q, sort=[("timestamp", pymongo.DESCENDING)])
|
||||
|
||||
for s in samples:
|
||||
# Remove the ObjectId generated by the database when
|
||||
# the sample was inserted. It is an implementation
|
||||
# detail that should not leak outside of the driver.
|
||||
del s['_id']
|
||||
# Backward compatibility for samples without units
|
||||
s['counter_unit'] = s.get('counter_unit', '')
|
||||
# Tolerate absence of recorded_at in older datapoints
|
||||
s['recorded_at'] = s.get('recorded_at')
|
||||
yield models.Sample(**s)
|
||||
|
||||
def get_meter_statistics(self, sample_filter, period=None, groupby=None):
|
||||
"""Return an iterable of models.Statistics instance containing meter
|
||||
statistics described by the query parameters.
|
||||
@ -438,37 +409,3 @@ class Connection(pymongo_base.Connection):
|
||||
stat.period_start = stat.duration_start
|
||||
stat.period_end = stat.duration_end
|
||||
yield stat
|
||||
|
||||
def get_alarms(self, name=None, user=None,
|
||||
project=None, enabled=None, alarm_id=None, pagination=None):
|
||||
"""Yields a lists of alarms that match filters
|
||||
:param user: Optional ID for user that owns the resource.
|
||||
:param project: Optional ID for project that owns the resource.
|
||||
:param enabled: Optional boolean to list disable alarm.
|
||||
:param alarm_id: Optional alarm_id to return one alarm.
|
||||
:param metaquery: Optional dict with metadata to match on.
|
||||
:param resource: Optional resource filter.
|
||||
:param pagination: Optional pagination query.
|
||||
"""
|
||||
|
||||
if pagination:
|
||||
raise NotImplementedError(_('Pagination not implemented'))
|
||||
|
||||
q = {}
|
||||
if user is not None:
|
||||
q['user_id'] = user
|
||||
if project is not None:
|
||||
q['project_id'] = project
|
||||
if name is not None:
|
||||
q['name'] = name
|
||||
if enabled is not None:
|
||||
q['enabled'] = enabled
|
||||
if alarm_id is not None:
|
||||
q['alarm_id'] = alarm_id
|
||||
|
||||
for alarm in self.db.alarm.find(q):
|
||||
a = {}
|
||||
a.update(alarm)
|
||||
del a['_id']
|
||||
self._ensure_encapsulated_rule_format(a)
|
||||
yield models.Alarm(**a)
|
||||
|
@ -249,20 +249,6 @@ class Connection(pymongo_base.Connection):
|
||||
return merge;
|
||||
}""")
|
||||
|
||||
operators = {"<": "$lt",
|
||||
">": "$gt",
|
||||
"<=": "$lte",
|
||||
"=<": "$lte",
|
||||
">=": "$gte",
|
||||
"=>": "$gte",
|
||||
"!=": "$ne",
|
||||
"in": "$in"}
|
||||
complex_operators = {"or": "$or",
|
||||
"and": "$and"}
|
||||
|
||||
ordering_functions = {"asc": pymongo.ASCENDING,
|
||||
"desc": pymongo.DESCENDING}
|
||||
|
||||
def __init__(self, conf):
|
||||
url = conf.database.connection
|
||||
|
||||
@ -596,99 +582,6 @@ class Connection(pymongo_base.Connection):
|
||||
finally:
|
||||
self.db[out].drop()
|
||||
|
||||
def _retrieve_samples(self, query, orderby, limit):
|
||||
if limit is not None:
|
||||
samples = self.db.meter.find(query,
|
||||
limit=limit,
|
||||
sort=orderby)
|
||||
else:
|
||||
samples = self.db.meter.find(query,
|
||||
sort=orderby)
|
||||
|
||||
for s in samples:
|
||||
# Remove the ObjectId generated by the database when
|
||||
# the sample was inserted. It is an implementation
|
||||
# detail that should not leak outside of the driver.
|
||||
del s['_id']
|
||||
# Backward compatibility for samples without units
|
||||
s['counter_unit'] = s.get('counter_unit', '')
|
||||
# Tolerate absence of recorded_at in older datapoints
|
||||
s['recorded_at'] = s.get('recorded_at')
|
||||
yield models.Sample(**s)
|
||||
|
||||
def get_samples(self, sample_filter, limit=None):
|
||||
"""Return an iterable of model.Sample instances.
|
||||
|
||||
:param sample_filter: Filter.
|
||||
:param limit: Maximum number of results to return.
|
||||
"""
|
||||
if limit == 0:
|
||||
return []
|
||||
q = pymongo_base.make_query_from_filter(sample_filter,
|
||||
require_meter=False)
|
||||
|
||||
return self._retrieve_samples(q,
|
||||
[("timestamp", pymongo.DESCENDING)],
|
||||
limit)
|
||||
|
||||
def _retrieve_data(self, filter_expr, orderby, limit, model):
|
||||
if limit == 0:
|
||||
return []
|
||||
query_filter = {}
|
||||
orderby_filter = [("timestamp", pymongo.DESCENDING)]
|
||||
if orderby is not None:
|
||||
orderby_filter = self._transform_orderby(orderby)
|
||||
if filter_expr is not None:
|
||||
query_filter = self._transform_filter(
|
||||
filter_expr)
|
||||
|
||||
retrieve = {models.Meter: self._retrieve_samples,
|
||||
models.Alarm: self._retrieve_alarms,
|
||||
models.AlarmChange: self._retrieve_alarm_changes}
|
||||
return retrieve[model](query_filter, orderby_filter, limit)
|
||||
|
||||
def query_samples(self, filter_expr=None, orderby=None, limit=None):
|
||||
return self._retrieve_data(filter_expr, orderby, limit, models.Meter)
|
||||
|
||||
def _transform_orderby(self, orderby):
|
||||
orderby_filter = []
|
||||
|
||||
for field in orderby:
|
||||
field_name = field.keys()[0]
|
||||
ordering = self.ordering_functions[field.values()[0]]
|
||||
orderby_filter.append((field_name, ordering))
|
||||
return orderby_filter
|
||||
|
||||
def _transform_filter(self, condition):
|
||||
|
||||
def process_json_tree(condition_tree):
|
||||
operator_node = condition_tree.keys()[0]
|
||||
nodes = condition_tree.values()[0]
|
||||
|
||||
if operator_node in self.complex_operators:
|
||||
element_list = []
|
||||
for node in nodes:
|
||||
element = process_json_tree(node)
|
||||
element_list.append(element)
|
||||
complex_operator = self.complex_operators[operator_node]
|
||||
op = {complex_operator: element_list}
|
||||
return op
|
||||
else:
|
||||
field_name = nodes.keys()[0]
|
||||
field_value = nodes.values()[0]
|
||||
# no operator for equal in Mongo
|
||||
if operator_node == "=":
|
||||
op = {field_name: field_value}
|
||||
return op
|
||||
if operator_node in self.operators:
|
||||
operator = self.operators[operator_node]
|
||||
op = {
|
||||
field_name: {
|
||||
operator: field_value}}
|
||||
return op
|
||||
|
||||
return process_json_tree(condition)
|
||||
|
||||
def get_meter_statistics(self, sample_filter, period=None, groupby=None):
|
||||
"""Return an iterable of models.Statistics instance containing meter
|
||||
statistics described by the query parameters.
|
||||
@ -739,64 +632,6 @@ class Connection(pymongo_base.Connection):
|
||||
(models.Statistics(**(r['value'])) for r in results['results']),
|
||||
key=operator.attrgetter('period_start'))
|
||||
|
||||
def _retrieve_alarms(self, query_filter, orderby, limit):
|
||||
if limit is not None:
|
||||
alarms = self.db.alarm.find(query_filter,
|
||||
limit=limit,
|
||||
sort=orderby)
|
||||
else:
|
||||
alarms = self.db.alarm.find(
|
||||
query_filter, sort=orderby)
|
||||
|
||||
for alarm in alarms:
|
||||
a = {}
|
||||
a.update(alarm)
|
||||
del a['_id']
|
||||
self._ensure_encapsulated_rule_format(a)
|
||||
yield models.Alarm(**a)
|
||||
|
||||
def get_alarms(self, name=None, user=None,
|
||||
project=None, enabled=None, alarm_id=None, pagination=None):
|
||||
"""Yields a lists of alarms that match filters
|
||||
:param name: The Alarm name.
|
||||
:param user: Optional ID for user that owns the resource.
|
||||
:param project: Optional ID for project that owns the resource.
|
||||
:param enabled: Optional boolean to list disable alarm.
|
||||
:param alarm_id: Optional alarm_id to return one alarm.
|
||||
:param pagination: Optional pagination query.
|
||||
"""
|
||||
if pagination:
|
||||
raise NotImplementedError(_('Pagination not implemented'))
|
||||
|
||||
q = {}
|
||||
if user is not None:
|
||||
q['user_id'] = user
|
||||
if project is not None:
|
||||
q['project_id'] = project
|
||||
if name is not None:
|
||||
q['name'] = name
|
||||
if enabled is not None:
|
||||
q['enabled'] = enabled
|
||||
if alarm_id is not None:
|
||||
q['alarm_id'] = alarm_id
|
||||
|
||||
return self._retrieve_alarms(q, [], None)
|
||||
|
||||
def _retrieve_alarm_changes(self, query_filter, orderby, limit):
|
||||
if limit is not None:
|
||||
alarms_history = self.db.alarm_history.find(query_filter,
|
||||
limit=limit,
|
||||
sort=orderby)
|
||||
else:
|
||||
alarms_history = self.db.alarm_history.find(
|
||||
query_filter, sort=orderby)
|
||||
|
||||
for alarm_history in alarms_history:
|
||||
ah = {}
|
||||
ah.update(alarm_history)
|
||||
del ah['_id']
|
||||
yield models.AlarmChange(**ah)
|
||||
|
||||
def get_alarm_changes(self, alarm_id, on_behalf_of,
|
||||
user=None, project=None, type=None,
|
||||
start_timestamp=None, start_timestamp_op=None,
|
||||
@ -851,11 +686,6 @@ class Connection(pymongo_base.Connection):
|
||||
"""
|
||||
self.db.alarm_history.insert(alarm_change)
|
||||
|
||||
def query_alarms(self, filter_expr=None, orderby=None, limit=None):
|
||||
"""Return an iterable of model.Alarm objects.
|
||||
"""
|
||||
return self._retrieve_data(filter_expr, orderby, limit, models.Alarm)
|
||||
|
||||
def query_alarm_history(self, filter_expr=None, orderby=None, limit=None):
|
||||
"""Return an iterable of model.AlarmChange objects.
|
||||
"""
|
||||
|
@ -130,6 +130,21 @@ class Connection(base.Connection):
|
||||
"""Base Connection class for MongoDB and DB2 drivers.
|
||||
"""
|
||||
|
||||
operators = {"<": "$lt",
|
||||
">": "$gt",
|
||||
"<=": "$lte",
|
||||
"=<": "$lte",
|
||||
">=": "$gte",
|
||||
"=>": "$gte",
|
||||
"!=": "$ne",
|
||||
"in": "$in"}
|
||||
|
||||
complex_operators = {"or": "$or",
|
||||
"and": "$and"}
|
||||
|
||||
ordering_functions = {"asc": pymongo.ASCENDING,
|
||||
"desc": pymongo.DESCENDING}
|
||||
|
||||
def get_users(self, source=None):
|
||||
"""Return an iterable of user id strings.
|
||||
|
||||
@ -218,6 +233,160 @@ class Connection(base.Connection):
|
||||
"""
|
||||
self.db.alarm.remove({'alarm_id': alarm_id})
|
||||
|
||||
def get_samples(self, sample_filter, limit=None):
|
||||
"""Return an iterable of model.Sample instances.
|
||||
|
||||
:param sample_filter: Filter.
|
||||
:param limit: Maximum number of results to return.
|
||||
"""
|
||||
if limit == 0:
|
||||
return []
|
||||
q = make_query_from_filter(sample_filter,
|
||||
require_meter=False)
|
||||
|
||||
return self._retrieve_samples(q,
|
||||
[("timestamp", pymongo.DESCENDING)],
|
||||
limit)
|
||||
|
||||
def get_alarms(self, name=None, user=None,
|
||||
project=None, enabled=None, alarm_id=None, pagination=None):
|
||||
"""Yields a lists of alarms that match filters
|
||||
:param name: The Alarm name.
|
||||
:param user: Optional ID for user that owns the resource.
|
||||
:param project: Optional ID for project that owns the resource.
|
||||
:param enabled: Optional boolean to list disable alarm.
|
||||
:param alarm_id: Optional alarm_id to return one alarm.
|
||||
:param pagination: Optional pagination query.
|
||||
"""
|
||||
if pagination:
|
||||
raise NotImplementedError(_('Pagination not implemented'))
|
||||
|
||||
q = {}
|
||||
if user is not None:
|
||||
q['user_id'] = user
|
||||
if project is not None:
|
||||
q['project_id'] = project
|
||||
if name is not None:
|
||||
q['name'] = name
|
||||
if enabled is not None:
|
||||
q['enabled'] = enabled
|
||||
if alarm_id is not None:
|
||||
q['alarm_id'] = alarm_id
|
||||
|
||||
return self._retrieve_alarms(q, [], None)
|
||||
|
||||
def query_samples(self, filter_expr=None, orderby=None, limit=None):
|
||||
return self._retrieve_data(filter_expr, orderby, limit, models.Meter)
|
||||
|
||||
def query_alarms(self, filter_expr=None, orderby=None, limit=None):
|
||||
"""Return an iterable of model.Alarm objects.
|
||||
"""
|
||||
return self._retrieve_data(filter_expr, orderby, limit, models.Alarm)
|
||||
|
||||
def _retrieve_data(self, filter_expr, orderby, limit, model):
|
||||
if limit == 0:
|
||||
return []
|
||||
query_filter = {}
|
||||
orderby_filter = [("timestamp", pymongo.DESCENDING)]
|
||||
if orderby is not None:
|
||||
orderby_filter = self._transform_orderby(orderby)
|
||||
if filter_expr is not None:
|
||||
query_filter = self._transform_filter(filter_expr)
|
||||
|
||||
retrieve = {models.Meter: self._retrieve_samples,
|
||||
models.Alarm: self._retrieve_alarms,
|
||||
models.AlarmChange: self._retrieve_alarm_changes}
|
||||
return retrieve[model](query_filter, orderby_filter, limit)
|
||||
|
||||
def _retrieve_samples(self, query, orderby, limit):
|
||||
if limit is not None:
|
||||
samples = self.db.meter.find(query,
|
||||
limit=limit,
|
||||
sort=orderby)
|
||||
else:
|
||||
samples = self.db.meter.find(query,
|
||||
sort=orderby)
|
||||
|
||||
for s in samples:
|
||||
# Remove the ObjectId generated by the database when
|
||||
# the sample was inserted. It is an implementation
|
||||
# detail that should not leak outside of the driver.
|
||||
del s['_id']
|
||||
# Backward compatibility for samples without units
|
||||
s['counter_unit'] = s.get('counter_unit', '')
|
||||
# Tolerate absence of recorded_at in older datapoints
|
||||
s['recorded_at'] = s.get('recorded_at')
|
||||
yield models.Sample(**s)
|
||||
|
||||
def _retrieve_alarms(self, query_filter, orderby, limit):
|
||||
if limit is not None:
|
||||
alarms = self.db.alarm.find(query_filter,
|
||||
limit=limit,
|
||||
sort=orderby)
|
||||
else:
|
||||
alarms = self.db.alarm.find(query_filter, sort=orderby)
|
||||
|
||||
for alarm in alarms:
|
||||
a = {}
|
||||
a.update(alarm)
|
||||
del a['_id']
|
||||
self._ensure_encapsulated_rule_format(a)
|
||||
yield models.Alarm(**a)
|
||||
|
||||
def _retrieve_alarm_changes(self, query_filter, orderby, limit):
|
||||
if limit is not None:
|
||||
alarms_history = self.db.alarm_history.find(query_filter,
|
||||
limit=limit,
|
||||
sort=orderby)
|
||||
else:
|
||||
alarms_history = self.db.alarm_history.find(
|
||||
query_filter, sort=orderby)
|
||||
|
||||
for alarm_history in alarms_history:
|
||||
ah = {}
|
||||
ah.update(alarm_history)
|
||||
del ah['_id']
|
||||
yield models.AlarmChange(**ah)
|
||||
|
||||
def _transform_orderby(self, orderby):
|
||||
orderby_filter = []
|
||||
|
||||
for field in orderby:
|
||||
field_name = field.keys()[0]
|
||||
ordering = self.ordering_functions[field.values()[0]]
|
||||
orderby_filter.append((field_name, ordering))
|
||||
return orderby_filter
|
||||
|
||||
def _transform_filter(self, condition):
|
||||
|
||||
def process_json_tree(condition_tree):
|
||||
operator_node = condition_tree.keys()[0]
|
||||
nodes = condition_tree.values()[0]
|
||||
|
||||
if operator_node in self.complex_operators:
|
||||
element_list = []
|
||||
for node in nodes:
|
||||
element = process_json_tree(node)
|
||||
element_list.append(element)
|
||||
complex_operator = self.complex_operators[operator_node]
|
||||
op = {complex_operator: element_list}
|
||||
return op
|
||||
else:
|
||||
field_name = nodes.keys()[0]
|
||||
field_value = nodes.values()[0]
|
||||
# no operator for equal in Mongo
|
||||
if operator_node == "=":
|
||||
op = {field_name: field_value}
|
||||
return op
|
||||
if operator_node in self.operators:
|
||||
operator = self.operators[operator_node]
|
||||
op = {
|
||||
field_name: {
|
||||
operator: field_value}}
|
||||
return op
|
||||
|
||||
return process_json_tree(condition)
|
||||
|
||||
@classmethod
|
||||
def _ensure_encapsulated_rule_format(cls, alarm):
|
||||
"""This ensure the alarm returned by the storage have the correct
|
||||
|
Loading…
x
Reference in New Issue
Block a user