diff --git a/ceilometer/storage/__init__.py b/ceilometer/storage/__init__.py index 9751df586..952f38883 100644 --- a/ceilometer/storage/__init__.py +++ b/ceilometer/storage/__init__.py @@ -33,7 +33,7 @@ LOG = log.getLogger(__name__) STORAGE_ENGINE_NAMESPACE = 'ceilometer.storage' -STORAGE_OPTS = [ +OLD_STORAGE_OPTS = [ cfg.StrOpt('database_connection', secret=True, default=None, @@ -41,8 +41,18 @@ STORAGE_OPTS = [ ), ] +cfg.CONF.register_opts(OLD_STORAGE_OPTS) + + +STORAGE_OPTS = [ + cfg.IntOpt('time_to_live', + default=-1, + help="""number of seconds that samples are kept +in the database for (<= 0 means forever)"""), +] + +cfg.CONF.register_opts(STORAGE_OPTS, group='database') -cfg.CONF.register_opts(STORAGE_OPTS) cfg.CONF.import_opt('connection', 'ceilometer.openstack.common.db.sqlalchemy.session', group='database') @@ -132,3 +142,11 @@ class EventFilter(object): def dbsync(): service.prepare_service() get_connection(cfg.CONF).upgrade() + + +def expirer(): + service.prepare_service() + LOG.debug("Clearing expired metering data") + storage_conn = get_connection(cfg.CONF) + storage_conn.clear_expired_metering_data( + cfg.CONF.database.time_to_live) diff --git a/ceilometer/storage/base.py b/ceilometer/storage/base.py index a7d80b842..87670d47d 100644 --- a/ceilometer/storage/base.py +++ b/ceilometer/storage/base.py @@ -82,6 +82,15 @@ class Connection(object): All timestamps must be naive utc datetime object. """ + @abc.abstractmethod + def clear_expired_metering_data(self, ttl): + """Clear expired data from the backend storage system according to the + time-to-live. + + :param ttl: Number of seconds to keep records for. + + """ + @abc.abstractmethod def get_users(self, source=None): """Return an iterable of user id strings. diff --git a/ceilometer/storage/impl_hbase.py b/ceilometer/storage/impl_hbase.py index 69ef2a6c0..e2b0ad8a0 100644 --- a/ceilometer/storage/impl_hbase.py +++ b/ceilometer/storage/impl_hbase.py @@ -257,6 +257,15 @@ class Connection(base.Connection): record['f:message'] = json.dumps(data) meter_table.put(row, record) + def clear_expired_metering_data(self, ttl): + """Clear expired data from the backend storage system according to the + time-to-live. + + :param ttl: Number of seconds to keep records for. + + """ + raise NotImplementedError + def get_users(self, source=None): """Return an iterable of user id strings. diff --git a/ceilometer/storage/impl_log.py b/ceilometer/storage/impl_log.py index 1c3ffa68b..ae3155f08 100644 --- a/ceilometer/storage/impl_log.py +++ b/ceilometer/storage/impl_log.py @@ -62,6 +62,15 @@ class Connection(base.Connection): data['resource_id'], data['counter_volume']) + def clear_expired_metering_data(self, ttl): + """Clear expired data from the backend storage system according to the + time-to-live. + + :param ttl: Number of seconds to keep records for. + + """ + LOG.info("Dropping data with TTL %d", ttl) + def get_users(self, source=None): """Return an iterable of user id strings. diff --git a/ceilometer/storage/impl_mongodb.py b/ceilometer/storage/impl_mongodb.py index be7a58d3f..2ab2ae53d 100644 --- a/ceilometer/storage/impl_mongodb.py +++ b/ceilometer/storage/impl_mongodb.py @@ -21,6 +21,7 @@ """ import copy +import datetime import operator import os import re @@ -34,9 +35,14 @@ import pymongo from oslo.config import cfg from ceilometer.openstack.common import log +from ceilometer.openstack.common import timeutils +from ceilometer import storage from ceilometer.storage import base from ceilometer.storage import models +cfg.CONF.import_opt('time_to_live', 'ceilometer.storage', + group="database") + LOG = log.getLogger(__name__) @@ -181,6 +187,17 @@ class Connection(base.Connection): CONNECTION_POOL = ConnectionPool() + REDUCE_GROUP_CLEAN = bson.code.Code(""" + function ( curr, result ) { + if (result.resources.indexOf(curr.resource_id) < 0) + result.resources.push(curr.resource_id); + if (result.users.indexOf(curr.user_id) < 0) + result.users.push(curr.user_id); + if (result.projects.indexOf(curr.project_id) < 0) + result.projects.push(curr.project_id); + } + """) + MAP_STATS = bson.code.Code(""" function () { emit('statistics', { min : this.counter_volume, @@ -293,6 +310,39 @@ class Connection(base.Connection): ('source', pymongo.ASCENDING), ], name='meter_idx') + # Since mongodb 2.2 support db-ttl natively + if self._is_natively_ttl_supported(): + self._ensure_meter_ttl_index() + + def _ensure_meter_ttl_index(self): + indexes = self.db.meter.index_information() + + ttl = cfg.CONF.database.time_to_live + + if ttl <= 0: + if 'meter_ttl' in indexes: + self.db.meter.drop_index('meter_ttl') + return + + if 'meter_ttl' in indexes: + # NOTE(sileht): manually check expireAfterSeconds because + # ensure_index doesn't update index options if the index already + # exists + if ttl == indexes['meter_ttl'].get('expireAfterSeconds', -1): + return + + self.db.meter.drop_index('meter_ttl') + + self.db.meter.create_index( + [('timestamp', pymongo.ASCENDING)], + expireAfterSeconds=ttl, + name='meter_ttl' + ) + + def _is_natively_ttl_supported(self): + # Assume is not supported if we can get the version + return self.conn.server_info().get('versionArray', []) >= [2, 2] + @staticmethod def upgrade(version=None): pass @@ -358,6 +408,35 @@ class Connection(base.Connection): record = copy.copy(data) self.db.meter.insert(record) + def clear_expired_metering_data(self, ttl): + """Clear expired data from the backend storage system according to the + time-to-live. + + :param ttl: Number of seconds to keep records for. + + """ + # Before mongodb 2.2 we need to clear expired data manually + if not self._is_natively_ttl_supported(): + end = timeutils.utcnow() - datetime.timedelta(seconds=ttl) + f = storage.SampleFilter(end=end) + q = make_query_from_filter(f, require_meter=False) + self.db.meter.remove(q) + + results = self.db.meter.group( + key={}, + condition={}, + reduce=self.REDUCE_GROUP_CLEAN, + initial={ + 'resources': [], + 'users': [], + 'projects': [], + } + )[0] + + self.db.user.remove({'_id': {'$nin': results['users']}}) + self.db.project.remove({'_id': {'$nin': results['projects']}}) + self.db.resource.remove({'_id': {'$nin': results['resources']}}) + def get_users(self, source=None): """Return an iterable of user id strings. diff --git a/ceilometer/storage/impl_sqlalchemy.py b/ceilometer/storage/impl_sqlalchemy.py index e95911f16..b233ec66b 100644 --- a/ceilometer/storage/impl_sqlalchemy.py +++ b/ceilometer/storage/impl_sqlalchemy.py @@ -19,6 +19,7 @@ from __future__ import absolute_import +import datetime import operator import os import uuid @@ -221,6 +222,35 @@ class Connection(base.Connection): meter.message_id = data['message_id'] session.flush() + @staticmethod + def clear_expired_metering_data(ttl): + """Clear expired data from the backend storage system according to the + time-to-live. + + :param ttl: Number of seconds to keep records for. + + """ + session = sqlalchemy_session.get_session() + query = session.query(Meter.id) + end = timeutils.utcnow() - datetime.timedelta(seconds=ttl) + query = query.filter(Meter.timestamp < end) + query.delete() + + query = session.query(User.id).filter(~User.id.in_( + session.query(Meter.user_id).group_by(Meter.user_id) + )) + query.delete(synchronize_session='fetch') + + query = session.query(Project.id).filter(~Project.id.in_( + session.query(Meter.project_id).group_by(Meter.project_id) + )) + query.delete(synchronize_session='fetch') + + query = session.query(Resource.id).filter(~Resource.id.in_( + session.query(Meter.resource_id).group_by(Meter.resource_id) + )) + query.delete(synchronize_session='fetch') + @staticmethod def get_users(source=None): """Return an iterable of user id strings. diff --git a/etc/ceilometer/ceilometer.conf.sample b/etc/ceilometer/ceilometer.conf.sample index 70b66c0e6..45d7689bd 100644 --- a/etc/ceilometer/ceilometer.conf.sample +++ b/etc/ceilometer/ceilometer.conf.sample @@ -526,6 +526,15 @@ #pool_timeout= +# +# Options defined in ceilometer.storage +# + +# number of seconds that samples are kept in the database for +# (<= 0 means forever) (integer value) +#time_to_live=-1 + + [alarm] # @@ -635,4 +644,4 @@ #password= -# Total option count: 122 +# Total option count: 123 diff --git a/setup.cfg b/setup.cfg index 45c3889df..95070144a 100644 --- a/setup.cfg +++ b/setup.cfg @@ -113,6 +113,7 @@ console_scripts = ceilometer-agent-central = ceilometer.central.manager:agent_central ceilometer-agent-compute = ceilometer.compute.manager:agent_compute ceilometer-dbsync = ceilometer.storage:dbsync + ceilometer-expirer = ceilometer.storage:expirer ceilometer-collector = ceilometer.collector.service:collector ceilometer-collector-udp = ceilometer.collector.service:udp_collector ceilometer-alarm-singleton = ceilometer.alarm.service:singleton_alarm diff --git a/tests/storage/base.py b/tests/storage/base.py index a9f87b5a3..096119e01 100644 --- a/tests/storage/base.py +++ b/tests/storage/base.py @@ -26,6 +26,7 @@ import datetime from oslo.config import cfg from ceilometer.publisher import rpc +from ceilometer.openstack.common import timeutils from ceilometer import counter from ceilometer import storage from ceilometer.tests import db as test_db @@ -40,6 +41,10 @@ class DBTestBase(test_db.TestBase): super(DBTestBase, self).setUp() self.prepare_data() + def tearDown(self): + timeutils.utcnow.override_time = None + super(DBTestBase, self).tearDown() + def prepare_data(self): original_timestamps = [(2012, 7, 2, 10, 40), (2012, 7, 2, 10, 41), (2012, 7, 2, 10, 41), (2012, 7, 2, 10, 42), @@ -496,6 +501,46 @@ class RawSampleTest(DBTestBase): assert results assert len(results) == 1 + def test_clear_metering_data(self): + timeutils.utcnow.override_time = datetime.datetime(2012, 7, 2, 10, 45) + + try: + self.conn.clear_expired_metering_data(3 * 60) + except NotImplementedError: + got_not_imp = True + self.assertTrue(got_not_imp) + return + + f = storage.SampleFilter(meter='instance') + results = list(self.conn.get_samples(f)) + self.assertEqual(len(results), 5) + results = list(self.conn.get_users()) + self.assertEqual(len(results), 5) + results = list(self.conn.get_projects()) + self.assertEqual(len(results), 5) + results = list(self.conn.get_resources()) + self.assertEqual(len(results), 5) + + def test_clear_metering_data_no_data_to_remove(self): + timeutils.utcnow.override_time = datetime.datetime(2010, 7, 2, 10, 45) + + try: + self.conn.clear_expired_metering_data(3 * 60) + except NotImplementedError: + got_not_imp = True + self.assertTrue(got_not_imp) + return + + f = storage.SampleFilter(meter='instance') + results = list(self.conn.get_samples(f)) + self.assertEqual(len(results), 10) + results = list(self.conn.get_users()) + self.assertEqual(len(results), 9) + results = list(self.conn.get_projects()) + self.assertEqual(len(results), 8) + results = list(self.conn.get_resources()) + self.assertEqual(len(results), 9) + class StatisticsTest(DBTestBase): diff --git a/tests/storage/test_impl_mongodb.py b/tests/storage/test_impl_mongodb.py index 5dff9af69..bff915064 100644 --- a/tests/storage/test_impl_mongodb.py +++ b/tests/storage/test_impl_mongodb.py @@ -45,6 +45,61 @@ class MongoDBConnection(MongoDBEngineTestBase): impl_mongodb.Connection(cfg.CONF).conn) +class IndexTest(MongoDBEngineTestBase): + def test_meter_ttl_index_absent(self): + # create a fake index and check it is deleted + self.conn.db.meter.ensure_index('foo', name='meter_ttl') + cfg.CONF.set_override('time_to_live', -1, group='database') + + self.conn._ensure_meter_ttl_index() + self.assertTrue(self.conn.db.meter.ensure_index('foo', + name='meter_ttl')) + cfg.CONF.set_override('time_to_live', 456789, group='database') + self.conn._ensure_meter_ttl_index() + self.assertFalse(self.conn.db.meter.ensure_index('foo', + name='meter_ttl')) + + def test_meter_ttl_index_present(self): + cfg.CONF.set_override('time_to_live', 456789, group='database') + self.conn._ensure_meter_ttl_index() + self.assertFalse(self.conn.db.meter.ensure_index('foo', + name='meter_ttl')) + self.assertEqual(self.conn.db.meter.index_information()[ + 'meter_ttl']['expireAfterSeconds'], 456789) + + cfg.CONF.set_override('time_to_live', -1, group='database') + self.conn._ensure_meter_ttl_index() + self.assertTrue(self.conn.db.meter.ensure_index('foo', + name='meter_ttl')) + + def test_ttl_index_is_supported(self): + self.mox.StubOutWithMock(self.conn.conn, "server_info") + self.conn.conn.server_info().AndReturn({'versionArray': [2, 4, 5, 0]}) + + self.mox.ReplayAll() + self.assertTrue(self.conn._is_natively_ttl_supported()) + self.mox.UnsetStubs() + self.mox.VerifyAll() + + def test_ttl_index_is_not_supported(self): + self.mox.StubOutWithMock(self.conn.conn, "server_info") + self.conn.conn.server_info().AndReturn({'versionArray': [2, 0, 1, 0]}) + + self.mox.ReplayAll() + self.assertFalse(self.conn._is_natively_ttl_supported()) + self.mox.UnsetStubs() + self.mox.VerifyAll() + + def test_ttl_index_is_unkown(self): + self.mox.StubOutWithMock(self.conn.conn, "server_info") + self.conn.conn.server_info().AndReturn({}) + + self.mox.ReplayAll() + self.assertFalse(self.conn._is_natively_ttl_supported()) + self.mox.UnsetStubs() + self.mox.VerifyAll() + + class UserTest(base.UserTest, MongoDBEngineTestBase): pass @@ -62,7 +117,25 @@ class MeterTest(base.MeterTest, MongoDBEngineTestBase): class RawSampleTest(base.RawSampleTest, MongoDBEngineTestBase): - pass + def test_clear_metering_data(self): + # NOTE(sileht): ensure this tests is played for any version of mongo + self.mox.StubOutWithMock(self.conn, "_is_natively_ttl_supported") + self.conn._is_natively_ttl_supported().AndReturn(False) + + self.mox.ReplayAll() + super(RawSampleTest, self).test_clear_metering_data() + self.mox.UnsetStubs() + self.mox.VerifyAll() + + def test_clear_metering_data_no_data_to_remove(self): + # NOTE(sileht): ensure this tests is played for any version of mongo + self.mox.StubOutWithMock(self.conn, "_is_natively_ttl_supported") + self.conn._is_natively_ttl_supported().AndReturn(False) + + self.mox.ReplayAll() + super(RawSampleTest, self).test_clear_metering_data_no_data_to_remove() + self.mox.UnsetStubs() + self.mox.VerifyAll() class StatisticsTest(base.StatisticsTest, MongoDBEngineTestBase): diff --git a/tests/test_bin.py b/tests/test_bin.py index 56cc0e9d1..a358b5794 100644 --- a/tests/test_bin.py +++ b/tests/test_bin.py @@ -27,9 +27,9 @@ import time from ceilometer.tests import base -class BinDbsyncTestCase(base.TestCase): +class BinTestCase(base.TestCase): def setUp(self): - super(BinDbsyncTestCase, self).setUp() + super(BinTestCase, self).setUp() self.tempfile = self.temp_config_file_path() with open(self.tempfile, 'w') as tmp: tmp.write("[database]\n") @@ -40,6 +40,11 @@ class BinDbsyncTestCase(base.TestCase): "--config-file=%s" % self.tempfile]) self.assertEqual(subp.wait(), 0) + def test_run_expirer(self): + subp = subprocess.Popen(['ceilometer-expirer', + "--config-file=%s" % self.tempfile]) + self.assertEqual(subp.wait(), 0) + class BinSendCounterTestCase(base.TestCase): def setUp(self):