implement get_volume_max for sqlalchemy

Fixes bug #1057679

Change-Id: I0034bec7b5da23496ac04224013e571f7ea82abc
This commit is contained in:
John Tran 2012-10-26 03:16:54 +00:00
parent b937aa509b
commit 9902dafc54
4 changed files with 259 additions and 15 deletions

View File

@ -19,9 +19,11 @@
"""
from stevedore import driver
from datetime import datetime
from ceilometer.openstack.common import log
from ceilometer.openstack.common import cfg
from ceilometer.openstack.common import timeutils
from urlparse import urlparse
@ -83,8 +85,16 @@ class EventFilter(object):
resource=None, meter=None, source=None):
self.user = user
self.project = project
self.start = start
self.end = end
self.start = self._sanitize_timestamp(start)
self.end = self._sanitize_timestamp(end)
self.resource = resource
self.meter = meter
self.source = source
def _sanitize_timestamp(self, timestamp):
"""Return a naive utc datetime object"""
if not timestamp:
return timestamp
if not isinstance(timestamp, datetime):
timestamp = timeutils.parse_isotime(timestamp)
return timeutils.normalize_time(timestamp)

View File

@ -19,11 +19,11 @@
import copy
import datetime
from ceilometer.openstack.common import log
from ceilometer.openstack.common import cfg
from ceilometer.openstack.common import cfg, log, timeutils
from ceilometer.storage import base
from ceilometer.storage.sqlalchemy.models import Meter, Project, Resource
from ceilometer.storage.sqlalchemy.models import Source, User
from ceilometer.storage.sqlalchemy.session import func
import ceilometer.storage.sqlalchemy.session as sqlalchemy_session
LOG = log.getLogger(__name__)
@ -83,9 +83,11 @@ def make_query_from_filter(query, event_filter, require_meter=True):
if event_filter.source:
query = query.filter_by(source=event_filter.source)
if event_filter.start:
query = query = query.filter(Meter.timestamp >= event_filter.start)
ts_start = event_filter.start
query = query.filter(Meter.timestamp >= ts_start)
if event_filter.end:
query = query = query.filter(Meter.timestamp < event_filter.end)
ts_end = event_filter.end
query = query.filter(Meter.timestamp < ts_end)
if event_filter.user:
query = query.filter_by(user_id=event_filter.user)
elif event_filter.project:
@ -249,13 +251,26 @@ class Connection(base.Connection):
del e['id']
yield e
def _make_volume_query(self, event_filter, counter_volume_func):
"""Returns complex Meter counter_volume query for max and sum"""
subq = model_query(Meter.id, session=self.session)
subq = make_query_from_filter(subq, event_filter, require_meter=False)
subq = subq.subquery()
mainq = self.session.query(Resource.id, counter_volume_func)
mainq = mainq.join(Meter).group_by(Resource.id)
return mainq.filter(Meter.id.in_(subq))
def get_volume_sum(self, event_filter):
# it isn't clear these are used
pass
counter_volume_func = func.sum(Meter.counter_volume)
query = self._make_volume_query(event_filter, counter_volume_func)
results = query.all()
return ({'resource_id': x, 'value': y} for x, y in results)
def get_volume_max(self, event_filter):
# it isn't clear these are used
pass
counter_volume_func = func.max(Meter.counter_volume)
query = self._make_volume_query(event_filter, counter_volume_func)
results = query.all()
return ({'resource_id': x, 'value': y} for x, y in results)
def get_event_interval(self, event_filter):
"""Return the min and max timestamps from events,
@ -263,7 +278,6 @@ class Connection(base.Connection):
( datetime.datetime(), datetime.datetime() )
"""
func = sqlalchemy_session.sqlalchemy.func
query = self.session.query(func.min(Meter.timestamp),
func.max(Meter.timestamp))
query = make_query_from_filter(query, event_filter)

View File

@ -22,6 +22,7 @@ import re
import time
import sqlalchemy
from sqlalchemy import func
from sqlalchemy.exc import DisconnectionError, OperationalError
import sqlalchemy.orm
from sqlalchemy.pool import NullPool, StaticPool

View File

@ -20,6 +20,7 @@ import datetime
import logging
import os
import re
import sqlalchemy
import unittest
from ceilometer import counter
@ -36,7 +37,8 @@ LOG = logging.getLogger(__name__)
CEILOMETER_TEST_LIVE = bool(int(os.environ.get('CEILOMETER_TEST_LIVE', 0)))
if CEILOMETER_TEST_LIVE:
MYSQL_DBNAME = 'ceilometer_test'
MYSQL_URL = 'mysql://ceilometer:somepass@localhost/%s' % MYSQL_DBNAME
MYSQL_BASE_URL = 'mysql://ceilometer:somepass@localhost/'
MYSQL_URL = MYSQL_BASE_URL + MYSQL_DBNAME
class Connection(impl_sqlalchemy.Connection):
@ -49,10 +51,10 @@ class Connection(impl_sqlalchemy.Connection):
raise
class SQLAlchemyEngineTestBase(unittest.TestCase):
class SQLAlchemyEngineSubBase(unittest.TestCase):
def tearDown(self):
super(SQLAlchemyEngineTestBase, self).tearDown()
super(SQLAlchemyEngineSubBase, self).tearDown()
engine_conn = self.session.bind.connect()
if CEILOMETER_TEST_LIVE:
engine_conn.execute('drop database %s' % MYSQL_DBNAME)
@ -62,7 +64,7 @@ class SQLAlchemyEngineTestBase(unittest.TestCase):
self.session.bind.dispose()
def setUp(self):
super(SQLAlchemyEngineTestBase, self).setUp()
super(SQLAlchemyEngineSubBase, self).setUp()
self.conf = cfg.CONF
self.conf.database_connection = 'sqlite://'
@ -72,12 +74,28 @@ class SQLAlchemyEngineTestBase(unittest.TestCase):
# should pull from conf file but for now manually specified
# just make sure ceilometer_test db exists in mysql
self.conf.database_connection = MYSQL_URL
engine = sqlalchemy.create_engine(MYSQL_BASE_URL)
engine_conn = engine.connect()
try:
engine_conn.execute('drop database %s' % MYSQL_DBNAME)
except sqlalchemy.exc.OperationalError:
pass
engine_conn.execute('create database %s' % MYSQL_DBNAME)
self.conn = Connection(self.conf)
self.session = self.conn.session
migration.db_sync()
class SQLAlchemyEngineTestBase(SQLAlchemyEngineSubBase):
def tearDown(self):
super(SQLAlchemyEngineTestBase, self).tearDown()
def setUp(self):
super(SQLAlchemyEngineTestBase, self).setUp()
self.counter = counter.Counter(
'instance',
counter.TYPE_CUMULATIVE,
@ -458,3 +476,204 @@ class TestGetEventInterval(SQLAlchemyEngineTestBase):
s, e = self.conn.get_event_interval(self._filter)
assert s is None
assert e is None
class SumTest(SQLAlchemyEngineTestBase):
def setUp(self):
super(SumTest, self).setUp()
def test_by_user(self):
f = storage.EventFilter(
user='user-id',
meter='instance',
)
results = list(self.conn.get_volume_sum(f))
counts = dict((r['resource_id'], r['value'])
for r in results)
assert counts['resource-id'] == 1
assert counts['resource-id-alternate'] == 1
assert set(counts.keys()) == set(['resource-id',
'resource-id-alternate'])
def test_by_project(self):
f = storage.EventFilter(
project='project-id',
meter='instance',
)
results = list(self.conn.get_volume_sum(f))
assert results
counts = dict((r['resource_id'], r['value'])
for r in results)
assert counts['resource-id'] == 1
assert counts['resource-id-alternate'] == 2
assert set(counts.keys()) == set(['resource-id',
'resource-id-alternate'])
def test_one_resource(self):
f = storage.EventFilter(
user='user-id',
meter='instance',
resource='resource-id',
)
results = list(self.conn.get_volume_sum(f))
assert results
counts = dict((r['resource_id'], r['value'])
for r in results)
assert counts['resource-id'] == 1
assert set(counts.keys()) == set(['resource-id'])
class MaxProjectTest(SQLAlchemyEngineSubBase):
def setUp(self):
super(MaxProjectTest, self).setUp()
self.counters = []
for i in range(3):
c = counter.Counter(
'volume.size',
'gauge',
5 + i,
'user-id',
'project1',
'resource-id-%s' % i,
timestamp=datetime.datetime(2012, 9, 25, 10 + i, 30 + i),
resource_metadata={'display_name': 'test-volume',
'tag': 'self.counter',
}
)
self.counters.append(c)
msg = meter.meter_message_from_counter(c,
cfg.CONF.metering_secret,
'source1',
)
self.conn.record_metering_data(msg)
def test_no_bounds(self):
expected = [{'value': 5.0, 'resource_id': u'resource-id-0'},
{'value': 6.0, 'resource_id': u'resource-id-1'},
{'value': 7.0, 'resource_id': u'resource-id-2'}]
f = storage.EventFilter(project='project1')
results = list(self.conn.get_volume_max(f))
assert results == expected
def test_start_timestamp(self):
expected = [{'value': 6L, 'resource_id': u'resource-id-1'},
{'value': 7L, 'resource_id': u'resource-id-2'}]
f = storage.EventFilter(project='project1',
start='2012-09-25T11:30:00')
results = list(self.conn.get_volume_max(f))
assert results == expected
def test_start_timestamp_after(self):
f = storage.EventFilter(project='project1',
start='2012-09-25T12:34:00')
results = list(self.conn.get_volume_max(f))
assert results == []
def test_end_timestamp(self):
expected = [{'value': 5L, 'resource_id': u'resource-id-0'}]
f = storage.EventFilter(project='project1', end='2012-09-25T11:30:00')
results = list(self.conn.get_volume_max(f))
assert results == expected
def test_end_timestamp_before(self):
f = storage.EventFilter(project='project1', end='2012-09-25T09:54:00')
results = list(self.conn.get_volume_max(f))
assert results == []
def test_start_end_timestamp(self):
expected = [{'value': 6L, 'resource_id': u'resource-id-1'}]
f = storage.EventFilter(project='project1',
start='2012-09-25T11:30:00',
end='2012-09-25T11:32:00')
results = list(self.conn.get_volume_max(f))
assert results == expected
class MaxResourceTest(SQLAlchemyEngineSubBase):
def setUp(self):
super(MaxResourceTest, self).setUp()
self.counters = []
for i in range(3):
c = counter.Counter(
'volume.size',
'gauge',
5 + i,
'user-id',
'project1',
'resource-id',
timestamp=datetime.datetime(2012, 9, 25, 10 + i, 30 + i),
resource_metadata={'display_name': 'test-volume',
'tag': 'self.counter',
}
)
self.counters.append(c)
msg = meter.meter_message_from_counter(c,
cfg.CONF.metering_secret,
'source1',
)
self.conn.record_metering_data(msg)
def test_no_bounds(self):
expected = [{'value': 7L, 'resource_id': u'resource-id'}]
f = storage.EventFilter(resource='resource-id')
results = list(self.conn.get_volume_max(f))
assert results == expected
def test_start_timestamp(self):
expected = [{'value': 7L, 'resource_id': u'resource-id'}]
f = storage.EventFilter(resource='resource-id',
start='2012-09-25T11:30:00')
results = list(self.conn.get_volume_max(f))
assert results == expected
def test_start_timestamp_after(self):
f = storage.EventFilter(resource='resource-id',
start='2012-09-25T12:34:00')
results = list(self.conn.get_volume_max(f))
assert results == []
def test_end_timestamp(self):
expected = [{'value': 5L, 'resource_id': u'resource-id'}]
f = storage.EventFilter(resource='resource-id',
end='2012-09-25T11:30:00')
results = list(self.conn.get_volume_max(f))
assert results == expected
def test_end_timestamp_before(self):
f = storage.EventFilter(resource='resource-id',
end='2012-09-25T09:54:00')
results = list(self.conn.get_volume_max(f))
assert results == []
def test_start_end_timestamp(self):
expected = [{'value': 6L, 'resource_id': u'resource-id'}]
f = storage.EventFilter(resource='resource-id',
start='2012-09-25T11:30:00',
end='2012-09-25T11:32:00')
results = list(self.conn.get_volume_max(f))
assert results == expected