Radware LBaaS driver is able to flip to a secondary backend node
Change-Id: Ifbfef493d5339f61dcf58dddcc8e3830aaf06bf1 Closes-Bug: #1324131
This commit is contained in:
parent
2ee736d64b
commit
79c284905f
@ -1,5 +1,6 @@
|
|||||||
[radware]
|
[radware]
|
||||||
#vdirect_address = 0.0.0.0
|
#vdirect_address = 0.0.0.0
|
||||||
|
#ha_secondary_address=
|
||||||
#vdirect_user = vDirect
|
#vdirect_user = vDirect
|
||||||
#vdirect_password = radware
|
#vdirect_password = radware
|
||||||
#service_ha_pair = False
|
#service_ha_pair = False
|
||||||
|
@ -20,10 +20,10 @@ import base64
|
|||||||
import copy
|
import copy
|
||||||
import httplib
|
import httplib
|
||||||
import Queue
|
import Queue
|
||||||
import socket
|
|
||||||
import threading
|
import threading
|
||||||
import time
|
import time
|
||||||
|
|
||||||
|
|
||||||
import eventlet
|
import eventlet
|
||||||
from oslo.config import cfg
|
from oslo.config import cfg
|
||||||
|
|
||||||
@ -61,6 +61,8 @@ CREATE_SERVICE_HEADER = {'Content-Type':
|
|||||||
driver_opts = [
|
driver_opts = [
|
||||||
cfg.StrOpt('vdirect_address',
|
cfg.StrOpt('vdirect_address',
|
||||||
help=_('IP address of vDirect server.')),
|
help=_('IP address of vDirect server.')),
|
||||||
|
cfg.StrOpt('ha_secondary_address',
|
||||||
|
help=_('IP address of secondary vDirect server.')),
|
||||||
cfg.StrOpt('vdirect_user',
|
cfg.StrOpt('vdirect_user',
|
||||||
default='vDirect',
|
default='vDirect',
|
||||||
help=_('vDirect user name.')),
|
help=_('vDirect user name.')),
|
||||||
@ -173,8 +175,10 @@ class LoadBalancerDriver(abstract_driver.LoadBalancerAbstractDriver):
|
|||||||
self.l2_l3_setup_params = rad.l2_l3_setup_params
|
self.l2_l3_setup_params = rad.l2_l3_setup_params
|
||||||
self.l4_action_name = rad.l4_action_name
|
self.l4_action_name = rad.l4_action_name
|
||||||
self.actions_to_skip = rad.actions_to_skip
|
self.actions_to_skip = rad.actions_to_skip
|
||||||
vdirect_address = cfg.CONF.radware.vdirect_address
|
vdirect_address = rad.vdirect_address
|
||||||
|
sec_server = rad.ha_secondary_address
|
||||||
self.rest_client = vDirectRESTClient(server=vdirect_address,
|
self.rest_client = vDirectRESTClient(server=vdirect_address,
|
||||||
|
secondary_server=sec_server,
|
||||||
user=rad.vdirect_user,
|
user=rad.vdirect_user,
|
||||||
password=rad.vdirect_password)
|
password=rad.vdirect_password)
|
||||||
self.queue = Queue.Queue()
|
self.queue = Queue.Queue()
|
||||||
@ -633,6 +637,7 @@ class vDirectRESTClient:
|
|||||||
|
|
||||||
def __init__(self,
|
def __init__(self,
|
||||||
server='localhost',
|
server='localhost',
|
||||||
|
secondary_server=None,
|
||||||
user=None,
|
user=None,
|
||||||
password=None,
|
password=None,
|
||||||
port=2189,
|
port=2189,
|
||||||
@ -640,6 +645,7 @@ class vDirectRESTClient:
|
|||||||
timeout=5000,
|
timeout=5000,
|
||||||
base_uri=''):
|
base_uri=''):
|
||||||
self.server = server
|
self.server = server
|
||||||
|
self.secondary_server = secondary_server
|
||||||
self.port = port
|
self.port = port
|
||||||
self.ssl = ssl
|
self.ssl = ssl
|
||||||
self.base_uri = base_uri
|
self.base_uri = base_uri
|
||||||
@ -651,14 +657,48 @@ class vDirectRESTClient:
|
|||||||
raise r_exc.AuthenticationMissing()
|
raise r_exc.AuthenticationMissing()
|
||||||
|
|
||||||
debug_params = {'server': self.server,
|
debug_params = {'server': self.server,
|
||||||
|
'sec_server': self.secondary_server,
|
||||||
'port': self.port,
|
'port': self.port,
|
||||||
'ssl': self.ssl}
|
'ssl': self.ssl}
|
||||||
LOG.debug(_('vDirectRESTClient:init server=%(server)s, '
|
LOG.debug(_('vDirectRESTClient:init server=%(server)s, '
|
||||||
'port=%(port)d, '
|
'secondary server=%(sec_server)s, '
|
||||||
'ssl=%(ssl)r'), debug_params)
|
'port=%(port)d, '
|
||||||
|
'ssl=%(ssl)r'), debug_params)
|
||||||
|
|
||||||
|
def _flip_servers(self):
|
||||||
|
LOG.warning(_('Fliping servers. Current is: %(server)s, '
|
||||||
|
'switching to %(secondary)s'),
|
||||||
|
{'server': self.server,
|
||||||
|
'secondary': self.secondary_server})
|
||||||
|
self.server, self.secondary_server = self.secondary_server, self.server
|
||||||
|
|
||||||
|
def _recover(self, action, resource, data, headers, binary=False):
|
||||||
|
if self.server and self.secondary_server:
|
||||||
|
self._flip_servers()
|
||||||
|
resp = self._call(action, resource, data,
|
||||||
|
headers, binary)
|
||||||
|
return resp
|
||||||
|
else:
|
||||||
|
LOG.exception(_('REST client is not able to recover '
|
||||||
|
'since only one vDirect server is '
|
||||||
|
'configured.'))
|
||||||
|
return -1, None, None, None
|
||||||
|
|
||||||
|
def call(self, action, resource, data, headers, binary=False):
|
||||||
|
resp = self._call(action, resource, data, headers, binary)
|
||||||
|
if resp[RESP_STATUS] == -1:
|
||||||
|
LOG.warning(_('vDirect server is not responding (%s).'),
|
||||||
|
self.server)
|
||||||
|
return self._recover(action, resource, data, headers, binary)
|
||||||
|
elif resp[RESP_STATUS] in (301, 307):
|
||||||
|
LOG.warning(_('vDirect server is not active (%s).'),
|
||||||
|
self.server)
|
||||||
|
return self._recover(action, resource, data, headers, binary)
|
||||||
|
else:
|
||||||
|
return resp
|
||||||
|
|
||||||
@call_log.log
|
@call_log.log
|
||||||
def call(self, action, resource, data, headers, binary=False):
|
def _call(self, action, resource, data, headers, binary=False):
|
||||||
if resource.startswith('http'):
|
if resource.startswith('http'):
|
||||||
uri = resource
|
uri = resource
|
||||||
else:
|
else:
|
||||||
@ -701,11 +741,11 @@ class vDirectRESTClient:
|
|||||||
# response was not JSON, ignore the exception
|
# response was not JSON, ignore the exception
|
||||||
pass
|
pass
|
||||||
ret = (response.status, response.reason, respstr, respdata)
|
ret = (response.status, response.reason, respstr, respdata)
|
||||||
except (socket.timeout, socket.error) as e:
|
except Exception as e:
|
||||||
log_dict = {'action': action, 'e': e}
|
log_dict = {'action': action, 'e': e}
|
||||||
LOG.error(_('vdirectRESTClient: %(action)s failure, %(e)r'),
|
LOG.error(_('vdirectRESTClient: %(action)s failure, %(e)r'),
|
||||||
log_dict)
|
log_dict)
|
||||||
ret = 0, None, None, None
|
ret = -1, None, None, None
|
||||||
conn.close()
|
conn.close()
|
||||||
return ret
|
return ret
|
||||||
|
|
||||||
@ -853,7 +893,14 @@ class OperationCompletionHandler(threading.Thread):
|
|||||||
|
|
||||||
def _rest_wrapper(response, success_codes=[202]):
|
def _rest_wrapper(response, success_codes=[202]):
|
||||||
"""Wrap a REST call and make sure a valid status is returned."""
|
"""Wrap a REST call and make sure a valid status is returned."""
|
||||||
if response[RESP_STATUS] not in success_codes:
|
if not response:
|
||||||
|
raise r_exc.RESTRequestFailure(
|
||||||
|
status=-1,
|
||||||
|
reason="Unknown",
|
||||||
|
description="Unknown",
|
||||||
|
success_codes=success_codes
|
||||||
|
)
|
||||||
|
elif response[RESP_STATUS] not in success_codes:
|
||||||
raise r_exc.RESTRequestFailure(
|
raise r_exc.RESTRequestFailure(
|
||||||
status=response[RESP_STATUS],
|
status=response[RESP_STATUS],
|
||||||
reason=response[RESP_REASON],
|
reason=response[RESP_REASON],
|
||||||
|
@ -32,6 +32,7 @@ from neutron.services.loadbalancer.drivers.radware import exceptions as r_exc
|
|||||||
from neutron.tests.unit.db.loadbalancer import test_db_loadbalancer
|
from neutron.tests.unit.db.loadbalancer import test_db_loadbalancer
|
||||||
|
|
||||||
GET_200 = ('/api/workflow/', '/api/service/', '/api/workflowTemplate')
|
GET_200 = ('/api/workflow/', '/api/service/', '/api/workflowTemplate')
|
||||||
|
SERVER_DOWN_CODES = (-1, 301, 307)
|
||||||
|
|
||||||
|
|
||||||
class QueueMock(Queue.Queue):
|
class QueueMock(Queue.Queue):
|
||||||
@ -43,10 +44,16 @@ class QueueMock(Queue.Queue):
|
|||||||
self.completion_handler(oper)
|
self.completion_handler(oper)
|
||||||
|
|
||||||
|
|
||||||
|
def _recover_function_mock(action, resource, data, headers, binary=False):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
def rest_call_function_mock(action, resource, data, headers, binary=False):
|
def rest_call_function_mock(action, resource, data, headers, binary=False):
|
||||||
if rest_call_function_mock.RESPOND_WITH_ERROR:
|
if rest_call_function_mock.RESPOND_WITH_ERROR:
|
||||||
return 400, 'error_status', 'error_description', None
|
return 400, 'error_status', 'error_description', None
|
||||||
|
if rest_call_function_mock.RESPOND_WITH_SERVER_DOWN in SERVER_DOWN_CODES:
|
||||||
|
val = rest_call_function_mock.RESPOND_WITH_SERVER_DOWN
|
||||||
|
return val, 'error_status', 'error_description', None
|
||||||
if action == 'GET':
|
if action == 'GET':
|
||||||
return _get_handler(resource)
|
return _get_handler(resource)
|
||||||
elif action == 'DELETE':
|
elif action == 'DELETE':
|
||||||
@ -114,6 +121,8 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase):
|
|||||||
{'RESPOND_WITH_ERROR': False})
|
{'RESPOND_WITH_ERROR': False})
|
||||||
rest_call_function_mock.__dict__.update(
|
rest_call_function_mock.__dict__.update(
|
||||||
{'TEMPLATES_MISSING': False})
|
{'TEMPLATES_MISSING': False})
|
||||||
|
rest_call_function_mock.__dict__.update(
|
||||||
|
{'RESPOND_WITH_SERVER_DOWN': 200})
|
||||||
|
|
||||||
self.operation_completer_start_mock = mock.Mock(
|
self.operation_completer_start_mock = mock.Mock(
|
||||||
return_value=None)
|
return_value=None)
|
||||||
@ -121,13 +130,22 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase):
|
|||||||
return_value=None)
|
return_value=None)
|
||||||
self.driver_rest_call_mock = mock.Mock(
|
self.driver_rest_call_mock = mock.Mock(
|
||||||
side_effect=rest_call_function_mock)
|
side_effect=rest_call_function_mock)
|
||||||
|
self.flip_servers_mock = mock.Mock(
|
||||||
|
return_value=None)
|
||||||
|
self.recover_mock = mock.Mock(
|
||||||
|
side_effect=_recover_function_mock)
|
||||||
|
|
||||||
radware_driver = self.plugin_instance.drivers['radware']
|
radware_driver = self.plugin_instance.drivers['radware']
|
||||||
radware_driver.completion_handler.start = (
|
radware_driver.completion_handler.start = (
|
||||||
self.operation_completer_start_mock)
|
self.operation_completer_start_mock)
|
||||||
radware_driver.completion_handler.join = (
|
radware_driver.completion_handler.join = (
|
||||||
self.operation_completer_join_mock)
|
self.operation_completer_join_mock)
|
||||||
|
self.orig_call = radware_driver.rest_client.call
|
||||||
|
self.orig__call = radware_driver.rest_client._call
|
||||||
radware_driver.rest_client.call = self.driver_rest_call_mock
|
radware_driver.rest_client.call = self.driver_rest_call_mock
|
||||||
|
radware_driver.rest_client._call = self.driver_rest_call_mock
|
||||||
|
radware_driver.rest_client._flip_servers = self.flip_servers_mock
|
||||||
|
radware_driver.rest_client._recover = self.recover_mock
|
||||||
radware_driver.completion_handler.rest_client.call = (
|
radware_driver.completion_handler.rest_client.call = (
|
||||||
self.driver_rest_call_mock)
|
self.driver_rest_call_mock)
|
||||||
|
|
||||||
@ -136,6 +154,34 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase):
|
|||||||
|
|
||||||
self.addCleanup(radware_driver.completion_handler.join)
|
self.addCleanup(radware_driver.completion_handler.join)
|
||||||
|
|
||||||
|
def test_rest_client_recover_was_called(self):
|
||||||
|
"""Call the real REST client and verify _recover is called."""
|
||||||
|
radware_driver = self.plugin_instance.drivers['radware']
|
||||||
|
radware_driver.rest_client.call = self.orig_call
|
||||||
|
radware_driver.rest_client._call = self.orig__call
|
||||||
|
self.assertRaises(r_exc.RESTRequestFailure,
|
||||||
|
radware_driver._verify_workflow_templates)
|
||||||
|
self.recover_mock.assert_called_once()
|
||||||
|
|
||||||
|
def test_rest_client_flip_servers(self):
|
||||||
|
radware_driver = self.plugin_instance.drivers['radware']
|
||||||
|
server = radware_driver.rest_client.server
|
||||||
|
sec_server = radware_driver.rest_client.secondary_server
|
||||||
|
radware_driver.rest_client._flip_servers()
|
||||||
|
self.assertEqual(server,
|
||||||
|
radware_driver.rest_client.secondary_server)
|
||||||
|
self.assertEqual(sec_server,
|
||||||
|
radware_driver.rest_client.server)
|
||||||
|
|
||||||
|
def test_verify_workflow_templates_server_down(self):
|
||||||
|
"""Test the rest call failure when backend is down."""
|
||||||
|
for value in SERVER_DOWN_CODES:
|
||||||
|
rest_call_function_mock.__dict__.update(
|
||||||
|
{'RESPOND_WITH_SERVER_DOWN': value})
|
||||||
|
self.assertRaises(r_exc.RESTRequestFailure,
|
||||||
|
self.plugin_instance.drivers['radware'].
|
||||||
|
_verify_workflow_templates)
|
||||||
|
|
||||||
def test_verify_workflow_templates(self):
|
def test_verify_workflow_templates(self):
|
||||||
"""Test the rest call failure handling by Exception raising."""
|
"""Test the rest call failure handling by Exception raising."""
|
||||||
rest_call_function_mock.__dict__.update(
|
rest_call_function_mock.__dict__.update(
|
||||||
|
Loading…
Reference in New Issue
Block a user