diff --git a/ceilometer/storage/__init__.py b/ceilometer/storage/__init__.py index e793f7c70..9f18f1b96 100644 --- a/ceilometer/storage/__init__.py +++ b/ceilometer/storage/__init__.py @@ -19,9 +19,11 @@ """ from stevedore import driver +from datetime import datetime from ceilometer.openstack.common import log from ceilometer.openstack.common import cfg +from ceilometer.openstack.common import timeutils from urlparse import urlparse @@ -83,8 +85,16 @@ class EventFilter(object): resource=None, meter=None, source=None): self.user = user self.project = project - self.start = start - self.end = end + self.start = self._sanitize_timestamp(start) + self.end = self._sanitize_timestamp(end) self.resource = resource self.meter = meter self.source = source + + def _sanitize_timestamp(self, timestamp): + """Return a naive utc datetime object""" + if not timestamp: + return timestamp + if not isinstance(timestamp, datetime): + timestamp = timeutils.parse_isotime(timestamp) + return timeutils.normalize_time(timestamp) diff --git a/ceilometer/storage/impl_sqlalchemy.py b/ceilometer/storage/impl_sqlalchemy.py index 553b4ec2b..b28e34fd5 100644 --- a/ceilometer/storage/impl_sqlalchemy.py +++ b/ceilometer/storage/impl_sqlalchemy.py @@ -19,11 +19,11 @@ import copy import datetime -from ceilometer.openstack.common import log -from ceilometer.openstack.common import cfg +from ceilometer.openstack.common import cfg, log, timeutils from ceilometer.storage import base from ceilometer.storage.sqlalchemy.models import Meter, Project, Resource from ceilometer.storage.sqlalchemy.models import Source, User +from ceilometer.storage.sqlalchemy.session import func import ceilometer.storage.sqlalchemy.session as sqlalchemy_session LOG = log.getLogger(__name__) @@ -83,9 +83,11 @@ def make_query_from_filter(query, event_filter, require_meter=True): if event_filter.source: query = query.filter_by(source=event_filter.source) if event_filter.start: - query = query = query.filter(Meter.timestamp >= event_filter.start) + ts_start = event_filter.start + query = query.filter(Meter.timestamp >= ts_start) if event_filter.end: - query = query = query.filter(Meter.timestamp < event_filter.end) + ts_end = event_filter.end + query = query.filter(Meter.timestamp < ts_end) if event_filter.user: query = query.filter_by(user_id=event_filter.user) elif event_filter.project: @@ -249,13 +251,26 @@ class Connection(base.Connection): del e['id'] yield e + def _make_volume_query(self, event_filter, counter_volume_func): + """Returns complex Meter counter_volume query for max and sum""" + subq = model_query(Meter.id, session=self.session) + subq = make_query_from_filter(subq, event_filter, require_meter=False) + subq = subq.subquery() + mainq = self.session.query(Resource.id, counter_volume_func) + mainq = mainq.join(Meter).group_by(Resource.id) + return mainq.filter(Meter.id.in_(subq)) + def get_volume_sum(self, event_filter): - # it isn't clear these are used - pass + counter_volume_func = func.sum(Meter.counter_volume) + query = self._make_volume_query(event_filter, counter_volume_func) + results = query.all() + return ({'resource_id': x, 'value': y} for x, y in results) def get_volume_max(self, event_filter): - # it isn't clear these are used - pass + counter_volume_func = func.max(Meter.counter_volume) + query = self._make_volume_query(event_filter, counter_volume_func) + results = query.all() + return ({'resource_id': x, 'value': y} for x, y in results) def get_event_interval(self, event_filter): """Return the min and max timestamps from events, @@ -263,7 +278,6 @@ class Connection(base.Connection): ( datetime.datetime(), datetime.datetime() ) """ - func = sqlalchemy_session.sqlalchemy.func query = self.session.query(func.min(Meter.timestamp), func.max(Meter.timestamp)) query = make_query_from_filter(query, event_filter) diff --git a/ceilometer/storage/sqlalchemy/session.py b/ceilometer/storage/sqlalchemy/session.py index 858037c14..10de99e9e 100644 --- a/ceilometer/storage/sqlalchemy/session.py +++ b/ceilometer/storage/sqlalchemy/session.py @@ -22,6 +22,7 @@ import re import time import sqlalchemy +from sqlalchemy import func from sqlalchemy.exc import DisconnectionError, OperationalError import sqlalchemy.orm from sqlalchemy.pool import NullPool, StaticPool diff --git a/tests/storage/test_impl_sqlalchemy.py b/tests/storage/test_impl_sqlalchemy.py index f58e4b070..54b24d1eb 100644 --- a/tests/storage/test_impl_sqlalchemy.py +++ b/tests/storage/test_impl_sqlalchemy.py @@ -20,6 +20,7 @@ import datetime import logging import os import re +import sqlalchemy import unittest from ceilometer import counter @@ -36,7 +37,8 @@ LOG = logging.getLogger(__name__) CEILOMETER_TEST_LIVE = bool(int(os.environ.get('CEILOMETER_TEST_LIVE', 0))) if CEILOMETER_TEST_LIVE: MYSQL_DBNAME = 'ceilometer_test' - MYSQL_URL = 'mysql://ceilometer:somepass@localhost/%s' % MYSQL_DBNAME + MYSQL_BASE_URL = 'mysql://ceilometer:somepass@localhost/' + MYSQL_URL = MYSQL_BASE_URL + MYSQL_DBNAME class Connection(impl_sqlalchemy.Connection): @@ -49,10 +51,10 @@ class Connection(impl_sqlalchemy.Connection): raise -class SQLAlchemyEngineTestBase(unittest.TestCase): +class SQLAlchemyEngineSubBase(unittest.TestCase): def tearDown(self): - super(SQLAlchemyEngineTestBase, self).tearDown() + super(SQLAlchemyEngineSubBase, self).tearDown() engine_conn = self.session.bind.connect() if CEILOMETER_TEST_LIVE: engine_conn.execute('drop database %s' % MYSQL_DBNAME) @@ -62,7 +64,7 @@ class SQLAlchemyEngineTestBase(unittest.TestCase): self.session.bind.dispose() def setUp(self): - super(SQLAlchemyEngineTestBase, self).setUp() + super(SQLAlchemyEngineSubBase, self).setUp() self.conf = cfg.CONF self.conf.database_connection = 'sqlite://' @@ -72,12 +74,28 @@ class SQLAlchemyEngineTestBase(unittest.TestCase): # should pull from conf file but for now manually specified # just make sure ceilometer_test db exists in mysql self.conf.database_connection = MYSQL_URL + engine = sqlalchemy.create_engine(MYSQL_BASE_URL) + engine_conn = engine.connect() + try: + engine_conn.execute('drop database %s' % MYSQL_DBNAME) + except sqlalchemy.exc.OperationalError: + pass + engine_conn.execute('create database %s' % MYSQL_DBNAME) self.conn = Connection(self.conf) self.session = self.conn.session migration.db_sync() + +class SQLAlchemyEngineTestBase(SQLAlchemyEngineSubBase): + + def tearDown(self): + super(SQLAlchemyEngineTestBase, self).tearDown() + + def setUp(self): + super(SQLAlchemyEngineTestBase, self).setUp() + self.counter = counter.Counter( 'instance', counter.TYPE_CUMULATIVE, @@ -458,3 +476,204 @@ class TestGetEventInterval(SQLAlchemyEngineTestBase): s, e = self.conn.get_event_interval(self._filter) assert s is None assert e is None + + +class SumTest(SQLAlchemyEngineTestBase): + + def setUp(self): + super(SumTest, self).setUp() + + def test_by_user(self): + f = storage.EventFilter( + user='user-id', + meter='instance', + ) + results = list(self.conn.get_volume_sum(f)) + counts = dict((r['resource_id'], r['value']) + for r in results) + assert counts['resource-id'] == 1 + assert counts['resource-id-alternate'] == 1 + assert set(counts.keys()) == set(['resource-id', + 'resource-id-alternate']) + + def test_by_project(self): + f = storage.EventFilter( + project='project-id', + meter='instance', + ) + results = list(self.conn.get_volume_sum(f)) + assert results + counts = dict((r['resource_id'], r['value']) + for r in results) + assert counts['resource-id'] == 1 + assert counts['resource-id-alternate'] == 2 + assert set(counts.keys()) == set(['resource-id', + 'resource-id-alternate']) + + def test_one_resource(self): + f = storage.EventFilter( + user='user-id', + meter='instance', + resource='resource-id', + ) + results = list(self.conn.get_volume_sum(f)) + assert results + counts = dict((r['resource_id'], r['value']) + for r in results) + assert counts['resource-id'] == 1 + assert set(counts.keys()) == set(['resource-id']) + + +class MaxProjectTest(SQLAlchemyEngineSubBase): + + def setUp(self): + super(MaxProjectTest, self).setUp() + + self.counters = [] + for i in range(3): + c = counter.Counter( + 'volume.size', + 'gauge', + 5 + i, + 'user-id', + 'project1', + 'resource-id-%s' % i, + timestamp=datetime.datetime(2012, 9, 25, 10 + i, 30 + i), + resource_metadata={'display_name': 'test-volume', + 'tag': 'self.counter', + } + ) + self.counters.append(c) + msg = meter.meter_message_from_counter(c, + cfg.CONF.metering_secret, + 'source1', + ) + self.conn.record_metering_data(msg) + + def test_no_bounds(self): + expected = [{'value': 5.0, 'resource_id': u'resource-id-0'}, + {'value': 6.0, 'resource_id': u'resource-id-1'}, + {'value': 7.0, 'resource_id': u'resource-id-2'}] + + f = storage.EventFilter(project='project1') + + results = list(self.conn.get_volume_max(f)) + assert results == expected + + def test_start_timestamp(self): + expected = [{'value': 6L, 'resource_id': u'resource-id-1'}, + {'value': 7L, 'resource_id': u'resource-id-2'}] + + f = storage.EventFilter(project='project1', + start='2012-09-25T11:30:00') + + results = list(self.conn.get_volume_max(f)) + assert results == expected + + def test_start_timestamp_after(self): + f = storage.EventFilter(project='project1', + start='2012-09-25T12:34:00') + + results = list(self.conn.get_volume_max(f)) + assert results == [] + + def test_end_timestamp(self): + expected = [{'value': 5L, 'resource_id': u'resource-id-0'}] + + f = storage.EventFilter(project='project1', end='2012-09-25T11:30:00') + + results = list(self.conn.get_volume_max(f)) + assert results == expected + + def test_end_timestamp_before(self): + f = storage.EventFilter(project='project1', end='2012-09-25T09:54:00') + + results = list(self.conn.get_volume_max(f)) + assert results == [] + + def test_start_end_timestamp(self): + expected = [{'value': 6L, 'resource_id': u'resource-id-1'}] + + f = storage.EventFilter(project='project1', + start='2012-09-25T11:30:00', + end='2012-09-25T11:32:00') + + results = list(self.conn.get_volume_max(f)) + assert results == expected + + +class MaxResourceTest(SQLAlchemyEngineSubBase): + + def setUp(self): + super(MaxResourceTest, self).setUp() + + self.counters = [] + for i in range(3): + c = counter.Counter( + 'volume.size', + 'gauge', + 5 + i, + 'user-id', + 'project1', + 'resource-id', + timestamp=datetime.datetime(2012, 9, 25, 10 + i, 30 + i), + resource_metadata={'display_name': 'test-volume', + 'tag': 'self.counter', + } + ) + self.counters.append(c) + msg = meter.meter_message_from_counter(c, + cfg.CONF.metering_secret, + 'source1', + ) + self.conn.record_metering_data(msg) + + def test_no_bounds(self): + expected = [{'value': 7L, 'resource_id': u'resource-id'}] + + f = storage.EventFilter(resource='resource-id') + + results = list(self.conn.get_volume_max(f)) + assert results == expected + + def test_start_timestamp(self): + expected = [{'value': 7L, 'resource_id': u'resource-id'}] + + f = storage.EventFilter(resource='resource-id', + start='2012-09-25T11:30:00') + + results = list(self.conn.get_volume_max(f)) + assert results == expected + + def test_start_timestamp_after(self): + f = storage.EventFilter(resource='resource-id', + start='2012-09-25T12:34:00') + + results = list(self.conn.get_volume_max(f)) + assert results == [] + + def test_end_timestamp(self): + expected = [{'value': 5L, 'resource_id': u'resource-id'}] + + f = storage.EventFilter(resource='resource-id', + end='2012-09-25T11:30:00') + + results = list(self.conn.get_volume_max(f)) + assert results == expected + + def test_end_timestamp_before(self): + f = storage.EventFilter(resource='resource-id', + end='2012-09-25T09:54:00') + + results = list(self.conn.get_volume_max(f)) + assert results == [] + + def test_start_end_timestamp(self): + expected = [{'value': 6L, 'resource_id': u'resource-id'}] + + f = storage.EventFilter(resource='resource-id', + start='2012-09-25T11:30:00', + end='2012-09-25T11:32:00') + + results = list(self.conn.get_volume_max(f)) + assert results == expected