Allow to enable time to live on metering sample
The collector start the metering sample cleanup process every 'database_clear_expired_data_interval' seconds to remove sample according the 'database_time_to_live' If the backend support ttl natively (mongodb >= 2.2), the cleanup function in the backend is just a dump, and the backend handles the TTL itself. If the backend doesn't support ttl at all, it will raise a NotImplementedException caught by collector to fill the log correctly. Blueprint: db-ttl Change-Id: I869ce6f50065d0ae8d7095a260efbfcd33165eef
This commit is contained in:
parent
e84189590e
commit
2c2a07a688
@ -33,7 +33,7 @@ LOG = log.getLogger(__name__)
|
||||
|
||||
STORAGE_ENGINE_NAMESPACE = 'ceilometer.storage'
|
||||
|
||||
STORAGE_OPTS = [
|
||||
OLD_STORAGE_OPTS = [
|
||||
cfg.StrOpt('database_connection',
|
||||
secret=True,
|
||||
default=None,
|
||||
@ -41,8 +41,18 @@ STORAGE_OPTS = [
|
||||
),
|
||||
]
|
||||
|
||||
cfg.CONF.register_opts(OLD_STORAGE_OPTS)
|
||||
|
||||
|
||||
STORAGE_OPTS = [
|
||||
cfg.IntOpt('time_to_live',
|
||||
default=-1,
|
||||
help="""number of seconds that samples are kept
|
||||
in the database for (<= 0 means forever)"""),
|
||||
]
|
||||
|
||||
cfg.CONF.register_opts(STORAGE_OPTS, group='database')
|
||||
|
||||
cfg.CONF.register_opts(STORAGE_OPTS)
|
||||
cfg.CONF.import_opt('connection',
|
||||
'ceilometer.openstack.common.db.sqlalchemy.session',
|
||||
group='database')
|
||||
@ -132,3 +142,11 @@ class EventFilter(object):
|
||||
def dbsync():
|
||||
service.prepare_service()
|
||||
get_connection(cfg.CONF).upgrade()
|
||||
|
||||
|
||||
def expirer():
|
||||
service.prepare_service()
|
||||
LOG.debug("Clearing expired metering data")
|
||||
storage_conn = get_connection(cfg.CONF)
|
||||
storage_conn.clear_expired_metering_data(
|
||||
cfg.CONF.database.time_to_live)
|
||||
|
@ -82,6 +82,15 @@ class Connection(object):
|
||||
All timestamps must be naive utc datetime object.
|
||||
"""
|
||||
|
||||
@abc.abstractmethod
|
||||
def clear_expired_metering_data(self, ttl):
|
||||
"""Clear expired data from the backend storage system according to the
|
||||
time-to-live.
|
||||
|
||||
:param ttl: Number of seconds to keep records for.
|
||||
|
||||
"""
|
||||
|
||||
@abc.abstractmethod
|
||||
def get_users(self, source=None):
|
||||
"""Return an iterable of user id strings.
|
||||
|
@ -257,6 +257,15 @@ class Connection(base.Connection):
|
||||
record['f:message'] = json.dumps(data)
|
||||
meter_table.put(row, record)
|
||||
|
||||
def clear_expired_metering_data(self, ttl):
|
||||
"""Clear expired data from the backend storage system according to the
|
||||
time-to-live.
|
||||
|
||||
:param ttl: Number of seconds to keep records for.
|
||||
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
def get_users(self, source=None):
|
||||
"""Return an iterable of user id strings.
|
||||
|
||||
|
@ -62,6 +62,15 @@ class Connection(base.Connection):
|
||||
data['resource_id'],
|
||||
data['counter_volume'])
|
||||
|
||||
def clear_expired_metering_data(self, ttl):
|
||||
"""Clear expired data from the backend storage system according to the
|
||||
time-to-live.
|
||||
|
||||
:param ttl: Number of seconds to keep records for.
|
||||
|
||||
"""
|
||||
LOG.info("Dropping data with TTL %d", ttl)
|
||||
|
||||
def get_users(self, source=None):
|
||||
"""Return an iterable of user id strings.
|
||||
|
||||
|
@ -21,6 +21,7 @@
|
||||
"""
|
||||
|
||||
import copy
|
||||
import datetime
|
||||
import operator
|
||||
import os
|
||||
import re
|
||||
@ -34,9 +35,14 @@ import pymongo
|
||||
from oslo.config import cfg
|
||||
|
||||
from ceilometer.openstack.common import log
|
||||
from ceilometer.openstack.common import timeutils
|
||||
from ceilometer import storage
|
||||
from ceilometer.storage import base
|
||||
from ceilometer.storage import models
|
||||
|
||||
cfg.CONF.import_opt('time_to_live', 'ceilometer.storage',
|
||||
group="database")
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
@ -181,6 +187,17 @@ class Connection(base.Connection):
|
||||
|
||||
CONNECTION_POOL = ConnectionPool()
|
||||
|
||||
REDUCE_GROUP_CLEAN = bson.code.Code("""
|
||||
function ( curr, result ) {
|
||||
if (result.resources.indexOf(curr.resource_id) < 0)
|
||||
result.resources.push(curr.resource_id);
|
||||
if (result.users.indexOf(curr.user_id) < 0)
|
||||
result.users.push(curr.user_id);
|
||||
if (result.projects.indexOf(curr.project_id) < 0)
|
||||
result.projects.push(curr.project_id);
|
||||
}
|
||||
""")
|
||||
|
||||
MAP_STATS = bson.code.Code("""
|
||||
function () {
|
||||
emit('statistics', { min : this.counter_volume,
|
||||
@ -293,6 +310,39 @@ class Connection(base.Connection):
|
||||
('source', pymongo.ASCENDING),
|
||||
], name='meter_idx')
|
||||
|
||||
# Since mongodb 2.2 support db-ttl natively
|
||||
if self._is_natively_ttl_supported():
|
||||
self._ensure_meter_ttl_index()
|
||||
|
||||
def _ensure_meter_ttl_index(self):
|
||||
indexes = self.db.meter.index_information()
|
||||
|
||||
ttl = cfg.CONF.database.time_to_live
|
||||
|
||||
if ttl <= 0:
|
||||
if 'meter_ttl' in indexes:
|
||||
self.db.meter.drop_index('meter_ttl')
|
||||
return
|
||||
|
||||
if 'meter_ttl' in indexes:
|
||||
# NOTE(sileht): manually check expireAfterSeconds because
|
||||
# ensure_index doesn't update index options if the index already
|
||||
# exists
|
||||
if ttl == indexes['meter_ttl'].get('expireAfterSeconds', -1):
|
||||
return
|
||||
|
||||
self.db.meter.drop_index('meter_ttl')
|
||||
|
||||
self.db.meter.create_index(
|
||||
[('timestamp', pymongo.ASCENDING)],
|
||||
expireAfterSeconds=ttl,
|
||||
name='meter_ttl'
|
||||
)
|
||||
|
||||
def _is_natively_ttl_supported(self):
|
||||
# Assume is not supported if we can get the version
|
||||
return self.conn.server_info().get('versionArray', []) >= [2, 2]
|
||||
|
||||
@staticmethod
|
||||
def upgrade(version=None):
|
||||
pass
|
||||
@ -358,6 +408,35 @@ class Connection(base.Connection):
|
||||
record = copy.copy(data)
|
||||
self.db.meter.insert(record)
|
||||
|
||||
def clear_expired_metering_data(self, ttl):
|
||||
"""Clear expired data from the backend storage system according to the
|
||||
time-to-live.
|
||||
|
||||
:param ttl: Number of seconds to keep records for.
|
||||
|
||||
"""
|
||||
# Before mongodb 2.2 we need to clear expired data manually
|
||||
if not self._is_natively_ttl_supported():
|
||||
end = timeutils.utcnow() - datetime.timedelta(seconds=ttl)
|
||||
f = storage.SampleFilter(end=end)
|
||||
q = make_query_from_filter(f, require_meter=False)
|
||||
self.db.meter.remove(q)
|
||||
|
||||
results = self.db.meter.group(
|
||||
key={},
|
||||
condition={},
|
||||
reduce=self.REDUCE_GROUP_CLEAN,
|
||||
initial={
|
||||
'resources': [],
|
||||
'users': [],
|
||||
'projects': [],
|
||||
}
|
||||
)[0]
|
||||
|
||||
self.db.user.remove({'_id': {'$nin': results['users']}})
|
||||
self.db.project.remove({'_id': {'$nin': results['projects']}})
|
||||
self.db.resource.remove({'_id': {'$nin': results['resources']}})
|
||||
|
||||
def get_users(self, source=None):
|
||||
"""Return an iterable of user id strings.
|
||||
|
||||
|
@ -19,6 +19,7 @@
|
||||
|
||||
from __future__ import absolute_import
|
||||
|
||||
import datetime
|
||||
import operator
|
||||
import os
|
||||
import uuid
|
||||
@ -221,6 +222,35 @@ class Connection(base.Connection):
|
||||
meter.message_id = data['message_id']
|
||||
session.flush()
|
||||
|
||||
@staticmethod
|
||||
def clear_expired_metering_data(ttl):
|
||||
"""Clear expired data from the backend storage system according to the
|
||||
time-to-live.
|
||||
|
||||
:param ttl: Number of seconds to keep records for.
|
||||
|
||||
"""
|
||||
session = sqlalchemy_session.get_session()
|
||||
query = session.query(Meter.id)
|
||||
end = timeutils.utcnow() - datetime.timedelta(seconds=ttl)
|
||||
query = query.filter(Meter.timestamp < end)
|
||||
query.delete()
|
||||
|
||||
query = session.query(User.id).filter(~User.id.in_(
|
||||
session.query(Meter.user_id).group_by(Meter.user_id)
|
||||
))
|
||||
query.delete(synchronize_session='fetch')
|
||||
|
||||
query = session.query(Project.id).filter(~Project.id.in_(
|
||||
session.query(Meter.project_id).group_by(Meter.project_id)
|
||||
))
|
||||
query.delete(synchronize_session='fetch')
|
||||
|
||||
query = session.query(Resource.id).filter(~Resource.id.in_(
|
||||
session.query(Meter.resource_id).group_by(Meter.resource_id)
|
||||
))
|
||||
query.delete(synchronize_session='fetch')
|
||||
|
||||
@staticmethod
|
||||
def get_users(source=None):
|
||||
"""Return an iterable of user id strings.
|
||||
|
@ -526,6 +526,15 @@
|
||||
#pool_timeout=<None>
|
||||
|
||||
|
||||
#
|
||||
# Options defined in ceilometer.storage
|
||||
#
|
||||
|
||||
# number of seconds that samples are kept in the database for
|
||||
# (<= 0 means forever) (integer value)
|
||||
#time_to_live=-1
|
||||
|
||||
|
||||
[alarm]
|
||||
|
||||
#
|
||||
@ -635,4 +644,4 @@
|
||||
#password=<None>
|
||||
|
||||
|
||||
# Total option count: 122
|
||||
# Total option count: 123
|
||||
|
@ -111,6 +111,7 @@ console_scripts =
|
||||
ceilometer-agent-central = ceilometer.central.manager:agent_central
|
||||
ceilometer-agent-compute = ceilometer.compute.manager:agent_compute
|
||||
ceilometer-dbsync = ceilometer.storage:dbsync
|
||||
ceilometer-expirer = ceilometer.storage:expirer
|
||||
ceilometer-collector = ceilometer.collector.service:collector
|
||||
ceilometer-collector-udp = ceilometer.collector.service:udp_collector
|
||||
ceilometer-alarm-singleton = ceilometer.alarm.service:singleton_alarm
|
||||
|
@ -26,6 +26,7 @@ import datetime
|
||||
from oslo.config import cfg
|
||||
|
||||
from ceilometer.publisher import rpc
|
||||
from ceilometer.openstack.common import timeutils
|
||||
from ceilometer import counter
|
||||
from ceilometer import storage
|
||||
from ceilometer.tests import db as test_db
|
||||
@ -40,6 +41,10 @@ class DBTestBase(test_db.TestBase):
|
||||
super(DBTestBase, self).setUp()
|
||||
self.prepare_data()
|
||||
|
||||
def tearDown(self):
|
||||
timeutils.utcnow.override_time = None
|
||||
super(DBTestBase, self).tearDown()
|
||||
|
||||
def prepare_data(self):
|
||||
original_timestamps = [(2012, 7, 2, 10, 40), (2012, 7, 2, 10, 41),
|
||||
(2012, 7, 2, 10, 41), (2012, 7, 2, 10, 42),
|
||||
@ -496,6 +501,46 @@ class RawSampleTest(DBTestBase):
|
||||
assert results
|
||||
assert len(results) == 1
|
||||
|
||||
def test_clear_metering_data(self):
|
||||
timeutils.utcnow.override_time = datetime.datetime(2012, 7, 2, 10, 45)
|
||||
|
||||
try:
|
||||
self.conn.clear_expired_metering_data(3 * 60)
|
||||
except NotImplementedError:
|
||||
got_not_imp = True
|
||||
self.assertTrue(got_not_imp)
|
||||
return
|
||||
|
||||
f = storage.SampleFilter(meter='instance')
|
||||
results = list(self.conn.get_samples(f))
|
||||
self.assertEqual(len(results), 5)
|
||||
results = list(self.conn.get_users())
|
||||
self.assertEqual(len(results), 5)
|
||||
results = list(self.conn.get_projects())
|
||||
self.assertEqual(len(results), 5)
|
||||
results = list(self.conn.get_resources())
|
||||
self.assertEqual(len(results), 5)
|
||||
|
||||
def test_clear_metering_data_no_data_to_remove(self):
|
||||
timeutils.utcnow.override_time = datetime.datetime(2010, 7, 2, 10, 45)
|
||||
|
||||
try:
|
||||
self.conn.clear_expired_metering_data(3 * 60)
|
||||
except NotImplementedError:
|
||||
got_not_imp = True
|
||||
self.assertTrue(got_not_imp)
|
||||
return
|
||||
|
||||
f = storage.SampleFilter(meter='instance')
|
||||
results = list(self.conn.get_samples(f))
|
||||
self.assertEqual(len(results), 10)
|
||||
results = list(self.conn.get_users())
|
||||
self.assertEqual(len(results), 9)
|
||||
results = list(self.conn.get_projects())
|
||||
self.assertEqual(len(results), 8)
|
||||
results = list(self.conn.get_resources())
|
||||
self.assertEqual(len(results), 9)
|
||||
|
||||
|
||||
class StatisticsTest(DBTestBase):
|
||||
|
||||
|
@ -45,6 +45,61 @@ class MongoDBConnection(MongoDBEngineTestBase):
|
||||
impl_mongodb.Connection(cfg.CONF).conn)
|
||||
|
||||
|
||||
class IndexTest(MongoDBEngineTestBase):
|
||||
def test_meter_ttl_index_absent(self):
|
||||
# create a fake index and check it is deleted
|
||||
self.conn.db.meter.ensure_index('foo', name='meter_ttl')
|
||||
cfg.CONF.set_override('time_to_live', -1, group='database')
|
||||
|
||||
self.conn._ensure_meter_ttl_index()
|
||||
self.assertTrue(self.conn.db.meter.ensure_index('foo',
|
||||
name='meter_ttl'))
|
||||
cfg.CONF.set_override('time_to_live', 456789, group='database')
|
||||
self.conn._ensure_meter_ttl_index()
|
||||
self.assertFalse(self.conn.db.meter.ensure_index('foo',
|
||||
name='meter_ttl'))
|
||||
|
||||
def test_meter_ttl_index_present(self):
|
||||
cfg.CONF.set_override('time_to_live', 456789, group='database')
|
||||
self.conn._ensure_meter_ttl_index()
|
||||
self.assertFalse(self.conn.db.meter.ensure_index('foo',
|
||||
name='meter_ttl'))
|
||||
self.assertEqual(self.conn.db.meter.index_information()[
|
||||
'meter_ttl']['expireAfterSeconds'], 456789)
|
||||
|
||||
cfg.CONF.set_override('time_to_live', -1, group='database')
|
||||
self.conn._ensure_meter_ttl_index()
|
||||
self.assertTrue(self.conn.db.meter.ensure_index('foo',
|
||||
name='meter_ttl'))
|
||||
|
||||
def test_ttl_index_is_supported(self):
|
||||
self.mox.StubOutWithMock(self.conn.conn, "server_info")
|
||||
self.conn.conn.server_info().AndReturn({'versionArray': [2, 4, 5, 0]})
|
||||
|
||||
self.mox.ReplayAll()
|
||||
self.assertTrue(self.conn._is_natively_ttl_supported())
|
||||
self.mox.UnsetStubs()
|
||||
self.mox.VerifyAll()
|
||||
|
||||
def test_ttl_index_is_not_supported(self):
|
||||
self.mox.StubOutWithMock(self.conn.conn, "server_info")
|
||||
self.conn.conn.server_info().AndReturn({'versionArray': [2, 0, 1, 0]})
|
||||
|
||||
self.mox.ReplayAll()
|
||||
self.assertFalse(self.conn._is_natively_ttl_supported())
|
||||
self.mox.UnsetStubs()
|
||||
self.mox.VerifyAll()
|
||||
|
||||
def test_ttl_index_is_unkown(self):
|
||||
self.mox.StubOutWithMock(self.conn.conn, "server_info")
|
||||
self.conn.conn.server_info().AndReturn({})
|
||||
|
||||
self.mox.ReplayAll()
|
||||
self.assertFalse(self.conn._is_natively_ttl_supported())
|
||||
self.mox.UnsetStubs()
|
||||
self.mox.VerifyAll()
|
||||
|
||||
|
||||
class UserTest(base.UserTest, MongoDBEngineTestBase):
|
||||
pass
|
||||
|
||||
@ -62,7 +117,25 @@ class MeterTest(base.MeterTest, MongoDBEngineTestBase):
|
||||
|
||||
|
||||
class RawSampleTest(base.RawSampleTest, MongoDBEngineTestBase):
|
||||
pass
|
||||
def test_clear_metering_data(self):
|
||||
# NOTE(sileht): ensure this tests is played for any version of mongo
|
||||
self.mox.StubOutWithMock(self.conn, "_is_natively_ttl_supported")
|
||||
self.conn._is_natively_ttl_supported().AndReturn(False)
|
||||
|
||||
self.mox.ReplayAll()
|
||||
super(RawSampleTest, self).test_clear_metering_data()
|
||||
self.mox.UnsetStubs()
|
||||
self.mox.VerifyAll()
|
||||
|
||||
def test_clear_metering_data_no_data_to_remove(self):
|
||||
# NOTE(sileht): ensure this tests is played for any version of mongo
|
||||
self.mox.StubOutWithMock(self.conn, "_is_natively_ttl_supported")
|
||||
self.conn._is_natively_ttl_supported().AndReturn(False)
|
||||
|
||||
self.mox.ReplayAll()
|
||||
super(RawSampleTest, self).test_clear_metering_data_no_data_to_remove()
|
||||
self.mox.UnsetStubs()
|
||||
self.mox.VerifyAll()
|
||||
|
||||
|
||||
class StatisticsTest(base.StatisticsTest, MongoDBEngineTestBase):
|
||||
|
@ -27,9 +27,9 @@ import time
|
||||
from ceilometer.tests import base
|
||||
|
||||
|
||||
class BinDbsyncTestCase(base.TestCase):
|
||||
class BinTestCase(base.TestCase):
|
||||
def setUp(self):
|
||||
super(BinDbsyncTestCase, self).setUp()
|
||||
super(BinTestCase, self).setUp()
|
||||
self.tempfile = self.temp_config_file_path()
|
||||
with open(self.tempfile, 'w') as tmp:
|
||||
tmp.write("[database]\n")
|
||||
@ -40,6 +40,11 @@ class BinDbsyncTestCase(base.TestCase):
|
||||
"--config-file=%s" % self.tempfile])
|
||||
self.assertEqual(subp.wait(), 0)
|
||||
|
||||
def test_run_expirer(self):
|
||||
subp = subprocess.Popen(['ceilometer-expirer',
|
||||
"--config-file=%s" % self.tempfile])
|
||||
self.assertEqual(subp.wait(), 0)
|
||||
|
||||
|
||||
class BinSendCounterTestCase(base.TestCase):
|
||||
def setUp(self):
|
||||
|
Loading…
Reference in New Issue
Block a user