storage: remove get_volume_sum and get_volume_max

This is part of blueprint remove-obsolete-storage-driver-methods

Change-Id: I9730f5163cefe17789d6be0de67bfc54ae49f40d
Signed-off-by: Julien Danjou <julien@danjou.info>
This commit is contained in:
Julien Danjou 2013-03-29 15:26:57 +01:00
parent 376e55e835
commit 1816a982fe
11 changed files with 64 additions and 479 deletions

View File

@ -593,6 +593,26 @@ def compute_duration_by_resource(resource, meter):
) )
def _get_statistics(stats_type, meter=None, resource=None, project=None):
q_ts = _get_query_timestamps(flask.request.args)
f = storage.EventFilter(
meter=meter,
project=project,
resource=resource,
start=q_ts['query_start'],
end=q_ts['query_end'],
)
# TODO(sberler): do we want to return an error if the resource
# does not exist?
results = list(flask.request.storage_conn.get_meter_statistics(f))
value = None
if results:
value = results[0][stats_type] # there should only be one!
return flask.jsonify(volume=value)
@blueprint.route('/resources/<resource>/meters/<meter>/volume/max') @blueprint.route('/resources/<resource>/meters/<meter>/volume/max')
def compute_max_resource_volume(resource, meter): def compute_max_resource_volume(resource, meter):
"""Return the max volume for a meter. """Return the max volume for a meter.
@ -606,24 +626,12 @@ def compute_max_resource_volume(resource, meter):
:param search_offset: Number of minutes before and :param search_offset: Number of minutes before and
after start and end timestamps to query. after start and end timestamps to query.
""" """
q_ts = _get_query_timestamps(flask.request.args) return _get_statistics(
'max',
# Query the database for the max volume
f = storage.EventFilter(
meter=meter, meter=meter,
project=acl.get_limited_to_project(flask.request.headers),
resource=resource, resource=resource,
start=q_ts['query_start'], project=acl.get_limited_to_project(flask.request.headers),
end=q_ts['query_end'],
) )
# TODO(sberler): do we want to return an error if the resource
# does not exist?
results = list(flask.request.storage_conn.get_volume_max(f))
value = None
if results:
value = results[0].get('value') # there should only be one!
return flask.jsonify(volume=value)
@blueprint.route('/resources/<resource>/meters/<meter>/volume/sum') @blueprint.route('/resources/<resource>/meters/<meter>/volume/sum')
@ -639,24 +647,12 @@ def compute_resource_volume_sum(resource, meter):
:param search_offset: Number of minutes before and :param search_offset: Number of minutes before and
after start and end timestamps to query. after start and end timestamps to query.
""" """
q_ts = _get_query_timestamps(flask.request.args) return _get_statistics(
'sum',
# Query the database for the max volume
f = storage.EventFilter(
meter=meter, meter=meter,
project=acl.get_limited_to_project(flask.request.headers),
resource=resource, resource=resource,
start=q_ts['query_start'], project=acl.get_limited_to_project(flask.request.headers),
end=q_ts['query_end'],
) )
# TODO(sberler): do we want to return an error if the resource
# does not exist?
results = list(flask.request.storage_conn.get_volume_sum(f))
value = None
if results:
value = results[0].get('value') # there should only be one!
return flask.jsonify(volume=value)
@blueprint.route('/projects/<project>/meters/<meter>/volume/max') @blueprint.route('/projects/<project>/meters/<meter>/volume/max')
@ -673,24 +669,7 @@ def compute_project_volume_max(project, meter):
after start and end timestamps to query. after start and end timestamps to query.
""" """
check_authorized_project(project) check_authorized_project(project)
return _get_statistics('max', project=project, meter=meter)
q_ts = _get_query_timestamps(flask.request.args)
f = storage.EventFilter(meter=meter,
project=project,
start=q_ts['query_start'],
end=q_ts['query_end'],
)
# FIXME(sberler): Currently get_volume_max is really always grouping
# by resource_id. We should add a new function in the storage driver
# that does not do this grouping (and potentially rename the existing
# one to get_volume_max_by_resource())
results = list(flask.request.storage_conn.get_volume_max(f))
value = None
if results:
value = max(result.get('value') for result in results)
return flask.jsonify(volume=value)
@blueprint.route('/projects/<project>/meters/<meter>/volume/sum') @blueprint.route('/projects/<project>/meters/<meter>/volume/sum')
@ -708,20 +687,8 @@ def compute_project_volume_sum(project, meter):
""" """
check_authorized_project(project) check_authorized_project(project)
q_ts = _get_query_timestamps(flask.request.args) return _get_statistics(
'sum',
f = storage.EventFilter(meter=meter, meter=meter,
project=project, project=project,
start=q_ts['query_start'],
end=q_ts['query_end'],
) )
# FIXME(sberler): Currently get_volume_max is really always grouping
# by resource_id. We should add a new function in the storage driver
# that does not do this grouping (and potentially rename the existing
# one to get_volume_max_by_resource())
results = list(flask.request.storage_conn.get_volume_sum(f))
value = None
if results:
value = sum(result.get('value') for result in results)
return flask.jsonify(volume=value)

