Merge "Remove deprecated non-SQL drivers"
This commit is contained in:
commit
a5f1b9b6ab
@ -1,162 +0,0 @@
|
|||||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
||||||
# not use this file except in compliance with the License. You may obtain
|
|
||||||
# a copy of the License at
|
|
||||||
#
|
|
||||||
# http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
#
|
|
||||||
# Unless required by applicable law or agreed to in writing, software
|
|
||||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
||||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
||||||
# License for the specific language governing permissions and limitations
|
|
||||||
# under the License.
|
|
||||||
|
|
||||||
"""
|
|
||||||
A tool for migrating alarms and alarms history data from NoSQL to SQL.
|
|
||||||
|
|
||||||
NOTES:
|
|
||||||
|
|
||||||
- Users need to specify the source NoSQL connection url and the destination SQL
|
|
||||||
connection URL with this tool, an usage example:
|
|
||||||
aodh-data-migration --nosql-conn \
|
|
||||||
mongodb://aodh:password@127.0.0.1:27017/aodh --sql-conn \
|
|
||||||
mysql+pymysql://root:password@127.0.0.1/aodh?charset=utf8
|
|
||||||
|
|
||||||
- Both the alarm data and alarm history data will be migrated when running this
|
|
||||||
tool, but the history data migration can be avoid by specifying False of
|
|
||||||
--migrate-history option.
|
|
||||||
|
|
||||||
- It is better to ensure the db connection is OK when running this tool, and
|
|
||||||
this tool can be run repeatedly, the duplicated data will be skipped.
|
|
||||||
|
|
||||||
- This tool depends on the NoSQL and SQL drivers of Aodh, so it is should be
|
|
||||||
used only before the removal of NoSQL drivers.
|
|
||||||
|
|
||||||
- This tool has been tested OK in devstack environment, but users need to be
|
|
||||||
cautious with this, because the data migration between storage backends is
|
|
||||||
a bit dangerous.
|
|
||||||
|
|
||||||
"""
|
|
||||||
|
|
||||||
import argparse
|
|
||||||
import logging
|
|
||||||
import sys
|
|
||||||
|
|
||||||
from oslo_config import cfg
|
|
||||||
from oslo_db import exception
|
|
||||||
from oslo_db import options as db_options
|
|
||||||
from aodh.i18n import _LE, _LI, _LW
|
|
||||||
import six.moves.urllib.parse as urlparse
|
|
||||||
|
|
||||||
from aodh import storage
|
|
||||||
|
|
||||||
root_logger = logging.getLogger('')
|
|
||||||
|
|
||||||
|
|
||||||
def get_parser():
|
|
||||||
parser = argparse.ArgumentParser(
|
|
||||||
description='A tool for Migrating alarms and alarms history from'
|
|
||||||
' NoSQL to SQL',
|
|
||||||
)
|
|
||||||
parser.add_argument(
|
|
||||||
'--nosql-conn',
|
|
||||||
required=True,
|
|
||||||
type=str,
|
|
||||||
help='The source NoSQL database connection.',
|
|
||||||
)
|
|
||||||
parser.add_argument(
|
|
||||||
'--sql-conn',
|
|
||||||
required=True,
|
|
||||||
type=str,
|
|
||||||
help='The destination SQL database connection.',
|
|
||||||
)
|
|
||||||
parser.add_argument(
|
|
||||||
'--migrate-history',
|
|
||||||
default=True,
|
|
||||||
type=bool,
|
|
||||||
help='Migrate history data when migrate alarms or not,'
|
|
||||||
' True as Default.',
|
|
||||||
)
|
|
||||||
parser.add_argument(
|
|
||||||
'--debug',
|
|
||||||
default=False,
|
|
||||||
action='store_true',
|
|
||||||
help='Show the debug level log messages.',
|
|
||||||
)
|
|
||||||
return parser
|
|
||||||
|
|
||||||
|
|
||||||
def _validate_conn_options(args):
|
|
||||||
nosql_scheme = urlparse.urlparse(args.nosql_conn).scheme
|
|
||||||
sql_scheme = urlparse.urlparse(args.sql_conn).scheme
|
|
||||||
if nosql_scheme not in ('mongodb', 'hbase'):
|
|
||||||
root_logger.error(_LE('Invalid source DB type %s, the source database '
|
|
||||||
'connection should be one of: [mongodb, hbase]'
|
|
||||||
), nosql_scheme)
|
|
||||||
sys.exit(1)
|
|
||||||
if sql_scheme not in ('mysql', 'mysql+pymysql', 'postgresql',
|
|
||||||
'sqlite'):
|
|
||||||
root_logger.error(_LE('Invalid destination DB type %s, the destination'
|
|
||||||
' database connection should be one of: '
|
|
||||||
'[mysql, postgresql, sqlite]'), sql_scheme)
|
|
||||||
sys.exit(1)
|
|
||||||
|
|
||||||
|
|
||||||
def main():
|
|
||||||
args = get_parser().parse_args()
|
|
||||||
|
|
||||||
# Set up logging to use the console
|
|
||||||
console = logging.StreamHandler(sys.stderr)
|
|
||||||
formatter = logging.Formatter(
|
|
||||||
'[%(asctime)s] %(levelname)-8s %(message)s')
|
|
||||||
console.setFormatter(formatter)
|
|
||||||
root_logger.addHandler(console)
|
|
||||||
if args.debug:
|
|
||||||
root_logger.setLevel(logging.DEBUG)
|
|
||||||
else:
|
|
||||||
root_logger.setLevel(logging.INFO)
|
|
||||||
|
|
||||||
_validate_conn_options(args)
|
|
||||||
|
|
||||||
nosql_conf = cfg.ConfigOpts()
|
|
||||||
db_options.set_defaults(nosql_conf, args.nosql_conn)
|
|
||||||
nosql_conf.register_opts(storage.OPTS, 'database')
|
|
||||||
nosql_conn = storage.get_connection_from_config(nosql_conf)
|
|
||||||
|
|
||||||
sql_conf = cfg.ConfigOpts()
|
|
||||||
db_options.set_defaults(sql_conf, args.sql_conn)
|
|
||||||
sql_conf.register_opts(storage.OPTS, 'database')
|
|
||||||
sql_conn = storage.get_connection_from_config(sql_conf)
|
|
||||||
|
|
||||||
root_logger.info(
|
|
||||||
_LI("Starting to migrate alarms data from NoSQL to SQL..."))
|
|
||||||
|
|
||||||
count = 0
|
|
||||||
for alarm in nosql_conn.get_alarms():
|
|
||||||
root_logger.debug("Migrating alarm %s..." % alarm.alarm_id)
|
|
||||||
try:
|
|
||||||
sql_conn.create_alarm(alarm)
|
|
||||||
count += 1
|
|
||||||
except exception.DBDuplicateEntry:
|
|
||||||
root_logger.warning(_LW("Duplicated alarm %s found, skipped."),
|
|
||||||
alarm.alarm_id)
|
|
||||||
if not args.migrate_history:
|
|
||||||
continue
|
|
||||||
|
|
||||||
history_count = 0
|
|
||||||
for history in nosql_conn.get_alarm_changes(alarm.alarm_id, None):
|
|
||||||
history_data = history.as_dict()
|
|
||||||
root_logger.debug(" Migrating alarm history data with"
|
|
||||||
" event_id %s..." % history_data['event_id'])
|
|
||||||
try:
|
|
||||||
sql_conn.record_alarm_change(history_data)
|
|
||||||
history_count += 1
|
|
||||||
except exception.DBDuplicateEntry:
|
|
||||||
root_logger.warning(
|
|
||||||
_LW(" Duplicated alarm history %s found, skipped."),
|
|
||||||
history_data['event_id'])
|
|
||||||
root_logger.info(_LI(" Migrated %(count)s history data of alarm "
|
|
||||||
"%(alarm_id)s"),
|
|
||||||
{'count': history_count, 'alarm_id': alarm.alarm_id})
|
|
||||||
|
|
||||||
root_logger.info(_LI("End alarms data migration from NoSQL to SQL, %s"
|
|
||||||
" alarms have been migrated."), count)
|
|
@ -22,7 +22,6 @@ from oslo_utils import timeutils
|
|||||||
import retrying
|
import retrying
|
||||||
import six.moves.urllib.parse as urlparse
|
import six.moves.urllib.parse as urlparse
|
||||||
from stevedore import driver
|
from stevedore import driver
|
||||||
import warnings
|
|
||||||
|
|
||||||
_NAMESPACE = 'aodh.storage'
|
_NAMESPACE = 'aodh.storage'
|
||||||
|
|
||||||
@ -58,12 +57,6 @@ def get_connection_from_config(conf):
|
|||||||
retries = conf.database.max_retries
|
retries = conf.database.max_retries
|
||||||
url = conf.database.connection
|
url = conf.database.connection
|
||||||
connection_scheme = urlparse.urlparse(url).scheme
|
connection_scheme = urlparse.urlparse(url).scheme
|
||||||
if connection_scheme not in ('mysql', 'mysql+pymysql', 'postgresql',
|
|
||||||
'sqlite'):
|
|
||||||
msg = ('Storage backend %s is deprecated, and all the NoSQL backends '
|
|
||||||
'will be removed in Aodh 4.0, please use SQL backend.' %
|
|
||||||
connection_scheme)
|
|
||||||
warnings.warn(msg)
|
|
||||||
LOG.debug('looking for %(name)r driver in %(namespace)r',
|
LOG.debug('looking for %(name)r driver in %(namespace)r',
|
||||||
{'name': connection_scheme, 'namespace': _NAMESPACE})
|
{'name': connection_scheme, 'namespace': _NAMESPACE})
|
||||||
mgr = driver.DriverManager(_NAMESPACE, connection_scheme)
|
mgr = driver.DriverManager(_NAMESPACE, connection_scheme)
|
||||||
|
@ -1,78 +0,0 @@
|
|||||||
#
|
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
||||||
# not use this file except in compliance with the License. You may obtain
|
|
||||||
# a copy of the License at
|
|
||||||
#
|
|
||||||
# http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
#
|
|
||||||
# Unless required by applicable law or agreed to in writing, software
|
|
||||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
||||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
||||||
# License for the specific language governing permissions and limitations
|
|
||||||
# under the License.
|
|
||||||
|
|
||||||
import happybase
|
|
||||||
from oslo_log import log
|
|
||||||
from oslo_utils import netutils
|
|
||||||
from six.moves.urllib import parse as urlparse
|
|
||||||
|
|
||||||
from aodh.storage.hbase import inmemory as hbase_inmemory
|
|
||||||
|
|
||||||
LOG = log.getLogger(__name__)
|
|
||||||
|
|
||||||
|
|
||||||
class Connection(object):
|
|
||||||
"""Base connection class for HBase."""
|
|
||||||
|
|
||||||
_memory_instance = None
|
|
||||||
|
|
||||||
def __init__(self, conf, url):
|
|
||||||
"""Hbase Connection Initialization."""
|
|
||||||
opts = self._parse_connection_url(url)
|
|
||||||
|
|
||||||
if opts['host'] == '__test__':
|
|
||||||
# This is a in-memory usage for unit tests
|
|
||||||
if Connection._memory_instance is None:
|
|
||||||
LOG.debug('Creating a new in-memory HBase Connection object')
|
|
||||||
Connection._memory_instance = (hbase_inmemory.
|
|
||||||
MConnectionPool())
|
|
||||||
self.conn_pool = Connection._memory_instance
|
|
||||||
else:
|
|
||||||
self.conn_pool = self._get_connection_pool(opts)
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def _get_connection_pool(conf):
|
|
||||||
"""Return a connection pool to the database.
|
|
||||||
|
|
||||||
.. note::
|
|
||||||
|
|
||||||
The tests use a subclass to override this and return an
|
|
||||||
in-memory connection pool.
|
|
||||||
"""
|
|
||||||
LOG.debug('connecting to HBase on %(host)s:%(port)s',
|
|
||||||
{'host': conf['host'], 'port': conf['port']})
|
|
||||||
return happybase.ConnectionPool(size=100, host=conf['host'],
|
|
||||||
port=conf['port'],
|
|
||||||
table_prefix=conf['table_prefix'])
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def _parse_connection_url(url):
|
|
||||||
"""Parse connection parameters from a database url.
|
|
||||||
|
|
||||||
.. note::
|
|
||||||
|
|
||||||
HBase Thrift does not support authentication and there is no
|
|
||||||
database name, so we are not looking for these in the url.
|
|
||||||
"""
|
|
||||||
opts = {}
|
|
||||||
result = netutils.urlsplit(url)
|
|
||||||
opts['table_prefix'] = urlparse.parse_qs(
|
|
||||||
result.query).get('table_prefix', [None])[0]
|
|
||||||
opts['dbtype'] = result.scheme
|
|
||||||
if ':' in result.netloc:
|
|
||||||
opts['host'], port = result.netloc.split(':')
|
|
||||||
else:
|
|
||||||
opts['host'] = result.netloc
|
|
||||||
port = 9090
|
|
||||||
opts['port'] = port and int(port) or 9090
|
|
||||||
return opts
|
|
@ -1,283 +0,0 @@
|
|||||||
#
|
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
||||||
# not use this file except in compliance with the License. You may obtain
|
|
||||||
# a copy of the License at
|
|
||||||
#
|
|
||||||
# http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
#
|
|
||||||
# Unless required by applicable law or agreed to in writing, software
|
|
||||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
||||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
||||||
# License for the specific language governing permissions and limitations
|
|
||||||
# under the License.
|
|
||||||
""" This is a very crude version of "in-memory HBase", which implements just
|
|
||||||
enough functionality of HappyBase API to support testing of our driver.
|
|
||||||
"""
|
|
||||||
|
|
||||||
import copy
|
|
||||||
import re
|
|
||||||
|
|
||||||
from oslo_log import log
|
|
||||||
import six
|
|
||||||
|
|
||||||
import aodh
|
|
||||||
|
|
||||||
LOG = log.getLogger(__name__)
|
|
||||||
|
|
||||||
|
|
||||||
class MTable(object):
|
|
||||||
"""HappyBase.Table mock."""
|
|
||||||
def __init__(self, name, families):
|
|
||||||
self.name = name
|
|
||||||
self.families = families
|
|
||||||
self._rows_with_ts = {}
|
|
||||||
|
|
||||||
def row(self, key, columns=None):
|
|
||||||
if key not in self._rows_with_ts:
|
|
||||||
return {}
|
|
||||||
res = copy.copy(sorted(six.iteritems(
|
|
||||||
self._rows_with_ts.get(key)))[-1][1])
|
|
||||||
if columns:
|
|
||||||
keys = res.keys()
|
|
||||||
for key in keys:
|
|
||||||
if key not in columns:
|
|
||||||
res.pop(key)
|
|
||||||
return res
|
|
||||||
|
|
||||||
def rows(self, keys):
|
|
||||||
return ((k, self.row(k)) for k in keys)
|
|
||||||
|
|
||||||
def put(self, key, data, ts=None):
|
|
||||||
# Note: Now we use 'timestamped' but only for one Resource table.
|
|
||||||
# That's why we may put ts='0' in case when ts is None. If it is
|
|
||||||
# needed to use 2 types of put in one table ts=0 cannot be used.
|
|
||||||
if ts is None:
|
|
||||||
ts = "0"
|
|
||||||
if key not in self._rows_with_ts:
|
|
||||||
self._rows_with_ts[key] = {ts: data}
|
|
||||||
else:
|
|
||||||
if ts in self._rows_with_ts[key]:
|
|
||||||
self._rows_with_ts[key][ts].update(data)
|
|
||||||
else:
|
|
||||||
self._rows_with_ts[key].update({ts: data})
|
|
||||||
|
|
||||||
def delete(self, key):
|
|
||||||
del self._rows_with_ts[key]
|
|
||||||
|
|
||||||
def _get_latest_dict(self, row):
|
|
||||||
# The idea here is to return latest versions of columns.
|
|
||||||
# In _rows_with_ts we store {row: {ts_1: {data}, ts_2: {data}}}.
|
|
||||||
# res will contain a list of tuples [(ts_1, {data}), (ts_2, {data})]
|
|
||||||
# sorted by ts, i.e. in this list ts_2 is the most latest.
|
|
||||||
# To get result as HBase provides we should iterate in reverse order
|
|
||||||
# and get from "latest" data only key-values that are not in newer data
|
|
||||||
data = {}
|
|
||||||
for i in sorted(six.iteritems(self._rows_with_ts[row])):
|
|
||||||
data.update(i[1])
|
|
||||||
return data
|
|
||||||
|
|
||||||
def scan(self, filter=None, columns=None, row_start=None, row_stop=None,
|
|
||||||
limit=None):
|
|
||||||
columns = columns or []
|
|
||||||
sorted_keys = sorted(self._rows_with_ts)
|
|
||||||
# copy data between row_start and row_stop into a dict
|
|
||||||
rows = {}
|
|
||||||
for row in sorted_keys:
|
|
||||||
if row_start and row < row_start:
|
|
||||||
continue
|
|
||||||
if row_stop and row > row_stop:
|
|
||||||
break
|
|
||||||
rows[row] = self._get_latest_dict(row)
|
|
||||||
|
|
||||||
if columns:
|
|
||||||
ret = {}
|
|
||||||
for row, data in six.iteritems(rows):
|
|
||||||
for key in data:
|
|
||||||
if key in columns:
|
|
||||||
ret[row] = data
|
|
||||||
rows = ret
|
|
||||||
if filter:
|
|
||||||
# TODO(jdanjou): we should really parse this properly,
|
|
||||||
# but at the moment we are only going to support AND here
|
|
||||||
filters = filter.split('AND')
|
|
||||||
for f in filters:
|
|
||||||
# Extract filter name and its arguments
|
|
||||||
g = re.search("(.*)\((.*),?\)", f)
|
|
||||||
fname = g.group(1).strip()
|
|
||||||
fargs = [s.strip().replace('\'', '')
|
|
||||||
for s in g.group(2).split(',')]
|
|
||||||
m = getattr(self, fname)
|
|
||||||
if callable(m):
|
|
||||||
# overwrite rows for filtering to take effect
|
|
||||||
# in case of multiple filters
|
|
||||||
rows = m(fargs, rows)
|
|
||||||
else:
|
|
||||||
raise aodh.NotImplementedError(
|
|
||||||
"%s filter is not implemented, "
|
|
||||||
"you may want to add it!" % filter)
|
|
||||||
for k in sorted(rows)[:limit]:
|
|
||||||
yield k, rows[k]
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def SingleColumnValueFilter(args, rows):
|
|
||||||
"""This is filter for testing "in-memory HBase".
|
|
||||||
|
|
||||||
This method is called from scan() when 'SingleColumnValueFilter'
|
|
||||||
is found in the 'filter' argument.
|
|
||||||
"""
|
|
||||||
op = args[2]
|
|
||||||
column = "%s:%s" % (args[0], args[1])
|
|
||||||
value = args[3]
|
|
||||||
if value.startswith('binary:'):
|
|
||||||
value = value[7:]
|
|
||||||
r = {}
|
|
||||||
for row in rows:
|
|
||||||
data = rows[row]
|
|
||||||
if op == '=':
|
|
||||||
if column in data and data[column] == value:
|
|
||||||
r[row] = data
|
|
||||||
elif op == '<':
|
|
||||||
if column in data and data[column] < value:
|
|
||||||
r[row] = data
|
|
||||||
elif op == '<=':
|
|
||||||
if column in data and data[column] <= value:
|
|
||||||
r[row] = data
|
|
||||||
elif op == '>':
|
|
||||||
if column in data and data[column] > value:
|
|
||||||
r[row] = data
|
|
||||||
elif op == '>=':
|
|
||||||
if column in data and data[column] >= value:
|
|
||||||
r[row] = data
|
|
||||||
elif op == '!=':
|
|
||||||
if column in data and data[column] != value:
|
|
||||||
r[row] = data
|
|
||||||
return r
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def ColumnPrefixFilter(args, rows):
|
|
||||||
"""This is filter for testing "in-memory HBase".
|
|
||||||
|
|
||||||
This method is called from scan() when 'ColumnPrefixFilter' is found
|
|
||||||
in the 'filter' argument.
|
|
||||||
|
|
||||||
:param args: a list of filter arguments, contain prefix of column
|
|
||||||
:param rows: a dict of row prefixes for filtering
|
|
||||||
"""
|
|
||||||
value = args[0]
|
|
||||||
column = 'f:' + value
|
|
||||||
r = {}
|
|
||||||
for row, data in rows.items():
|
|
||||||
column_dict = {}
|
|
||||||
for key in data:
|
|
||||||
if key.startswith(column):
|
|
||||||
column_dict[key] = data[key]
|
|
||||||
r[row] = column_dict
|
|
||||||
return r
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def RowFilter(args, rows):
|
|
||||||
"""This is filter for testing "in-memory HBase".
|
|
||||||
|
|
||||||
This method is called from scan() when 'RowFilter' is found in the
|
|
||||||
'filter' argument.
|
|
||||||
|
|
||||||
:param args: a list of filter arguments, it contains operator and
|
|
||||||
sought string
|
|
||||||
:param rows: a dict of rows which are filtered
|
|
||||||
"""
|
|
||||||
op = args[0]
|
|
||||||
value = args[1]
|
|
||||||
if value.startswith('binary:'):
|
|
||||||
value = value[len('binary:'):]
|
|
||||||
if value.startswith('regexstring:'):
|
|
||||||
value = value[len('regexstring:'):]
|
|
||||||
r = {}
|
|
||||||
for row, data in rows.items():
|
|
||||||
try:
|
|
||||||
g = re.search(value, row).group()
|
|
||||||
if op == '=':
|
|
||||||
if g == row:
|
|
||||||
r[row] = data
|
|
||||||
else:
|
|
||||||
raise aodh.NotImplementedError(
|
|
||||||
"In-memory "
|
|
||||||
"RowFilter doesn't support "
|
|
||||||
"the %s operation yet" % op)
|
|
||||||
except AttributeError:
|
|
||||||
pass
|
|
||||||
return r
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def QualifierFilter(args, rows):
|
|
||||||
"""This is filter for testing "in-memory HBase".
|
|
||||||
|
|
||||||
This method is called from scan() when 'QualifierFilter' is found in
|
|
||||||
the 'filter' argument
|
|
||||||
"""
|
|
||||||
op = args[0]
|
|
||||||
value = args[1]
|
|
||||||
is_regex = False
|
|
||||||
if value.startswith('binaryprefix:'):
|
|
||||||
value = value[len('binaryprefix:'):]
|
|
||||||
if value.startswith('regexstring:'):
|
|
||||||
value = value[len('regexstring:'):]
|
|
||||||
is_regex = True
|
|
||||||
column = 'f:' + value
|
|
||||||
r = {}
|
|
||||||
for row in rows:
|
|
||||||
data = rows[row]
|
|
||||||
r_data = {}
|
|
||||||
for key in data:
|
|
||||||
if ((op == '=' and key.startswith(column)) or
|
|
||||||
(op == '>=' and key >= column) or
|
|
||||||
(op == '<=' and key <= column) or
|
|
||||||
(op == '>' and key > column) or
|
|
||||||
(op == '<' and key < column) or
|
|
||||||
(is_regex and re.search(value, key))):
|
|
||||||
r_data[key] = data[key]
|
|
||||||
else:
|
|
||||||
raise aodh.NotImplementedError(
|
|
||||||
"In-memory QualifierFilter "
|
|
||||||
"doesn't support the %s "
|
|
||||||
"operation yet" % op)
|
|
||||||
if r_data:
|
|
||||||
r[row] = r_data
|
|
||||||
return r
|
|
||||||
|
|
||||||
|
|
||||||
class MConnectionPool(object):
|
|
||||||
def __init__(self):
|
|
||||||
self.conn = MConnection()
|
|
||||||
|
|
||||||
def connection(self):
|
|
||||||
return self.conn
|
|
||||||
|
|
||||||
|
|
||||||
class MConnection(object):
|
|
||||||
"""HappyBase.Connection mock."""
|
|
||||||
def __init__(self):
|
|
||||||
self.tables = {}
|
|
||||||
|
|
||||||
def __enter__(self, *args, **kwargs):
|
|
||||||
return self
|
|
||||||
|
|
||||||
def __exit__(self, exc_type, exc_val, exc_tb):
|
|
||||||
pass
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def open():
|
|
||||||
LOG.debug("Opening in-memory HBase connection")
|
|
||||||
|
|
||||||
def create_table(self, n, families=None):
|
|
||||||
families = families or {}
|
|
||||||
if n in self.tables:
|
|
||||||
return self.tables[n]
|
|
||||||
t = MTable(n, families)
|
|
||||||
self.tables[n] = t
|
|
||||||
return t
|
|
||||||
|
|
||||||
def delete_table(self, name, use_prefix=True):
|
|
||||||
del self.tables[name]
|
|
||||||
|
|
||||||
def table(self, name):
|
|
||||||
return self.create_table(name)
|
|
@ -1,43 +0,0 @@
|
|||||||
#
|
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
||||||
# not use this file except in compliance with the License. You may obtain
|
|
||||||
# a copy of the License at
|
|
||||||
#
|
|
||||||
# http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
#
|
|
||||||
# Unless required by applicable law or agreed to in writing, software
|
|
||||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
||||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
||||||
# License for the specific language governing permissions and limitations
|
|
||||||
# under the License.
|
|
||||||
"""HBase storage backend migrations
|
|
||||||
"""
|
|
||||||
|
|
||||||
|
|
||||||
from aodh.storage.hbase import utils as hbase_utils
|
|
||||||
|
|
||||||
|
|
||||||
def migrate_alarm_history_table(conn, table):
|
|
||||||
"""Migrate table 'alarm_h' in HBase.
|
|
||||||
|
|
||||||
Change row format from ""%s_%s" % alarm_id, rts,
|
|
||||||
to new separator format "%s:%s" % alarm_id, rts
|
|
||||||
"""
|
|
||||||
alarm_h_table = conn.table(table)
|
|
||||||
alarm_h_filter = "RowFilter(=, 'regexstring:\\w*_\\d{19}')"
|
|
||||||
gen = alarm_h_table.scan(filter=alarm_h_filter)
|
|
||||||
for row, data in gen:
|
|
||||||
row_parts = row.rsplit('_', 1)
|
|
||||||
alarm_h_table.put(hbase_utils.prepare_key(*row_parts), data)
|
|
||||||
alarm_h_table.delete(row)
|
|
||||||
|
|
||||||
|
|
||||||
TABLE_MIGRATION_FUNCS = {'alarm_h': migrate_alarm_history_table}
|
|
||||||
|
|
||||||
|
|
||||||
def migrate_tables(conn, tables):
|
|
||||||
if type(tables) is not list:
|
|
||||||
tables = [tables]
|
|
||||||
for table in tables:
|
|
||||||
if table in TABLE_MIGRATION_FUNCS:
|
|
||||||
TABLE_MIGRATION_FUNCS.get(table)(conn, table)
|
|
@ -1,250 +0,0 @@
|
|||||||
#
|
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
||||||
# not use this file except in compliance with the License. You may obtain
|
|
||||||
# a copy of the License at
|
|
||||||
#
|
|
||||||
# http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
#
|
|
||||||
# Unless required by applicable law or agreed to in writing, software
|
|
||||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
||||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
||||||
# License for the specific language governing permissions and limitations
|
|
||||||
# under the License.
|
|
||||||
""" Various HBase helpers
|
|
||||||
"""
|
|
||||||
import copy
|
|
||||||
import datetime
|
|
||||||
import json
|
|
||||||
|
|
||||||
import bson.json_util
|
|
||||||
from happybase.hbase import ttypes
|
|
||||||
from oslo_log import log
|
|
||||||
import six
|
|
||||||
|
|
||||||
from aodh.i18n import _LW
|
|
||||||
|
|
||||||
LOG = log.getLogger(__name__)
|
|
||||||
|
|
||||||
OP_SIGN = {'eq': '=', 'lt': '<', 'le': '<=', 'ne': '!=', 'gt': '>', 'ge': '>='}
|
|
||||||
# We need this additional dictionary because we have reverted timestamp in
|
|
||||||
# row-keys for stored metrics
|
|
||||||
OP_SIGN_REV = {'eq': '=', 'lt': '>', 'le': '>=', 'ne': '!=', 'gt': '<',
|
|
||||||
'ge': '<='}
|
|
||||||
|
|
||||||
|
|
||||||
def timestamp(dt, reverse=True):
|
|
||||||
"""Timestamp is count of milliseconds since start of epoch.
|
|
||||||
|
|
||||||
If reverse=True then timestamp will be reversed. Such a technique is used
|
|
||||||
in HBase rowkey design when period queries are required. Because of the
|
|
||||||
fact that rows are sorted lexicographically it's possible to vary whether
|
|
||||||
the 'oldest' entries will be on top of the table or it should be the newest
|
|
||||||
ones (reversed timestamp case).
|
|
||||||
|
|
||||||
:param dt: datetime which is translated to timestamp
|
|
||||||
:param reverse: a boolean parameter for reverse or straight count of
|
|
||||||
timestamp in milliseconds
|
|
||||||
:return: count or reversed count of milliseconds since start of epoch
|
|
||||||
"""
|
|
||||||
epoch = datetime.datetime(1970, 1, 1)
|
|
||||||
td = dt - epoch
|
|
||||||
ts = td.microseconds + td.seconds * 1000000 + td.days * 86400000000
|
|
||||||
return 0x7fffffffffffffff - ts if reverse else ts
|
|
||||||
|
|
||||||
|
|
||||||
def make_timestamp_query(func, start=None, start_op=None, end=None,
|
|
||||||
end_op=None, bounds_only=False, **kwargs):
|
|
||||||
"""Return a filter start and stop row for filtering and a query.
|
|
||||||
|
|
||||||
Query is based on the fact that CF-name is 'rts'.
|
|
||||||
:param start: Optional start timestamp
|
|
||||||
:param start_op: Optional start timestamp operator, like gt, ge
|
|
||||||
:param end: Optional end timestamp
|
|
||||||
:param end_op: Optional end timestamp operator, like lt, le
|
|
||||||
:param bounds_only: if True than query will not be returned
|
|
||||||
:param func: a function that provide a format of row
|
|
||||||
:param kwargs: kwargs for :param func
|
|
||||||
"""
|
|
||||||
# We don't need to dump here because get_start_end_rts returns strings
|
|
||||||
rts_start, rts_end = get_start_end_rts(start, end)
|
|
||||||
start_row, end_row = func(rts_start, rts_end, **kwargs)
|
|
||||||
|
|
||||||
if bounds_only:
|
|
||||||
return start_row, end_row
|
|
||||||
|
|
||||||
q = []
|
|
||||||
start_op = start_op or 'ge'
|
|
||||||
end_op = end_op or 'lt'
|
|
||||||
if rts_start:
|
|
||||||
q.append("SingleColumnValueFilter ('f', 'rts', %s, 'binary:%s')" %
|
|
||||||
(OP_SIGN_REV[start_op], rts_start))
|
|
||||||
if rts_end:
|
|
||||||
q.append("SingleColumnValueFilter ('f', 'rts', %s, 'binary:%s')" %
|
|
||||||
(OP_SIGN_REV[end_op], rts_end))
|
|
||||||
|
|
||||||
res_q = None
|
|
||||||
if len(q):
|
|
||||||
res_q = " AND ".join(q)
|
|
||||||
|
|
||||||
return start_row, end_row, res_q
|
|
||||||
|
|
||||||
|
|
||||||
def get_start_end_rts(start, end):
|
|
||||||
|
|
||||||
rts_start = str(timestamp(start)) if start else ""
|
|
||||||
rts_end = str(timestamp(end)) if end else ""
|
|
||||||
return rts_start, rts_end
|
|
||||||
|
|
||||||
|
|
||||||
def make_query(**kwargs):
|
|
||||||
"""Return a filter query string based on the selected parameters.
|
|
||||||
|
|
||||||
:param kwargs: key-value pairs to filter on. Key should be a real
|
|
||||||
column name in db
|
|
||||||
"""
|
|
||||||
q = []
|
|
||||||
|
|
||||||
# Note: we use extended constructor for SingleColumnValueFilter here.
|
|
||||||
# It is explicitly specified that entry should not be returned if CF is not
|
|
||||||
# found in table.
|
|
||||||
for key, value in sorted(kwargs.items()):
|
|
||||||
if value is not None:
|
|
||||||
if key == 'source':
|
|
||||||
q.append("SingleColumnValueFilter "
|
|
||||||
"('f', 's_%s', =, 'binary:%s', true, true)" %
|
|
||||||
(value, dump('1')))
|
|
||||||
elif key == 'trait_type':
|
|
||||||
q.append("ColumnPrefixFilter('%s')" % value)
|
|
||||||
elif key == 'event_id':
|
|
||||||
q.append("RowFilter ( = , 'regexstring:\d*:%s')" % value)
|
|
||||||
elif key == 'exclude':
|
|
||||||
for k, v in six.iteritems(value):
|
|
||||||
q.append("SingleColumnValueFilter "
|
|
||||||
"('f', '%(k)s', !=, 'binary:%(v)s', true, true)" %
|
|
||||||
{'k': quote(k), 'v': dump(v)})
|
|
||||||
else:
|
|
||||||
q.append("SingleColumnValueFilter "
|
|
||||||
"('f', '%s', =, 'binary:%s', true, true)" %
|
|
||||||
(quote(key), dump(value)))
|
|
||||||
if len(q):
|
|
||||||
return " AND ".join(q)
|
|
||||||
|
|
||||||
|
|
||||||
def make_general_rowkey_scan(rts_start=None, rts_end=None, some_id=None):
|
|
||||||
"""If it's filter on some_id without start and end.
|
|
||||||
|
|
||||||
start_row = some_id while end_row = some_id + MAX_BYTE.
|
|
||||||
"""
|
|
||||||
if some_id is None:
|
|
||||||
return None, None
|
|
||||||
if not rts_start:
|
|
||||||
# NOTE(idegtiarov): Here we could not use chr > 122 because chr >= 123
|
|
||||||
# will be quoted and character will be turn in a composition that is
|
|
||||||
# started with '%' (chr(37)) that lexicographically is less than chr
|
|
||||||
# of number
|
|
||||||
rts_start = chr(122)
|
|
||||||
end_row = prepare_key(some_id, rts_start)
|
|
||||||
start_row = prepare_key(some_id, rts_end)
|
|
||||||
|
|
||||||
return start_row, end_row
|
|
||||||
|
|
||||||
|
|
||||||
def prepare_key(*args):
|
|
||||||
"""Prepares names for rows and columns with correct separator.
|
|
||||||
|
|
||||||
:param args: strings or numbers that we want our key construct of
|
|
||||||
:return: key with quoted args that are separated with character ":"
|
|
||||||
"""
|
|
||||||
key_quote = []
|
|
||||||
for key in args:
|
|
||||||
if isinstance(key, six.integer_types):
|
|
||||||
key = str(key)
|
|
||||||
key_quote.append(quote(key))
|
|
||||||
return ":".join(key_quote)
|
|
||||||
|
|
||||||
|
|
||||||
def deserialize_entry(entry):
|
|
||||||
"""Return a list of flatten results.
|
|
||||||
|
|
||||||
Result contains a dict of simple structures such as 'resource_id':1
|
|
||||||
|
|
||||||
:param entry: entry from HBase, without row name and timestamp
|
|
||||||
"""
|
|
||||||
flatten_result = {}
|
|
||||||
for k, v in entry.items():
|
|
||||||
if ':' in k[2:]:
|
|
||||||
key = tuple([unquote(i) for i in k[2:].split(':')])
|
|
||||||
else:
|
|
||||||
key = unquote(k[2:])
|
|
||||||
flatten_result[key] = load(v)
|
|
||||||
|
|
||||||
return flatten_result
|
|
||||||
|
|
||||||
|
|
||||||
def serialize_entry(data=None, **kwargs):
|
|
||||||
"""Return a dict that is ready to be stored to HBase
|
|
||||||
|
|
||||||
:param data: dict to be serialized
|
|
||||||
:param kwargs: additional args
|
|
||||||
"""
|
|
||||||
data = data or {}
|
|
||||||
entry_dict = copy.copy(data)
|
|
||||||
entry_dict.update(**kwargs)
|
|
||||||
|
|
||||||
result = {}
|
|
||||||
for k, v in entry_dict.items():
|
|
||||||
result['f:' + quote(k, ':')] = dump(v)
|
|
||||||
return result
|
|
||||||
|
|
||||||
|
|
||||||
def dump(data):
|
|
||||||
return json.dumps(data, default=bson.json_util.default)
|
|
||||||
|
|
||||||
|
|
||||||
def load(data):
|
|
||||||
return json.loads(data, object_hook=object_hook)
|
|
||||||
|
|
||||||
|
|
||||||
# We don't want to have tzinfo in decoded json.This object_hook is
|
|
||||||
# overwritten json_util.object_hook for $date
|
|
||||||
def object_hook(dct):
|
|
||||||
if "$date" in dct:
|
|
||||||
dt = bson.json_util.object_hook(dct)
|
|
||||||
return dt.replace(tzinfo=None)
|
|
||||||
return bson.json_util.object_hook(dct)
|
|
||||||
|
|
||||||
|
|
||||||
def create_tables(conn, tables, column_families):
|
|
||||||
for table in tables:
|
|
||||||
try:
|
|
||||||
conn.create_table(table, column_families)
|
|
||||||
except ttypes.AlreadyExists:
|
|
||||||
if conn.table_prefix:
|
|
||||||
table = ("%(table_prefix)s"
|
|
||||||
"%(separator)s"
|
|
||||||
"%(table_name)s" %
|
|
||||||
dict(table_prefix=conn.table_prefix,
|
|
||||||
separator=conn.table_prefix_separator,
|
|
||||||
table_name=table))
|
|
||||||
|
|
||||||
LOG.warning(_LW("Cannot create table %s because "
|
|
||||||
"it already exists. Ignoring error"), table)
|
|
||||||
|
|
||||||
|
|
||||||
def quote(s, *args):
|
|
||||||
"""Return quoted string even if it is unicode one.
|
|
||||||
|
|
||||||
:param s: string that should be quoted
|
|
||||||
:param args: any symbol we want to stay unquoted
|
|
||||||
"""
|
|
||||||
s_en = s.encode('utf8')
|
|
||||||
return six.moves.urllib.parse.quote(s_en, *args)
|
|
||||||
|
|
||||||
|
|
||||||
def unquote(s):
|
|
||||||
"""Return unquoted and decoded string.
|
|
||||||
|
|
||||||
:param s: string that should be unquoted
|
|
||||||
"""
|
|
||||||
s_de = six.moves.urllib.parse.unquote(s)
|
|
||||||
return s_de.decode('utf8')
|
|
@ -1,189 +0,0 @@
|
|||||||
#
|
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
||||||
# not use this file except in compliance with the License. You may obtain
|
|
||||||
# a copy of the License at
|
|
||||||
#
|
|
||||||
# http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
#
|
|
||||||
# Unless required by applicable law or agreed to in writing, software
|
|
||||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
||||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
||||||
# License for the specific language governing permissions and limitations
|
|
||||||
# under the License.
|
|
||||||
|
|
||||||
import datetime
|
|
||||||
import operator
|
|
||||||
|
|
||||||
from oslo_log import log
|
|
||||||
|
|
||||||
import aodh
|
|
||||||
from aodh import storage
|
|
||||||
from aodh.storage import base
|
|
||||||
from aodh.storage.hbase import base as hbase_base
|
|
||||||
from aodh.storage.hbase import migration as hbase_migration
|
|
||||||
from aodh.storage.hbase import utils as hbase_utils
|
|
||||||
from aodh.storage import models
|
|
||||||
|
|
||||||
LOG = log.getLogger(__name__)
|
|
||||||
|
|
||||||
|
|
||||||
AVAILABLE_CAPABILITIES = {
|
|
||||||
'alarms': {'query': {'simple': True,
|
|
||||||
'complex': False},
|
|
||||||
'history': {'query': {'simple': True,
|
|
||||||
'complex': False}}},
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
AVAILABLE_STORAGE_CAPABILITIES = {
|
|
||||||
'storage': {'production_ready': True},
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
class Connection(hbase_base.Connection, base.Connection):
|
|
||||||
"""Put the alarm data into a HBase database
|
|
||||||
|
|
||||||
Collections:
|
|
||||||
|
|
||||||
- alarm:
|
|
||||||
|
|
||||||
- row_key: uuid of alarm
|
|
||||||
- Column Families:
|
|
||||||
|
|
||||||
f: contains the raw incoming alarm data
|
|
||||||
|
|
||||||
- alarm_h:
|
|
||||||
|
|
||||||
- row_key: uuid of alarm + ":" + reversed timestamp
|
|
||||||
- Column Families:
|
|
||||||
|
|
||||||
f: raw incoming alarm_history data. Timestamp becomes now()
|
|
||||||
if not determined
|
|
||||||
"""
|
|
||||||
|
|
||||||
CAPABILITIES = base.update_nested(base.Connection.CAPABILITIES,
|
|
||||||
AVAILABLE_CAPABILITIES)
|
|
||||||
STORAGE_CAPABILITIES = base.update_nested(
|
|
||||||
base.Connection.STORAGE_CAPABILITIES,
|
|
||||||
AVAILABLE_STORAGE_CAPABILITIES,
|
|
||||||
)
|
|
||||||
_memory_instance = None
|
|
||||||
|
|
||||||
ALARM_TABLE = "alarm"
|
|
||||||
ALARM_HISTORY_TABLE = "alarm_h"
|
|
||||||
|
|
||||||
def upgrade(self):
|
|
||||||
tables = [self.ALARM_HISTORY_TABLE, self.ALARM_TABLE]
|
|
||||||
column_families = {'f': dict()}
|
|
||||||
with self.conn_pool.connection() as conn:
|
|
||||||
hbase_utils.create_tables(conn, tables, column_families)
|
|
||||||
hbase_migration.migrate_tables(conn, tables)
|
|
||||||
|
|
||||||
def clear(self):
|
|
||||||
LOG.debug('Dropping HBase schema...')
|
|
||||||
with self.conn_pool.connection() as conn:
|
|
||||||
for table in [self.ALARM_TABLE,
|
|
||||||
self.ALARM_HISTORY_TABLE]:
|
|
||||||
try:
|
|
||||||
conn.disable_table(table)
|
|
||||||
except Exception:
|
|
||||||
LOG.debug('Cannot disable table but ignoring error')
|
|
||||||
try:
|
|
||||||
conn.delete_table(table)
|
|
||||||
except Exception:
|
|
||||||
LOG.debug('Cannot delete table but ignoring error')
|
|
||||||
|
|
||||||
def update_alarm(self, alarm, upsert=False):
|
|
||||||
"""Create an alarm.
|
|
||||||
|
|
||||||
:param alarm: The alarm to create. It is Alarm object, so we need to
|
|
||||||
call as_dict()
|
|
||||||
"""
|
|
||||||
_id = alarm.alarm_id
|
|
||||||
alarm_to_store = hbase_utils.serialize_entry(alarm.as_dict())
|
|
||||||
with self.conn_pool.connection() as conn:
|
|
||||||
alarm_table = conn.table(self.ALARM_TABLE)
|
|
||||||
if not upsert:
|
|
||||||
q = hbase_utils.make_query(alarm_id=alarm.alarm_id)
|
|
||||||
query_alarm = alarm_table.scan(filter=q)
|
|
||||||
if len(list(query_alarm)) == 0:
|
|
||||||
raise storage.AlarmNotFound(alarm.alarm_id)
|
|
||||||
alarm_table.put(_id, alarm_to_store)
|
|
||||||
stored_alarm = hbase_utils.deserialize_entry(
|
|
||||||
alarm_table.row(_id))
|
|
||||||
return models.Alarm(**stored_alarm)
|
|
||||||
|
|
||||||
def create_alarm(self, alarm):
|
|
||||||
return self.update_alarm(alarm, upsert=True)
|
|
||||||
|
|
||||||
def delete_alarm(self, alarm_id):
|
|
||||||
"""Delete an alarm and its history data."""
|
|
||||||
with self.conn_pool.connection() as conn:
|
|
||||||
alarm_table = conn.table(self.ALARM_TABLE)
|
|
||||||
alarm_table.delete(alarm_id)
|
|
||||||
q = hbase_utils.make_query(alarm_id=alarm_id)
|
|
||||||
alarm_history_table = conn.table(self.ALARM_HISTORY_TABLE)
|
|
||||||
for alarm_id, ignored in alarm_history_table.scan(filter=q):
|
|
||||||
alarm_history_table.delete(alarm_id)
|
|
||||||
|
|
||||||
def get_alarms(self, name=None, user=None, state=None, meter=None,
|
|
||||||
project=None, enabled=None, alarm_id=None,
|
|
||||||
alarm_type=None, severity=None, exclude=None,
|
|
||||||
pagination=None):
|
|
||||||
if pagination:
|
|
||||||
raise aodh.NotImplementedError('Pagination query not implemented')
|
|
||||||
if meter:
|
|
||||||
raise aodh.NotImplementedError(
|
|
||||||
'Filter by meter not implemented')
|
|
||||||
|
|
||||||
q = hbase_utils.make_query(alarm_id=alarm_id, name=name,
|
|
||||||
enabled=enabled, user_id=user,
|
|
||||||
project_id=project, state=state,
|
|
||||||
type=alarm_type, severity=severity,
|
|
||||||
exclude=exclude)
|
|
||||||
|
|
||||||
with self.conn_pool.connection() as conn:
|
|
||||||
alarm_table = conn.table(self.ALARM_TABLE)
|
|
||||||
gen = alarm_table.scan(filter=q)
|
|
||||||
alarms = [hbase_utils.deserialize_entry(data)
|
|
||||||
for ignored, data in gen]
|
|
||||||
for alarm in sorted(
|
|
||||||
alarms,
|
|
||||||
key=operator.itemgetter('timestamp'),
|
|
||||||
reverse=True):
|
|
||||||
yield models.Alarm(**alarm)
|
|
||||||
|
|
||||||
def get_alarm_changes(self, alarm_id, on_behalf_of,
|
|
||||||
user=None, project=None, alarm_type=None,
|
|
||||||
severity=None, start_timestamp=None,
|
|
||||||
start_timestamp_op=None, end_timestamp=None,
|
|
||||||
end_timestamp_op=None, pagination=None):
|
|
||||||
if pagination:
|
|
||||||
raise aodh.NotImplementedError('Pagination query not implemented')
|
|
||||||
q = hbase_utils.make_query(alarm_id=alarm_id,
|
|
||||||
on_behalf_of=on_behalf_of, type=alarm_type,
|
|
||||||
user_id=user, project_id=project,
|
|
||||||
severity=severity)
|
|
||||||
start_row, end_row = hbase_utils.make_timestamp_query(
|
|
||||||
hbase_utils.make_general_rowkey_scan,
|
|
||||||
start=start_timestamp, start_op=start_timestamp_op,
|
|
||||||
end=end_timestamp, end_op=end_timestamp_op, bounds_only=True,
|
|
||||||
some_id=alarm_id)
|
|
||||||
with self.conn_pool.connection() as conn:
|
|
||||||
alarm_history_table = conn.table(self.ALARM_HISTORY_TABLE)
|
|
||||||
gen = alarm_history_table.scan(filter=q, row_start=start_row,
|
|
||||||
row_stop=end_row)
|
|
||||||
for ignored, data in gen:
|
|
||||||
stored_entry = hbase_utils.deserialize_entry(data)
|
|
||||||
yield models.AlarmChange(**stored_entry)
|
|
||||||
|
|
||||||
def record_alarm_change(self, alarm_change):
|
|
||||||
"""Record alarm change event."""
|
|
||||||
alarm_change_dict = hbase_utils.serialize_entry(alarm_change)
|
|
||||||
ts = alarm_change.get('timestamp') or datetime.datetime.now()
|
|
||||||
rts = hbase_utils.timestamp(ts)
|
|
||||||
with self.conn_pool.connection() as conn:
|
|
||||||
alarm_history_table = conn.table(self.ALARM_HISTORY_TABLE)
|
|
||||||
alarm_history_table.put(
|
|
||||||
hbase_utils.prepare_key(alarm_change.get('alarm_id'), rts),
|
|
||||||
alarm_change_dict)
|
|
@ -1,106 +0,0 @@
|
|||||||
#
|
|
||||||
# Copyright 2012 New Dream Network, LLC (DreamHost)
|
|
||||||
# Copyright 2013 eNovance
|
|
||||||
# Copyright 2014-2015 Red Hat, Inc
|
|
||||||
#
|
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
||||||
# not use this file except in compliance with the License. You may obtain
|
|
||||||
# a copy of the License at
|
|
||||||
#
|
|
||||||
# http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
#
|
|
||||||
# Unless required by applicable law or agreed to in writing, software
|
|
||||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
||||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
||||||
# License for the specific language governing permissions and limitations
|
|
||||||
# under the License.
|
|
||||||
"""MongoDB storage backend"""
|
|
||||||
|
|
||||||
from oslo_log import log
|
|
||||||
import pymongo
|
|
||||||
|
|
||||||
from aodh import storage
|
|
||||||
from aodh.storage.mongo import utils as pymongo_utils
|
|
||||||
from aodh.storage import pymongo_base
|
|
||||||
|
|
||||||
LOG = log.getLogger(__name__)
|
|
||||||
|
|
||||||
|
|
||||||
class Connection(pymongo_base.Connection):
|
|
||||||
"""Put the alarm data into a MongoDB database."""
|
|
||||||
|
|
||||||
CONNECTION_POOL = pymongo_utils.ConnectionPool()
|
|
||||||
|
|
||||||
def __init__(self, conf, url):
|
|
||||||
self.conf = conf
|
|
||||||
# NOTE(jd) Use our own connection pooling on top of the Pymongo one./
|
|
||||||
# We need that otherwise we overflow the MongoDB instance with new
|
|
||||||
# connection since we instantiate a Pymongo client each time someone
|
|
||||||
# requires a new storage connection.
|
|
||||||
self.conn = self.CONNECTION_POOL.connect(
|
|
||||||
url,
|
|
||||||
conf.database.max_retries,
|
|
||||||
conf.database.retry_interval)
|
|
||||||
|
|
||||||
# Require MongoDB 2.4 to use $setOnInsert
|
|
||||||
if self.conn.server_info()['versionArray'] < [2, 4]:
|
|
||||||
raise storage.StorageBadVersion("Need at least MongoDB 2.4")
|
|
||||||
|
|
||||||
connection_options = pymongo.uri_parser.parse_uri(url)
|
|
||||||
self.db = getattr(self.conn, connection_options['database'])
|
|
||||||
if connection_options.get('username'):
|
|
||||||
self.db.authenticate(connection_options['username'],
|
|
||||||
connection_options['password'])
|
|
||||||
|
|
||||||
# NOTE(jd) Upgrading is just about creating index, so let's do this
|
|
||||||
# on connection to be sure at least the TTL is correctly updated if
|
|
||||||
# needed.
|
|
||||||
self.upgrade()
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def update_ttl(ttl, ttl_index_name, index_field, coll):
|
|
||||||
"""Update or ensure time_to_live indexes.
|
|
||||||
|
|
||||||
:param ttl: time to live in seconds.
|
|
||||||
:param ttl_index_name: name of the index we want to update or ensure.
|
|
||||||
:param index_field: field with the index that we need to update.
|
|
||||||
:param coll: collection which indexes need to be updated.
|
|
||||||
"""
|
|
||||||
indexes = coll.index_information()
|
|
||||||
if ttl <= 0:
|
|
||||||
if ttl_index_name in indexes:
|
|
||||||
coll.drop_index(ttl_index_name)
|
|
||||||
return
|
|
||||||
|
|
||||||
if ttl_index_name in indexes:
|
|
||||||
return coll.database.command(
|
|
||||||
'collMod', coll.name,
|
|
||||||
index={'keyPattern': {index_field: pymongo.ASCENDING},
|
|
||||||
'expireAfterSeconds': ttl})
|
|
||||||
|
|
||||||
coll.ensure_index([(index_field, pymongo.ASCENDING)],
|
|
||||||
expireAfterSeconds=ttl,
|
|
||||||
name=ttl_index_name)
|
|
||||||
|
|
||||||
def upgrade(self):
|
|
||||||
super(Connection, self).upgrade()
|
|
||||||
# Establish indexes
|
|
||||||
ttl = self.conf.database.alarm_history_time_to_live
|
|
||||||
self.update_ttl(
|
|
||||||
ttl, 'alarm_history_ttl', 'timestamp', self.db.alarm_history)
|
|
||||||
|
|
||||||
def clear(self):
|
|
||||||
self.conn.drop_database(self.db.name)
|
|
||||||
# Connection will be reopened automatically if needed
|
|
||||||
self.conn.close()
|
|
||||||
|
|
||||||
def clear_expired_alarm_history_data(self, alarm_history_ttl):
|
|
||||||
"""Clear expired alarm history data from the backend storage system.
|
|
||||||
|
|
||||||
Clearing occurs according to the time-to-live.
|
|
||||||
|
|
||||||
:param alarm_history_ttl: Number of seconds to keep alarm history
|
|
||||||
records for.
|
|
||||||
"""
|
|
||||||
LOG.debug("Clearing expired alarm history data is based on native "
|
|
||||||
"MongoDB time to live feature and going in background.")
|
|
@ -30,6 +30,7 @@ from sqlalchemy import desc
|
|||||||
from sqlalchemy import func
|
from sqlalchemy import func
|
||||||
from sqlalchemy.orm import exc
|
from sqlalchemy.orm import exc
|
||||||
|
|
||||||
|
import aodh
|
||||||
from aodh.i18n import _LI
|
from aodh.i18n import _LI
|
||||||
from aodh import storage
|
from aodh import storage
|
||||||
from aodh.storage import base
|
from aodh.storage import base
|
||||||
@ -149,7 +150,7 @@ class Connection(base.Connection):
|
|||||||
return (self._row_to_alarm_model(x) for x in query.all())
|
return (self._row_to_alarm_model(x) for x in query.all())
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _get_pagination_query(query, pagination, api_model, model):
|
def _get_pagination_query(session, query, pagination, api_model, model):
|
||||||
if not pagination.get('sort'):
|
if not pagination.get('sort'):
|
||||||
pagination['sort'] = api_model.DEFAULT_SORT
|
pagination['sort'] = api_model.DEFAULT_SORT
|
||||||
marker = None
|
marker = None
|
||||||
@ -168,6 +169,9 @@ class Connection(base.Connection):
|
|||||||
# order when "severity" specified in sorts.
|
# order when "severity" specified in sorts.
|
||||||
for sort_key, sort_dir in pagination['sort'][::-1]:
|
for sort_key, sort_dir in pagination['sort'][::-1]:
|
||||||
if sort_key == 'severity':
|
if sort_key == 'severity':
|
||||||
|
engine = session.connection()
|
||||||
|
if engine.dialect.name != "mysql":
|
||||||
|
raise aodh.NotImplementedError
|
||||||
sort_dir_func = {'asc': asc, 'desc': desc}[sort_dir]
|
sort_dir_func = {'asc': asc, 'desc': desc}[sort_dir]
|
||||||
query = query.order_by(sort_dir_func(
|
query = query.order_by(sort_dir_func(
|
||||||
func.field(getattr(model, sort_key), 'low',
|
func.field(getattr(model, sort_key), 'low',
|
||||||
@ -222,7 +226,7 @@ class Connection(base.Connection):
|
|||||||
query = query.filter(getattr(models.Alarm, key) != value)
|
query = query.filter(getattr(models.Alarm, key) != value)
|
||||||
|
|
||||||
query = self._get_pagination_query(
|
query = self._get_pagination_query(
|
||||||
query, pagination, alarm_api_models.Alarm, models.Alarm)
|
session, query, pagination, alarm_api_models.Alarm, models.Alarm)
|
||||||
alarms = self._retrieve_alarms(query)
|
alarms = self._retrieve_alarms(query)
|
||||||
|
|
||||||
# TODO(cmart): improve this by using sqlalchemy.func factory
|
# TODO(cmart): improve this by using sqlalchemy.func factory
|
||||||
@ -360,7 +364,7 @@ class Connection(base.Connection):
|
|||||||
models.AlarmChange.timestamp < end_timestamp)
|
models.AlarmChange.timestamp < end_timestamp)
|
||||||
|
|
||||||
query = self._get_pagination_query(
|
query = self._get_pagination_query(
|
||||||
query, pagination, alarm_api_models.AlarmChange,
|
session, query, pagination, alarm_api_models.AlarmChange,
|
||||||
models.AlarmChange)
|
models.AlarmChange)
|
||||||
return self._retrieve_alarm_history(query)
|
return self._retrieve_alarm_history(query)
|
||||||
|
|
||||||
|
@ -1,297 +0,0 @@
|
|||||||
#
|
|
||||||
# Copyright Ericsson AB 2013. All rights reserved
|
|
||||||
# Copyright 2015 Red Hat, Inc
|
|
||||||
#
|
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
||||||
# not use this file except in compliance with the License. You may obtain
|
|
||||||
# a copy of the License at
|
|
||||||
#
|
|
||||||
# http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
#
|
|
||||||
# Unless required by applicable law or agreed to in writing, software
|
|
||||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
||||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
||||||
# License for the specific language governing permissions and limitations
|
|
||||||
# under the License.
|
|
||||||
"""Common functions for MongoDB backend
|
|
||||||
"""
|
|
||||||
|
|
||||||
import weakref
|
|
||||||
|
|
||||||
from oslo_log import log
|
|
||||||
from oslo_utils import netutils
|
|
||||||
import pymongo
|
|
||||||
import retrying
|
|
||||||
|
|
||||||
from aodh.i18n import _LI, _LW
|
|
||||||
|
|
||||||
LOG = log.getLogger(__name__)
|
|
||||||
|
|
||||||
|
|
||||||
def make_timestamp_range(start, end,
|
|
||||||
start_timestamp_op=None, end_timestamp_op=None):
|
|
||||||
|
|
||||||
"""Create the query document to find timestamps within that range.
|
|
||||||
|
|
||||||
This is done by given two possible datetimes and their operations.
|
|
||||||
By default, using $gte for the lower bound and $lt for the upper bound.
|
|
||||||
"""
|
|
||||||
ts_range = {}
|
|
||||||
|
|
||||||
if start:
|
|
||||||
if start_timestamp_op == 'gt':
|
|
||||||
start_timestamp_op = '$gt'
|
|
||||||
else:
|
|
||||||
start_timestamp_op = '$gte'
|
|
||||||
ts_range[start_timestamp_op] = start
|
|
||||||
|
|
||||||
if end:
|
|
||||||
if end_timestamp_op == 'le':
|
|
||||||
end_timestamp_op = '$lte'
|
|
||||||
else:
|
|
||||||
end_timestamp_op = '$lt'
|
|
||||||
ts_range[end_timestamp_op] = end
|
|
||||||
return ts_range
|
|
||||||
|
|
||||||
|
|
||||||
class ConnectionPool(object):
|
|
||||||
|
|
||||||
def __init__(self):
|
|
||||||
self._pool = {}
|
|
||||||
|
|
||||||
def connect(self, url, max_retries, retry_interval):
|
|
||||||
connection_options = pymongo.uri_parser.parse_uri(url)
|
|
||||||
del connection_options['database']
|
|
||||||
del connection_options['username']
|
|
||||||
del connection_options['password']
|
|
||||||
del connection_options['collection']
|
|
||||||
pool_key = tuple(connection_options)
|
|
||||||
|
|
||||||
if pool_key in self._pool:
|
|
||||||
client = self._pool.get(pool_key)()
|
|
||||||
if client:
|
|
||||||
return client
|
|
||||||
splitted_url = netutils.urlsplit(url)
|
|
||||||
log_data = {'db': splitted_url.scheme,
|
|
||||||
'nodelist': connection_options['nodelist']}
|
|
||||||
LOG.info(_LI('Connecting to %(db)s on %(nodelist)s'), log_data)
|
|
||||||
try:
|
|
||||||
client = MongoProxy(
|
|
||||||
pymongo.MongoClient(url),
|
|
||||||
max_retries,
|
|
||||||
retry_interval,
|
|
||||||
)
|
|
||||||
except pymongo.errors.ConnectionFailure as e:
|
|
||||||
LOG.warning(_LW('Unable to connect to the database server: '
|
|
||||||
'%(errmsg)s.'), {'errmsg': e})
|
|
||||||
raise
|
|
||||||
self._pool[pool_key] = weakref.ref(client)
|
|
||||||
return client
|
|
||||||
|
|
||||||
|
|
||||||
class QueryTransformer(object):
|
|
||||||
|
|
||||||
operators = {"<": "$lt",
|
|
||||||
">": "$gt",
|
|
||||||
"<=": "$lte",
|
|
||||||
"=<": "$lte",
|
|
||||||
">=": "$gte",
|
|
||||||
"=>": "$gte",
|
|
||||||
"!=": "$ne",
|
|
||||||
"in": "$in",
|
|
||||||
"=~": "$regex"}
|
|
||||||
|
|
||||||
complex_operators = {"or": "$or",
|
|
||||||
"and": "$and"}
|
|
||||||
|
|
||||||
ordering_functions = {"asc": pymongo.ASCENDING,
|
|
||||||
"desc": pymongo.DESCENDING}
|
|
||||||
|
|
||||||
def transform_orderby(self, orderby):
|
|
||||||
orderby_filter = []
|
|
||||||
|
|
||||||
for field in orderby:
|
|
||||||
field_name = list(field.keys())[0]
|
|
||||||
ordering = self.ordering_functions[list(field.values())[0]]
|
|
||||||
orderby_filter.append((field_name, ordering))
|
|
||||||
return orderby_filter
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def _move_negation_to_leaf(condition):
|
|
||||||
"""Moves every not operator to the leafs.
|
|
||||||
|
|
||||||
Moving is going by applying the De Morgan rules and annihilating
|
|
||||||
double negations.
|
|
||||||
"""
|
|
||||||
def _apply_de_morgan(tree, negated_subtree, negated_op):
|
|
||||||
if negated_op == "and":
|
|
||||||
new_op = "or"
|
|
||||||
else:
|
|
||||||
new_op = "and"
|
|
||||||
|
|
||||||
tree[new_op] = [{"not": child}
|
|
||||||
for child in negated_subtree[negated_op]]
|
|
||||||
del tree["not"]
|
|
||||||
|
|
||||||
def transform(subtree):
|
|
||||||
op = list(subtree.keys())[0]
|
|
||||||
if op in ["and", "or"]:
|
|
||||||
[transform(child) for child in subtree[op]]
|
|
||||||
elif op == "not":
|
|
||||||
negated_tree = subtree[op]
|
|
||||||
negated_op = list(negated_tree.keys())[0]
|
|
||||||
if negated_op == "and":
|
|
||||||
_apply_de_morgan(subtree, negated_tree, negated_op)
|
|
||||||
transform(subtree)
|
|
||||||
elif negated_op == "or":
|
|
||||||
_apply_de_morgan(subtree, negated_tree, negated_op)
|
|
||||||
transform(subtree)
|
|
||||||
elif negated_op == "not":
|
|
||||||
# two consecutive not annihilates themselves
|
|
||||||
value = list(negated_tree.values())[0]
|
|
||||||
new_op = list(value.keys())[0]
|
|
||||||
subtree[new_op] = negated_tree[negated_op][new_op]
|
|
||||||
del subtree["not"]
|
|
||||||
transform(subtree)
|
|
||||||
|
|
||||||
transform(condition)
|
|
||||||
|
|
||||||
def transform_filter(self, condition):
|
|
||||||
# in Mongo not operator can only be applied to
|
|
||||||
# simple expressions so we have to move every
|
|
||||||
# not operator to the leafs of the expression tree
|
|
||||||
self._move_negation_to_leaf(condition)
|
|
||||||
return self._process_json_tree(condition)
|
|
||||||
|
|
||||||
def _handle_complex_op(self, complex_op, nodes):
|
|
||||||
element_list = []
|
|
||||||
for node in nodes:
|
|
||||||
element = self._process_json_tree(node)
|
|
||||||
element_list.append(element)
|
|
||||||
complex_operator = self.complex_operators[complex_op]
|
|
||||||
op = {complex_operator: element_list}
|
|
||||||
return op
|
|
||||||
|
|
||||||
def _handle_not_op(self, negated_tree):
|
|
||||||
# assumes that not is moved to the leaf already
|
|
||||||
# so we are next to a leaf
|
|
||||||
negated_op = list(negated_tree.keys())[0]
|
|
||||||
negated_field = list(negated_tree[negated_op].keys())[0]
|
|
||||||
value = negated_tree[negated_op][negated_field]
|
|
||||||
if negated_op == "=":
|
|
||||||
return {negated_field: {"$ne": value}}
|
|
||||||
elif negated_op == "!=":
|
|
||||||
return {negated_field: value}
|
|
||||||
else:
|
|
||||||
return {negated_field: {"$not":
|
|
||||||
{self.operators[negated_op]: value}}}
|
|
||||||
|
|
||||||
def _handle_simple_op(self, simple_op, nodes):
|
|
||||||
field_name = list(nodes.keys())[0]
|
|
||||||
field_value = list(nodes.values())[0]
|
|
||||||
|
|
||||||
# no operator for equal in Mongo
|
|
||||||
if simple_op == "=":
|
|
||||||
op = {field_name: field_value}
|
|
||||||
return op
|
|
||||||
|
|
||||||
operator = self.operators[simple_op]
|
|
||||||
op = {field_name: {operator: field_value}}
|
|
||||||
return op
|
|
||||||
|
|
||||||
def _process_json_tree(self, condition_tree):
|
|
||||||
operator_node = list(condition_tree.keys())[0]
|
|
||||||
nodes = list(condition_tree.values())[0]
|
|
||||||
|
|
||||||
if operator_node in self.complex_operators:
|
|
||||||
return self._handle_complex_op(operator_node, nodes)
|
|
||||||
|
|
||||||
if operator_node == "not":
|
|
||||||
negated_tree = condition_tree[operator_node]
|
|
||||||
return self._handle_not_op(negated_tree)
|
|
||||||
|
|
||||||
return self._handle_simple_op(operator_node, nodes)
|
|
||||||
|
|
||||||
MONGO_METHODS = set([typ for typ in dir(pymongo.collection.Collection)
|
|
||||||
if not typ.startswith('_')])
|
|
||||||
MONGO_METHODS.update(set([typ for typ in dir(pymongo.MongoClient)
|
|
||||||
if not typ.startswith('_')]))
|
|
||||||
MONGO_METHODS.update(set([typ for typ in dir(pymongo)
|
|
||||||
if not typ.startswith('_')]))
|
|
||||||
|
|
||||||
|
|
||||||
def _safe_mongo_call(max_retries, retry_interval):
|
|
||||||
return retrying.retry(
|
|
||||||
retry_on_exception=lambda e: isinstance(
|
|
||||||
e, pymongo.errors.AutoReconnect),
|
|
||||||
wait_fixed=retry_interval * 1000,
|
|
||||||
stop_max_attempt_number=max_retries if max_retries >= 0 else None
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
class MongoProxy(object):
|
|
||||||
def __init__(self, conn, max_retries, retry_interval):
|
|
||||||
self.conn = conn
|
|
||||||
self.max_retries = max_retries
|
|
||||||
self.retry_interval = retry_interval
|
|
||||||
|
|
||||||
def __getitem__(self, item):
|
|
||||||
"""Create and return proxy around the method in the connection.
|
|
||||||
|
|
||||||
:param item: name of the connection
|
|
||||||
"""
|
|
||||||
return MongoProxy(self.conn[item])
|
|
||||||
|
|
||||||
def find(self, *args, **kwargs):
|
|
||||||
# We need this modifying method to return a CursorProxy object so that
|
|
||||||
# we can handle the Cursor next function to catch the AutoReconnect
|
|
||||||
# exception.
|
|
||||||
return CursorProxy(self.conn.find(*args, **kwargs),
|
|
||||||
self.max_retries,
|
|
||||||
self.retry_interval)
|
|
||||||
|
|
||||||
def __getattr__(self, item):
|
|
||||||
"""Wrap MongoDB connection.
|
|
||||||
|
|
||||||
If item is the name of an executable method, for example find or
|
|
||||||
insert, wrap this method to retry.
|
|
||||||
Else wrap getting attribute with MongoProxy.
|
|
||||||
"""
|
|
||||||
if item in ('name', 'database'):
|
|
||||||
return getattr(self.conn, item)
|
|
||||||
if item in MONGO_METHODS:
|
|
||||||
return _safe_mongo_call(
|
|
||||||
self.max_retries,
|
|
||||||
self.retry_interval,
|
|
||||||
)(getattr(self.conn, item))
|
|
||||||
return MongoProxy(getattr(self.conn, item),
|
|
||||||
self.max_retries,
|
|
||||||
self.retry_interval)
|
|
||||||
|
|
||||||
def __call__(self, *args, **kwargs):
|
|
||||||
return self.conn(*args, **kwargs)
|
|
||||||
|
|
||||||
|
|
||||||
class CursorProxy(pymongo.cursor.Cursor):
|
|
||||||
def __init__(self, cursor, max_retries, retry_interval):
|
|
||||||
self.cursor = cursor
|
|
||||||
self.next = _safe_mongo_call(
|
|
||||||
max_retries, retry_interval)(self._next)
|
|
||||||
|
|
||||||
def __getitem__(self, item):
|
|
||||||
return self.cursor[item]
|
|
||||||
|
|
||||||
def _next(self):
|
|
||||||
"""Wrap Cursor next method.
|
|
||||||
|
|
||||||
This method will be executed before each Cursor next method call.
|
|
||||||
"""
|
|
||||||
try:
|
|
||||||
save_cursor = self.cursor.clone()
|
|
||||||
return self.cursor.next()
|
|
||||||
except pymongo.errors.AutoReconnect:
|
|
||||||
self.cursor = save_cursor
|
|
||||||
raise
|
|
||||||
|
|
||||||
def __getattr__(self, item):
|
|
||||||
return getattr(self.cursor, item)
|
|
@ -1,317 +0,0 @@
|
|||||||
#
|
|
||||||
# Copyright Ericsson AB 2013. All rights reserved
|
|
||||||
#
|
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
||||||
# not use this file except in compliance with the License. You may obtain
|
|
||||||
# a copy of the License at
|
|
||||||
#
|
|
||||||
# http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
#
|
|
||||||
# Unless required by applicable law or agreed to in writing, software
|
|
||||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
||||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
||||||
# License for the specific language governing permissions and limitations
|
|
||||||
# under the License.
|
|
||||||
"""Common functions for MongoDB backend
|
|
||||||
"""
|
|
||||||
import pymongo
|
|
||||||
import six
|
|
||||||
|
|
||||||
import aodh
|
|
||||||
from aodh import storage
|
|
||||||
from aodh.storage import base
|
|
||||||
from aodh.storage import models
|
|
||||||
from aodh.storage.mongo import utils as pymongo_utils
|
|
||||||
|
|
||||||
|
|
||||||
COMMON_AVAILABLE_CAPABILITIES = {
|
|
||||||
'alarms': {'query': {'simple': True,
|
|
||||||
'complex': True},
|
|
||||||
'history': {'query': {'simple': True,
|
|
||||||
'complex': True}}},
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
AVAILABLE_STORAGE_CAPABILITIES = {
|
|
||||||
'storage': {'production_ready': True},
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
class Connection(base.Connection):
|
|
||||||
"""Base Alarm Connection class for MongoDB driver."""
|
|
||||||
CAPABILITIES = base.update_nested(base.Connection.CAPABILITIES,
|
|
||||||
COMMON_AVAILABLE_CAPABILITIES)
|
|
||||||
|
|
||||||
STORAGE_CAPABILITIES = base.update_nested(
|
|
||||||
base.Connection.STORAGE_CAPABILITIES,
|
|
||||||
AVAILABLE_STORAGE_CAPABILITIES,
|
|
||||||
)
|
|
||||||
|
|
||||||
def upgrade(self):
|
|
||||||
# create collection if not present
|
|
||||||
if 'alarm' not in self.db.conn.collection_names():
|
|
||||||
self.db.conn.create_collection('alarm')
|
|
||||||
if 'alarm_history' not in self.db.conn.collection_names():
|
|
||||||
self.db.conn.create_collection('alarm_history')
|
|
||||||
|
|
||||||
def update_alarm(self, alarm, upsert=False):
|
|
||||||
"""Update alarm."""
|
|
||||||
data = alarm.as_dict()
|
|
||||||
|
|
||||||
self.db.alarm.update(
|
|
||||||
{'alarm_id': alarm.alarm_id},
|
|
||||||
{'$set': data},
|
|
||||||
upsert=upsert)
|
|
||||||
|
|
||||||
alarms_found = self.db.alarm.find({'alarm_id': alarm.alarm_id})
|
|
||||||
if alarms_found.count() == 0:
|
|
||||||
raise storage.AlarmNotFound(alarm.alarm_id)
|
|
||||||
stored_alarm = alarms_found[0]
|
|
||||||
del stored_alarm['_id']
|
|
||||||
self._ensure_encapsulated_rule_format(stored_alarm)
|
|
||||||
self._ensure_time_constraints(stored_alarm)
|
|
||||||
return models.Alarm(**stored_alarm)
|
|
||||||
|
|
||||||
def create_alarm(self, alarm):
|
|
||||||
return self.update_alarm(alarm, upsert=True)
|
|
||||||
|
|
||||||
def delete_alarm(self, alarm_id):
|
|
||||||
"""Delete an alarm and its history data."""
|
|
||||||
self.db.alarm.remove({'alarm_id': alarm_id})
|
|
||||||
self.db.alarm_history.remove({'alarm_id': alarm_id})
|
|
||||||
|
|
||||||
def record_alarm_change(self, alarm_change):
|
|
||||||
"""Record alarm change event."""
|
|
||||||
self.db.alarm_history.insert(alarm_change.copy())
|
|
||||||
|
|
||||||
def get_alarms(self, name=None, user=None, state=None, meter=None,
|
|
||||||
project=None, enabled=None, alarm_id=None,
|
|
||||||
alarm_type=None, severity=None, exclude=None,
|
|
||||||
pagination=None):
|
|
||||||
"""Yields a lists of alarms that match filters.
|
|
||||||
|
|
||||||
:param name: Optional name for alarm.
|
|
||||||
:param user: Optional ID for user that owns the resource.
|
|
||||||
:param state: Optional string for alarm state.
|
|
||||||
:param meter: Optional string for alarms associated with meter.
|
|
||||||
:param project: Optional ID for project that owns the resource.
|
|
||||||
:param enabled: Optional boolean to list disable alarm.
|
|
||||||
:param alarm_id: Optional alarm_id to return one alarm.
|
|
||||||
:param alarm_type: Optional alarm type.
|
|
||||||
:param severity: Optional alarm severity.
|
|
||||||
:param exclude: Optional dict for inequality constraint.
|
|
||||||
:param pagination: Pagination query parameters.
|
|
||||||
"""
|
|
||||||
if pagination:
|
|
||||||
raise aodh.NotImplementedError('Pagination query not implemented')
|
|
||||||
q = {}
|
|
||||||
if user is not None:
|
|
||||||
q['user_id'] = user
|
|
||||||
if project is not None:
|
|
||||||
q['project_id'] = project
|
|
||||||
if name is not None:
|
|
||||||
q['name'] = name
|
|
||||||
if enabled is not None:
|
|
||||||
q['enabled'] = enabled
|
|
||||||
if alarm_id is not None:
|
|
||||||
q['alarm_id'] = alarm_id
|
|
||||||
if state is not None:
|
|
||||||
q['state'] = state
|
|
||||||
if meter is not None:
|
|
||||||
q['rule.meter_name'] = meter
|
|
||||||
if alarm_type is not None:
|
|
||||||
q['type'] = alarm_type
|
|
||||||
if severity is not None:
|
|
||||||
q['severity'] = severity
|
|
||||||
if exclude is not None:
|
|
||||||
for key, value in six.iteritems(exclude):
|
|
||||||
q[key] = {'$ne': value}
|
|
||||||
|
|
||||||
return self._retrieve_alarms(q,
|
|
||||||
[("timestamp",
|
|
||||||
pymongo.DESCENDING)],
|
|
||||||
None)
|
|
||||||
|
|
||||||
def get_alarm_changes(self, alarm_id, on_behalf_of,
|
|
||||||
user=None, project=None, alarm_type=None,
|
|
||||||
severity=None, start_timestamp=None,
|
|
||||||
start_timestamp_op=None, end_timestamp=None,
|
|
||||||
end_timestamp_op=None, pagination=None):
|
|
||||||
"""Yields list of AlarmChanges describing alarm history
|
|
||||||
|
|
||||||
Changes are always sorted in reverse order of occurrence, given
|
|
||||||
the importance of currency.
|
|
||||||
|
|
||||||
Segregation for non-administrative users is done on the basis
|
|
||||||
of the on_behalf_of parameter. This allows such users to have
|
|
||||||
visibility on both the changes initiated by themselves directly
|
|
||||||
(generally creation, rule changes, or deletion) and also on those
|
|
||||||
changes initiated on their behalf by the alarming service (state
|
|
||||||
transitions after alarm thresholds are crossed).
|
|
||||||
|
|
||||||
:param alarm_id: ID of alarm to return changes for
|
|
||||||
:param on_behalf_of: ID of tenant to scope changes query (None for
|
|
||||||
administrative user, indicating all projects)
|
|
||||||
:param user: Optional ID of user to return changes for
|
|
||||||
:param project: Optional ID of project to return changes for
|
|
||||||
:param alarm_type: Optional change type
|
|
||||||
:param severity: Optional change severity
|
|
||||||
:param start_timestamp: Optional modified timestamp start range
|
|
||||||
:param start_timestamp_op: Optional timestamp start range operation
|
|
||||||
:param end_timestamp: Optional modified timestamp end range
|
|
||||||
:param end_timestamp_op: Optional timestamp end range operation
|
|
||||||
:param pagination: Pagination query parameters.
|
|
||||||
"""
|
|
||||||
if pagination:
|
|
||||||
raise aodh.NotImplementedError('Pagination query not implemented')
|
|
||||||
q = dict(alarm_id=alarm_id)
|
|
||||||
if on_behalf_of is not None:
|
|
||||||
q['on_behalf_of'] = on_behalf_of
|
|
||||||
if user is not None:
|
|
||||||
q['user_id'] = user
|
|
||||||
if project is not None:
|
|
||||||
q['project_id'] = project
|
|
||||||
if alarm_type is not None:
|
|
||||||
q['type'] = alarm_type
|
|
||||||
if severity is not None:
|
|
||||||
q['severity'] = severity
|
|
||||||
if start_timestamp or end_timestamp:
|
|
||||||
ts_range = pymongo_utils.make_timestamp_range(start_timestamp,
|
|
||||||
end_timestamp,
|
|
||||||
start_timestamp_op,
|
|
||||||
end_timestamp_op)
|
|
||||||
if ts_range:
|
|
||||||
q['timestamp'] = ts_range
|
|
||||||
|
|
||||||
return self._retrieve_alarm_changes(q,
|
|
||||||
[("timestamp",
|
|
||||||
pymongo.DESCENDING)],
|
|
||||||
None)
|
|
||||||
|
|
||||||
def query_alarms(self, filter_expr=None, orderby=None, limit=None):
|
|
||||||
"""Return an iterable of model.Alarm objects."""
|
|
||||||
return self._retrieve_data(filter_expr, orderby, limit,
|
|
||||||
models.Alarm)
|
|
||||||
|
|
||||||
def query_alarm_history(self, filter_expr=None, orderby=None, limit=None):
|
|
||||||
"""Return an iterable of model.AlarmChange objects."""
|
|
||||||
return self._retrieve_data(filter_expr,
|
|
||||||
orderby,
|
|
||||||
limit,
|
|
||||||
models.AlarmChange)
|
|
||||||
|
|
||||||
def _retrieve_data(self, filter_expr, orderby, limit, model):
|
|
||||||
if limit == 0:
|
|
||||||
return []
|
|
||||||
query_filter = {}
|
|
||||||
orderby_filter = [("timestamp", pymongo.DESCENDING)]
|
|
||||||
transformer = pymongo_utils.QueryTransformer()
|
|
||||||
if orderby is not None:
|
|
||||||
orderby_filter = transformer.transform_orderby(orderby)
|
|
||||||
if filter_expr is not None:
|
|
||||||
query_filter = transformer.transform_filter(filter_expr)
|
|
||||||
|
|
||||||
retrieve = {models.Alarm: self._retrieve_alarms,
|
|
||||||
models.AlarmChange: self._retrieve_alarm_changes}
|
|
||||||
return retrieve[model](query_filter, orderby_filter, limit)
|
|
||||||
|
|
||||||
def _retrieve_alarms(self, query_filter, orderby, limit):
|
|
||||||
if limit is not None:
|
|
||||||
alarms = self.db.alarm.find(query_filter,
|
|
||||||
limit=limit,
|
|
||||||
sort=orderby)
|
|
||||||
else:
|
|
||||||
alarms = self.db.alarm.find(query_filter, sort=orderby)
|
|
||||||
|
|
||||||
for alarm in alarms:
|
|
||||||
a = {}
|
|
||||||
a.update(alarm)
|
|
||||||
del a['_id']
|
|
||||||
self._ensure_encapsulated_rule_format(a)
|
|
||||||
self._ensure_time_constraints(a)
|
|
||||||
yield models.Alarm(**a)
|
|
||||||
|
|
||||||
def _retrieve_alarm_changes(self, query_filter, orderby, limit):
|
|
||||||
if limit is not None:
|
|
||||||
alarms_history = self.db.alarm_history.find(query_filter,
|
|
||||||
limit=limit,
|
|
||||||
sort=orderby)
|
|
||||||
else:
|
|
||||||
alarms_history = self.db.alarm_history.find(
|
|
||||||
query_filter, sort=orderby)
|
|
||||||
|
|
||||||
for alarm_history in alarms_history:
|
|
||||||
ah = {}
|
|
||||||
ah.update(alarm_history)
|
|
||||||
del ah['_id']
|
|
||||||
yield models.AlarmChange(**ah)
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def _ensure_encapsulated_rule_format(cls, alarm):
|
|
||||||
"""Ensure the alarm returned by the storage have the correct format.
|
|
||||||
|
|
||||||
The previous format looks like:
|
|
||||||
{
|
|
||||||
'alarm_id': '0ld-4l3rt',
|
|
||||||
'enabled': True,
|
|
||||||
'name': 'old-alert',
|
|
||||||
'description': 'old-alert',
|
|
||||||
'timestamp': None,
|
|
||||||
'meter_name': 'cpu',
|
|
||||||
'user_id': 'me',
|
|
||||||
'project_id': 'and-da-boys',
|
|
||||||
'comparison_operator': 'lt',
|
|
||||||
'threshold': 36,
|
|
||||||
'statistic': 'count',
|
|
||||||
'evaluation_periods': 1,
|
|
||||||
'period': 60,
|
|
||||||
'state': "insufficient data",
|
|
||||||
'state_timestamp': None,
|
|
||||||
'ok_actions': [],
|
|
||||||
'alarm_actions': ['http://nowhere/alarms'],
|
|
||||||
'insufficient_data_actions': [],
|
|
||||||
'repeat_actions': False,
|
|
||||||
'matching_metadata': {'key': 'value'}
|
|
||||||
# or 'matching_metadata': [{'key': 'key', 'value': 'value'}]
|
|
||||||
}
|
|
||||||
"""
|
|
||||||
|
|
||||||
if isinstance(alarm.get('rule'), dict):
|
|
||||||
return
|
|
||||||
|
|
||||||
alarm['type'] = 'threshold'
|
|
||||||
alarm['rule'] = {}
|
|
||||||
alarm['matching_metadata'] = cls._decode_matching_metadata(
|
|
||||||
alarm['matching_metadata'])
|
|
||||||
for field in ['period', 'evaluation_periods', 'threshold',
|
|
||||||
'statistic', 'comparison_operator', 'meter_name']:
|
|
||||||
if field in alarm:
|
|
||||||
alarm['rule'][field] = alarm[field]
|
|
||||||
del alarm[field]
|
|
||||||
|
|
||||||
query = []
|
|
||||||
for key in alarm['matching_metadata']:
|
|
||||||
query.append({'field': key,
|
|
||||||
'op': 'eq',
|
|
||||||
'value': alarm['matching_metadata'][key],
|
|
||||||
'type': 'string'})
|
|
||||||
del alarm['matching_metadata']
|
|
||||||
alarm['rule']['query'] = query
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def _decode_matching_metadata(matching_metadata):
|
|
||||||
if isinstance(matching_metadata, dict):
|
|
||||||
# note(sileht): keep compatibility with alarm
|
|
||||||
# with matching_metadata as a dict
|
|
||||||
return matching_metadata
|
|
||||||
else:
|
|
||||||
new_matching_metadata = {}
|
|
||||||
for elem in matching_metadata:
|
|
||||||
new_matching_metadata[elem['key']] = elem['value']
|
|
||||||
return new_matching_metadata
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def _ensure_time_constraints(alarm):
|
|
||||||
"""Ensures the alarm has a time constraints field."""
|
|
||||||
if 'time_constraints' not in alarm:
|
|
||||||
alarm['time_constraints'] = []
|
|
@ -30,7 +30,6 @@ from aodh import messaging
|
|||||||
from aodh.storage import models
|
from aodh.storage import models
|
||||||
from aodh.tests import constants
|
from aodh.tests import constants
|
||||||
from aodh.tests.functional.api import v2
|
from aodh.tests.functional.api import v2
|
||||||
from aodh.tests.functional import db as tests_db
|
|
||||||
|
|
||||||
|
|
||||||
def default_alarms(auth_headers):
|
def default_alarms(auth_headers):
|
||||||
@ -1365,7 +1364,7 @@ class TestAlarms(TestAlarmsBase):
|
|||||||
self.assertEqual(1, len(alarms))
|
self.assertEqual(1, len(alarms))
|
||||||
|
|
||||||
# FIXME(sileht): This should really returns [] not None
|
# FIXME(sileht): This should really returns [] not None
|
||||||
# but the mongodb and sql just store the json dict as is...
|
# but SQL just stores the json dict as is...
|
||||||
# migration script for sql will be a mess because we have
|
# migration script for sql will be a mess because we have
|
||||||
# to parse all JSON :(
|
# to parse all JSON :(
|
||||||
# I guess we assume that wsme convert the None input to []
|
# I guess we assume that wsme convert the None input to []
|
||||||
@ -3300,7 +3299,6 @@ class TestAlarmsCompositeRule(TestAlarmsBase):
|
|||||||
response.json['error_message']['faultstring'])
|
response.json['error_message']['faultstring'])
|
||||||
|
|
||||||
|
|
||||||
@tests_db.run_with('mysql', 'pgsql', 'sqlite')
|
|
||||||
class TestPaginationQuery(TestAlarmsBase):
|
class TestPaginationQuery(TestAlarmsBase):
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
super(TestPaginationQuery, self).setUp()
|
super(TestPaginationQuery, self).setUp()
|
||||||
@ -3318,6 +3316,8 @@ class TestPaginationQuery(TestAlarmsBase):
|
|||||||
self.assertEqual(['name1', 'name2', 'name3', 'name4'], names)
|
self.assertEqual(['name1', 'name2', 'name3', 'name4'], names)
|
||||||
|
|
||||||
def test_sort_by_severity_with_its_value(self):
|
def test_sort_by_severity_with_its_value(self):
|
||||||
|
if self.engine != "mysql":
|
||||||
|
self.skipTest("This is only implemented for MySQL")
|
||||||
data = self.get_json('/alarms?sort=severity:asc',
|
data = self.get_json('/alarms?sort=severity:asc',
|
||||||
headers=self.auth_headers)
|
headers=self.auth_headers)
|
||||||
severities = [a['severity'] for a in data]
|
severities = [a['severity'] for a in data]
|
||||||
|
@ -21,7 +21,6 @@ from oslo_utils import timeutils
|
|||||||
|
|
||||||
from aodh.storage import models
|
from aodh.storage import models
|
||||||
from aodh.tests.functional.api import v2 as tests_api
|
from aodh.tests.functional.api import v2 as tests_api
|
||||||
from aodh.tests.functional import db as tests_db
|
|
||||||
|
|
||||||
|
|
||||||
admin_header = {"X-Roles": "admin",
|
admin_header = {"X-Roles": "admin",
|
||||||
@ -195,8 +194,9 @@ class TestQueryAlarmsController(tests_api.FunctionalTest):
|
|||||||
for alarm in data.json:
|
for alarm in data.json:
|
||||||
self.assertEqual("alarm", alarm["state"])
|
self.assertEqual("alarm", alarm["state"])
|
||||||
|
|
||||||
@tests_db.run_with('mysql', 'pgsql', 'sqlite')
|
|
||||||
def test_query_with_orderby_severity(self):
|
def test_query_with_orderby_severity(self):
|
||||||
|
if self.engine != "mysql":
|
||||||
|
self.skipTest("This is only implemented for MySQL")
|
||||||
orderby = '[{"severity": "ASC"}]'
|
orderby = '[{"severity": "ASC"}]'
|
||||||
data = self.post_json(self.alarm_url,
|
data = self.post_json(self.alarm_url,
|
||||||
headers=admin_header,
|
headers=admin_header,
|
||||||
|
@ -20,29 +20,14 @@ import os
|
|||||||
import uuid
|
import uuid
|
||||||
|
|
||||||
import fixtures
|
import fixtures
|
||||||
import mock
|
|
||||||
from oslo_config import fixture as fixture_config
|
from oslo_config import fixture as fixture_config
|
||||||
from oslotest import mockpatch
|
from oslotest import mockpatch
|
||||||
import six
|
import six
|
||||||
from six.moves.urllib import parse as urlparse
|
from six.moves.urllib import parse as urlparse
|
||||||
from testtools import testcase
|
|
||||||
|
|
||||||
from aodh import service
|
from aodh import service
|
||||||
from aodh import storage
|
from aodh import storage
|
||||||
from aodh.tests import base as test_base
|
from aodh.tests import base as test_base
|
||||||
try:
|
|
||||||
from aodh.tests import mocks
|
|
||||||
except ImportError:
|
|
||||||
mocks = None # happybase module is not Python 3 compatible yet
|
|
||||||
|
|
||||||
|
|
||||||
class MongoDbManager(fixtures.Fixture):
|
|
||||||
|
|
||||||
def __init__(self, conf):
|
|
||||||
self.url = '%(url)s_%(db)s' % {
|
|
||||||
'url': conf.database.connection,
|
|
||||||
'db': uuid.uuid4().hex,
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
class SQLManager(fixtures.Fixture):
|
class SQLManager(fixtures.Fixture):
|
||||||
@ -80,36 +65,6 @@ class MySQLManager(SQLManager):
|
|||||||
conn.execute('CREATE DATABASE %s;' % db_name)
|
conn.execute('CREATE DATABASE %s;' % db_name)
|
||||||
|
|
||||||
|
|
||||||
class HBaseManager(fixtures.Fixture):
|
|
||||||
def __init__(self, conf):
|
|
||||||
self.url = '%s?table_prefix=%s' % (
|
|
||||||
conf.database.connection,
|
|
||||||
os.getenv("AODH_TEST_HBASE_TABLE_PREFIX", "test")
|
|
||||||
)
|
|
||||||
|
|
||||||
def setUp(self):
|
|
||||||
super(HBaseManager, self).setUp()
|
|
||||||
# Unique prefix for each test to keep data is distinguished because
|
|
||||||
# all test data is stored in one table
|
|
||||||
data_prefix = str(uuid.uuid4().hex)
|
|
||||||
|
|
||||||
def table(conn, name):
|
|
||||||
return mocks.MockHBaseTable(name, conn, data_prefix)
|
|
||||||
|
|
||||||
# Mock only real HBase connection, MConnection "table" method
|
|
||||||
# stays origin.
|
|
||||||
mock.patch('happybase.Connection.table', new=table).start()
|
|
||||||
# We shouldn't delete data and tables after each test,
|
|
||||||
# because it last for too long.
|
|
||||||
# All tests tables will be deleted in setup-test-env.sh
|
|
||||||
mock.patch("happybase.Connection.disable_table",
|
|
||||||
new=mock.MagicMock()).start()
|
|
||||||
mock.patch("happybase.Connection.delete_table",
|
|
||||||
new=mock.MagicMock()).start()
|
|
||||||
mock.patch("happybase.Connection.create_table",
|
|
||||||
new=mock.MagicMock()).start()
|
|
||||||
|
|
||||||
|
|
||||||
class SQLiteManager(fixtures.Fixture):
|
class SQLiteManager(fixtures.Fixture):
|
||||||
|
|
||||||
def __init__(self, conf):
|
def __init__(self, conf):
|
||||||
@ -120,13 +75,10 @@ class SQLiteManager(fixtures.Fixture):
|
|||||||
class TestBase(test_base.BaseTestCase):
|
class TestBase(test_base.BaseTestCase):
|
||||||
|
|
||||||
DRIVER_MANAGERS = {
|
DRIVER_MANAGERS = {
|
||||||
'mongodb': MongoDbManager,
|
|
||||||
'mysql': MySQLManager,
|
'mysql': MySQLManager,
|
||||||
'postgresql': PgSQLManager,
|
'postgresql': PgSQLManager,
|
||||||
'sqlite': SQLiteManager,
|
'sqlite': SQLiteManager,
|
||||||
}
|
}
|
||||||
if mocks is not None:
|
|
||||||
DRIVER_MANAGERS['hbase'] = HBaseManager
|
|
||||||
|
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
super(TestBase, self).setUp()
|
super(TestBase, self).setUp()
|
||||||
@ -137,23 +89,16 @@ class TestBase(test_base.BaseTestCase):
|
|||||||
engine = urlparse.urlparse(db_url).scheme
|
engine = urlparse.urlparse(db_url).scheme
|
||||||
# In case some drivers have additional specification, for example:
|
# In case some drivers have additional specification, for example:
|
||||||
# PyMySQL will have scheme mysql+pymysql.
|
# PyMySQL will have scheme mysql+pymysql.
|
||||||
engine = engine.split('+')[0]
|
self.engine = engine.split('+')[0]
|
||||||
|
|
||||||
# NOTE(Alexei_987) Shortcut to skip expensive db setUp
|
|
||||||
test_method = self._get_test_method()
|
|
||||||
if (hasattr(test_method, '_run_with')
|
|
||||||
and engine not in test_method._run_with):
|
|
||||||
raise testcase.TestSkipped(
|
|
||||||
'Test is not applicable for %s' % engine)
|
|
||||||
|
|
||||||
conf = service.prepare_service(argv=[], config_files=[])
|
conf = service.prepare_service(argv=[], config_files=[])
|
||||||
self.CONF = self.useFixture(fixture_config.Config(conf)).conf
|
self.CONF = self.useFixture(fixture_config.Config(conf)).conf
|
||||||
self.CONF.set_override('connection', db_url, group="database",
|
self.CONF.set_override('connection', db_url, group="database",
|
||||||
enforce_type=True)
|
enforce_type=True)
|
||||||
|
|
||||||
manager = self.DRIVER_MANAGERS.get(engine)
|
manager = self.DRIVER_MANAGERS.get(self.engine)
|
||||||
if not manager:
|
if not manager:
|
||||||
self.skipTest("missing driver manager: %s" % engine)
|
self.skipTest("missing driver manager: %s" % self.engine)
|
||||||
|
|
||||||
self.db_manager = manager(self.CONF)
|
self.db_manager = manager(self.CONF)
|
||||||
|
|
||||||
@ -176,25 +121,3 @@ class TestBase(test_base.BaseTestCase):
|
|||||||
|
|
||||||
def _get_connection(self, conf):
|
def _get_connection(self, conf):
|
||||||
return self.alarm_conn
|
return self.alarm_conn
|
||||||
|
|
||||||
|
|
||||||
def run_with(*drivers):
|
|
||||||
"""Used to mark tests that are only applicable for certain db driver.
|
|
||||||
|
|
||||||
Skips test if driver is not available.
|
|
||||||
"""
|
|
||||||
def decorator(test):
|
|
||||||
if isinstance(test, type) and issubclass(test, TestBase):
|
|
||||||
# Decorate all test methods
|
|
||||||
for attr in dir(test):
|
|
||||||
if attr.startswith('test_'):
|
|
||||||
value = getattr(test, attr)
|
|
||||||
if callable(value):
|
|
||||||
if six.PY3:
|
|
||||||
value._run_with = drivers
|
|
||||||
else:
|
|
||||||
value.__func__._run_with = drivers
|
|
||||||
else:
|
|
||||||
test._run_with = drivers
|
|
||||||
return test
|
|
||||||
return decorator
|
|
||||||
|
@ -27,7 +27,6 @@ class ABCSkip(base.SkipNotImplementedMeta, abc.ABCMeta):
|
|||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
@tests_db.run_with('mysql', 'pgsql', 'sqlite')
|
|
||||||
class ModelsMigrationsSync(
|
class ModelsMigrationsSync(
|
||||||
six.with_metaclass(ABCSkip,
|
six.with_metaclass(ABCSkip,
|
||||||
tests_db.TestBase,
|
tests_db.TestBase,
|
||||||
|
@ -1,112 +0,0 @@
|
|||||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
||||||
# not use this file except in compliance with the License. You may obtain
|
|
||||||
# a copy of the License at
|
|
||||||
#
|
|
||||||
# http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
#
|
|
||||||
# Unless required by applicable law or agreed to in writing, software
|
|
||||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
||||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
||||||
# License for the specific language governing permissions and limitations
|
|
||||||
# under the License.
|
|
||||||
import datetime
|
|
||||||
import uuid
|
|
||||||
|
|
||||||
import mock
|
|
||||||
from oslo_config import fixture as fixture_config
|
|
||||||
|
|
||||||
from aodh.cmd import data_migration
|
|
||||||
from aodh import service
|
|
||||||
from aodh import storage
|
|
||||||
from aodh.storage import models as alarm_models
|
|
||||||
from aodh.tests.functional import db as tests_db
|
|
||||||
from aodh.tests.functional.storage import test_storage_scenarios
|
|
||||||
|
|
||||||
|
|
||||||
@tests_db.run_with('hbase', 'mongodb')
|
|
||||||
class TestDataMigration(test_storage_scenarios.AlarmTestBase):
|
|
||||||
|
|
||||||
def setUp(self):
|
|
||||||
sql_conf = service.prepare_service(argv=[], config_files=[])
|
|
||||||
self.sql_conf = self.useFixture(fixture_config.Config(sql_conf)).conf
|
|
||||||
# using sqlite to represent the type of SQL dbs
|
|
||||||
self.sql_conf.set_override('connection', "sqlite://",
|
|
||||||
group="database", enforce_type=True)
|
|
||||||
self.sql_namager = tests_db.SQLiteManager(self.sql_conf)
|
|
||||||
self.useFixture(self.sql_namager)
|
|
||||||
self.sql_conf.set_override('connection', self.sql_namager.url,
|
|
||||||
group="database", enforce_type=True)
|
|
||||||
self.sql_alarm_conn = storage.get_connection_from_config(self.sql_conf)
|
|
||||||
self.sql_alarm_conn.upgrade()
|
|
||||||
super(TestDataMigration, self).setUp()
|
|
||||||
self.add_some_alarms()
|
|
||||||
self._add_some_alarm_changes()
|
|
||||||
|
|
||||||
def tearDown(self):
|
|
||||||
self.sql_alarm_conn.clear()
|
|
||||||
self.sql_alarm_conn = None
|
|
||||||
super(TestDataMigration, self).tearDown()
|
|
||||||
|
|
||||||
def _add_some_alarm_changes(self):
|
|
||||||
alarms = list(self.alarm_conn.get_alarms())
|
|
||||||
i = 0
|
|
||||||
for alarm in alarms:
|
|
||||||
for change_type in [alarm_models.AlarmChange.CREATION,
|
|
||||||
alarm_models.AlarmChange.RULE_CHANGE,
|
|
||||||
alarm_models.AlarmChange.STATE_TRANSITION,
|
|
||||||
alarm_models.AlarmChange.STATE_TRANSITION,
|
|
||||||
alarm_models.AlarmChange.STATE_TRANSITION]:
|
|
||||||
alarm_change = {
|
|
||||||
"event_id": str(uuid.uuid4()),
|
|
||||||
"alarm_id": alarm.alarm_id,
|
|
||||||
"type": change_type,
|
|
||||||
"detail": "detail %s" % alarm.name,
|
|
||||||
"user_id": alarm.user_id,
|
|
||||||
"project_id": alarm.project_id,
|
|
||||||
"on_behalf_of": alarm.project_id,
|
|
||||||
"timestamp": datetime.datetime(2014, 4, 7, 7, 30 + i)
|
|
||||||
}
|
|
||||||
self.alarm_conn.record_alarm_change(alarm_change=alarm_change)
|
|
||||||
i += 1
|
|
||||||
|
|
||||||
def test_data_migration_without_history_data(self):
|
|
||||||
alarms = list(self.alarm_conn.get_alarms())
|
|
||||||
self.assertEqual(3, len(alarms))
|
|
||||||
alarms_sql = list(self.sql_alarm_conn.get_alarms())
|
|
||||||
self.assertEqual(0, len(alarms_sql))
|
|
||||||
test_args = data_migration.get_parser().parse_args(
|
|
||||||
['--sql-conn', 'sqlite://', '--nosql-conn',
|
|
||||||
self.CONF.database.connection, '--migrate-history', False])
|
|
||||||
with mock.patch('argparse.ArgumentParser.parse_args') as args_parser:
|
|
||||||
# because get_connection_from_config has been mocked in
|
|
||||||
# aodh.tests.functional.db.TestBase#setUp, here re-mocked it that
|
|
||||||
# this test can get nosql and sql storage connections
|
|
||||||
with mock.patch('aodh.storage.get_connection_from_config') as conn:
|
|
||||||
conn.side_effect = [self.alarm_conn, self.sql_alarm_conn]
|
|
||||||
args_parser.return_value = test_args
|
|
||||||
data_migration.main()
|
|
||||||
alarms_sql = list(self.sql_alarm_conn.get_alarms())
|
|
||||||
alarm_changes = list(self.sql_alarm_conn.query_alarm_history())
|
|
||||||
self.assertEqual(0, len(alarm_changes))
|
|
||||||
self.assertEqual(3, len(alarms_sql))
|
|
||||||
self.assertEqual(sorted([a.alarm_id for a in alarms]),
|
|
||||||
sorted([a.alarm_id for a in alarms_sql]))
|
|
||||||
|
|
||||||
def test_data_migration_with_history_data(self):
|
|
||||||
test_args = data_migration.get_parser().parse_args(
|
|
||||||
['--sql-conn', 'sqlite://', '--nosql-conn',
|
|
||||||
self.CONF.database.connection])
|
|
||||||
with mock.patch('argparse.ArgumentParser.parse_args') as args_parser:
|
|
||||||
# because get_connection_from_config has been mocked in
|
|
||||||
# aodh.tests.functional.db.TestBase#setUp, here re-mocked it that
|
|
||||||
# this test can get nosql and sql storage connections
|
|
||||||
with mock.patch('aodh.storage.get_connection_from_config') as conn:
|
|
||||||
conn.side_effect = [self.alarm_conn, self.sql_alarm_conn]
|
|
||||||
args_parser.return_value = test_args
|
|
||||||
data_migration.main()
|
|
||||||
alarms_sql = list(self.sql_alarm_conn.get_alarms())
|
|
||||||
self.assertEqual(3, len(alarms_sql))
|
|
||||||
for alarm in alarms_sql:
|
|
||||||
changes = list(self.sql_alarm_conn.get_alarm_changes(
|
|
||||||
alarm.alarm_id, alarm.project_id))
|
|
||||||
self.assertEqual(5, len(changes))
|
|
@ -1,67 +0,0 @@
|
|||||||
#
|
|
||||||
# Copyright 2012, 2013 Dell Inc.
|
|
||||||
#
|
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
||||||
# not use this file except in compliance with the License. You may obtain
|
|
||||||
# a copy of the License at
|
|
||||||
#
|
|
||||||
# http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
#
|
|
||||||
# Unless required by applicable law or agreed to in writing, software
|
|
||||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
||||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
||||||
# License for the specific language governing permissions and limitations
|
|
||||||
# under the License.
|
|
||||||
"""Tests for aodh/storage/impl_hbase.py
|
|
||||||
|
|
||||||
.. note::
|
|
||||||
In order to run the tests against real HBase server set the environment
|
|
||||||
variable aodh_TEST_HBASE_URL to point to that HBase instance before
|
|
||||||
running the tests. Make sure the Thrift server is running on that server.
|
|
||||||
|
|
||||||
"""
|
|
||||||
import mock
|
|
||||||
|
|
||||||
try:
|
|
||||||
import happybase # noqa
|
|
||||||
except ImportError:
|
|
||||||
import testtools.testcase
|
|
||||||
raise testtools.testcase.TestSkipped("happybase is needed")
|
|
||||||
|
|
||||||
from aodh.storage import impl_hbase
|
|
||||||
from aodh.tests import base as test_base
|
|
||||||
from aodh.tests.functional import db as tests_db
|
|
||||||
|
|
||||||
|
|
||||||
class ConnectionTest(tests_db.TestBase):
|
|
||||||
|
|
||||||
@tests_db.run_with('hbase')
|
|
||||||
def test_hbase_connection(self):
|
|
||||||
|
|
||||||
class TestConn(object):
|
|
||||||
def __init__(self, host, port):
|
|
||||||
self.netloc = '%s:%s' % (host, port)
|
|
||||||
|
|
||||||
def open(self):
|
|
||||||
pass
|
|
||||||
|
|
||||||
def get_connection_pool(conf):
|
|
||||||
return TestConn(conf['host'], conf['port'])
|
|
||||||
|
|
||||||
with mock.patch.object(impl_hbase.Connection, '_get_connection_pool',
|
|
||||||
side_effect=get_connection_pool):
|
|
||||||
conn = impl_hbase.Connection(self.CONF, 'hbase://test_hbase:9090')
|
|
||||||
self.assertIsInstance(conn.conn_pool, TestConn)
|
|
||||||
|
|
||||||
|
|
||||||
class CapabilitiesTest(test_base.BaseTestCase):
|
|
||||||
def test_alarm_capabilities(self):
|
|
||||||
expected_capabilities = {
|
|
||||||
'alarms': {'query': {'simple': True,
|
|
||||||
'complex': False},
|
|
||||||
'history': {'query': {'simple': True,
|
|
||||||
'complex': False}}},
|
|
||||||
}
|
|
||||||
|
|
||||||
actual_capabilities = impl_hbase.Connection.get_capabilities()
|
|
||||||
self.assertEqual(expected_capabilities, actual_capabilities)
|
|
@ -1,101 +0,0 @@
|
|||||||
#
|
|
||||||
# Copyright 2012 New Dream Network, LLC (DreamHost)
|
|
||||||
#
|
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
||||||
# not use this file except in compliance with the License. You may obtain
|
|
||||||
# a copy of the License at
|
|
||||||
#
|
|
||||||
# http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
#
|
|
||||||
# Unless required by applicable law or agreed to in writing, software
|
|
||||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
||||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
||||||
# License for the specific language governing permissions and limitations
|
|
||||||
# under the License.
|
|
||||||
"""Tests for aodh/storage/impl_mongodb.py
|
|
||||||
|
|
||||||
.. note::
|
|
||||||
In order to run the tests against another MongoDB server set the
|
|
||||||
environment variable aodh_TEST_MONGODB_URL to point to a MongoDB
|
|
||||||
server before running the tests.
|
|
||||||
|
|
||||||
"""
|
|
||||||
import unittest
|
|
||||||
|
|
||||||
try:
|
|
||||||
from aodh.storage import impl_mongodb
|
|
||||||
except ImportError:
|
|
||||||
impl_mongodb = None
|
|
||||||
from aodh.tests import base as test_base
|
|
||||||
from aodh.tests.functional import db as tests_db
|
|
||||||
|
|
||||||
|
|
||||||
@unittest.skipUnless(impl_mongodb, "pymongo not available")
|
|
||||||
@tests_db.run_with('mongodb')
|
|
||||||
class MongoDBConnection(tests_db.TestBase):
|
|
||||||
def test_connection_pooling(self):
|
|
||||||
test_conn = impl_mongodb.Connection(self.CONF,
|
|
||||||
self.CONF.database.connection)
|
|
||||||
self.assertEqual(self.alarm_conn.conn, test_conn.conn)
|
|
||||||
|
|
||||||
def test_replica_set(self):
|
|
||||||
url = self.CONF.database.connection + '?replicaSet=foobar'
|
|
||||||
conn = impl_mongodb.Connection(self.CONF, url)
|
|
||||||
self.assertTrue(conn.conn)
|
|
||||||
|
|
||||||
|
|
||||||
@unittest.skipUnless(impl_mongodb, "pymongo not available")
|
|
||||||
@tests_db.run_with('mongodb')
|
|
||||||
class IndexTest(tests_db.TestBase):
|
|
||||||
def _test_ttl_index_absent(self, conn, coll_name, ttl_opt):
|
|
||||||
# create a fake index and check it is deleted
|
|
||||||
coll = getattr(conn.db, coll_name)
|
|
||||||
index_name = '%s_ttl' % coll_name
|
|
||||||
self.CONF.set_override(ttl_opt, -1, group='database',
|
|
||||||
enforce_type=True)
|
|
||||||
conn.upgrade()
|
|
||||||
self.assertNotIn(index_name, coll.index_information())
|
|
||||||
|
|
||||||
self.CONF.set_override(ttl_opt, 456789, group='database',
|
|
||||||
enforce_type=True)
|
|
||||||
conn.upgrade()
|
|
||||||
self.assertEqual(456789,
|
|
||||||
coll.index_information()
|
|
||||||
[index_name]['expireAfterSeconds'])
|
|
||||||
|
|
||||||
def test_alarm_history_ttl_index_absent(self):
|
|
||||||
self._test_ttl_index_absent(self.alarm_conn, 'alarm_history',
|
|
||||||
'alarm_history_time_to_live')
|
|
||||||
|
|
||||||
def _test_ttl_index_present(self, conn, coll_name, ttl_opt):
|
|
||||||
coll = getattr(conn.db, coll_name)
|
|
||||||
self.CONF.set_override(ttl_opt, 456789, group='database',
|
|
||||||
enforce_type=True)
|
|
||||||
conn.upgrade()
|
|
||||||
index_name = '%s_ttl' % coll_name
|
|
||||||
self.assertEqual(456789,
|
|
||||||
coll.index_information()
|
|
||||||
[index_name]['expireAfterSeconds'])
|
|
||||||
|
|
||||||
self.CONF.set_override(ttl_opt, -1, group='database',
|
|
||||||
enforce_type=True)
|
|
||||||
conn.upgrade()
|
|
||||||
self.assertNotIn(index_name, coll.index_information())
|
|
||||||
|
|
||||||
def test_alarm_history_ttl_index_present(self):
|
|
||||||
self._test_ttl_index_present(self.alarm_conn, 'alarm_history',
|
|
||||||
'alarm_history_time_to_live')
|
|
||||||
|
|
||||||
|
|
||||||
class CapabilitiesTest(test_base.BaseTestCase):
|
|
||||||
@unittest.skipUnless(impl_mongodb, "pymongo not available")
|
|
||||||
def test_alarm_capabilities(self):
|
|
||||||
expected_capabilities = {
|
|
||||||
'alarms': {'query': {'simple': True,
|
|
||||||
'complex': True},
|
|
||||||
'history': {'query': {'simple': True,
|
|
||||||
'complex': True}}},
|
|
||||||
}
|
|
||||||
|
|
||||||
actual_capabilities = impl_mongodb.Connection.get_capabilities()
|
|
||||||
self.assertEqual(expected_capabilities, actual_capabilities)
|
|
@ -265,7 +265,6 @@ class AlarmTest(AlarmTestBase):
|
|||||||
self.assertNotEqual(victim.name, s.name)
|
self.assertNotEqual(victim.name, s.name)
|
||||||
|
|
||||||
|
|
||||||
@tests_db.run_with('sqlite', 'mysql', 'pgsql', 'hbase')
|
|
||||||
class AlarmHistoryTest(AlarmTestBase):
|
class AlarmHistoryTest(AlarmTestBase):
|
||||||
|
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
|
@ -1,79 +0,0 @@
|
|||||||
|
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
||||||
# not use this file except in compliance with the License. You may obtain
|
|
||||||
# a copy of the License at
|
|
||||||
#
|
|
||||||
# http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
#
|
|
||||||
# Unless required by applicable law or agreed to in writing, software
|
|
||||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
||||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
||||||
# License for the specific language governing permissions and limitations
|
|
||||||
# under the License.
|
|
||||||
|
|
||||||
import os
|
|
||||||
|
|
||||||
import happybase
|
|
||||||
|
|
||||||
|
|
||||||
class MockHBaseTable(happybase.Table):
|
|
||||||
|
|
||||||
def __init__(self, name, connection, data_prefix):
|
|
||||||
# data_prefix is added to all rows which are written
|
|
||||||
# in this test. It allows to divide data from different tests
|
|
||||||
self.data_prefix = data_prefix
|
|
||||||
# We create happybase Table with prefix from
|
|
||||||
# AODH_TEST_HBASE_TABLE_PREFIX
|
|
||||||
prefix = os.getenv("AODH_TEST_HBASE_TABLE_PREFIX", 'test')
|
|
||||||
super(MockHBaseTable, self).__init__(
|
|
||||||
"%s_%s" % (prefix, name),
|
|
||||||
connection)
|
|
||||||
|
|
||||||
def put(self, row, *args, **kwargs):
|
|
||||||
row = self.data_prefix + row
|
|
||||||
return super(MockHBaseTable, self).put(row, *args,
|
|
||||||
**kwargs)
|
|
||||||
|
|
||||||
def scan(self, row_start=None, row_stop=None, row_prefix=None,
|
|
||||||
columns=None, filter=None, timestamp=None,
|
|
||||||
include_timestamp=False, batch_size=10, scan_batching=None,
|
|
||||||
limit=None, sorted_columns=False):
|
|
||||||
# Add data prefix for row parameters
|
|
||||||
# row_prefix could not be combined with row_start or row_stop
|
|
||||||
if not row_start and not row_stop:
|
|
||||||
row_prefix = self.data_prefix + (row_prefix or "")
|
|
||||||
row_start = None
|
|
||||||
row_stop = None
|
|
||||||
elif row_start and not row_stop:
|
|
||||||
# Adding data_prefix to row_start and row_stop does not work
|
|
||||||
# if it looks like row_start = %data_prefix%foo,
|
|
||||||
# row_stop = %data_prefix, because row_start > row_stop
|
|
||||||
filter = self._update_filter_row(filter)
|
|
||||||
row_start = self.data_prefix + row_start
|
|
||||||
else:
|
|
||||||
row_start = self.data_prefix + (row_start or "")
|
|
||||||
row_stop = self.data_prefix + (row_stop or "")
|
|
||||||
gen = super(MockHBaseTable, self).scan(row_start, row_stop,
|
|
||||||
row_prefix, columns,
|
|
||||||
filter, timestamp,
|
|
||||||
include_timestamp, batch_size,
|
|
||||||
scan_batching, limit,
|
|
||||||
sorted_columns)
|
|
||||||
data_prefix_len = len(self.data_prefix)
|
|
||||||
# Restore original row format
|
|
||||||
for row, data in gen:
|
|
||||||
yield (row[data_prefix_len:], data)
|
|
||||||
|
|
||||||
def row(self, row, *args, **kwargs):
|
|
||||||
row = self.data_prefix + row
|
|
||||||
return super(MockHBaseTable, self).row(row, *args, **kwargs)
|
|
||||||
|
|
||||||
def delete(self, row, *args, **kwargs):
|
|
||||||
row = self.data_prefix + row
|
|
||||||
return super(MockHBaseTable, self).delete(row, *args, **kwargs)
|
|
||||||
|
|
||||||
def _update_filter_row(self, filter):
|
|
||||||
if filter:
|
|
||||||
return "PrefixFilter(%s) AND %s" % (self.data_prefix, filter)
|
|
||||||
else:
|
|
||||||
return "PrefixFilter(%s)" % self.data_prefix
|
|
@ -9,7 +9,7 @@
|
|||||||
# By default all aodh services are started (see
|
# By default all aodh services are started (see
|
||||||
# devstack/settings).
|
# devstack/settings).
|
||||||
#
|
#
|
||||||
# AODH_BACKEND: Database backend (e.g. 'mysql', 'mongodb')
|
# AODH_BACKEND: Database backend (e.g. 'mysql')
|
||||||
# AODH_COORDINATION_URL: URL for group membership service provided by tooz.
|
# AODH_COORDINATION_URL: URL for group membership service provided by tooz.
|
||||||
|
|
||||||
# Support potential entry-points console scripts in VENV or not
|
# Support potential entry-points console scripts in VENV or not
|
||||||
@ -50,28 +50,6 @@ function aodh_service_url {
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
# _install_mongdb - Install mongodb and python lib.
|
|
||||||
function _aodh_install_mongodb {
|
|
||||||
# Server package is the same on all
|
|
||||||
local packages=mongodb-server
|
|
||||||
|
|
||||||
if is_fedora; then
|
|
||||||
# mongodb client
|
|
||||||
packages="${packages} mongodb"
|
|
||||||
fi
|
|
||||||
|
|
||||||
install_package ${packages}
|
|
||||||
|
|
||||||
if is_fedora; then
|
|
||||||
restart_service mongod
|
|
||||||
else
|
|
||||||
restart_service mongodb
|
|
||||||
fi
|
|
||||||
|
|
||||||
# give time for service to restart
|
|
||||||
sleep 5
|
|
||||||
}
|
|
||||||
|
|
||||||
# _install_redis() - Install the redis server and python lib.
|
# _install_redis() - Install the redis server and python lib.
|
||||||
function _aodh_install_redis {
|
function _aodh_install_redis {
|
||||||
if is_ubuntu; then
|
if is_ubuntu; then
|
||||||
@ -102,18 +80,12 @@ function _aodh_config_apache_wsgi {
|
|||||||
fi
|
fi
|
||||||
|
|
||||||
sudo cp $AODH_DIR/devstack/apache-aodh.template $aodh_apache_conf
|
sudo cp $AODH_DIR/devstack/apache-aodh.template $aodh_apache_conf
|
||||||
if [ "$AODH_BACKEND" = 'hbase' ] ; then
|
|
||||||
# Use one process to have single in-memory DB instance for data consistency
|
|
||||||
AODH_API_WORKERS=1
|
|
||||||
else
|
|
||||||
AODH_API_WORKERS=$API_WORKERS
|
|
||||||
fi
|
|
||||||
sudo sed -e "
|
sudo sed -e "
|
||||||
s|%PORT%|$AODH_SERVICE_PORT|g;
|
s|%PORT%|$AODH_SERVICE_PORT|g;
|
||||||
s|%APACHE_NAME%|$APACHE_NAME|g;
|
s|%APACHE_NAME%|$APACHE_NAME|g;
|
||||||
s|%WSGIAPP%|$AODH_WSGI_DIR/app|g;
|
s|%WSGIAPP%|$AODH_WSGI_DIR/app|g;
|
||||||
s|%USER%|$STACK_USER|g;
|
s|%USER%|$STACK_USER|g;
|
||||||
s|%APIWORKERS%|$AODH_API_WORKERS|g;
|
s|%APIWORKERS%|$API_WORKERS|g;
|
||||||
s|%VIRTUALENV%|$venv_path|g
|
s|%VIRTUALENV%|$venv_path|g
|
||||||
" -i $aodh_apache_conf
|
" -i $aodh_apache_conf
|
||||||
}
|
}
|
||||||
@ -127,14 +99,6 @@ function _aodh_prepare_coordination {
|
|||||||
fi
|
fi
|
||||||
}
|
}
|
||||||
|
|
||||||
# Install required services for storage backends
|
|
||||||
function _aodh_prepare_storage_backend {
|
|
||||||
if [ "$AODH_BACKEND" = 'mongodb' ] ; then
|
|
||||||
pip_install_gr pymongo
|
|
||||||
_aodh_install_mongodb
|
|
||||||
fi
|
|
||||||
}
|
|
||||||
|
|
||||||
# Create aodh related accounts in Keystone
|
# Create aodh related accounts in Keystone
|
||||||
function _aodh_create_accounts {
|
function _aodh_create_accounts {
|
||||||
if is_service_enabled aodh-api; then
|
if is_service_enabled aodh-api; then
|
||||||
@ -170,9 +134,6 @@ function _aodh_cleanup_apache_wsgi {
|
|||||||
# cleanup_aodh() - Remove residual data files, anything left over
|
# cleanup_aodh() - Remove residual data files, anything left over
|
||||||
# from previous runs that a clean run would need to clean up
|
# from previous runs that a clean run would need to clean up
|
||||||
function cleanup_aodh {
|
function cleanup_aodh {
|
||||||
if [ "$AODH_BACKEND" = 'mongodb' ] ; then
|
|
||||||
mongo aodh --eval "db.dropDatabase();"
|
|
||||||
fi
|
|
||||||
if [ "$AODH_DEPLOY" == "mod_wsgi" ]; then
|
if [ "$AODH_DEPLOY" == "mod_wsgi" ]; then
|
||||||
_aodh_cleanup_apache_wsgi
|
_aodh_cleanup_apache_wsgi
|
||||||
fi
|
fi
|
||||||
@ -182,11 +143,6 @@ function cleanup_aodh {
|
|||||||
function _aodh_configure_storage_backend {
|
function _aodh_configure_storage_backend {
|
||||||
if [ "$AODH_BACKEND" = 'mysql' ] || [ "$AODH_BACKEND" = 'postgresql' ] ; then
|
if [ "$AODH_BACKEND" = 'mysql' ] || [ "$AODH_BACKEND" = 'postgresql' ] ; then
|
||||||
iniset $AODH_CONF database connection $(database_connection_url aodh)
|
iniset $AODH_CONF database connection $(database_connection_url aodh)
|
||||||
elif [ "$AODH_BACKEND" = 'mongodb' ] ; then
|
|
||||||
iniset $AODH_CONF database connection mongodb://localhost:27017/aodh
|
|
||||||
cleanup_aodh
|
|
||||||
elif [ "$AODH_BACKEND" = 'hbase' ] ; then
|
|
||||||
iniset $AODH_CONF database connection hbase://__test__
|
|
||||||
else
|
else
|
||||||
die $LINENO "Unable to configure unknown AODH_BACKEND $AODH_BACKEND"
|
die $LINENO "Unable to configure unknown AODH_BACKEND $AODH_BACKEND"
|
||||||
fi
|
fi
|
||||||
@ -290,7 +246,6 @@ function init_aodh {
|
|||||||
# otherwise makes sense to do the backend services).
|
# otherwise makes sense to do the backend services).
|
||||||
function install_aodh {
|
function install_aodh {
|
||||||
_aodh_prepare_coordination
|
_aodh_prepare_coordination
|
||||||
_aodh_prepare_storage_backend
|
|
||||||
install_aodhclient
|
install_aodhclient
|
||||||
sudo -H pip install -e "$AODH_DIR"[test,$AODH_BACKEND]
|
sudo -H pip install -e "$AODH_DIR"[test,$AODH_BACKEND]
|
||||||
sudo install -d -o $STACK_USER -m 755 $AODH_CONF_DIR
|
sudo install -d -o $STACK_USER -m 755 $AODH_CONF_DIR
|
||||||
|
@ -18,47 +18,6 @@
|
|||||||
Configuration
|
Configuration
|
||||||
=============
|
=============
|
||||||
|
|
||||||
HBase
|
|
||||||
===================
|
|
||||||
|
|
||||||
This storage implementation uses Thrift HBase interface. The default Thrift
|
|
||||||
connection settings should be changed to support using ConnectionPool in HBase.
|
|
||||||
To ensure proper configuration, please add the following lines to the
|
|
||||||
`hbase-site.xml` configuration file::
|
|
||||||
|
|
||||||
<property>
|
|
||||||
<name>hbase.thrift.minWorkerThreads</name>
|
|
||||||
<value>200</value>
|
|
||||||
</property>
|
|
||||||
|
|
||||||
For pure development purposes, you can use HBase from Apache_ or some other
|
|
||||||
vendor like Cloudera or Hortonworks. To verify your installation, you can use
|
|
||||||
the `list` command in `HBase shell`, to list the tables in your
|
|
||||||
HBase server, as follows::
|
|
||||||
|
|
||||||
$ ${HBASE_HOME}/bin/hbase shell
|
|
||||||
|
|
||||||
hbase> list
|
|
||||||
|
|
||||||
.. note::
|
|
||||||
This driver has been tested against HBase 0.94.2/CDH 4.2.0,
|
|
||||||
HBase 0.94.4/HDP 1.2, HBase 0.94.18/Apache, HBase 0.94.5/Apache,
|
|
||||||
HBase 0.96.2/Apache and HBase 0.98.0/Apache.
|
|
||||||
Versions earlier than 0.92.1 are not supported due to feature incompatibility.
|
|
||||||
|
|
||||||
To find out more about supported storage backends please take a look on the
|
|
||||||
:doc:`install/manual/` guide.
|
|
||||||
|
|
||||||
.. note::
|
|
||||||
|
|
||||||
If you are changing the configuration on the fly to use HBase, as a storage
|
|
||||||
backend, you will need to restart the Aodh services that use the
|
|
||||||
database to allow the changes to take affect, i.e. the collector and API
|
|
||||||
services.
|
|
||||||
|
|
||||||
.. _Apache: https://hbase.apache.org/book/quickstart.html
|
|
||||||
|
|
||||||
|
|
||||||
Aodh Sample Configuration File
|
Aodh Sample Configuration File
|
||||||
==============================
|
==============================
|
||||||
|
|
||||||
|
@ -28,86 +28,15 @@ This step is a prerequisite for the collector, notification agent and API
|
|||||||
services. You may use one of the listed database backends below to store
|
services. You may use one of the listed database backends below to store
|
||||||
Aodh data.
|
Aodh data.
|
||||||
|
|
||||||
.. note::
|
|
||||||
Please notice, MongoDB requires pymongo_ to be installed on the system. The
|
|
||||||
required minimum version of pymongo is 2.4.
|
|
||||||
..
|
|
||||||
|
|
||||||
|
The recommended Aodh storage backend is any SQLAlchemy-supported database
|
||||||
|
(`PostgreSQL` or `MySQL`). You need to create a `aodh` database first and then
|
||||||
|
initialise it by running::
|
||||||
|
|
||||||
SQLalchemy-supported DBs
|
aodh-dbsync
|
||||||
------------------------
|
|
||||||
|
|
||||||
The recommended Aodh storage backend is any SQLAlchemy-supported
|
|
||||||
database (`PostgreSQL` or `MySQL`).
|
|
||||||
|
|
||||||
In case of SQL-based database backends, you need to create a `aodh`
|
|
||||||
database first and then initialise it by running::
|
|
||||||
|
|
||||||
aodh-dbsync
|
|
||||||
|
|
||||||
To use MySQL as the storage backend, change the 'database' section in
|
|
||||||
aodh.conf as follows::
|
|
||||||
|
|
||||||
[database]
|
|
||||||
connection = mysql+pymysql://username:password@host/aodh?charset=utf8
|
|
||||||
|
|
||||||
|
|
||||||
MongoDB
|
|
||||||
-------
|
|
||||||
|
|
||||||
Follow the instructions to install the MongoDB_ package for your operating
|
|
||||||
system, then start the service. The required minimum version of MongoDB is 2.4.
|
|
||||||
|
|
||||||
To use MongoDB as the storage backend, change the 'database' section in
|
|
||||||
aodh.conf as follows::
|
|
||||||
|
|
||||||
[database]
|
|
||||||
connection = mongodb://username:password@host:27017/aodh
|
|
||||||
|
|
||||||
If MongoDB is configured in replica set mode, add `?replicaSet=` in your
|
|
||||||
connection URL::
|
|
||||||
|
|
||||||
[database]
|
|
||||||
connection = mongodb://username:password@host:27017/aodh?replicaSet=foobar
|
|
||||||
|
|
||||||
|
|
||||||
HBase
|
|
||||||
-----
|
|
||||||
|
|
||||||
HBase backend is implemented to use HBase Thrift interface, therefore it is
|
|
||||||
mandatory to have the HBase Thrift server installed and running. To start
|
|
||||||
the Thrift server, please run the following command::
|
|
||||||
|
|
||||||
${HBASE_HOME}/bin/hbase thrift start
|
|
||||||
|
|
||||||
The implementation uses `HappyBase`_, which is a wrapper library used to
|
|
||||||
interact with HBase via Thrift protocol. You can verify the thrift
|
|
||||||
connection by running a quick test from a client::
|
|
||||||
|
|
||||||
import happybase
|
|
||||||
|
|
||||||
conn = happybase.Connection(host=$hbase-thrift-server, port=9090, table_prefix=None)
|
|
||||||
print conn.tables() # this returns a list of HBase tables in your HBase server
|
|
||||||
|
|
||||||
.. note::
|
|
||||||
HappyBase version 0.5 or greater is required. Additionally, version 0.7
|
|
||||||
is not currently supported.
|
|
||||||
..
|
|
||||||
|
|
||||||
In case of HBase, the needed database tables (`project`, `user`, `resource`,
|
|
||||||
`meter`, `alarm`, `alarm_h`) should be created manually with `f` column
|
|
||||||
family for each one.
|
|
||||||
|
|
||||||
To use HBase as the storage backend, change the 'database' section in
|
|
||||||
aodh.conf as follows::
|
|
||||||
|
|
||||||
[database]
|
|
||||||
connection = hbase://hbase-thrift-host:9090
|
|
||||||
|
|
||||||
|
|
||||||
.. _HappyBase: http://happybase.readthedocs.org/en/latest/index.html#
|
|
||||||
.. _MongoDB: http://www.mongodb.org/
|
|
||||||
.. _pymongo: https://pypi.python.org/pypi/pymongo/
|
|
||||||
|
|
||||||
|
|
||||||
|
To use MySQL as the storage backend, change the 'database' section in
|
||||||
|
aodh.conf as follows::
|
||||||
|
|
||||||
|
[database]
|
||||||
|
connection = mysql+pymysql://username:password@host/aodh?charset=utf8
|
||||||
|
@ -24,15 +24,13 @@ run through tox_.
|
|||||||
|
|
||||||
$ sudo pip install tox
|
$ sudo pip install tox
|
||||||
|
|
||||||
2. On Ubuntu install ``mongodb`` and ``libmysqlclient-dev`` packages::
|
2. On Ubuntu install ``libmysqlclient-dev`` packages::
|
||||||
|
|
||||||
$ sudo apt-get install mongodb
|
|
||||||
$ sudo apt-get install libmysqlclient-dev
|
$ sudo apt-get install libmysqlclient-dev
|
||||||
|
|
||||||
For Fedora20 there is no ``libmysqlclient-dev`` package, so you’ll need
|
For Fedora20 there is no ``libmysqlclient-dev`` package, so you’ll need
|
||||||
to install ``mariadb-devel.x86-64`` (or ``mariadb-devel.i386``) instead::
|
to install ``mariadb-devel.x86-64`` (or ``mariadb-devel.i386``) instead::
|
||||||
|
|
||||||
$ sudo yum install mongodb
|
|
||||||
$ sudo yum install mariadb-devel.x86_64
|
$ sudo yum install mariadb-devel.x86_64
|
||||||
|
|
||||||
3. Run the unit and code-style tests::
|
3. Run the unit and code-style tests::
|
||||||
|
@ -0,0 +1,3 @@
|
|||||||
|
---
|
||||||
|
upgrade:
|
||||||
|
- All the deprecated non-SQL drivers have been removed.
|
@ -4,11 +4,4 @@ set -e
|
|||||||
export AODH_TEST_BACKEND=${AODH_TEST_BACKEND:-mysql}
|
export AODH_TEST_BACKEND=${AODH_TEST_BACKEND:-mysql}
|
||||||
export AODH_SERVICE_URL=${AODH_SERVICE_URL:-http://127.0.0.1:8042}
|
export AODH_SERVICE_URL=${AODH_SERVICE_URL:-http://127.0.0.1:8042}
|
||||||
|
|
||||||
case $AODH_TEST_BACKEND in
|
pifpaf -g AODH_TEST_STORAGE_URL run $AODH_TEST_BACKEND -- $*
|
||||||
hbase)
|
|
||||||
AODH_TEST_STORAGE_URL="hbase://__test__" $*
|
|
||||||
;;
|
|
||||||
*)
|
|
||||||
pifpaf -g AODH_TEST_STORAGE_URL run $AODH_TEST_BACKEND -- $*
|
|
||||||
;;
|
|
||||||
esac
|
|
||||||
|
11
setup.cfg
11
setup.cfg
@ -44,14 +44,6 @@ postgresql =
|
|||||||
alembic>=0.7.2
|
alembic>=0.7.2
|
||||||
psycopg2
|
psycopg2
|
||||||
|
|
||||||
mongodb =
|
|
||||||
pymongo>=3.0.2
|
|
||||||
|
|
||||||
hbase =
|
|
||||||
happybase!=0.7,>=0.5,<1.0.0:python_version=='2.7'
|
|
||||||
# Required for bson
|
|
||||||
pymongo>=3.0.2
|
|
||||||
|
|
||||||
zaqar =
|
zaqar =
|
||||||
python-zaqarclient>=1.2.0
|
python-zaqarclient>=1.2.0
|
||||||
|
|
||||||
@ -80,12 +72,10 @@ test =
|
|||||||
[entry_points]
|
[entry_points]
|
||||||
aodh.storage =
|
aodh.storage =
|
||||||
log = aodh.storage.impl_log:Connection
|
log = aodh.storage.impl_log:Connection
|
||||||
mongodb = aodh.storage.impl_mongodb:Connection
|
|
||||||
mysql = aodh.storage.impl_sqlalchemy:Connection
|
mysql = aodh.storage.impl_sqlalchemy:Connection
|
||||||
mysql+pymysql = aodh.storage.impl_sqlalchemy:Connection
|
mysql+pymysql = aodh.storage.impl_sqlalchemy:Connection
|
||||||
postgresql = aodh.storage.impl_sqlalchemy:Connection
|
postgresql = aodh.storage.impl_sqlalchemy:Connection
|
||||||
sqlite = aodh.storage.impl_sqlalchemy:Connection
|
sqlite = aodh.storage.impl_sqlalchemy:Connection
|
||||||
hbase = aodh.storage.impl_hbase:Connection
|
|
||||||
|
|
||||||
aodh.alarm.rule =
|
aodh.alarm.rule =
|
||||||
threshold = aodh.api.controllers.v2.alarm_rules.threshold:AlarmThresholdRule
|
threshold = aodh.api.controllers.v2.alarm_rules.threshold:AlarmThresholdRule
|
||||||
@ -122,7 +112,6 @@ console_scripts =
|
|||||||
aodh-evaluator = aodh.cmd.alarm:evaluator
|
aodh-evaluator = aodh.cmd.alarm:evaluator
|
||||||
aodh-notifier = aodh.cmd.alarm:notifier
|
aodh-notifier = aodh.cmd.alarm:notifier
|
||||||
aodh-listener = aodh.cmd.alarm:listener
|
aodh-listener = aodh.cmd.alarm:listener
|
||||||
aodh-data-migration = aodh.cmd.data_migration:main
|
|
||||||
aodh-combination-alarm-conversion = aodh.cmd.alarm_conversion:conversion
|
aodh-combination-alarm-conversion = aodh.cmd.alarm_conversion:conversion
|
||||||
|
|
||||||
oslo.config.opts =
|
oslo.config.opts =
|
||||||
|
@ -1,38 +0,0 @@
|
|||||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
||||||
# not use this file except in compliance with the License. You may obtain
|
|
||||||
# a copy of the License at
|
|
||||||
#
|
|
||||||
# http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
#
|
|
||||||
# Unless required by applicable law or agreed to in writing, software
|
|
||||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
||||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
||||||
# License for the specific language governing permissions and limitations
|
|
||||||
# under the License.
|
|
||||||
|
|
||||||
import os
|
|
||||||
import sys
|
|
||||||
|
|
||||||
from oslo_config import cfg
|
|
||||||
|
|
||||||
from aodh import storage
|
|
||||||
|
|
||||||
|
|
||||||
def main(argv):
|
|
||||||
cfg.CONF([], project='aodh')
|
|
||||||
if os.getenv("AODH_TEST_STORAGE_URL"):
|
|
||||||
url = ("%s?table_prefix=%s" %
|
|
||||||
(os.getenv("AODH_TEST_STORAGE_URL"),
|
|
||||||
os.getenv("AODH_TEST_HBASE_TABLE_PREFIX", "test")))
|
|
||||||
cfg.CONF.set_override("connection", url, group="database",
|
|
||||||
enforce_type=True)
|
|
||||||
alarm_conn = storage.get_connection_from_config(cfg.CONF)
|
|
||||||
for arg in argv:
|
|
||||||
if arg == "--upgrade":
|
|
||||||
alarm_conn.upgrade()
|
|
||||||
if arg == "--clear":
|
|
||||||
alarm_conn.clear()
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
|
||||||
main(sys.argv[1:])
|
|
18
tox.ini
18
tox.ini
@ -15,17 +15,6 @@ commands =
|
|||||||
oslo-config-generator --config-file=etc/aodh/aodh-config-generator.conf
|
oslo-config-generator --config-file=etc/aodh/aodh-config-generator.conf
|
||||||
whitelist_externals = bash
|
whitelist_externals = bash
|
||||||
|
|
||||||
[testenv:py27-hbase]
|
|
||||||
deps = .[hbase,test]
|
|
||||||
setenv = OS_TEST_PATH=aodh/tests/functional/
|
|
||||||
AODH_TEST_STORAGE_URL=hbase://__test__
|
|
||||||
|
|
||||||
[testenv:py27-mongodb]
|
|
||||||
deps = .[mongodb,test]
|
|
||||||
setenv = OS_TEST_PATH=aodh/tests/functional/
|
|
||||||
commands =
|
|
||||||
pifpaf -g AODH_TEST_STORAGE_URL run mongodb -- python setup.py testr --slowest --testr-args="{posargs}"
|
|
||||||
|
|
||||||
[testenv:py27-mysql]
|
[testenv:py27-mysql]
|
||||||
deps = .[mysql,test]
|
deps = .[mysql,test]
|
||||||
setenv = OS_TEST_PATH=aodh/tests/functional/
|
setenv = OS_TEST_PATH=aodh/tests/functional/
|
||||||
@ -39,7 +28,7 @@ commands =
|
|||||||
pifpaf -g AODH_TEST_STORAGE_URL run postgresql -- python setup.py testr --slowest --testr-args="{posargs}"
|
pifpaf -g AODH_TEST_STORAGE_URL run postgresql -- python setup.py testr --slowest --testr-args="{posargs}"
|
||||||
|
|
||||||
[testenv:functional]
|
[testenv:functional]
|
||||||
deps = .[mysql,postgresql,mongodb,hbase,test]
|
deps = .[mysql,postgresql,test]
|
||||||
setenv = VIRTUAL_ENV={envdir}
|
setenv = VIRTUAL_ENV={envdir}
|
||||||
OS_TEST_PATH=aodh/tests/functional/
|
OS_TEST_PATH=aodh/tests/functional/
|
||||||
GABBI_LIVE_FAIL_IF_NO_TEST=1
|
GABBI_LIVE_FAIL_IF_NO_TEST=1
|
||||||
@ -91,11 +80,6 @@ setenv = PYTHONHASHSEED=0
|
|||||||
[testenv:debug]
|
[testenv:debug]
|
||||||
commands = bash -x oslo_debug_helper {posargs}
|
commands = bash -x oslo_debug_helper {posargs}
|
||||||
|
|
||||||
[testenv:debug-mongodb]
|
|
||||||
deps = .[mongodb,test]
|
|
||||||
setenv = OS_TEST_PATH=aodh/tests/functional/
|
|
||||||
commands = pifpaf -g AODH_TEST_STORAGE_URL run mongodb -- oslo_debug_helper {posargs}
|
|
||||||
|
|
||||||
[testenv:debug-mysql]
|
[testenv:debug-mysql]
|
||||||
deps = .[mysql,test]
|
deps = .[mysql,test]
|
||||||
setenv = OS_TEST_PATH=aodh/tests/functional/
|
setenv = OS_TEST_PATH=aodh/tests/functional/
|
||||||
|
Loading…
Reference in New Issue
Block a user