diff --git a/neutron/api/rpc/agentnotifiers/dhcp_rpc_agent_api.py b/neutron/api/rpc/agentnotifiers/dhcp_rpc_agent_api.py index 4be1072a34..85eacf2716 100644 --- a/neutron/api/rpc/agentnotifiers/dhcp_rpc_agent_api.py +++ b/neutron/api/rpc/agentnotifiers/dhcp_rpc_agent_api.py @@ -39,76 +39,101 @@ class DhcpAgentNotifyAPI(proxy.RpcProxy): 'port.update.end', 'port.delete.end'] - def __init__(self, topic=topics.DHCP_AGENT): + def __init__(self, topic=topics.DHCP_AGENT, plugin=None): super(DhcpAgentNotifyAPI, self).__init__( topic=topic, default_version=self.BASE_RPC_API_VERSION) + self._plugin = plugin - def _get_enabled_dhcp_agents(self, context, network_id): - """Return enabled dhcp agents associated with the given network.""" - plugin = manager.NeutronManager.get_plugin() - agents = plugin.get_dhcp_agents_hosting_networks(context, [network_id]) - return [x for x in agents if x.admin_state_up] + @property + def plugin(self): + if self._plugin is None: + self._plugin = manager.NeutronManager.get_plugin() + return self._plugin - def _notification_host(self, context, method, payload, host): - """Notify the agent on host.""" + def _schedule_network(self, context, network, existing_agents): + """Schedule the network to new agents + + :return: all agents associated with the network + """ + new_agents = self.plugin.schedule_network(context, network) or [] + if new_agents: + for agent in new_agents: + self._cast_message( + context, 'network_create_end', + {'network': {'id': network['id']}}, agent['host']) + elif not existing_agents: + LOG.warn(_('Unable to schedule network %s: no agents available; ' + 'will retry on subsequent port creation events.'), + network['id']) + return new_agents + existing_agents + + def _get_enabled_agents(self, context, network, agents, method, payload): + """Get the list of agents whose admin_state is UP.""" + network_id = network['id'] + enabled_agents = [x for x in agents if x.admin_state_up] + active_agents = [x for x in agents if x.is_active] + len_enabled_agents = len(enabled_agents) + len_active_agents = len(active_agents) + if len_active_agents < len_enabled_agents: + LOG.warn(_("Only %(active)d of %(total)d DHCP agents associated " + "with network '%(net_id)s' are marked as active, so " + " notifications may be sent to inactive agents.") + % {'active': len_active_agents, + 'total': len_enabled_agents, + 'net_id': network_id}) + if not enabled_agents: + num_ports = self.plugin.get_ports_count( + context, {'network_id': [network_id]}) + notification_required = ( + num_ports > 0 and len(network['subnets']) >= 1) + if notification_required: + LOG.error(_("Will not send event %(method)s for network " + "%(net_id)s: no agent available. Payload: " + "%(payload)s") + % {'method': method, + 'net_id': network_id, + 'payload': payload}) + return enabled_agents + + def _notify_agents(self, context, method, payload, network_id): + """Notify all the agents that are hosting the network.""" + # fanout is required as we do not know who is "listening" + no_agents = not utils.is_extension_supported( + self.plugin, constants.DHCP_AGENT_SCHEDULER_EXT_ALIAS) + fanout_required = method == 'network_delete_end' or no_agents + + # we do nothing on network creation because we want to give the + # admin the chance to associate an agent to the network manually + cast_required = method != 'network_create_end' + + if fanout_required: + self._fanout_message(context, method, payload) + elif cast_required: + admin_ctx = (context if context.is_admin else context.elevated()) + network = self.plugin.get_network(admin_ctx, network_id) + agents = self.plugin.get_dhcp_agents_hosting_networks( + context, [network_id]) + + # schedule the network first, if needed + schedule_required = method == 'port_create_end' + if schedule_required: + agents = self._schedule_network(admin_ctx, network, agents) + + enabled_agents = self._get_enabled_agents( + context, network, agents, method, payload) + for agent in enabled_agents: + self._cast_message( + context, method, payload, agent.host, agent.topic) + + def _cast_message(self, context, method, payload, host, + topic=topics.DHCP_AGENT): + """Cast the payload to the dhcp agent running on the host.""" self.cast( context, self.make_msg(method, payload=payload), - topic='%s.%s' % (topics.DHCP_AGENT, host)) + topic='%s.%s' % (topic, host)) - def _notification(self, context, method, payload, network_id): - """Notify all the agents that are hosting the network.""" - plugin = manager.NeutronManager.get_plugin() - if (method != 'network_delete_end' and utils.is_extension_supported( - plugin, constants.DHCP_AGENT_SCHEDULER_EXT_ALIAS)): - if method == 'port_create_end': - # we don't schedule when we create network - # because we want to give admin a chance to - # schedule network manually by API - adminContext = (context if context.is_admin else - context.elevated()) - network = plugin.get_network(adminContext, network_id) - chosen_agents = plugin.schedule_network(adminContext, network) - if chosen_agents: - for agent in chosen_agents: - self._notification_host( - context, 'network_create_end', - {'network': {'id': network_id}}, - agent['host']) - agents = self._get_enabled_dhcp_agents(context, network_id) - if not agents: - LOG.error(_("No DHCP agents are associated with network " - "'%(net_id)s'. Unable to send notification " - "for '%(method)s' with payload: %(payload)s"), - { - 'net_id': network_id, - 'method': method, - 'payload': payload, - }) - return - active_agents = [x for x in agents if x.is_active] - if active_agents != agents: - LOG.warning(_("Only %(active)d of %(total)d DHCP agents " - "associated with network '%(net_id)s' are " - "marked as active, so notifications may " - "be sent to inactive agents."), - { - 'active': len(active_agents), - 'total': len(agents), - 'net_id': network_id, - }) - for agent in agents: - self.cast( - context, self.make_msg(method, - payload=payload), - topic='%s.%s' % (agent.topic, agent.host)) - else: - # besides the non-agentscheduler plugin, - # There is no way to query who is hosting the network - # when the network is deleted, so we need to fanout - self._notification_fanout(context, method, payload) - - def _notification_fanout(self, context, method, payload): + def _fanout_message(self, context, method, payload): """Fanout the payload to all dhcp agents.""" self.fanout_cast( context, self.make_msg(method, @@ -116,17 +141,16 @@ class DhcpAgentNotifyAPI(proxy.RpcProxy): topic=topics.DHCP_AGENT) def network_removed_from_agent(self, context, network_id, host): - self._notification_host(context, 'network_delete_end', - {'network_id': network_id}, host) + self._cast_message(context, 'network_delete_end', + {'network_id': network_id}, host) def network_added_to_agent(self, context, network_id, host): - self._notification_host(context, 'network_create_end', - {'network': {'id': network_id}}, host) + self._cast_message(context, 'network_create_end', + {'network': {'id': network_id}}, host) def agent_updated(self, context, admin_state_up, host): - self._notification_host(context, 'agent_updated', - {'admin_state_up': admin_state_up}, - host) + self._cast_message(context, 'agent_updated', + {'admin_state_up': admin_state_up}, host) def notify(self, context, data, method_name): # data is {'key' : 'value'} with only one key @@ -146,8 +170,8 @@ class DhcpAgentNotifyAPI(proxy.RpcProxy): method_name = method_name.replace(".", "_") if method_name.endswith("_delete_end"): if 'id' in obj_value: - self._notification(context, method_name, - {obj_type + '_id': obj_value['id']}, - network_id) + self._notify_agents(context, method_name, + {obj_type + '_id': obj_value['id']}, + network_id) else: - self._notification(context, method_name, data, network_id) + self._notify_agents(context, method_name, data, network_id) diff --git a/neutron/tests/base.py b/neutron/tests/base.py index 71b3427e67..218305137d 100644 --- a/neutron/tests/base.py +++ b/neutron/tests/base.py @@ -27,6 +27,7 @@ import mock from oslo.config import cfg import testtools +from neutron.common import constants as const from neutron import manager from neutron.tests import post_mortem_debug @@ -43,6 +44,12 @@ def fake_use_fatal_exceptions(*args): class BaseTestCase(testtools.TestCase): def _cleanup_coreplugin(self): + if manager.NeutronManager._instance: + agent_notifiers = getattr(manager.NeutronManager._instance.plugin, + 'agent_notifiers', {}) + dhcp_agent_notifier = agent_notifiers.get(const.AGENT_TYPE_DHCP) + if dhcp_agent_notifier: + dhcp_agent_notifier._plugin = None manager.NeutronManager._instance = self._saved_instance def setup_coreplugin(self, core_plugin=None): diff --git a/neutron/tests/unit/api/rpc/agentnotifiers/test_dhcp_rpc_agent_api.py b/neutron/tests/unit/api/rpc/agentnotifiers/test_dhcp_rpc_agent_api.py index b175d34036..5d29f6cbd0 100644 --- a/neutron/tests/unit/api/rpc/agentnotifiers/test_dhcp_rpc_agent_api.py +++ b/neutron/tests/unit/api/rpc/agentnotifiers/test_dhcp_rpc_agent_api.py @@ -13,13 +13,13 @@ # See the License for the specific language governing permissions and # limitations under the License. -import contextlib - +import datetime import mock from neutron.api.rpc.agentnotifiers import dhcp_rpc_agent_api from neutron.common import utils -from neutron import manager +from neutron.db import agents_db +from neutron.openstack.common import timeutils from neutron.tests import base @@ -27,50 +27,128 @@ class TestDhcpAgentNotifyAPI(base.BaseTestCase): def setUp(self): super(TestDhcpAgentNotifyAPI, self).setUp() - self.notify = dhcp_rpc_agent_api.DhcpAgentNotifyAPI() + self.notifier = ( + dhcp_rpc_agent_api.DhcpAgentNotifyAPI(plugin=mock.Mock())) - def test_get_enabled_dhcp_agents_filters_disabled_agents(self): - disabled_agent = mock.Mock() - disabled_agent.admin_state_up = False - enabled_agent = mock.Mock() - with mock.patch.object(manager.NeutronManager, - 'get_plugin') as mock_get_plugin: - mock_get_plugin.return_value = mock_plugin = mock.Mock() - with mock.patch.object( - mock_plugin, 'get_dhcp_agents_hosting_networks' - ) as mock_get_agents: - mock_get_agents.return_value = [disabled_agent, enabled_agent] - result = self.notify._get_enabled_dhcp_agents('ctx', 'net_id') - self.assertEqual(result, [enabled_agent]) + mock_util_p = mock.patch.object(utils, 'is_extension_supported') + mock_log_p = mock.patch.object(dhcp_rpc_agent_api, 'LOG') + mock_fanout_p = mock.patch.object(self.notifier, '_fanout_message') + mock_cast_p = mock.patch.object(self.notifier, '_cast_message') + self.mock_util = mock_util_p.start() + self.mock_log = mock_log_p.start() + self.mock_fanout = mock_fanout_p.start() + self.mock_cast = mock_cast_p.start() - def _test_notification(self, agents): - with contextlib.nested( - mock.patch.object(manager.NeutronManager, 'get_plugin'), - mock.patch.object(utils, 'is_extension_supported'), - mock.patch.object(self.notify, '_get_enabled_dhcp_agents') - ) as (m1, m2, mock_get_agents): - mock_get_agents.return_value = agents - self.notify._notification(mock.Mock(), 'foo', {}, 'net_id') + def _test__schedule_network(self, network, + new_agents=None, existing_agents=None, + expected_casts=0, expected_warnings=0): + self.notifier.plugin.schedule_network.return_value = new_agents + agents = self.notifier._schedule_network( + mock.ANY, network, existing_agents) + if new_agents is None: + new_agents = [] + self.assertEqual(new_agents + existing_agents, agents) + self.assertEqual(expected_casts, self.mock_cast.call_count) + self.assertEqual(expected_warnings, self.mock_log.warn.call_count) - def test_notification_sends_cast_for_enabled_agent(self): - with mock.patch.object(self.notify, 'cast') as mock_cast: - self._test_notification([mock.Mock()]) - self.assertEqual(mock_cast.call_count, 1) + def test__schedule_network(self): + agent = agents_db.Agent() + agent.admin_state_up = True + agent.heartbeat_timestamp = timeutils.utcnow() + network = {'id': 'foo_net_id'} + self._test__schedule_network(network, + new_agents=[agent], existing_agents=[], + expected_casts=1, expected_warnings=0) - def test_notification_logs_error_for_no_enabled_agents(self): - with mock.patch.object(self.notify, 'cast') as mock_cast: - with mock.patch.object(dhcp_rpc_agent_api.LOG, - 'error') as mock_log: - self._test_notification([]) - self.assertEqual(mock_cast.call_count, 0) - self.assertEqual(mock_log.call_count, 1) + def test__schedule_network_no_existing_agents(self): + agent = agents_db.Agent() + agent.admin_state_up = True + agent.heartbeat_timestamp = timeutils.utcnow() + network = {'id': 'foo_net_id'} + self._test__schedule_network(network, + new_agents=None, existing_agents=[agent], + expected_casts=0, expected_warnings=0) - def test_notification_logs_warning_for_inactive_agents(self): - agent = mock.Mock() - agent.is_active = False - with mock.patch.object(self.notify, 'cast') as mock_cast: - with mock.patch.object(dhcp_rpc_agent_api.LOG, - 'warning') as mock_log: - self._test_notification([agent]) - self.assertEqual(mock_cast.call_count, 1) - self.assertEqual(mock_log.call_count, 1) + def test__schedule_network_no_new_agents(self): + network = {'id': 'foo_net_id'} + self._test__schedule_network(network, + new_agents=None, existing_agents=[], + expected_casts=0, expected_warnings=1) + + def _test__get_enabled_agents(self, network, + agents=None, port_count=0, + expected_warnings=0, expected_errors=0): + self.notifier.plugin.get_ports_count.return_value = port_count + enabled_agents = self.notifier._get_enabled_agents( + mock.ANY, network, agents, mock.ANY, mock.ANY) + self.assertEqual(agents, enabled_agents) + self.assertEqual(expected_warnings, self.mock_log.warn.call_count) + self.assertEqual(expected_errors, self.mock_log.error.call_count) + + def test__get_enabled_agents(self): + agent = agents_db.Agent() + agent.admin_state_up = True + agent.heartbeat_timestamp = timeutils.utcnow() + network = {'id': 'foo_network_id'} + self._test__get_enabled_agents(network, agents=[agent]) + + def test__get_enabled_agents_with_inactive_ones(self): + agent1 = agents_db.Agent() + agent1.admin_state_up = True + agent1.heartbeat_timestamp = timeutils.utcnow() + agent2 = agents_db.Agent() + agent2.admin_state_up = True + # This is effectively an inactive agent + agent2.heartbeat_timestamp = datetime.datetime(2000, 1, 1, 0, 0) + network = {'id': 'foo_network_id'} + self._test__get_enabled_agents(network, + agents=[agent1, agent2], + expected_warnings=1, expected_errors=0) + + def test__get_enabled_agents_with_notification_required(self): + network = {'id': 'foo_network_id', 'subnets': ['foo_subnet_id']} + self._test__get_enabled_agents(network, [], port_count=20, + expected_warnings=0, expected_errors=1) + + def test__notify_agents_fanout_required(self): + self.notifier._notify_agents(mock.ANY, + 'network_delete_end', + mock.ANY, 'foo_network_id') + self.assertEqual(1, self.mock_fanout.call_count) + + def _test__notify_agents(self, method, + expected_scheduling=0, expected_casts=0): + with mock.patch.object(self.notifier, '_schedule_network') as f: + with mock.patch.object(self.notifier, '_get_enabled_agents') as g: + agent = agents_db.Agent() + agent.admin_state_up = True + agent.heartbeat_timestamp = timeutils.utcnow() + g.return_value = [agent] + self.notifier._notify_agents(mock.Mock(), method, + mock.ANY, 'foo_network_id') + self.assertEqual(expected_scheduling, f.call_count) + self.assertEqual(expected_casts, self.mock_cast.call_count) + + def test__notify_agents_cast_required_with_scheduling(self): + self._test__notify_agents('port_create_end', + expected_scheduling=1, expected_casts=1) + + def test__notify_agents_cast_required_wo_scheduling_on_port_update(self): + self._test__notify_agents('port_update_end', + expected_scheduling=0, expected_casts=1) + + def test__notify_agents_cast_required_wo_scheduling_on_subnet_create(self): + self._test__notify_agents('subnet_create_end', + expected_scheduling=0, expected_casts=1) + + def test__notify_agents_no_action(self): + self._test__notify_agents('network_create_end', + expected_scheduling=0, expected_casts=0) + + def test__fanout_message(self): + self.notifier._fanout_message(mock.ANY, mock.ANY, mock.ANY) + self.assertEqual(1, self.mock_fanout.call_count) + + def test__cast_message(self): + self.notifier._cast_message(mock.ANY, mock.ANY, mock.ANY) + self.assertEqual(1, self.mock_cast.call_count)