Convert storage drivers to return models

Update the storage drivers to return instances of models classes
instead of dictionaries to ensure that they always return the same
values.

The get_volume_sum(), get_volume_max(), and get_event_interval()
methods are not modified because there will be another changeset
later replacing the use of those methods with calls to
get_meter_statistics().

blueprint storage-api-models

Change-Id: I79da8cc69f70a4f4c24aa13e6a5d2982011eee88
Signed-off-by: Doug Hellmann <doug.hellmann@dreamhost.com>
This commit is contained in:
Doug Hellmann 2013-03-27 18:45:59 -04:00
parent 4cea39a7a4
commit 912235cfce
10 changed files with 547 additions and 403 deletions

View File

@ -49,7 +49,14 @@ LOG = log.getLogger(__name__)
operation_kind = wtypes.Enum(str, 'lt', 'le', 'eq', 'ne', 'ge', 'gt')
class Query(wtypes.Base):
class _Base(wtypes.Base):
@classmethod
def from_db_model(cls, m):
return cls(**(m.as_dict()))
class Query(_Base):
"""Query filter.
"""
@ -201,7 +208,7 @@ def _flatten_metadata(metadata):
return {}
class Sample(wtypes.Base):
class Sample(_Base):
"""A single measurement for a given meter and resource.
"""
@ -266,7 +273,7 @@ class Sample(wtypes.Base):
)
class Statistics(wtypes.Base):
class Statistics(_Base):
"""Computed statistics for a query.
"""
@ -374,7 +381,7 @@ class MeterController(rest.RestController):
kwargs = _query_to_kwargs(q, storage.EventFilter.__init__)
kwargs['meter'] = self._id
f = storage.EventFilter(**kwargs)
return [Sample(**e)
return [Sample.from_db_model(e)
for e in pecan.request.storage_conn.get_samples(f)
]
@ -391,6 +398,7 @@ class MeterController(rest.RestController):
kwargs['meter'] = self._id
f = storage.EventFilter(**kwargs)
computed = pecan.request.storage_conn.get_meter_statistics(f, period)
LOG.debug('computed value coming from %r', pecan.request.storage_conn)
# Find the original timestamp in the query to use for clamping
# the duration returned in the statistics.
start = end = None
@ -402,11 +410,11 @@ class MeterController(rest.RestController):
return [Statistics(start_timestamp=start,
end_timestamp=end,
**c)
**c.as_dict())
for c in computed]
class Meter(wtypes.Base):
class Meter(_Base):
"""One category of measurements.
"""
@ -454,11 +462,11 @@ class MetersController(rest.RestController):
:param q: Filter rules for the meters to be returned.
"""
kwargs = _query_to_kwargs(q, pecan.request.storage_conn.get_meters)
return [Meter(**m)
return [Meter.from_db_model(m)
for m in pecan.request.storage_conn.get_meters(**kwargs)]
class Resource(wtypes.Base):
class Resource(_Base):
"""An externally defined object for which samples have been received.
"""
@ -503,7 +511,7 @@ class ResourcesController(rest.RestController):
"""
r = list(pecan.request.storage_conn.get_resources(
resource=resource_id))[0]
return Resource(**r)
return Resource.from_db_model(r)
@wsme_pecan.wsexpose([Resource], [Query])
def get_all(self, q=[]):
@ -513,7 +521,7 @@ class ResourcesController(rest.RestController):
"""
kwargs = _query_to_kwargs(q, pecan.request.storage_conn.get_resources)
resources = [
Resource(**r)
Resource.from_db_model(r)
for r in pecan.request.storage_conn.get_resources(**kwargs)]
return resources

View File

@ -138,7 +138,7 @@ def list_meters_all():
meters = rq.storage_conn.get_meters(
project=acl.get_limited_to_project(rq.headers),
metaquery=_get_metaquery(rq.args))
return flask.jsonify(meters=list(meters))
return flask.jsonify(meters=[m.as_dict() for m in meters])
@blueprint.route('/resources/<resource>/meters')
@ -154,7 +154,7 @@ def list_meters_by_resource(resource):
resource=resource,
project=acl.get_limited_to_project(rq.headers),
metaquery=_get_metaquery(rq.args))
return flask.jsonify(meters=list(meters))
return flask.jsonify(meters=[m.as_dict() for m in meters])
@blueprint.route('/users/<user>/meters')
@ -170,7 +170,7 @@ def list_meters_by_user(user):
user=user,
project=acl.get_limited_to_project(rq.headers),
metaquery=_get_metaquery(rq.args))
return flask.jsonify(meters=list(meters))
return flask.jsonify(meters=[m.as_dict() for m in meters])
@blueprint.route('/projects/<project>/meters')
@ -187,7 +187,7 @@ def list_meters_by_project(project):
meters = rq.storage_conn.get_meters(
project=project,
metaquery=_get_metaquery(rq.args))
return flask.jsonify(meters=list(meters))
return flask.jsonify(meters=[m.as_dict() for m in meters])
@blueprint.route('/sources/<source>/meters')
@ -203,7 +203,7 @@ def list_meters_by_source(source):
source=source,
project=acl.get_limited_to_project(rq.headers),
metaquery=_get_metaquery(rq.args))
return flask.jsonify(meters=list(meters))
return flask.jsonify(meters=[m.as_dict() for m in meters])
## APIs for working with resources.
@ -221,7 +221,7 @@ def _list_resources(source=None, user=None, project=None):
end_timestamp=q_ts['end_timestamp'],
metaquery=_get_metaquery(rq.args),
)
return flask.jsonify(resources=list(resources))
return flask.jsonify(resources=[r.as_dict() for r in resources])
@blueprint.route('/projects/<project>/resources')
@ -405,8 +405,8 @@ def _list_samples(meter,
end=q_ts['end_timestamp'],
metaquery=_get_metaquery(flask.request.args),
)
events = list(flask.request.storage_conn.get_samples(f))
jsonified = flask.jsonify(events=events)
events = flask.request.storage_conn.get_samples(f)
jsonified = flask.jsonify(events=[e.as_dict() for e in events])
if request_wants_html():
return flask.templating.render_template('list_event.html',
user=user,
@ -608,7 +608,7 @@ def _get_statistics(stats_type, meter=None, resource=None, project=None):
results = list(flask.request.storage_conn.get_meter_statistics(f))
value = None
if results:
value = results[0][stats_type] # there should only be one!
value = getattr(results[0], stats_type) # there should only be one!
return flask.jsonify(volume=value)

View File

@ -100,15 +100,8 @@ class Connection(object):
def get_resources(self, user=None, project=None, source=None,
start_timestamp=None, end_timestamp=None,
metaquery={}, resource=None):
"""Return an iterable of dictionaries containing resource information.
{ 'resource_id': UUID of the resource,
'project_id': UUID of project owning the resource,
'user_id': UUID of user owning the resource,
'timestamp': UTC datetime of last update to the resource,
'metadata': most current metadata for the resource,
'meter': list of the meters reporting data for the resource,
}
"""Return an iterable of models.Resource instances containing
resource information.
:param user: Optional ID for user that owns the resource.
:param project: Optional ID for project that owns the resource.
@ -122,15 +115,8 @@ class Connection(object):
@abc.abstractmethod
def get_meters(self, user=None, project=None, resource=None, source=None,
metaquery={}):
"""Return an iterable of dictionaries containing meter information.
{ 'name': name of the meter,
'type': type of the meter (guage, counter),
'unit': unit of the meter,
'resource_id': UUID of the resource,
'project_id': UUID of project owning the resource,
'user_id': UUID of user owning the resource,
}
"""Return an iterable of model.Meter instances containing meter
information.
:param user: Optional ID for user that owns the resource.
:param project: Optional ID for project that owns the resource.
@ -141,8 +127,7 @@ class Connection(object):
@abc.abstractmethod
def get_samples(self, event_filter):
"""Return an iterable of samples as created by
:func:`ceilometer.meter.meter_message_from_counter`.
"""Return an iterable of model.Sample instances
"""
@abc.abstractmethod
@ -155,24 +140,9 @@ class Connection(object):
@abc.abstractmethod
def get_meter_statistics(self, event_filter, period=None):
"""Return a dictionary containing meter statistics.
described by the query parameters.
"""Return an iterable of model.Statistics instances
The filter must have a meter value set.
{ 'min':
'max':
'avg':
'sum':
'count':
'period':
'period_start':
'period_end':
'duration':
'duration_start':
'duration_end':
}
"""
@abc.abstractmethod

View File

@ -60,6 +60,7 @@ from oslo.config import cfg
from ceilometer.openstack.common import log, timeutils
from ceilometer.storage import base
from ceilometer.storage import models
LOG = log.getLogger(__name__)
@ -304,16 +305,7 @@ class Connection(base.Connection):
def get_resources(self, user=None, project=None, source=None,
start_timestamp=None, end_timestamp=None,
metaquery={}):
"""Return an iterable of dictionaries containing resource information.
:type end_timestamp: object
{ 'resource_id': UUID of the resource,
'project_id': UUID of project owning the resource,
'user_id': UUID of user owning the resource,
'timestamp': UTC datetime of last update to the resource,
'metadata': most current metadata for the resource,
'meter': list of the meters reporting data for the resource,
}
"""Return an iterable of models.Resource instances
:param user: Optional ID for user that owns the resource.
:param project: Optional ID for project that owns the resource.
@ -342,33 +334,21 @@ class Connection(base.Connection):
query_only=True, require_meter=False)
LOG.debug("q: %s" % q)
for resource_id, data in self.resource.rows(resource_ids):
r = {'resource_id': resource_id,
'metadata': json.loads(data['f:metadata']),
'project_id': data['f:project_id'],
'source': data['f:source'],
'user_id': data['f:user_id'],
'meter': []}
for m in data:
if m.startswith('f:m_'):
name, type, unit = m[4:].split("!")
r['meter'].append({"counter_name": name,
"counter_type": type,
"counter_unit": unit})
yield r
yield models.Resource(
resource_id=resource_id,
project_id=data['f:project_id'],
user_id=data['f:user_id'],
metadata=json.loads(data['f:metadata']),
meter=[
models.ResourceMeter(*(m[4:].split("!")))
for m in data
if m.startswith('f:m_')
],
)
def get_meters(self, user=None, project=None, resource=None, source=None,
metaquery={}):
"""Return an iterable of dictionaries containing meter information.
{ 'name': name of the meter,
'type': type of the meter (guage, counter),
'unit': unit of the meter,
'resource_id': UUID of the resource,
'project_id': UUID of project owning the resource,
'user_id': UUID of user owning the resource,
}
"""Return an iterable of models.Meter instances
:param user: Optional ID for user that owns the resource.
:param project: Optional ID for project that owns the resource.
@ -398,30 +378,27 @@ class Connection(base.Connection):
if meter is None:
continue
name, type, unit = meter[4:].split("!")
m = {'name': name,
'type': type,
'unit': unit,
'resource_id': data['f:resource_id'],
'project_id': data['f:project_id'],
'user_id': data['f:user_id'],
}
yield m
yield models.Meter(
name=name,
type=type,
unit=unit,
resource_id=data['f:resource_id'],
project_id=data['f:project_id'],
user_id=data['f:user_id'],
)
def get_samples(self, event_filter):
"""Return an iterable of samples as created by
:func:`ceilometer.meter.meter_message_from_counter`.
"""Return an iterable of models.Sample instances
"""
q, start, stop = make_query_from_filter(event_filter,
require_meter=False)
LOG.debug("q: %s" % q)
gen = self.meter.scan(filter=q, row_start=start, row_stop=stop)
meters = []
for ignored, meter in gen:
meter = json.loads(meter['f:message'])
meter['timestamp'] = timeutils.parse_strtime(meter['timestamp'])
meters.append(meter)
return meters
yield models.Sample(**meter)
def _update_meter_stats(self, stat, meter):
"""Do the stats calculation on a requested time bucket in stats dict
@ -434,41 +411,29 @@ class Connection(base.Connection):
"""
vol = int(meter['f:counter_volume'])
ts = timeutils.parse_strtime(meter['f:timestamp'])
stat['min'] = min(vol, stat['min'] or vol)
stat['max'] = max(vol, stat['max'])
stat['sum'] = vol + (stat['sum'] or 0)
stat['count'] += 1
stat['avg'] = (stat['sum'] / float(stat['count']))
stat['duration_start'] = min(ts, stat['duration_start'] or ts)
stat['duration_end'] = max(ts, stat['duration_end'] or ts)
stat['duration'] = \
timeutils.delta_seconds(stat['duration_start'],
stat['duration_end'])
stat.min = min(vol, stat.min or vol)
stat.max = max(vol, stat.max)
stat.sum = vol + (stat.sum or 0)
stat.count += 1
stat.avg = (stat.sum / float(stat.count))
stat.duration_start = min(ts, stat.duration_start or ts)
stat.duration_end = max(ts, stat.duration_end or ts)
stat.duration = \
timeutils.delta_seconds(stat.duration_start,
stat.duration_end)
def get_meter_statistics(self, event_filter, period=None):
"""Return a dictionary containing meter statistics.
described by the query parameters.
"""Return an iterable of models.Statistics instances containing meter
statistics described by the query parameters.
The filter must have a meter value set.
{ 'min':
'max':
'avg':
'sum':
'count':
'period':
'period_start':
'period_end':
'duration':
'duration_start':
'duration_end':
}
.. note::
Due to HBase limitations the aggregations are implemented
in the driver itself, therefore this method will be quite slow
because of all the Thrift traffic it is going to create.
"""
q, start, stop = make_query_from_filter(event_filter)
@ -508,25 +473,26 @@ class Connection(base.Connection):
start_time, ts) / period) * period
period_start = start_time + datetime.timedelta(0, offset)
if not len(results) or not results[-1]['period_start'] == \
if not len(results) or not results[-1].period_start == \
period_start:
if period:
period_end = period_start + datetime.timedelta(
0, period)
results.append({'count': 0,
'min': 0,
'max': 0,
'avg': 0,
'sum': 0,
'period': period,
'period_start': period_start,
'period_end': period_end,
'duration': None,
'duration_start': None,
'duration_end': None,
})
results.append(
models.Statistics(count=0,
min=0,
max=0,
avg=0,
sum=0,
period=period,
period_start=period_start,
period_end=period_end,
duration=None,
duration_start=None,
duration_end=None)
)
self._update_meter_stats(results[-1], meter)
return list(results)
return results
def get_event_interval(self, event_filter):
"""Return the min and max timestamps from samples,

