diff --git a/neutron/agent/l3_agent.py b/neutron/agent/l3_agent.py
index e2da8967c1..2304d8e178 100644
--- a/neutron/agent/l3_agent.py
+++ b/neutron/agent/l3_agent.py
@@ -15,11 +15,13 @@
 
 import sys
 
+import datetime
 import eventlet
 eventlet.monkey_patch()
 
 import netaddr
 from oslo.config import cfg
+import Queue
 
 from neutron.agent.common import config
 from neutron.agent.linux import external_process
@@ -37,12 +39,12 @@
 from neutron import context
 from neutron import manager
 from neutron.openstack.common import excutils
 from neutron.openstack.common import importutils
-from neutron.openstack.common import lockutils
 from neutron.openstack.common import log as logging
 from neutron.openstack.common import loopingcall
 from neutron.openstack.common import periodic_task
 from neutron.openstack.common import processutils
 from neutron.openstack.common import service
+from neutron.openstack.common import timeutils
 from neutron import service as neutron_service
 from neutron.services.firewall.agents.l3reference import firewall_l3_agent
@@ -52,6 +54,10 @@ INTERNAL_DEV_PREFIX = 'qr-'
 EXTERNAL_DEV_PREFIX = 'qg-'
 RPC_LOOP_INTERVAL = 1
 FLOATING_IP_CIDR_SUFFIX = '/32'
+# Lower value is higher priority
+PRIORITY_RPC = 0
+PRIORITY_SYNC_ROUTERS_TASK = 1
+DELETE_ROUTER = 1
 
 
 class L3PluginApi(n_rpc.RpcProxy):
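# --- Illustration (not part of the patch): how the priority constants above
# combine with the RouterUpdate class added below (hunk @@ -146).  A minimal,
# self-contained sketch; the Update class is a stand-in that copies the
# __lt__ rules, and the Queue module is the Python 2 name (queue on Python 3).
import datetime
import Queue


class Update(object):
    def __init__(self, router_id, priority, timestamp):
        self.id = router_id
        self.priority = priority
        self.timestamp = timestamp

    def __lt__(self, other):
        # Same precedence rules as RouterUpdate.__lt__: priority first,
        # then timestamp, then id as a tie-breaker.
        if self.priority != other.priority:
            return self.priority < other.priority
        if self.timestamp != other.timestamp:
            return self.timestamp < other.timestamp
        return self.id < other.id


now = datetime.datetime.utcnow()
earlier = now - datetime.timedelta(seconds=5)

q = Queue.PriorityQueue()
q.put(Update('router-b', 1, now))      # sync-task priority, newest
q.put(Update('router-a', 1, earlier))  # sync-task priority, older
q.put(Update('router-c', 0, now))      # RPC priority beats both

assert q.get().id == 'router-c'  # lowest priority value always wins
assert q.get().id == 'router-a'  # then the earlier timestamp
assert q.get().id == 'router-b'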
@@ -146,6 +152,144 @@ class RouterInfo(object):
         self._snat_action = None
 
 
+class RouterUpdate(object):
+    """Encapsulates a router update
+
+    An instance of this object carries the information necessary to prioritize
+    and process a request to update a router.
+    """
+    def __init__(self, router_id, priority,
+                 action=None, router=None, timestamp=None):
+        self.priority = priority
+        self.timestamp = timestamp
+        if not timestamp:
+            self.timestamp = timeutils.utcnow()
+        self.id = router_id
+        self.action = action
+        self.router = router
+
+    def __lt__(self, other):
+        """Implements priority among updates
+
+        Lower numerical priority always gets precedence.  When comparing two
+        updates of the same priority then the one with the earlier timestamp
+        gets precedence.  In the unlikely event that the timestamps are also
+        equal it falls back to a simple comparison of ids, meaning the
+        precedence is essentially random.
+        """
+        if self.priority != other.priority:
+            return self.priority < other.priority
+        if self.timestamp != other.timestamp:
+            return self.timestamp < other.timestamp
+        return self.id < other.id
+
+
+class ExclusiveRouterProcessor(object):
+    """Manager for access to a router for processing
+
+    This class controls access to a router in a non-blocking way.  The first
+    instance to be created for a given router_id is granted exclusive access
+    to the router.
+
+    Other instances may be created for the same router_id while the first
+    instance has exclusive access.  If that happens then it doesn't block and
+    wait for access.  Instead, it signals to the master instance that an
+    update came in with the timestamp.
+
+    This way, a thread will not block to wait for access to a router.
+    Instead it effectively signals to the thread that is working on the
+    router that something has changed since it started working on it.  That
+    thread will simply finish its current iteration and then repeat.
+
+    This class keeps track of the last time that router data was fetched and
+    processed.  The timestamp that it keeps must be before the time when the
+    data used to process the router was last fetched from the database, but
+    as close to it as possible.  The timestamp should not be recorded,
+    however, until the router has been processed using the fetched data.
+    """
+    _masters = {}
+    _router_timestamps = {}
+
+    def __init__(self, router_id):
+        self._router_id = router_id
+
+        if router_id not in self._masters:
+            self._masters[router_id] = self
+            self._queue = []
+
+        self._master = self._masters[router_id]
+
+    def _i_am_master(self):
+        return self == self._master
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, type, value, traceback):
+        if self._i_am_master():
+            del self._masters[self._router_id]
+
+    def _get_router_data_timestamp(self):
+        return self._router_timestamps.get(self._router_id,
+                                           datetime.datetime.min)
+
+    def fetched_and_processed(self, timestamp):
+        """Records the data timestamp after it is used to update the router"""
+        new_timestamp = max(timestamp, self._get_router_data_timestamp())
+        self._router_timestamps[self._router_id] = new_timestamp
+
+    def queue_update(self, update):
+        """Queues an update from a worker
+
+        This is the queue used to keep new updates that come in while a
+        router is being processed.  These updates have already bubbled to
+        the front of the RouterProcessingQueue.
+        """
+        self._master._queue.append(update)
+
+    def updates(self):
+        """Processes the router until updates stop coming
+
+        Only the master instance will process the router.  However, updates
+        may come in from other workers while it is in progress.  This method
+        loops until they stop coming.
+        """
+        if self._i_am_master():
+            while self._queue:
+                # Remove the update from the queue even if it is old.
+                update = self._queue.pop(0)
+                # Process the update only if it is fresh.
+                if self._get_router_data_timestamp() < update.timestamp:
+                    yield update
+
+
+class RouterProcessingQueue(object):
+    """Manager of the queue of routers to process."""
+    def __init__(self):
+        self._queue = Queue.PriorityQueue()
+
+    def add(self, update):
+        self._queue.put(update)
+
+    def each_update_to_next_router(self):
+        """Grabs the next router from the queue and processes it
+
+        This method uses a for loop to process the router repeatedly until
+        updates stop bubbling to the front of the queue.
+        """
+        next_update = self._queue.get()
+
+        with ExclusiveRouterProcessor(next_update.id) as rp:
+            # Queue the update whether this worker is the master or not.
+            rp.queue_update(next_update)
+
+            # Here, if the current worker is not the master, the call to
+            # rp.updates() will not yield and so this will essentially be a
+            # noop.
+            for update in rp.updates():
+                yield (rp, update)
+
+
 class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback,
                  manager.Manager):
     """Manager for L3NatAgent
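# --- Illustration (not part of the patch): the "first instance is master"
# coordination that ExclusiveRouterProcessor above implements.  This is a
# bare standalone re-creation of the pattern for reading purposes only;
# names and details are simplified assumptions, not the agent's code.


class Exclusive(object):
    _masters = {}

    def __init__(self, key):
        self._key = key
        if key not in self._masters:
            # First instance in wins and owns the pending-work list.
            self._masters[key] = self
            self._pending = []
        self._master = self._masters[key]

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        # Only the master releases the slot on the way out.
        if self._master is self:
            del self._masters[self._key]

    def queue(self, item):
        # Master and non-master alike funnel work to the master's list.
        self._master._pending.append(item)

    def drain(self):
        # Only the master yields queued work; for anyone else it is a noop.
        if self._master is self:
            while self._pending:
                yield self._pending.pop(0)


with Exclusive('router-1') as master:
    with Exclusive('router-1') as other:  # same key: not the master
        other.queue('update-1')
        assert list(other.drain()) == []  # non-master never processes
    master.queue('update-2')
    assert list(master.drain()) == ['update-1', 'update-2']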
@@ -221,9 +365,7 @@ class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback, manager.Manager):
 
         self._clean_stale_namespaces = self.conf.use_namespaces
 
-        self.rpc_loop = loopingcall.FixedIntervalLoopingCall(
-            self._rpc_loop)
-        self.rpc_loop.start(interval=RPC_LOOP_INTERVAL)
+        self._queue = RouterProcessingQueue()
         super(L3NATAgent, self).__init__(conf=self.conf)
 
         self.target_ex_net_id = None
@@ -244,10 +386,8 @@ class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback, manager.Manager):
             LOG.error(msg)
             raise SystemExit(1)
 
-    def _cleanup_namespaces(self, routers):
-        """Destroy stale router namespaces on host when L3 agent restarts
-
-        This routine is called when self._clean_stale_namespaces is True.
+    def _list_namespaces(self):
+        """Get a set of all router namespaces on host
 
         The argument routers is the list of routers that are recorded in the
         database as being hosted on this node.
@@ -256,15 +396,24 @@ class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback, manager.Manager):
             root_ip = ip_lib.IPWrapper(self.root_helper)
             host_namespaces = root_ip.get_namespaces(self.root_helper)
-            router_namespaces = set(ns for ns in host_namespaces
-                                    if ns.startswith(NS_PREFIX))
-            ns_to_ignore = set(NS_PREFIX + r['id'] for r in routers)
-            ns_to_destroy = router_namespaces - ns_to_ignore
+            return set(ns for ns in host_namespaces
+                       if ns.startswith(NS_PREFIX))
         except RuntimeError:
             LOG.exception(_('RuntimeError in obtaining router list '
                             'for namespace cleanup.'))
-        else:
-            self._destroy_stale_router_namespaces(ns_to_destroy)
+            return set()
+
+    def _cleanup_namespaces(self, router_namespaces, router_ids):
+        """Destroy stale router namespaces on host when L3 agent restarts
+
+        This routine is called when self._clean_stale_namespaces is True.
+
+        The argument router_namespaces is the set of all router namespaces
+        found on the host.  The argument router_ids is the list of ids for
+        known routers.
+        """
+        ns_to_ignore = set(NS_PREFIX + id for id in router_ids)
+        ns_to_destroy = router_namespaces - ns_to_ignore
+        self._destroy_stale_router_namespaces(ns_to_destroy)
 
     def _destroy_stale_router_namespaces(self, router_namespaces):
         """Destroys the stale router namespaces
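# --- Illustration (not part of the patch): the set arithmetic behind the
# reworked _cleanup_namespaces above.  The namespace names are invented for
# the example; NS_PREFIX is 'qrouter-' in the agent.
NS_PREFIX = 'qrouter-'

# What _list_namespaces would return: every qrouter-* namespace on the host.
host_namespaces = set(['qrouter-r1', 'qrouter-r2', 'qrouter-stale'])

# Ids of the routers the agent knows about.
router_ids = ['r1', 'r2']

ns_to_ignore = set(NS_PREFIX + r_id for r_id in router_ids)
ns_to_destroy = host_namespaces - ns_to_ignore
assert ns_to_destroy == set(['qrouter-stale'])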
@@ -735,7 +884,8 @@ class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback, manager.Manager):
     def router_deleted(self, context, router_id):
         """Deal with router deletion RPC message."""
         LOG.debug(_('Got router deleted notification for %s'), router_id)
-        self.removed_routers.add(router_id)
+        update = RouterUpdate(router_id, PRIORITY_RPC, action=DELETE_ROUTER)
+        self._queue.add(update)
 
     def routers_updated(self, context, routers):
         """Deal with routers modification and creation RPC message."""
@@ -744,11 +894,15 @@ class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback, manager.Manager):
             # This is needed for backward compatibility
             if isinstance(routers[0], dict):
                 routers = [router['id'] for router in routers]
-        self.updated_routers.update(routers)
+        for id in routers:
+            update = RouterUpdate(id, PRIORITY_RPC)
+            self._queue.add(update)
 
     def router_removed_from_agent(self, context, payload):
         LOG.debug(_('Got router removed from agent :%r'), payload)
-        self.removed_routers.add(payload['router_id'])
+        router_id = payload['router_id']
+        update = RouterUpdate(router_id, PRIORITY_RPC, action=DELETE_ROUTER)
+        self._queue.add(update)
 
     def router_added_to_agent(self, context, payload):
         LOG.debug(_('Got router added to agent :%r'), payload)
@@ -800,36 +954,37 @@ class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback, manager.Manager):
             pool.spawn_n(self._router_removed, router_id)
         pool.waitall()
 
-    @lockutils.synchronized('l3-agent', 'neutron-')
-    def _rpc_loop(self):
-        # _rpc_loop and _sync_routers_task will not be
-        # executed in the same time because of lock.
-        # so we can clear the value of updated_routers
-        # and removed_routers, but they can be updated by
-        # updated_routers and removed_routers rpc call
-        try:
-            LOG.debug(_("Starting RPC loop for %d updated routers"),
-                      len(self.updated_routers))
-            if self.updated_routers:
-                # We're capturing and clearing the list, and will
-                # process the "captured" updates in this loop,
-                # and any updates that happen due to a context switch
-                # will be picked up on the next pass.
-                updated_routers = set(self.updated_routers)
-                self.updated_routers.clear()
-                router_ids = list(updated_routers)
-                routers = self.plugin_rpc.get_routers(
-                    self.context, router_ids)
-                # routers with admin_state_up=false will not be in the fetched
-                fetched = set([r['id'] for r in routers])
-                self.removed_routers.update(updated_routers - fetched)
+    def _process_router_update(self):
+        for rp, update in self._queue.each_update_to_next_router():
+            LOG.debug("Starting router update for %s", update.id)
+            router = update.router
+            if update.action != DELETE_ROUTER and not router:
+                try:
+                    update.timestamp = timeutils.utcnow()
+                    routers = self.plugin_rpc.get_routers(self.context,
+                                                          [update.id])
+                except Exception:
+                    msg = _("Failed to fetch router information for '%s'")
+                    LOG.exception(msg, update.id)
+                    self.fullsync = True
+                    continue
 
-            self._process_routers(routers)
-            self._process_router_delete()
-            LOG.debug(_("RPC loop successfully completed"))
-        except Exception:
-            LOG.exception(_("Failed synchronizing routers"))
-            self.fullsync = True
+                if routers:
+                    router = routers[0]
+
+            if not router:
+                self._router_removed(update.id)
+                continue
+
+            self._process_routers([router])
+            LOG.debug("Finished a router update for %s", update.id)
+            rp.fetched_and_processed(update.timestamp)
+
+    def _process_routers_loop(self):
+        LOG.debug("Starting _process_routers_loop")
+        pool = eventlet.GreenPool(size=8)
+        while True:
+            pool.spawn_n(self._process_router_update)
 
     def _process_router_delete(self):
         current_removed_routers = list(self.removed_routers)
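# --- Illustration (not part of the patch): the worker-pool pattern behind
# _process_routers_loop above.  spawn_n blocks once every green thread in
# the pool is busy, so the `while True` loop does not spin; each worker
# then parks on the queue until an update arrives.  A toy, runnable sketch
# with made-up work items and a bounded loop instead of `while True`.
import eventlet
eventlet.monkey_patch()

from eventlet.queue import Queue

work = Queue()


def worker():
    # Mirrors _process_router_update: take one item, handle it, exit.  The
    # outer loop respawns workers, keeping at most `size` of them busy.
    item = work.get()
    print('processed %s' % item)


for item in ('r1', 'r2', 'r3'):
    work.put(item)

pool = eventlet.GreenPool(size=2)
for _ in range(3):   # the agent uses `while True` here
    pool.spawn_n(worker)
pool.waitall()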
@@ -842,7 +997,6 @@ class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback, manager.Manager):
         return [self.conf.router_id]
 
     @periodic_task.periodic_task
-    @lockutils.synchronized('l3-agent', 'neutron-')
     def periodic_sync_routers_task(self, context):
         self._sync_routers_task(context)
 
@@ -853,15 +1007,29 @@ class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback, manager.Manager):
                   self.fullsync)
         if not self.fullsync:
             return
+
+        # Capture a snapshot of the namespaces *before* fetching the full
+        # list from the database.  This is important to correctly identify
+        # stale ones.
+        namespaces = set()
+        if self._clean_stale_namespaces:
+            namespaces = self._list_namespaces()
+        prev_router_ids = set(self.router_info)
+
         try:
             router_ids = self._router_ids()
             self.updated_routers.clear()
             self.removed_routers.clear()
+            timestamp = timeutils.utcnow()
             routers = self.plugin_rpc.get_routers(
                 context, router_ids)
 
             LOG.debug(_('Processing :%r'), routers)
-            self._process_routers(routers, all_routers=True)
+            for r in routers:
+                update = RouterUpdate(r['id'],
+                                      PRIORITY_SYNC_ROUTERS_TASK,
+                                      router=r,
+                                      timestamp=timestamp)
+                self._queue.add(update)
             self.fullsync = False
             LOG.debug(_("_sync_routers_task successfully completed"))
         except n_rpc.RPCException:
@@ -872,10 +1040,25 @@ class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback, manager.Manager):
             self.fullsync = True
         else:
             # Resync is not necessary for the cleanup of stale namespaces
+            curr_router_ids = set([r['id'] for r in routers])
+
+            # Two kinds of stale routers: routers for which info is cached
+            # in self.router_info and the others.  First, handle the former.
+            for router_id in prev_router_ids - curr_router_ids:
+                update = RouterUpdate(router_id,
+                                      PRIORITY_SYNC_ROUTERS_TASK,
+                                      timestamp=timestamp,
+                                      action=DELETE_ROUTER)
+                self._queue.add(update)
+
+            # Next, make one effort to clean out namespaces for which we
+            # don't have a record.  (i.e. _clean_stale_namespaces is set to
+            # False after one pass)
             if self._clean_stale_namespaces:
-                self._cleanup_namespaces(routers)
+                ids_to_keep = curr_router_ids | prev_router_ids
+                self._cleanup_namespaces(namespaces, ids_to_keep)
 
     def after_start(self):
+        eventlet.spawn_n(self._process_routers_loop)
         LOG.info(_("L3 agent started"))
 
     def _update_routing_table(self, ri, operation, route):
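# --- Illustration (not part of the patch): the two set operations the
# full-sync path above relies on.  Example ids are invented.
prev_router_ids = set(['r1', 'r2', 'r3'])  # cached in self.router_info
curr_router_ids = set(['r2', 'r3', 'r4'])  # just fetched from the database

# Routers still cached locally but gone from the database: queue a
# DELETE_ROUTER update for each so they are torn down properly.
assert prev_router_ids - curr_router_ids == set(['r1'])

# Namespaces to keep during cleanup: anything known on either side.  'r1'
# is kept here too because its queued DELETE_ROUTER update will remove its
# namespace through the normal path instead.
assert curr_router_ids | prev_router_ids == set(['r1', 'r2', 'r3', 'r4'])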
diff --git a/neutron/tests/unit/test_l3_agent.py b/neutron/tests/unit/test_l3_agent.py
index 511b2f421e..420f4f6395 100644
--- a/neutron/tests/unit/test_l3_agent.py
+++ b/neutron/tests/unit/test_l3_agent.py
@@ -15,6 +15,7 @@
 
 import contextlib
 import copy
+import datetime
 
 import mock
 import netaddr
@@ -35,6 +36,85 @@ from neutron.tests import base
 _uuid = uuidutils.generate_uuid
 HOSTNAME = 'myhost'
 FAKE_ID = _uuid()
+FAKE_ID_2 = _uuid()
+
+
+class TestExclusiveRouterProcessor(base.BaseTestCase):
+    def setUp(self):
+        super(TestExclusiveRouterProcessor, self).setUp()
+
+    def test_i_am_master(self):
+        master = l3_agent.ExclusiveRouterProcessor(FAKE_ID)
+        not_master = l3_agent.ExclusiveRouterProcessor(FAKE_ID)
+        master_2 = l3_agent.ExclusiveRouterProcessor(FAKE_ID_2)
+        not_master_2 = l3_agent.ExclusiveRouterProcessor(FAKE_ID_2)
+
+        self.assertTrue(master._i_am_master())
+        self.assertFalse(not_master._i_am_master())
+        self.assertTrue(master_2._i_am_master())
+        self.assertFalse(not_master_2._i_am_master())
+
+        master.__exit__(None, None, None)
+        master_2.__exit__(None, None, None)
+
+    def test_master(self):
+        master = l3_agent.ExclusiveRouterProcessor(FAKE_ID)
+        not_master = l3_agent.ExclusiveRouterProcessor(FAKE_ID)
+        master_2 = l3_agent.ExclusiveRouterProcessor(FAKE_ID_2)
+        not_master_2 = l3_agent.ExclusiveRouterProcessor(FAKE_ID_2)
+
+        self.assertEqual(master._master, master)
+        self.assertEqual(not_master._master, master)
+        self.assertEqual(master_2._master, master_2)
+        self.assertEqual(not_master_2._master, master_2)
+
+        master.__exit__(None, None, None)
+        master_2.__exit__(None, None, None)
+
+    def test__enter__(self):
+        self.assertFalse(FAKE_ID in
+                         l3_agent.ExclusiveRouterProcessor._masters)
+        master = l3_agent.ExclusiveRouterProcessor(FAKE_ID)
+        master.__enter__()
+        self.assertTrue(FAKE_ID in
+                        l3_agent.ExclusiveRouterProcessor._masters)
+        master.__exit__(None, None, None)
+
+    def test__exit__(self):
+        master = l3_agent.ExclusiveRouterProcessor(FAKE_ID)
+        not_master = l3_agent.ExclusiveRouterProcessor(FAKE_ID)
+        master.__enter__()
+        self.assertTrue(FAKE_ID in
+                        l3_agent.ExclusiveRouterProcessor._masters)
+        not_master.__enter__()
+        not_master.__exit__(None, None, None)
+        self.assertTrue(FAKE_ID in
+                        l3_agent.ExclusiveRouterProcessor._masters)
+        master.__exit__(None, None, None)
+        self.assertFalse(FAKE_ID in
+                         l3_agent.ExclusiveRouterProcessor._masters)
+
+    def test_data_fetched_since(self):
+        master = l3_agent.ExclusiveRouterProcessor(FAKE_ID)
+        self.assertEqual(master._get_router_data_timestamp(),
+                         datetime.datetime.min)
+
+        ts1 = datetime.datetime.utcnow() - datetime.timedelta(seconds=10)
+        ts2 = datetime.datetime.utcnow()
+
+        master.fetched_and_processed(ts2)
+        self.assertEqual(master._get_router_data_timestamp(), ts2)
+        master.fetched_and_processed(ts1)
+        self.assertEqual(master._get_router_data_timestamp(), ts2)
+
+        master.__exit__(None, None, None)
+
+    def test_updates(self):
+        master = l3_agent.ExclusiveRouterProcessor(FAKE_ID)
+        not_master = l3_agent.ExclusiveRouterProcessor(FAKE_ID)
+
+        master.queue_update(l3_agent.RouterUpdate(FAKE_ID, 0))
+        not_master.queue_update(l3_agent.RouterUpdate(FAKE_ID, 0))
+
+        for update in not_master.updates():
+            raise Exception("Only the master should process a router")
+
+        self.assertEqual(2, len([i for i in master.updates()]))
 
 
 class TestBasicRouterOperations(base.BaseTestCase):
@@ -100,12 +180,10 @@
     def test__sync_routers_task_call_clean_stale_namespaces(self):
         agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
-        self.plugin_api.get_routers.return_value = mock.ANY
+        self.plugin_api.get_routers.return_value = []
         with mock.patch.object(agent, '_cleanup_namespaces') as f:
-            with mock.patch.object(agent, '_process_routers') as g:
-                agent._sync_routers_task(agent.context)
+            agent._sync_routers_task(agent.context)
         self.assertTrue(f.called)
-        g.assert_called_with(mock.ANY, all_routers=True)
 
     def test_router_info_create(self):
         id = _uuid()
@@ -1024,27 +1102,27 @@
     def test_router_deleted(self):
         agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
+        agent._queue = mock.Mock()
         agent.router_deleted(None, FAKE_ID)
-        # verify that will set fullsync
-        self.assertIn(FAKE_ID, agent.removed_routers)
+        self.assertEqual(1, agent._queue.add.call_count)
 
     def test_routers_updated(self):
         agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
+        agent._queue = mock.Mock()
         agent.routers_updated(None, [FAKE_ID])
-        # verify that will set fullsync
-        self.assertIn(FAKE_ID, agent.updated_routers)
+        self.assertEqual(1, agent._queue.add.call_count)
 
     def test_removed_from_agent(self):
        agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
+        agent._queue = mock.Mock()
        agent.router_removed_from_agent(None, {'router_id': FAKE_ID})
-        # verify that will set fullsync
-        self.assertIn(FAKE_ID, agent.removed_routers)
+        self.assertEqual(1, agent._queue.add.call_count)
 
     def test_added_to_agent(self):
         agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
+        agent._queue = mock.Mock()
         agent.router_added_to_agent(None, [FAKE_ID])
-        # verify that will set fullsync
-        self.assertIn(FAKE_ID, agent.updated_routers)
+        self.assertEqual(1, agent._queue.add.call_count)
 
     def test_process_router_delete(self):
         agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
@@ -1277,7 +1355,8 @@
         pm.reset_mock()
 
         agent._destroy_router_namespace = mock.MagicMock()
-        agent._cleanup_namespaces(router_list)
+        ns_list = agent._list_namespaces()
+        agent._cleanup_namespaces(ns_list, [r['id'] for r in router_list])
 
         self.assertEqual(pm.disable.call_count, len(stale_namespace_list))
         self.assertEqual(agent._destroy_router_namespace.call_count,