View File

@ -145,30 +145,6 @@ class Connection(object):
:func:`ceilometer.meter.meter_message_from_counter`. :func:`ceilometer.meter.meter_message_from_counter`.
""" """
@abc.abstractmethod
def get_volume_sum(self, event_filter):
"""Return the sum of the volume field for the samples
described by the query parameters.
The filter must have a meter value set.
{ 'resource_id': UUID string for the resource,
'value': The sum for the volume.
}
"""
@abc.abstractmethod
def get_volume_max(self, event_filter):
"""Return the maximum of the volume field for the samples
described by the query parameters.
The filter must have a meter value set.
{ 'resource_id': UUID string for the resource,
'value': The max for the volume.
}
"""
@abc.abstractmethod @abc.abstractmethod
def get_event_interval(self, event_filter): def get_event_interval(self, event_filter):
"""Return the min and max timestamps from samples, """Return the min and max timestamps from samples,

View File

@ -478,10 +478,19 @@ class Connection(base.Connection):
row_stop=stop) row_stop=stop)
) )
start_time = event_filter.start \ if event_filter.start:
or timeutils.parse_strtime(meters[-1]['f:timestamp']) start_time = event_filter.start
end_time = event_filter.end \ elif meters:
or timeutils.parse_strtime(meters[0]['f:timestamp']) start_time = timeutils.parse_strtime(meters[-1]['f:timestamp'])
else:
start_time = None
if event_filter.end:
end_time = event_filter.end
elif meters:
end_time = timeutils.parse_strtime(meters[0]['f:timestamp'])
else:
end_time = None
results = [] results = []
@ -519,37 +528,6 @@ class Connection(base.Connection):
self._update_meter_stats(results[-1], meter) self._update_meter_stats(results[-1], meter)
return list(results) return list(results)
def get_volume_sum(self, event_filter):
"""Return the sum of the volume field for the samples
described by the query parameters.
"""
q, start, stop = make_query_from_filter(event_filter)
LOG.debug("q: %s" % q)
gen = self.meter.scan(filter=q, row_start=start, row_stop=stop)
results = defaultdict(int)
for ignored, meter in gen:
results[meter['f:resource_id']] \
+= int(meter['f:counter_volume'])
return ({'resource_id': k, 'value': v}
for (k, v) in results.iteritems())
def get_volume_max(self, event_filter):
"""Return the maximum of the volume field for the samples
described by the query parameters.
"""
q, start, stop = make_query_from_filter(event_filter)
LOG.debug("q: %s" % q)
gen = self.meter.scan(filter=q, row_start=start, row_stop=stop)
results = defaultdict(int)
for ignored, meter in gen:
results[meter['f:resource_id']] = \
max(results[meter['f:resource_id']],
int(meter['f:counter_volume']))
return ({'resource_id': k, 'value': v}
for (k, v) in results.iteritems())
def get_event_interval(self, event_filter): def get_event_interval(self, event_filter):
"""Return the min and max timestamps from samples, """Return the min and max timestamps from samples,
using the event_filter to limit the samples seen. using the event_filter to limit the samples seen.

