Merge "[HBase] Implement events on HBase"
commit 99839f9785
@@ -22,6 +22,7 @@ import datetime
 import hashlib
 import itertools
 import json
+import operator
 import os
 import re
 import six.moves.urllib.parse as urlparse
@@ -51,6 +52,7 @@ AVAILABLE_CAPABILITIES = {
     'statistics': {'query': {'simple': True,
                              'metadata': True},
                    'aggregation': {'standard': True}},
+    'events': {'query': {'simple': True}},
 }
 
 
@@ -58,6 +60,11 @@ AVAILABLE_STORAGE_CAPABILITIES = {
     'storage': {'production_ready': True},
 }
 
+DTYPE_NAMES = {'none': 0, 'string': 1, 'integer': 2, 'float': 3,
+               'datetime': 4}
+OP_SIGN = {'eq': '=', 'lt': '<', 'le': '<=', 'ne': '!=', 'gt': '>',
+           'ge': '>='}
+
 
 class Connection(base.Connection):
     """Put the data into a HBase database
@@ -113,6 +120,16 @@ class Connection(base.Connection):
         - Column Families:
             f: raw incoming alarm_history data. Timestamp becomes now()
                if not determined
 
+    - events
+      - row_key: timestamp of event's generation + uuid of event
+        in format: "%s_%s" % (ts, Event.message_id)
+      - Column Families:
+          f: contains the following qualifiers:
+              -event_type: description of event's type
+              -timestamp: time stamp of event generation
+              -all traits for this event in format
+               "%s+%s" % (trait_name, trait_type)
     """
 
     CAPABILITIES = utils.update_nested(base.Connection.CAPABILITIES,
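For concreteness, a hypothetical example of a single row under the events schema above; the timestamp, uuid, trait names and values are all invented for illustration:

    # One row of the "event" table, shown as plain Python data.
    # Row key: "<microseconds since epoch>_<message_id>".
    row_key = "1389721260000000_9b23b3f3-8d29-4eba-b46e-125ba4b53ee2"

    # Column family 'f' holds the event metadata plus one qualifier per
    # trait, named "<trait_name>+<trait_dtype>".
    columns = {
        'f:event_type': 'compute.instance.create.end',
        'f:timestamp': '2014-01-14T17:01:00',
        'f:instance_id+1': 'a6c67cc0',  # dtype 1: string trait
        'f:memory_mb+2': '512',         # dtype 2: integer trait
    }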
@@ -127,6 +144,7 @@ class Connection(base.Connection):
     METER_TABLE = "meter"
     ALARM_TABLE = "alarm"
     ALARM_HISTORY_TABLE = "alarm_h"
+    EVENT_TABLE = "event"
 
     def __init__(self, url):
         """Hbase Connection Initialization."""
@@ -154,6 +172,7 @@ class Connection(base.Connection):
         conn.create_table(self.METER_TABLE, {'f': dict(max_versions=1)})
         conn.create_table(self.ALARM_TABLE, {'f': dict()})
         conn.create_table(self.ALARM_HISTORY_TABLE, {'f': dict()})
+        conn.create_table(self.EVENT_TABLE, {'f': dict(max_versions=1)})
 
     def clear(self):
         LOG.debug(_('Dropping HBase schema...'))
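As an aside, the same table could be created directly with the happybase-style client API this driver builds on; a minimal sketch assuming a reachable HBase Thrift server (host and port are placeholders):

    import happybase

    # Connect to the HBase Thrift server (placeholder address).
    conn = happybase.Connection(host='localhost', port=9090)

    # A single column family 'f'; max_versions=1 keeps only the latest cell
    # value, matching how the driver creates its "event" table above.
    conn.create_table('event', {'f': dict(max_versions=1)})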
@@ -161,7 +180,8 @@ class Connection(base.Connection):
         for table in [self.RESOURCE_TABLE,
                       self.METER_TABLE,
                       self.ALARM_TABLE,
-                      self.ALARM_HISTORY_TABLE]:
+                      self.ALARM_HISTORY_TABLE,
+                      self.EVENT_TABLE]:
             try:
                 conn.disable_table(table)
             except Exception:
@@ -268,7 +288,7 @@ class Connection(base.Connection):
         """
         alarm_change_dict = serialize_entry(alarm_change)
         ts = alarm_change.get('timestamp') or datetime.datetime.now()
-        rts = reverse_timestamp(ts)
+        rts = timestamp(ts)
         with self.conn_pool.connection() as conn:
             alarm_history_table = conn.table(self.ALARM_HISTORY_TABLE)
             alarm_history_table.put(alarm_change.get('alarm_id') + "_" +
@@ -306,7 +326,7 @@ class Connection(base.Connection):
 
         # We use reverse timestamps in rowkeys as they are sorted
         # alphabetically.
-        rts = reverse_timestamp(data['timestamp'])
+        rts = timestamp(data['timestamp'])
         row = "%s_%d_%s" % (data['counter_name'], rts, m.hexdigest())
         record = serialize_entry(data, **{'source': data['source'],
                                           'rts': rts,
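The comment above is the heart of the rowkey design: HBase sorts rowkeys as byte strings, so storing MAX_INT64 - ts makes newer entries sort first. A self-contained sketch of the effect; the helper mirrors the timestamp() function introduced further down, and the dates are arbitrary:

    import datetime

    def ts_micros(dt, reverse=True):
        # Microseconds since the epoch, optionally subtracted from the
        # largest 64-bit signed integer, as in timestamp() below.
        td = dt - datetime.datetime(1970, 1, 1)
        ts = td.microseconds + td.seconds * 1000000 + td.days * 86400000000
        return 0x7fffffffffffffff - ts if reverse else ts

    older = datetime.datetime(2014, 1, 1)
    newer = datetime.datetime(2014, 2, 1)

    # Straight timestamps sort oldest-first, reversed ones newest-first
    # (digit counts are equal here, so string order matches numeric order).
    assert "%d" % ts_micros(older, False) < "%d" % ts_micros(newer, False)
    assert "%d" % ts_micros(newer) < "%d" % ts_micros(older)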
@@ -557,6 +577,139 @@ class Connection(base.Connection):
             self._update_meter_stats(results[-1], meter[0])
         return results
 
+    def record_events(self, event_models):
+        """Write the events to HBase.
+
+        :param event_models: a list of models.Event objects.
+        """
+        problem_events = []
+
+        with self.conn_pool.connection() as conn:
+            events_table = conn.table(self.EVENT_TABLE)
+            for event_model in event_models:
+                # The row key consists of the timestamp and the message_id
+                # from models.Event, so that stored events are sorted by
+                # timestamp in the database.
+                ts = event_model.generated
+                row = "%d_%s" % (timestamp(ts, reverse=False),
+                                 event_model.message_id)
+                event_type = event_model.event_type
+                traits = {}
+                if event_model.traits:
+                    for trait in event_model.traits:
+                        key = "%s+%d" % (trait.name, trait.dtype)
+                        traits[key] = trait.value
+                record = serialize_entry(traits, event_type=event_type,
+                                         timestamp=ts)
+                try:
+                    events_table.put(row, record)
+                except Exception as ex:
+                    LOG.debug(_("Failed to record event: %s") % ex)
+                    problem_events.append((models.Event.UNKNOWN_PROBLEM,
+                                           event_model))
+        return problem_events
+
+    def get_events(self, event_filter):
+        """Return an iterable of models.Event objects.
+
+        :param event_filter: storage.EventFilter object, consists of filters
+                             for events that are stored in the database.
+        """
+        q, start, stop = make_events_query_from_filter(event_filter)
+        with self.conn_pool.connection() as conn:
+            events_table = conn.table(self.EVENT_TABLE)
+
+            gen = events_table.scan(filter=q, row_start=start, row_stop=stop)
+
+            events = []
+            for event_id, data in gen:
+                traits = []
+                events_dict = deserialize_entry(data)[0]
+                for key, value in events_dict.items():
+                    if (not key.startswith('event_type')
+                            and not key.startswith('timestamp')):
+                        trait_name, trait_dtype = key.rsplit('+', 1)
+                        traits.append(models.Trait(name=trait_name,
+                                                   dtype=int(trait_dtype),
+                                                   value=value))
+                ts, mess = event_id.split('_', 1)
+
+                events.append(models.Event(
+                    message_id=mess,
+                    event_type=events_dict['event_type'],
+                    generated=events_dict['timestamp'],
+                    traits=sorted(traits, key=(lambda item:
+                                               getattr(item, 'dtype')))
+                ))
+        return events
+
+    def get_event_types(self):
+        """Return all event types as an iterable of strings."""
+        with self.conn_pool.connection() as conn:
+            events_table = conn.table(self.EVENT_TABLE)
+            gen = events_table.scan()
+
+            event_types = set()
+            for event_id, data in gen:
+                events_dict = deserialize_entry(data)[0]
+                for key, value in events_dict.items():
+                    if key.startswith('event_type'):
+                        if value not in event_types:
+                            event_types.add(value)
+                            yield value
+
+    def get_trait_types(self, event_type):
+        """Return a dictionary containing the name and data type of the trait.
+
+        Only trait types for the provided event_type are returned.
+        :param event_type: the type of the Event
+        """
+
+        q = make_query(event_type=event_type)
+        trait_types = set()
+        with self.conn_pool.connection() as conn:
+            events_table = conn.table(self.EVENT_TABLE)
+            gen = events_table.scan(filter=q)
+            for event_id, data in gen:
+                events_dict = deserialize_entry(data)[0]
+                for key, value in events_dict.items():
+                    if (not key.startswith('event_type') and
+                            not key.startswith('timestamp')):
+                        name, tt_number = key.rsplit('+', 1)
+                        if name not in trait_types:
+                            # Return each trait type only once: if the same
+                            # trait name is found in different events with an
+                            # equal event_type, it is yielded a single time.
+                            # It is assumed that a given trait name can have
+                            # only one trait type.
+                            trait_types.add(name)
+                            data_type = models.Trait.type_names[int(tt_number)]
+                            yield {'name': name, 'data_type': data_type}
+
+    def get_traits(self, event_type, trait_type=None):
+        """Return all trait instances associated with an event_type. If
+        trait_type is specified, only return instances of that trait type.
+
+        :param event_type: the type of the Event to filter by
+        :param trait_type: the name of the Trait to filter by
+        """
+        q = make_query(event_type=event_type, trait_type=trait_type)
+        traits = []
+        with self.conn_pool.connection() as conn:
+            events_table = conn.table(self.EVENT_TABLE)
+            gen = events_table.scan(filter=q)
+            for event_id, data in gen:
+                events_dict = deserialize_entry(data)[0]
+                for key, value in events_dict.items():
+                    if (not key.startswith('event_type') and
+                            not key.startswith('timestamp')):
+                        name, tt_number = key.rsplit('+', 1)
+                        traits.append(models.Trait(name=name,
+                                                   dtype=int(tt_number),
+                                                   value=value))
+        for trait in sorted(traits, key=operator.attrgetter('dtype')):
+            yield trait
+
 
 ###############
 # This is a very crude version of "in-memory HBase", which implements just
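The trait encoding and decoding in the methods above are mirror images: record_events packs each trait into a qualifier named "<name>+<dtype>", and get_events splits it back with rsplit('+', 1). A self-contained round-trip sketch with invented traits, where plain tuples stand in for models.Trait:

    # Hypothetical traits as (name, dtype, value) tuples.
    traits_in = [('instance_id', 1, 'a6c67cc0'), ('memory_mb', 2, '512')]

    # Serialize, as record_events does: qualifier "<name>+<dtype>".
    row = {"%s+%d" % (name, dtype): value for name, dtype, value in traits_in}

    # Deserialize, as get_events does. rsplit on the last '+' means a trait
    # name containing '+' still parses, since the dtype is a trailing int.
    traits_out = [(key.rsplit('+', 1)[0], int(key.rsplit('+', 1)[1]), value)
                  for key, value in row.items()]

    assert sorted(traits_out) == sorted(traits_in)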
@@ -655,6 +808,55 @@ class MTable(object):
                                       "yet" % op)
         return r
 
+    @staticmethod
+    def ColumnPrefixFilter(args, rows):
+        """This is a filter for testing the "in-memory HBase".
+
+        This method is called from scan() when 'ColumnPrefixFilter' is found
+        in the 'filter' argument.
+        :param args: a list of filter arguments, contains the column prefix
+        :param rows: a dict of rows to filter
+        """
+        value = args[0]
+        column = 'f:' + value
+        r = {}
+        for row, data in rows.items():
+            column_dict = {}
+            for key in data:
+                if key.startswith(column):
+                    column_dict[key] = data[key]
+            r[row] = column_dict
+        return r
+
+    @staticmethod
+    def RowFilter(args, rows):
+        """This is a filter for testing the "in-memory HBase".
+
+        This method is called from scan() when 'RowFilter'
+        is found in the 'filter' argument.
+        :param args: a list of filter arguments, contains the operator and
+                     the sought string
+        :param rows: a dict of rows to filter
+        """
+        op = args[0]
+        value = args[1]
+        if value.startswith('regexstring:'):
+            value = value[len('regexstring:'):]
+        r = {}
+        for row, data in rows.items():
+            try:
+                g = re.search(value, row).group()
+                if op == '=':
+                    if g == row:
+                        r[row] = data
+                else:
+                    raise NotImplementedError("In-memory "
+                                              "RowFilter doesn't support "
+                                              "the %s operation yet" % op)
+            except AttributeError:
+                pass
+        return r
+
 
 class MConnectionPool(object):
     def __init__(self):
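To see what these test doubles do with a realistic-looking filter, here is a standalone rerun of the '=' branch of the RowFilter logic with hypothetical rows, using the same 'regexstring:' argument format that MTable.scan() would pass in after parsing a filter string:

    import re

    def row_filter(args, rows):
        # The '=' branch of MTable.RowFilter above, reproduced standalone.
        op, value = args[0], args[1]
        if value.startswith('regexstring:'):
            value = value[len('regexstring:'):]
        r = {}
        for row, data in rows.items():
            try:
                if op == '=' and re.search(value, row).group() == row:
                    r[row] = data
            except AttributeError:
                pass  # re.search() found nothing for this row
        return r

    rows = {'100_abc': {'f:event_type': 'x'}, '200_def': {'f:event_type': 'y'}}
    # Keep only rows whose whole key matches the regex.
    assert list(row_filter(['=', r'regexstring:\d*_abc'], rows)) == ['100_abc']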
@@ -696,19 +898,56 @@ class MConnection(object):
 
 #################################################
 # Here be various HBase helpers
-def reverse_timestamp(dt):
-    """Reverse timestamp so that newer timestamps are represented by smaller
-    numbers than older ones.
-
-    Reverse timestamps is a technique used in HBase rowkey design. When period
-    queries are required the HBase rowkeys must include timestamps, but as
-    rowkeys in HBase are ordered lexicographically, the timestamps must be
-    reversed.
+def timestamp(dt, reverse=True):
+    """Timestamp is a count of microseconds since the start of the epoch.
+
+    Timestamps in rowkeys are a technique used in HBase rowkey design: when
+    period queries are required, the rowkeys must include timestamps, and as
+    rowkeys in HBase are ordered lexicographically, rows are sorted by time.
+
+    The same holds for reversed timestamps, except that the resulting order
+    is the opposite.
+
+    :param dt: datetime to translate into a (possibly reversed) timestamp
+    :param reverse: boolean, whether to return the reversed or the straight
+                    count of microseconds
+    :return: count or reversed count of microseconds since the start of the
+             epoch
     """
     epoch = datetime.datetime(1970, 1, 1)
    td = dt - epoch
     ts = td.microseconds + td.seconds * 1000000 + td.days * 86400000000
-    return 0x7fffffffffffffff - ts
+    return 0x7fffffffffffffff - ts if reverse else ts
+
+
+def make_events_query_from_filter(event_filter):
+    """Return a start row, a stop row and a query based on the selected
+    parameters.
+
+    :param event_filter: storage.EventFilter object.
+    """
+    q = []
+    res_q = None
+    start = "%s" % (timestamp(event_filter.start_time, reverse=False)
+                    if event_filter.start_time else "")
+    stop = "%s" % (timestamp(event_filter.end_time, reverse=False)
+                   if event_filter.end_time else "")
+    if event_filter.event_type:
+        q.append("SingleColumnValueFilter ('f', 'event_type', = , "
+                 "'binary:%s')" % dump(event_filter.event_type))
+    if event_filter.message_id:
+        q.append("RowFilter ( = , 'regexstring:\d*_%s')" %
+                 event_filter.message_id)
+    if len(q):
+        res_q = " AND ".join(q)
+
+    if event_filter.traits_filter:
+        for trait_filter in event_filter.traits_filter:
+            q_trait = make_query(trait_query=True, **trait_filter)
+            if q_trait:
+                if res_q:
+                    res_q += " AND " + q_trait
+                else:
+                    res_q = q_trait
+    return res_q, start, stop
 
 
 def make_timestamp_query(func, start=None, start_op=None, end=None,
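To make the generated scan concrete: for a hypothetical EventFilter with event_type='compute.instance.exists', start_time=2014-01-01 and end_time=2014-02-01, make_events_query_from_filter would return roughly the following (the dump() serialization of the value is elided for readability):

    # Filter string handed to events_table.scan(filter=...):
    q = ("SingleColumnValueFilter ('f', 'event_type', = , "
         "'binary:compute.instance.exists')")

    # Straight (non-reversed) timestamps bound the scan, since event row
    # keys begin with the straight timestamp:
    start = "1388534400000000"  # timestamp(2014-01-01, reverse=False)
    stop = "1391212800000000"   # timestamp(2014-02-01, reverse=False)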
@@ -747,8 +986,8 @@ def make_timestamp_query(func, start=None, start_op=None, end=None,
 
 def get_start_end_rts(start, start_op, end, end_op):
 
-    rts_start = str(reverse_timestamp(start) + 1) if start else ""
-    rts_end = str(reverse_timestamp(end) + 1) if end else ""
+    rts_start = str(timestamp(start) + 1) if start else ""
+    rts_end = str(timestamp(end) + 1) if end else ""
 
     #By default, we are using ge for lower bound and lt for upper bound
     if start_op == 'gt':
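The +1 here compensates for the reversed ordering: reversing timestamps swaps the roles of the two bounds, and shifting both by one unit yields the ge lower bound and lt upper bound mentioned in the comment. A small numeric sketch with invented values, reading the reversed window as [rts_end, rts_start):

    MAX = 0x7fffffffffffffff

    def rev(ts):
        # Reversed timestamp, as in timestamp(dt) above.
        return MAX - ts

    start, end = 1000, 2000  # hypothetical query window [start, end)
    rts_start = rev(start) + 1
    rts_end = rev(end) + 1

    assert rts_end <= rev(1500) < rts_start           # inside the window
    assert rts_end <= rev(1000) < rts_start           # start included (ge)
    assert not (rts_end <= rev(2000) < rts_start)     # end excluded (lt)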
@@ -759,23 +998,42 @@ def get_start_end_rts(start, start_op, end, end_op):
     return rts_start, rts_end
 
 
-def make_query(metaquery=None, **kwargs):
+def make_query(metaquery=None, trait_query=None, **kwargs):
     """Return a filter query string based on the selected parameters.
 
     :param metaquery: optional metaquery dict
+    :param trait_query: optional boolean, whether kwargs describe a trait
+                        query
     :param kwargs: key-value pairs to filter on. Key should be a real
                    column name in db
     """
     q = []
     res_q = None
 
+    # The query for traits differs a little from the others: it is
+    # constructed with a SingleColumnValueFilter and allows choosing an
+    # operator for the value.
+    if trait_query:
+        trait_name = kwargs.pop('key')
+        op = kwargs.pop('op', 'eq')
+        for k, v in kwargs.items():
+            if v is not None:
+                res_q = ("SingleColumnValueFilter "
+                         "('f', '%s+%d', %s, 'binary:%s', true, true)" %
+                         (trait_name, DTYPE_NAMES[k], OP_SIGN[op],
+                          dump(v)))
+        return res_q
+
     # Note: we use extended constructor for SingleColumnValueFilter here.
     # It is explicitly specified that entry should not be returned if CF is not
     # found in table.
-    for key, value in kwargs.items():
+    for key, value in sorted(kwargs.items()):
         if value is not None:
             if key == 'source':
                 q.append("SingleColumnValueFilter "
                          "('f', 's_%s', =, 'binary:%s', true, true)" %
                          (value, dump('1')))
+            elif key == 'trait_type':
+                q.append("ColumnPrefixFilter('%s')" % value)
             else:
                 q.append("SingleColumnValueFilter "
                          "('f', '%s', =, 'binary:%s', true, true)" %
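For a trait query, make_events_query_from_filter passes each traits_filter entry through make_query(trait_query=True, ...). A hypothetical filter dict {'key': 'memory_mb', 'integer': '512', 'op': 'gt'} would, given DTYPE_NAMES['integer'] == 2 and OP_SIGN['gt'] == '>', produce a single filter string like this (dump() of the value again elided):

    res_q = ("SingleColumnValueFilter "
             "('f', 'memory_mb+2', >, 'binary:512', true, true)")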
@@ -92,7 +92,7 @@ class CapabilitiesTest(test_base.BaseTestCase):
                                    'complex': False},
                          'history': {'query': {'simple': False,
                                                'complex': False}}},
-            'events': {'query': {'simple': False}}
+            'events': {'query': {'simple': True}},
         }
 
         actual_capabilities = hbase.Connection.get_capabilities()
@@ -2717,6 +2717,7 @@ class EventTestBase(tests_db.TestBase,
 
 
 class EventTest(EventTestBase):
+    @tests_db.run_with('sqlite', 'mongodb', 'db2')
     def test_duplicate_message_id(self):
         now = datetime.datetime.utcnow()
         m = [models.Event("1", "Foo", now, None),