Merge "Allow to enable time to live on metering sample"

This commit is contained in:
Jenkins 2013-07-16 14:06:31 +00:00 committed by Gerrit Code Review
commit 30319310ed
11 changed files with 293 additions and 6 deletions

View File

@ -33,7 +33,7 @@ LOG = log.getLogger(__name__)
STORAGE_ENGINE_NAMESPACE = 'ceilometer.storage' STORAGE_ENGINE_NAMESPACE = 'ceilometer.storage'
STORAGE_OPTS = [ OLD_STORAGE_OPTS = [
cfg.StrOpt('database_connection', cfg.StrOpt('database_connection',
secret=True, secret=True,
default=None, default=None,
@ -41,8 +41,18 @@ STORAGE_OPTS = [
), ),
] ]
cfg.CONF.register_opts(OLD_STORAGE_OPTS)
STORAGE_OPTS = [
cfg.IntOpt('time_to_live',
default=-1,
help="""number of seconds that samples are kept
in the database for (<= 0 means forever)"""),
]
cfg.CONF.register_opts(STORAGE_OPTS, group='database')
cfg.CONF.register_opts(STORAGE_OPTS)
cfg.CONF.import_opt('connection', cfg.CONF.import_opt('connection',
'ceilometer.openstack.common.db.sqlalchemy.session', 'ceilometer.openstack.common.db.sqlalchemy.session',
group='database') group='database')
@ -132,3 +142,11 @@ class EventFilter(object):
def dbsync(): def dbsync():
service.prepare_service() service.prepare_service()
get_connection(cfg.CONF).upgrade() get_connection(cfg.CONF).upgrade()
def expirer():
service.prepare_service()
LOG.debug("Clearing expired metering data")
storage_conn = get_connection(cfg.CONF)
storage_conn.clear_expired_metering_data(
cfg.CONF.database.time_to_live)

View File

@ -82,6 +82,15 @@ class Connection(object):
All timestamps must be naive utc datetime object. All timestamps must be naive utc datetime object.
""" """
@abc.abstractmethod
def clear_expired_metering_data(self, ttl):
"""Clear expired data from the backend storage system according to the
time-to-live.
:param ttl: Number of seconds to keep records for.
"""
@abc.abstractmethod @abc.abstractmethod
def get_users(self, source=None): def get_users(self, source=None):
"""Return an iterable of user id strings. """Return an iterable of user id strings.

View File

@ -257,6 +257,15 @@ class Connection(base.Connection):
record['f:message'] = json.dumps(data) record['f:message'] = json.dumps(data)
meter_table.put(row, record) meter_table.put(row, record)
def clear_expired_metering_data(self, ttl):
"""Clear expired data from the backend storage system according to the
time-to-live.
:param ttl: Number of seconds to keep records for.
"""
raise NotImplementedError
def get_users(self, source=None): def get_users(self, source=None):
"""Return an iterable of user id strings. """Return an iterable of user id strings.

View File

@ -62,6 +62,15 @@ class Connection(base.Connection):
data['resource_id'], data['resource_id'],
data['counter_volume']) data['counter_volume'])
def clear_expired_metering_data(self, ttl):
"""Clear expired data from the backend storage system according to the
time-to-live.
:param ttl: Number of seconds to keep records for.
"""
LOG.info("Dropping data with TTL %d", ttl)
def get_users(self, source=None): def get_users(self, source=None):
"""Return an iterable of user id strings. """Return an iterable of user id strings.

View File

