Ensure that the report state is not a blocking call
Fixes bug 1191768 For the dhcp and l3 agents the first state report will be done via a call. If this succeeds then subsequent calls will be done via the cast method. Change-Id: I82a1d92fc84983b7bb46758db0aee3e3eca1d3be
This commit is contained in:
parent
49d7f87279
commit
0407b59c25
@ -743,6 +743,7 @@ class DhcpAgentWithStateReport(DhcpAgent):
|
|||||||
'start_flag': True,
|
'start_flag': True,
|
||||||
'agent_type': constants.AGENT_TYPE_DHCP}
|
'agent_type': constants.AGENT_TYPE_DHCP}
|
||||||
report_interval = cfg.CONF.AGENT.report_interval
|
report_interval = cfg.CONF.AGENT.report_interval
|
||||||
|
self.use_call = True
|
||||||
if report_interval:
|
if report_interval:
|
||||||
self.heartbeat = loopingcall.FixedIntervalLoopingCall(
|
self.heartbeat = loopingcall.FixedIntervalLoopingCall(
|
||||||
self._report_state)
|
self._report_state)
|
||||||
@ -753,8 +754,8 @@ class DhcpAgentWithStateReport(DhcpAgent):
|
|||||||
self.agent_state.get('configurations').update(
|
self.agent_state.get('configurations').update(
|
||||||
self.cache.get_state())
|
self.cache.get_state())
|
||||||
ctx = context.get_admin_context_without_session()
|
ctx = context.get_admin_context_without_session()
|
||||||
self.state_rpc.report_state(ctx,
|
self.state_rpc.report_state(ctx, self.agent_state, self.use_call)
|
||||||
self.agent_state)
|
self.use_call = False
|
||||||
except AttributeError:
|
except AttributeError:
|
||||||
# This means the server does not support report_state
|
# This means the server does not support report_state
|
||||||
LOG.warn(_("Quantum server does not support state report."
|
LOG.warn(_("Quantum server does not support state report."
|
||||||
|
@ -721,6 +721,7 @@ class L3NATAgentWithStateReport(L3NATAgent):
|
|||||||
'start_flag': True,
|
'start_flag': True,
|
||||||
'agent_type': l3_constants.AGENT_TYPE_L3}
|
'agent_type': l3_constants.AGENT_TYPE_L3}
|
||||||
report_interval = cfg.CONF.AGENT.report_interval
|
report_interval = cfg.CONF.AGENT.report_interval
|
||||||
|
self.use_call = True
|
||||||
if report_interval:
|
if report_interval:
|
||||||
self.heartbeat = loopingcall.FixedIntervalLoopingCall(
|
self.heartbeat = loopingcall.FixedIntervalLoopingCall(
|
||||||
self._report_state)
|
self._report_state)
|
||||||
@ -746,9 +747,10 @@ class L3NATAgentWithStateReport(L3NATAgent):
|
|||||||
configurations['interfaces'] = num_interfaces
|
configurations['interfaces'] = num_interfaces
|
||||||
configurations['floating_ips'] = num_floating_ips
|
configurations['floating_ips'] = num_floating_ips
|
||||||
try:
|
try:
|
||||||
self.state_rpc.report_state(self.context,
|
self.state_rpc.report_state(self.context, self.agent_state,
|
||||||
self.agent_state)
|
self.use_call)
|
||||||
self.agent_state.pop('start_flag', None)
|
self.agent_state.pop('start_flag', None)
|
||||||
|
self.use_call = False
|
||||||
except AttributeError:
|
except AttributeError:
|
||||||
# This means the server does not support report_state
|
# This means the server does not support report_state
|
||||||
LOG.warn(_("Quantum server does not support state report."
|
LOG.warn(_("Quantum server does not support state report."
|
||||||
|
@ -52,13 +52,15 @@ class PluginReportStateAPI(proxy.RpcProxy):
|
|||||||
super(PluginReportStateAPI, self).__init__(
|
super(PluginReportStateAPI, self).__init__(
|
||||||
topic=topic, default_version=self.BASE_RPC_API_VERSION)
|
topic=topic, default_version=self.BASE_RPC_API_VERSION)
|
||||||
|
|
||||||
def report_state(self, context, agent_state):
|
def report_state(self, context, agent_state, use_call=False):
|
||||||
return self.call(context,
|
msg = self.make_msg('report_state',
|
||||||
self.make_msg('report_state',
|
|
||||||
agent_state={'agent_state':
|
agent_state={'agent_state':
|
||||||
agent_state},
|
agent_state},
|
||||||
time=timeutils.strtime()),
|
time=timeutils.strtime())
|
||||||
topic=self.topic)
|
if use_call:
|
||||||
|
return self.call(context, msg, topic=self.topic)
|
||||||
|
else:
|
||||||
|
return self.cast(context, msg, topic=self.topic)
|
||||||
|
|
||||||
|
|
||||||
class PluginApi(proxy.RpcProxy):
|
class PluginApi(proxy.RpcProxy):
|
||||||
|
@ -47,13 +47,14 @@ class AgentRPCPluginApi(base.BaseTestCase):
|
|||||||
|
|
||||||
|
|
||||||
class AgentPluginReportState(base.BaseTestCase):
|
class AgentPluginReportState(base.BaseTestCase):
|
||||||
def test_plugin_report_state(self):
|
def test_plugin_report_state_use_call(self):
|
||||||
topic = 'test'
|
topic = 'test'
|
||||||
reportStateAPI = rpc.PluginReportStateAPI(topic)
|
reportStateAPI = rpc.PluginReportStateAPI(topic)
|
||||||
expected_agent_state = {'agent': 'test'}
|
expected_agent_state = {'agent': 'test'}
|
||||||
with mock.patch.object(reportStateAPI, 'call') as call:
|
with mock.patch.object(reportStateAPI, 'call') as call:
|
||||||
ctxt = context.RequestContext('fake_user', 'fake_project')
|
ctxt = context.RequestContext('fake_user', 'fake_project')
|
||||||
reportStateAPI.report_state(ctxt, expected_agent_state)
|
reportStateAPI.report_state(ctxt, expected_agent_state,
|
||||||
|
use_call=True)
|
||||||
self.assertEqual(call.call_args[0][0], ctxt)
|
self.assertEqual(call.call_args[0][0], ctxt)
|
||||||
self.assertEqual(call.call_args[0][1]['method'],
|
self.assertEqual(call.call_args[0][1]['method'],
|
||||||
'report_state')
|
'report_state')
|
||||||
@ -63,6 +64,22 @@ class AgentPluginReportState(base.BaseTestCase):
|
|||||||
str)
|
str)
|
||||||
self.assertEqual(call.call_args[1]['topic'], topic)
|
self.assertEqual(call.call_args[1]['topic'], topic)
|
||||||
|
|
||||||
|
def test_plugin_report_state_cast(self):
|
||||||
|
topic = 'test'
|
||||||
|
reportStateAPI = rpc.PluginReportStateAPI(topic)
|
||||||
|
expected_agent_state = {'agent': 'test'}
|
||||||
|
with mock.patch.object(reportStateAPI, 'cast') as cast:
|
||||||
|
ctxt = context.RequestContext('fake_user', 'fake_project')
|
||||||
|
reportStateAPI.report_state(ctxt, expected_agent_state)
|
||||||
|
self.assertEqual(cast.call_args[0][0], ctxt)
|
||||||
|
self.assertEqual(cast.call_args[0][1]['method'],
|
||||||
|
'report_state')
|
||||||
|
self.assertEqual(cast.call_args[0][1]['args']['agent_state'],
|
||||||
|
{'agent_state': expected_agent_state})
|
||||||
|
self.assertIsInstance(cast.call_args[0][1]['args']['time'],
|
||||||
|
str)
|
||||||
|
self.assertEqual(cast.call_args[1]['topic'], topic)
|
||||||
|
|
||||||
|
|
||||||
class AgentRPCMethods(base.BaseTestCase):
|
class AgentRPCMethods(base.BaseTestCase):
|
||||||
def test_create_consumers(self):
|
def test_create_consumers(self):
|
||||||
|
@ -161,7 +161,8 @@ class TestDhcpAgent(base.BaseTestCase):
|
|||||||
agent_mgr)
|
agent_mgr)
|
||||||
state_rpc.assert_has_calls(
|
state_rpc.assert_has_calls(
|
||||||
[mock.call(mock.ANY),
|
[mock.call(mock.ANY),
|
||||||
mock.call().report_state(mock.ANY, mock.ANY)])
|
mock.call().report_state(mock.ANY, mock.ANY,
|
||||||
|
mock.ANY)])
|
||||||
mock_lease_relay.assert_has_calls(
|
mock_lease_relay.assert_has_calls(
|
||||||
[mock.call(mock.ANY),
|
[mock.call(mock.ANY),
|
||||||
mock.call().start()])
|
mock.call().start()])
|
||||||
|
Loading…
Reference in New Issue
Block a user