Merge "Ensure that the report state is not a blocking call"
This commit is contained in:
commit
1649a43ef8
@ -743,6 +743,7 @@ class DhcpAgentWithStateReport(DhcpAgent):
|
||||
'start_flag': True,
|
||||
'agent_type': constants.AGENT_TYPE_DHCP}
|
||||
report_interval = cfg.CONF.AGENT.report_interval
|
||||
self.use_call = True
|
||||
if report_interval:
|
||||
self.heartbeat = loopingcall.FixedIntervalLoopingCall(
|
||||
self._report_state)
|
||||
@ -753,8 +754,8 @@ class DhcpAgentWithStateReport(DhcpAgent):
|
||||
self.agent_state.get('configurations').update(
|
||||
self.cache.get_state())
|
||||
ctx = context.get_admin_context_without_session()
|
||||
self.state_rpc.report_state(ctx,
|
||||
self.agent_state)
|
||||
self.state_rpc.report_state(ctx, self.agent_state, self.use_call)
|
||||
self.use_call = False
|
||||
except AttributeError:
|
||||
# This means the server does not support report_state
|
||||
LOG.warn(_("Quantum server does not support state report."
|
||||
|
@ -721,6 +721,7 @@ class L3NATAgentWithStateReport(L3NATAgent):
|
||||
'start_flag': True,
|
||||
'agent_type': l3_constants.AGENT_TYPE_L3}
|
||||
report_interval = cfg.CONF.AGENT.report_interval
|
||||
self.use_call = True
|
||||
if report_interval:
|
||||
self.heartbeat = loopingcall.FixedIntervalLoopingCall(
|
||||
self._report_state)
|
||||
@ -746,9 +747,10 @@ class L3NATAgentWithStateReport(L3NATAgent):
|
||||
configurations['interfaces'] = num_interfaces
|
||||
configurations['floating_ips'] = num_floating_ips
|
||||
try:
|
||||
self.state_rpc.report_state(self.context,
|
||||
self.agent_state)
|
||||
self.state_rpc.report_state(self.context, self.agent_state,
|
||||
self.use_call)
|
||||
self.agent_state.pop('start_flag', None)
|
||||
self.use_call = False
|
||||
except AttributeError:
|
||||
# This means the server does not support report_state
|
||||
LOG.warn(_("Quantum server does not support state report."
|
||||
|
@ -52,13 +52,15 @@ class PluginReportStateAPI(proxy.RpcProxy):
|
||||
super(PluginReportStateAPI, self).__init__(
|
||||
topic=topic, default_version=self.BASE_RPC_API_VERSION)
|
||||
|
||||
def report_state(self, context, agent_state):
|
||||
return self.call(context,
|
||||
self.make_msg('report_state',
|
||||
agent_state={'agent_state':
|
||||
agent_state},
|
||||
time=timeutils.strtime()),
|
||||
topic=self.topic)
|
||||
def report_state(self, context, agent_state, use_call=False):
|
||||
msg = self.make_msg('report_state',
|
||||
agent_state={'agent_state':
|
||||
agent_state},
|
||||
time=timeutils.strtime())
|
||||
if use_call:
|
||||
return self.call(context, msg, topic=self.topic)
|
||||
else:
|
||||
return self.cast(context, msg, topic=self.topic)
|
||||
|
||||
|
||||
class PluginApi(proxy.RpcProxy):
|
||||
|
@ -47,13 +47,14 @@ class AgentRPCPluginApi(base.BaseTestCase):
|
||||
|
||||
|
||||
class AgentPluginReportState(base.BaseTestCase):
|
||||
def test_plugin_report_state(self):
|
||||
def test_plugin_report_state_use_call(self):
|
||||
topic = 'test'
|
||||
reportStateAPI = rpc.PluginReportStateAPI(topic)
|
||||
expected_agent_state = {'agent': 'test'}
|
||||
with mock.patch.object(reportStateAPI, 'call') as call:
|
||||
ctxt = context.RequestContext('fake_user', 'fake_project')
|
||||
reportStateAPI.report_state(ctxt, expected_agent_state)
|
||||
reportStateAPI.report_state(ctxt, expected_agent_state,
|
||||
use_call=True)
|
||||
self.assertEqual(call.call_args[0][0], ctxt)
|
||||
self.assertEqual(call.call_args[0][1]['method'],
|
||||
'report_state')
|
||||
@ -63,6 +64,22 @@ class AgentPluginReportState(base.BaseTestCase):
|
||||
str)
|
||||
self.assertEqual(call.call_args[1]['topic'], topic)
|
||||
|
||||
def test_plugin_report_state_cast(self):
|
||||
topic = 'test'
|
||||
reportStateAPI = rpc.PluginReportStateAPI(topic)
|
||||
expected_agent_state = {'agent': 'test'}
|
||||
with mock.patch.object(reportStateAPI, 'cast') as cast:
|
||||
ctxt = context.RequestContext('fake_user', 'fake_project')
|
||||
reportStateAPI.report_state(ctxt, expected_agent_state)
|
||||
self.assertEqual(cast.call_args[0][0], ctxt)
|
||||
self.assertEqual(cast.call_args[0][1]['method'],
|
||||
'report_state')
|
||||
self.assertEqual(cast.call_args[0][1]['args']['agent_state'],
|
||||
{'agent_state': expected_agent_state})
|
||||
self.assertIsInstance(cast.call_args[0][1]['args']['time'],
|
||||
str)
|
||||
self.assertEqual(cast.call_args[1]['topic'], topic)
|
||||
|
||||
|
||||
class AgentRPCMethods(base.BaseTestCase):
|
||||
def test_create_consumers(self):
|
||||
|
@ -161,7 +161,8 @@ class TestDhcpAgent(base.BaseTestCase):
|
||||
agent_mgr)
|
||||
state_rpc.assert_has_calls(
|
||||
[mock.call(mock.ANY),
|
||||
mock.call().report_state(mock.ANY, mock.ANY)])
|
||||
mock.call().report_state(mock.ANY, mock.ANY,
|
||||
mock.ANY)])
|
||||
mock_lease_relay.assert_has_calls(
|
||||
[mock.call(mock.ANY),
|
||||
mock.call().start()])
|
||||
|
Loading…
Reference in New Issue
Block a user