View File

@ -33,6 +33,7 @@ import pymongo
from ceilometer.openstack.common import log
from ceilometer.storage import base
from ceilometer.storage import models
LOG = log.getLogger(__name__)
@ -366,15 +367,7 @@ class Connection(base.Connection):
def get_resources(self, user=None, project=None, source=None,
start_timestamp=None, end_timestamp=None,
metaquery={}, resource=None):
"""Return an iterable of dictionaries containing resource information.
{ 'resource_id': UUID of the resource,
'project_id': UUID of project owning the resource,
'user_id': UUID of user owning the resource,
'timestamp': UTC datetime of last update to the resource,
'metadata': most current metadata for the resource,
'meter': list of the meters reporting data for the resource,
}
"""Return an iterable of models.Resource instances
:param user: Optional ID for user that owns the resource.
:param project: Optional ID for project that owns the resource.
@ -416,25 +409,24 @@ class Connection(base.Connection):
resource_ids = self.db.meter.find(q).distinct('resource_id')
q = {'_id': {'$in': resource_ids}}
for resource in self.db.resource.find(q):
r = {}
r.update(resource)
# Replace the '_id' key with 'resource_id' to meet the
# caller's expectations.
r['resource_id'] = r['_id']
del r['_id']
yield r
yield models.Resource(
resource_id=resource['_id'],
project_id=resource['project_id'],
user_id=resource['user_id'],
metadata=resource['metadata'],
meter=[
models.ResourceMeter(
counter_name=meter['counter_name'],
counter_type=meter['counter_type'],
counter_unit=meter['counter_unit'],
)
for meter in resource['meter']
],
)
def get_meters(self, user=None, project=None, resource=None, source=None,
metaquery={}):
"""Return an iterable of dictionaries containing meter information.
{ 'name': name of the meter,
'type': type of the meter (guage, counter),
'unit': unit of the meter,
'resource_id': UUID of the resource,
'project_id': UUID of project owning the resource,
'user_id': UUID of user owning the resource,
}
"""Return an iterable of models.Meter instances
:param user: Optional ID for user that owns the resource.
:param project: Optional ID for project that owns the resource.
@ -455,16 +447,16 @@ class Connection(base.Connection):
for r in self.db.resource.find(q):
for r_meter in r['meter']:
m = {}
m['name'] = r_meter['counter_name']
m['type'] = r_meter['counter_type']
yield models.Meter(
name=r_meter['counter_name'],
type=r_meter['counter_type'],
# Return empty string if 'counter_unit' is not valid for
# backward compaitiblity.
m['unit'] = r_meter.get('counter_unit', '')
m['resource_id'] = r['_id']
m['project_id'] = r['project_id']
m['user_id'] = r['user_id']
yield m
unit=r_meter.get('counter_unit', ''),
resource_id=r['_id'],
project_id=r['project_id'],
user_id=r['user_id'],
)
def get_samples(self, event_filter):
"""Return an iterable of samples as created by
@ -477,27 +469,14 @@ class Connection(base.Connection):
# the event was inserted. It is an implementation
# detail that should not leak outside of the driver.
del s['_id']
yield s
yield models.Sample(**s)
def get_meter_statistics(self, event_filter, period=None):
"""Return a dictionary containing meter statistics.
described by the query parameters.
"""Return an iterable of models.Statistics instance containing meter
statistics described by the query parameters.
The filter must have a meter value set.
{ 'min':
'max':
'avg':
'sum':
'count':
'period':
'period_start':
'period_end':
'duration':
'duration_start':
'duration_end':
}
"""
q = make_query_from_filter(event_filter)
@ -517,8 +496,9 @@ class Connection(base.Connection):
query=q,
)
return sorted((r['value'] for r in results['results']),
key=operator.itemgetter('period_start'))
return sorted((models.Statistics(**(r['value']))
for r in results['results']),
key=operator.attrgetter('period_start'))
def _fix_interval_min_max(self, a_min, a_max):
if hasattr(a_min, 'valueOf') and a_min.valueOf is not None:

