Fix misleading error message about failed dhcp notifications
This is achieved by adjusting the log traces and ensuring that the right log errors are emitted based on the status of the network. The patch also drastically simplifies the structure of the notification method and reworks the unit tests to increase coverage.

Closes-Bug: #1289130
Change-Id: I7cc78bba81c516380fc93a68aa7b295312a88e29
parent 6945f525f4
commit 48b62508e6
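For orientation before the diff: the patch replaces the monolithic _notification() in neutron/api/rpc/agentnotifiers/dhcp_rpc_agent_api.py with small helpers (_schedule_network, _get_enabled_agents, _notify_agents, _cast_message, _fanout_message). The snippet below is a self-contained sketch of the decision order those helpers implement, distilled from the hunks that follow; it is an illustration, not code from the patch, and every name in it (FakeAgent, choose_delivery, scheduler_supported) is a stand-in.

    # Illustration only: mirrors the order of checks in the new _notify_agents().
    class FakeAgent(object):
        def __init__(self, host, admin_state_up=True):
            self.host = host
            self.admin_state_up = admin_state_up
            self.topic = 'dhcp_agent'

    def choose_delivery(method, scheduler_supported, agents):
        """Return 'fanout', 'skip', or the list of agents to cast to."""
        # Deletes (or plugins without an agent scheduler) fan out, because the
        # hosting agents can no longer be looked up.
        if method == 'network_delete_end' or not scheduler_supported:
            return 'fanout'
        # Nothing is sent on network_create_end so the admin can still bind an
        # agent to the network manually.
        if method == 'network_create_end':
            return 'skip'
        # Everything else is cast only to agents whose admin_state is UP.
        return [a for a in agents if a.admin_state_up]

    agents = [FakeAgent('host-1'), FakeAgent('host-2', admin_state_up=False)]
    assert choose_delivery('network_delete_end', True, agents) == 'fanout'
    assert choose_delivery('network_create_end', True, agents) == 'skip'
    assert [a.host for a in choose_delivery('port_update_end', True, agents)] == ['host-1']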
@@ -39,76 +39,101 @@ class DhcpAgentNotifyAPI(proxy.RpcProxy):
                           'port.update.end',
                           'port.delete.end']

-    def __init__(self, topic=topics.DHCP_AGENT):
+    def __init__(self, topic=topics.DHCP_AGENT, plugin=None):
         super(DhcpAgentNotifyAPI, self).__init__(
             topic=topic, default_version=self.BASE_RPC_API_VERSION)
+        self._plugin = plugin

-    def _get_enabled_dhcp_agents(self, context, network_id):
-        """Return enabled dhcp agents associated with the given network."""
-        plugin = manager.NeutronManager.get_plugin()
-        agents = plugin.get_dhcp_agents_hosting_networks(context, [network_id])
-        return [x for x in agents if x.admin_state_up]
+    @property
+    def plugin(self):
+        if self._plugin is None:
+            self._plugin = manager.NeutronManager.get_plugin()
+        return self._plugin

-    def _notification_host(self, context, method, payload, host):
-        """Notify the agent on host."""
+    def _schedule_network(self, context, network, existing_agents):
+        """Schedule the network to new agents
+
+        :return: all agents associated with the network
+        """
+        new_agents = self.plugin.schedule_network(context, network) or []
+        if new_agents:
+            for agent in new_agents:
+                self._cast_message(
+                    context, 'network_create_end',
+                    {'network': {'id': network['id']}}, agent['host'])
+        elif not existing_agents:
+            LOG.warn(_('Unable to schedule network %s: no agents available; '
+                       'will retry on subsequent port creation events.'),
+                     network['id'])
+        return new_agents + existing_agents
+
+    def _get_enabled_agents(self, context, network, agents, method, payload):
+        """Get the list of agents whose admin_state is UP."""
+        network_id = network['id']
+        enabled_agents = [x for x in agents if x.admin_state_up]
+        active_agents = [x for x in agents if x.is_active]
+        len_enabled_agents = len(enabled_agents)
+        len_active_agents = len(active_agents)
+        if len_active_agents < len_enabled_agents:
+            LOG.warn(_("Only %(active)d of %(total)d DHCP agents associated "
+                       "with network '%(net_id)s' are marked as active, so "
+                       " notifications may be sent to inactive agents.")
+                     % {'active': len_active_agents,
+                        'total': len_enabled_agents,
+                        'net_id': network_id})
+        if not enabled_agents:
+            num_ports = self.plugin.get_ports_count(
+                context, {'network_id': [network_id]})
+            notification_required = (
+                num_ports > 0 and len(network['subnets']) >= 1)
+            if notification_required:
+                LOG.error(_("Will not send event %(method)s for network "
+                            "%(net_id)s: no agent available. Payload: "
+                            "%(payload)s")
+                          % {'method': method,
+                             'net_id': network_id,
+                             'payload': payload})
+        return enabled_agents
+
+    def _notify_agents(self, context, method, payload, network_id):
+        """Notify all the agents that are hosting the network."""
+        # fanout is required as we do not know who is "listening"
+        no_agents = not utils.is_extension_supported(
+            self.plugin, constants.DHCP_AGENT_SCHEDULER_EXT_ALIAS)
+        fanout_required = method == 'network_delete_end' or no_agents
+
+        # we do nothing on network creation because we want to give the
+        # admin the chance to associate an agent to the network manually
+        cast_required = method != 'network_create_end'
+
+        if fanout_required:
+            self._fanout_message(context, method, payload)
+        elif cast_required:
+            admin_ctx = (context if context.is_admin else context.elevated())
+            network = self.plugin.get_network(admin_ctx, network_id)
+            agents = self.plugin.get_dhcp_agents_hosting_networks(
+                context, [network_id])
+
+            # schedule the network first, if needed
+            schedule_required = method == 'port_create_end'
+            if schedule_required:
+                agents = self._schedule_network(admin_ctx, network, agents)
+
+            enabled_agents = self._get_enabled_agents(
+                context, network, agents, method, payload)
+            for agent in enabled_agents:
+                self._cast_message(
+                    context, method, payload, agent.host, agent.topic)
+
+    def _cast_message(self, context, method, payload, host,
+                      topic=topics.DHCP_AGENT):
+        """Cast the payload to the dhcp agent running on the host."""
         self.cast(
             context, self.make_msg(method,
                                    payload=payload),
-            topic='%s.%s' % (topics.DHCP_AGENT, host))
+            topic='%s.%s' % (topic, host))

-    def _notification(self, context, method, payload, network_id):
-        """Notify all the agents that are hosting the network."""
-        plugin = manager.NeutronManager.get_plugin()
-        if (method != 'network_delete_end' and utils.is_extension_supported(
-                plugin, constants.DHCP_AGENT_SCHEDULER_EXT_ALIAS)):
-            if method == 'port_create_end':
-                # we don't schedule when we create network
-                # because we want to give admin a chance to
-                # schedule network manually by API
-                adminContext = (context if context.is_admin else
-                                context.elevated())
-                network = plugin.get_network(adminContext, network_id)
-                chosen_agents = plugin.schedule_network(adminContext, network)
-                if chosen_agents:
-                    for agent in chosen_agents:
-                        self._notification_host(
-                            context, 'network_create_end',
-                            {'network': {'id': network_id}},
-                            agent['host'])
-            agents = self._get_enabled_dhcp_agents(context, network_id)
-            if not agents:
-                LOG.error(_("No DHCP agents are associated with network "
-                            "'%(net_id)s'. Unable to send notification "
-                            "for '%(method)s' with payload: %(payload)s"),
-                          {
-                              'net_id': network_id,
-                              'method': method,
-                              'payload': payload,
-                          })
-                return
-            active_agents = [x for x in agents if x.is_active]
-            if active_agents != agents:
-                LOG.warning(_("Only %(active)d of %(total)d DHCP agents "
-                              "associated with network '%(net_id)s' are "
-                              "marked as active, so notifications may "
-                              "be sent to inactive agents."),
-                            {
-                                'active': len(active_agents),
-                                'total': len(agents),
-                                'net_id': network_id,
-                            })
-            for agent in agents:
-                self.cast(
-                    context, self.make_msg(method,
-                                           payload=payload),
-                    topic='%s.%s' % (agent.topic, agent.host))
-        else:
-            # besides the non-agentscheduler plugin,
-            # There is no way to query who is hosting the network
-            # when the network is deleted, so we need to fanout
-            self._notification_fanout(context, method, payload)
-
-    def _notification_fanout(self, context, method, payload):
+    def _fanout_message(self, context, method, payload):
         """Fanout the payload to all dhcp agents."""
         self.fanout_cast(
             context, self.make_msg(method,
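The misleading error named in the commit title is addressed in _get_enabled_agents above: an error is now logged only when a notification was actually required (the network has ports and at least one subnet) yet no enabled agent exists, while enabled-but-seemingly-inactive agents only trigger a warning. Below is a self-contained sketch of that triage with stand-in names; triage is not a function from the patch.

    # Illustration only: which log level the patched _get_enabled_agents() picks.
    def triage(enabled_count, active_count, num_ports, num_subnets):
        if active_count < enabled_count:
            return 'warn'    # some enabled agents look dead; casts still go out
        if enabled_count == 0 and num_ports > 0 and num_subnets >= 1:
            return 'error'   # a notification was needed but no agent can take it
        return None          # e.g. an empty network: no more misleading error

    assert triage(2, 1, num_ports=5, num_subnets=1) == 'warn'
    assert triage(0, 0, num_ports=5, num_subnets=1) == 'error'
    assert triage(0, 0, num_ports=0, num_subnets=0) is None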
@@ -116,17 +141,16 @@ class DhcpAgentNotifyAPI(proxy.RpcProxy):
             topic=topics.DHCP_AGENT)

     def network_removed_from_agent(self, context, network_id, host):
-        self._notification_host(context, 'network_delete_end',
-                                {'network_id': network_id}, host)
+        self._cast_message(context, 'network_delete_end',
+                           {'network_id': network_id}, host)

     def network_added_to_agent(self, context, network_id, host):
-        self._notification_host(context, 'network_create_end',
-                                {'network': {'id': network_id}}, host)
+        self._cast_message(context, 'network_create_end',
+                           {'network': {'id': network_id}}, host)

     def agent_updated(self, context, admin_state_up, host):
-        self._notification_host(context, 'agent_updated',
-                                {'admin_state_up': admin_state_up},
-                                host)
+        self._cast_message(context, 'agent_updated',
+                           {'admin_state_up': admin_state_up}, host)

     def notify(self, context, data, method_name):
         # data is {'key' : 'value'} with only one key
@@ -146,8 +170,8 @@ class DhcpAgentNotifyAPI(proxy.RpcProxy):
         method_name = method_name.replace(".", "_")
         if method_name.endswith("_delete_end"):
             if 'id' in obj_value:
-                self._notification(context, method_name,
-                                   {obj_type + '_id': obj_value['id']},
-                                   network_id)
+                self._notify_agents(context, method_name,
+                                    {obj_type + '_id': obj_value['id']},
+                                    network_id)
         else:
-            self._notification(context, method_name, data, network_id)
+            self._notify_agents(context, method_name, data, network_id)
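For context on the hunk above: notify() only rewrites the event name (dots become underscores) and, for delete events, shrinks the payload to the resource id before delegating to _notify_agents(). A small runnable sketch of that dispatch step follows; the helper name build_notification is hypothetical, not part of the patch.

    # Illustration only: 'port.delete.end' -> RPC method 'port_delete_end' with a
    # reduced {'port_id': ...} payload; other events keep the full resource dict.
    def build_notification(obj_type, obj_value, event_name):
        method_name = event_name.replace('.', '_')
        if method_name.endswith('_delete_end') and 'id' in obj_value:
            payload = {obj_type + '_id': obj_value['id']}
        else:
            payload = {obj_type: obj_value}
        return method_name, payload

    assert build_notification('port', {'id': 'p1'}, 'port.delete.end') == \
        ('port_delete_end', {'port_id': 'p1'})
    assert build_notification('subnet', {'id': 's1'}, 'subnet.create.end') == \
        ('subnet_create_end', {'subnet': {'id': 's1'}})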
@@ -27,6 +27,7 @@ import mock
 from oslo.config import cfg
 import testtools

+from neutron.common import constants as const
 from neutron import manager
 from neutron.tests import post_mortem_debug

@@ -43,6 +44,12 @@ def fake_use_fatal_exceptions(*args):
 class BaseTestCase(testtools.TestCase):

     def _cleanup_coreplugin(self):
+        if manager.NeutronManager._instance:
+            agent_notifiers = getattr(manager.NeutronManager._instance.plugin,
+                                      'agent_notifiers', {})
+            dhcp_agent_notifier = agent_notifiers.get(const.AGENT_TYPE_DHCP)
+            if dhcp_agent_notifier:
+                dhcp_agent_notifier._plugin = None
         manager.NeutronManager._instance = self._saved_instance

     def setup_coreplugin(self, core_plugin=None):
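The cleanup hook above is needed because the notifier now caches its plugin lazily through the plugin property added in the first hunk; without resetting _plugin, a core plugin from one test case would leak into the next. A minimal sketch of that cache-and-reset pattern, with stand-in names (Notifier and load_current_plugin are not from the patch):

    # Illustration only: why the cached reference has to be dropped between tests.
    class Notifier(object):
        def __init__(self, plugin=None):
            self._plugin = plugin

        @property
        def plugin(self):
            if self._plugin is None:
                self._plugin = load_current_plugin()  # cached on first access
            return self._plugin

    def load_current_plugin():
        return object()  # stands in for manager.NeutronManager.get_plugin()

    notifier = Notifier()
    first = notifier.plugin
    notifier._plugin = None       # what _cleanup_coreplugin now does
    second = notifier.plugin      # resolved again instead of reusing a stale one
    assert first is not second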
@@ -13,13 +13,13 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.

-import contextlib
-
+import datetime
 import mock

 from neutron.api.rpc.agentnotifiers import dhcp_rpc_agent_api
 from neutron.common import utils
-from neutron import manager
+from neutron.db import agents_db
+from neutron.openstack.common import timeutils
 from neutron.tests import base

@@ -27,50 +27,128 @@ class TestDhcpAgentNotifyAPI(base.BaseTestCase):

     def setUp(self):
         super(TestDhcpAgentNotifyAPI, self).setUp()
-        self.notify = dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
+        self.notifier = (
+            dhcp_rpc_agent_api.DhcpAgentNotifyAPI(plugin=mock.Mock()))

-    def test_get_enabled_dhcp_agents_filters_disabled_agents(self):
-        disabled_agent = mock.Mock()
-        disabled_agent.admin_state_up = False
-        enabled_agent = mock.Mock()
-        with mock.patch.object(manager.NeutronManager,
-                               'get_plugin') as mock_get_plugin:
-            mock_get_plugin.return_value = mock_plugin = mock.Mock()
-            with mock.patch.object(
-                mock_plugin, 'get_dhcp_agents_hosting_networks'
-            ) as mock_get_agents:
-                mock_get_agents.return_value = [disabled_agent, enabled_agent]
-                result = self.notify._get_enabled_dhcp_agents('ctx', 'net_id')
-        self.assertEqual(result, [enabled_agent])
+        mock_util_p = mock.patch.object(utils, 'is_extension_supported')
+        mock_log_p = mock.patch.object(dhcp_rpc_agent_api, 'LOG')
+        mock_fanout_p = mock.patch.object(self.notifier, '_fanout_message')
+        mock_cast_p = mock.patch.object(self.notifier, '_cast_message')
+        self.mock_util = mock_util_p.start()
+        self.mock_log = mock_log_p.start()
+        self.mock_fanout = mock_fanout_p.start()
+        self.mock_cast = mock_cast_p.start()

-    def _test_notification(self, agents):
-        with contextlib.nested(
-            mock.patch.object(manager.NeutronManager, 'get_plugin'),
-            mock.patch.object(utils, 'is_extension_supported'),
-            mock.patch.object(self.notify, '_get_enabled_dhcp_agents')
-        ) as (m1, m2, mock_get_agents):
-            mock_get_agents.return_value = agents
-            self.notify._notification(mock.Mock(), 'foo', {}, 'net_id')
+    def _test__schedule_network(self, network,
+                                new_agents=None, existing_agents=None,
+                                expected_casts=0, expected_warnings=0):
+        self.notifier.plugin.schedule_network.return_value = new_agents
+        agents = self.notifier._schedule_network(
+            mock.ANY, network, existing_agents)
+        if new_agents is None:
+            new_agents = []
+        self.assertEqual(new_agents + existing_agents, agents)
+        self.assertEqual(expected_casts, self.mock_cast.call_count)
+        self.assertEqual(expected_warnings, self.mock_log.warn.call_count)

-    def test_notification_sends_cast_for_enabled_agent(self):
-        with mock.patch.object(self.notify, 'cast') as mock_cast:
-            self._test_notification([mock.Mock()])
-        self.assertEqual(mock_cast.call_count, 1)
+    def test__schedule_network(self):
+        agent = agents_db.Agent()
+        agent.admin_state_up = True
+        agent.heartbeat_timestamp = timeutils.utcnow()
+        network = {'id': 'foo_net_id'}
+        self._test__schedule_network(network,
+                                     new_agents=[agent], existing_agents=[],
+                                     expected_casts=1, expected_warnings=0)

-    def test_notification_logs_error_for_no_enabled_agents(self):
-        with mock.patch.object(self.notify, 'cast') as mock_cast:
-            with mock.patch.object(dhcp_rpc_agent_api.LOG,
-                                   'error') as mock_log:
-                self._test_notification([])
-        self.assertEqual(mock_cast.call_count, 0)
-        self.assertEqual(mock_log.call_count, 1)
+    def test__schedule_network_no_existing_agents(self):
+        agent = agents_db.Agent()
+        agent.admin_state_up = True
+        agent.heartbeat_timestamp = timeutils.utcnow()
+        network = {'id': 'foo_net_id'}
+        self._test__schedule_network(network,
+                                     new_agents=None, existing_agents=[agent],
+                                     expected_casts=0, expected_warnings=0)

-    def test_notification_logs_warning_for_inactive_agents(self):
-        agent = mock.Mock()
-        agent.is_active = False
-        with mock.patch.object(self.notify, 'cast') as mock_cast:
-            with mock.patch.object(dhcp_rpc_agent_api.LOG,
-                                   'warning') as mock_log:
-                self._test_notification([agent])
-        self.assertEqual(mock_cast.call_count, 1)
-        self.assertEqual(mock_log.call_count, 1)
+    def test__schedule_network_no_new_agents(self):
+        network = {'id': 'foo_net_id'}
+        self._test__schedule_network(network,
+                                     new_agents=None, existing_agents=[],
+                                     expected_casts=0, expected_warnings=1)
+
+    def _test__get_enabled_agents(self, network,
+                                  agents=None, port_count=0,
+                                  expected_warnings=0, expected_errors=0):
+        self.notifier.plugin.get_ports_count.return_value = port_count
+        enabled_agents = self.notifier._get_enabled_agents(
+            mock.ANY, network, agents, mock.ANY, mock.ANY)
+        self.assertEqual(agents, enabled_agents)
+        self.assertEqual(expected_warnings, self.mock_log.warn.call_count)
+        self.assertEqual(expected_errors, self.mock_log.error.call_count)
+
+    def test__get_enabled_agents(self):
+        agent = agents_db.Agent()
+        agent.admin_state_up = True
+        agent.heartbeat_timestamp = timeutils.utcnow()
+        network = {'id': 'foo_network_id'}
+        self._test__get_enabled_agents(network, agents=[agent])
+
+    def test__get_enabled_agents_with_inactive_ones(self):
+        agent1 = agents_db.Agent()
+        agent1.admin_state_up = True
+        agent1.heartbeat_timestamp = timeutils.utcnow()
+        agent2 = agents_db.Agent()
+        agent2.admin_state_up = True
+        # This is effectively an inactive agent
+        agent2.heartbeat_timestamp = datetime.datetime(2000, 1, 1, 0, 0)
+        network = {'id': 'foo_network_id'}
+        self._test__get_enabled_agents(network,
+                                       agents=[agent1, agent2],
+                                       expected_warnings=1, expected_errors=0)
+
+    def test__get_enabled_agents_with_notification_required(self):
+        network = {'id': 'foo_network_id', 'subnets': ['foo_subnet_id']}
+        self._test__get_enabled_agents(network, [], port_count=20,
+                                       expected_warnings=0, expected_errors=1)
+
+    def test__notify_agents_fanout_required(self):
+        self.notifier._notify_agents(mock.ANY,
+                                     'network_delete_end',
+                                     mock.ANY, 'foo_network_id')
+        self.assertEqual(1, self.mock_fanout.call_count)
+
+    def _test__notify_agents(self, method,
+                             expected_scheduling=0, expected_casts=0):
+        with mock.patch.object(self.notifier, '_schedule_network') as f:
+            with mock.patch.object(self.notifier, '_get_enabled_agents') as g:
+                agent = agents_db.Agent()
+                agent.admin_state_up = True
+                agent.heartbeat_timestamp = timeutils.utcnow()
+                g.return_value = [agent]
+                self.notifier._notify_agents(mock.Mock(), method,
+                                             mock.ANY, 'foo_network_id')
+                self.assertEqual(expected_scheduling, f.call_count)
+                self.assertEqual(expected_casts, self.mock_cast.call_count)
+
+    def test__notify_agents_cast_required_with_scheduling(self):
+        self._test__notify_agents('port_create_end',
+                                  expected_scheduling=1, expected_casts=1)
+
+    def test__notify_agents_cast_required_wo_scheduling_on_port_update(self):
+        self._test__notify_agents('port_update_end',
+                                  expected_scheduling=0, expected_casts=1)
+
+    def test__notify_agents_cast_required_wo_scheduling_on_subnet_create(self):
+        self._test__notify_agents('subnet_create_end',
+                                  expected_scheduling=0, expected_casts=1)
+
+    def test__notify_agents_no_action(self):
+        self._test__notify_agents('network_create_end',
+                                  expected_scheduling=0, expected_casts=0)
+
+    def test__fanout_message(self):
+        self.notifier._fanout_message(mock.ANY, mock.ANY, mock.ANY)
+        self.assertEqual(1, self.mock_fanout.call_count)
+
+    def test__cast_message(self):
+        self.notifier._cast_message(mock.ANY, mock.ANY, mock.ANY)
+        self.assertEqual(1, self.mock_cast.call_count)