Alarm support in HBase Part 1

Two additional tables are introduced for alarms and alarm_history.
Ordering in alarm_history was achived by that fact that HBase
stores entries in alphabetical order. It is needed to enlarge the
preciseness of reverse_timestamp because alarms history entries may be
generated once per second, especially in tests.

Partially implements bp hbase-alarming

Change-Id: I41bc60f13779899af0e8f8e01f85b2ee6d1c8aae
This commit is contained in:
Nadya Privalova 2014-02-13 19:23:00 +04:00
parent 8bdc6d5421
commit c1d3ffa2ca

View File

@ -64,6 +64,11 @@ class HBaseStorage(base.StorageEngine):
project_id: uuid
meter: [ array of {counter_name: string, counter_type: string} ]
}
- alarm
- the raw incoming alarm data
- alarm_h
- raw incoming alarm_history data. Timestamp becomes now()
if not determined
"""
@staticmethod
@ -83,6 +88,8 @@ class Connection(base.Connection):
USER_TABLE = "user"
RESOURCE_TABLE = "resource"
METER_TABLE = "meter"
ALARM_TABLE = "alarm"
ALARM_HISTORY_TABLE = "alarm_h"
def __init__(self, conf):
"""Hbase Connection Initialization."""
@ -110,13 +117,17 @@ class Connection(base.Connection):
self.conn.create_table(self.USER_TABLE, {'f': dict()})
self.conn.create_table(self.RESOURCE_TABLE, {'f': dict()})
self.conn.create_table(self.METER_TABLE, {'f': dict()})
self.conn.create_table(self.ALARM_TABLE, {'f': dict()})
self.conn.create_table(self.ALARM_HISTORY_TABLE, {'f': dict()})
def clear(self):
LOG.debug(_('Dropping HBase schema...'))
for table in [self.PROJECT_TABLE,
self.USER_TABLE,
self.RESOURCE_TABLE,
self.METER_TABLE]:
self.METER_TABLE,
self.ALARM_TABLE,
self.ALARM_HISTORY_TABLE]:
try:
self.conn.disable_table(table)
except Exception:
@ -162,6 +173,90 @@ class Connection(base.Connection):
opts['port'] = port and int(port) or 9090
return opts
def update_alarm(self, alarm):
"""Create an alarm.
:param alarm: The alarm to create. It is Alarm object, so we need to
call as_dict()
"""
_id = alarm.alarm_id
alarm_table = self.conn.table(self.ALARM_TABLE)
alarm_to_store = serialize_entry(alarm.as_dict())
alarm_table.put(_id, alarm_to_store)
stored_alarm = deserialize_entry(alarm_table.row(_id))
return models.Alarm(**stored_alarm)
create_alarm = update_alarm
def delete_alarm(self, alarm_id):
alarm_table = self.conn.table(self.ALARM_TABLE)
alarm_table.delete(alarm_id)
def get_alarms(self, name=None, user=None,
project=None, enabled=None, alarm_id=None, pagination=None):
if pagination:
raise NotImplementedError(_('Pagination not implemented'))
alarm_table = self.conn.table(self.ALARM_TABLE)
#TODO(nprivalova): to be refactored
if enabled is not None:
enabled = json.dumps(enabled)
q = make_query(require_meter=False, query_only=True,
alarm_id=alarm_id, name=name,
enabled=enabled, user_id=user, project_id=project)
gen = alarm_table.scan(filter=q)
for ignored, data in gen:
stored_alarm = deserialize_entry(data)
yield models.Alarm(**stored_alarm)
def get_alarm_changes(self, alarm_id, on_behalf_of,
user=None, project=None, type=None,
start_timestamp=None, start_timestamp_op=None,
end_timestamp=None, end_timestamp_op=None):
alarm_history_table = self.conn.table(self.ALARM_HISTORY_TABLE)
q = make_query(start=start_timestamp,
start_op=start_timestamp_op,
end=end_timestamp,
end_op=end_timestamp_op,
require_meter=False, query_only=True,
include_rts_in_filters=False, alarm_id=alarm_id,
on_behalf_of=on_behalf_of, type=type, user_id=user,
project_id=project)
#TODO(nprivalova): refactor make_query to move this stuff there
rts_start = str(reverse_timestamp(start_timestamp) + 1) \
if start_timestamp else ""
rts_end = str(reverse_timestamp(end_timestamp) + 1) \
if end_timestamp else ""
if start_timestamp_op == 'gt':
rts_start = str(long(rts_start) - 2)
if end_timestamp_op == 'le':
rts_end = str(long(rts_end) - 1)
gen = alarm_history_table.scan(filter=q, row_start=rts_end,
row_stop=rts_start)
for ignored, data in gen:
stored_entry = deserialize_entry(data)
# It is needed to return 'details' field as string
detail = stored_entry['detail']
if detail:
stored_entry['detail'] = json.dumps(detail)
yield models.AlarmChange(**stored_entry)
def record_alarm_change(self, alarm_change):
"""Record alarm change event.
"""
alarm_change_dict = serialize_entry(alarm_change)
ts = alarm_change.get('timestamp') or datetime.datetime.now()
rts = reverse_timestamp(ts)
alarm_history_table = self.conn.table(self.ALARM_HISTORY_TABLE)
alarm_history_table.put(alarm_change.get('alarm_id') + "_" + str(rts),
alarm_change_dict)
def record_metering_data(self, data):
"""Write the data to the backend storage system.
@ -325,10 +420,10 @@ class Connection(base.Connection):
)
meter_table = self.conn.table(self.METER_TABLE)
q, start_row, stop_row = make_query(user=user,
project=project,
q, start_row, stop_row = make_query(user_id=user,
project_id=project,
source=source,
resource=resource,
resource_id=resource,
start=start_timestamp,
start_op=start_timestamp_op,
end=end_timestamp,
@ -374,7 +469,7 @@ class Connection(base.Connection):
if pagination:
raise NotImplementedError(_('Pagination not implemented'))
resource_table = self.conn.table(self.RESOURCE_TABLE)
q = make_query(user=user, project=project, resource=resource,
q = make_query(user_id=user, project_id=project, resource_id=resource,
source=source, metaquery=metaquery,
require_meter=False, query_only=True)
LOG.debug(_("Query Resource table: %s") % q)
@ -556,6 +651,9 @@ class MTable(object):
def put(self, key, data):
self._rows[key] = data
def delete(self, key):
del self._rows[key]
def scan(self, filter=None, columns=[], row_start=None, row_stop=None):
sorted_keys = sorted(self._rows)
# copy data between row_start and row_stop into a dict
@ -662,27 +760,21 @@ def reverse_timestamp(dt):
"""
epoch = datetime.datetime(1970, 1, 1)
td = dt - epoch
ts = (td.microseconds +
(td.seconds + td.days * 24 * 3600) * 100000) / 100000
ts = td.microseconds + td.seconds * 1000000 + td.days * 86400000000
return 0x7fffffffffffffff - ts
def make_query(user=None, project=None, meter=None,
resource=None, source=None, start=None, start_op=None,
end=None, end_op=None, metaquery=None, message_id=None,
require_meter=True, query_only=False):
def make_query(meter=None, start=None, start_op=None,
end=None, end_op=None, metaquery=None,
require_meter=True, query_only=False,
include_rts_in_filters=True, **kwargs):
"""Return a filter query string based on the selected parameters.
:param user: Optional user-id
:param project: Optional project-id
:param meter: Optional counter-name
:param resource: Optional resource-id
:param source: Optional source-id
:param start: Optional start timestamp
:param start_op: Optional start timestamp operator, like gt, ge
:param end: Optional end timestamp
:param end_op: Optional end timestamp operator, like lt, le
:param message_id: Optional message_id
:param require_meter: If true and the filter does not have a meter,
raise an error.
:param query_only: If true only returns the filter query,
@ -690,21 +782,10 @@ def make_query(user=None, project=None, meter=None,
"""
q = []
if user:
q.append("SingleColumnValueFilter ('f', 'user_id', =, 'binary:%s')"
% user)
if project:
q.append("SingleColumnValueFilter ('f', 'project_id', =, 'binary:%s')"
% project)
if resource:
q.append("SingleColumnValueFilter ('f', 'resource_id', =, 'binary:%s')"
% resource)
if message_id:
q.append("SingleColumnValueFilter ('f', 'message_id', =, 'binary:%s')"
% message_id)
if source:
q.append("SingleColumnValueFilter "
"('f', 'source', =, 'binary:%s')" % source)
for key, value in kwargs.iteritems():
if value is not None:
q.append("SingleColumnValueFilter "
"('f', '%s', =, 'binary:%s')" % (key, value))
start_row, end_row = "", ""
rts_start = str(reverse_timestamp(start) + 1) if start else ""
@ -728,7 +809,7 @@ def make_query(user=None, project=None, meter=None,
"('f', 'counter_name', =, 'binary:%s')" % meter)
elif require_meter:
raise RuntimeError('Missing required meter specifier')
else:
elif include_rts_in_filters:
if rts_start:
q.append("SingleColumnValueFilter ('f', 'rts', <=, 'binary:%s')" %
rts_start)
@ -766,13 +847,18 @@ def make_query_from_filter(sample_filter, require_meter=True):
:param require_meter: If true and the filter does not have a meter,
raise an error.
"""
return make_query(sample_filter.user, sample_filter.project,
sample_filter.meter, sample_filter.resource,
sample_filter.source, sample_filter.start,
sample_filter.start_timestamp_op,
sample_filter.end, sample_filter.end_timestamp_op,
sample_filter.metaquery, sample_filter.message_id,
require_meter)
return make_query(user_id=sample_filter.user,
project_id=sample_filter.project,
meter=sample_filter.meter,
resource_id=sample_filter.resource,
source=sample_filter.source,
start=sample_filter.start,
start_op=sample_filter.start_timestamp_op,
end=sample_filter.end,
end_op=sample_filter.end_timestamp_op,
metaquery=sample_filter.metaquery,
message_id=sample_filter.message_id,
require_meter=require_meter)
def _make_rowkey_scan(meter, rts_start=None, rts_end=None):
@ -813,3 +899,38 @@ def _resource_id_from_record_tuple(record):
"""Extract resource_id from HBase tuple record
"""
return record[1]['f:resource_id']
#TODO(nprivalova): to be refactored, will be used everywhere in impl_hbase
# without additional ifs
def serialize_entry(entry_from_user):
result_dict = copy.copy(entry_from_user)
keys = result_dict.keys()
for key in keys:
val = result_dict[key]
if isinstance(val, datetime.datetime):
val = timeutils.strtime(val)
if not isinstance(val, basestring):
val = json.dumps(val)
result_dict['f:' + key] = val
del result_dict[key]
return result_dict
def deserialize_entry(stored_entry):
result_entry = copy.copy(stored_entry)
keys = result_entry.keys()
for key in keys:
val = result_entry[key]
try:
val = json.loads(val)
except ValueError:
pass
if "timestamp" in key and val:
val = timeutils.parse_strtime(val)
# There is no int in wsme models
if isinstance(val, (int, long, float)) and not isinstance(val, bool):
val = str(val)
result_entry[key[2:]] = val
del result_entry[key]
return result_entry