@ -21,6 +21,7 @@
""" """
import copy import copy
import datetime
import operator import operator
import os import os
import re import re
@ -34,9 +35,14 @@ import pymongo
from oslo.config import cfg from oslo.config import cfg
from ceilometer.openstack.common import log from ceilometer.openstack.common import log
from ceilometer.openstack.common import timeutils
from ceilometer import storage
from ceilometer.storage import base from ceilometer.storage import base
from ceilometer.storage import models from ceilometer.storage import models
cfg.CONF.import_opt('time_to_live', 'ceilometer.storage',
group="database")
LOG = log.getLogger(__name__) LOG = log.getLogger(__name__)
@ -181,6 +187,17 @@ class Connection(base.Connection):
CONNECTION_POOL = ConnectionPool() CONNECTION_POOL = ConnectionPool()
REDUCE_GROUP_CLEAN = bson.code.Code("""
function ( curr, result ) {
if (result.resources.indexOf(curr.resource_id) < 0)
result.resources.push(curr.resource_id);
if (result.users.indexOf(curr.user_id) < 0)
result.users.push(curr.user_id);
if (result.projects.indexOf(curr.project_id) < 0)
result.projects.push(curr.project_id);
}
""")
MAP_STATS = bson.code.Code(""" MAP_STATS = bson.code.Code("""
function () { function () {
emit('statistics', { min : this.counter_volume, emit('statistics', { min : this.counter_volume,
@ -293,6 +310,39 @@ class Connection(base.Connection):
('source', pymongo.ASCENDING), ('source', pymongo.ASCENDING),
], name='meter_idx') ], name='meter_idx')
# Since mongodb 2.2 support db-ttl natively
if self._is_natively_ttl_supported():
self._ensure_meter_ttl_index()
def _ensure_meter_ttl_index(self):
indexes = self.db.meter.index_information()
ttl = cfg.CONF.database.time_to_live
if ttl <= 0:
if 'meter_ttl' in indexes:
self.db.meter.drop_index('meter_ttl')
return
if 'meter_ttl' in indexes:
# NOTE(sileht): manually check expireAfterSeconds because
# ensure_index doesn't update index options if the index already
# exists
if ttl == indexes['meter_ttl'].get('expireAfterSeconds', -1):
return
self.db.meter.drop_index('meter_ttl')
self.db.meter.create_index(
[('timestamp', pymongo.ASCENDING)],
expireAfterSeconds=ttl,
name='meter_ttl'
)
def _is_natively_ttl_supported(self):
# Assume is not supported if we can get the version
return self.conn.server_info().get('versionArray', []) >= [2, 2]
@staticmethod @staticmethod
def upgrade(version=None): def upgrade(version=None):
pass pass
@ -358,6 +408,35 @@ class Connection(base.Connection):
record = copy.copy(data) record = copy.copy(data)
self.db.meter.insert(record) self.db.meter.insert(record)
def clear_expired_metering_data(self, ttl):
"""Clear expired data from the backend storage system according to the
time-to-live.
:param ttl: Number of seconds to keep records for.
"""
# Before mongodb 2.2 we need to clear expired data manually
if not self._is_natively_ttl_supported():
end = timeutils.utcnow() - datetime.timedelta(seconds=ttl)
f = storage.SampleFilter(end=end)
q = make_query_from_filter(f, require_meter=False)
self.db.meter.remove(q)
results = self.db.meter.group(
key={},
condition={},
reduce=self.REDUCE_GROUP_CLEAN,
initial={
'resources': [],
'users': [],
'projects': [],
}
)[0]
self.db.user.remove({'_id': {'$nin': results['users']}})
self.db.project.remove({'_id': {'$nin': results['projects']}})
self.db.resource.remove({'_id': {'$nin': results['resources']}})
def get_users(self, source=None): def get_users(self, source=None):
"""Return an iterable of user id strings. """Return an iterable of user id strings.

View File

@ -19,6 +19,7 @@
from __future__ import absolute_import from __future__ import absolute_import
import datetime
import operator import operator
import os import os
import uuid import uuid
@ -221,6 +222,35 @@ class Connection(base.Connection):
meter.message_id = data['message_id'] meter.message_id = data['message_id']
session.flush() session.flush()
@staticmethod
def clear_expired_metering_data(ttl):
"""Clear expired data from the backend storage system according to the
time-to-live.
:param ttl: Number of seconds to keep records for.
"""
session = sqlalchemy_session.get_session()
query = session.query(Meter.id)
end = timeutils.utcnow() - datetime.timedelta(seconds=ttl)
query = query.filter(Meter.timestamp < end)
query.delete()
query = session.query(User.id).filter(~User.id.in_(
session.query(Meter.user_id).group_by(Meter.user_id)
))
query.delete(synchronize_session='fetch')
query = session.query(Project.id).filter(~Project.id.in_(
session.query(Meter.project_id).group_by(Meter.project_id)
))
query.delete(synchronize_session='fetch')
query = session.query(Resource.id).filter(~Resource.id.in_(
session.query(Meter.resource_id).group_by(Meter.resource_id)
))
query.delete(synchronize_session='fetch')
@staticmethod @staticmethod
def get_users(source=None): def get_users(source=None):
"""Return an iterable of user id strings. """Return an iterable of user id strings.

View File

@ -526,6 +526,15 @@
#pool_timeout=<None> #pool_timeout=<None>
#
# Options defined in ceilometer.storage
#
# number of seconds that samples are kept in the database for
# (<= 0 means forever) (integer value)
#time_to_live=-1
[alarm] [alarm]
# #
@ -635,4 +644,4 @@
#password=<None> #password=<None>
# Total option count: 122 # Total option count: 123

View File

@ -113,6 +113,7 @@ console_scripts =
ceilometer-agent-central = ceilometer.central.manager:agent_central ceilometer-agent-central = ceilometer.central.manager:agent_central
ceilometer-agent-compute = ceilometer.compute.manager:agent_compute ceilometer-agent-compute = ceilometer.compute.manager:agent_compute
ceilometer-dbsync = ceilometer.storage:dbsync ceilometer-dbsync = ceilometer.storage:dbsync
ceilometer-expirer = ceilometer.storage:expirer
ceilometer-collector = ceilometer.collector.service:collector ceilometer-collector = ceilometer.collector.service:collector
ceilometer-collector-udp = ceilometer.collector.service:udp_collector ceilometer-collector-udp = ceilometer.collector.service:udp_collector
ceilometer-alarm-singleton = ceilometer.alarm.service:singleton_alarm ceilometer-alarm-singleton = ceilometer.alarm.service:singleton_alarm

View File

@ -26,6 +26,7 @@ import datetime
from oslo.config import cfg from oslo.config import cfg
from ceilometer.publisher import rpc from ceilometer.publisher import rpc
from ceilometer.openstack.common import timeutils
from ceilometer import counter from ceilometer import counter
from ceilometer import storage from ceilometer import storage
from ceilometer.tests import db as test_db from ceilometer.tests import db as test_db
@ -40,6 +41,10 @@ class DBTestBase(test_db.TestBase):
super(DBTestBase, self).setUp() super(DBTestBase, self).setUp()
self.prepare_data() self.prepare_data()
def tearDown(self):
timeutils.utcnow.override_time = None
super(DBTestBase, self).tearDown()
def prepare_data(self): def prepare_data(self):
original_timestamps = [(2012, 7, 2, 10, 40), (2012, 7, 2, 10, 41), original_timestamps = [(2012, 7, 2, 10, 40), (2012, 7, 2, 10, 41),
(2012, 7, 2, 10, 41), (2012, 7, 2, 10, 42), (2012, 7, 2, 10, 41), (2012, 7, 2, 10, 42),
@ -496,6 +501,46 @@ class RawSampleTest(DBTestBase):
assert results assert results
assert len(results) == 1 assert len(results) == 1
def test_clear_metering_data(self):
timeutils.utcnow.override_time = datetime.datetime(2012, 7, 2, 10, 45)
try:
self.conn.clear_expired_metering_data(3 * 60)
except NotImplementedError:
got_not_imp = True
self.assertTrue(got_not_imp)
return
f = storage.SampleFilter(meter='instance')
results = list(self.conn.get_samples(f))
self.assertEqual(len(results), 5)
results = list(self.conn.get_users())
self.assertEqual(len(results), 5)
results = list(self.conn.get_projects())
self.assertEqual(len(results), 5)
results = list(self.conn.get_resources())
self.assertEqual(len(results), 5)
def test_clear_metering_data_no_data_to_remove(self):
timeutils.utcnow.override_time = datetime.datetime(2010, 7, 2, 10, 45)
try:
self.conn.clear_expired_metering_data(3 * 60)
except NotImplementedError:
got_not_imp = True
self.assertTrue(got_not_imp)
return
f = storage.SampleFilter(meter='instance')
results = list(self.conn.get_samples(f))
self.assertEqual(len(results), 10)
results = list(self.conn.get_users())
self.assertEqual(len(results), 9)
results = list(self.conn.get_projects())
self.assertEqual(len(results), 8)
results = list(self.conn.get_resources())
self.assertEqual(len(results), 9)
class StatisticsTest(DBTestBase): class StatisticsTest(DBTestBase):

View File

@ -45,6 +45,61 @@ class MongoDBConnection(MongoDBEngineTestBase):
impl_mongodb.Connection(cfg.CONF).conn) impl_mongodb.Connection(cfg.CONF).conn)
class IndexTest(MongoDBEngineTestBase):
def test_meter_ttl_index_absent(self):
# create a fake index and check it is deleted
self.conn.db.meter.ensure_index('foo', name='meter_ttl')
cfg.CONF.set_override('time_to_live', -1, group='database')
self.conn._ensure_meter_ttl_index()
self.assertTrue(self.conn.db.meter.ensure_index('foo',
name='meter_ttl'))
cfg.CONF.set_override('time_to_live', 456789, group='database')
self.conn._ensure_meter_ttl_index()
self.assertFalse(self.conn.db.meter.ensure_index('foo',
name='meter_ttl'))
def test_meter_ttl_index_present(self):
cfg.CONF.set_override('time_to_live', 456789, group='database')
self.conn._ensure_meter_ttl_index()
self.assertFalse(self.conn.db.meter.ensure_index('foo',
name='meter_ttl'))
self.assertEqual(self.conn.db.meter.index_information()[
'meter_ttl']['expireAfterSeconds'], 456789)
cfg.CONF.set_override('time_to_live', -1, group='database')
self.conn._ensure_meter_ttl_index()
self.assertTrue(self.conn.db.meter.ensure_index('foo',
name='meter_ttl'))
def test_ttl_index_is_supported(self):
self.mox.StubOutWithMock(self.conn.conn, "server_info")
self.conn.conn.server_info().AndReturn({'versionArray': [2, 4, 5, 0]})
self.mox.ReplayAll()
self.assertTrue(self.conn._is_natively_ttl_supported())
self.mox.UnsetStubs()
self.mox.VerifyAll()
def test_ttl_index_is_not_supported(self):
self.mox.StubOutWithMock(self.conn.conn, "server_info")
self.conn.conn.server_info().AndReturn({'versionArray': [2, 0, 1, 0]})
self.mox.ReplayAll()
self.assertFalse(self.conn._is_natively_ttl_supported())
self.mox.UnsetStubs()
self.mox.VerifyAll()
def test_ttl_index_is_unkown(self):
self.mox.StubOutWithMock(self.conn.conn, "server_info")
self.conn.conn.server_info().AndReturn({})
self.mox.ReplayAll()
self.assertFalse(self.conn._is_natively_ttl_supported())
self.mox.UnsetStubs()
self.mox.VerifyAll()
class UserTest(base.UserTest, MongoDBEngineTestBase): class UserTest(base.UserTest, MongoDBEngineTestBase):
pass pass
@ -62,7 +117,25 @@ class MeterTest(base.MeterTest, MongoDBEngineTestBase):
class RawSampleTest(base.RawSampleTest, MongoDBEngineTestBase): class RawSampleTest(base.RawSampleTest, MongoDBEngineTestBase):
pass def test_clear_metering_data(self):
# NOTE(sileht): ensure this tests is played for any version of mongo
self.mox.StubOutWithMock(self.conn, "_is_natively_ttl_supported")
self.conn._is_natively_ttl_supported().AndReturn(False)
self.mox.ReplayAll()
super(RawSampleTest, self).test_clear_metering_data()
self.mox.UnsetStubs()
self.mox.VerifyAll()
def test_clear_metering_data_no_data_to_remove(self):
# NOTE(sileht): ensure this tests is played for any version of mongo
self.mox.StubOutWithMock(self.conn, "_is_natively_ttl_supported")
self.conn._is_natively_ttl_supported().AndReturn(False)
self.mox.ReplayAll()
super(RawSampleTest, self).test_clear_metering_data_no_data_to_remove()
self.mox.UnsetStubs()
self.mox.VerifyAll()
class StatisticsTest(base.StatisticsTest, MongoDBEngineTestBase): class StatisticsTest(base.StatisticsTest, MongoDBEngineTestBase):

View File

@ -27,9 +27,9 @@ import time
from ceilometer.tests import base from ceilometer.tests import base
class BinDbsyncTestCase(base.TestCase): class BinTestCase(base.TestCase):
def setUp(self): def setUp(self):
super(BinDbsyncTestCase, self).setUp() super(BinTestCase, self).setUp()
self.tempfile = self.temp_config_file_path() self.tempfile = self.temp_config_file_path()
with open(self.tempfile, 'w') as tmp: with open(self.tempfile, 'w') as tmp:
tmp.write("[database]\n") tmp.write("[database]\n")
@ -40,6 +40,11 @@ class BinDbsyncTestCase(base.TestCase):
"--config-file=%s" % self.tempfile]) "--config-file=%s" % self.tempfile])
self.assertEqual(subp.wait(), 0) self.assertEqual(subp.wait(), 0)
def test_run_expirer(self):
subp = subprocess.Popen(['ceilometer-expirer',
"--config-file=%s" % self.tempfile])
self.assertEqual(subp.wait(), 0)
class BinSendCounterTestCase(base.TestCase): class BinSendCounterTestCase(base.TestCase):
def setUp(self): def setUp(self):