diff --git a/marconi/proxy/storage/mongodb/__init__.py b/marconi/proxy/storage/mongodb/__init__.py new file mode 100644 index 000000000..a1fe9f5a3 --- /dev/null +++ b/marconi/proxy/storage/mongodb/__init__.py @@ -0,0 +1,6 @@ +"""MongoDB Proxy Storage Driver for Marconi""" + +from marconi.proxy.storage.mongodb import driver + +# Hoist classes into package namespace +Driver = driver.Driver diff --git a/marconi/proxy/storage/mongodb/catalogue.py b/marconi/proxy/storage/mongodb/catalogue.py new file mode 100644 index 000000000..e32d6fc87 --- /dev/null +++ b/marconi/proxy/storage/mongodb/catalogue.py @@ -0,0 +1,103 @@ +# Copyright (c) 2013 Rackspace Hosting, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""MongoDB storage controller for the proxy catalogue. + +Schema: + +{ + 'p': Project_name :: str, + 'q': Queue_name :: str, + 'n': partition_Name :: str, + 'h': representative_Host_name :: str, + 'm': Metadata :: dict +} +""" + +import marconi.openstack.common.log as logging +from marconi.proxy.storage import base +from marconi.proxy.storage import exceptions +from marconi.queues.storage.mongodb import utils + +LOG = logging.getLogger(__name__) + +CATALOGUE_INDEX = [ + ('p', 1), + ('q', 1) +] + + +class CatalogueController(base.CatalogueBase): + + def __init__(self, *args, **kwargs): + super(CatalogueController, self).__init__(*args, **kwargs) + + self._col = self.driver.db['catalogue'] + self._col.ensure_index(CATALOGUE_INDEX, unique=True) + + @utils.raises_conn_error + def list(self, project, include_metadata=False): + fields = { + 'p': 1, + 'q': 1, + 'n': 1, + 'h': 1, + 'm': 1 + } + + cursor = self._col.find({'p': project}, fields) + for entry in cursor: + yield _normalize(entry) + + @utils.raises_conn_error + def get(self, project, queue): + fields = {'p': 1, 'q': 1, 'n': 1, 'h': 1, 'm': 1, '_id': 0} + entry = self._col.find_one({'p': project, 'q': queue}, + fields=fields) + + if entry is None: + raise exceptions.EntryNotFound(project, queue) + + return _normalize(entry) + + @utils.raises_conn_error + def exists(self, project, queue): + return self._col.find_one({'p': project, 'q': queue}) is not None + + @utils.raises_conn_error + def insert(self, project, queue, partition, host, metadata={}): + self._col.insert({'p': project, 'q': queue, + 'n': partition, 'h': host, 'm': metadata}) + + @utils.raises_conn_error + def delete(self, project, queue): + self._col.remove({'p': project, 'q': queue}, w=0) + + @utils.raises_conn_error + def update_metadata(self, project, queue, metadata): + # NOTE(cpp-cabrera): since update does not create, checking + # for existence isn't necesssary + self._col.update({'p': project, 'q': queue}, + {'$set': {'m': metadata}}, + multi=False, manipulate=False) + + +def _normalize(entry): + return { + 'name': entry['q'], + 'metadata': entry.get('m', {}), + 'partition': entry['n'], + 'host': entry['h'] + } diff --git a/marconi/proxy/storage/mongodb/controllers.py b/marconi/proxy/storage/mongodb/controllers.py new file mode 100644 index 000000000..76ec57d99 --- /dev/null +++ b/marconi/proxy/storage/mongodb/controllers.py @@ -0,0 +1,22 @@ +# Copyright (c) 2013 Rackspace Hosting, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Exports Mongodb proxy storage controllers.""" + +from marconi.proxy.storage.mongodb import catalogue +from marconi.proxy.storage.mongodb import partitions + +CatalogueController = catalogue.CatalogueController +PartitionsController = partitions.PartitionsController diff --git a/marconi/proxy/storage/mongodb/driver.py b/marconi/proxy/storage/mongodb/driver.py new file mode 100644 index 000000000..09785e17c --- /dev/null +++ b/marconi/proxy/storage/mongodb/driver.py @@ -0,0 +1,52 @@ +# Copyright (c) 2013 Rackspace Hosting, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Mongodb proxy storage driver implementation.""" + +import pymongo +import pymongo.errors + +from marconi.openstack.common import log as logging +from marconi.proxy import storage +from marconi.proxy.storage.mongodb import controllers +from marconi.proxy.storage.mongodb import options + +LOG = logging.getLogger(__name__) + + +class Driver(storage.DriverBase): + + def __init__(self): + self._database = None + + @property + def db(self): + if self._database is None: + if options.CFG.uri and 'replicaSet' in options.CFG.uri: + conn = pymongo.MongoReplicaSetClient(options.CFG.uri) + else: + conn = pymongo.MongoClient(options.CFG.uri) + + self._database = conn[options.CFG.database] + + return self._database + + @property + def partitions_controller(self): + return controllers.PartitionsController(self) + + @property + def catalogue_controller(self): + return controllers.CatalogueController(self) diff --git a/marconi/proxy/storage/mongodb/options.py b/marconi/proxy/storage/mongodb/options.py new file mode 100644 index 000000000..99b3f3b9f --- /dev/null +++ b/marconi/proxy/storage/mongodb/options.py @@ -0,0 +1,29 @@ +# Copyright (c) 2013 Rackspace Hosting, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""MongoDB driver configuration options.""" + +from marconi.common import config + +OPTIONS = { + # Connection string + 'uri': None, + + # Database name + 'database': 'marconi_proxy' +} + +NAMESPACE = 'drivers:proxy:storage:mongodb' +CFG = config.namespace(NAMESPACE).from_options(**OPTIONS) diff --git a/marconi/proxy/storage/mongodb/partitions.py b/marconi/proxy/storage/mongodb/partitions.py new file mode 100644 index 000000000..5153f0c5e --- /dev/null +++ b/marconi/proxy/storage/mongodb/partitions.py @@ -0,0 +1,95 @@ +# Copyright (c) 2013 Rackspace Hosting, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""MongoDB storage controller for proxy partitions. + +Schema: + +{ + 'n': Name :: str + 'h': [Host_url :: str], + 'w': Weight :: int, +} +""" + +from marconi.proxy.storage import base +from marconi.proxy.storage import exceptions +from marconi.proxy.utils import round_robin +from marconi.queues.storage.mongodb import utils + +PARTITIONS_INDEX = [ + ('n', 1) +] + + +class PartitionsController(base.PartitionsBase): + def __init__(self, *args, **kwargs): + super(PartitionsController, self).__init__(*args, **kwargs) + + self._col = self.driver.db['partitions'] + self._col.ensure_index(PARTITIONS_INDEX, unique=True) + self._rr = round_robin.Selector() + + @utils.raises_conn_error + def list(self): + cursor = self._col.find(fields={'n': 1, 'h': 1, 'w': 1, '_id': 0}) + + for entry in cursor: + yield _normalize(entry) + + @utils.raises_conn_error + def select(self, name): + partition = self.get(name) + return self._rr.next(partition['name'], partition['hosts']) + + @utils.raises_conn_error + def get(self, name): + fields = {'n': 1, 'w': 1, 'h': 1, '_id': 0} + partition = self._col.find_one({'n': name}, + fields=fields) + + if partition is None: + raise exceptions.PartitionNotFound(name) + + return _normalize(partition) + + @utils.raises_conn_error + def exists(self, name): + try: + next(self._col.find({'n': name})) + except StopIteration: + return False + else: + return True + + @utils.raises_conn_error + def create(self, name, weight, hosts): + # NOTE(cpp-cabrera): overwriting behavior should be okay for + # managing partitions + self._col.update({'n': name}, + {'$set': {'n': name, 'w': weight, 'h': hosts}}, + upsert=True) + + @utils.raises_conn_error + def delete(self, name): + self._col.remove({'n': name}, w=0) + + +def _normalize(entry): + return { + 'name': entry['n'], + 'hosts': entry['h'], + 'weight': entry['w'] + } diff --git a/tests/etc/wsgi_proxy_mongodb.conf b/tests/etc/wsgi_proxy_mongodb.conf new file mode 100644 index 000000000..d4ac9b289 --- /dev/null +++ b/tests/etc/wsgi_proxy_mongodb.conf @@ -0,0 +1,14 @@ +[DEFAULT] +debug = False +verbose = False + +[drivers] +transport = wsgi +storage = mongodb + +[drivers:transport:wsgi] +port = 8888 + +[drivers:proxy:storage:mongodb] +uri = mongodb://127.0.0.1:27017 +database = marconi_proxy_test diff --git a/tests/unit/proxy/storage/test_impl_mongodb.py b/tests/unit/proxy/storage/test_impl_mongodb.py new file mode 100644 index 000000000..f3f8d6de8 --- /dev/null +++ b/tests/unit/proxy/storage/test_impl_mongodb.py @@ -0,0 +1,71 @@ +# Copyright (c) 2013 Rackspace Hosting, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import os + +from marconi.proxy.storage import mongodb +from marconi.proxy.storage.mongodb import controllers +from marconi.proxy.storage.mongodb import options +from marconi import tests as testing + +from tests.unit.proxy.storage import base + + +class MongodbDriverTest(testing.TestBase): + + def setUp(self): + if not os.environ.get('MONGODB_TEST_LIVE'): + self.skipTest('No MongoDB instance running') + + super(MongodbDriverTest, self).setUp() + self.load_conf('wsgi_proxy_mongodb.conf') + + def test_db_instance(self): + driver = mongodb.Driver() + db = driver.db + self.assertEquals(db.name, options.CFG.database) + + +class MongodbPartitionsTest(base.PartitionsControllerTest): + + driver_class = mongodb.Driver + controller_class = controllers.PartitionsController + + def setUp(self): + if not os.environ.get('MONGODB_TEST_LIVE'): + self.skipTest('No MongoDB instance found running') + + super(MongodbPartitionsTest, self).setUp() + self.load_conf('wsgi_proxy_mongodb.conf') + + def tearDown(self): + self.controller._col.drop() + super(MongodbPartitionsTest, self).tearDown() + + +class MongodbCatalogueTest(base.CatalogueControllerTest): + + driver_class = mongodb.Driver + controller_class = controllers.CatalogueController + + def setUp(self): + if not os.environ.get('MONGODB_TEST_LIVE'): + self.skipTest('No MongoDB instance found running') + + super(MongodbCatalogueTest, self).setUp() + self.load_conf('wsgi_proxy_mongodb.conf') + + def tearDown(self): + self.controller._col.drop() + super(MongodbCatalogueTest, self).tearDown()