diff --git a/vmware_nsxlib/tests/unit/v3/test_cluster.py b/vmware_nsxlib/tests/unit/v3/test_cluster.py index a3bc2c83..2a6261da 100644 --- a/vmware_nsxlib/tests/unit/v3/test_cluster.py +++ b/vmware_nsxlib/tests/unit/v3/test_cluster.py @@ -260,6 +260,48 @@ class ClusteredAPITestCase(nsxlib_testcase.NsxClientTestCase): eps[0]._state = cluster.EndpointState.UP self.assertEqual(_get_schedule(4), [eps[0], eps[2], eps[0], eps[2]]) + def test_cluster_select_endpoint(self): + conf_managers = ['8.9.10.11', '9.10.11.12', '10.11.12.13'] + api = self.mock_nsx_clustered_api(nsx_api_managers=conf_managers) + api._validate = mock.Mock() + eps = list(api._endpoints.values()) + + # all up - select the first one + self.assertEqual(api._select_endpoint(), eps[0]) + + # run again - select the 2nd + self.assertEqual(api._select_endpoint(), eps[1]) + + # all down - return None + eps[0]._state = cluster.EndpointState.DOWN + eps[1]._state = cluster.EndpointState.DOWN + eps[2]._state = cluster.EndpointState.DOWN + self.assertEqual(api._select_endpoint(), None) + + # up till now the validate method should not have been called + self.assertEqual(api._validate.call_count, 0) + + # set up the retries flag, and check that validate was called + # until retries have been exhausted + api.nsxlib_config.cluster_unavailable_retry = True + self.assertEqual(api._select_endpoint(), None) + self.assertEqual(api._validate.call_count, + api.nsxlib_config.max_attempts * len(eps)) + + # simulate the case where 1 endpoint finally goes up + self.validate_count = 0 + self.max_validate = 9 + + def _mock_validate(ep): + if self.validate_count >= self.max_validate: + ep._state = cluster.EndpointState.UP + self.validate_count += 1 + + api._validate = _mock_validate + self.assertEqual(api._select_endpoint(), + eps[(self.max_validate - 1) % len(eps)]) + self.assertEqual(self.validate_count, self.max_validate + 1) + def test_reinitialize_cluster(self): with mock.patch.object(cluster.TimeoutSession, 'request', return_value=get_sess_create_resp()): diff --git a/vmware_nsxlib/v3/cluster.py b/vmware_nsxlib/v3/cluster.py index ad11551f..f4343a96 100644 --- a/vmware_nsxlib/v3/cluster.py +++ b/vmware_nsxlib/v3/cluster.py @@ -33,10 +33,12 @@ from requests import adapters from requests import exceptions as requests_exceptions import six import six.moves.urllib.parse as urlparse +import tenacity from vmware_nsxlib._i18n import _ from vmware_nsxlib.v3 import client as nsx_client from vmware_nsxlib.v3 import exceptions +from vmware_nsxlib.v3 import utils LOG = log.getLogger(__name__) @@ -401,7 +403,12 @@ class ClusteredAPI(object): def _create_conn(p): def _conn(): # called when a pool needs to create a new connection - return self._http_provider.new_connection(self, p) + try: + return self._http_provider.new_connection(self, p) + except Exception as e: + if self._http_provider.is_conn_open_exception(e): + LOG.warning("Timeout while trying to open a " + "connection with %s", p) return _conn @@ -484,6 +491,11 @@ class ClusteredAPI(object): def _validate(self, endpoint): try: with endpoint.pool.item() as conn: + if not conn: + LOG.warning("No connection established with endpoint " + "%(ep)s. ", {'ep': endpoint}) + endpoint.set_state(EndpointState.DOWN) + return self._http_provider.validate_connection(self, endpoint, conn) endpoint.set_state(EndpointState.UP) except exceptions.ClientCertificateNotTrusted: @@ -505,13 +517,39 @@ class ClusteredAPI(object): {'ep': endpoint, 'err': e}) def _select_endpoint(self): - # check for UP state until exhausting all endpoints - seen, total = 0, len(self._endpoints.values()) - while seen < total: - endpoint = next(self._endpoint_schedule) - if endpoint.state == EndpointState.UP: - return endpoint - seen += 1 + """Return an endpoint in UP state. + + Go over all endpoint and return the next one which is UP + If all endpoints are currently DOWN, depending on the configuration + retry it until one is UP (or max retries exceeded) + """ + def _select_endpoint_internal(refresh=False): + # check for UP state until exhausting all endpoints + seen, total = 0, len(self._endpoints.values()) + while seen < total: + endpoint = next(self._endpoint_schedule) + if refresh: + self._validate(endpoint) + if endpoint.state == EndpointState.UP: + return endpoint + seen += 1 + + @utils.retry_upon_none_result(self.nsxlib_config.max_attempts) + def _select_endpoint_internal_with_retry(): + # redo endpoint selection with refreshing states + return _select_endpoint_internal(refresh=True) + + # First attempt to get an UP endpoint + endpoint = _select_endpoint_internal() + if endpoint or not self.nsxlib_config.cluster_unavailable_retry: + return endpoint + + # Retry the selection while refreshing the endpoints state + try: + return _select_endpoint_internal_with_retry() + except tenacity.RetryError: + # exhausted number of retries + return None def endpoint_for_connection(self, conn): # check all endpoint pools diff --git a/vmware_nsxlib/v3/config.py b/vmware_nsxlib/v3/config.py index 9cb6efb9..95812ac5 100644 --- a/vmware_nsxlib/v3/config.py +++ b/vmware_nsxlib/v3/config.py @@ -73,7 +73,11 @@ class NsxLibConfig(object): the requests, to allow admin user to update/ delete all entries. :param rate_limit_retry: If True, the client will retry requests failed on - "Too many requests" error + "Too many requests" error. + :param cluster_unavailable_retry: If True, skip fatal errors when no + endpoint in the NSX management cluster is + available to serve a request, and retry + the request instead. """ def __init__(self, @@ -97,7 +101,8 @@ class NsxLibConfig(object): dns_domain='openstacklocal', dhcp_profile_uuid=None, allow_overwrite_header=False, - rate_limit_retry=True): + rate_limit_retry=True, + cluster_unavailable_retry=False): self.nsx_api_managers = nsx_api_managers self._username = username @@ -119,6 +124,7 @@ class NsxLibConfig(object): self.dns_domain = dns_domain self.allow_overwrite_header = allow_overwrite_header self.rate_limit_retry = rate_limit_retry + self.cluster_unavailable_retry = cluster_unavailable_retry if dhcp_profile_uuid: # this is deprecated, and never used. diff --git a/vmware_nsxlib/v3/utils.py b/vmware_nsxlib/v3/utils.py index 59e0450e..3da6c05a 100644 --- a/vmware_nsxlib/v3/utils.py +++ b/vmware_nsxlib/v3/utils.py @@ -175,6 +175,15 @@ def retry_random_upon_exception(exc, delay=0.5, max_delay=5, before=_log_before_retry, after=_log_after_retry) +def retry_upon_none_result(max_attempts, delay=0.5, max_delay=2): + return tenacity.retry(reraise=True, + retry=tenacity.retry_if_result(lambda x: x is None), + wait=tenacity.wait_exponential( + multiplier=delay, max=max_delay), + stop=tenacity.stop_after_attempt(max_attempts), + before=_log_before_retry, after=_log_after_retry) + + def list_match(list1, list2): # Check if list1 and list2 have identical elements, but relaxed on # dict elements where list1's dict element can be a subset of list2's