proxy: mongodb storage driver
This patch adds a mongodb storage driver for marconi-proxy. This also includes the test driver and a test configuration file. Change-Id: Ia039df7b8c86af8b3ec51732ca4cd7b533437fff Implements: blueprint placement-service
This commit is contained in:
parent
804c116084
commit
cd972aafd1
6
marconi/proxy/storage/mongodb/__init__.py
Normal file
6
marconi/proxy/storage/mongodb/__init__.py
Normal file
@ -0,0 +1,6 @@
|
||||
"""MongoDB Proxy Storage Driver for Marconi"""
|
||||
|
||||
from marconi.proxy.storage.mongodb import driver
|
||||
|
||||
# Hoist classes into package namespace
|
||||
Driver = driver.Driver
|
103
marconi/proxy/storage/mongodb/catalogue.py
Normal file
103
marconi/proxy/storage/mongodb/catalogue.py
Normal file
@ -0,0 +1,103 @@
|
||||
# Copyright (c) 2013 Rackspace Hosting, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
"""MongoDB storage controller for the proxy catalogue.
|
||||
|
||||
Schema:
|
||||
|
||||
{
|
||||
'p': Project_name :: str,
|
||||
'q': Queue_name :: str,
|
||||
'n': partition_Name :: str,
|
||||
'h': representative_Host_name :: str,
|
||||
'm': Metadata :: dict
|
||||
}
|
||||
"""
|
||||
|
||||
import marconi.openstack.common.log as logging
|
||||
from marconi.proxy.storage import base
|
||||
from marconi.proxy.storage import exceptions
|
||||
from marconi.queues.storage.mongodb import utils
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
CATALOGUE_INDEX = [
|
||||
('p', 1),
|
||||
('q', 1)
|
||||
]
|
||||
|
||||
|
||||
class CatalogueController(base.CatalogueBase):
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(CatalogueController, self).__init__(*args, **kwargs)
|
||||
|
||||
self._col = self.driver.db['catalogue']
|
||||
self._col.ensure_index(CATALOGUE_INDEX, unique=True)
|
||||
|
||||
@utils.raises_conn_error
|
||||
def list(self, project, include_metadata=False):
|
||||
fields = {
|
||||
'p': 1,
|
||||
'q': 1,
|
||||
'n': 1,
|
||||
'h': 1,
|
||||
'm': 1
|
||||
}
|
||||
|
||||
cursor = self._col.find({'p': project}, fields)
|
||||
for entry in cursor:
|
||||
yield _normalize(entry)
|
||||
|
||||
@utils.raises_conn_error
|
||||
def get(self, project, queue):
|
||||
fields = {'p': 1, 'q': 1, 'n': 1, 'h': 1, 'm': 1, '_id': 0}
|
||||
entry = self._col.find_one({'p': project, 'q': queue},
|
||||
fields=fields)
|
||||
|
||||
if entry is None:
|
||||
raise exceptions.EntryNotFound(project, queue)
|
||||
|
||||
return _normalize(entry)
|
||||
|
||||
@utils.raises_conn_error
|
||||
def exists(self, project, queue):
|
||||
return self._col.find_one({'p': project, 'q': queue}) is not None
|
||||
|
||||
@utils.raises_conn_error
|
||||
def insert(self, project, queue, partition, host, metadata={}):
|
||||
self._col.insert({'p': project, 'q': queue,
|
||||
'n': partition, 'h': host, 'm': metadata})
|
||||
|
||||
@utils.raises_conn_error
|
||||
def delete(self, project, queue):
|
||||
self._col.remove({'p': project, 'q': queue}, w=0)
|
||||
|
||||
@utils.raises_conn_error
|
||||
def update_metadata(self, project, queue, metadata):
|
||||
# NOTE(cpp-cabrera): since update does not create, checking
|
||||
# for existence isn't necesssary
|
||||
self._col.update({'p': project, 'q': queue},
|
||||
{'$set': {'m': metadata}},
|
||||
multi=False, manipulate=False)
|
||||
|
||||
|
||||
def _normalize(entry):
|
||||
return {
|
||||
'name': entry['q'],
|
||||
'metadata': entry.get('m', {}),
|
||||
'partition': entry['n'],
|
||||
'host': entry['h']
|
||||
}
|
22
marconi/proxy/storage/mongodb/controllers.py
Normal file
22
marconi/proxy/storage/mongodb/controllers.py
Normal file
@ -0,0 +1,22 @@
|
||||
# Copyright (c) 2013 Rackspace Hosting, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
"""Exports Mongodb proxy storage controllers."""
|
||||
|
||||
from marconi.proxy.storage.mongodb import catalogue
|
||||
from marconi.proxy.storage.mongodb import partitions
|
||||
|
||||
CatalogueController = catalogue.CatalogueController
|
||||
PartitionsController = partitions.PartitionsController
|
52
marconi/proxy/storage/mongodb/driver.py
Normal file
52
marconi/proxy/storage/mongodb/driver.py
Normal file
@ -0,0 +1,52 @@
|
||||
# Copyright (c) 2013 Rackspace Hosting, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
"""Mongodb proxy storage driver implementation."""
|
||||
|
||||
import pymongo
|
||||
import pymongo.errors
|
||||
|
||||
from marconi.openstack.common import log as logging
|
||||
from marconi.proxy import storage
|
||||
from marconi.proxy.storage.mongodb import controllers
|
||||
from marconi.proxy.storage.mongodb import options
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Driver(storage.DriverBase):
|
||||
|
||||
def __init__(self):
|
||||
self._database = None
|
||||
|
||||
@property
|
||||
def db(self):
|
||||
if self._database is None:
|
||||
if options.CFG.uri and 'replicaSet' in options.CFG.uri:
|
||||
conn = pymongo.MongoReplicaSetClient(options.CFG.uri)
|
||||
else:
|
||||
conn = pymongo.MongoClient(options.CFG.uri)
|
||||
|
||||
self._database = conn[options.CFG.database]
|
||||
|
||||
return self._database
|
||||
|
||||
@property
|
||||
def partitions_controller(self):
|
||||
return controllers.PartitionsController(self)
|
||||
|
||||
@property
|
||||
def catalogue_controller(self):
|
||||
return controllers.CatalogueController(self)
|
29
marconi/proxy/storage/mongodb/options.py
Normal file
29
marconi/proxy/storage/mongodb/options.py
Normal file
@ -0,0 +1,29 @@
|
||||
# Copyright (c) 2013 Rackspace Hosting, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
"""MongoDB driver configuration options."""
|
||||
|
||||
from marconi.common import config
|
||||
|
||||
OPTIONS = {
|
||||
# Connection string
|
||||
'uri': None,
|
||||
|
||||
# Database name
|
||||
'database': 'marconi_proxy'
|
||||
}
|
||||
|
||||
NAMESPACE = 'drivers:proxy:storage:mongodb'
|
||||
CFG = config.namespace(NAMESPACE).from_options(**OPTIONS)
|
95
marconi/proxy/storage/mongodb/partitions.py
Normal file
95
marconi/proxy/storage/mongodb/partitions.py
Normal file
@ -0,0 +1,95 @@
|
||||
# Copyright (c) 2013 Rackspace Hosting, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
"""MongoDB storage controller for proxy partitions.
|
||||
|
||||
Schema:
|
||||
|
||||
{
|
||||
'n': Name :: str
|
||||
'h': [Host_url :: str],
|
||||
'w': Weight :: int,
|
||||
}
|
||||
"""
|
||||
|
||||
from marconi.proxy.storage import base
|
||||
from marconi.proxy.storage import exceptions
|
||||
from marconi.proxy.utils import round_robin
|
||||
from marconi.queues.storage.mongodb import utils
|
||||
|
||||
PARTITIONS_INDEX = [
|
||||
('n', 1)
|
||||
]
|
||||
|
||||
|
||||
class PartitionsController(base.PartitionsBase):
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(PartitionsController, self).__init__(*args, **kwargs)
|
||||
|
||||
self._col = self.driver.db['partitions']
|
||||
self._col.ensure_index(PARTITIONS_INDEX, unique=True)
|
||||
self._rr = round_robin.Selector()
|
||||
|
||||
@utils.raises_conn_error
|
||||
def list(self):
|
||||
cursor = self._col.find(fields={'n': 1, 'h': 1, 'w': 1, '_id': 0})
|
||||
|
||||
for entry in cursor:
|
||||
yield _normalize(entry)
|
||||
|
||||
@utils.raises_conn_error
|
||||
def select(self, name):
|
||||
partition = self.get(name)
|
||||
return self._rr.next(partition['name'], partition['hosts'])
|
||||
|
||||
@utils.raises_conn_error
|
||||
def get(self, name):
|
||||
fields = {'n': 1, 'w': 1, 'h': 1, '_id': 0}
|
||||
partition = self._col.find_one({'n': name},
|
||||
fields=fields)
|
||||
|
||||
if partition is None:
|
||||
raise exceptions.PartitionNotFound(name)
|
||||
|
||||
return _normalize(partition)
|
||||
|
||||
@utils.raises_conn_error
|
||||
def exists(self, name):
|
||||
try:
|
||||
next(self._col.find({'n': name}))
|
||||
except StopIteration:
|
||||
return False
|
||||
else:
|
||||
return True
|
||||
|
||||
@utils.raises_conn_error
|
||||
def create(self, name, weight, hosts):
|
||||
# NOTE(cpp-cabrera): overwriting behavior should be okay for
|
||||
# managing partitions
|
||||
self._col.update({'n': name},
|
||||
{'$set': {'n': name, 'w': weight, 'h': hosts}},
|
||||
upsert=True)
|
||||
|
||||
@utils.raises_conn_error
|
||||
def delete(self, name):
|
||||
self._col.remove({'n': name}, w=0)
|
||||
|
||||
|
||||
def _normalize(entry):
|
||||
return {
|
||||
'name': entry['n'],
|
||||
'hosts': entry['h'],
|
||||
'weight': entry['w']
|
||||
}
|
14
tests/etc/wsgi_proxy_mongodb.conf
Normal file
14
tests/etc/wsgi_proxy_mongodb.conf
Normal file
@ -0,0 +1,14 @@
|
||||
[DEFAULT]
|
||||
debug = False
|
||||
verbose = False
|
||||
|
||||
[drivers]
|
||||
transport = wsgi
|
||||
storage = mongodb
|
||||
|
||||
[drivers:transport:wsgi]
|
||||
port = 8888
|
||||
|
||||
[drivers:proxy:storage:mongodb]
|
||||
uri = mongodb://127.0.0.1:27017
|
||||
database = marconi_proxy_test
|
71
tests/unit/proxy/storage/test_impl_mongodb.py
Normal file
71
tests/unit/proxy/storage/test_impl_mongodb.py
Normal file
@ -0,0 +1,71 @@
|
||||
# Copyright (c) 2013 Rackspace Hosting, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
import os
|
||||
|
||||
from marconi.proxy.storage import mongodb
|
||||
from marconi.proxy.storage.mongodb import controllers
|
||||
from marconi.proxy.storage.mongodb import options
|
||||
from marconi import tests as testing
|
||||
|
||||
from tests.unit.proxy.storage import base
|
||||
|
||||
|
||||
class MongodbDriverTest(testing.TestBase):
|
||||
|
||||
def setUp(self):
|
||||
if not os.environ.get('MONGODB_TEST_LIVE'):
|
||||
self.skipTest('No MongoDB instance running')
|
||||
|
||||
super(MongodbDriverTest, self).setUp()
|
||||
self.load_conf('wsgi_proxy_mongodb.conf')
|
||||
|
||||
def test_db_instance(self):
|
||||
driver = mongodb.Driver()
|
||||
db = driver.db
|
||||
self.assertEquals(db.name, options.CFG.database)
|
||||
|
||||
|
||||
class MongodbPartitionsTest(base.PartitionsControllerTest):
|
||||
|
||||
driver_class = mongodb.Driver
|
||||
controller_class = controllers.PartitionsController
|
||||
|
||||
def setUp(self):
|
||||
if not os.environ.get('MONGODB_TEST_LIVE'):
|
||||
self.skipTest('No MongoDB instance found running')
|
||||
|
||||
super(MongodbPartitionsTest, self).setUp()
|
||||
self.load_conf('wsgi_proxy_mongodb.conf')
|
||||
|
||||
def tearDown(self):
|
||||
self.controller._col.drop()
|
||||
super(MongodbPartitionsTest, self).tearDown()
|
||||
|
||||
|
||||
class MongodbCatalogueTest(base.CatalogueControllerTest):
|
||||
|
||||
driver_class = mongodb.Driver
|
||||
controller_class = controllers.CatalogueController
|
||||
|
||||
def setUp(self):
|
||||
if not os.environ.get('MONGODB_TEST_LIVE'):
|
||||
self.skipTest('No MongoDB instance found running')
|
||||
|
||||
super(MongodbCatalogueTest, self).setUp()
|
||||
self.load_conf('wsgi_proxy_mongodb.conf')
|
||||
|
||||
def tearDown(self):
|
||||
self.controller._col.drop()
|
||||
super(MongodbCatalogueTest, self).tearDown()
|
Loading…
x
Reference in New Issue
Block a user