Merge "Splits mongo storage code base"

This commit is contained in:
Jenkins 2014-06-18 09:17:04 +00:00 committed by Gerrit Code Review
commit 2b01fceabc
6 changed files with 301 additions and 271 deletions

View File

@ -35,6 +35,7 @@ from ceilometer.openstack.common import timeutils
from ceilometer import storage
from ceilometer.storage import base
from ceilometer.storage import models
from ceilometer.storage.mongo import utils as pymongo_utils
from ceilometer.storage import pymongo_base
from ceilometer import utils
@ -71,7 +72,7 @@ class Connection(pymongo_base.Connection):
CAPABILITIES = utils.update_nested(pymongo_base.Connection.CAPABILITIES,
AVAILABLE_CAPABILITIES)
CONNECTION_POOL = pymongo_base.ConnectionPool()
CONNECTION_POOL = pymongo_utils.ConnectionPool()
GROUP = {'_id': '$counter_name',
'unit': {'$min': '$counter_unit'},
@ -271,10 +272,10 @@ class Connection(pymongo_base.Connection):
# Look for resources matching the above criteria and with
# samples in the time range we care about, then change the
# resource query to return just those resources by id.
ts_range = pymongo_base.make_timestamp_range(start_timestamp,
end_timestamp,
start_timestamp_op,
end_timestamp_op)
ts_range = pymongo_utils.make_timestamp_range(start_timestamp,
end_timestamp,
start_timestamp_op,
end_timestamp_op)
if ts_range:
q['timestamp'] = ts_range
@ -316,7 +317,7 @@ class Connection(pymongo_base.Connection):
if aggregate:
raise NotImplementedError('Selectable aggregates not implemented')
q = pymongo_base.make_query_from_filter(sample_filter)
q = pymongo_utils.make_query_from_filter(sample_filter)
if period:
if sample_filter.start:

View File

@ -39,6 +39,7 @@ from ceilometer.openstack.common import timeutils
from ceilometer import storage
from ceilometer.storage import base
from ceilometer.storage import models
from ceilometer.storage.mongo import utils as pymongo_utils
from ceilometer.storage import pymongo_base
from ceilometer import utils
@ -85,7 +86,7 @@ class Connection(pymongo_base.Connection):
CAPABILITIES = utils.update_nested(pymongo_base.Connection.CAPABILITIES,
AVAILABLE_CAPABILITIES)
CONNECTION_POOL = pymongo_base.ConnectionPool()
CONNECTION_POOL = pymongo_utils.ConnectionPool()
REDUCE_GROUP_CLEAN = bson.code.Code("""
function ( curr, result ) {
@ -708,10 +709,10 @@ class Connection(pymongo_base.Connection):
# Look for resources matching the above criteria and with
# samples in the time range we care about, then change the
# resource query to return just those resources by id.
ts_range = pymongo_base.make_timestamp_range(start_timestamp,
end_timestamp,
start_timestamp_op,
end_timestamp_op)
ts_range = pymongo_utils.make_timestamp_range(start_timestamp,
end_timestamp,
start_timestamp_op,
end_timestamp_op)
if ts_range:
query['timestamp'] = ts_range
@ -855,7 +856,7 @@ class Connection(pymongo_base.Connection):
'resource_id', 'source'])):
raise NotImplementedError("Unable to group by these fields")
q = pymongo_base.make_query_from_filter(sample_filter)
q = pymongo_utils.make_query_from_filter(sample_filter)
if period:
if sample_filter.start:

View File

View File

@ -0,0 +1,274 @@
#
# Copyright Ericsson AB 2013. All rights reserved
#
# Authors: Ildiko Vancsa <ildiko.vancsa@ericsson.com>
# Balazs Gibizer <balazs.gibizer@ericsson.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Common functions for MongoDB and DB2 backends
"""
import time
from oslo.config import cfg
import pymongo
import weakref
from ceilometer.openstack.common.gettextutils import _
from ceilometer.openstack.common import log
from ceilometer.openstack.common import network_utils
LOG = log.getLogger(__name__)
cfg.CONF.import_opt('max_retries', 'ceilometer.openstack.common.db.options',
group="database")
cfg.CONF.import_opt('retry_interval', 'ceilometer.openstack.common.db.options',
group="database")
def make_timestamp_range(start, end,
start_timestamp_op=None, end_timestamp_op=None):
"""Given two possible datetimes and their operations, create the query
document to find timestamps within that range.
By default, using $gte for the lower bound and $lt for the
upper bound.
"""
ts_range = {}
if start:
if start_timestamp_op == 'gt':
start_timestamp_op = '$gt'
else:
start_timestamp_op = '$gte'
ts_range[start_timestamp_op] = start
if end:
if end_timestamp_op == 'le':
end_timestamp_op = '$lte'
else:
end_timestamp_op = '$lt'
ts_range[end_timestamp_op] = end
return ts_range
def make_query_from_filter(sample_filter, require_meter=True):
"""Return a query dictionary based on the settings in the filter.
:param filter: SampleFilter instance
:param require_meter: If true and the filter does not have a meter,
raise an error.
"""
q = {}
if sample_filter.user:
q['user_id'] = sample_filter.user
if sample_filter.project:
q['project_id'] = sample_filter.project
if sample_filter.meter:
q['counter_name'] = sample_filter.meter
elif require_meter:
raise RuntimeError('Missing required meter specifier')
ts_range = make_timestamp_range(sample_filter.start,
sample_filter.end,
sample_filter.start_timestamp_op,
sample_filter.end_timestamp_op)
if ts_range:
q['timestamp'] = ts_range
if sample_filter.resource:
q['resource_id'] = sample_filter.resource
if sample_filter.source:
q['source'] = sample_filter.source
if sample_filter.message_id:
q['message_id'] = sample_filter.message_id
# so the samples call metadata resource_metadata, so we convert
# to that.
q.update(dict(('resource_%s' % k, v)
for (k, v) in sample_filter.metaquery.iteritems()))
return q
class ConnectionPool(object):
def __init__(self):
self._pool = {}
def connect(self, url):
connection_options = pymongo.uri_parser.parse_uri(url)
del connection_options['database']
del connection_options['username']
del connection_options['password']
del connection_options['collection']
pool_key = tuple(connection_options)
if pool_key in self._pool:
client = self._pool.get(pool_key)()
if client:
return client
splitted_url = network_utils.urlsplit(url)
log_data = {'db': splitted_url.scheme,
'nodelist': connection_options['nodelist']}
LOG.info(_('Connecting to %(db)s on %(nodelist)s') % log_data)
client = self._mongo_connect(url)
self._pool[pool_key] = weakref.ref(client)
return client
@staticmethod
def _mongo_connect(url):
max_retries = cfg.CONF.database.max_retries
retry_interval = cfg.CONF.database.retry_interval
attempts = 0
while True:
try:
client = pymongo.MongoClient(url, safe=True)
except pymongo.errors.ConnectionFailure as e:
if max_retries >= 0 and attempts >= max_retries:
LOG.error(_('Unable to connect to the database after '
'%(retries)d retries. Giving up.') %
{'retries': max_retries})
raise
LOG.warn(_('Unable to connect to the database server: '
'%(errmsg)s. Trying again in %(retry_interval)d '
'seconds.') %
{'errmsg': e, 'retry_interval': retry_interval})
attempts += 1
time.sleep(retry_interval)
else:
return client
class QueryTransformer(object):
operators = {"<": "$lt",
">": "$gt",
"<=": "$lte",
"=<": "$lte",
">=": "$gte",
"=>": "$gte",
"!=": "$ne",
"in": "$in"}
complex_operators = {"or": "$or",
"and": "$and"}
ordering_functions = {"asc": pymongo.ASCENDING,
"desc": pymongo.DESCENDING}
def transform_orderby(self, orderby):
orderby_filter = []
for field in orderby:
field_name = field.keys()[0]
ordering = self.ordering_functions[field.values()[0]]
orderby_filter.append((field_name, ordering))
return orderby_filter
@staticmethod
def _move_negation_to_leaf(condition):
"""Moves every not operator to the leafs by
applying the De Morgan rules and anihilating
double negations
"""
def _apply_de_morgan(tree, negated_subtree, negated_op):
if negated_op == "and":
new_op = "or"
else:
new_op = "and"
tree[new_op] = [{"not": child}
for child in negated_subtree[negated_op]]
del tree["not"]
def transform(subtree):
op = subtree.keys()[0]
if op in ["and", "or"]:
[transform(child) for child in subtree[op]]
elif op == "not":
negated_tree = subtree[op]
negated_op = negated_tree.keys()[0]
if negated_op == "and":
_apply_de_morgan(subtree, negated_tree, negated_op)
transform(subtree)
elif negated_op == "or":
_apply_de_morgan(subtree, negated_tree, negated_op)
transform(subtree)
elif negated_op == "not":
# two consecutive not annihilates theirselves
new_op = negated_tree.values()[0].keys()[0]
subtree[new_op] = negated_tree[negated_op][new_op]
del subtree["not"]
transform(subtree)
transform(condition)
def transform_filter(self, condition):
# in Mongo not operator can only be applied to
# simple expressions so we have to move every
# not operator to the leafs of the expression tree
self._move_negation_to_leaf(condition)
return self._process_json_tree(condition)
def _handle_complex_op(self, complex_op, nodes):
element_list = []
for node in nodes:
element = self._process_json_tree(node)
element_list.append(element)
complex_operator = self.complex_operators[complex_op]
op = {complex_operator: element_list}
return op
def _handle_not_op(self, negated_tree):
# assumes that not is moved to the leaf already
# so we are next to a leaf
negated_op = negated_tree.keys()[0]
negated_field = negated_tree[negated_op].keys()[0]
value = negated_tree[negated_op][negated_field]
if negated_op == "=":
return {negated_field: {"$ne": value}}
elif negated_op == "!=":
return {negated_field: value}
else:
return {negated_field: {"$not":
{self.operators[negated_op]: value}}}
def _handle_simple_op(self, simple_op, nodes):
field_name = nodes.keys()[0]
field_value = nodes.values()[0]
# no operator for equal in Mongo
if simple_op == "=":
op = {field_name: field_value}
return op
operator = self.operators[simple_op]
op = {field_name: {operator: field_value}}
return op
def _process_json_tree(self, condition_tree):
operator_node = condition_tree.keys()[0]
nodes = condition_tree.values()[0]
if operator_node in self.complex_operators:
return self._handle_complex_op(operator_node, nodes)
if operator_node == "not":
negated_tree = condition_tree[operator_node]
return self._handle_not_op(negated_tree)
return self._handle_simple_op(operator_node, nodes)

View File

@ -18,143 +18,18 @@
"""Common functions for MongoDB and DB2 backends
"""
import time
from oslo.config import cfg
import pymongo
import weakref
from ceilometer.alarm.storage import models as alarm_models
from ceilometer.openstack.common.gettextutils import _
from ceilometer.openstack.common import log
from ceilometer.openstack.common import network_utils
from ceilometer.storage import base
from ceilometer.storage import models
from ceilometer.storage.mongo import utils as pymongo_utils
from ceilometer import utils
LOG = log.getLogger(__name__)
cfg.CONF.import_opt('max_retries', 'ceilometer.openstack.common.db.options',
group="database")
cfg.CONF.import_opt('retry_interval', 'ceilometer.openstack.common.db.options',
group="database")
def make_timestamp_range(start, end,
start_timestamp_op=None, end_timestamp_op=None):
"""Given two possible datetimes and their operations, create the query
document to find timestamps within that range.
By default, using $gte for the lower bound and $lt for the
upper bound.
"""
ts_range = {}
if start:
if start_timestamp_op == 'gt':
start_timestamp_op = '$gt'
else:
start_timestamp_op = '$gte'
ts_range[start_timestamp_op] = start
if end:
if end_timestamp_op == 'le':
end_timestamp_op = '$lte'
else:
end_timestamp_op = '$lt'
ts_range[end_timestamp_op] = end
return ts_range
def make_query_from_filter(sample_filter, require_meter=True):
"""Return a query dictionary based on the settings in the filter.
:param filter: SampleFilter instance
:param require_meter: If true and the filter does not have a meter,
raise an error.
"""
q = {}
if sample_filter.user:
q['user_id'] = sample_filter.user
if sample_filter.project:
q['project_id'] = sample_filter.project
if sample_filter.meter:
q['counter_name'] = sample_filter.meter
elif require_meter:
raise RuntimeError('Missing required meter specifier')
ts_range = make_timestamp_range(sample_filter.start,
sample_filter.end,
sample_filter.start_timestamp_op,
sample_filter.end_timestamp_op)
if ts_range:
q['timestamp'] = ts_range
if sample_filter.resource:
q['resource_id'] = sample_filter.resource
if sample_filter.source:
q['source'] = sample_filter.source
if sample_filter.message_id:
q['message_id'] = sample_filter.message_id
# so the samples call metadata resource_metadata, so we convert
# to that.
q.update(dict(('resource_%s' % k, v)
for (k, v) in sample_filter.metaquery.iteritems()))
return q
class ConnectionPool(object):
def __init__(self):
self._pool = {}
def connect(self, url):
connection_options = pymongo.uri_parser.parse_uri(url)
del connection_options['database']
del connection_options['username']
del connection_options['password']
del connection_options['collection']
pool_key = tuple(connection_options)
if pool_key in self._pool:
client = self._pool.get(pool_key)()
if client:
return client
splitted_url = network_utils.urlsplit(url)
log_data = {'db': splitted_url.scheme,
'nodelist': connection_options['nodelist']}
LOG.info(_('Connecting to %(db)s on %(nodelist)s') % log_data)
client = self._mongo_connect(url)
self._pool[pool_key] = weakref.ref(client)
return client
@staticmethod
def _mongo_connect(url):
max_retries = cfg.CONF.database.max_retries
retry_interval = cfg.CONF.database.retry_interval
attempts = 0
while True:
try:
client = pymongo.MongoClient(url, safe=True)
except pymongo.errors.ConnectionFailure as e:
if max_retries >= 0 and attempts >= max_retries:
LOG.error(_('Unable to connect to the database after '
'%(retries)d retries. Giving up.') %
{'retries': max_retries})
raise
LOG.warn(_('Unable to connect to the database server: '
'%(errmsg)s. Trying again in %(retry_interval)d '
'seconds.') %
{'errmsg': e, 'retry_interval': retry_interval})
attempts += 1
time.sleep(retry_interval)
else:
return client
COMMON_AVAILABLE_CAPABILITIES = {
'meters': {'query': {'simple': True,
@ -263,8 +138,8 @@ class Connection(base.Connection):
"""
if limit == 0:
return []
q = make_query_from_filter(sample_filter,
require_meter=False)
q = pymongo_utils.make_query_from_filter(sample_filter,
require_meter=False)
return self._retrieve_samples(q,
[("timestamp", pymongo.DESCENDING)],
@ -340,10 +215,10 @@ class Connection(base.Connection):
if type is not None:
q['type'] = type
if start_timestamp or end_timestamp:
ts_range = make_timestamp_range(start_timestamp,
end_timestamp,
start_timestamp_op,
end_timestamp_op)
ts_range = pymongo_utils.make_timestamp_range(start_timestamp,
end_timestamp,
start_timestamp_op,
end_timestamp_op)
if ts_range:
q['timestamp'] = ts_range
@ -374,7 +249,7 @@ class Connection(base.Connection):
return []
query_filter = {}
orderby_filter = [("timestamp", pymongo.DESCENDING)]
transformer = QueryTransformer()
transformer = pymongo_utils.QueryTransformer()
if orderby is not None:
orderby_filter = transformer.transform_orderby(orderby)
if filter_expr is not None:
@ -504,124 +379,3 @@ class Connection(base.Connection):
"""Ensures the alarm has a time constraints field."""
if 'time_constraints' not in alarm:
alarm['time_constraints'] = []
class QueryTransformer(object):
operators = {"<": "$lt",
">": "$gt",
"<=": "$lte",
"=<": "$lte",
">=": "$gte",
"=>": "$gte",
"!=": "$ne",
"in": "$in"}
complex_operators = {"or": "$or",
"and": "$and"}
ordering_functions = {"asc": pymongo.ASCENDING,
"desc": pymongo.DESCENDING}
def transform_orderby(self, orderby):
orderby_filter = []
for field in orderby:
field_name = field.keys()[0]
ordering = self.ordering_functions[field.values()[0]]
orderby_filter.append((field_name, ordering))
return orderby_filter
@staticmethod
def _move_negation_to_leaf(condition):
"""Moves every not operator to the leafs by
applying the De Morgan rules and anihilating
double negations
"""
def _apply_de_morgan(tree, negated_subtree, negated_op):
if negated_op == "and":
new_op = "or"
else:
new_op = "and"
tree[new_op] = [{"not": child}
for child in negated_subtree[negated_op]]
del tree["not"]
def transform(subtree):
op = subtree.keys()[0]
if op in ["and", "or"]:
[transform(child) for child in subtree[op]]
elif op == "not":
negated_tree = subtree[op]
negated_op = negated_tree.keys()[0]
if negated_op == "and":
_apply_de_morgan(subtree, negated_tree, negated_op)
transform(subtree)
elif negated_op == "or":
_apply_de_morgan(subtree, negated_tree, negated_op)
transform(subtree)
elif negated_op == "not":
# two consecutive not annihilates theirselves
new_op = negated_tree.values()[0].keys()[0]
subtree[new_op] = negated_tree[negated_op][new_op]
del subtree["not"]
transform(subtree)
transform(condition)
def transform_filter(self, condition):
# in Mongo not operator can only be applied to
# simple expressions so we have to move every
# not operator to the leafs of the expression tree
self._move_negation_to_leaf(condition)
return self._process_json_tree(condition)
def _handle_complex_op(self, complex_op, nodes):
element_list = []
for node in nodes:
element = self._process_json_tree(node)
element_list.append(element)
complex_operator = self.complex_operators[complex_op]
op = {complex_operator: element_list}
return op
def _handle_not_op(self, negated_tree):
# assumes that not is moved to the leaf already
# so we are next to a leaf
negated_op = negated_tree.keys()[0]
negated_field = negated_tree[negated_op].keys()[0]
value = negated_tree[negated_op][negated_field]
if negated_op == "=":
return {negated_field: {"$ne": value}}
elif negated_op == "!=":
return {negated_field: value}
else:
return {negated_field: {"$not":
{self.operators[negated_op]: value}}}
def _handle_simple_op(self, simple_op, nodes):
field_name = nodes.keys()[0]
field_value = nodes.values()[0]
# no operator for equal in Mongo
if simple_op == "=":
op = {field_name: field_value}
return op
operator = self.operators[simple_op]
op = {field_name: {operator: field_value}}
return op
def _process_json_tree(self, condition_tree):
operator_node = condition_tree.keys()[0]
nodes = condition_tree.values()[0]
if operator_node in self.complex_operators:
return self._handle_complex_op(operator_node, nodes)
if operator_node == "not":
negated_tree = condition_tree[operator_node]
return self._handle_not_op(negated_tree)
return self._handle_simple_op(operator_node, nodes)

View File

@ -23,7 +23,7 @@ import pymongo
from ceilometer.openstack.common.gettextutils import _
from ceilometer.publisher import utils
from ceilometer import sample
from ceilometer.storage import pymongo_base
from ceilometer.storage.mongo import utils as pymongo_utils
from ceilometer.tests import db as tests_db
from ceilometer.tests.storage import test_storage_scenarios
@ -185,13 +185,13 @@ class CompatibilityTest(test_storage_scenarios.DBTestBase,
'connection', self.db_manager.url.replace('db2:', 'mongodb:', 1),
group='database')
pool = pymongo_base.ConnectionPool()
pool = pymongo_utils.ConnectionPool()
with contextlib.nested(
patch('pymongo.MongoClient',
side_effect=pymongo.errors.ConnectionFailure('foo')),
patch.object(pymongo_base.LOG, 'error'),
patch.object(pymongo_base.LOG, 'warn'),
patch.object(pymongo_base.time, 'sleep')
patch.object(pymongo_utils.LOG, 'error'),
patch.object(pymongo_utils.LOG, 'warn'),
patch.object(pymongo_utils.time, 'sleep')
) as (MockMongo, MockLOGerror, MockLOGwarn, Mocksleep):
self.assertRaises(pymongo.errors.ConnectionFailure,
pool.connect, self.CONF.database.connection)