View File

@ -124,16 +124,6 @@ class Connection(base.Connection):
""" """
return [] return []
def get_volume_sum(self, event_filter):
"""Return the sum of the volume field for the samples
described by the query parameters.
"""
def get_volume_max(self, event_filter):
"""Return the maximum of the volume field for the samples
described by the query parameters.
"""
def get_event_interval(self, event_filter): def get_event_interval(self, event_filter):
"""Return the min and max timestamp for samples """Return the min and max timestamp for samples
matching the event_filter. matching the event_filter.

View File

@ -132,34 +132,6 @@ class Connection(base.Connection):
_mim_instance = None _mim_instance = None
# JavaScript function for doing map-reduce to get a counter volume
# total.
MAP_COUNTER_VOLUME = bson.code.Code("""
function() {
emit(this.resource_id, this.counter_volume);
}
""")
# JavaScript function for doing map-reduce to get a maximum value
# from a range. (from
# http://cookbook.mongodb.org/patterns/finding_max_and_min/)
REDUCE_MAX = bson.code.Code("""
function (key, values) {
return Math.max.apply(Math, values);
}
""")
# JavaScript function for doing map-reduce to get a sum.
REDUCE_SUM = bson.code.Code("""
function (key, values) {
var total = 0;
for (var i = 0; i < values.length; i++) {
total += values[i];
}
return total;
}
""")
# MAP_TIMESTAMP and REDUCE_MIN_MAX are based on the recipe # MAP_TIMESTAMP and REDUCE_MIN_MAX are based on the recipe
# http://cookbook.mongodb.org/patterns/finding_max_and_min_values_for_a_key # http://cookbook.mongodb.org/patterns/finding_max_and_min_values_for_a_key
MAP_TIMESTAMP = bson.code.Code(""" MAP_TIMESTAMP = bson.code.Code("""
@ -548,32 +520,6 @@ class Connection(base.Connection):
return sorted((r['value'] for r in results['results']), return sorted((r['value'] for r in results['results']),
key=operator.itemgetter('period_start')) key=operator.itemgetter('period_start'))
def get_volume_sum(self, event_filter):
"""Return the sum of the volume field for the samples
described by the query parameters.
"""
q = make_query_from_filter(event_filter)
results = self.db.meter.map_reduce(self.MAP_COUNTER_VOLUME,
self.REDUCE_SUM,
{'inline': 1},
query=q,
)
return ({'resource_id': r['_id'], 'value': r['value']}
for r in results['results'])
def get_volume_max(self, event_filter):
"""Return the maximum of the volume field for the samples
described by the query parameters.
"""
q = make_query_from_filter(event_filter)
results = self.db.meter.map_reduce(self.MAP_COUNTER_VOLUME,
self.REDUCE_MAX,
{'inline': 1},
query=q,
)
return ({'resource_id': r['_id'], 'value': r['value']}
for r in results['results'])
def _fix_interval_min_max(self, a_min, a_max): def _fix_interval_min_max(self, a_min, a_max):
if hasattr(a_min, 'valueOf') and a_min.valueOf is not None: if hasattr(a_min, 'valueOf') and a_min.valueOf is not None:
# NOTE (dhellmann): HACK ALERT # NOTE (dhellmann): HACK ALERT

View File

@ -352,18 +352,6 @@ class Connection(base.Connection):
mainq = mainq.join(Meter).group_by(Resource.id) mainq = mainq.join(Meter).group_by(Resource.id)
return mainq.filter(Meter.id.in_(subq)) return mainq.filter(Meter.id.in_(subq))
def get_volume_sum(self, event_filter):
counter_volume_func = func.sum(Meter.counter_volume)
query = self._make_volume_query(event_filter, counter_volume_func)
results = query.all()
return ({'resource_id': x, 'value': y} for x, y in results)
def get_volume_max(self, event_filter):
counter_volume_func = func.max(Meter.counter_volume)
query = self._make_volume_query(event_filter, counter_volume_func)
results = query.all()
return ({'resource_id': x, 'value': y} for x, y in results)
def get_event_interval(self, event_filter): def get_event_interval(self, event_filter):
"""Return the min and max timestamps from samples, """Return the min and max timestamps from samples,
using the event_filter to limit the samples seen. using the event_filter to limit the samples seen.
@ -398,8 +386,10 @@ class Connection(base.Connection):
'sum': result.sum, 'sum': result.sum,
'duration_start': result.tsmin, 'duration_start': result.tsmin,
'duration_end': result.tsmax, 'duration_end': result.tsmax,
'duration': timeutils.delta_seconds(result.tsmin, 'duration': (timeutils.delta_seconds(result.tsmin,
result.tsmax), result.tsmax)
if result.tsmin and result.tsmax
else None),
'period': period, 'period': period,
'period_start': period_start, 'period_start': period_start,
'period_end': period_end} 'period_end': period_end}

