Add shard support to the client
The patch adds Shard support of the client. This will allow users to create and delete shards from Marconi. The API requires the Marconi instance to be and admin instance. However, there are no checks, restrictions, yet. This patch doesn't implement Shards listing. As soon as the MessageIterator lands, it'll be possible to reuse that code for the ShardsIterator. Partially-Implements blueprint: shard-management Partially-Implements blueprint: python-marconiclient-v1 Change-Id: I932c98d68a4a81b710dbea75a6bac123b7afaa08
This commit is contained in:
parent
7331d2c33d
commit
d5b24ef15c
@ -143,4 +143,22 @@ class V1(api.Api):
|
||||
'claim_id': {'type': 'string'},
|
||||
}
|
||||
},
|
||||
|
||||
'shard_create': {
|
||||
'ref': 'shards/{shard_name}',
|
||||
'method': 'PUT',
|
||||
'required': ['shard_name'],
|
||||
'properties': {
|
||||
'shard_name': {'type': 'string'},
|
||||
}
|
||||
},
|
||||
|
||||
'shard_delete': {
|
||||
'ref': 'shards/{shard_name}',
|
||||
'method': 'DELETE',
|
||||
'required': ['shard_name'],
|
||||
'properties': {
|
||||
'shard_name': {'type': 'string'},
|
||||
}
|
||||
},
|
||||
}
|
||||
|
@ -16,6 +16,7 @@
|
||||
import uuid
|
||||
|
||||
from marconiclient.queues.v1 import queues
|
||||
from marconiclient.queues.v1 import shard
|
||||
from marconiclient import transport
|
||||
from marconiclient.transport import request
|
||||
|
||||
@ -99,3 +100,15 @@ class Client(object):
|
||||
req.ref = ref
|
||||
|
||||
return trans.send(req).deserialized_content
|
||||
|
||||
# ADMIN API
|
||||
def shard(self, ref, **kwargs):
|
||||
"""Returns a shard instance
|
||||
|
||||
:param ref: Shard's reference name.
|
||||
:type ref: `six.text_type`
|
||||
|
||||
:returns: A shard instance
|
||||
:rtype: `shard.Shard`
|
||||
"""
|
||||
return shard.Shard(self, ref, **kwargs)
|
||||
|
@ -232,3 +232,38 @@ def message_delete(transport, request, queue_name, message_id, callback=None):
|
||||
request.params['message_id'] = message_id
|
||||
|
||||
transport.send(request)
|
||||
|
||||
|
||||
def shard_create(transport, request, shard_name, shard_data):
|
||||
"""Creates a shard called `shard_name`
|
||||
|
||||
:param transport: Transport instance to use
|
||||
:type transport: `transport.base.Transport`
|
||||
:param request: Request instance ready to be sent.
|
||||
:type request: `transport.request.Request`
|
||||
:param shard_name: Shard reference name.
|
||||
:type shard_name: `six.text_type`
|
||||
:param shard_data: Shard's properties, i.e: weight, uri, options.
|
||||
:type shard_data: `dict`
|
||||
"""
|
||||
|
||||
request.operation = 'shard_create'
|
||||
request.params['shard_name'] = shard_name
|
||||
request.content = json.dumps(shard_data)
|
||||
transport.send(request)
|
||||
|
||||
|
||||
def shard_delete(transport, request, shard_name):
|
||||
"""Deletes the shard `shard_name`
|
||||
|
||||
:param transport: Transport instance to use
|
||||
:type transport: `transport.base.Transport`
|
||||
:param request: Request instance ready to be sent.
|
||||
:type request: `transport.request.Request`
|
||||
:param shard_name: Shard reference name.
|
||||
:type shard_name: `six.text_type`
|
||||
"""
|
||||
|
||||
request.operation = 'shard_delete'
|
||||
request.params['shard_name'] = shard_name
|
||||
transport.send(request)
|
||||
|
51
marconiclient/queues/v1/shard.py
Normal file
51
marconiclient/queues/v1/shard.py
Normal file
@ -0,0 +1,51 @@
|
||||
# Copyright (c) 2014 Red Hat, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from marconiclient.queues.v1 import core
|
||||
|
||||
|
||||
class Shard(object):
|
||||
|
||||
def __init__(self, client, name,
|
||||
weight=None, uri=None,
|
||||
auto_create=True, **options):
|
||||
self.client = client
|
||||
|
||||
self.uri = uri
|
||||
self.name = name
|
||||
self.weight = weight
|
||||
self.options = options
|
||||
|
||||
if auto_create:
|
||||
self.ensure_exists()
|
||||
|
||||
def ensure_exists(self):
|
||||
"""Ensures shard exists
|
||||
|
||||
This method is not race safe,
|
||||
the shard could've been deleted
|
||||
right after it was called.
|
||||
"""
|
||||
req, trans = self.client._request_and_transport()
|
||||
|
||||
data = {'uri': self.uri,
|
||||
'weight': self.weight,
|
||||
'options': self.options}
|
||||
|
||||
core.shard_create(trans, req, self.name, data)
|
||||
|
||||
def delete(self):
|
||||
req, trans = self.client._request_and_transport()
|
||||
core.shard_delete(trans, req, self.name)
|
77
marconiclient/tests/queues/shard.py
Normal file
77
marconiclient/tests/queues/shard.py
Normal file
@ -0,0 +1,77 @@
|
||||
# Copyright (c) 2014 Red Hat, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import mock
|
||||
|
||||
from marconiclient.tests.queues import base
|
||||
from marconiclient.transport import response
|
||||
|
||||
|
||||
class QueuesV1ShardUnitTest(base.QueuesTestBase):
|
||||
|
||||
def test_shard_create(self):
|
||||
shard_data = {'weight': 10,
|
||||
'uri': 'sqlite://'}
|
||||
|
||||
with mock.patch.object(self.transport, 'send',
|
||||
autospec=True) as send_method:
|
||||
|
||||
resp = response.Response(None, None)
|
||||
send_method.return_value = resp
|
||||
|
||||
# NOTE(flaper87): This will call
|
||||
# ensure exists in the client instance
|
||||
# since auto_create's default is True
|
||||
shard = self.client.shard('test', **shard_data)
|
||||
self.assertEqual(shard.name, 'test')
|
||||
self.assertEqual(shard.weight, 10)
|
||||
|
||||
def test_shard_delete(self):
|
||||
shard_data = {'weight': 10,
|
||||
'uri': 'sqlite://'}
|
||||
|
||||
with mock.patch.object(self.transport, 'send',
|
||||
autospec=True) as send_method:
|
||||
|
||||
resp = response.Response(None, None)
|
||||
send_method.return_value = resp
|
||||
|
||||
# NOTE(flaper87): This will call
|
||||
# ensure exists in the client instance
|
||||
# since auto_create's default is True
|
||||
shard = self.client.shard('test', **shard_data)
|
||||
shard.delete()
|
||||
|
||||
# NOTE(flaper87): Nothing to assert here,
|
||||
# just checking our way down to the transport
|
||||
# doesn't crash.
|
||||
|
||||
|
||||
class QueuesV1ShardFunctionalTest(base.QueuesTestBase):
|
||||
|
||||
def test_shard_create(self):
|
||||
shard_data = {'weight': 10,
|
||||
'uri': 'sqlite://'}
|
||||
|
||||
shard = self.client.shard('test', **shard_data)
|
||||
self.assertEqual(shard.name, 'test')
|
||||
self.assertEqual(shard.weight, 10)
|
||||
|
||||
def test_shard_delete(self):
|
||||
shard_data = {'weight': 10,
|
||||
'uri': 'sqlite://'}
|
||||
|
||||
shard = self.client.shard('test', **shard_data)
|
||||
shard.delete()
|
26
tests/functional/queues/v1/test_shard.py
Normal file
26
tests/functional/queues/v1/test_shard.py
Normal file
@ -0,0 +1,26 @@
|
||||
# Copyright (c) 2014 Red Hat, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
|
||||
from marconiclient.tests.queues import shard
|
||||
from marconiclient.transport import http
|
||||
|
||||
|
||||
class QueuesV1ShardHttpFunctionalTest(shard.QueuesV1ShardFunctionalTest):
|
||||
|
||||
is_functional = True
|
||||
transport_cls = http.HttpTransport
|
||||
url = 'http://127.0.0.1:8888/v1'
|
||||
version = 1
|
@ -166,3 +166,24 @@ class TestV1Core(base.TestBase):
|
||||
req = request.Request()
|
||||
core.message_delete(self.transport, req,
|
||||
'test', 'message_id')
|
||||
|
||||
# ADMIN API
|
||||
def test_shard_create(self):
|
||||
with mock.patch.object(self.transport, 'send',
|
||||
autospec=True) as send_method:
|
||||
resp = response.Response(None, None)
|
||||
send_method.return_value = resp
|
||||
|
||||
req = request.Request()
|
||||
core.shard_create(self.transport, req,
|
||||
'test_shard', {'uri': 'sqlite://',
|
||||
'weight': 0})
|
||||
|
||||
def test_shard_delete(self):
|
||||
with mock.patch.object(self.transport, 'send',
|
||||
autospec=True) as send_method:
|
||||
resp = response.Response(None, None)
|
||||
send_method.return_value = resp
|
||||
|
||||
req = request.Request()
|
||||
core.shard_delete(self.transport, req, 'test_shard')
|
||||
|
25
tests/unit/queues/v1/test_shard.py
Normal file
25
tests/unit/queues/v1/test_shard.py
Normal file
@ -0,0 +1,25 @@
|
||||
# Copyright (c) 2014 Red Hat, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
|
||||
from marconiclient.tests.queues import shard
|
||||
from marconiclient.transport import http
|
||||
|
||||
|
||||
class QueuesV1ShardHttpUnitTest(shard.QueuesV1ShardUnitTest):
|
||||
|
||||
transport_cls = http.HttpTransport
|
||||
url = 'http://127.0.0.1:8888/v1'
|
||||
version = 1
|
Loading…
x
Reference in New Issue
Block a user