Merge "Move hbase alarms driver code to alarm tree"

Jenkins authored on 2014-07-25 05:28:32 +00:00, committed by Gerrit Code Review
commit a77dd2b540
4 changed files with 238 additions and 104 deletions

View File

@@ -0,0 +1,224 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""HBase storage backend
"""
import datetime
import os

import happybase
from six.moves.urllib import parse as urlparse

from ceilometer.alarm.storage import base
from ceilometer.alarm.storage import models
from ceilometer.openstack.common.gettextutils import _
from ceilometer.openstack.common import log
from ceilometer.openstack.common import network_utils
from ceilometer.storage.hbase import inmemory as hbase_inmemory
from ceilometer.storage.hbase import utils as hbase_utils
from ceilometer import utils

LOG = log.getLogger(__name__)


AVAILABLE_CAPABILITIES = {
    'alarms': {'query': {'simple': True,
                         'complex': False},
               'history': {'query': {'simple': True,
                                     'complex': False}}},
}


AVAILABLE_STORAGE_CAPABILITIES = {
    'storage': {'production_ready': True},
}


class Connection(base.Connection):
    """Put the data into a HBase database

    Collections:

    - alarm:

      - row_key: uuid of alarm
      - Column Families:

        f: contains the raw incoming alarm data

    - alarm_h:

      - row_key: uuid of alarm + "_" + reversed timestamp
      - Column Families:

        f: raw incoming alarm_history data. Timestamp becomes now()
          if not determined
    """

    CAPABILITIES = utils.update_nested(base.Connection.CAPABILITIES,
                                       AVAILABLE_CAPABILITIES)
    STORAGE_CAPABILITIES = utils.update_nested(
        base.Connection.STORAGE_CAPABILITIES,
        AVAILABLE_STORAGE_CAPABILITIES,
    )
    _memory_instance = None

    ALARM_TABLE = "alarm"
    ALARM_HISTORY_TABLE = "alarm_h"

    def __init__(self, url):
        """Hbase Connection Initialization."""
        opts = self._parse_connection_url(url)

        if opts['host'] == '__test__':
            url = os.environ.get('CEILOMETER_TEST_HBASE_URL')
            if url:
                # Reparse URL, but from the env variable now
                opts = self._parse_connection_url(url)
                self.conn_pool = self._get_connection_pool(opts)
            else:
                # This is a in-memory usage for unit tests
                if Connection._memory_instance is None:
                    LOG.debug(_('Creating a new in-memory HBase '
                                'Connection object'))
                    Connection._memory_instance = (hbase_inmemory.
                                                   MConnectionPool())
                self.conn_pool = Connection._memory_instance
        else:
            self.conn_pool = self._get_connection_pool(opts)

    def upgrade(self):
        with self.conn_pool.connection() as conn:
            conn.create_table(self.ALARM_TABLE, {'f': dict()})
            conn.create_table(self.ALARM_HISTORY_TABLE, {'f': dict()})

    def clear(self):
        LOG.debug(_('Dropping HBase schema...'))
        with self.conn_pool.connection() as conn:
            for table in [self.ALARM_TABLE,
                          self.ALARM_HISTORY_TABLE]:
                try:
                    conn.disable_table(table)
                except Exception:
                    LOG.debug(_('Cannot disable table but ignoring error'))
                try:
                    conn.delete_table(table)
                except Exception:
                    LOG.debug(_('Cannot delete table but ignoring error'))

    @staticmethod
    def _get_connection_pool(conf):
        """Return a connection pool to the database.

        .. note::

          The tests use a subclass to override this and return an
          in-memory connection pool.
        """
        LOG.debug(_('connecting to HBase on %(host)s:%(port)s') % (
                  {'host': conf['host'], 'port': conf['port']}))
        return happybase.ConnectionPool(size=100, host=conf['host'],
                                        port=conf['port'],
                                        table_prefix=conf['table_prefix'])

    @staticmethod
    def _parse_connection_url(url):
        """Parse connection parameters from a database url.

        .. note::

          HBase Thrift does not support authentication and there is no
          database name, so we are not looking for these in the url.
        """
        opts = {}
        result = network_utils.urlsplit(url)
        opts['table_prefix'] = urlparse.parse_qs(
            result.query).get('table_prefix', [None])[0]
        opts['dbtype'] = result.scheme
        if ':' in result.netloc:
            opts['host'], port = result.netloc.split(':')
        else:
            opts['host'] = result.netloc
            port = 9090
        opts['port'] = port and int(port) or 9090
        return opts

    def update_alarm(self, alarm):
        """Create an alarm.

        :param alarm: The alarm to create. It is Alarm object, so we need to
          call as_dict()
        """
        _id = alarm.alarm_id
        alarm_to_store = hbase_utils.serialize_entry(alarm.as_dict())
        with self.conn_pool.connection() as conn:
            alarm_table = conn.table(self.ALARM_TABLE)
            alarm_table.put(_id, alarm_to_store)
            stored_alarm = hbase_utils.deserialize_entry(
                alarm_table.row(_id))[0]
            return models.Alarm(**stored_alarm)

    create_alarm = update_alarm

    def delete_alarm(self, alarm_id):
        with self.conn_pool.connection() as conn:
            alarm_table = conn.table(self.ALARM_TABLE)
            alarm_table.delete(alarm_id)

    def get_alarms(self, name=None, user=None, state=None, meter=None,
                   project=None, enabled=None, alarm_id=None, pagination=None):
        if pagination:
            raise NotImplementedError('Pagination not implemented')
        if meter:
            raise NotImplementedError('Filter by meter not implemented')

        q = hbase_utils.make_query(alarm_id=alarm_id, name=name,
                                   enabled=enabled, user_id=user,
                                   project_id=project, state=state)
        with self.conn_pool.connection() as conn:
            alarm_table = conn.table(self.ALARM_TABLE)
            gen = alarm_table.scan(filter=q)
            for ignored, data in gen:
                stored_alarm = hbase_utils.deserialize_entry(data)[0]
                yield models.Alarm(**stored_alarm)

    def get_alarm_changes(self, alarm_id, on_behalf_of,
                          user=None, project=None, type=None,
                          start_timestamp=None, start_timestamp_op=None,
                          end_timestamp=None, end_timestamp_op=None):
        q = hbase_utils.make_query(alarm_id=alarm_id,
                                   on_behalf_of=on_behalf_of, type=type,
                                   user_id=user, project_id=project)
        start_row, end_row = hbase_utils.make_timestamp_query(
            hbase_utils.make_general_rowkey_scan,
            start=start_timestamp, start_op=start_timestamp_op,
            end=end_timestamp, end_op=end_timestamp_op, bounds_only=True,
            some_id=alarm_id)
        with self.conn_pool.connection() as conn:
            alarm_history_table = conn.table(self.ALARM_HISTORY_TABLE)
            gen = alarm_history_table.scan(filter=q, row_start=start_row,
                                           row_stop=end_row)
            for ignored, data in gen:
                stored_entry = hbase_utils.deserialize_entry(data)[0]
                yield models.AlarmChange(**stored_entry)

    def record_alarm_change(self, alarm_change):
        """Record alarm change event."""
        alarm_change_dict = hbase_utils.serialize_entry(alarm_change)
        ts = alarm_change.get('timestamp') or datetime.datetime.now()
        rts = hbase_utils.timestamp(ts)
        with self.conn_pool.connection() as conn:
            alarm_history_table = conn.table(self.ALARM_HISTORY_TABLE)
            alarm_history_table.put(alarm_change.get('alarm_id') + "_" +
                                    str(rts), alarm_change_dict)
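The class docstring above pins down the two row-key schemes this driver relies on: alarm rows are keyed by the alarm UUID, while alarm_h rows are keyed by the UUID plus a reversed timestamp, which is what lets get_alarm_changes read one alarm's history with a single bounded scan. The sketch below illustrates that row-key idea in isolation; _reversed_ts is a hypothetical stand-in for hbase_utils.timestamp, not the actual helper, and all values are made up.

import datetime

# Hypothetical stand-in for hbase_utils.timestamp: encode a datetime as
# "largest value minus milliseconds since the epoch", so newer changes get
# smaller keys and sort first when HBase returns rows in ascending order.
def _reversed_ts(dt, epoch=datetime.datetime(1970, 1, 1)):
    millis = int((dt - epoch).total_seconds() * 1000)
    return 0x7fffffffffffffff - millis

# Row-key layout used by record_alarm_change: "<alarm uuid>_<reversed ts>".
alarm_id = 'example-alarm-uuid'          # illustrative value only
change_time = datetime.datetime(2014, 7, 25, 5, 28, 32)
row_key = alarm_id + "_" + str(_reversed_ts(change_time))
print(row_key)

# get_alarm_changes builds row_start/row_stop bounds from the same alarm_id,
# so one scan over alarm_h covers exactly that alarm's change history.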

View File

@@ -21,8 +21,6 @@ import time
 import happybase
 from six.moves.urllib import parse as urlparse
-from ceilometer.alarm.storage import base as alarm_base
-from ceilometer.alarm.storage import models as alarm_models
 from ceilometer.openstack.common.gettextutils import _
 from ceilometer.openstack.common import log
 from ceilometer.openstack.common import network_utils
@@ -47,10 +45,6 @@ AVAILABLE_CAPABILITIES = {
                              'metadata': True},
                    'aggregation': {'standard': True}},
     'events': {'query': {'simple': True}},
-    'alarms': {'query': {'simple': True,
-                         'complex': False},
-               'history': {'query': {'simple': True,
-                                     'complex': False}}},
 }
@@ -59,7 +53,7 @@ AVAILABLE_STORAGE_CAPABILITIES = {
 }
-class Connection(base.Connection, alarm_base.Connection):
+class Connection(base.Connection):
     """Put the data into a HBase database
     Collections:
@@ -116,21 +110,6 @@ class Connection(base.Connection, alarm_base.Connection):
         "%s+%s+%s!%s!%s" % (rts, source, counter_name, counter_type,
                             counter_unit)
-    - alarm:
-      - row_key: uuid of alarm
-      - Column Families:
-        f: contains the raw incoming alarm data
-    - alarm_h:
-      - row_key: uuid of alarm + "_" + reversed timestamp
-      - Column Families:
-        f: raw incoming alarm_history data. Timestamp becomes now()
-          if not determined
     - events:
       - row_key: timestamp of event's generation + uuid of event
@@ -158,8 +137,6 @@ class Connection(base.Connection, alarm_base.Connection):
     RESOURCE_TABLE = "resource"
     METER_TABLE = "meter"
-    ALARM_TABLE = "alarm"
-    ALARM_HISTORY_TABLE = "alarm_h"
     EVENT_TABLE = "event"
     def __init__(self, url):
@@ -187,8 +164,6 @@ class Connection(base.Connection, alarm_base.Connection):
         with self.conn_pool.connection() as conn:
             conn.create_table(self.RESOURCE_TABLE, {'f': dict(max_versions=1)})
             conn.create_table(self.METER_TABLE, {'f': dict(max_versions=1)})
-            conn.create_table(self.ALARM_TABLE, {'f': dict()})
-            conn.create_table(self.ALARM_HISTORY_TABLE, {'f': dict()})
             conn.create_table(self.EVENT_TABLE, {'f': dict(max_versions=1)})
     def clear(self):
@@ -196,8 +171,6 @@ class Connection(base.Connection, alarm_base.Connection):
         with self.conn_pool.connection() as conn:
             for table in [self.RESOURCE_TABLE,
                           self.METER_TABLE,
-                          self.ALARM_TABLE,
-                          self.ALARM_HISTORY_TABLE,
                           self.EVENT_TABLE]:
                 try:
                     conn.disable_table(table)
@@ -245,77 +218,6 @@ class Connection(base.Connection, alarm_base.Connection):
         opts['port'] = port and int(port) or 9090
         return opts
-    def update_alarm(self, alarm):
-        """Create an alarm.
-
-        :param alarm: The alarm to create. It is Alarm object, so we need to
-          call as_dict()
-        """
-        _id = alarm.alarm_id
-        alarm_to_store = hbase_utils.serialize_entry(alarm.as_dict())
-        with self.conn_pool.connection() as conn:
-            alarm_table = conn.table(self.ALARM_TABLE)
-            alarm_table.put(_id, alarm_to_store)
-            stored_alarm = hbase_utils.deserialize_entry(
-                alarm_table.row(_id))[0]
-            return alarm_models.Alarm(**stored_alarm)
-
-    create_alarm = update_alarm
-
-    def delete_alarm(self, alarm_id):
-        with self.conn_pool.connection() as conn:
-            alarm_table = conn.table(self.ALARM_TABLE)
-            alarm_table.delete(alarm_id)
-
-    def get_alarms(self, name=None, user=None, state=None, meter=None,
-                   project=None, enabled=None, alarm_id=None, pagination=None):
-        if pagination:
-            raise NotImplementedError('Pagination not implemented')
-        if meter:
-            raise NotImplementedError('Filter by meter not implemented')
-        q = hbase_utils.make_query(alarm_id=alarm_id, name=name,
-                                   enabled=enabled, user_id=user,
-                                   project_id=project, state=state)
-        with self.conn_pool.connection() as conn:
-            alarm_table = conn.table(self.ALARM_TABLE)
-            gen = alarm_table.scan(filter=q)
-            for ignored, data in gen:
-                stored_alarm = hbase_utils.deserialize_entry(data)[0]
-                yield alarm_models.Alarm(**stored_alarm)
-
-    def get_alarm_changes(self, alarm_id, on_behalf_of,
-                          user=None, project=None, type=None,
-                          start_timestamp=None, start_timestamp_op=None,
-                          end_timestamp=None, end_timestamp_op=None):
-        q = hbase_utils.make_query(alarm_id=alarm_id,
-                                   on_behalf_of=on_behalf_of, type=type,
-                                   user_id=user, project_id=project)
-        start_row, end_row = hbase_utils.make_timestamp_query(
-            hbase_utils.make_general_rowkey_scan,
-            start=start_timestamp, start_op=start_timestamp_op,
-            end=end_timestamp, end_op=end_timestamp_op, bounds_only=True,
-            some_id=alarm_id)
-        with self.conn_pool.connection() as conn:
-            alarm_history_table = conn.table(self.ALARM_HISTORY_TABLE)
-            gen = alarm_history_table.scan(filter=q, row_start=start_row,
-                                           row_stop=end_row)
-            for ignored, data in gen:
-                stored_entry = hbase_utils.deserialize_entry(data)[0]
-                yield alarm_models.AlarmChange(**stored_entry)
-
-    def record_alarm_change(self, alarm_change):
-        """Record alarm change event."""
-        alarm_change_dict = hbase_utils.serialize_entry(alarm_change)
-        ts = alarm_change.get('timestamp') or datetime.datetime.now()
-        rts = hbase_utils.timestamp(ts)
-        with self.conn_pool.connection() as conn:
-            alarm_history_table = conn.table(self.ALARM_HISTORY_TABLE)
-            alarm_history_table.put(alarm_change.get('alarm_id') + "_" +
-                                    str(rts), alarm_change_dict)
     def record_metering_data(self, data):
         """Write the data to the backend storage system.

View File

@@ -25,6 +25,7 @@
 """
 import mock
+from ceilometer.alarm.storage import impl_hbase as hbase_alarm
 from ceilometer.storage.hbase import inmemory as hbase_inmemory
 from ceilometer.storage import impl_hbase as hbase
 from ceilometer.tests import base as test_base
@@ -90,16 +91,23 @@ class CapabilitiesTest(test_base.BaseTestCase):
                                             'stddev': False,
                                             'cardinality': False}}
                            },
-            'alarms': {'query': {'simple': True,
-                                 'complex': False},
-                       'history': {'query': {'simple': True,
-                                             'complex': False}}},
             'events': {'query': {'simple': True}},
         }
         actual_capabilities = hbase.Connection.get_capabilities()
         self.assertEqual(expected_capabilities, actual_capabilities)
+    def test_alarm_capabilities(self):
+        expected_capabilities = {
+            'alarms': {'query': {'simple': True,
+                                 'complex': False},
+                       'history': {'query': {'simple': True,
+                                             'complex': False}}},
+        }
+        actual_capabilities = hbase_alarm.Connection.get_capabilities()
+        self.assertEqual(expected_capabilities, actual_capabilities)
     def test_storage_capabilities(self):
         expected_capabilities = {
             'storage': {'production_ready': True},
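Both capability tests above depend on utils.update_nested deep-merging a driver's AVAILABLE_CAPABILITIES over the defaults inherited from the base Connection class. Below is a generic sketch of the recursive merge those declarations assume; update_nested here is a local illustration rather than ceilometer's implementation, and the all-False defaults are an assumption about the base class.

# Generic sketch: recursively merge `overrides` over `defaults`; values from
# `overrides` win, nested dicts are merged key by key.
def update_nested(defaults, overrides):
    merged = dict(defaults)
    for key, value in overrides.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = update_nested(merged[key], value)
        else:
            merged[key] = value
    return merged

# Assumed base-class defaults (everything off) plus the alarm driver's
# AVAILABLE_CAPABILITIES from the new module above.
base_defaults = {'alarms': {'query': {'simple': False, 'complex': False},
                            'history': {'query': {'simple': False,
                                                  'complex': False}}}}
available = {'alarms': {'query': {'simple': True, 'complex': False},
                        'history': {'query': {'simple': True,
                                              'complex': False}}}}

# Equals the dict asserted in test_alarm_capabilities.
print(update_nested(base_defaults, available))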

View File

@@ -167,7 +167,7 @@ ceilometer.alarm.storage =
     mysql = ceilometer.storage.impl_sqlalchemy:Connection
     postgresql = ceilometer.storage.impl_sqlalchemy:Connection
     sqlite = ceilometer.storage.impl_sqlalchemy:Connection
-    hbase = ceilometer.storage.impl_hbase:Connection
+    hbase = ceilometer.alarm.storage.impl_hbase:Connection
     db2 = ceilometer.alarm.storage.impl_db2:Connection
 ceilometer.metering.storage =
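The setup.cfg hunk is what repoints deployments at the relocated driver: the hbase name in the ceilometer.alarm.storage entry-point group now resolves to ceilometer.alarm.storage.impl_hbase:Connection. A quick sanity check of that mapping, sketched under the assumption that a ceilometer tree with this change is installed in the environment:

# Sketch: resolve the 'hbase' alarm-storage driver through its entry point
# and print the class it maps to; requires an installed ceilometer.
import pkg_resources

for ep in pkg_resources.iter_entry_points('ceilometer.alarm.storage', 'hbase'):
    cls = ep.load()
    print(ep)                            # the entry-point definition
    print(cls.__module__, cls.__name__)  # expected: ceilometer.alarm.storage.impl_hbase Connection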