Remove deprecated non-SQL drivers
Change-Id: I1eb890fe422718e986bb92139a023868bace7d96
This commit is contained in:
parent
e8eafdbba7
commit
4570e42803
@ -1,162 +0,0 @@
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
"""
|
||||
A tool for migrating alarms and alarms history data from NoSQL to SQL.
|
||||
|
||||
NOTES:
|
||||
|
||||
- Users need to specify the source NoSQL connection url and the destination SQL
|
||||
connection URL with this tool, an usage example:
|
||||
aodh-data-migration --nosql-conn \
|
||||
mongodb://aodh:password@127.0.0.1:27017/aodh --sql-conn \
|
||||
mysql+pymysql://root:password@127.0.0.1/aodh?charset=utf8
|
||||
|
||||
- Both the alarm data and alarm history data will be migrated when running this
|
||||
tool, but the history data migration can be avoid by specifying False of
|
||||
--migrate-history option.
|
||||
|
||||
- It is better to ensure the db connection is OK when running this tool, and
|
||||
this tool can be run repeatedly, the duplicated data will be skipped.
|
||||
|
||||
- This tool depends on the NoSQL and SQL drivers of Aodh, so it is should be
|
||||
used only before the removal of NoSQL drivers.
|
||||
|
||||
- This tool has been tested OK in devstack environment, but users need to be
|
||||
cautious with this, because the data migration between storage backends is
|
||||
a bit dangerous.
|
||||
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import logging
|
||||
import sys
|
||||
|
||||
from oslo_config import cfg
|
||||
from oslo_db import exception
|
||||
from oslo_db import options as db_options
|
||||
from aodh.i18n import _LE, _LI, _LW
|
||||
import six.moves.urllib.parse as urlparse
|
||||
|
||||
from aodh import storage
|
||||
|
||||
root_logger = logging.getLogger('')
|
||||
|
||||
|
||||
def get_parser():
|
||||
parser = argparse.ArgumentParser(
|
||||
description='A tool for Migrating alarms and alarms history from'
|
||||
' NoSQL to SQL',
|
||||
)
|
||||
parser.add_argument(
|
||||
'--nosql-conn',
|
||||
required=True,
|
||||
type=str,
|
||||
help='The source NoSQL database connection.',
|
||||
)
|
||||
parser.add_argument(
|
||||
'--sql-conn',
|
||||
required=True,
|
||||
type=str,
|
||||
help='The destination SQL database connection.',
|
||||
)
|
||||
parser.add_argument(
|
||||
'--migrate-history',
|
||||
default=True,
|
||||
type=bool,
|
||||
help='Migrate history data when migrate alarms or not,'
|
||||
' True as Default.',
|
||||
)
|
||||
parser.add_argument(
|
||||
'--debug',
|
||||
default=False,
|
||||
action='store_true',
|
||||
help='Show the debug level log messages.',
|
||||
)
|
||||
return parser
|
||||
|
||||
|
||||
def _validate_conn_options(args):
|
||||
nosql_scheme = urlparse.urlparse(args.nosql_conn).scheme
|
||||
sql_scheme = urlparse.urlparse(args.sql_conn).scheme
|
||||
if nosql_scheme not in ('mongodb', 'hbase'):
|
||||
root_logger.error(_LE('Invalid source DB type %s, the source database '
|
||||
'connection should be one of: [mongodb, hbase]'
|
||||
), nosql_scheme)
|
||||
sys.exit(1)
|
||||
if sql_scheme not in ('mysql', 'mysql+pymysql', 'postgresql',
|
||||
'sqlite'):
|
||||
root_logger.error(_LE('Invalid destination DB type %s, the destination'
|
||||
' database connection should be one of: '
|
||||
'[mysql, postgresql, sqlite]'), sql_scheme)
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
def main():
|
||||
args = get_parser().parse_args()
|
||||
|
||||
# Set up logging to use the console
|
||||
console = logging.StreamHandler(sys.stderr)
|
||||
formatter = logging.Formatter(
|
||||
'[%(asctime)s] %(levelname)-8s %(message)s')
|
||||
console.setFormatter(formatter)
|
||||
root_logger.addHandler(console)
|
||||
if args.debug:
|
||||
root_logger.setLevel(logging.DEBUG)
|
||||
else:
|
||||
root_logger.setLevel(logging.INFO)
|
||||
|
||||
_validate_conn_options(args)
|
||||
|
||||
nosql_conf = cfg.ConfigOpts()
|
||||
db_options.set_defaults(nosql_conf, args.nosql_conn)
|
||||
nosql_conf.register_opts(storage.OPTS, 'database')
|
||||
nosql_conn = storage.get_connection_from_config(nosql_conf)
|
||||
|
||||
sql_conf = cfg.ConfigOpts()
|
||||
db_options.set_defaults(sql_conf, args.sql_conn)
|
||||
sql_conf.register_opts(storage.OPTS, 'database')
|
||||
sql_conn = storage.get_connection_from_config(sql_conf)
|
||||
|
||||
root_logger.info(
|
||||
_LI("Starting to migrate alarms data from NoSQL to SQL..."))
|
||||
|
||||
count = 0
|
||||
for alarm in nosql_conn.get_alarms():
|
||||
root_logger.debug("Migrating alarm %s..." % alarm.alarm_id)
|
||||
try:
|
||||
sql_conn.create_alarm(alarm)
|
||||
count += 1
|
||||
except exception.DBDuplicateEntry:
|
||||
root_logger.warning(_LW("Duplicated alarm %s found, skipped."),
|
||||
alarm.alarm_id)
|
||||
if not args.migrate_history:
|
||||
continue
|
||||
|
||||
history_count = 0
|
||||
for history in nosql_conn.get_alarm_changes(alarm.alarm_id, None):
|
||||
history_data = history.as_dict()
|
||||
root_logger.debug(" Migrating alarm history data with"
|
||||
" event_id %s..." % history_data['event_id'])
|
||||
try:
|
||||
sql_conn.record_alarm_change(history_data)
|
||||
history_count += 1
|
||||
except exception.DBDuplicateEntry:
|
||||
root_logger.warning(
|
||||
_LW(" Duplicated alarm history %s found, skipped."),
|
||||
history_data['event_id'])
|
||||
root_logger.info(_LI(" Migrated %(count)s history data of alarm "
|
||||
"%(alarm_id)s"),
|
||||
{'count': history_count, 'alarm_id': alarm.alarm_id})
|
||||
|
||||
root_logger.info(_LI("End alarms data migration from NoSQL to SQL, %s"
|
||||
" alarms have been migrated."), count)
|
@ -22,7 +22,6 @@ from oslo_utils import timeutils
|
||||
import retrying
|
||||
import six.moves.urllib.parse as urlparse
|
||||
from stevedore import driver
|
||||
import warnings
|
||||
|
||||
_NAMESPACE = 'aodh.storage'
|
||||
|
||||
@ -58,12 +57,6 @@ def get_connection_from_config(conf):
|
||||
retries = conf.database.max_retries
|
||||
url = conf.database.connection
|
||||
connection_scheme = urlparse.urlparse(url).scheme
|
||||
if connection_scheme not in ('mysql', 'mysql+pymysql', 'postgresql',
|
||||
'sqlite'):
|
||||
msg = ('Storage backend %s is deprecated, and all the NoSQL backends '
|
||||
'will be removed in Aodh 4.0, please use SQL backend.' %
|
||||
connection_scheme)
|
||||
warnings.warn(msg)
|
||||
LOG.debug('looking for %(name)r driver in %(namespace)r',
|
||||
{'name': connection_scheme, 'namespace': _NAMESPACE})
|
||||
mgr = driver.DriverManager(_NAMESPACE, connection_scheme)
|
||||
|
@ -1,78 +0,0 @@
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import happybase
|
||||
from oslo_log import log
|
||||
from oslo_utils import netutils
|
||||
from six.moves.urllib import parse as urlparse
|
||||
|
||||
from aodh.storage.hbase import inmemory as hbase_inmemory
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
class Connection(object):
|
||||
"""Base connection class for HBase."""
|
||||
|
||||
_memory_instance = None
|
||||
|
||||
def __init__(self, conf, url):
|
||||
"""Hbase Connection Initialization."""
|
||||
opts = self._parse_connection_url(url)
|
||||
|
||||
if opts['host'] == '__test__':
|
||||
# This is a in-memory usage for unit tests
|
||||
if Connection._memory_instance is None:
|
||||
LOG.debug('Creating a new in-memory HBase Connection object')
|
||||
Connection._memory_instance = (hbase_inmemory.
|
||||
MConnectionPool())
|
||||
self.conn_pool = Connection._memory_instance
|
||||
else:
|
||||
self.conn_pool = self._get_connection_pool(opts)
|
||||
|
||||
@staticmethod
|
||||
def _get_connection_pool(conf):
|
||||
"""Return a connection pool to the database.
|
||||
|
||||
.. note::
|
||||
|
||||
The tests use a subclass to override this and return an
|
||||
in-memory connection pool.
|
||||
"""
|
||||
LOG.debug('connecting to HBase on %(host)s:%(port)s',
|
||||
{'host': conf['host'], 'port': conf['port']})
|
||||
return happybase.ConnectionPool(size=100, host=conf['host'],
|
||||
port=conf['port'],
|
||||
table_prefix=conf['table_prefix'])
|
||||
|
||||
@staticmethod
|
||||
def _parse_connection_url(url):
|
||||
"""Parse connection parameters from a database url.
|
||||
|
||||
.. note::
|
||||
|
||||
HBase Thrift does not support authentication and there is no
|
||||
database name, so we are not looking for these in the url.
|
||||
"""
|
||||
opts = {}
|
||||
result = netutils.urlsplit(url)
|
||||
opts['table_prefix'] = urlparse.parse_qs(
|
||||
result.query).get('table_prefix', [None])[0]
|
||||
opts['dbtype'] = result.scheme
|
||||
if ':' in result.netloc:
|
||||
opts['host'], port = result.netloc.split(':')
|
||||
else:
|
||||
opts['host'] = result.netloc
|
||||
port = 9090
|
||||
opts['port'] = port and int(port) or 9090
|
||||
return opts
|
@ -1,283 +0,0 @@
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
""" This is a very crude version of "in-memory HBase", which implements just
|
||||
enough functionality of HappyBase API to support testing of our driver.
|
||||
"""
|
||||
|
||||
import copy
|
||||
import re
|
||||
|
||||
from oslo_log import log
|
||||
import six
|
||||
|
||||
import aodh
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
class MTable(object):
|
||||
"""HappyBase.Table mock."""
|
||||
def __init__(self, name, families):
|
||||
self.name = name
|
||||
self.families = families
|
||||
self._rows_with_ts = {}
|
||||
|
||||
def row(self, key, columns=None):
|
||||
if key not in self._rows_with_ts:
|
||||
return {}
|
||||
res = copy.copy(sorted(six.iteritems(
|
||||
self._rows_with_ts.get(key)))[-1][1])
|
||||
if columns:
|
||||
keys = res.keys()
|
||||
for key in keys:
|
||||
if key not in columns:
|
||||
res.pop(key)
|
||||
return res
|
||||
|
||||
def rows(self, keys):
|
||||
return ((k, self.row(k)) for k in keys)
|
||||
|
||||
def put(self, key, data, ts=None):
|
||||
# Note: Now we use 'timestamped' but only for one Resource table.
|
||||
# That's why we may put ts='0' in case when ts is None. If it is
|
||||
# needed to use 2 types of put in one table ts=0 cannot be used.
|
||||
if ts is None:
|
||||
ts = "0"
|
||||
if key not in self._rows_with_ts:
|
||||
self._rows_with_ts[key] = {ts: data}
|
||||
else:
|
||||
if ts in self._rows_with_ts[key]:
|
||||
self._rows_with_ts[key][ts].update(data)
|
||||
else:
|
||||
self._rows_with_ts[key].update({ts: data})
|
||||
|
||||
def delete(self, key):
|
||||
del self._rows_with_ts[key]
|
||||
|
||||
def _get_latest_dict(self, row):
|
||||
# The idea here is to return latest versions of columns.
|
||||
# In _rows_with_ts we store {row: {ts_1: {data}, ts_2: {data}}}.
|
||||
# res will contain a list of tuples [(ts_1, {data}), (ts_2, {data})]
|
||||
# sorted by ts, i.e. in this list ts_2 is the most latest.
|
||||
# To get result as HBase provides we should iterate in reverse order
|
||||
# and get from "latest" data only key-values that are not in newer data
|
||||
data = {}
|
||||
for i in sorted(six.iteritems(self._rows_with_ts[row])):
|
||||
data.update(i[1])
|
||||
return data
|
||||
|
||||
def scan(self, filter=None, columns=None, row_start=None, row_stop=None,
|
||||
limit=None):
|
||||
columns = columns or []
|
||||
sorted_keys = sorted(self._rows_with_ts)
|
||||
# copy data between row_start and row_stop into a dict
|
||||
rows = {}
|
||||
for row in sorted_keys:
|
||||
if row_start and row < row_start:
|
||||
continue
|
||||
if row_stop and row > row_stop:
|
||||
break
|
||||
rows[row] = self._get_latest_dict(row)
|
||||
|
||||
if columns:
|
||||
ret = {}
|
||||
for row, data in six.iteritems(rows):
|
||||
for key in data:
|
||||
if key in columns:
|
||||
ret[row] = data
|
||||
rows = ret
|
||||
if filter:
|
||||
# TODO(jdanjou): we should really parse this properly,
|
||||
# but at the moment we are only going to support AND here
|
||||
filters = filter.split('AND')
|
||||
for f in filters:
|
||||
# Extract filter name and its arguments
|
||||
g = re.search("(.*)\((.*),?\)", f)
|
||||
fname = g.group(1).strip()
|
||||
fargs = [s.strip().replace('\'', '')
|
||||
for s in g.group(2).split(',')]
|
||||
m = getattr(self, fname)
|
||||
if callable(m):
|
||||
# overwrite rows for filtering to take effect
|
||||
# in case of multiple filters
|
||||
rows = m(fargs, rows)
|
||||
else:
|
||||
raise aodh.NotImplementedError(
|
||||
"%s filter is not implemented, "
|
||||
"you may want to add it!" % filter)
|
||||
for k in sorted(rows)[:limit]:
|
||||
yield k, rows[k]
|
||||
|
||||
@staticmethod
|
||||
def SingleColumnValueFilter(args, rows):
|
||||
"""This is filter for testing "in-memory HBase".
|
||||
|
||||
This method is called from scan() when 'SingleColumnValueFilter'
|
||||
is found in the 'filter' argument.
|
||||
"""
|
||||
op = args[2]
|
||||
column = "%s:%s" % (args[0], args[1])
|
||||
value = args[3]
|
||||
if value.startswith('binary:'):
|
||||
value = value[7:]
|
||||
r = {}
|
||||
for row in rows:
|
||||
data = rows[row]
|
||||
if op == '=':
|
||||
if column in data and data[column] == value:
|
||||
r[row] = data
|
||||
elif op == '<':
|
||||
if column in data and data[column] < value:
|
||||
r[row] = data
|
||||
elif op == '<=':
|
||||
if column in data and data[column] <= value:
|
||||
r[row] = data
|
||||
elif op == '>':
|
||||
if column in data and data[column] > value:
|
||||
r[row] = data
|
||||
elif op == '>=':
|
||||
if column in data and data[column] >= value:
|
||||
r[row] = data
|
||||
elif op == '!=':
|
||||
if column in data and data[column] != value:
|
||||
r[row] = data
|
||||
return r
|
||||
|
||||
@staticmethod
|
||||
def ColumnPrefixFilter(args, rows):
|
||||
"""This is filter for testing "in-memory HBase".
|
||||
|
||||
This method is called from scan() when 'ColumnPrefixFilter' is found
|
||||
in the 'filter' argument.
|
||||
|
||||
:param args: a list of filter arguments, contain prefix of column
|
||||
:param rows: a dict of row prefixes for filtering
|
||||
"""
|
||||
value = args[0]
|
||||
column = 'f:' + value
|
||||
r = {}
|
||||
for row, data in rows.items():
|
||||
column_dict = {}
|
||||
for key in data:
|
||||
if key.startswith(column):
|
||||
column_dict[key] = data[key]
|
||||
r[row] = column_dict
|
||||
return r
|
||||
|
||||
@staticmethod
|
||||
def RowFilter(args, rows):
|
||||
"""This is filter for testing "in-memory HBase".
|
||||
|
||||
This method is called from scan() when 'RowFilter' is found in the
|
||||
'filter' argument.
|
||||
|
||||
:param args: a list of filter arguments, it contains operator and
|
||||
sought string
|
||||
:param rows: a dict of rows which are filtered
|
||||
"""
|
||||
op = args[0]
|
||||
value = args[1]
|
||||
if value.startswith('binary:'):
|
||||
value = value[len('binary:'):]
|
||||
if value.startswith('regexstring:'):
|
||||
value = value[len('regexstring:'):]
|
||||
r = {}
|
||||
for row, data in rows.items():
|
||||
try:
|
||||
g = re.search(value, row).group()
|
||||
if op == '=':
|
||||
if g == row:
|
||||
r[row] = data
|
||||
else:
|
||||
raise aodh.NotImplementedError(
|
||||
"In-memory "
|
||||
"RowFilter doesn't support "
|
||||
"the %s operation yet" % op)
|
||||
except AttributeError:
|
||||
pass
|
||||
return r
|
||||
|
||||
@staticmethod
|
||||
def QualifierFilter(args, rows):
|
||||
"""This is filter for testing "in-memory HBase".
|
||||
|
||||
This method is called from scan() when 'QualifierFilter' is found in
|
||||
the 'filter' argument
|
||||
"""
|
||||
op = args[0]
|
||||
value = args[1]
|
||||
is_regex = False
|
||||
if value.startswith('binaryprefix:'):
|
||||
value = value[len('binaryprefix:'):]
|
||||
if value.startswith('regexstring:'):
|
||||
value = value[len('regexstring:'):]
|
||||
is_regex = True
|
||||
column = 'f:' + value
|
||||
r = {}
|
||||
for row in rows:
|
||||
data = rows[row]
|
||||
r_data = {}
|
||||
for key in data:
|
||||
if ((op == '=' and key.startswith(column)) or
|
||||
(op == '>=' and key >= column) or
|
||||
(op == '<=' and key <= column) or
|
||||
(op == '>' and key > column) or
|
||||
(op == '<' and key < column) or
|
||||
(is_regex and re.search(value, key))):
|
||||
r_data[key] = data[key]
|
||||
else:
|
||||
raise aodh.NotImplementedError(
|
||||
"In-memory QualifierFilter "
|
||||
"doesn't support the %s "
|
||||
"operation yet" % op)
|
||||
if r_data:
|
||||
r[row] = r_data
|
||||
return r
|
||||
|
||||
|
||||
class MConnectionPool(object):
|
||||
def __init__(self):
|
||||
self.conn = MConnection()
|
||||
|
||||
def connection(self):
|
||||
return self.conn
|
||||
|
||||
|
||||
class MConnection(object):
|
||||
"""HappyBase.Connection mock."""
|
||||
def __init__(self):
|
||||
self.tables = {}
|
||||
|
||||
def __enter__(self, *args, **kwargs):
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||
pass
|
||||
|
||||
@staticmethod
|
||||
def open():
|
||||
LOG.debug("Opening in-memory HBase connection")
|
||||
|
||||
def create_table(self, n, families=None):
|
||||
families = families or {}
|
||||
if n in self.tables:
|
||||
return self.tables[n]
|
||||
t = MTable(n, families)
|
||||
self.tables[n] = t
|
||||
return t
|
||||
|
||||
def delete_table(self, name, use_prefix=True):
|
||||
del self.tables[name]
|
||||
|
||||
def table(self, name):
|
||||
return self.create_table(name)
|
@ -1,43 +0,0 @@
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
"""HBase storage backend migrations
|
||||
"""
|
||||
|
||||
|
||||
from aodh.storage.hbase import utils as hbase_utils
|
||||
|
||||
|
||||
def migrate_alarm_history_table(conn, table):
|
||||
"""Migrate table 'alarm_h' in HBase.
|
||||
|
||||
Change row format from ""%s_%s" % alarm_id, rts,
|
||||
to new separator format "%s:%s" % alarm_id, rts
|
||||
"""
|
||||
alarm_h_table = conn.table(table)
|
||||
alarm_h_filter = "RowFilter(=, 'regexstring:\\w*_\\d{19}')"
|
||||
gen = alarm_h_table.scan(filter=alarm_h_filter)
|
||||
for row, data in gen:
|
||||
row_parts = row.rsplit('_', 1)
|
||||
alarm_h_table.put(hbase_utils.prepare_key(*row_parts), data)
|
||||
alarm_h_table.delete(row)
|
||||
|
||||
|
||||
TABLE_MIGRATION_FUNCS = {'alarm_h': migrate_alarm_history_table}
|
||||
|
||||
|
||||
def migrate_tables(conn, tables):
|
||||
if type(tables) is not list:
|
||||
tables = [tables]
|
||||
for table in tables:
|
||||
if table in TABLE_MIGRATION_FUNCS:
|
||||
TABLE_MIGRATION_FUNCS.get(table)(conn, table)
|
@ -1,250 +0,0 @@
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
""" Various HBase helpers
|
||||
"""
|
||||
import copy
|
||||
import datetime
|
||||
import json
|
||||
|
||||
import bson.json_util
|
||||
from happybase.hbase import ttypes
|
||||
from oslo_log import log
|
||||
import six
|
||||
|
||||
from aodh.i18n import _LW
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
OP_SIGN = {'eq': '=', 'lt': '<', 'le': '<=', 'ne': '!=', 'gt': '>', 'ge': '>='}
|
||||
# We need this additional dictionary because we have reverted timestamp in
|
||||
# row-keys for stored metrics
|
||||
OP_SIGN_REV = {'eq': '=', 'lt': '>', 'le': '>=', 'ne': '!=', 'gt': '<',
|
||||
'ge': '<='}
|
||||
|
||||
|
||||
def timestamp(dt, reverse=True):
|
||||
"""Timestamp is count of milliseconds since start of epoch.
|
||||
|
||||
If reverse=True then timestamp will be reversed. Such a technique is used
|
||||
in HBase rowkey design when period queries are required. Because of the
|
||||
fact that rows are sorted lexicographically it's possible to vary whether
|
||||
the 'oldest' entries will be on top of the table or it should be the newest
|
||||
ones (reversed timestamp case).
|
||||
|
||||
:param dt: datetime which is translated to timestamp
|
||||
:param reverse: a boolean parameter for reverse or straight count of
|
||||
timestamp in milliseconds
|
||||
:return: count or reversed count of milliseconds since start of epoch
|
||||
"""
|
||||
epoch = datetime.datetime(1970, 1, 1)
|
||||
td = dt - epoch
|
||||
ts = td.microseconds + td.seconds * 1000000 + td.days * 86400000000
|
||||
return 0x7fffffffffffffff - ts if reverse else ts
|
||||
|
||||
|
||||
def make_timestamp_query(func, start=None, start_op=None, end=None,
|
||||
end_op=None, bounds_only=False, **kwargs):
|
||||
"""Return a filter start and stop row for filtering and a query.
|
||||
|
||||
Query is based on the fact that CF-name is 'rts'.
|
||||
:param start: Optional start timestamp
|
||||
:param start_op: Optional start timestamp operator, like gt, ge
|
||||
:param end: Optional end timestamp
|
||||
:param end_op: Optional end timestamp operator, like lt, le
|
||||
:param bounds_only: if True than query will not be returned
|
||||
:param func: a function that provide a format of row
|
||||
:param kwargs: kwargs for :param func
|
||||
"""
|
||||
# We don't need to dump here because get_start_end_rts returns strings
|
||||
rts_start, rts_end = get_start_end_rts(start, end)
|
||||
start_row, end_row = func(rts_start, rts_end, **kwargs)
|
||||
|
||||
if bounds_only:
|
||||
return start_row, end_row
|
||||
|
||||
q = []
|
||||
start_op = start_op or 'ge'
|
||||
end_op = end_op or 'lt'
|
||||
if rts_start:
|
||||
q.append("SingleColumnValueFilter ('f', 'rts', %s, 'binary:%s')" %
|
||||
(OP_SIGN_REV[start_op], rts_start))
|
||||
if rts_end:
|
||||
q.append("SingleColumnValueFilter ('f', 'rts', %s, 'binary:%s')" %
|
||||
(OP_SIGN_REV[end_op], rts_end))
|
||||
|
||||
res_q = None
|
||||
if len(q):
|
||||
res_q = " AND ".join(q)
|
||||
|
||||
return start_row, end_row, res_q
|
||||
|
||||
|
||||
def get_start_end_rts(start, end):
|
||||
|
||||
rts_start = str(timestamp(start)) if start else ""
|
||||
rts_end = str(timestamp(end)) if end else ""
|
||||
return rts_start, rts_end
|
||||
|
||||
|
||||
def make_query(**kwargs):
|
||||
"""Return a filter query string based on the selected parameters.
|
||||
|
||||
:param kwargs: key-value pairs to filter on. Key should be a real
|
||||
column name in db
|
||||
"""
|
||||
q = []
|
||||
|
||||
# Note: we use extended constructor for SingleColumnValueFilter here.
|
||||
# It is explicitly specified that entry should not be returned if CF is not
|
||||
# found in table.
|
||||
for key, value in sorted(kwargs.items()):
|
||||
if value is not None:
|
||||
if key == 'source':
|
||||
q.append("SingleColumnValueFilter "
|
||||
"('f', 's_%s', =, 'binary:%s', true, true)" %
|
||||
(value, dump('1')))
|
||||
elif key == 'trait_type':
|
||||
q.append("ColumnPrefixFilter('%s')" % value)
|
||||
elif key == 'event_id':
|
||||
q.append("RowFilter ( = , 'regexstring:\d*:%s')" % value)
|
||||
elif key == 'exclude':
|
||||
for k, v in six.iteritems(value):
|
||||
q.append("SingleColumnValueFilter "
|
||||
"('f', '%(k)s', !=, 'binary:%(v)s', true, true)" %
|
||||
{'k': quote(k), 'v': dump(v)})
|
||||
else:
|
||||
q.append("SingleColumnValueFilter "
|
||||
"('f', '%s', =, 'binary:%s', true, true)" %
|
||||
(quote(key), dump(value)))
|
||||
if len(q):
|
||||
return " AND ".join(q)
|
||||
|
||||
|
||||
def make_general_rowkey_scan(rts_start=None, rts_end=None, some_id=None):
|
||||
"""If it's filter on some_id without start and end.
|
||||
|
||||
start_row = some_id while end_row = some_id + MAX_BYTE.
|
||||
"""
|
||||
if some_id is None:
|
||||
return None, None
|
||||
if not rts_start:
|
||||
# NOTE(idegtiarov): Here we could not use chr > 122 because chr >= 123
|
||||
# will be quoted and character will be turn in a composition that is
|
||||
# started with '%' (chr(37)) that lexicographically is less than chr
|
||||
# of number
|
||||
rts_start = chr(122)
|
||||
end_row = prepare_key(some_id, rts_start)
|
||||
start_row = prepare_key(some_id, rts_end)
|
||||
|
||||
return start_row, end_row
|
||||
|
||||
|
||||
def prepare_key(*args):
|
||||
"""Prepares names for rows and columns with correct separator.
|
||||
|
||||
:param args: strings or numbers that we want our key construct of
|
||||
:return: key with quoted args that are separated with character ":"
|
||||
"""
|
||||
key_quote = []
|
||||
for key in args:
|
||||
if isinstance(key, six.integer_types):
|
||||
key = str(key)
|
||||
key_quote.append(quote(key))
|
||||
return ":".join(key_quote)
|
||||
|
||||
|
||||
def deserialize_entry(entry):
|
||||
"""Return a list of flatten results.
|
||||
|
||||
Result contains a dict of simple structures such as 'resource_id':1
|
||||
|
||||
:param entry: entry from HBase, without row name and timestamp
|
||||
"""
|
||||
flatten_result = {}
|
||||
for k, v in entry.items():
|
||||
if ':' in k[2:]:
|
||||
key = tuple([unquote(i) for i in k[2:].split(':')])
|
||||
else:
|
||||
key = unquote(k[2:])
|
||||
flatten_result[key] = load(v)
|
||||
|
||||
return flatten_result
|
||||
|
||||
|
||||
def serialize_entry(data=None, **kwargs):
|
||||
"""Return a dict that is ready to be stored to HBase
|
||||
|
||||
:param data: dict to be serialized
|
||||
:param kwargs: additional args
|
||||
"""
|
||||
data = data or {}
|
||||
entry_dict = copy.copy(data)
|
||||
entry_dict.update(**kwargs)
|
||||
|
||||
result = {}
|
||||
for k, v in entry_dict.items():
|
||||
result['f:' + quote(k, ':')] = dump(v)
|
||||
return result
|
||||
|
||||
|
||||
def dump(data):
|
||||
return json.dumps(data, default=bson.json_util.default)
|
||||
|
||||
|
||||
def load(data):
|
||||
return json.loads(data, object_hook=object_hook)
|
||||
|
||||
|
||||
# We don't want to have tzinfo in decoded json.This object_hook is
|
||||
# overwritten json_util.object_hook for $date
|
||||
def object_hook(dct):
|
||||
if "$date" in dct:
|
||||
dt = bson.json_util.object_hook(dct)
|
||||
return dt.replace(tzinfo=None)
|
||||
return bson.json_util.object_hook(dct)
|
||||
|
||||
|
||||
def create_tables(conn, tables, column_families):
|
||||
for table in tables:
|
||||
try:
|
||||
conn.create_table(table, column_families)
|
||||
except ttypes.AlreadyExists:
|
||||
if conn.table_prefix:
|
||||
table = ("%(table_prefix)s"
|
||||
"%(separator)s"
|
||||
"%(table_name)s" %
|
||||
dict(table_prefix=conn.table_prefix,
|
||||
separator=conn.table_prefix_separator,
|
||||
table_name=table))
|
||||
|
||||
LOG.warning(_LW("Cannot create table %s because "
|
||||
"it already exists. Ignoring error"), table)
|
||||
|
||||
|
||||
def quote(s, *args):
|
||||
"""Return quoted string even if it is unicode one.
|
||||
|
||||
:param s: string that should be quoted
|
||||
:param args: any symbol we want to stay unquoted
|
||||
"""
|
||||
s_en = s.encode('utf8')
|
||||
return six.moves.urllib.parse.quote(s_en, *args)
|
||||
|
||||
|
||||
def unquote(s):
|
||||
"""Return unquoted and decoded string.
|
||||
|
||||
:param s: string that should be unquoted
|
||||
"""
|
||||
s_de = six.moves.urllib.parse.unquote(s)
|
||||
return s_de.decode('utf8')
|
@ -1,189 +0,0 @@
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import datetime
|
||||
import operator
|
||||
|
||||
from oslo_log import log
|
||||
|
||||
import aodh
|
||||
from aodh import storage
|
||||
from aodh.storage import base
|
||||
from aodh.storage.hbase import base as hbase_base
|
||||
from aodh.storage.hbase import migration as hbase_migration
|
||||
from aodh.storage.hbase import utils as hbase_utils
|
||||
from aodh.storage import models
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
AVAILABLE_CAPABILITIES = {
|
||||
'alarms': {'query': {'simple': True,
|
||||
'complex': False},
|
||||
'history': {'query': {'simple': True,
|
||||
'complex': False}}},
|
||||
}
|
||||
|
||||
|
||||
AVAILABLE_STORAGE_CAPABILITIES = {
|
||||
'storage': {'production_ready': True},
|
||||
}
|
||||
|
||||
|
||||
class Connection(hbase_base.Connection, base.Connection):
|
||||
"""Put the alarm data into a HBase database
|
||||
|
||||
Collections:
|
||||
|
||||
- alarm:
|
||||
|
||||
- row_key: uuid of alarm
|
||||
- Column Families:
|
||||
|
||||
f: contains the raw incoming alarm data
|
||||
|
||||
- alarm_h:
|
||||
|
||||
- row_key: uuid of alarm + ":" + reversed timestamp
|
||||
- Column Families:
|
||||
|
||||
f: raw incoming alarm_history data. Timestamp becomes now()
|
||||
if not determined
|
||||
"""
|
||||
|
||||
CAPABILITIES = base.update_nested(base.Connection.CAPABILITIES,
|
||||
AVAILABLE_CAPABILITIES)
|
||||
STORAGE_CAPABILITIES = base.update_nested(
|
||||
base.Connection.STORAGE_CAPABILITIES,
|
||||
AVAILABLE_STORAGE_CAPABILITIES,
|
||||
)
|
||||
_memory_instance = None
|
||||
|
||||
ALARM_TABLE = "alarm"
|
||||
ALARM_HISTORY_TABLE = "alarm_h"
|
||||
|
||||
def upgrade(self):
|
||||
tables = [self.ALARM_HISTORY_TABLE, self.ALARM_TABLE]
|
||||
column_families = {'f': dict()}
|
||||
with self.conn_pool.connection() as conn:
|
||||
hbase_utils.create_tables(conn, tables, column_families)
|
||||
hbase_migration.migrate_tables(conn, tables)
|
||||
|
||||
def clear(self):
|
||||
LOG.debug('Dropping HBase schema...')
|
||||
with self.conn_pool.connection() as conn:
|
||||
for table in [self.ALARM_TABLE,
|
||||
self.ALARM_HISTORY_TABLE]:
|
||||
try:
|
||||
conn.disable_table(table)
|
||||
except Exception:
|
||||
LOG.debug('Cannot disable table but ignoring error')
|
||||
try:
|
||||
conn.delete_table(table)
|
||||
except Exception:
|
||||
LOG.debug('Cannot delete table but ignoring error')
|
||||
|
||||
def update_alarm(self, alarm, upsert=False):
|
||||
"""Create an alarm.
|
||||
|
||||
:param alarm: The alarm to create. It is Alarm object, so we need to
|
||||
call as_dict()
|
||||
"""
|
||||
_id = alarm.alarm_id
|
||||
alarm_to_store = hbase_utils.serialize_entry(alarm.as_dict())
|
||||
with self.conn_pool.connection() as conn:
|
||||
alarm_table = conn.table(self.ALARM_TABLE)
|
||||
if not upsert:
|
||||
q = hbase_utils.make_query(alarm_id=alarm.alarm_id)
|
||||
query_alarm = alarm_table.scan(filter=q)
|
||||
if len(list(query_alarm)) == 0:
|
||||
raise storage.AlarmNotFound(alarm.alarm_id)
|
||||
alarm_table.put(_id, alarm_to_store)
|
||||
stored_alarm = hbase_utils.deserialize_entry(
|
||||
alarm_table.row(_id))
|
||||
return models.Alarm(**stored_alarm)
|
||||
|
||||
def create_alarm(self, alarm):
|
||||
return self.update_alarm(alarm, upsert=True)
|
||||
|
||||
def delete_alarm(self, alarm_id):
|
||||
"""Delete an alarm and its history data."""
|
||||
with self.conn_pool.connection() as conn:
|
||||
alarm_table = conn.table(self.ALARM_TABLE)
|
||||
alarm_table.delete(alarm_id)
|
||||
q = hbase_utils.make_query(alarm_id=alarm_id)
|
||||
alarm_history_table = conn.table(self.ALARM_HISTORY_TABLE)
|
||||
for alarm_id, ignored in alarm_history_table.scan(filter=q):
|
||||
alarm_history_table.delete(alarm_id)
|
||||
|
||||
def get_alarms(self, name=None, user=None, state=None, meter=None,
|
||||
project=None, enabled=None, alarm_id=None,
|
||||
alarm_type=None, severity=None, exclude=None,
|
||||
pagination=None):
|
||||
if pagination:
|
||||
raise aodh.NotImplementedError('Pagination query not implemented')
|
||||
if meter:
|
||||
raise aodh.NotImplementedError(
|
||||
'Filter by meter not implemented')
|
||||
|
||||
q = hbase_utils.make_query(alarm_id=alarm_id, name=name,
|
||||
enabled=enabled, user_id=user,
|
||||
project_id=project, state=state,
|
||||
type=alarm_type, severity=severity,
|
||||
exclude=exclude)
|
||||
|
||||
with self.conn_pool.connection() as conn:
|
||||
alarm_table = conn.table(self.ALARM_TABLE)
|
||||
gen = alarm_table.scan(filter=q)
|
||||
alarms = [hbase_utils.deserialize_entry(data)
|
||||
for ignored, data in gen]
|
||||
for alarm in sorted(
|
||||
alarms,
|
||||
key=operator.itemgetter('timestamp'),
|
||||
reverse=True):
|
||||
yield models.Alarm(**alarm)
|
||||
|
||||
def get_alarm_changes(self, alarm_id, on_behalf_of,
|
||||
user=None, project=None, alarm_type=None,
|
||||
severity=None, start_timestamp=None,
|
||||
start_timestamp_op=None, end_timestamp=None,
|
||||
end_timestamp_op=None, pagination=None):
|
||||
if pagination:
|
||||
raise aodh.NotImplementedError('Pagination query not implemented')
|
||||
q = hbase_utils.make_query(alarm_id=alarm_id,
|
||||
on_behalf_of=on_behalf_of, type=alarm_type,
|
||||
user_id=user, project_id=project,
|
||||
severity=severity)
|
||||
start_row, end_row = hbase_utils.make_timestamp_query(
|
||||
hbase_utils.make_general_rowkey_scan,
|
||||
start=start_timestamp, start_op=start_timestamp_op,
|
||||
end=end_timestamp, end_op=end_timestamp_op, bounds_only=True,
|
||||
some_id=alarm_id)
|
||||
with self.conn_pool.connection() as conn:
|
||||
alarm_history_table = conn.table(self.ALARM_HISTORY_TABLE)
|
||||
gen = alarm_history_table.scan(filter=q, row_start=start_row,
|
||||
row_stop=end_row)
|
||||
for ignored, data in gen:
|
||||
stored_entry = hbase_utils.deserialize_entry(data)
|
||||
yield models.AlarmChange(**stored_entry)
|
||||
|
||||
def record_alarm_change(self, alarm_change):
|
||||
"""Record alarm change event."""
|
||||
alarm_change_dict = hbase_utils.serialize_entry(alarm_change)
|
||||
ts = alarm_change.get('timestamp') or datetime.datetime.now()
|
||||
rts = hbase_utils.timestamp(ts)
|
||||
with self.conn_pool.connection() as conn:
|
||||
alarm_history_table = conn.table(self.ALARM_HISTORY_TABLE)
|
||||
alarm_history_table.put(
|
||||
hbase_utils.prepare_key(alarm_change.get('alarm_id'), rts),
|
||||
alarm_change_dict)
|
@ -1,106 +0,0 @@
|
||||
#
|
||||
# Copyright 2012 New Dream Network, LLC (DreamHost)
|
||||
# Copyright 2013 eNovance
|
||||
# Copyright 2014-2015 Red Hat, Inc
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
"""MongoDB storage backend"""
|
||||
|
||||
from oslo_log import log
|
||||
import pymongo
|
||||
|
||||
from aodh import storage
|
||||
from aodh.storage.mongo import utils as pymongo_utils
|
||||
from aodh.storage import pymongo_base
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
class Connection(pymongo_base.Connection):
|
||||
"""Put the alarm data into a MongoDB database."""
|
||||
|
||||
CONNECTION_POOL = pymongo_utils.ConnectionPool()
|
||||
|
||||
def __init__(self, conf, url):
|
||||
self.conf = conf
|
||||
# NOTE(jd) Use our own connection pooling on top of the Pymongo one./
|
||||
# We need that otherwise we overflow the MongoDB instance with new
|
||||
# connection since we instantiate a Pymongo client each time someone
|
||||
# requires a new storage connection.
|
||||
self.conn = self.CONNECTION_POOL.connect(
|
||||
url,
|
||||
conf.database.max_retries,
|
||||
conf.database.retry_interval)
|
||||
|
||||
# Require MongoDB 2.4 to use $setOnInsert
|
||||
if self.conn.server_info()['versionArray'] < [2, 4]:
|
||||
raise storage.StorageBadVersion("Need at least MongoDB 2.4")
|
||||
|
||||
connection_options = pymongo.uri_parser.parse_uri(url)
|
||||
self.db = getattr(self.conn, connection_options['database'])
|
||||
if connection_options.get('username'):
|
||||
self.db.authenticate(connection_options['username'],
|
||||
connection_options['password'])
|
||||
|
||||
# NOTE(jd) Upgrading is just about creating index, so let's do this
|
||||
# on connection to be sure at least the TTL is correctly updated if
|
||||
# needed.
|
||||
self.upgrade()
|
||||
|
||||
@staticmethod
|
||||
def update_ttl(ttl, ttl_index_name, index_field, coll):
|
||||
"""Update or ensure time_to_live indexes.
|
||||
|
||||
:param ttl: time to live in seconds.
|
||||
:param ttl_index_name: name of the index we want to update or ensure.
|
||||
:param index_field: field with the index that we need to update.
|
||||
:param coll: collection which indexes need to be updated.
|
||||
"""
|
||||
indexes = coll.index_information()
|
||||
if ttl <= 0:
|
||||
if ttl_index_name in indexes:
|
||||
coll.drop_index(ttl_index_name)
|
||||
return
|
||||
|
||||
if ttl_index_name in indexes:
|
||||
return coll.database.command(
|
||||
'collMod', coll.name,
|
||||
index={'keyPattern': {index_field: pymongo.ASCENDING},
|
||||
'expireAfterSeconds': ttl})
|
||||
|
||||
coll.ensure_index([(index_field, pymongo.ASCENDING)],
|
||||
expireAfterSeconds=ttl,
|
||||
name=ttl_index_name)
|
||||
|
||||
def upgrade(self):
|
||||
super(Connection, self).upgrade()
|
||||
# Establish indexes
|
||||
ttl = self.conf.database.alarm_history_time_to_live
|
||||
self.update_ttl(
|
||||
ttl, 'alarm_history_ttl', 'timestamp', self.db.alarm_history)
|
||||
|
||||
def clear(self):
|
||||
self.conn.drop_database(self.db.name)
|
||||
# Connection will be reopened automatically if needed
|
||||
self.conn.close()
|
||||
|
||||
def clear_expired_alarm_history_data(self, alarm_history_ttl):
|
||||
"""Clear expired alarm history data from the backend storage system.
|
||||
|
||||
Clearing occurs according to the time-to-live.
|
||||
|
||||
:param alarm_history_ttl: Number of seconds to keep alarm history
|
||||
records for.
|
||||
"""
|
||||
LOG.debug("Clearing expired alarm history data is based on native "
|
||||
"MongoDB time to live feature and going in background.")
|
@ -30,6 +30,7 @@ from sqlalchemy import desc
|
||||
from sqlalchemy import func
|
||||
from sqlalchemy.orm import exc
|
||||
|
||||
import aodh
|
||||
from aodh.i18n import _LI
|
||||
from aodh import storage
|
||||
from aodh.storage import base
|
||||
@ -149,7 +150,7 @@ class Connection(base.Connection):
|
||||
return (self._row_to_alarm_model(x) for x in query.all())
|
||||
|
||||
@staticmethod
|
||||
def _get_pagination_query(query, pagination, api_model, model):
|
||||
def _get_pagination_query(session, query, pagination, api_model, model):
|
||||
if not pagination.get('sort'):
|
||||
pagination['sort'] = api_model.DEFAULT_SORT
|
||||
marker = None
|
||||
@ -168,6 +169,9 @@ class Connection(base.Connection):
|
||||
# order when "severity" specified in sorts.
|
||||
for sort_key, sort_dir in pagination['sort'][::-1]:
|
||||
if sort_key == 'severity':
|
||||
engine = session.connection()
|
||||
if engine.dialect.name != "mysql":
|
||||
raise aodh.NotImplementedError
|
||||
sort_dir_func = {'asc': asc, 'desc': desc}[sort_dir]
|
||||
query = query.order_by(sort_dir_func(
|
||||
func.field(getattr(model, sort_key), 'low',
|
||||
@ -222,7 +226,7 @@ class Connection(base.Connection):
|
||||
query = query.filter(getattr(models.Alarm, key) != value)
|
||||
|
||||
query = self._get_pagination_query(
|
||||
query, pagination, alarm_api_models.Alarm, models.Alarm)
|
||||
session, query, pagination, alarm_api_models.Alarm, models.Alarm)
|
||||
alarms = self._retrieve_alarms(query)
|
||||
|
||||
# TODO(cmart): improve this by using sqlalchemy.func factory
|
||||
@ -360,7 +364,7 @@ class Connection(base.Connection):
|
||||
models.AlarmChange.timestamp < end_timestamp)
|
||||
|
||||
query = self._get_pagination_query(
|
||||
query, pagination, alarm_api_models.AlarmChange,
|
||||
session, query, pagination, alarm_api_models.AlarmChange,
|
||||
models.AlarmChange)
|
||||
return self._retrieve_alarm_history(query)
|
||||
|
||||
|
@ -1,297 +0,0 @@
|
||||
#
|
||||
# Copyright Ericsson AB 2013. All rights reserved
|
||||
# Copyright 2015 Red Hat, Inc
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
"""Common functions for MongoDB backend
|
||||
"""
|
||||
|
||||
import weakref
|
||||
|
||||
from oslo_log import log
|
||||
from oslo_utils import netutils
|
||||
import pymongo
|
||||
import retrying
|
||||
|
||||
from aodh.i18n import _LI, _LW
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
def make_timestamp_range(start, end,
|
||||
start_timestamp_op=None, end_timestamp_op=None):
|
||||
|
||||
"""Create the query document to find timestamps within that range.
|
||||
|
||||
This is done by given two possible datetimes and their operations.
|
||||
By default, using $gte for the lower bound and $lt for the upper bound.
|
||||
"""
|
||||
ts_range = {}
|
||||
|
||||
if start:
|
||||
if start_timestamp_op == 'gt':
|
||||
start_timestamp_op = '$gt'
|
||||
else:
|
||||
start_timestamp_op = '$gte'
|
||||
ts_range[start_timestamp_op] = start
|
||||
|
||||
if end:
|
||||
if end_timestamp_op == 'le':
|
||||
end_timestamp_op = '$lte'
|
||||
else:
|
||||
end_timestamp_op = '$lt'
|
||||
ts_range[end_timestamp_op] = end
|
||||
return ts_range
|
||||
|
||||
|
||||
class ConnectionPool(object):
|
||||
|
||||
def __init__(self):
|
||||
self._pool = {}
|
||||
|
||||
def connect(self, url, max_retries, retry_interval):
|
||||
connection_options = pymongo.uri_parser.parse_uri(url)
|
||||
del connection_options['database']
|
||||
del connection_options['username']
|
||||
del connection_options['password']
|
||||
del connection_options['collection']
|
||||
pool_key = tuple(connection_options)
|
||||
|
||||
if pool_key in self._pool:
|
||||
client = self._pool.get(pool_key)()
|
||||
if client:
|
||||
return client
|
||||
splitted_url = netutils.urlsplit(url)
|
||||
log_data = {'db': splitted_url.scheme,
|
||||
'nodelist': connection_options['nodelist']}
|
||||
LOG.info(_LI('Connecting to %(db)s on %(nodelist)s'), log_data)
|
||||
try:
|
||||
client = MongoProxy(
|
||||
pymongo.MongoClient(url),
|
||||
max_retries,
|
||||
retry_interval,
|
||||
)
|
||||
except pymongo.errors.ConnectionFailure as e:
|
||||
LOG.warning(_LW('Unable to connect to the database server: '
|
||||
'%(errmsg)s.'), {'errmsg': e})
|
||||
raise
|
||||
self._pool[pool_key] = weakref.ref(client)
|
||||
return client
|
||||
|
||||
|
||||
class QueryTransformer(object):
|
||||
|
||||
operators = {"<": "$lt",
|
||||
">": "$gt",
|
||||
"<=": "$lte",
|
||||
"=<": "$lte",
|
||||
">=": "$gte",
|
||||
"=>": "$gte",
|
||||
"!=": "$ne",
|
||||
"in": "$in",
|
||||
"=~": "$regex"}
|
||||
|
||||
complex_operators = {"or": "$or",
|
||||
"and": "$and"}
|
||||
|
||||
ordering_functions = {"asc": pymongo.ASCENDING,
|
||||
"desc": pymongo.DESCENDING}
|
||||
|
||||
def transform_orderby(self, orderby):
|
||||
orderby_filter = []
|
||||
|
||||
for field in orderby:
|
||||
field_name = list(field.keys())[0]
|
||||
ordering = self.ordering_functions[list(field.values())[0]]
|
||||
orderby_filter.append((field_name, ordering))
|
||||
return orderby_filter
|
||||
|
||||
@staticmethod
|
||||
def _move_negation_to_leaf(condition):
|
||||
"""Moves every not operator to the leafs.
|
||||
|
||||
Moving is going by applying the De Morgan rules and annihilating
|
||||
double negations.
|
||||
"""
|
||||
def _apply_de_morgan(tree, negated_subtree, negated_op):
|
||||
if negated_op == "and":
|
||||
new_op = "or"
|
||||
else:
|
||||
new_op = "and"
|
||||
|
||||
tree[new_op] = [{"not": child}
|
||||
for child in negated_subtree[negated_op]]
|
||||
del tree["not"]
|
||||
|
||||
def transform(subtree):
|
||||
op = list(subtree.keys())[0]
|
||||
if op in ["and", "or"]:
|
||||
[transform(child) for child in subtree[op]]
|
||||
elif op == "not":
|
||||
negated_tree = subtree[op]
|
||||
negated_op = list(negated_tree.keys())[0]
|
||||
if negated_op == "and":
|
||||
_apply_de_morgan(subtree, negated_tree, negated_op)
|
||||
transform(subtree)
|
||||
elif negated_op == "or":
|
||||
_apply_de_morgan(subtree, negated_tree, negated_op)
|
||||
transform(subtree)
|
||||
elif negated_op == "not":
|
||||
# two consecutive not annihilates themselves
|
||||
value = list(negated_tree.values())[0]
|
||||
new_op = list(value.keys())[0]
|
||||
subtree[new_op] = negated_tree[negated_op][new_op]
|
||||
del subtree["not"]
|
||||
transform(subtree)
|
||||
|
||||
transform(condition)
|
||||
|
||||
def transform_filter(self, condition):
|
||||
# in Mongo not operator can only be applied to
|
||||
# simple expressions so we have to move every
|
||||
# not operator to the leafs of the expression tree
|
||||
self._move_negation_to_leaf(condition)
|
||||
return self._process_json_tree(condition)
|
||||
|
||||
def _handle_complex_op(self, complex_op, nodes):
|
||||
element_list = []
|
||||
for node in nodes:
|
||||
element = self._process_json_tree(node)
|
||||
element_list.append(element)
|
||||
complex_operator = self.complex_operators[complex_op]
|
||||
op = {complex_operator: element_list}
|
||||
return op
|
||||
|
||||
def _handle_not_op(self, negated_tree):
|
||||
# assumes that not is moved to the leaf already
|
||||
# so we are next to a leaf
|
||||
negated_op = list(negated_tree.keys())[0]
|
||||
negated_field = list(negated_tree[negated_op].keys())[0]
|
||||
value = negated_tree[negated_op][negated_field]
|
||||
if negated_op == "=":
|
||||
return {negated_field: {"$ne": value}}
|
||||
elif negated_op == "!=":
|
||||
return {negated_field: value}
|
||||
else:
|
||||
return {negated_field: {"$not":
|
||||
{self.operators[negated_op]: value}}}
|
||||
|
||||
def _handle_simple_op(self, simple_op, nodes):
|
||||
field_name = list(nodes.keys())[0]
|
||||
field_value = list(nodes.values())[0]
|
||||
|
||||
# no operator for equal in Mongo
|
||||
if simple_op == "=":
|
||||
op = {field_name: field_value}
|
||||
return op
|
||||
|
||||
operator = self.operators[simple_op]
|
||||
op = {field_name: {operator: field_value}}
|
||||
return op
|
||||
|
||||
def _process_json_tree(self, condition_tree):
|
||||
operator_node = list(condition_tree.keys())[0]
|
||||
nodes = list(condition_tree.values())[0]
|
||||
|
||||
if operator_node in self.complex_operators:
|
||||
return self._handle_complex_op(operator_node, nodes)
|
||||
|
||||
if operator_node == "not":
|
||||
negated_tree = condition_tree[operator_node]
|
||||
return self._handle_not_op(negated_tree)
|
||||
|
||||
return self._handle_simple_op(operator_node, nodes)
|
||||
|
||||
MONGO_METHODS = set([typ for typ in dir(pymongo.collection.Collection)
|
||||
if not typ.startswith('_')])
|
||||
MONGO_METHODS.update(set([typ for typ in dir(pymongo.MongoClient)
|
||||
if not typ.startswith('_')]))
|
||||
MONGO_METHODS.update(set([typ for typ in dir(pymongo)
|
||||
if not typ.startswith('_')]))
|
||||
|
||||
|
||||
def _safe_mongo_call(max_retries, retry_interval):
|
||||
return retrying.retry(
|
||||
retry_on_exception=lambda e: isinstance(
|
||||
e, pymongo.errors.AutoReconnect),
|
||||
wait_fixed=retry_interval * 1000,
|
||||
stop_max_attempt_number=max_retries if max_retries >= 0 else None
|
||||
)
|
||||
|
||||
|
||||
class MongoProxy(object):
|
||||
def __init__(self, conn, max_retries, retry_interval):
|
||||
self.conn = conn
|
||||
self.max_retries = max_retries
|
||||
self.retry_interval = retry_interval
|
||||
|
||||
def __getitem__(self, item):
|
||||
"""Create and return proxy around the method in the connection.
|
||||
|
||||
:param item: name of the connection
|
||||
"""
|
||||
return MongoProxy(self.conn[item])
|
||||
|
||||
def find(self, *args, **kwargs):
|
||||
# We need this modifying method to return a CursorProxy object so that
|
||||
# we can handle the Cursor next function to catch the AutoReconnect
|
||||
# exception.
|
||||
return CursorProxy(self.conn.find(*args, **kwargs),
|
||||
self.max_retries,
|
||||
self.retry_interval)
|
||||
|
||||
def __getattr__(self, item):
|
||||
"""Wrap MongoDB connection.
|
||||
|
||||
If item is the name of an executable method, for example find or
|
||||
insert, wrap this method to retry.
|
||||
Else wrap getting attribute with MongoProxy.
|
||||
"""
|
||||
if item in ('name', 'database'):
|
||||
return getattr(self.conn, item)
|
||||
if item in MONGO_METHODS:
|
||||
return _safe_mongo_call(
|
||||
self.max_retries,
|
||||
self.retry_interval,
|
||||
)(getattr(self.conn, item))
|
||||
return MongoProxy(getattr(self.conn, item),
|
||||
self.max_retries,
|
||||
self.retry_interval)
|
||||
|
||||
def __call__(self, *args, **kwargs):
|
||||
return self.conn(*args, **kwargs)
|
||||
|
||||
|
||||
class CursorProxy(pymongo.cursor.Cursor):
|
||||
def __init__(self, cursor, max_retries, retry_interval):
|
||||
self.cursor = cursor
|
||||
self.next = _safe_mongo_call(
|
||||
max_retries, retry_interval)(self._next)
|
||||
|
||||
def __getitem__(self, item):
|
||||
return self.cursor[item]
|
||||
|
||||
def _next(self):
|
||||
"""Wrap Cursor next method.
|
||||
|
||||
This method will be executed before each Cursor next method call.
|
||||
"""
|
||||
try:
|
||||
save_cursor = self.cursor.clone()
|
||||
return self.cursor.next()
|
||||
except pymongo.errors.AutoReconnect:
|
||||
self.cursor = save_cursor
|
||||
raise
|
||||
|
||||
def __getattr__(self, item):
|
||||
return getattr(self.cursor, item)
|
@ -1,317 +0,0 @@
|
||||
#
|
||||
# Copyright Ericsson AB 2013. All rights reserved
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
"""Common functions for MongoDB backend
|
||||
"""
|
||||
import pymongo
|
||||
import six
|
||||
|
||||
import aodh
|
||||
from aodh import storage
|
||||
from aodh.storage import base
|
||||
from aodh.storage import models
|
||||
from aodh.storage.mongo import utils as pymongo_utils
|
||||
|
||||
|
||||
COMMON_AVAILABLE_CAPABILITIES = {
|
||||
'alarms': {'query': {'simple': True,
|
||||
'complex': True},
|
||||
'history': {'query': {'simple': True,
|
||||
'complex': True}}},
|
||||
}
|
||||
|
||||
|
||||
AVAILABLE_STORAGE_CAPABILITIES = {
|
||||
'storage': {'production_ready': True},
|
||||
}
|
||||
|
||||
|
||||
class Connection(base.Connection):
|
||||
"""Base Alarm Connection class for MongoDB driver."""
|
||||
CAPABILITIES = base.update_nested(base.Connection.CAPABILITIES,
|
||||
COMMON_AVAILABLE_CAPABILITIES)
|
||||
|
||||
STORAGE_CAPABILITIES = base.update_nested(
|
||||
base.Connection.STORAGE_CAPABILITIES,
|
||||
AVAILABLE_STORAGE_CAPABILITIES,
|
||||
)
|
||||
|
||||
def upgrade(self):
|
||||
# create collection if not present
|
||||
if 'alarm' not in self.db.conn.collection_names():
|
||||
self.db.conn.create_collection('alarm')
|
||||
if 'alarm_history' not in self.db.conn.collection_names():
|
||||
self.db.conn.create_collection('alarm_history')
|
||||
|
||||
def update_alarm(self, alarm, upsert=False):
|
||||
"""Update alarm."""
|
||||
data = alarm.as_dict()
|
||||
|
||||
self.db.alarm.update(
|
||||
{'alarm_id': alarm.alarm_id},
|
||||
{'$set': data},
|
||||
upsert=upsert)
|
||||
|
||||
alarms_found = self.db.alarm.find({'alarm_id': alarm.alarm_id})
|
||||
if alarms_found.count() == 0:
|
||||
raise storage.AlarmNotFound(alarm.alarm_id)
|
||||
stored_alarm = alarms_found[0]
|
||||
del stored_alarm['_id']
|
||||
self._ensure_encapsulated_rule_format(stored_alarm)
|
||||
self._ensure_time_constraints(stored_alarm)
|
||||
return models.Alarm(**stored_alarm)
|
||||
|
||||
def create_alarm(self, alarm):
|
||||
return self.update_alarm(alarm, upsert=True)
|
||||
|
||||
def delete_alarm(self, alarm_id):
|
||||
"""Delete an alarm and its history data."""
|
||||
self.db.alarm.remove({'alarm_id': alarm_id})
|
||||
self.db.alarm_history.remove({'alarm_id': alarm_id})
|
||||
|
||||
def record_alarm_change(self, alarm_change):
|
||||
"""Record alarm change event."""
|
||||
self.db.alarm_history.insert(alarm_change.copy())
|
||||
|
||||
def get_alarms(self, name=None, user=None, state=None, meter=None,
|
||||
project=None, enabled=None, alarm_id=None,
|
||||
alarm_type=None, severity=None, exclude=None,
|
||||
pagination=None):
|
||||
"""Yields a lists of alarms that match filters.
|
||||
|
||||
:param name: Optional name for alarm.
|
||||
:param user: Optional ID for user that owns the resource.
|
||||
:param state: Optional string for alarm state.
|
||||
:param meter: Optional string for alarms associated with meter.
|
||||
:param project: Optional ID for project that owns the resource.
|
||||
:param enabled: Optional boolean to list disable alarm.
|
||||
:param alarm_id: Optional alarm_id to return one alarm.
|
||||
:param alarm_type: Optional alarm type.
|
||||
:param severity: Optional alarm severity.
|
||||
:param exclude: Optional dict for inequality constraint.
|
||||
:param pagination: Pagination query parameters.
|
||||
"""
|
||||
if pagination:
|
||||
raise aodh.NotImplementedError('Pagination query not implemented')
|
||||
q = {}
|
||||
if user is not None:
|
||||
q['user_id'] = user
|
||||
if project is not None:
|
||||
q['project_id'] = project
|
||||
if name is not None:
|
||||
q['name'] = name
|
||||
if enabled is not None:
|
||||
q['enabled'] = enabled
|
||||
if alarm_id is not None:
|
||||
q['alarm_id'] = alarm_id
|
||||
if state is not None:
|
||||
q['state'] = state
|
||||
if meter is not None:
|
||||
q['rule.meter_name'] = meter
|
||||
if alarm_type is not None:
|
||||
q['type'] = alarm_type
|
||||
if severity is not None:
|
||||
q['severity'] = severity
|
||||
if exclude is not None:
|
||||
for key, value in six.iteritems(exclude):
|
||||
q[key] = {'$ne': value}
|
||||
|
||||
return self._retrieve_alarms(q,
|
||||
[("timestamp",
|
||||
pymongo.DESCENDING)],
|
||||
None)
|
||||
|
||||
def get_alarm_changes(self, alarm_id, on_behalf_of,
|
||||
user=None, project=None, alarm_type=None,
|
||||
severity=None, start_timestamp=None,
|
||||
start_timestamp_op=None, end_timestamp=None,
|
||||
end_timestamp_op=None, pagination=None):
|
||||
"""Yields list of AlarmChanges describing alarm history
|
||||
|
||||
Changes are always sorted in reverse order of occurrence, given
|
||||
the importance of currency.
|
||||
|
||||
Segregation for non-administrative users is done on the basis
|
||||
of the on_behalf_of parameter. This allows such users to have
|
||||
visibility on both the changes initiated by themselves directly
|
||||
(generally creation, rule changes, or deletion) and also on those
|
||||
changes initiated on their behalf by the alarming service (state
|
||||
transitions after alarm thresholds are crossed).
|
||||
|
||||
:param alarm_id: ID of alarm to return changes for
|
||||
:param on_behalf_of: ID of tenant to scope changes query (None for
|
||||
administrative user, indicating all projects)
|
||||
:param user: Optional ID of user to return changes for
|
||||
:param project: Optional ID of project to return changes for
|
||||
:param alarm_type: Optional change type
|
||||
:param severity: Optional change severity
|
||||
:param start_timestamp: Optional modified timestamp start range
|
||||
:param start_timestamp_op: Optional timestamp start range operation
|
||||
:param end_timestamp: Optional modified timestamp end range
|
||||
:param end_timestamp_op: Optional timestamp end range operation
|
||||
:param pagination: Pagination query parameters.
|
||||
"""
|
||||
if pagination:
|
||||
raise aodh.NotImplementedError('Pagination query not implemented')
|
||||
q = dict(alarm_id=alarm_id)
|
||||
if on_behalf_of is not None:
|
||||
q['on_behalf_of'] = on_behalf_of
|
||||
if user is not None:
|
||||
q['user_id'] = user
|
||||
if project is not None:
|
||||
q['project_id'] = project
|
||||
if alarm_type is not None:
|
||||
q['type'] = alarm_type
|
||||
if severity is not None:
|
||||
q['severity'] = severity
|
||||
if start_timestamp or end_timestamp:
|
||||
ts_range = pymongo_utils.make_timestamp_range(start_timestamp,
|
||||
end_timestamp,
|
||||
start_timestamp_op,
|
||||
end_timestamp_op)
|
||||
if ts_range:
|
||||
q['timestamp'] = ts_range
|
||||
|
||||
return self._retrieve_alarm_changes(q,
|
||||
[("timestamp",
|
||||
pymongo.DESCENDING)],
|
||||
None)
|
||||
|
||||
def query_alarms(self, filter_expr=None, orderby=None, limit=None):
|
||||
"""Return an iterable of model.Alarm objects."""
|
||||
return self._retrieve_data(filter_expr, orderby, limit,
|
||||
models.Alarm)
|
||||
|
||||
def query_alarm_history(self, filter_expr=None, orderby=None, limit=None):
|
||||
"""Return an iterable of model.AlarmChange objects."""
|
||||
return self._retrieve_data(filter_expr,
|
||||
orderby,
|
||||
limit,
|
||||
models.AlarmChange)
|
||||
|
||||
def _retrieve_data(self, filter_expr, orderby, limit, model):
|
||||
if limit == 0:
|
||||
return []
|
||||
query_filter = {}
|
||||
orderby_filter = [("timestamp", pymongo.DESCENDING)]
|
||||
transformer = pymongo_utils.QueryTransformer()
|
||||
if orderby is not None:
|
||||
orderby_filter = transformer.transform_orderby(orderby)
|
||||
if filter_expr is not None:
|
||||
query_filter = transformer.transform_filter(filter_expr)
|
||||
|
||||
retrieve = {models.Alarm: self._retrieve_alarms,
|
||||
models.AlarmChange: self._retrieve_alarm_changes}
|
||||
return retrieve[model](query_filter, orderby_filter, limit)
|
||||
|
||||
def _retrieve_alarms(self, query_filter, orderby, limit):
|
||||
if limit is not None:
|
||||
alarms = self.db.alarm.find(query_filter,
|
||||
limit=limit,
|
||||
sort=orderby)
|
||||
else:
|
||||
alarms = self.db.alarm.find(query_filter, sort=orderby)
|
||||
|
||||
for alarm in alarms:
|
||||
a = {}
|
||||
a.update(alarm)
|
||||
del a['_id']
|
||||
self._ensure_encapsulated_rule_format(a)
|
||||
self._ensure_time_constraints(a)
|
||||
yield models.Alarm(**a)
|
||||
|
||||
def _retrieve_alarm_changes(self, query_filter, orderby, limit):
|
||||
if limit is not None:
|
||||
alarms_history = self.db.alarm_history.find(query_filter,
|
||||
limit=limit,
|
||||
sort=orderby)
|
||||
else:
|
||||
alarms_history = self.db.alarm_history.find(
|
||||
query_filter, sort=orderby)
|
||||
|
||||
for alarm_history in alarms_history:
|
||||
ah = {}
|
||||
ah.update(alarm_history)
|
||||
del ah['_id']
|
||||
yield models.AlarmChange(**ah)
|
||||
|
||||
@classmethod
|
||||
def _ensure_encapsulated_rule_format(cls, alarm):
|
||||
"""Ensure the alarm returned by the storage have the correct format.
|
||||
|
||||
The previous format looks like:
|
||||
{
|
||||
'alarm_id': '0ld-4l3rt',
|
||||
'enabled': True,
|
||||
'name': 'old-alert',
|
||||
'description': 'old-alert',
|
||||
'timestamp': None,
|
||||
'meter_name': 'cpu',
|
||||
'user_id': 'me',
|
||||
'project_id': 'and-da-boys',
|
||||
'comparison_operator': 'lt',
|
||||
'threshold': 36,
|
||||
'statistic': 'count',
|
||||
'evaluation_periods': 1,
|
||||
'period': 60,
|
||||
'state': "insufficient data",
|
||||
'state_timestamp': None,
|
||||
'ok_actions': [],
|
||||
'alarm_actions': ['http://nowhere/alarms'],
|
||||
'insufficient_data_actions': [],
|
||||
'repeat_actions': False,
|
||||
'matching_metadata': {'key': 'value'}
|
||||
# or 'matching_metadata': [{'key': 'key', 'value': 'value'}]
|
||||
}
|
||||
"""
|
||||
|
||||
if isinstance(alarm.get('rule'), dict):
|
||||
return
|
||||
|
||||
alarm['type'] = 'threshold'
|
||||
alarm['rule'] = {}
|
||||
alarm['matching_metadata'] = cls._decode_matching_metadata(
|
||||
alarm['matching_metadata'])
|
||||
for field in ['period', 'evaluation_periods', 'threshold',
|
||||
'statistic', 'comparison_operator', 'meter_name']:
|
||||
if field in alarm:
|
||||
alarm['rule'][field] = alarm[field]
|
||||
del alarm[field]
|
||||
|
||||
query = []
|
||||
for key in alarm['matching_metadata']:
|
||||
query.append({'field': key,
|
||||
'op': 'eq',
|
||||
'value': alarm['matching_metadata'][key],
|
||||
'type': 'string'})
|
||||
del alarm['matching_metadata']
|
||||
alarm['rule']['query'] = query
|
||||
|
||||
@staticmethod
|
||||
def _decode_matching_metadata(matching_metadata):
|
||||
if isinstance(matching_metadata, dict):
|
||||
# note(sileht): keep compatibility with alarm
|
||||
# with matching_metadata as a dict
|
||||
return matching_metadata
|
||||
else:
|
||||
new_matching_metadata = {}
|
||||
for elem in matching_metadata:
|
||||
new_matching_metadata[elem['key']] = elem['value']
|
||||
return new_matching_metadata
|
||||
|
||||
@staticmethod
|
||||
def _ensure_time_constraints(alarm):
|
||||
"""Ensures the alarm has a time constraints field."""
|
||||
if 'time_constraints' not in alarm:
|
||||
alarm['time_constraints'] = []
|
@ -30,7 +30,6 @@ from aodh import messaging
|
||||
from aodh.storage import models
|
||||
from aodh.tests import constants
|
||||
from aodh.tests.functional.api import v2
|
||||
from aodh.tests.functional import db as tests_db
|
||||
|
||||
|
||||
def default_alarms(auth_headers):
|
||||
@ -1365,7 +1364,7 @@ class TestAlarms(TestAlarmsBase):
|
||||
self.assertEqual(1, len(alarms))
|
||||
|
||||
# FIXME(sileht): This should really returns [] not None
|
||||
# but the mongodb and sql just store the json dict as is...
|
||||
# but SQL just stores the json dict as is...
|
||||
# migration script for sql will be a mess because we have
|
||||
# to parse all JSON :(
|
||||
# I guess we assume that wsme convert the None input to []
|
||||
@ -3300,7 +3299,6 @@ class TestAlarmsCompositeRule(TestAlarmsBase):
|
||||
response.json['error_message']['faultstring'])
|
||||
|
||||
|
||||
@tests_db.run_with('mysql', 'pgsql', 'sqlite')
|
||||
class TestPaginationQuery(TestAlarmsBase):
|
||||
def setUp(self):
|
||||
super(TestPaginationQuery, self).setUp()
|
||||
@ -3318,6 +3316,8 @@ class TestPaginationQuery(TestAlarmsBase):
|
||||
self.assertEqual(['name1', 'name2', 'name3', 'name4'], names)
|
||||
|
||||
def test_sort_by_severity_with_its_value(self):
|
||||
if self.engine != "mysql":
|
||||
self.skipTest("This is only implemented for MySQL")
|
||||
data = self.get_json('/alarms?sort=severity:asc',
|
||||
headers=self.auth_headers)
|
||||
severities = [a['severity'] for a in data]
|
||||
|
@ -21,7 +21,6 @@ from oslo_utils import timeutils
|
||||
|
||||
from aodh.storage import models
|
||||
from aodh.tests.functional.api import v2 as tests_api
|
||||
from aodh.tests.functional import db as tests_db
|
||||
|
||||
|
||||
admin_header = {"X-Roles": "admin",
|
||||
@ -195,8 +194,9 @@ class TestQueryAlarmsController(tests_api.FunctionalTest):
|
||||
for alarm in data.json:
|
||||
self.assertEqual("alarm", alarm["state"])
|
||||
|
||||
@tests_db.run_with('mysql', 'pgsql', 'sqlite')
|
||||
def test_query_with_orderby_severity(self):
|
||||
if self.engine != "mysql":
|
||||
self.skipTest("This is only implemented for MySQL")
|
||||
orderby = '[{"severity": "ASC"}]'
|
||||
data = self.post_json(self.alarm_url,
|
||||
headers=admin_header,
|
||||
|
@ -20,29 +20,14 @@ import os
|
||||
import uuid
|
||||
|
||||
import fixtures
|
||||
import mock
|
||||
from oslo_config import fixture as fixture_config
|
||||
from oslotest import mockpatch
|
||||
import six
|
||||
from six.moves.urllib import parse as urlparse
|
||||
from testtools import testcase
|
||||
|
||||
from aodh import service
|
||||
from aodh import storage
|
||||
from aodh.tests import base as test_base
|
||||
try:
|
||||
from aodh.tests import mocks
|
||||
except ImportError:
|
||||
mocks = None # happybase module is not Python 3 compatible yet
|
||||
|
||||
|
||||
class MongoDbManager(fixtures.Fixture):
|
||||
|
||||
def __init__(self, conf):
|
||||
self.url = '%(url)s_%(db)s' % {
|
||||
'url': conf.database.connection,
|
||||
'db': uuid.uuid4().hex,
|
||||
}
|
||||
|
||||
|
||||
class SQLManager(fixtures.Fixture):
|
||||
@ -80,36 +65,6 @@ class MySQLManager(SQLManager):
|
||||
conn.execute('CREATE DATABASE %s;' % db_name)
|
||||
|
||||
|
||||
class HBaseManager(fixtures.Fixture):
|
||||
def __init__(self, conf):
|
||||
self.url = '%s?table_prefix=%s' % (
|
||||
conf.database.connection,
|
||||
os.getenv("AODH_TEST_HBASE_TABLE_PREFIX", "test")
|
||||
)
|
||||
|
||||
def setUp(self):
|
||||
super(HBaseManager, self).setUp()
|
||||
# Unique prefix for each test to keep data is distinguished because
|
||||
# all test data is stored in one table
|
||||
data_prefix = str(uuid.uuid4().hex)
|
||||
|
||||
def table(conn, name):
|
||||
return mocks.MockHBaseTable(name, conn, data_prefix)
|
||||
|
||||
# Mock only real HBase connection, MConnection "table" method
|
||||
# stays origin.
|
||||
mock.patch('happybase.Connection.table', new=table).start()
|
||||
# We shouldn't delete data and tables after each test,
|
||||
# because it last for too long.
|
||||
# All tests tables will be deleted in setup-test-env.sh
|
||||
mock.patch("happybase.Connection.disable_table",
|
||||
new=mock.MagicMock()).start()
|
||||
mock.patch("happybase.Connection.delete_table",
|
||||
new=mock.MagicMock()).start()
|
||||
mock.patch("happybase.Connection.create_table",
|
||||
new=mock.MagicMock()).start()
|
||||
|
||||
|
||||
class SQLiteManager(fixtures.Fixture):
|
||||
|
||||
def __init__(self, conf):
|
||||
@ -120,13 +75,10 @@ class SQLiteManager(fixtures.Fixture):
|
||||
class TestBase(test_base.BaseTestCase):
|
||||
|
||||
DRIVER_MANAGERS = {
|
||||
'mongodb': MongoDbManager,
|
||||
'mysql': MySQLManager,
|
||||
'postgresql': PgSQLManager,
|
||||
'sqlite': SQLiteManager,
|
||||
}
|
||||
if mocks is not None:
|
||||
DRIVER_MANAGERS['hbase'] = HBaseManager
|
||||
|
||||
def setUp(self):
|
||||
super(TestBase, self).setUp()
|
||||
@ -137,23 +89,16 @@ class TestBase(test_base.BaseTestCase):
|
||||
engine = urlparse.urlparse(db_url).scheme
|
||||
# In case some drivers have additional specification, for example:
|
||||
# PyMySQL will have scheme mysql+pymysql.
|
||||
engine = engine.split('+')[0]
|
||||
|
||||
# NOTE(Alexei_987) Shortcut to skip expensive db setUp
|
||||
test_method = self._get_test_method()
|
||||
if (hasattr(test_method, '_run_with')
|
||||
and engine not in test_method._run_with):
|
||||
raise testcase.TestSkipped(
|
||||
'Test is not applicable for %s' % engine)
|
||||
self.engine = engine.split('+')[0]
|
||||
|
||||
conf = service.prepare_service(argv=[], config_files=[])
|
||||
self.CONF = self.useFixture(fixture_config.Config(conf)).conf
|
||||
self.CONF.set_override('connection', db_url, group="database",
|
||||
enforce_type=True)
|
||||
|
||||
manager = self.DRIVER_MANAGERS.get(engine)
|
||||
manager = self.DRIVER_MANAGERS.get(self.engine)
|
||||
if not manager:
|
||||
self.skipTest("missing driver manager: %s" % engine)
|
||||
self.skipTest("missing driver manager: %s" % self.engine)
|
||||
|
||||
self.db_manager = manager(self.CONF)
|
||||
|
||||
@ -176,25 +121,3 @@ class TestBase(test_base.BaseTestCase):
|
||||
|
||||
def _get_connection(self, conf):
|
||||
return self.alarm_conn
|
||||
|
||||
|
||||
def run_with(*drivers):
|
||||
"""Used to mark tests that are only applicable for certain db driver.
|
||||
|
||||
Skips test if driver is not available.
|
||||
"""
|
||||
def decorator(test):
|
||||
if isinstance(test, type) and issubclass(test, TestBase):
|
||||
# Decorate all test methods
|
||||
for attr in dir(test):
|
||||
if attr.startswith('test_'):
|
||||
value = getattr(test, attr)
|
||||
if callable(value):
|
||||
if six.PY3:
|
||||
value._run_with = drivers
|
||||
else:
|
||||
value.__func__._run_with = drivers
|
||||
else:
|
||||
test._run_with = drivers
|
||||
return test
|
||||
return decorator
|
||||
|
@ -27,7 +27,6 @@ class ABCSkip(base.SkipNotImplementedMeta, abc.ABCMeta):
|
||||
pass
|
||||
|
||||
|
||||
@tests_db.run_with('mysql', 'pgsql', 'sqlite')
|
||||
class ModelsMigrationsSync(
|
||||
six.with_metaclass(ABCSkip,
|
||||
tests_db.TestBase,
|
||||
|
@ -1,112 +0,0 @@
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
import datetime
|
||||
import uuid
|
||||
|
||||
import mock
|
||||
from oslo_config import fixture as fixture_config
|
||||
|
||||
from aodh.cmd import data_migration
|
||||
from aodh import service
|
||||
from aodh import storage
|
||||
from aodh.storage import models as alarm_models
|
||||
from aodh.tests.functional import db as tests_db
|
||||
from aodh.tests.functional.storage import test_storage_scenarios
|
||||
|
||||
|
||||
@tests_db.run_with('hbase', 'mongodb')
|
||||
class TestDataMigration(test_storage_scenarios.AlarmTestBase):
|
||||
|
||||
def setUp(self):
|
||||
sql_conf = service.prepare_service(argv=[], config_files=[])
|
||||
self.sql_conf = self.useFixture(fixture_config.Config(sql_conf)).conf
|
||||
# using sqlite to represent the type of SQL dbs
|
||||
self.sql_conf.set_override('connection', "sqlite://",
|
||||
group="database", enforce_type=True)
|
||||
self.sql_namager = tests_db.SQLiteManager(self.sql_conf)
|
||||
self.useFixture(self.sql_namager)
|
||||
self.sql_conf.set_override('connection', self.sql_namager.url,
|
||||
group="database", enforce_type=True)
|
||||
self.sql_alarm_conn = storage.get_connection_from_config(self.sql_conf)
|
||||
self.sql_alarm_conn.upgrade()
|
||||
super(TestDataMigration, self).setUp()
|
||||
self.add_some_alarms()
|
||||
self._add_some_alarm_changes()
|
||||
|
||||
def tearDown(self):
|
||||
self.sql_alarm_conn.clear()
|
||||
self.sql_alarm_conn = None
|
||||
super(TestDataMigration, self).tearDown()
|
||||
|
||||
def _add_some_alarm_changes(self):
|
||||
alarms = list(self.alarm_conn.get_alarms())
|
||||
i = 0
|
||||
for alarm in alarms:
|
||||
for change_type in [alarm_models.AlarmChange.CREATION,
|
||||
alarm_models.AlarmChange.RULE_CHANGE,
|
||||
alarm_models.AlarmChange.STATE_TRANSITION,
|
||||
alarm_models.AlarmChange.STATE_TRANSITION,
|
||||
alarm_models.AlarmChange.STATE_TRANSITION]:
|
||||
alarm_change = {
|
||||
"event_id": str(uuid.uuid4()),
|
||||
"alarm_id": alarm.alarm_id,
|
||||
"type": change_type,
|
||||
"detail": "detail %s" % alarm.name,
|
||||
"user_id": alarm.user_id,
|
||||
"project_id": alarm.project_id,
|
||||
"on_behalf_of": alarm.project_id,
|
||||
"timestamp": datetime.datetime(2014, 4, 7, 7, 30 + i)
|
||||
}
|
||||
self.alarm_conn.record_alarm_change(alarm_change=alarm_change)
|
||||
i += 1
|
||||
|
||||
def test_data_migration_without_history_data(self):
|
||||
alarms = list(self.alarm_conn.get_alarms())
|
||||
self.assertEqual(3, len(alarms))
|
||||
alarms_sql = list(self.sql_alarm_conn.get_alarms())
|
||||
self.assertEqual(0, len(alarms_sql))
|
||||
test_args = data_migration.get_parser().parse_args(
|
||||
['--sql-conn', 'sqlite://', '--nosql-conn',
|
||||
self.CONF.database.connection, '--migrate-history', False])
|
||||
with mock.patch('argparse.ArgumentParser.parse_args') as args_parser:
|
||||
# because get_connection_from_config has been mocked in
|
||||
# aodh.tests.functional.db.TestBase#setUp, here re-mocked it that
|
||||
# this test can get nosql and sql storage connections
|
||||
with mock.patch('aodh.storage.get_connection_from_config') as conn:
|
||||
conn.side_effect = [self.alarm_conn, self.sql_alarm_conn]
|
||||
args_parser.return_value = test_args
|
||||
data_migration.main()
|
||||
alarms_sql = list(self.sql_alarm_conn.get_alarms())
|
||||
alarm_changes = list(self.sql_alarm_conn.query_alarm_history())
|
||||
self.assertEqual(0, len(alarm_changes))
|
||||
self.assertEqual(3, len(alarms_sql))
|
||||
self.assertEqual(sorted([a.alarm_id for a in alarms]),
|
||||
sorted([a.alarm_id for a in alarms_sql]))
|
||||
|
||||
def test_data_migration_with_history_data(self):
|
||||
test_args = data_migration.get_parser().parse_args(
|
||||
['--sql-conn', 'sqlite://', '--nosql-conn',
|
||||
self.CONF.database.connection])
|
||||
with mock.patch('argparse.ArgumentParser.parse_args') as args_parser:
|
||||
# because get_connection_from_config has been mocked in
|
||||
# aodh.tests.functional.db.TestBase#setUp, here re-mocked it that
|
||||
# this test can get nosql and sql storage connections
|
||||
with mock.patch('aodh.storage.get_connection_from_config') as conn:
|
||||
conn.side_effect = [self.alarm_conn, self.sql_alarm_conn]
|
||||
args_parser.return_value = test_args
|
||||
data_migration.main()
|
||||
alarms_sql = list(self.sql_alarm_conn.get_alarms())
|
||||
self.assertEqual(3, len(alarms_sql))
|
||||
for alarm in alarms_sql:
|
||||
changes = list(self.sql_alarm_conn.get_alarm_changes(
|
||||
alarm.alarm_id, alarm.project_id))
|
||||
self.assertEqual(5, len(changes))
|
@ -1,67 +0,0 @@
|
||||
#
|
||||
# Copyright 2012, 2013 Dell Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
"""Tests for aodh/storage/impl_hbase.py
|
||||
|
||||
.. note::
|
||||
In order to run the tests against real HBase server set the environment
|
||||
variable aodh_TEST_HBASE_URL to point to that HBase instance before
|
||||
running the tests. Make sure the Thrift server is running on that server.
|
||||
|
||||
"""
|
||||
import mock
|
||||
|
||||
try:
|
||||
import happybase # noqa
|
||||
except ImportError:
|
||||
import testtools.testcase
|
||||
raise testtools.testcase.TestSkipped("happybase is needed")
|
||||
|
||||
from aodh.storage import impl_hbase
|
||||
from aodh.tests import base as test_base
|
||||
from aodh.tests.functional import db as tests_db
|
||||
|
||||
|
||||
class ConnectionTest(tests_db.TestBase):
|
||||
|
||||
@tests_db.run_with('hbase')
|
||||
def test_hbase_connection(self):
|
||||
|
||||
class TestConn(object):
|
||||
def __init__(self, host, port):
|
||||
self.netloc = '%s:%s' % (host, port)
|
||||
|
||||
def open(self):
|
||||
pass
|
||||
|
||||
def get_connection_pool(conf):
|
||||
return TestConn(conf['host'], conf['port'])
|
||||
|
||||
with mock.patch.object(impl_hbase.Connection, '_get_connection_pool',
|
||||
side_effect=get_connection_pool):
|
||||
conn = impl_hbase.Connection(self.CONF, 'hbase://test_hbase:9090')
|
||||
self.assertIsInstance(conn.conn_pool, TestConn)
|
||||
|
||||
|
||||
class CapabilitiesTest(test_base.BaseTestCase):
|
||||
def test_alarm_capabilities(self):
|
||||
expected_capabilities = {
|
||||
'alarms': {'query': {'simple': True,
|
||||
'complex': False},
|
||||
'history': {'query': {'simple': True,
|
||||
'complex': False}}},
|
||||
}
|
||||
|
||||
actual_capabilities = impl_hbase.Connection.get_capabilities()
|
||||
self.assertEqual(expected_capabilities, actual_capabilities)
|
@ -1,101 +0,0 @@
|
||||
#
|
||||
# Copyright 2012 New Dream Network, LLC (DreamHost)
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
"""Tests for aodh/storage/impl_mongodb.py
|
||||
|
||||
.. note::
|
||||
In order to run the tests against another MongoDB server set the
|
||||
environment variable aodh_TEST_MONGODB_URL to point to a MongoDB
|
||||
server before running the tests.
|
||||
|
||||
"""
|
||||
import unittest
|
||||
|
||||
try:
|
||||
from aodh.storage import impl_mongodb
|
||||
except ImportError:
|
||||
impl_mongodb = None
|
||||
from aodh.tests import base as test_base
|
||||
from aodh.tests.functional import db as tests_db
|
||||
|
||||
|
||||
@unittest.skipUnless(impl_mongodb, "pymongo not available")
|
||||
@tests_db.run_with('mongodb')
|
||||
class MongoDBConnection(tests_db.TestBase):
|
||||
def test_connection_pooling(self):
|
||||
test_conn = impl_mongodb.Connection(self.CONF,
|
||||
self.CONF.database.connection)
|
||||
self.assertEqual(self.alarm_conn.conn, test_conn.conn)
|
||||
|
||||
def test_replica_set(self):
|
||||
url = self.CONF.database.connection + '?replicaSet=foobar'
|
||||
conn = impl_mongodb.Connection(self.CONF, url)
|
||||
self.assertTrue(conn.conn)
|
||||
|
||||
|
||||
@unittest.skipUnless(impl_mongodb, "pymongo not available")
|
||||
@tests_db.run_with('mongodb')
|
||||
class IndexTest(tests_db.TestBase):
|
||||
def _test_ttl_index_absent(self, conn, coll_name, ttl_opt):
|
||||
# create a fake index and check it is deleted
|
||||
coll = getattr(conn.db, coll_name)
|
||||
index_name = '%s_ttl' % coll_name
|
||||
self.CONF.set_override(ttl_opt, -1, group='database',
|
||||
enforce_type=True)
|
||||
conn.upgrade()
|
||||
self.assertNotIn(index_name, coll.index_information())
|
||||
|
||||
self.CONF.set_override(ttl_opt, 456789, group='database',
|
||||
enforce_type=True)
|
||||
conn.upgrade()
|
||||
self.assertEqual(456789,
|
||||
coll.index_information()
|
||||
[index_name]['expireAfterSeconds'])
|
||||
|
||||
def test_alarm_history_ttl_index_absent(self):
|
||||
self._test_ttl_index_absent(self.alarm_conn, 'alarm_history',
|
||||
'alarm_history_time_to_live')
|
||||
|
||||
def _test_ttl_index_present(self, conn, coll_name, ttl_opt):
|
||||
coll = getattr(conn.db, coll_name)
|
||||
self.CONF.set_override(ttl_opt, 456789, group='database',
|
||||
enforce_type=True)
|
||||
conn.upgrade()
|
||||
index_name = '%s_ttl' % coll_name
|
||||
self.assertEqual(456789,
|
||||
coll.index_information()
|
||||
[index_name]['expireAfterSeconds'])
|
||||
|
||||
self.CONF.set_override(ttl_opt, -1, group='database',
|
||||
enforce_type=True)
|
||||
conn.upgrade()
|
||||
self.assertNotIn(index_name, coll.index_information())
|
||||
|
||||
def test_alarm_history_ttl_index_present(self):
|
||||
self._test_ttl_index_present(self.alarm_conn, 'alarm_history',
|
||||
'alarm_history_time_to_live')
|
||||
|
||||
|
||||
class CapabilitiesTest(test_base.BaseTestCase):
|
||||
@unittest.skipUnless(impl_mongodb, "pymongo not available")
|
||||
def test_alarm_capabilities(self):
|
||||
expected_capabilities = {
|
||||
'alarms': {'query': {'simple': True,
|
||||
'complex': True},
|
||||
'history': {'query': {'simple': True,
|
||||
'complex': True}}},
|
||||
}
|
||||
|
||||
actual_capabilities = impl_mongodb.Connection.get_capabilities()
|
||||
self.assertEqual(expected_capabilities, actual_capabilities)
|
@ -265,7 +265,6 @@ class AlarmTest(AlarmTestBase):
|
||||
self.assertNotEqual(victim.name, s.name)
|
||||
|
||||
|
||||
@tests_db.run_with('sqlite', 'mysql', 'pgsql', 'hbase')
|
||||
class AlarmHistoryTest(AlarmTestBase):
|
||||
|
||||
def setUp(self):
|
||||
|
@ -1,79 +0,0 @@
|
||||
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import os
|
||||
|
||||
import happybase
|
||||
|
||||
|
||||
class MockHBaseTable(happybase.Table):
|
||||
|
||||
def __init__(self, name, connection, data_prefix):
|
||||
# data_prefix is added to all rows which are written
|
||||
# in this test. It allows to divide data from different tests
|
||||
self.data_prefix = data_prefix
|
||||
# We create happybase Table with prefix from
|
||||
# AODH_TEST_HBASE_TABLE_PREFIX
|
||||
prefix = os.getenv("AODH_TEST_HBASE_TABLE_PREFIX", 'test')
|
||||
super(MockHBaseTable, self).__init__(
|
||||
"%s_%s" % (prefix, name),
|
||||
connection)
|
||||
|
||||
def put(self, row, *args, **kwargs):
|
||||
row = self.data_prefix + row
|
||||
return super(MockHBaseTable, self).put(row, *args,
|
||||
**kwargs)
|
||||
|
||||
def scan(self, row_start=None, row_stop=None, row_prefix=None,
|
||||
columns=None, filter=None, timestamp=None,
|
||||
include_timestamp=False, batch_size=10, scan_batching=None,
|
||||
limit=None, sorted_columns=False):
|
||||
# Add data prefix for row parameters
|
||||
# row_prefix could not be combined with row_start or row_stop
|
||||
if not row_start and not row_stop:
|
||||
row_prefix = self.data_prefix + (row_prefix or "")
|
||||
row_start = None
|
||||
row_stop = None
|
||||
elif row_start and not row_stop:
|
||||
# Adding data_prefix to row_start and row_stop does not work
|
||||
# if it looks like row_start = %data_prefix%foo,
|
||||
# row_stop = %data_prefix, because row_start > row_stop
|
||||
filter = self._update_filter_row(filter)
|
||||
row_start = self.data_prefix + row_start
|
||||
else:
|
||||
row_start = self.data_prefix + (row_start or "")
|
||||
row_stop = self.data_prefix + (row_stop or "")
|
||||
gen = super(MockHBaseTable, self).scan(row_start, row_stop,
|
||||
row_prefix, columns,
|
||||
filter, timestamp,
|
||||
include_timestamp, batch_size,
|
||||
scan_batching, limit,
|
||||
sorted_columns)
|
||||
data_prefix_len = len(self.data_prefix)
|
||||
# Restore original row format
|
||||
for row, data in gen:
|
||||
yield (row[data_prefix_len:], data)
|
||||
|
||||
def row(self, row, *args, **kwargs):
|
||||
row = self.data_prefix + row
|
||||
return super(MockHBaseTable, self).row(row, *args, **kwargs)
|
||||
|
||||
def delete(self, row, *args, **kwargs):
|
||||
row = self.data_prefix + row
|
||||
return super(MockHBaseTable, self).delete(row, *args, **kwargs)
|
||||
|
||||
def _update_filter_row(self, filter):
|
||||
if filter:
|
||||
return "PrefixFilter(%s) AND %s" % (self.data_prefix, filter)
|
||||
else:
|
||||
return "PrefixFilter(%s)" % self.data_prefix
|
@ -9,7 +9,7 @@
|
||||
# By default all aodh services are started (see
|
||||
# devstack/settings).
|
||||
#
|
||||
# AODH_BACKEND: Database backend (e.g. 'mysql', 'mongodb')
|
||||
# AODH_BACKEND: Database backend (e.g. 'mysql')
|
||||
# AODH_COORDINATION_URL: URL for group membership service provided by tooz.
|
||||
|
||||
# Support potential entry-points console scripts in VENV or not
|
||||
@ -50,28 +50,6 @@ function aodh_service_url {
|
||||
}
|
||||
|
||||
|
||||
# _install_mongdb - Install mongodb and python lib.
|
||||
function _aodh_install_mongodb {
|
||||
# Server package is the same on all
|
||||
local packages=mongodb-server
|
||||
|
||||
if is_fedora; then
|
||||
# mongodb client
|
||||
packages="${packages} mongodb"
|
||||
fi
|
||||
|
||||
install_package ${packages}
|
||||
|
||||
if is_fedora; then
|
||||
restart_service mongod
|
||||
else
|
||||
restart_service mongodb
|
||||
fi
|
||||
|
||||
# give time for service to restart
|
||||
sleep 5
|
||||
}
|
||||
|
||||
# _install_redis() - Install the redis server and python lib.
|
||||
function _aodh_install_redis {
|
||||
if is_ubuntu; then
|
||||
@ -102,18 +80,12 @@ function _aodh_config_apache_wsgi {
|
||||
fi
|
||||
|
||||
sudo cp $AODH_DIR/devstack/apache-aodh.template $aodh_apache_conf
|
||||
if [ "$AODH_BACKEND" = 'hbase' ] ; then
|
||||
# Use one process to have single in-memory DB instance for data consistency
|
||||
AODH_API_WORKERS=1
|
||||
else
|
||||
AODH_API_WORKERS=$API_WORKERS
|
||||
fi
|
||||
sudo sed -e "
|
||||
s|%PORT%|$AODH_SERVICE_PORT|g;
|
||||
s|%APACHE_NAME%|$APACHE_NAME|g;
|
||||
s|%WSGIAPP%|$AODH_WSGI_DIR/app|g;
|
||||
s|%USER%|$STACK_USER|g;
|
||||
s|%APIWORKERS%|$AODH_API_WORKERS|g;
|
||||
s|%APIWORKERS%|$API_WORKERS|g;
|
||||
s|%VIRTUALENV%|$venv_path|g
|
||||
" -i $aodh_apache_conf
|
||||
}
|
||||
@ -127,14 +99,6 @@ function _aodh_prepare_coordination {
|
||||
fi
|
||||
}
|
||||
|
||||
# Install required services for storage backends
|
||||
function _aodh_prepare_storage_backend {
|
||||
if [ "$AODH_BACKEND" = 'mongodb' ] ; then
|
||||
pip_install_gr pymongo
|
||||
_aodh_install_mongodb
|
||||
fi
|
||||
}
|
||||
|
||||
# Create aodh related accounts in Keystone
|
||||
function _aodh_create_accounts {
|
||||
if is_service_enabled aodh-api; then
|
||||
@ -170,9 +134,6 @@ function _aodh_cleanup_apache_wsgi {
|
||||
# cleanup_aodh() - Remove residual data files, anything left over
|
||||
# from previous runs that a clean run would need to clean up
|
||||
function cleanup_aodh {
|
||||
if [ "$AODH_BACKEND" = 'mongodb' ] ; then
|
||||
mongo aodh --eval "db.dropDatabase();"
|
||||
fi
|
||||
if [ "$AODH_DEPLOY" == "mod_wsgi" ]; then
|
||||
_aodh_cleanup_apache_wsgi
|
||||
fi
|
||||
@ -182,11 +143,6 @@ function cleanup_aodh {
|
||||
function _aodh_configure_storage_backend {
|
||||
if [ "$AODH_BACKEND" = 'mysql' ] || [ "$AODH_BACKEND" = 'postgresql' ] ; then
|
||||
iniset $AODH_CONF database connection $(database_connection_url aodh)
|
||||
elif [ "$AODH_BACKEND" = 'mongodb' ] ; then
|
||||
iniset $AODH_CONF database connection mongodb://localhost:27017/aodh
|
||||
cleanup_aodh
|
||||
elif [ "$AODH_BACKEND" = 'hbase' ] ; then
|
||||
iniset $AODH_CONF database connection hbase://__test__
|
||||
else
|
||||
die $LINENO "Unable to configure unknown AODH_BACKEND $AODH_BACKEND"
|
||||
fi
|
||||
@ -291,7 +247,6 @@ function init_aodh {
|
||||
# otherwise makes sense to do the backend services).
|
||||
function install_aodh {
|
||||
_aodh_prepare_coordination
|
||||
_aodh_prepare_storage_backend
|
||||
install_aodhclient
|
||||
sudo -H pip install -e "$AODH_DIR"[test,$AODH_BACKEND]
|
||||
sudo install -d -o $STACK_USER -m 755 $AODH_CONF_DIR
|
||||
|
@ -18,47 +18,6 @@
|
||||
Configuration
|
||||
=============
|
||||
|
||||
HBase
|
||||
===================
|
||||
|
||||
This storage implementation uses Thrift HBase interface. The default Thrift
|
||||
connection settings should be changed to support using ConnectionPool in HBase.
|
||||
To ensure proper configuration, please add the following lines to the
|
||||
`hbase-site.xml` configuration file::
|
||||
|
||||
<property>
|
||||
<name>hbase.thrift.minWorkerThreads</name>
|
||||
<value>200</value>
|
||||
</property>
|
||||
|
||||
For pure development purposes, you can use HBase from Apache_ or some other
|
||||
vendor like Cloudera or Hortonworks. To verify your installation, you can use
|
||||
the `list` command in `HBase shell`, to list the tables in your
|
||||
HBase server, as follows::
|
||||
|
||||
$ ${HBASE_HOME}/bin/hbase shell
|
||||
|
||||
hbase> list
|
||||
|
||||
.. note::
|
||||
This driver has been tested against HBase 0.94.2/CDH 4.2.0,
|
||||
HBase 0.94.4/HDP 1.2, HBase 0.94.18/Apache, HBase 0.94.5/Apache,
|
||||
HBase 0.96.2/Apache and HBase 0.98.0/Apache.
|
||||
Versions earlier than 0.92.1 are not supported due to feature incompatibility.
|
||||
|
||||
To find out more about supported storage backends please take a look on the
|
||||
:doc:`install/manual/` guide.
|
||||
|
||||
.. note::
|
||||
|
||||
If you are changing the configuration on the fly to use HBase, as a storage
|
||||
backend, you will need to restart the Aodh services that use the
|
||||
database to allow the changes to take affect, i.e. the collector and API
|
||||
services.
|
||||
|
||||
.. _Apache: https://hbase.apache.org/book/quickstart.html
|
||||
|
||||
|
||||
Aodh Sample Configuration File
|
||||
==============================
|
||||
|
||||
|
@ -28,86 +28,15 @@ This step is a prerequisite for the collector, notification agent and API
|
||||
services. You may use one of the listed database backends below to store
|
||||
Aodh data.
|
||||
|
||||
.. note::
|
||||
Please notice, MongoDB requires pymongo_ to be installed on the system. The
|
||||
required minimum version of pymongo is 2.4.
|
||||
..
|
||||
|
||||
|
||||
SQLalchemy-supported DBs
|
||||
------------------------
|
||||
|
||||
The recommended Aodh storage backend is any SQLAlchemy-supported
|
||||
database (`PostgreSQL` or `MySQL`).
|
||||
|
||||
In case of SQL-based database backends, you need to create a `aodh`
|
||||
database first and then initialise it by running::
|
||||
The recommended Aodh storage backend is any SQLAlchemy-supported database
|
||||
(`PostgreSQL` or `MySQL`). You need to create a `aodh` database first and then
|
||||
initialise it by running::
|
||||
|
||||
aodh-dbsync
|
||||
|
||||
To use MySQL as the storage backend, change the 'database' section in
|
||||
aodh.conf as follows::
|
||||
To use MySQL as the storage backend, change the 'database' section in
|
||||
aodh.conf as follows::
|
||||
|
||||
[database]
|
||||
connection = mysql+pymysql://username:password@host/aodh?charset=utf8
|
||||
|
||||
|
||||
MongoDB
|
||||
-------
|
||||
|
||||
Follow the instructions to install the MongoDB_ package for your operating
|
||||
system, then start the service. The required minimum version of MongoDB is 2.4.
|
||||
|
||||
To use MongoDB as the storage backend, change the 'database' section in
|
||||
aodh.conf as follows::
|
||||
|
||||
[database]
|
||||
connection = mongodb://username:password@host:27017/aodh
|
||||
|
||||
If MongoDB is configured in replica set mode, add `?replicaSet=` in your
|
||||
connection URL::
|
||||
|
||||
[database]
|
||||
connection = mongodb://username:password@host:27017/aodh?replicaSet=foobar
|
||||
|
||||
|
||||
HBase
|
||||
-----
|
||||
|
||||
HBase backend is implemented to use HBase Thrift interface, therefore it is
|
||||
mandatory to have the HBase Thrift server installed and running. To start
|
||||
the Thrift server, please run the following command::
|
||||
|
||||
${HBASE_HOME}/bin/hbase thrift start
|
||||
|
||||
The implementation uses `HappyBase`_, which is a wrapper library used to
|
||||
interact with HBase via Thrift protocol. You can verify the thrift
|
||||
connection by running a quick test from a client::
|
||||
|
||||
import happybase
|
||||
|
||||
conn = happybase.Connection(host=$hbase-thrift-server, port=9090, table_prefix=None)
|
||||
print conn.tables() # this returns a list of HBase tables in your HBase server
|
||||
|
||||
.. note::
|
||||
HappyBase version 0.5 or greater is required. Additionally, version 0.7
|
||||
is not currently supported.
|
||||
..
|
||||
|
||||
In case of HBase, the needed database tables (`project`, `user`, `resource`,
|
||||
`meter`, `alarm`, `alarm_h`) should be created manually with `f` column
|
||||
family for each one.
|
||||
|
||||
To use HBase as the storage backend, change the 'database' section in
|
||||
aodh.conf as follows::
|
||||
|
||||
[database]
|
||||
connection = hbase://hbase-thrift-host:9090
|
||||
|
||||
|
||||
.. _HappyBase: http://happybase.readthedocs.org/en/latest/index.html#
|
||||
.. _MongoDB: http://www.mongodb.org/
|
||||
.. _pymongo: https://pypi.python.org/pypi/pymongo/
|
||||
|
||||
|
||||
|
||||
|
@ -24,15 +24,13 @@ run through tox_.
|
||||
|
||||
$ sudo pip install tox
|
||||
|
||||
2. On Ubuntu install ``mongodb`` and ``libmysqlclient-dev`` packages::
|
||||
2. On Ubuntu install ``libmysqlclient-dev`` packages::
|
||||
|
||||
$ sudo apt-get install mongodb
|
||||
$ sudo apt-get install libmysqlclient-dev
|
||||
|
||||
For Fedora20 there is no ``libmysqlclient-dev`` package, so you’ll need
|
||||
to install ``mariadb-devel.x86-64`` (or ``mariadb-devel.i386``) instead::
|
||||
|
||||
$ sudo yum install mongodb
|
||||
$ sudo yum install mariadb-devel.x86_64
|
||||
|
||||
3. Run the unit and code-style tests::
|
||||
|
@ -0,0 +1,3 @@
|
||||
---
|
||||
upgrade:
|
||||
- All the deprecated non-SQL drivers have been removed.
|
@ -4,11 +4,4 @@ set -e
|
||||
export AODH_TEST_BACKEND=${AODH_TEST_BACKEND:-mysql}
|
||||
export AODH_SERVICE_URL=${AODH_SERVICE_URL:-http://127.0.0.1:8042}
|
||||
|
||||
case $AODH_TEST_BACKEND in
|
||||
hbase)
|
||||
AODH_TEST_STORAGE_URL="hbase://__test__" $*
|
||||
;;
|
||||
*)
|
||||
pifpaf -g AODH_TEST_STORAGE_URL run $AODH_TEST_BACKEND -- $*
|
||||
;;
|
||||
esac
|
||||
pifpaf -g AODH_TEST_STORAGE_URL run $AODH_TEST_BACKEND -- $*
|
||||
|
11
setup.cfg
11
setup.cfg
@ -44,14 +44,6 @@ postgresql =
|
||||
alembic>=0.7.2
|
||||
psycopg2
|
||||
|
||||
mongodb =
|
||||
pymongo>=3.0.2
|
||||
|
||||
hbase =
|
||||
happybase!=0.7,>=0.5,<1.0.0:python_version=='2.7'
|
||||
# Required for bson
|
||||
pymongo>=3.0.2
|
||||
|
||||
zaqar =
|
||||
python-zaqarclient>=1.2.0
|
||||
|
||||
@ -80,12 +72,10 @@ test =
|
||||
[entry_points]
|
||||
aodh.storage =
|
||||
log = aodh.storage.impl_log:Connection
|
||||
mongodb = aodh.storage.impl_mongodb:Connection
|
||||
mysql = aodh.storage.impl_sqlalchemy:Connection
|
||||
mysql+pymysql = aodh.storage.impl_sqlalchemy:Connection
|
||||
postgresql = aodh.storage.impl_sqlalchemy:Connection
|
||||
sqlite = aodh.storage.impl_sqlalchemy:Connection
|
||||
hbase = aodh.storage.impl_hbase:Connection
|
||||
|
||||
aodh.alarm.rule =
|
||||
threshold = aodh.api.controllers.v2.alarm_rules.threshold:AlarmThresholdRule
|
||||
@ -122,7 +112,6 @@ console_scripts =
|
||||
aodh-evaluator = aodh.cmd.alarm:evaluator
|
||||
aodh-notifier = aodh.cmd.alarm:notifier
|
||||
aodh-listener = aodh.cmd.alarm:listener
|
||||
aodh-data-migration = aodh.cmd.data_migration:main
|
||||
aodh-combination-alarm-conversion = aodh.cmd.alarm_conversion:conversion
|
||||
|
||||
oslo.config.opts =
|
||||
|
@ -1,38 +0,0 @@
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import os
|
||||
import sys
|
||||
|
||||
from oslo_config import cfg
|
||||
|
||||
from aodh import storage
|
||||
|
||||
|
||||
def main(argv):
|
||||
cfg.CONF([], project='aodh')
|
||||
if os.getenv("AODH_TEST_STORAGE_URL"):
|
||||
url = ("%s?table_prefix=%s" %
|
||||
(os.getenv("AODH_TEST_STORAGE_URL"),
|
||||
os.getenv("AODH_TEST_HBASE_TABLE_PREFIX", "test")))
|
||||
cfg.CONF.set_override("connection", url, group="database",
|
||||
enforce_type=True)
|
||||
alarm_conn = storage.get_connection_from_config(cfg.CONF)
|
||||
for arg in argv:
|
||||
if arg == "--upgrade":
|
||||
alarm_conn.upgrade()
|
||||
if arg == "--clear":
|
||||
alarm_conn.clear()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main(sys.argv[1:])
|
18
tox.ini
18
tox.ini
@ -15,17 +15,6 @@ commands =
|
||||
oslo-config-generator --config-file=etc/aodh/aodh-config-generator.conf
|
||||
whitelist_externals = bash
|
||||
|
||||
[testenv:py27-hbase]
|
||||
deps = .[hbase,test]
|
||||
setenv = OS_TEST_PATH=aodh/tests/functional/
|
||||
AODH_TEST_STORAGE_URL=hbase://__test__
|
||||
|
||||
[testenv:py27-mongodb]
|
||||
deps = .[mongodb,test]
|
||||
setenv = OS_TEST_PATH=aodh/tests/functional/
|
||||
commands =
|
||||
pifpaf -g AODH_TEST_STORAGE_URL run mongodb -- python setup.py testr --slowest --testr-args="{posargs}"
|
||||
|
||||
[testenv:py27-mysql]
|
||||
deps = .[mysql,test]
|
||||
setenv = OS_TEST_PATH=aodh/tests/functional/
|
||||
@ -39,7 +28,7 @@ commands =
|
||||
pifpaf -g AODH_TEST_STORAGE_URL run postgresql -- python setup.py testr --slowest --testr-args="{posargs}"
|
||||
|
||||
[testenv:functional]
|
||||
deps = .[mysql,postgresql,mongodb,hbase,test]
|
||||
deps = .[mysql,postgresql,test]
|
||||
setenv = VIRTUAL_ENV={envdir}
|
||||
OS_TEST_PATH=aodh/tests/functional/
|
||||
GABBI_LIVE_FAIL_IF_NO_TEST=1
|
||||
@ -91,11 +80,6 @@ setenv = PYTHONHASHSEED=0
|
||||
[testenv:debug]
|
||||
commands = bash -x oslo_debug_helper {posargs}
|
||||
|
||||
[testenv:debug-mongodb]
|
||||
deps = .[mongodb,test]
|
||||
setenv = OS_TEST_PATH=aodh/tests/functional/
|
||||
commands = pifpaf -g AODH_TEST_STORAGE_URL run mongodb -- oslo_debug_helper {posargs}
|
||||
|
||||
[testenv:debug-mysql]
|
||||
deps = .[mysql,test]
|
||||
setenv = OS_TEST_PATH=aodh/tests/functional/
|
||||
|
Loading…
Reference in New Issue
Block a user