8a589eb927
There's a lot of driver that are not implementing everything, and as time goes by, this is getting and going to be more and more common. Not all drivers are suited to do everything, so implementing every methods as abstract makes a lot of NotImplementedError raising redundant. This patch makes stop using that and remove a lot of useless copy pasted code. Change-Id: I9e48c4f0484033061a3c3461d761e4cea4b8e75e
961 lines
35 KiB
Python
961 lines
35 KiB
Python
# -*- encoding: utf-8 -*-
|
|
#
|
|
# Copyright © 2012 New Dream Network, LLC (DreamHost)
|
|
# Copyright © 2013 eNovance
|
|
#
|
|
# Author: Doug Hellmann <doug.hellmann@dreamhost.com>
|
|
# Julien Danjou <julien@danjou.info>
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
"""MongoDB storage backend
|
|
"""
|
|
|
|
import calendar
|
|
import copy
|
|
import json
|
|
import operator
|
|
import weakref
|
|
|
|
import bson.code
|
|
import bson.objectid
|
|
import pymongo
|
|
|
|
from oslo.config import cfg
|
|
|
|
from ceilometer.openstack.common.gettextutils import _ # noqa
|
|
from ceilometer.openstack.common import log
|
|
from ceilometer import storage
|
|
from ceilometer.storage import base
|
|
from ceilometer.storage import models
|
|
|
|
cfg.CONF.import_opt('time_to_live', 'ceilometer.storage',
|
|
group="database")
|
|
|
|
LOG = log.getLogger(__name__)
|
|
|
|
|
|
class MongoDBStorage(base.StorageEngine):
|
|
"""Put the data into a MongoDB database
|
|
|
|
Collections::
|
|
|
|
- user
|
|
- { _id: user id
|
|
source: [ array of source ids reporting for the user ]
|
|
}
|
|
- project
|
|
- { _id: project id
|
|
source: [ array of source ids reporting for the project ]
|
|
}
|
|
- meter
|
|
- the raw incoming data
|
|
- resource
|
|
- the metadata for resources
|
|
- { _id: uuid of resource,
|
|
metadata: metadata dictionaries
|
|
user_id: uuid
|
|
project_id: uuid
|
|
meter: [ array of {counter_name: string, counter_type: string,
|
|
counter_unit: string} ]
|
|
}
|
|
"""
|
|
|
|
def get_connection(self, conf):
|
|
"""Return a Connection instance based on the configuration settings.
|
|
"""
|
|
return Connection(conf)
|
|
|
|
|
|
def make_timestamp_range(start, end,
|
|
start_timestamp_op=None, end_timestamp_op=None):
|
|
"""Given two possible datetimes and their operations, create the query
|
|
document to find timestamps within that range.
|
|
By default, using $gte for the lower bound and $lt for the
|
|
upper bound.
|
|
"""
|
|
ts_range = {}
|
|
|
|
if start:
|
|
if start_timestamp_op == 'gt':
|
|
start_timestamp_op = '$gt'
|
|
else:
|
|
start_timestamp_op = '$gte'
|
|
ts_range[start_timestamp_op] = start
|
|
|
|
if end:
|
|
if end_timestamp_op == 'le':
|
|
end_timestamp_op = '$lte'
|
|
else:
|
|
end_timestamp_op = '$lt'
|
|
ts_range[end_timestamp_op] = end
|
|
return ts_range
|
|
|
|
|
|
def make_query_from_filter(sample_filter, require_meter=True):
|
|
"""Return a query dictionary based on the settings in the filter.
|
|
|
|
:param filter: SampleFilter instance
|
|
:param require_meter: If true and the filter does not have a meter,
|
|
raise an error.
|
|
"""
|
|
q = {}
|
|
|
|
if sample_filter.user:
|
|
q['user_id'] = sample_filter.user
|
|
if sample_filter.project:
|
|
q['project_id'] = sample_filter.project
|
|
|
|
if sample_filter.meter:
|
|
q['counter_name'] = sample_filter.meter
|
|
elif require_meter:
|
|
raise RuntimeError('Missing required meter specifier')
|
|
|
|
ts_range = make_timestamp_range(sample_filter.start, sample_filter.end,
|
|
sample_filter.start_timestamp_op,
|
|
sample_filter.end_timestamp_op)
|
|
if ts_range:
|
|
q['timestamp'] = ts_range
|
|
|
|
if sample_filter.resource:
|
|
q['resource_id'] = sample_filter.resource
|
|
if sample_filter.source:
|
|
q['source'] = sample_filter.source
|
|
|
|
# so the samples call metadata resource_metadata, so we convert
|
|
# to that.
|
|
q.update(dict(('resource_%s' % k, v)
|
|
for (k, v) in sample_filter.metaquery.iteritems()))
|
|
return q
|
|
|
|
|
|
class ConnectionPool(object):
|
|
|
|
def __init__(self):
|
|
self._pool = {}
|
|
|
|
def connect(self, url):
|
|
connection_options = pymongo.uri_parser.parse_uri(url)
|
|
del connection_options['database']
|
|
del connection_options['username']
|
|
del connection_options['password']
|
|
del connection_options['collection']
|
|
pool_key = tuple(connection_options)
|
|
|
|
if pool_key in self._pool:
|
|
client = self._pool.get(pool_key)()
|
|
if client:
|
|
return client
|
|
LOG.info(_('Connecting to MongoDB on %s'),
|
|
connection_options['nodelist'])
|
|
client = pymongo.MongoClient(
|
|
url,
|
|
safe=True)
|
|
self._pool[pool_key] = weakref.ref(client)
|
|
return client
|
|
|
|
|
|
class Connection(base.Connection):
|
|
"""MongoDB connection.
|
|
"""
|
|
|
|
CONNECTION_POOL = ConnectionPool()
|
|
|
|
REDUCE_GROUP_CLEAN = bson.code.Code("""
|
|
function ( curr, result ) {
|
|
if (result.resources.indexOf(curr.resource_id) < 0)
|
|
result.resources.push(curr.resource_id);
|
|
if (result.users.indexOf(curr.user_id) < 0)
|
|
result.users.push(curr.user_id);
|
|
if (result.projects.indexOf(curr.project_id) < 0)
|
|
result.projects.push(curr.project_id);
|
|
}
|
|
""")
|
|
|
|
EMIT_STATS_COMMON = """
|
|
emit(%(key_val)s, { unit: this.counter_unit,
|
|
min : this.counter_volume,
|
|
max : this.counter_volume,
|
|
sum : this.counter_volume,
|
|
count : NumberInt(1),
|
|
groupby : %(groupby_val)s,
|
|
duration_start : this.timestamp,
|
|
duration_end : this.timestamp,
|
|
period_start : %(period_start_val)s,
|
|
period_end : %(period_end_val)s} )
|
|
"""
|
|
|
|
MAP_STATS_PERIOD_VAR = """
|
|
var period = %(period)d * 1000;
|
|
var period_first = %(period_first)d * 1000;
|
|
var period_start = period_first
|
|
+ (Math.floor(new Date(this.timestamp.getTime()
|
|
- period_first) / period)
|
|
* period);
|
|
"""
|
|
|
|
MAP_STATS_GROUPBY_VAR = """
|
|
var groupby_fields = %(groupby_fields)s;
|
|
var groupby = {};
|
|
var groupby_key = {};
|
|
|
|
for ( var i=0; i<groupby_fields.length; i++ ) {
|
|
groupby[groupby_fields[i]] = this[groupby_fields[i]]
|
|
groupby_key[groupby_fields[i]] = this[groupby_fields[i]]
|
|
}
|
|
"""
|
|
|
|
PARAMS_MAP_STATS = {'key_val': '\'statistics\'',
|
|
'groupby_val': 'null',
|
|
'period_start_val': 'this.timestamp',
|
|
'period_end_val': 'this.timestamp'}
|
|
|
|
MAP_STATS = bson.code.Code("function () {" +
|
|
EMIT_STATS_COMMON % PARAMS_MAP_STATS +
|
|
"}")
|
|
|
|
PARAMS_MAP_STATS_PERIOD = {
|
|
'key_val': 'period_start',
|
|
'groupby_val': 'null',
|
|
'period_start_val': 'new Date(period_start)',
|
|
'period_end_val': 'new Date(period_start + period)'
|
|
}
|
|
|
|
MAP_STATS_PERIOD = bson.code.Code(
|
|
"function () {" +
|
|
MAP_STATS_PERIOD_VAR +
|
|
EMIT_STATS_COMMON % PARAMS_MAP_STATS_PERIOD +
|
|
"}")
|
|
|
|
PARAMS_MAP_STATS_GROUPBY = {'key_val': 'groupby_key',
|
|
'groupby_val': 'groupby',
|
|
'period_start_val': 'this.timestamp',
|
|
'period_end_val': 'this.timestamp'}
|
|
|
|
MAP_STATS_GROUPBY = bson.code.Code(
|
|
"function () {" +
|
|
MAP_STATS_GROUPBY_VAR +
|
|
EMIT_STATS_COMMON % PARAMS_MAP_STATS_GROUPBY +
|
|
"}")
|
|
|
|
PARAMS_MAP_STATS_PERIOD_GROUPBY = {
|
|
'key_val': 'groupby_key',
|
|
'groupby_val': 'groupby',
|
|
'period_start_val': 'new Date(period_start)',
|
|
'period_end_val': 'new Date(period_start + period)'
|
|
}
|
|
|
|
MAP_STATS_PERIOD_GROUPBY = bson.code.Code(
|
|
"function () {" +
|
|
MAP_STATS_PERIOD_VAR +
|
|
MAP_STATS_GROUPBY_VAR +
|
|
" groupby_key['period_start'] = period_start\n" +
|
|
EMIT_STATS_COMMON % PARAMS_MAP_STATS_PERIOD_GROUPBY +
|
|
"}")
|
|
|
|
REDUCE_STATS = bson.code.Code("""
|
|
function (key, values) {
|
|
var res = { unit: values[0].unit,
|
|
min: values[0].min,
|
|
max: values[0].max,
|
|
count: values[0].count,
|
|
sum: values[0].sum,
|
|
groupby: values[0].groupby,
|
|
period_start: values[0].period_start,
|
|
period_end: values[0].period_end,
|
|
duration_start: values[0].duration_start,
|
|
duration_end: values[0].duration_end };
|
|
for ( var i=1; i<values.length; i++ ) {
|
|
if ( values[i].min < res.min )
|
|
res.min = values[i].min;
|
|
if ( values[i].max > res.max )
|
|
res.max = values[i].max;
|
|
res.count = NumberInt(res.count + values[i].count);
|
|
res.sum += values[i].sum;
|
|
if ( values[i].duration_start < res.duration_start )
|
|
res.duration_start = values[i].duration_start;
|
|
if ( values[i].duration_end > res.duration_end )
|
|
res.duration_end = values[i].duration_end;
|
|
}
|
|
return res;
|
|
}
|
|
""")
|
|
|
|
FINALIZE_STATS = bson.code.Code("""
|
|
function (key, value) {
|
|
value.avg = value.sum / value.count;
|
|
value.duration = (value.duration_end - value.duration_start) / 1000;
|
|
value.period = NumberInt((value.period_end - value.period_start)
|
|
/ 1000);
|
|
return value;
|
|
}""")
|
|
|
|
SORT_OPERATION_MAPPING = {'desc': (pymongo.DESCENDING, '$lt'),
|
|
'asc': (pymongo.ASCENDING, '$gt')}
|
|
|
|
def __init__(self, conf):
|
|
url = conf.database.connection
|
|
|
|
# NOTE(jd) Use our own connection pooling on top of the Pymongo one.
|
|
# We need that otherwise we overflow the MongoDB instance with new
|
|
# connection since we instanciate a Pymongo client each time someone
|
|
# requires a new storage connection.
|
|
self.conn = self.CONNECTION_POOL.connect(url)
|
|
|
|
# Require MongoDB 2.2 to use aggregate() and TTL
|
|
if self.conn.server_info()['versionArray'] < [2, 2]:
|
|
raise storage.StorageBadVersion("Need at least MongoDB 2.2")
|
|
|
|
connection_options = pymongo.uri_parser.parse_uri(url)
|
|
self.db = getattr(self.conn, connection_options['database'])
|
|
if connection_options.get('username'):
|
|
self.db.authenticate(connection_options['username'],
|
|
connection_options['password'])
|
|
|
|
# NOTE(jd) Upgrading is just about creating index, so let's do this
|
|
# on connection to be sure at least the TTL is correcly updated if
|
|
# needed.
|
|
self.upgrade()
|
|
|
|
def upgrade(self):
|
|
# Establish indexes
|
|
#
|
|
# We need variations for user_id vs. project_id because of the
|
|
# way the indexes are stored in b-trees. The user_id and
|
|
# project_id values are usually mutually exclusive in the
|
|
# queries, so the database won't take advantage of an index
|
|
# including both.
|
|
for primary in ['user_id', 'project_id']:
|
|
self.db.resource.ensure_index([
|
|
(primary, pymongo.ASCENDING),
|
|
('source', pymongo.ASCENDING),
|
|
], name='resource_idx')
|
|
self.db.meter.ensure_index([
|
|
('resource_id', pymongo.ASCENDING),
|
|
(primary, pymongo.ASCENDING),
|
|
('counter_name', pymongo.ASCENDING),
|
|
('timestamp', pymongo.ASCENDING),
|
|
('source', pymongo.ASCENDING),
|
|
], name='meter_idx')
|
|
self.db.meter.ensure_index([('timestamp', pymongo.DESCENDING)],
|
|
name='timestamp_idx')
|
|
|
|
indexes = self.db.meter.index_information()
|
|
|
|
ttl = cfg.CONF.database.time_to_live
|
|
|
|
if ttl <= 0:
|
|
if 'meter_ttl' in indexes:
|
|
self.db.meter.drop_index('meter_ttl')
|
|
return
|
|
|
|
if 'meter_ttl' in indexes:
|
|
# NOTE(sileht): manually check expireAfterSeconds because
|
|
# ensure_index doesn't update index options if the index already
|
|
# exists
|
|
if ttl == indexes['meter_ttl'].get('expireAfterSeconds', -1):
|
|
return
|
|
|
|
self.db.meter.drop_index('meter_ttl')
|
|
|
|
self.db.meter.create_index(
|
|
[('timestamp', pymongo.ASCENDING)],
|
|
expireAfterSeconds=ttl,
|
|
name='meter_ttl'
|
|
)
|
|
|
|
def clear(self):
|
|
self.conn.drop_database(self.db)
|
|
# Connection will be reopened automatically if needed
|
|
self.conn.close()
|
|
|
|
def record_metering_data(self, data):
|
|
"""Write the data to the backend storage system.
|
|
|
|
:param data: a dictionary such as returned by
|
|
ceilometer.meter.meter_message_from_counter
|
|
"""
|
|
# Make sure we know about the user and project
|
|
self.db.user.update(
|
|
{'_id': data['user_id']},
|
|
{'$addToSet': {'source': data['source'],
|
|
},
|
|
},
|
|
upsert=True,
|
|
)
|
|
self.db.project.update(
|
|
{'_id': data['project_id']},
|
|
{'$addToSet': {'source': data['source'],
|
|
},
|
|
},
|
|
upsert=True,
|
|
)
|
|
|
|
# Record the updated resource metadata
|
|
self.db.resource.update(
|
|
{'_id': data['resource_id']},
|
|
{'$set': {'project_id': data['project_id'],
|
|
'user_id': data['user_id'],
|
|
'metadata': data['resource_metadata'],
|
|
'source': data['source'],
|
|
},
|
|
'$addToSet': {'meter': {'counter_name': data['counter_name'],
|
|
'counter_type': data['counter_type'],
|
|
'counter_unit': data['counter_unit'],
|
|
},
|
|
},
|
|
},
|
|
upsert=True,
|
|
)
|
|
|
|
# Record the raw data for the meter. Use a copy so we do not
|
|
# modify a data structure owned by our caller (the driver adds
|
|
# a new key '_id').
|
|
record = copy.copy(data)
|
|
self.db.meter.insert(record)
|
|
|
|
def clear_expired_metering_data(self, ttl):
|
|
"""Clear expired data from the backend storage system according to the
|
|
time-to-live.
|
|
|
|
:param ttl: Number of seconds to keep records for.
|
|
|
|
"""
|
|
results = self.db.meter.group(
|
|
key={},
|
|
condition={},
|
|
reduce=self.REDUCE_GROUP_CLEAN,
|
|
initial={
|
|
'resources': [],
|
|
'users': [],
|
|
'projects': [],
|
|
}
|
|
)[0]
|
|
|
|
self.db.user.remove({'_id': {'$nin': results['users']}})
|
|
self.db.project.remove({'_id': {'$nin': results['projects']}})
|
|
self.db.resource.remove({'_id': {'$nin': results['resources']}})
|
|
|
|
@staticmethod
|
|
def _get_marker(db_collection, marker_pairs):
|
|
"""Return the mark document according to the attribute-value pairs.
|
|
|
|
:param db_collection: Database collection that be query.
|
|
:param maker_pairs: Attribute-value pairs filter.
|
|
"""
|
|
if db_collection is None:
|
|
return
|
|
if not marker_pairs:
|
|
return
|
|
ret = db_collection.find(marker_pairs, limit=2)
|
|
|
|
if ret.count() == 0:
|
|
raise base.NoResultFound
|
|
elif ret.count() > 1:
|
|
raise base.MultipleResultsFound
|
|
else:
|
|
_ret = ret.__getitem__(0)
|
|
return _ret
|
|
|
|
@classmethod
|
|
def _recurse_sort_keys(cls, sort_keys, marker, flag):
|
|
_first = sort_keys[0]
|
|
value = marker[_first]
|
|
if len(sort_keys) == 1:
|
|
return {_first: {flag: value}}
|
|
else:
|
|
criteria_equ = {_first: {'eq': value}}
|
|
criteria_cmp = cls._recurse_sort_keys(sort_keys[1:], marker, flag)
|
|
return dict(criteria_equ, ** criteria_cmp)
|
|
|
|
@classmethod
|
|
def _build_paginate_query(cls, marker, sort_keys=[], sort_dir='desc'):
|
|
"""Returns a query with sorting / pagination.
|
|
|
|
Pagination works by requiring sort_key and sort_dir.
|
|
We use the last item in previous page as the 'marker' for pagination.
|
|
So we return values that follow the passed marker in the order.
|
|
:param q: The query dict passed in.
|
|
:param marker: the last item of the previous page; we return the next
|
|
results after this item.
|
|
:param sort_keys: array of attributes by which results be sorted.
|
|
:param sort_dir: direction in which results be sorted (asc, desc).
|
|
:return: sort parameters, query to use
|
|
"""
|
|
all_sort = []
|
|
all_sort, _op = cls._build_sort_instructions(sort_keys, sort_dir)
|
|
|
|
if marker is not None:
|
|
sort_criteria_list = []
|
|
|
|
for i in range(0, len(sort_keys)):
|
|
sort_criteria_list.append(cls._recurse_sort_keys(
|
|
sort_keys[:(len(sort_keys) - i)],
|
|
marker, _op))
|
|
|
|
metaquery = {"$or": sort_criteria_list}
|
|
else:
|
|
metaquery = {}
|
|
|
|
return all_sort, metaquery
|
|
|
|
@classmethod
|
|
def _build_sort_instructions(cls, sort_keys=[], sort_dir='desc'):
|
|
"""Returns a sort_instruction and paging operator.
|
|
|
|
Sort instructions are used in the query to determine what attributes
|
|
to sort on and what direction to use.
|
|
:param q: The query dict passed in.
|
|
:param sort_keys: array of attributes by which results be sorted.
|
|
:param sort_dir: direction in which results be sorted (asc, desc).
|
|
:return: sort instructions and paging operator
|
|
"""
|
|
sort_instructions = []
|
|
_sort_dir, operation = cls.SORT_OPERATION_MAPPING.get(
|
|
sort_dir, cls.SORT_OPERATION_MAPPING['desc'])
|
|
|
|
for _sort_key in sort_keys:
|
|
_instruction = (_sort_key, _sort_dir)
|
|
sort_instructions.append(_instruction)
|
|
|
|
return sort_instructions, operation
|
|
|
|
@classmethod
|
|
def paginate_query(cls, q, db_collection, limit=None, marker=None,
|
|
sort_keys=[], sort_dir='desc'):
|
|
"""Returns a query result with sorting / pagination.
|
|
|
|
Pagination works by requiring sort_key and sort_dir.
|
|
We use the last item in previous page as the 'marker' for pagination.
|
|
So we return values that follow the passed marker in the order.
|
|
:param q: the query dict passed in.
|
|
:param db_collection: Database collection that be query.
|
|
:param limit: maximum number of items to return.
|
|
:param marker: the last item of the previous page; we return the next
|
|
results after this item.
|
|
:param sort_keys: array of attributes by which results be sorted.
|
|
:param sort_dir: direction in which results be sorted (asc, desc).
|
|
return: The query with sorting/pagination added.
|
|
"""
|
|
|
|
all_sort, query = cls._build_paginate_query(marker,
|
|
sort_keys,
|
|
sort_dir)
|
|
q.update(query)
|
|
|
|
#NOTE(Fengqian):MongoDB collection.find can not handle limit
|
|
#when it equals None, it will raise TypeError, so we treate
|
|
#None as 0 for the value of limit.
|
|
if limit is None:
|
|
limit = 0
|
|
return db_collection.find(q, limit=limit, sort=all_sort)
|
|
|
|
def get_users(self, source=None):
|
|
"""Return an iterable of user id strings.
|
|
|
|
:param source: Optional source filter.
|
|
"""
|
|
q = {}
|
|
if source is not None:
|
|
q['source'] = source
|
|
|
|
return (doc['_id'] for doc in
|
|
self.db.user.find(q, fields=['_id'],
|
|
sort=[('_id', pymongo.ASCENDING)]))
|
|
|
|
def get_projects(self, source=None):
|
|
"""Return an iterable of project id strings.
|
|
|
|
:param source: Optional source filter.
|
|
"""
|
|
q = {}
|
|
if source is not None:
|
|
q['source'] = source
|
|
|
|
return (doc['_id'] for doc in
|
|
self.db.project.find(q, fields=['_id'],
|
|
sort=[('_id', pymongo.ASCENDING)]))
|
|
|
|
def get_resources(self, user=None, project=None, source=None,
|
|
start_timestamp=None, start_timestamp_op=None,
|
|
end_timestamp=None, end_timestamp_op=None,
|
|
metaquery={}, resource=None, pagination=None):
|
|
"""Return an iterable of models.Resource instances
|
|
|
|
:param user: Optional ID for user that owns the resource.
|
|
:param project: Optional ID for project that owns the resource.
|
|
:param source: Optional source filter.
|
|
:param start_timestamp: Optional modified timestamp start range.
|
|
:param start_timestamp_op: Optional start time operator, like gt, ge.
|
|
:param end_timestamp: Optional modified timestamp end range.
|
|
:param end_timestamp_op: Optional end time operator, like lt, le.
|
|
:param metaquery: Optional dict with metadata to match on.
|
|
:param resource: Optional resource filter.
|
|
:param pagination: Optional pagination query.
|
|
"""
|
|
if pagination:
|
|
raise NotImplementedError(_('Pagination not implemented'))
|
|
|
|
q = {}
|
|
if user is not None:
|
|
q['user_id'] = user
|
|
if project is not None:
|
|
q['project_id'] = project
|
|
if source is not None:
|
|
q['source'] = source
|
|
if resource is not None:
|
|
q['resource_id'] = resource
|
|
# Add resource_ prefix so it matches the field in the db
|
|
q.update(dict(('resource_' + k, v)
|
|
for (k, v) in metaquery.iteritems()))
|
|
|
|
# FIXME(dhellmann): This may not perform very well,
|
|
# but doing any better will require changing the database
|
|
# schema and that will need more thought than I have time
|
|
# to put into it today.
|
|
if start_timestamp or end_timestamp:
|
|
# Look for resources matching the above criteria and with
|
|
# samples in the time range we care about, then change the
|
|
# resource query to return just those resources by id.
|
|
ts_range = make_timestamp_range(start_timestamp, end_timestamp,
|
|
start_timestamp_op,
|
|
end_timestamp_op)
|
|
if ts_range:
|
|
q['timestamp'] = ts_range
|
|
|
|
sort_keys = base._handle_sort_key('resource')
|
|
sort_instructions = self._build_sort_instructions(sort_keys)[0]
|
|
|
|
aggregate = self.db.meter.aggregate([
|
|
{"$match": q},
|
|
{"$sort": dict(sort_instructions)},
|
|
{"$group": {
|
|
"_id": "$resource_id",
|
|
"user_id": {"$first": "$user_id"},
|
|
"project_id": {"$first": "$project_id"},
|
|
"source": {"$first": "$source"},
|
|
"first_sample_timestamp": {"$min": "$timestamp"},
|
|
"last_sample_timestamp": {"$max": "$timestamp"},
|
|
"metadata": {"$first": "$resource_metadata"},
|
|
"meters_name": {"$push": "$counter_name"},
|
|
"meters_type": {"$push": "$counter_type"},
|
|
"meters_unit": {"$push": "$counter_unit"},
|
|
}},
|
|
])
|
|
|
|
for result in aggregate['result']:
|
|
yield models.Resource(
|
|
resource_id=result['_id'],
|
|
user_id=result['user_id'],
|
|
project_id=result['project_id'],
|
|
first_sample_timestamp=result['first_sample_timestamp'],
|
|
last_sample_timestamp=result['last_sample_timestamp'],
|
|
source=result['source'],
|
|
metadata=result['metadata'],
|
|
meter=[
|
|
models.ResourceMeter(
|
|
counter_name=m_n,
|
|
counter_type=m_t,
|
|
counter_unit=m_u,
|
|
)
|
|
for m_n, m_u, m_t in zip(result['meters_name'],
|
|
result['meters_unit'],
|
|
result['meters_type'])
|
|
],
|
|
)
|
|
|
|
def get_meters(self, user=None, project=None, resource=None, source=None,
|
|
metaquery={}, pagination=None):
|
|
"""Return an iterable of models.Meter instances
|
|
|
|
:param user: Optional ID for user that owns the resource.
|
|
:param project: Optional ID for project that owns the resource.
|
|
:param resource: Optional resource filter.
|
|
:param source: Optional source filter.
|
|
:param metaquery: Optional dict with metadata to match on.
|
|
:param pagination: Optional pagination query.
|
|
"""
|
|
if pagination:
|
|
raise NotImplementedError(_('Pagination not implemented'))
|
|
|
|
q = {}
|
|
if user is not None:
|
|
q['user_id'] = user
|
|
if project is not None:
|
|
q['project_id'] = project
|
|
if resource is not None:
|
|
q['_id'] = resource
|
|
if source is not None:
|
|
q['source'] = source
|
|
q.update(metaquery)
|
|
|
|
for r in self.db.resource.find(q):
|
|
for r_meter in r['meter']:
|
|
yield models.Meter(
|
|
name=r_meter['counter_name'],
|
|
type=r_meter['counter_type'],
|
|
# Return empty string if 'counter_unit' is not valid for
|
|
# backward compatibility.
|
|
unit=r_meter.get('counter_unit', ''),
|
|
resource_id=r['_id'],
|
|
project_id=r['project_id'],
|
|
source=r['source'],
|
|
user_id=r['user_id'],
|
|
)
|
|
|
|
def get_samples(self, sample_filter, limit=None):
|
|
"""Return an iterable of model.Sample instances.
|
|
|
|
:param sample_filter: Filter.
|
|
:param limit: Maximum number of results to return.
|
|
"""
|
|
if limit == 0:
|
|
return
|
|
q = make_query_from_filter(sample_filter, require_meter=False)
|
|
if limit:
|
|
samples = self.db.meter.find(
|
|
q, limit=limit, sort=[("timestamp", pymongo.DESCENDING)])
|
|
else:
|
|
samples = self.db.meter.find(
|
|
q, sort=[("timestamp", pymongo.DESCENDING)])
|
|
|
|
for s in samples:
|
|
# Remove the ObjectId generated by the database when
|
|
# the sample was inserted. It is an implementation
|
|
# detail that should not leak outside of the driver.
|
|
del s['_id']
|
|
# Backward compatibility for samples without units
|
|
s['counter_unit'] = s.get('counter_unit', '')
|
|
yield models.Sample(**s)
|
|
|
|
def get_meter_statistics(self, sample_filter, period=None, groupby=None):
|
|
"""Return an iterable of models.Statistics instance containing meter
|
|
statistics described by the query parameters.
|
|
|
|
The filter must have a meter value set.
|
|
|
|
"""
|
|
if (groupby and
|
|
set(groupby) - set(['user_id', 'project_id',
|
|
'resource_id', 'source'])):
|
|
raise NotImplementedError("Unable to group by these fields")
|
|
|
|
q = make_query_from_filter(sample_filter)
|
|
|
|
if period:
|
|
if sample_filter.start:
|
|
period_start = sample_filter.start
|
|
else:
|
|
period_start = self.db.meter.find(
|
|
limit=1, sort=[('timestamp',
|
|
pymongo.ASCENDING)])[0]['timestamp']
|
|
period_start = int(calendar.timegm(period_start.utctimetuple()))
|
|
params_period = {'period': period,
|
|
'period_first': period_start,
|
|
'groupby_fields': json.dumps(groupby)}
|
|
if groupby:
|
|
map_stats = self.MAP_STATS_PERIOD_GROUPBY % params_period
|
|
else:
|
|
map_stats = self.MAP_STATS_PERIOD % params_period
|
|
else:
|
|
if groupby:
|
|
params_groupby = {'groupby_fields': json.dumps(groupby)}
|
|
map_stats = self.MAP_STATS_GROUPBY % params_groupby
|
|
else:
|
|
map_stats = self.MAP_STATS
|
|
|
|
results = self.db.meter.map_reduce(
|
|
map_stats,
|
|
self.REDUCE_STATS,
|
|
{'inline': 1},
|
|
finalize=self.FINALIZE_STATS,
|
|
query=q,
|
|
)
|
|
|
|
# FIXME(terriyu) Fix get_meter_statistics() so we don't use sorted()
|
|
# to return the results
|
|
return sorted(
|
|
(models.Statistics(**(r['value'])) for r in results['results']),
|
|
key=operator.attrgetter('period_start'))
|
|
|
|
@staticmethod
|
|
def _decode_matching_metadata(matching_metadata):
|
|
if isinstance(matching_metadata, dict):
|
|
#note(sileht): keep compatibility with alarm
|
|
#with matching_metadata as a dict
|
|
return matching_metadata
|
|
else:
|
|
new_matching_metadata = {}
|
|
for elem in matching_metadata:
|
|
new_matching_metadata[elem['key']] = elem['value']
|
|
return new_matching_metadata
|
|
|
|
@classmethod
|
|
def _ensure_encapsulated_rule_format(cls, alarm):
|
|
"""This ensure the alarm returned by the storage have the correct
|
|
format. The previous format looks like:
|
|
{
|
|
'alarm_id': '0ld-4l3rt',
|
|
'enabled': True,
|
|
'name': 'old-alert',
|
|
'description': 'old-alert',
|
|
'timestamp': None,
|
|
'meter_name': 'cpu',
|
|
'user_id': 'me',
|
|
'project_id': 'and-da-boys',
|
|
'comparison_operator': 'lt',
|
|
'threshold': 36,
|
|
'statistic': 'count',
|
|
'evaluation_periods': 1,
|
|
'period': 60,
|
|
'state': "insufficient data",
|
|
'state_timestamp': None,
|
|
'ok_actions': [],
|
|
'alarm_actions': ['http://nowhere/alarms'],
|
|
'insufficient_data_actions': [],
|
|
'repeat_actions': False,
|
|
'matching_metadata': {'key': 'value'}
|
|
# or 'matching_metadata': [{'key': 'key', 'value': 'value'}]
|
|
}
|
|
"""
|
|
|
|
if isinstance(alarm.get('rule'), dict):
|
|
return
|
|
|
|
alarm['type'] = 'threshold'
|
|
alarm['rule'] = {}
|
|
alarm['matching_metadata'] = cls._decode_matching_metadata(
|
|
alarm['matching_metadata'])
|
|
for field in ['period', 'evaluation_periods', 'threshold',
|
|
'statistic', 'comparison_operator', 'meter_name']:
|
|
if field in alarm:
|
|
alarm['rule'][field] = alarm[field]
|
|
del alarm[field]
|
|
|
|
query = []
|
|
for key in alarm['matching_metadata']:
|
|
query.append({'field': key,
|
|
'op': 'eq',
|
|
'value': alarm['matching_metadata'][key],
|
|
'type': 'string'})
|
|
del alarm['matching_metadata']
|
|
alarm['rule']['query'] = query
|
|
|
|
def get_alarms(self, name=None, user=None,
|
|
project=None, enabled=None, alarm_id=None, pagination=None):
|
|
"""Yields a lists of alarms that match filters
|
|
:param name: The Alarm name.
|
|
:param user: Optional ID for user that owns the resource.
|
|
:param project: Optional ID for project that owns the resource.
|
|
:param enabled: Optional boolean to list disable alarm.
|
|
:param alarm_id: Optional alarm_id to return one alarm.
|
|
:param pagination: Optional pagination query.
|
|
"""
|
|
if pagination:
|
|
raise NotImplementedError(_('Pagination not implemented'))
|
|
|
|
q = {}
|
|
if user is not None:
|
|
q['user_id'] = user
|
|
if project is not None:
|
|
q['project_id'] = project
|
|
if name is not None:
|
|
q['name'] = name
|
|
if enabled is not None:
|
|
q['enabled'] = enabled
|
|
if alarm_id is not None:
|
|
q['alarm_id'] = alarm_id
|
|
|
|
for alarm in self.db.alarm.find(q):
|
|
a = {}
|
|
a.update(alarm)
|
|
del a['_id']
|
|
self._ensure_encapsulated_rule_format(a)
|
|
yield models.Alarm(**a)
|
|
|
|
def update_alarm(self, alarm):
|
|
"""update alarm
|
|
"""
|
|
data = alarm.as_dict()
|
|
|
|
self.db.alarm.update(
|
|
{'alarm_id': alarm.alarm_id},
|
|
{'$set': data},
|
|
upsert=True)
|
|
|
|
stored_alarm = self.db.alarm.find({'alarm_id': alarm.alarm_id})[0]
|
|
del stored_alarm['_id']
|
|
self._ensure_encapsulated_rule_format(stored_alarm)
|
|
return models.Alarm(**stored_alarm)
|
|
|
|
create_alarm = update_alarm
|
|
|
|
def delete_alarm(self, alarm_id):
|
|
"""Delete a alarm
|
|
"""
|
|
self.db.alarm.remove({'alarm_id': alarm_id})
|
|
|
|
def get_alarm_changes(self, alarm_id, on_behalf_of,
|
|
user=None, project=None, type=None,
|
|
start_timestamp=None, start_timestamp_op=None,
|
|
end_timestamp=None, end_timestamp_op=None):
|
|
"""Yields list of AlarmChanges describing alarm history
|
|
|
|
Changes are always sorted in reverse order of occurence, given
|
|
the importance of currency.
|
|
|
|
Segregation for non-administrative users is done on the basis
|
|
of the on_behalf_of parameter. This allows such users to have
|
|
visibility on both the changes initiated by themselves directly
|
|
(generally creation, rule changes, or deletion) and also on those
|
|
changes initiated on their behalf by the alarming service (state
|
|
transitions after alarm thresholds are crossed).
|
|
|
|
:param alarm_id: ID of alarm to return changes for
|
|
:param on_behalf_of: ID of tenant to scope changes query (None for
|
|
administrative user, indicating all projects)
|
|
:param user: Optional ID of user to return changes for
|
|
:param project: Optional ID of project to return changes for
|
|
:project type: Optional change type
|
|
:param start_timestamp: Optional modified timestamp start range
|
|
:param start_timestamp_op: Optional timestamp start range operation
|
|
:param end_timestamp: Optional modified timestamp end range
|
|
:param end_timestamp_op: Optional timestamp end range operation
|
|
"""
|
|
q = dict(alarm_id=alarm_id)
|
|
if on_behalf_of is not None:
|
|
q['on_behalf_of'] = on_behalf_of
|
|
if user is not None:
|
|
q['user_id'] = user
|
|
if project is not None:
|
|
q['project_id'] = project
|
|
if type is not None:
|
|
q['type'] = type
|
|
if start_timestamp or end_timestamp:
|
|
ts_range = make_timestamp_range(start_timestamp, end_timestamp,
|
|
start_timestamp_op,
|
|
end_timestamp_op)
|
|
if ts_range:
|
|
q['timestamp'] = ts_range
|
|
|
|
sort = [("timestamp", pymongo.DESCENDING)]
|
|
for alarm_change in self.db.alarm_history.find(q, sort=sort):
|
|
ac = {}
|
|
ac.update(alarm_change)
|
|
del ac['_id']
|
|
yield models.AlarmChange(**ac)
|
|
|
|
def record_alarm_change(self, alarm_change):
|
|
"""Record alarm change event.
|
|
"""
|
|
self.db.alarm_history.insert(alarm_change)
|