Merge "Limit number of records deleted by aodh-expirer"
This commit is contained in:
commit
f035533291
@ -33,9 +33,22 @@ def expirer():
|
|||||||
|
|
||||||
if conf.database.alarm_history_time_to_live > 0:
|
if conf.database.alarm_history_time_to_live > 0:
|
||||||
LOG.debug("Clearing expired alarm history data")
|
LOG.debug("Clearing expired alarm history data")
|
||||||
storage_conn = storage.get_connection_from_config(conf)
|
conn = storage.get_connection_from_config(conf)
|
||||||
storage_conn.clear_expired_alarm_history_data(
|
max_count = conf.database.alarm_histories_delete_batch_size
|
||||||
conf.database.alarm_history_time_to_live)
|
try:
|
||||||
|
if max_count > 0:
|
||||||
|
conn.clear_expired_alarm_history_data(
|
||||||
|
conf.database.alarm_history_time_to_live,
|
||||||
|
max_count)
|
||||||
|
else:
|
||||||
|
deleted = max_count = 100
|
||||||
|
while deleted and deleted > 0:
|
||||||
|
deleted = conn.clear_expired_alarm_history_data(
|
||||||
|
conf.database.alarm_history_time_to_live,
|
||||||
|
max_count)
|
||||||
|
except TypeError:
|
||||||
|
LOG.warning("Storage driver does not support "
|
||||||
|
"'alarm_histories_delete_batch_size' config option.")
|
||||||
else:
|
else:
|
||||||
LOG.info("Nothing to clean, database alarm history time to live "
|
LOG.info("Nothing to clean, database alarm history time to live "
|
||||||
"is disabled")
|
"is disabled")
|
||||||
|
@ -34,6 +34,11 @@ OPTS = [
|
|||||||
default=-1,
|
default=-1,
|
||||||
help=("Number of seconds that alarm histories are kept "
|
help=("Number of seconds that alarm histories are kept "
|
||||||
"in the database for (<= 0 means forever).")),
|
"in the database for (<= 0 means forever).")),
|
||||||
|
cfg.IntOpt('alarm_histories_delete_batch_size',
|
||||||
|
default=0,
|
||||||
|
min=0,
|
||||||
|
help=("Number of alarm histories to be deleted in one "
|
||||||
|
"iteration from the database (0 means all).")),
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
|
@ -191,13 +191,13 @@ class Connection(object):
|
|||||||
return cls.STORAGE_CAPABILITIES
|
return cls.STORAGE_CAPABILITIES
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def clear_expired_alarm_history_data(alarm_history_ttl):
|
def clear_expired_alarm_history_data(ttl, max_count=None):
|
||||||
"""Clear expired alarm history data from the backend storage system.
|
"""Clear expired alarm history data from the backend storage system.
|
||||||
|
|
||||||
Clearing occurs according to the time-to-live.
|
Clearing occurs according to the time-to-live.
|
||||||
|
|
||||||
:param alarm_history_ttl: Number of seconds to keep alarm history
|
:param ttl: Number of seconds to keep alarm history records for.
|
||||||
records for.
|
:param max_count: Number of records to delete.
|
||||||
"""
|
"""
|
||||||
raise aodh.NotImplementedError('Clearing alarm history '
|
raise aodh.NotImplementedError('Clearing alarm history '
|
||||||
'not implemented')
|
'not implemented')
|
||||||
|
@ -56,13 +56,12 @@ class Connection(base.Connection):
|
|||||||
"""Delete an alarm and its history data."""
|
"""Delete an alarm and its history data."""
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def clear_expired_alarm_history_data(alarm_history_ttl):
|
def clear_expired_alarm_history_data(ttl, max_count=None):
|
||||||
"""Clear expired alarm history data from the backend storage system.
|
"""Clear expired alarm history data from the backend storage system.
|
||||||
|
|
||||||
Clearing occurs according to the time-to-live.
|
Clearing occurs according to the time-to-live.
|
||||||
|
|
||||||
:param alarm_history_ttl: Number of seconds to keep alarm history
|
:param ttl: Number of seconds to keep alarm history records for.
|
||||||
records for.
|
:param max_count: Number of records to delete.
|
||||||
"""
|
"""
|
||||||
LOG.info('Dropping alarm history data with TTL %d',
|
LOG.info('Dropping alarm history %d data with TTL %d', max_count, ttl)
|
||||||
alarm_history_ttl)
|
|
||||||
|
@ -398,21 +398,23 @@ class Connection(base.Connection):
|
|||||||
alarm_change_row.update(alarm_change)
|
alarm_change_row.update(alarm_change)
|
||||||
session.add(alarm_change_row)
|
session.add(alarm_change_row)
|
||||||
|
|
||||||
def clear_expired_alarm_history_data(self, alarm_history_ttl):
|
def clear_expired_alarm_history_data(self, ttl, max_count=100):
|
||||||
"""Clear expired alarm history data from the backend storage system.
|
"""Clear expired alarm history data from the backend storage system.
|
||||||
|
|
||||||
Clearing occurs according to the time-to-live.
|
Clearing occurs according to the time-to-live.
|
||||||
|
|
||||||
:param alarm_history_ttl: Number of seconds to keep alarm history
|
:param ttl: Number of seconds to keep alarm history records for.
|
||||||
records for.
|
:param max_count: Number of records to delete.
|
||||||
"""
|
"""
|
||||||
session = self._engine_facade.get_session()
|
session = self._engine_facade.get_session()
|
||||||
with session.begin():
|
with session.begin():
|
||||||
valid_start = (timeutils.utcnow() -
|
end = timeutils.utcnow() - datetime.timedelta(seconds=ttl)
|
||||||
datetime.timedelta(seconds=alarm_history_ttl))
|
alarm_history_q = (session.query(models.AlarmChange.event_id)
|
||||||
deleted_rows = (session.query(models.AlarmChange)
|
.filter(models.AlarmChange.timestamp < end))
|
||||||
.filter(models.AlarmChange.timestamp < valid_start)
|
event_ids = [i[0] for i in alarm_history_q.limit(max_count)]
|
||||||
.delete())
|
deleted_rows = session.query(models.AlarmChange).filter(
|
||||||
|
models.AlarmChange.event_id.in_(event_ids)
|
||||||
|
).delete(synchronize_session="fetch")
|
||||||
LOG.info("%d alarm histories are removed from database",
|
LOG.info("%d alarm histories are removed from database",
|
||||||
deleted_rows)
|
deleted_rows)
|
||||||
|
|
||||||
|
@ -277,7 +277,7 @@ class AlarmHistoryTest(AlarmTestBase):
|
|||||||
|
|
||||||
def _clear_alarm_history(self, utcnow, ttl, count):
|
def _clear_alarm_history(self, utcnow, ttl, count):
|
||||||
self.mock_utcnow.return_value = utcnow
|
self.mock_utcnow.return_value = utcnow
|
||||||
self.alarm_conn.clear_expired_alarm_history_data(ttl)
|
self.alarm_conn.clear_expired_alarm_history_data(ttl, 100)
|
||||||
history = list(self.alarm_conn.query_alarm_history())
|
history = list(self.alarm_conn.query_alarm_history())
|
||||||
self.assertEqual(count, len(history))
|
self.assertEqual(count, len(history))
|
||||||
|
|
||||||
|
@ -55,6 +55,7 @@ class BinTestCase(base.BaseTestCase):
|
|||||||
def test_run_expirer_ttl_enabled(self):
|
def test_run_expirer_ttl_enabled(self):
|
||||||
content = ("[database]\n"
|
content = ("[database]\n"
|
||||||
"alarm_history_time_to_live=1\n"
|
"alarm_history_time_to_live=1\n"
|
||||||
|
"alarm_histories_delete_batch_size=10\n"
|
||||||
"connection=log://localhost\n")
|
"connection=log://localhost\n")
|
||||||
content = content.encode('utf-8')
|
content = content.encode('utf-8')
|
||||||
self.tempfile = fileutils.write_to_tempfile(content=content,
|
self.tempfile = fileutils.write_to_tempfile(content=content,
|
||||||
@ -67,7 +68,7 @@ class BinTestCase(base.BaseTestCase):
|
|||||||
stderr=subprocess.PIPE)
|
stderr=subprocess.PIPE)
|
||||||
out, __ = subp.communicate()
|
out, __ = subp.communicate()
|
||||||
self.assertEqual(0, subp.poll())
|
self.assertEqual(0, subp.poll())
|
||||||
msg = "Dropping alarm history data with TTL 1"
|
msg = "Dropping alarm history 10 data with TTL 1"
|
||||||
msg = msg.encode('utf-8')
|
msg = msg.encode('utf-8')
|
||||||
self.assertIn(msg, out)
|
self.assertIn(msg, out)
|
||||||
|
|
||||||
|
@ -0,0 +1,7 @@
|
|||||||
|
---
|
||||||
|
features:
|
||||||
|
- |
|
||||||
|
A new ``alarm_histories_delete_bacth_size`` option has been added to limit
|
||||||
|
a number of alarm histories deleted from the database by aodh-expirer in
|
||||||
|
a single iteration. This parameter is useful when there are a lot of alarm
|
||||||
|
histories in the database.
|
Loading…
x
Reference in New Issue
Block a user