Merge "NSX-v3 multi-manager round robin scheduling"
This commit is contained in:
commit
5d33132a2e
@ -18,8 +18,8 @@ import contextlib
|
|||||||
import copy
|
import copy
|
||||||
import datetime
|
import datetime
|
||||||
import eventlet
|
import eventlet
|
||||||
|
import itertools
|
||||||
import logging
|
import logging
|
||||||
import random
|
|
||||||
import requests
|
import requests
|
||||||
import six
|
import six
|
||||||
import urlparse
|
import urlparse
|
||||||
@ -263,6 +263,9 @@ class ClusteredAPI(object):
|
|||||||
endpoint = Endpoint(provider, pool)
|
endpoint = Endpoint(provider, pool)
|
||||||
self._endpoints[provider.id] = endpoint
|
self._endpoints[provider.id] = endpoint
|
||||||
|
|
||||||
|
# service requests using round robin
|
||||||
|
self._endpoint_schedule = itertools.cycle(self._endpoints.values())
|
||||||
|
|
||||||
# duck type to proxy http invocations
|
# duck type to proxy http invocations
|
||||||
for method in ClusteredAPI._HTTP_VERBS:
|
for method in ClusteredAPI._HTTP_VERBS:
|
||||||
setattr(self, method, self._proxy_stub(method))
|
setattr(self, method, self._proxy_stub(method))
|
||||||
@ -339,18 +342,13 @@ class ClusteredAPI(object):
|
|||||||
{'ep': endpoint, 'err': e})
|
{'ep': endpoint, 'err': e})
|
||||||
|
|
||||||
def _select_endpoint(self):
|
def _select_endpoint(self):
|
||||||
connected = {}
|
# check for UP state until exhausting all endpoints
|
||||||
for provider_id, endpoint in self._endpoints.items():
|
seen, total = 0, len(self._endpoints.values())
|
||||||
|
while seen < total:
|
||||||
|
endpoint = next(self._endpoint_schedule)
|
||||||
if endpoint.state == EndpointState.UP:
|
if endpoint.state == EndpointState.UP:
|
||||||
connected[provider_id] = endpoint
|
return endpoint
|
||||||
if endpoint.pool.free():
|
seen += 1
|
||||||
# connection can be used now
|
|
||||||
return endpoint
|
|
||||||
|
|
||||||
# no free connections; randomly select a connected endpoint
|
|
||||||
# which will likely wait on pool.item() until a connection frees up
|
|
||||||
return (connected[random.choice(connected.keys())]
|
|
||||||
if connected else None)
|
|
||||||
|
|
||||||
def endpoint_for_connection(self, conn):
|
def endpoint_for_connection(self, conn):
|
||||||
# check all endpoint pools
|
# check all endpoint pools
|
||||||
|
@ -211,3 +211,30 @@ class ClusteredAPITestCase(nsxlib_testcase.NsxClientTestCase):
|
|||||||
api._validate = mock.Mock()
|
api._validate = mock.Mock()
|
||||||
self.assertRaises(nsx_exc.ServiceClusterUnavailable,
|
self.assertRaises(nsx_exc.ServiceClusterUnavailable,
|
||||||
api.get, 'api/v1/transport-zones')
|
api.get, 'api/v1/transport-zones')
|
||||||
|
|
||||||
|
def test_cluster_round_robin_servicing(self):
|
||||||
|
conf_managers = ['8.9.10.11', '9.10.11.12', '10.11.12.13']
|
||||||
|
cfg.CONF.set_override(
|
||||||
|
'nsx_api_managers', conf_managers, 'nsx_v3')
|
||||||
|
|
||||||
|
api = self.mock_nsx_clustered_api()
|
||||||
|
api._validate = mock.Mock()
|
||||||
|
|
||||||
|
eps = api._endpoints.values()
|
||||||
|
|
||||||
|
def _get_schedule(num_eps):
|
||||||
|
return [api._select_endpoint() for i in range(num_eps)]
|
||||||
|
|
||||||
|
self.assertEqual(_get_schedule(3), eps)
|
||||||
|
|
||||||
|
self.assertEqual(_get_schedule(6), [eps[0], eps[1], eps[2],
|
||||||
|
eps[0], eps[1], eps[2]])
|
||||||
|
|
||||||
|
eps[0]._state = cluster.EndpointState.DOWN
|
||||||
|
self.assertEqual(_get_schedule(4), [eps[1], eps[2], eps[1], eps[2]])
|
||||||
|
|
||||||
|
eps[1]._state = cluster.EndpointState.DOWN
|
||||||
|
self.assertEqual(_get_schedule(2), [eps[2], eps[2]])
|
||||||
|
|
||||||
|
eps[0]._state = cluster.EndpointState.UP
|
||||||
|
self.assertEqual(_get_schedule(4), [eps[0], eps[2], eps[0], eps[2]])
|
||||||
|
Loading…
x
Reference in New Issue
Block a user