View File

@ -342,50 +342,6 @@ class RawEventTest(DBTestBase):
assert len(results) == 1 assert len(results) == 1
class SumTest(DBTestBase):
def test_by_user(self):
f = storage.EventFilter(
user='user-id',
meter='instance',
)
results = list(self.conn.get_volume_sum(f))
assert results
counts = dict((r['resource_id'], r['value'])
for r in results)
assert counts['resource-id'] == 1
assert counts['resource-id-alternate'] == 1
assert set(counts.keys()) == set(['resource-id',
'resource-id-alternate'])
def test_by_project(self):
f = storage.EventFilter(
project='project-id',
meter='instance',
)
results = list(self.conn.get_volume_sum(f))
assert results
counts = dict((r['resource_id'], r['value'])
for r in results)
assert counts['resource-id'] == 1
assert counts['resource-id-alternate'] == 2
assert set(counts.keys()) == set(['resource-id',
'resource-id-alternate'])
def test_one_resource(self):
f = storage.EventFilter(
user='user-id',
meter='instance',
resource='resource-id',
)
results = list(self.conn.get_volume_sum(f))
assert results
counts = dict((r['resource_id'], r['value'])
for r in results)
assert counts['resource-id'] == 1
assert set(counts.keys()) == set(['resource-id'])
class TestGetEventInterval(DBTestBase): class TestGetEventInterval(DBTestBase):
def setUp(self): def setUp(self):
@ -481,173 +437,6 @@ class TestGetEventInterval(DBTestBase):
assert e is None assert e is None
class MaxProjectTest(DBTestBase):
def prepare_data(self):
self.counters = []
for i in range(3):
c = counter.Counter(
'volume.size',
'gauge',
'GiB',
5 + i,
'user-id',
'project1',
'resource-id-%s' % i,
timestamp=datetime.datetime(2012, 9, 25, 10 + i, 30 + i),
resource_metadata={'display_name': 'test-volume',
'tag': 'self.counter',
}
)
self.counters.append(c)
msg = meter.meter_message_from_counter(c,
cfg.CONF.metering_secret,
'source1',
)
self.conn.record_metering_data(msg)
def test_no_bounds(self):
expected = [{'value': 5.0, 'resource_id': u'resource-id-0'},
{'value': 6.0, 'resource_id': u'resource-id-1'},
{'value': 7.0, 'resource_id': u'resource-id-2'}]
f = storage.EventFilter(project='project1',
meter='volume.size')
results = list(self.conn.get_volume_max(f))
assert results == expected
def test_start_timestamp(self):
expected = [{'value': 6L, 'resource_id': u'resource-id-1'},
{'value': 7L, 'resource_id': u'resource-id-2'}]
f = storage.EventFilter(project='project1',
meter='volume.size',
start='2012-09-25T11:30:00')
results = list(self.conn.get_volume_max(f))
assert results == expected
def test_start_timestamp_after(self):
f = storage.EventFilter(project='project1',
meter='volume.size',
start='2012-09-25T12:34:00')
results = list(self.conn.get_volume_max(f))
assert results == []
def test_end_timestamp(self):
expected = [{'value': 5L, 'resource_id': u'resource-id-0'}]
f = storage.EventFilter(project='project1',
meter='volume.size',
end='2012-09-25T11:30:00')
results = list(self.conn.get_volume_max(f))
assert results == expected
def test_end_timestamp_before(self):
f = storage.EventFilter(project='project1',
meter='volume.size',
end='2012-09-25T09:54:00')
results = list(self.conn.get_volume_max(f))
assert results == []
def test_start_end_timestamp(self):
expected = [{'value': 6L, 'resource_id': u'resource-id-1'}]
f = storage.EventFilter(project='project1',
meter='volume.size',
start='2012-09-25T11:30:00',
end='2012-09-25T11:32:00')
results = list(self.conn.get_volume_max(f))
assert results == expected
class MaxResourceTest(DBTestBase):
def prepare_data(self):
self.counters = []
for i in range(3):
c = counter.Counter(
'volume.size',
'gauge',
'GiB',
5 + i,
'user-id',
'project1',
'resource-id',
timestamp=datetime.datetime(2012, 9, 25, 10 + i, 30 + i),
resource_metadata={'display_name': 'test-volume',
'tag': 'self.counter',
}
)
self.counters.append(c)
msg = meter.meter_message_from_counter(c,
cfg.CONF.metering_secret,
'source1',
)
self.conn.record_metering_data(msg)
def test_no_bounds(self):
expected = [{'value': 7L, 'resource_id': u'resource-id'}]
f = storage.EventFilter(resource='resource-id',
meter='volume.size')
results = list(self.conn.get_volume_max(f))
assert results == expected
def test_start_timestamp(self):
expected = [{'value': 7L, 'resource_id': u'resource-id'}]
f = storage.EventFilter(resource='resource-id',
meter='volume.size',
start='2012-09-25T11:30:00')
results = list(self.conn.get_volume_max(f))
assert results == expected
def test_start_timestamp_after(self):
f = storage.EventFilter(resource='resource-id',
meter='volume.size',
start='2012-09-25T12:34:00')
results = list(self.conn.get_volume_max(f))
assert results == []
def test_end_timestamp(self):
expected = [{'value': 5L, 'resource_id': u'resource-id'}]
f = storage.EventFilter(resource='resource-id',
meter='volume.size',
end='2012-09-25T11:30:00')
results = list(self.conn.get_volume_max(f))
assert results == expected
def test_end_timestamp_before(self):
f = storage.EventFilter(resource='resource-id',
meter='volume.size',
end='2012-09-25T09:54:00')
results = list(self.conn.get_volume_max(f))
assert results == []
def test_start_end_timestamp(self):
expected = [{'value': 6L, 'resource_id': u'resource-id'}]
f = storage.EventFilter(resource='resource-id',
meter='volume.size',
start='2012-09-25T11:30:00',
end='2012-09-25T11:32:00')
results = list(self.conn.get_volume_max(f))
assert results == expected
class StatisticsTest(DBTestBase): class StatisticsTest(DBTestBase):
def prepare_data(self): def prepare_data(self):