View File

@ -26,6 +26,7 @@ from sqlalchemy import func
from ceilometer.openstack.common import log
from ceilometer.openstack.common import timeutils
from ceilometer.storage import base
from ceilometer.storage import models as api_models
from ceilometer.storage.sqlalchemy import migration
from ceilometer.storage.sqlalchemy.models import Meter, Project, Resource
from ceilometer.storage.sqlalchemy.models import Source, User, Base
@ -223,15 +224,7 @@ class Connection(base.Connection):
def get_resources(self, user=None, project=None, source=None,
start_timestamp=None, end_timestamp=None,
metaquery={}, resource=None):
"""Return an iterable of dictionaries containing resource information.
{ 'resource_id': UUID of the resource,
'project_id': UUID of project owning the resource,
'user_id': UUID of user owning the resource,
'timestamp': UTC datetime of last update to the resource,
'metadata': most current metadata for the resource,
'meter': list of the meters reporting data for the resource,
}
"""Return an iterable of api_models.Resource instances
:param user: Optional ID for user that owns the resource.
:param project: Optional ID for project that owns the resource.
@ -258,33 +251,24 @@ class Connection(base.Connection):
raise NotImplementedError('metaquery not implemented')
for meter in query.all():
r = row2dict(meter.resource)
r['resource_id'] = r['id']
del r['id']
# Replace the 'resource_metadata' with 'metadata'
r['metadata'] = r['resource_metadata']
del r['resource_metadata']
r['meter'] = [
{
'counter_name': meter.counter_name,
'counter_type': meter.counter_type,
'counter_unit': meter.counter_unit,
}
for meter in meter.resource.meters
]
yield r
yield api_models.Resource(
resource_id=meter.resource_id,
project_id=meter.project_id,
user_id=meter.user_id,
metadata=meter.resource_metadata,
meter=[
api_models.ResourceMeter(
counter_name=m.counter_name,
counter_type=m.counter_type,
counter_unit=m.counter_unit,
)
for m in meter.resource.meters
],
)
def get_meters(self, user=None, project=None, resource=None, source=None,
metaquery={}):
"""Return an iterable of dictionaries containing meter information.
{ 'name': name of the meter,
'type': type of the meter (guage, counter),
'unit': unit of the meter,
'resource_id': UUID of the resource,
'project_id': UUID of project owning the resource,
'user_id': UUID of user owning the resource,
}
"""Return an iterable of api_models.Meter instances
:param user: Optional ID for user that owns the resource.
:param project: Optional ID for project that owns the resource.
@ -312,18 +296,17 @@ class Connection(base.Connection):
if meter.counter_name in meter_names:
continue
meter_names.add(meter.counter_name)
m = {}
m['resource_id'] = resource.id
m['project_id'] = resource.project_id
m['user_id'] = resource.user_id
m['name'] = meter.counter_name
m['type'] = meter.counter_type
m['unit'] = meter.counter_unit
yield m
yield api_models.Meter(
name=meter.counter_name,
type=meter.counter_type,
unit=meter.counter_unit,
resource_id=resource.id,
project_id=resource.project_id,
user_id=resource.user_id,
)
def get_samples(self, event_filter):
"""Return an iterable of samples as created by
:func:`ceilometer.meter.meter_message_from_counter`.
"""Return an iterable of api_models.Samples
"""
query = self.session.query(Meter)
query = make_query_from_filter(query, event_filter,
@ -334,14 +317,23 @@ class Connection(base.Connection):
# Remove the id generated by the database when
# the event was inserted. It is an implementation
# detail that should not leak outside of the driver.
s = row2dict(s)
del s['id']
yield api_models.Sample(
# Replace 'sources' with 'source' to meet the caller's
# expectation, Meter.sources contains one and only one
# source in the current implementation.
s['source'] = s['sources'][0]['id']
del s['sources']
yield s
source=s.sources[0].id,
counter_name=s.counter_name,
counter_type=s.counter_type,
counter_unit=s.counter_unit,
counter_volume=s.counter_volume,
user_id=s.user_id,
project_id=s.project_id,
resource_id=s.resource_id,
timestamp=s.timestamp,
resource_metadata=s.resource_metadata,
message_id=s.message_id,
message_signature=s.message_signature,
)
def _make_volume_query(self, event_filter, counter_volume_func):
"""Returns complex Meter counter_volume query for max and sum."""
@ -378,47 +370,37 @@ class Connection(base.Connection):
return make_query_from_filter(query, event_filter)
@staticmethod
def _stats_result_to_dict(result, period, period_start, period_end):
return {'count': int(result.count),
'min': result.min,
'max': result.max,
'avg': result.avg,
'sum': result.sum,
'duration_start': result.tsmin,
'duration_end': result.tsmax,
'duration': (timeutils.delta_seconds(result.tsmin,
result.tsmax)
if result.tsmin and result.tsmax
else None),
'period': period,
'period_start': period_start,
'period_end': period_end}
def _stats_result_to_model(result, period, period_start, period_end):
duration = (timeutils.delta_seconds(result.tsmin, result.tsmax)
if result.tsmin is not None and result.tsmax is not None
else None)
return api_models.Statistics(
count=int(result.count),
min=result.min,
max=result.max,
avg=result.avg,
sum=result.sum,
duration_start=result.tsmin,
duration_end=result.tsmax,
duration=duration,
period=period,
period_start=period_start,
period_end=period_end,
)
def get_meter_statistics(self, event_filter, period=None):
"""Return a dictionary containing meter statistics.
described by the query parameters.
"""Return an iterable of api_models.Statistics instances containing
meter statistics described by the query parameters.
The filter must have a meter value set.
{ 'min':
'max':
'avg':
'sum':
'count':
'period':
'period_start':
'period_end':
'duration':
'duration_start':
'duration_end':
}
"""
if not period or not event_filter.start or not event_filter.end:
res = self._make_stats_query(event_filter).all()[0]
if not period:
return [self._stats_result_to_dict(res, 0, res.tsmin, res.tsmax)]
yield self._stats_result_to_model(res, 0, res.tsmin, res.tsmax)
return
query = self._make_stats_query(event_filter)
# HACK(jd) This is an awful method to compute stats by period, but
@ -426,7 +408,6 @@ class Connection(base.Connection):
# code, so here it is, admire! We're going to do one request to get
# stats by period. We would like to use GROUP BY, but there's no
# portable way to manipulate timestamp in SQL, so we can't.
results = []
for period_start, period_end in base.iter_period(
event_filter.start or res.tsmin,
event_filter.end or res.tsmax,
@ -434,29 +415,12 @@ class Connection(base.Connection):
q = query.filter(Meter.timestamp >= period_start)
q = q.filter(Meter.timestamp < period_end)
r = q.all()[0]
# Don't add results that didn't have any event
# Don't return results that didn't have any event
if r.count:
results.append(self._stats_result_to_dict(
yield self._stats_result_to_model(
result=r,
period=int(timeutils.delta_seconds(period_start,
period_end)),
period_start=period_start,
period_end=period_end,
))
return results
def row2dict(row, srcflag=False):
"""Convert User, Project, Meter, Resource instance to dictionary object
with nested Source(s) and Meter(s)
"""
d = copy.copy(row.__dict__)
for col in ['_sa_instance_state', 'sources']:
if col in d:
del d[col]
if not srcflag:
d['sources'] = map(lambda x: row2dict(x, True), row.sources)
if d.get('meters') is not None:
d['meters'] = map(lambda x: row2dict(x, True), d['meters'])
return d
)

View File

@ -0,0 +1,176 @@
# -*- encoding: utf-8 -*-
#
# Copyright © 2013 New Dream Network, LLC (DreamHost)
#
# Author: Doug Hellmann <doug.hellmann@dreamhost.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Model classes for use in the storage API.
"""
class Model(object):
"""Base class for storage API models.
"""
def __init__(self, **kwds):
self.fields = list(kwds)
for k, v in kwds.iteritems():
setattr(self, k, v)
def as_dict(self):
d = {}
for f in self.fields:
v = getattr(self, f)
if isinstance(v, Model):
v = v.as_dict()
elif isinstance(v, list) and v and isinstance(v[0], Model):
v = [sub.as_dict() for sub in v]
d[f] = v
return d
def __eq__(self, other):
return self.as_dict() == other.as_dict()
class Resource(Model):
"""Something for which sample data has been collected.
"""
def __init__(self, resource_id, project_id, user_id, metadata, meter):
"""
:param resource_id: UUID of the resource
:param project_id: UUID of project owning the resource
:param user_id: UUID of user owning the resource
:param metadata: most current metadata for the resource (a dict)
:param meter: list of the meters reporting data for the resource,
"""
Model.__init__(self,
resource_id=resource_id,
project_id=project_id,
user_id=user_id,
metadata=metadata,
meter=meter,
)
class ResourceMeter(Model):
"""The definitions of the meters for which data has been collected
for a resource.
See Resource.meter field.
"""
def __init__(self, counter_name, counter_type, counter_unit):
"""
:param counter_name: the name of the counter updating the resource
:param counter_type: one of gauge, delta, cumulative
:param counter_unit: official units name for the sample data
"""
Model.__init__(self,
counter_name=counter_name,
counter_type=counter_type,
counter_unit=counter_unit,
)
class Meter(Model):
"""Definition of a meter for which sample data has been collected.
"""
def __init__(self, name, type, unit, resource_id, project_id, user_id):
"""
:param name: name of the meter
:param type: type of the meter (guage, counter)
:param unit: unit of the meter
:param resource_id: UUID of the resource
:param project_id: UUID of project owning the resource
:param user_id: UUID of user owning the resource
"""
Model.__init__(self,
name=name,
type=type,
unit=unit,
resource_id=resource_id,
project_id=project_id,
user_id=user_id,
)
class Sample(Model):
"""One collected data point.
"""
def __init__(self,
source,
counter_name, counter_type, counter_unit, counter_volume,
user_id, project_id, resource_id,
timestamp, resource_metadata,
message_id,
message_signature,
):
"""
:param source: the identifier for the user/project id definition
:param counter_name: the name of the measurement being taken
:param counter_type: the type of the measurement
:param counter_unit: the units for the measurement
:param counter_volume: the measured value
:param user_id: the user that triggered the event and measurement
:param project_id: the project that owns the resource
:param resource_id: the thing on which the measurement was taken
:param timestamp: the time of the measurement
:param resource_metadata: extra details about the resource
:param message_id: a message identifier
:param message_signature: a hash created from the rest of the
message data
"""
Model.__init__(self,
source=source,
counter_name=counter_name,
counter_type=counter_type,
counter_unit=counter_unit,
counter_volume=counter_volume,
user_id=user_id,
project_id=project_id,
resource_id=resource_id,
timestamp=timestamp,
resource_metadata=resource_metadata,
message_id=message_id,
message_signature=message_signature)
class Statistics(Model):
"""Computed statistics based on a set of sample data.
"""
def __init__(self,
min, max, avg, sum, count,
period, period_start, period_end,
duration, duration_start, duration_end):
"""
:param min: The smallest volume found
:param max: The largest volume found
:param avg: The average of all volumes found
:param sum: The total of all volumes found
:param count: The number of samples found
:param period: The length of the time range covered by these stats
:param period_start: The timestamp for the start of the period
:param period_end: The timestamp for the end of the period
:param duration: The total time for the matching samples
:param duration_start: The earliest time for the matching samples
:param duration_end: The latest time for the matching samples
"""
Model.__init__(self,
min=min, max=max, avg=avg, sum=sum, count=count,
period=period, period_start=period_start,
period_end=period_end, duration=duration,
duration_start=duration_start,
duration_end=duration_end)

View File

@ -23,6 +23,7 @@ import logging
from ceilometer.openstack.common import timeutils
from ceilometer.storage import impl_mongodb
from ceilometer.storage import models
from .base import FunctionalTest
LOG = logging.getLogger(__name__)
@ -58,13 +59,26 @@ class TestComputeDurationByResource(FunctionalTest):
def get_interval(ignore_self, event_filter, period):
assert event_filter.start
assert event_filter.end
if (event_filter.start > end
or event_filter.end < start):
if (event_filter.start > end or event_filter.end < start):
return []
return [{'count': 0,
# ...
'duration_start': max(event_filter.start, start),
'duration_end': min(event_filter.end, end)}]
duration_start = max(event_filter.start, start)
duration_end = min(event_filter.end, end)
duration = timeutils.delta_seconds(duration_start, duration_end)
return [
models.Statistics(
min=0,
max=0,
avg=0,
sum=0,
count=0,
period=None,
period_start=None,
period_end=None,
duration=duration,
duration_start=duration_start,
duration_end=duration_end,
)
]
self._stub_interval_func(get_interval)
def _invoke_api(self):
@ -124,14 +138,21 @@ class TestComputeDurationByResource(FunctionalTest):
def test_without_end_timestamp(self):
def get_interval(ignore_self, event_filter, period):
return [{'count': 0,
'min': None,
'max': None,
'avg': None,
'qty': None,
'duration': None,
'duration_start': self.late1,
'duration_end': self.late2}]
return [
models.Statistics(
count=0,
min=None,
max=None,
avg=None,
duration=None,
duration_start=self.late1,
duration_end=self.late2,
sum=0,
period=None,
period_start=None,
period_end=None,
)
]
self._stub_interval_func(get_interval)
data = self.get_json('/meters/instance:m1.tiny/statistics',
q=[{'field': 'timestamp',
@ -146,14 +167,22 @@ class TestComputeDurationByResource(FunctionalTest):
def test_without_start_timestamp(self):
def get_interval(ignore_self, event_filter, period):
return [{'count': 0,
'min': None,
'max': None,
'avg': None,
'qty': None,
'duration': None,
'duration_start': self.early1,
'duration_end': self.early2}]
return [
models.Statistics(
count=0,
min=None,
max=None,
avg=None,
duration=None,
duration_start=self.early1,
duration_end=self.early2,
#
sum=0,
period=None,
period_start=None,
period_end=None,
)
]
return (self.early1, self.early2)
self._stub_interval_func(get_interval)
data = self.get_json('/meters/instance:m1.tiny/statistics',

View File

@ -29,6 +29,7 @@ from ceilometer.collector import meter
from ceilometer import counter
from ceilometer import storage
from ceilometer.tests import db as test_db
from ceilometer.storage import models
class DBTestBase(test_db.TestBase):
@ -153,17 +154,14 @@ class ResourceTest(DBTestBase):
resources = list(self.conn.get_resources())
assert len(resources) == 4
for resource in resources:
if resource['resource_id'] != 'resource-id':
if resource.resource_id != 'resource-id':
continue
assert resource['resource_id'] == 'resource-id'
assert resource['project_id'] == 'project-id'
assert resource['user_id'] == 'user-id'
assert resource['metadata']['display_name'] == 'test-server'
foo = map(lambda x: [x['counter_name'],
x['counter_type'],
x['counter_unit']],
resource['meter'])
assert ['instance', 'cumulative', ''] in foo
assert resource.resource_id == 'resource-id'
assert resource.project_id == 'project-id'
assert resource.user_id == 'user-id'
assert resource.metadata['display_name'] == 'test-server'
self.assertIn(models.ResourceMeter('instance', 'cumulative', ''),
resource.meter)
break
else:
assert False, 'Never found resource-id'
@ -171,14 +169,14 @@ class ResourceTest(DBTestBase):
def test_get_resources_start_timestamp(self):
timestamp = datetime.datetime(2012, 7, 2, 10, 42)
resources = list(self.conn.get_resources(start_timestamp=timestamp))
resource_ids = [r['resource_id'] for r in resources]
resource_ids = [r.resource_id for r in resources]
expected = set(['resource-id-2', 'resource-id-3'])
assert set(resource_ids) == expected
def test_get_resources_end_timestamp(self):
timestamp = datetime.datetime(2012, 7, 2, 10, 42)
resources = list(self.conn.get_resources(end_timestamp=timestamp))
resource_ids = [r['resource_id'] for r in resources]
resource_ids = [r.resource_id for r in resources]
expected = set(['resource-id', 'resource-id-alternate'])
assert set(resource_ids) == expected
@ -187,25 +185,25 @@ class ResourceTest(DBTestBase):
end_ts = datetime.datetime(2012, 7, 2, 10, 43)
resources = list(self.conn.get_resources(start_timestamp=start_ts,
end_timestamp=end_ts))
resource_ids = [r['resource_id'] for r in resources]
resource_ids = [r.resource_id for r in resources]
assert set(resource_ids) == set(['resource-id-2'])
def test_get_resources_by_source(self):
resources = list(self.conn.get_resources(source='test-1'))
assert len(resources) == 1
ids = set(r['resource_id'] for r in resources)
ids = set(r.resource_id for r in resources)
assert ids == set(['resource-id'])
def test_get_resources_by_user(self):
resources = list(self.conn.get_resources(user='user-id'))
assert len(resources) == 2
ids = set(r['resource_id'] for r in resources)
ids = set(r.resource_id for r in resources)
assert ids == set(['resource-id', 'resource-id-alternate'])
def test_get_resources_by_project(self):
resources = list(self.conn.get_resources(project='project-id'))
assert len(resources) == 2
ids = set(r['resource_id'] for r in resources)
ids = set(r.resource_id for r in resources)
assert ids == set(['resource-id', 'resource-id-alternate'])
def test_get_resources_by_metaquery(self):
@ -266,14 +264,14 @@ class RawEventTest(DBTestBase):
results = list(self.conn.get_samples(f))
assert len(results) == 2
for meter in results:
assert meter in [self.msg1, self.msg2]
assert meter.as_dict() in [self.msg1, self.msg2]
def test_get_samples_by_project(self):
f = storage.EventFilter(project='project-id')
results = list(self.conn.get_samples(f))
assert results
for meter in results:
assert meter in [self.msg1, self.msg2, self.msg3]
assert meter.as_dict() in [self.msg1, self.msg2, self.msg3]
def test_get_samples_by_resource(self):
f = storage.EventFilter(user='user-id', resource='resource-id')
@ -281,7 +279,7 @@ class RawEventTest(DBTestBase):
assert results
meter = results[0]
assert meter is not None
assert meter == self.msg1
assert meter.as_dict() == self.msg1
def test_get_samples_by_metaquery(self):
q = {'metadata.display_name': 'test-server'}
@ -291,7 +289,7 @@ class RawEventTest(DBTestBase):
results = list(self.conn.get_samples(f))
assert results
for meter in results:
assert meter in self.msgs
assert meter.as_dict() in self.msgs
except NotImplementedError:
got_not_imp = True
self.assertTrue(got_not_imp)
@ -303,7 +301,7 @@ class RawEventTest(DBTestBase):
)
results = list(self.conn.get_samples(f))
assert len(results) == 1
assert results[0]['timestamp'] == datetime.datetime(2012, 7, 2, 10, 41)
assert results[0].timestamp == datetime.datetime(2012, 7, 2, 10, 41)
def test_get_samples_by_end_time(self):
f = storage.EventFilter(
@ -313,7 +311,7 @@ class RawEventTest(DBTestBase):
results = list(self.conn.get_samples(f))
length = len(results)
assert length == 1
assert results[0]['timestamp'] == datetime.datetime(2012, 7, 2, 10, 40)
assert results[0].timestamp == datetime.datetime(2012, 7, 2, 10, 40)
def test_get_samples_by_both_times(self):
f = storage.EventFilter(
@ -323,7 +321,7 @@ class RawEventTest(DBTestBase):
results = list(self.conn.get_samples(f))
length = len(results)
assert length == 1
assert results[0]['timestamp'] == datetime.datetime(2012, 7, 2, 10, 42)
assert results[0].timestamp == datetime.datetime(2012, 7, 2, 10, 42)
def test_get_samples_by_name(self):
f = storage.EventFilter(user='user-id', meter='no-such-meter')
@ -487,31 +485,31 @@ class StatisticsTest(DBTestBase):
user='user-5',
meter='volume.size',
)
results = self.conn.get_meter_statistics(f)[0]
self.assertEqual(results['duration'],
results = list(self.conn.get_meter_statistics(f))[0]
self.assertEqual(results.duration,
(datetime.datetime(2012, 9, 25, 12, 32)
- datetime.datetime(2012, 9, 25, 10, 30)).seconds)
assert results['count'] == 3
assert results['min'] == 8
assert results['max'] == 10
assert results['sum'] == 27
assert results['avg'] == 9
assert results.count == 3
assert results.min == 8
assert results.max == 10
assert results.sum == 27
assert results.avg == 9
def test_no_period_in_query(self):
f = storage.EventFilter(
user='user-5',
meter='volume.size',
)
results = self.conn.get_meter_statistics(f)[0]
assert results['period'] == 0
results = list(self.conn.get_meter_statistics(f))[0]
assert results.period == 0
def test_period_is_int(self):
f = storage.EventFilter(
meter='volume.size',
)
results = self.conn.get_meter_statistics(f)[0]
assert(isinstance(results['period'], int))
assert results['count'] == 6
results = list(self.conn.get_meter_statistics(f))[0]
assert(isinstance(results.period, int))
assert results.count == 6
def test_by_user_period(self):
f = storage.EventFilter(
@ -519,30 +517,30 @@ class StatisticsTest(DBTestBase):
meter='volume.size',
start='2012-09-25T10:28:00',
)
results = self.conn.get_meter_statistics(f, period=7200)
results = list(self.conn.get_meter_statistics(f, period=7200))
self.assertEqual(len(results), 2)
self.assertEqual(set(r['period_start'] for r in results),
self.assertEqual(set(r.period_start for r in results),
set([datetime.datetime(2012, 9, 25, 10, 28),
datetime.datetime(2012, 9, 25, 12, 28)]))
self.assertEqual(set(r['period_end'] for r in results),
self.assertEqual(set(r.period_end for r in results),
set([datetime.datetime(2012, 9, 25, 12, 28),
datetime.datetime(2012, 9, 25, 14, 28)]))
r = results[0]
self.assertEqual(r['period_start'],
self.assertEqual(r.period_start,
datetime.datetime(2012, 9, 25, 10, 28))
self.assertEqual(r['count'], 2)
self.assertEqual(r['avg'], 8.5)
self.assertEqual(r['min'], 8)
self.assertEqual(r['max'], 9)
self.assertEqual(r['sum'], 17)
self.assertEqual(r['period'], 7200)
self.assertIsInstance(r['period'], int)
expected_end = r['period_start'] + datetime.timedelta(seconds=7200)
self.assertEqual(r['period_end'], expected_end)
self.assertEqual(r['duration'], 3660)
self.assertEqual(r['duration_start'],
self.assertEqual(r.count, 2)
self.assertEqual(r.avg, 8.5)
self.assertEqual(r.min, 8)
self.assertEqual(r.max, 9)
self.assertEqual(r.sum, 17)
self.assertEqual(r.period, 7200)
self.assertIsInstance(r.period, int)
expected_end = r.period_start + datetime.timedelta(seconds=7200)
self.assertEqual(r.period_end, expected_end)
self.assertEqual(r.duration, 3660)
self.assertEqual(r.duration_start,
datetime.datetime(2012, 9, 25, 10, 30))
self.assertEqual(r['duration_end'],
self.assertEqual(r.duration_end,
datetime.datetime(2012, 9, 25, 11, 31))
def test_by_user_period_start_end(self):
@ -552,24 +550,24 @@ class StatisticsTest(DBTestBase):
start='2012-09-25T10:28:00',
end='2012-09-25T11:28:00',
)
results = self.conn.get_meter_statistics(f, period=1800)
results = list(self.conn.get_meter_statistics(f, period=1800))
self.assertEqual(len(results), 1)
r = results[0]
self.assertEqual(r['period_start'],
self.assertEqual(r.period_start,
datetime.datetime(2012, 9, 25, 10, 28))
self.assertEqual(r['count'], 1)
self.assertEqual(r['avg'], 8)
self.assertEqual(r['min'], 8)
self.assertEqual(r['max'], 8)
self.assertEqual(r['sum'], 8)
self.assertEqual(r['period'], 1800)
self.assertEqual(r['period_end'],
r['period_start']
self.assertEqual(r.count, 1)
self.assertEqual(r.avg, 8)
self.assertEqual(r.min, 8)
self.assertEqual(r.max, 8)
self.assertEqual(r.sum, 8)
self.assertEqual(r.period, 1800)
self.assertEqual(r.period_end,
r.period_start
+ datetime.timedelta(seconds=1800))
self.assertEqual(r['duration'], 0)
self.assertEqual(r['duration_start'],
self.assertEqual(r.duration, 0)
self.assertEqual(r.duration_start,
datetime.datetime(2012, 9, 25, 10, 30))
self.assertEqual(r['duration_end'],
self.assertEqual(r.duration_end,
datetime.datetime(2012, 9, 25, 10, 30))
def test_by_project(self):
@ -579,28 +577,28 @@ class StatisticsTest(DBTestBase):
start='2012-09-25T11:30:00',
end='2012-09-25T11:32:00',
)
results = self.conn.get_meter_statistics(f)[0]
self.assertEqual(results['duration'], 0)
assert results['count'] == 1
assert results['min'] == 6
assert results['max'] == 6
assert results['sum'] == 6
assert results['avg'] == 6
results = list(self.conn.get_meter_statistics(f))[0]
self.assertEqual(results.duration, 0)
assert results.count == 1
assert results.min == 6
assert results.max == 6
assert results.sum == 6
assert results.avg == 6
def test_one_resource(self):
f = storage.EventFilter(
user='user-id',
meter='volume.size',
)
results = self.conn.get_meter_statistics(f)[0]
self.assertEqual(results['duration'],
results = list(self.conn.get_meter_statistics(f))[0]
self.assertEqual(results.duration,
(datetime.datetime(2012, 9, 25, 12, 32)
- datetime.datetime(2012, 9, 25, 10, 30)).seconds)
assert results['count'] == 3
assert results['min'] == 5
assert results['max'] == 7
assert results['sum'] == 18
assert results['avg'] == 6
assert results.count == 3
assert results.min == 5
assert results.max == 7
assert results.sum == 18
assert results.avg == 6
class CounterDataTypeTest(DBTestBase):
@ -666,17 +664,17 @@ class CounterDataTypeTest(DBTestBase):
meter='dummyBigCounter',
)
results = list(self.conn.get_samples(f))
self.assertEqual(results[0]['counter_volume'], 3372036854775807)
self.assertEqual(results[0].counter_volume, 3372036854775807)
f = storage.EventFilter(
meter='dummySmallCounter',
)
results = list(self.conn.get_samples(f))
self.assertEqual(results[0]['counter_volume'], -3372036854775807)
self.assertEqual(results[0].counter_volume, -3372036854775807)
def test_storage_can_handle_float_values(self):
f = storage.EventFilter(
meter='floatCounter',
)
results = list(self.conn.get_samples(f))
self.assertEqual(results[0]['counter_volume'], 1938495037.53697)
self.assertEqual(results[0].counter_volume, 1938495037.53697)

View File

@ -0,0 +1,53 @@
# -*- encoding: utf-8 -*-
#
# Copyright © 2013 New Dream Network, LLC (DreamHost)
#
# Author: Doug Hellmann <doug.hellmann@dreamhost.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import unittest
from ceilometer.storage import models
class FakeModel(models.Model):
def __init__(self, arg1, arg2):
models.Model.__init__(self, arg1=arg1, arg2=arg2)
class ModelTest(unittest.TestCase):
def test_create_attributes(self):
m = FakeModel(1, 2)
self.assertEqual(m.arg1, 1)
self.assertEqual(m.arg2, 2)
def test_as_dict(self):
m = FakeModel(1, 2)
d = m.as_dict()
self.assertEqual(d, {'arg1': 1, 'arg2': 2})
def test_as_dict_recursive(self):
m = FakeModel(1, FakeModel('a', 'b'))
d = m.as_dict()
self.assertEqual(d, {'arg1': 1,
'arg2': {'arg1': 'a',
'arg2': 'b'}})
def test_as_dict_recursive_list(self):
m = FakeModel(1, [FakeModel('a', 'b')])
d = m.as_dict()
self.assertEqual(d, {'arg1': 1,
'arg2': [{'arg1': 'a',
'arg2': 'b'}]})