225 lines
8.3 KiB
Python
225 lines
8.3 KiB
Python
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
"""HBase storage backend
|
|
"""
|
|
import datetime
|
|
import os
|
|
|
|
import happybase
|
|
from oslo.utils import netutils
|
|
from six.moves.urllib import parse as urlparse
|
|
|
|
from ceilometer.alarm.storage import base
|
|
from ceilometer.alarm.storage import models
|
|
from ceilometer.openstack.common.gettextutils import _
|
|
from ceilometer.openstack.common import log
|
|
from ceilometer.storage.hbase import inmemory as hbase_inmemory
|
|
from ceilometer.storage.hbase import utils as hbase_utils
|
|
from ceilometer import utils
|
|
|
|
LOG = log.getLogger(__name__)
|
|
|
|
|
|
AVAILABLE_CAPABILITIES = {
|
|
'alarms': {'query': {'simple': True,
|
|
'complex': False},
|
|
'history': {'query': {'simple': True,
|
|
'complex': False}}},
|
|
}
|
|
|
|
|
|
AVAILABLE_STORAGE_CAPABILITIES = {
|
|
'storage': {'production_ready': True},
|
|
}
|
|
|
|
|
|
class Connection(base.Connection):
|
|
"""Put the data into a HBase database
|
|
|
|
Collections:
|
|
|
|
- alarm:
|
|
|
|
- row_key: uuid of alarm
|
|
- Column Families:
|
|
|
|
f: contains the raw incoming alarm data
|
|
|
|
- alarm_h:
|
|
|
|
- row_key: uuid of alarm + "_" + reversed timestamp
|
|
- Column Families:
|
|
|
|
f: raw incoming alarm_history data. Timestamp becomes now()
|
|
if not determined
|
|
"""
|
|
|
|
CAPABILITIES = utils.update_nested(base.Connection.CAPABILITIES,
|
|
AVAILABLE_CAPABILITIES)
|
|
STORAGE_CAPABILITIES = utils.update_nested(
|
|
base.Connection.STORAGE_CAPABILITIES,
|
|
AVAILABLE_STORAGE_CAPABILITIES,
|
|
)
|
|
_memory_instance = None
|
|
|
|
ALARM_TABLE = "alarm"
|
|
ALARM_HISTORY_TABLE = "alarm_h"
|
|
|
|
def __init__(self, url):
|
|
"""Hbase Connection Initialization."""
|
|
opts = self._parse_connection_url(url)
|
|
|
|
if opts['host'] == '__test__':
|
|
url = os.environ.get('CEILOMETER_TEST_HBASE_URL')
|
|
if url:
|
|
# Reparse URL, but from the env variable now
|
|
opts = self._parse_connection_url(url)
|
|
self.conn_pool = self._get_connection_pool(opts)
|
|
else:
|
|
# This is a in-memory usage for unit tests
|
|
if Connection._memory_instance is None:
|
|
LOG.debug(_('Creating a new in-memory HBase '
|
|
'Connection object'))
|
|
Connection._memory_instance = (hbase_inmemory.
|
|
MConnectionPool())
|
|
self.conn_pool = Connection._memory_instance
|
|
else:
|
|
self.conn_pool = self._get_connection_pool(opts)
|
|
|
|
def upgrade(self):
|
|
with self.conn_pool.connection() as conn:
|
|
conn.create_table(self.ALARM_TABLE, {'f': dict()})
|
|
conn.create_table(self.ALARM_HISTORY_TABLE, {'f': dict()})
|
|
|
|
def clear(self):
|
|
LOG.debug(_('Dropping HBase schema...'))
|
|
with self.conn_pool.connection() as conn:
|
|
for table in [self.ALARM_TABLE,
|
|
self.ALARM_HISTORY_TABLE]:
|
|
try:
|
|
conn.disable_table(table)
|
|
except Exception:
|
|
LOG.debug(_('Cannot disable table but ignoring error'))
|
|
try:
|
|
conn.delete_table(table)
|
|
except Exception:
|
|
LOG.debug(_('Cannot delete table but ignoring error'))
|
|
|
|
@staticmethod
|
|
def _get_connection_pool(conf):
|
|
"""Return a connection pool to the database.
|
|
|
|
.. note::
|
|
|
|
The tests use a subclass to override this and return an
|
|
in-memory connection pool.
|
|
"""
|
|
LOG.debug(_('connecting to HBase on %(host)s:%(port)s') % (
|
|
{'host': conf['host'], 'port': conf['port']}))
|
|
return happybase.ConnectionPool(size=100, host=conf['host'],
|
|
port=conf['port'],
|
|
table_prefix=conf['table_prefix'])
|
|
|
|
@staticmethod
|
|
def _parse_connection_url(url):
|
|
"""Parse connection parameters from a database url.
|
|
|
|
.. note::
|
|
|
|
HBase Thrift does not support authentication and there is no
|
|
database name, so we are not looking for these in the url.
|
|
"""
|
|
opts = {}
|
|
result = netutils.urlsplit(url)
|
|
opts['table_prefix'] = urlparse.parse_qs(
|
|
result.query).get('table_prefix', [None])[0]
|
|
opts['dbtype'] = result.scheme
|
|
if ':' in result.netloc:
|
|
opts['host'], port = result.netloc.split(':')
|
|
else:
|
|
opts['host'] = result.netloc
|
|
port = 9090
|
|
opts['port'] = port and int(port) or 9090
|
|
return opts
|
|
|
|
def update_alarm(self, alarm):
|
|
"""Create an alarm.
|
|
|
|
:param alarm: The alarm to create. It is Alarm object, so we need to
|
|
call as_dict()
|
|
"""
|
|
_id = alarm.alarm_id
|
|
alarm_to_store = hbase_utils.serialize_entry(alarm.as_dict())
|
|
with self.conn_pool.connection() as conn:
|
|
alarm_table = conn.table(self.ALARM_TABLE)
|
|
alarm_table.put(_id, alarm_to_store)
|
|
stored_alarm = hbase_utils.deserialize_entry(
|
|
alarm_table.row(_id))[0]
|
|
return models.Alarm(**stored_alarm)
|
|
|
|
create_alarm = update_alarm
|
|
|
|
def delete_alarm(self, alarm_id):
|
|
with self.conn_pool.connection() as conn:
|
|
alarm_table = conn.table(self.ALARM_TABLE)
|
|
alarm_table.delete(alarm_id)
|
|
|
|
def get_alarms(self, name=None, user=None, state=None, meter=None,
|
|
project=None, enabled=None, alarm_id=None, pagination=None):
|
|
|
|
if pagination:
|
|
raise NotImplementedError('Pagination not implemented')
|
|
if meter:
|
|
raise NotImplementedError('Filter by meter not implemented')
|
|
|
|
q = hbase_utils.make_query(alarm_id=alarm_id, name=name,
|
|
enabled=enabled, user_id=user,
|
|
project_id=project, state=state)
|
|
|
|
with self.conn_pool.connection() as conn:
|
|
alarm_table = conn.table(self.ALARM_TABLE)
|
|
gen = alarm_table.scan(filter=q)
|
|
for ignored, data in gen:
|
|
stored_alarm = hbase_utils.deserialize_entry(data)[0]
|
|
yield models.Alarm(**stored_alarm)
|
|
|
|
def get_alarm_changes(self, alarm_id, on_behalf_of,
|
|
user=None, project=None, type=None,
|
|
start_timestamp=None, start_timestamp_op=None,
|
|
end_timestamp=None, end_timestamp_op=None):
|
|
q = hbase_utils.make_query(alarm_id=alarm_id,
|
|
on_behalf_of=on_behalf_of, type=type,
|
|
user_id=user, project_id=project)
|
|
start_row, end_row = hbase_utils.make_timestamp_query(
|
|
hbase_utils.make_general_rowkey_scan,
|
|
start=start_timestamp, start_op=start_timestamp_op,
|
|
end=end_timestamp, end_op=end_timestamp_op, bounds_only=True,
|
|
some_id=alarm_id)
|
|
with self.conn_pool.connection() as conn:
|
|
alarm_history_table = conn.table(self.ALARM_HISTORY_TABLE)
|
|
gen = alarm_history_table.scan(filter=q, row_start=start_row,
|
|
row_stop=end_row)
|
|
for ignored, data in gen:
|
|
stored_entry = hbase_utils.deserialize_entry(data)[0]
|
|
yield models.AlarmChange(**stored_entry)
|
|
|
|
def record_alarm_change(self, alarm_change):
|
|
"""Record alarm change event."""
|
|
alarm_change_dict = hbase_utils.serialize_entry(alarm_change)
|
|
ts = alarm_change.get('timestamp') or datetime.datetime.now()
|
|
rts = hbase_utils.timestamp(ts)
|
|
with self.conn_pool.connection() as conn:
|
|
alarm_history_table = conn.table(self.ALARM_HISTORY_TABLE)
|
|
alarm_history_table.put(alarm_change.get('alarm_id') + "_" +
|
|
str(rts), alarm_change_dict)
|