View File

@ -56,18 +56,6 @@ class TestGetEventInterval(base.TestGetEventInterval,
pass pass
class SumTest(base.SumTest, HBaseEngineTestBase):
pass
class MaxProjectTest(base.MaxProjectTest, HBaseEngineTestBase):
pass
class MaxResourceTest(base.MaxResourceTest, HBaseEngineTestBase):
pass
class StatisticsTest(base.StatisticsTest, HBaseEngineTestBase): class StatisticsTest(base.StatisticsTest, HBaseEngineTestBase):
pass pass

View File

@ -90,13 +90,6 @@ class RawEventTest(base.RawEventTest, MongoDBEngineTestBase):
pass pass
class SumTest(base.SumTest, MongoDBEngineTestBase):
def setUp(self):
super(SumTest, self).setUp()
require_map_reduce(self.conn)
class TestGetEventInterval(base.TestGetEventInterval, MongoDBEngineTestBase): class TestGetEventInterval(base.TestGetEventInterval, MongoDBEngineTestBase):
def setUp(self): def setUp(self):
@ -104,20 +97,6 @@ class TestGetEventInterval(base.TestGetEventInterval, MongoDBEngineTestBase):
require_map_reduce(self.conn) require_map_reduce(self.conn)
class MaxProjectTest(base.MaxProjectTest, MongoDBEngineTestBase):
def setUp(self):
super(MaxProjectTest, self).setUp()
require_map_reduce(self.conn)
class MaxResourceTest(base.MaxResourceTest, MongoDBEngineTestBase):
def setUp(self):
super(MaxResourceTest, self).setUp()
require_map_reduce(self.conn)
class StatisticsTest(base.StatisticsTest, MongoDBEngineTestBase): class StatisticsTest(base.StatisticsTest, MongoDBEngineTestBase):
def setUp(self): def setUp(self):

View File

@ -58,18 +58,6 @@ class TestGetEventInterval(base.TestGetEventInterval,
pass pass
class SumTest(base.SumTest, SQLAlchemyEngineTestBase):
pass
class MaxProjectTest(base.MaxProjectTest, SQLAlchemyEngineTestBase):
pass
class MaxResourceTest(base.MaxResourceTest, SQLAlchemyEngineTestBase):
pass
class StatisticsTest(base.StatisticsTest, SQLAlchemyEngineTestBase): class StatisticsTest(base.StatisticsTest, SQLAlchemyEngineTestBase):
pass pass

View File

@ -41,23 +41,20 @@ def show_resources(db, args):
for k, v in sorted(resource['metadata'].iteritems()): for k, v in sorted(resource['metadata'].iteritems()):
print ' %-10s : %s' % (k, v) print ' %-10s : %s' % (k, v)
for meter in resource['meter']: for meter in resource['meter']:
totals = db.get_statistics(storage.EventFilter(
user=u,
meter=meter['counter_name'],
resource=resource['resource_id'],
))
# FIXME(dhellmann): Need a way to tell whether to use # FIXME(dhellmann): Need a way to tell whether to use
# max() or sum() by meter name without hard-coding. # max() or sum() by meter name without hard-coding.
if meter['counter_name'] in ['cpu', 'disk']: if meter['counter_name'] in ['cpu', 'disk']:
totals = db.get_volume_max(storage.EventFilter( value = totals[0]['max']
user=u,
meter=meter['counter_name'],
resource=resource['resource_id'],
))
else: else:
totals = db.get_volume_sum(storage.EventFilter( value = totals[0]['sum']
user=u,
meter=meter['counter_name'],
resource=resource['resource_id'],
))
print ' %s (%s): %s' % \ print ' %s (%s): %s' % \
(meter['counter_name'], meter['counter_type'], (meter['counter_name'], meter['counter_type'],
totals.next()['value']) value)
def show_total_resources(db, args): def show_total_resources(db, args):
@ -68,18 +65,15 @@ def show_total_resources(db, args):
for u in users: for u in users:
print u print u
for meter in ['disk', 'cpu', 'instance']: for meter in ['disk', 'cpu', 'instance']:
stats = db.get_statistics(storage.EventFilter(
user=u,
meter=meter,
))
if meter in ['cpu', 'disk']: if meter in ['cpu', 'disk']:
total = db.get_volume_max(storage.EventFilter( total = stats['max']
user=u,
meter=meter,
))
else: else:
total = db.get_volume_sum(storage.EventFilter( total = stats['sum']
user=u, print ' ', meter, total
meter=meter,
))
for t in total:
print ' ', meter, t['resource_id'], t['value']
def show_raw(db, args): def show_raw(db, args):