Merge "NEC plugin: Honor Retry-After response from OFC"
This commit is contained in:
commit
fa7c4b18e1
@ -37,6 +37,11 @@ firewall_driver = neutron.agent.linux.iptables_firewall.OVSHybridIptablesFirewal
|
|||||||
# Certificate file
|
# Certificate file
|
||||||
# cert_file =
|
# cert_file =
|
||||||
|
|
||||||
|
# Maximum attempts per OFC API request. NEC plugin retries
|
||||||
|
# API request to OFC when OFC returns ServiceUnavailable (503).
|
||||||
|
# The value must be greater than 0.
|
||||||
|
# api_max_attempts = 3
|
||||||
|
|
||||||
[provider]
|
[provider]
|
||||||
# Default router provider to use.
|
# Default router provider to use.
|
||||||
# default_router_provider = l3-agent
|
# default_router_provider = l3-agent
|
||||||
|
@ -48,6 +48,11 @@ ofc_opts = [
|
|||||||
help=_("Key file")),
|
help=_("Key file")),
|
||||||
cfg.StrOpt('cert_file', default=None,
|
cfg.StrOpt('cert_file', default=None,
|
||||||
help=_("Certificate file")),
|
help=_("Certificate file")),
|
||||||
|
cfg.IntOpt('api_max_attempts', default=3,
|
||||||
|
help=_("Maximum attempts per OFC API request."
|
||||||
|
"NEC plugin retries API request to OFC "
|
||||||
|
"when OFC returns ServiceUnavailable (503)."
|
||||||
|
"The value must be greater than 0.")),
|
||||||
]
|
]
|
||||||
|
|
||||||
provider_opts = [
|
provider_opts = [
|
||||||
|
@ -42,6 +42,15 @@ class OFCMappingNotFound(qexc.NotFound):
|
|||||||
"It may be deleted during processing.")
|
"It may be deleted during processing.")
|
||||||
|
|
||||||
|
|
||||||
|
class OFCServiceUnavailable(OFCException):
|
||||||
|
message = _("OFC returns Server Unavailable (503) "
|
||||||
|
"(Retry-After=%(retry_after)s)")
|
||||||
|
|
||||||
|
def __init__(self, **kwargs):
|
||||||
|
super(OFCServiceUnavailable, self).__init__(**kwargs)
|
||||||
|
self.retry_after = kwargs.get('retry_after')
|
||||||
|
|
||||||
|
|
||||||
class PortInfoNotFound(qexc.NotFound):
|
class PortInfoNotFound(qexc.NotFound):
|
||||||
message = _("PortInfo %(id)s could not be found")
|
message = _("PortInfo %(id)s could not be found")
|
||||||
|
|
||||||
|
@ -18,8 +18,10 @@
|
|||||||
import httplib
|
import httplib
|
||||||
import json
|
import json
|
||||||
import socket
|
import socket
|
||||||
|
import time
|
||||||
|
|
||||||
from neutron.openstack.common import log as logging
|
from neutron.openstack.common import log as logging
|
||||||
|
from neutron.plugins.nec.common import config
|
||||||
from neutron.plugins.nec.common import exceptions as nexc
|
from neutron.plugins.nec.common import exceptions as nexc
|
||||||
|
|
||||||
|
|
||||||
@ -67,7 +69,7 @@ class OFCClient(object):
|
|||||||
return (_("Operation on OFC failed: %(status)s%(msg)s") %
|
return (_("Operation on OFC failed: %(status)s%(msg)s") %
|
||||||
{'status': status, 'msg': detail})
|
{'status': status, 'msg': detail})
|
||||||
|
|
||||||
def do_request(self, method, action, body=None):
|
def do_single_request(self, method, action, body=None):
|
||||||
LOG.debug(_("Client request: %(host)s:%(port)s "
|
LOG.debug(_("Client request: %(host)s:%(port)s "
|
||||||
"%(method)s %(action)s [%(body)s]"),
|
"%(method)s %(action)s [%(body)s]"),
|
||||||
{'host': self.host, 'port': self.port,
|
{'host': self.host, 'port': self.port,
|
||||||
@ -95,6 +97,11 @@ class OFCClient(object):
|
|||||||
httplib.ACCEPTED,
|
httplib.ACCEPTED,
|
||||||
httplib.NO_CONTENT):
|
httplib.NO_CONTENT):
|
||||||
return data
|
return data
|
||||||
|
elif res.status == httplib.SERVICE_UNAVAILABLE:
|
||||||
|
retry_after = res.getheader('retry-after')
|
||||||
|
LOG.warning(_("OFC returns ServiceUnavailable "
|
||||||
|
"(retry-after=%s)"), retry_after)
|
||||||
|
raise nexc.OFCServiceUnavailable(retry_after=retry_after)
|
||||||
elif res.status == httplib.NOT_FOUND:
|
elif res.status == httplib.NOT_FOUND:
|
||||||
LOG.info(_("Specified resource %s does not exist on OFC "),
|
LOG.info(_("Specified resource %s does not exist on OFC "),
|
||||||
action)
|
action)
|
||||||
@ -116,6 +123,23 @@ class OFCClient(object):
|
|||||||
LOG.error(reason)
|
LOG.error(reason)
|
||||||
raise nexc.OFCException(reason=reason)
|
raise nexc.OFCException(reason=reason)
|
||||||
|
|
||||||
|
def do_request(self, method, action, body=None):
|
||||||
|
max_attempts = config.OFC.api_max_attempts
|
||||||
|
for i in range(max_attempts, 0, -1):
|
||||||
|
try:
|
||||||
|
return self.do_single_request(method, action, body)
|
||||||
|
except nexc.OFCServiceUnavailable as e:
|
||||||
|
try:
|
||||||
|
wait_time = int(e.retry_after)
|
||||||
|
except (ValueError, TypeError):
|
||||||
|
wait_time = None
|
||||||
|
if i > 1 and wait_time:
|
||||||
|
LOG.info(_("Waiting for %s seconds due to "
|
||||||
|
"OFC Service_Unavailable."), wait_time)
|
||||||
|
time.sleep(wait_time)
|
||||||
|
continue
|
||||||
|
raise
|
||||||
|
|
||||||
def get(self, action):
|
def get(self, action):
|
||||||
return self.do_request("GET", action)
|
return self.do_request("GET", action)
|
||||||
|
|
||||||
|
@ -20,6 +20,7 @@ import json
|
|||||||
import socket
|
import socket
|
||||||
|
|
||||||
import mock
|
import mock
|
||||||
|
from oslo.config import cfg
|
||||||
|
|
||||||
from neutron.plugins.nec.common import exceptions as nexc
|
from neutron.plugins.nec.common import exceptions as nexc
|
||||||
from neutron.plugins.nec.common import ofc_client
|
from neutron.plugins.nec.common import ofc_client
|
||||||
@ -120,3 +121,59 @@ class OFCClientTest(base.BaseTestCase):
|
|||||||
mock.call.request('GET', '/somewhere', '{}', headers),
|
mock.call.request('GET', '/somewhere', '{}', headers),
|
||||||
]
|
]
|
||||||
conn.assert_has_calls(expected)
|
conn.assert_has_calls(expected)
|
||||||
|
|
||||||
|
def test_do_request_retry_fail_after_one_attempts(self):
|
||||||
|
self._test_do_request_retry_after(1, api_max_attempts=1)
|
||||||
|
|
||||||
|
def test_do_request_retry_fail_with_max_attempts(self):
|
||||||
|
self._test_do_request_retry_after(3)
|
||||||
|
|
||||||
|
def test_do_request_retry_succeed_with_2nd_attempt(self):
|
||||||
|
self._test_do_request_retry_after(2, succeed_final=True)
|
||||||
|
|
||||||
|
def test_do_request_retry_succeed_with_1st_attempt(self):
|
||||||
|
self._test_do_request_retry_after(1, succeed_final=True)
|
||||||
|
|
||||||
|
def _test_do_request_retry_after(self, exp_request_count,
|
||||||
|
api_max_attempts=None,
|
||||||
|
succeed_final=False):
|
||||||
|
if api_max_attempts is not None:
|
||||||
|
cfg.CONF.set_override('api_max_attempts', api_max_attempts,
|
||||||
|
group='OFC')
|
||||||
|
|
||||||
|
res_unavail = mock.Mock()
|
||||||
|
res_unavail.status = 503
|
||||||
|
res_unavail.read.return_value = None
|
||||||
|
res_unavail.getheader.return_value = '10'
|
||||||
|
|
||||||
|
res_ok = mock.Mock()
|
||||||
|
res_ok.status = 200
|
||||||
|
res_ok.read.return_value = None
|
||||||
|
|
||||||
|
conn = mock.Mock()
|
||||||
|
if succeed_final:
|
||||||
|
side_effect = [res_unavail] * (exp_request_count - 1) + [res_ok]
|
||||||
|
else:
|
||||||
|
side_effect = [res_unavail] * exp_request_count
|
||||||
|
conn.getresponse.side_effect = side_effect
|
||||||
|
|
||||||
|
with mock.patch.object(ofc_client.OFCClient, 'get_connection',
|
||||||
|
return_value=conn):
|
||||||
|
with mock.patch('time.sleep') as sleep:
|
||||||
|
client = ofc_client.OFCClient()
|
||||||
|
if succeed_final:
|
||||||
|
ret = client.do_request('GET', '/somewhere')
|
||||||
|
self.assertIsNone(ret)
|
||||||
|
else:
|
||||||
|
e = self.assertRaises(nexc.OFCServiceUnavailable,
|
||||||
|
client.do_request,
|
||||||
|
'GET', '/somewhere')
|
||||||
|
self.assertEqual('10', e.retry_after)
|
||||||
|
headers = {"Content-Type": "application/json"}
|
||||||
|
expected = [
|
||||||
|
mock.call.request('GET', '/somewhere', None, headers),
|
||||||
|
mock.call.getresponse(),
|
||||||
|
] * exp_request_count
|
||||||
|
conn.assert_has_calls(expected)
|
||||||
|
self.assertEqual(exp_request_count, conn.request.call_count)
|
||||||
|
self.assertEqual(exp_request_count - 1, sleep.call_count)
|
||||||
|
Loading…
Reference in New Issue
Block a user