L3 agent prefers RPC messages over full sync
When the L3 agent starts up and runs the sync task it doesn't process any incoming RPC events until the sync task is complete. This change combines the work from _rpc_loop and _sync_routers_task in to a single loop called _process_routers_loop. This loop spawns threads that pull from a priority queue. The queue ensures that RPC messages are handled before _process_routers_loop. The latter is generally maintenance tasks triggered by the agent rather than user triggered tasks. Synchronization between RPC and sync routers loops is no longer necessary since they both feed in to a single queue. There were places where it was necessary to reorder some things to allow for the lack of synchronization. For example, it is necessary to list namespaces before fetching the full list of routers to ensure that it doesn't delete a new namespace that gets created after listing namespaces. The lack of the need for synchronization between loops is probably the main strength of this patch. With multiple worker threads, need to handle the case where an RPC message came in while a thread was working on a router. Another thread should not handle the same router that is already in progress. Adds a mechanism to signal to the working thread that an update came in for the router it is working on. The original thread will repeat processing the router when it is finished to get the update. Multiple rapid updates to the same router will be consolidated. Essentially, there is still synchronization of work for a given router but not between routers. Much better than before. blueprint l3-agent-responsiveness Closes-Bug: #1289066 Change-Id: I39afe86c66f864d71adf865d7bd1c9db35511505
This commit is contained in:
parent
019444170b
commit
098d99d701
@ -15,11 +15,13 @@
|
|||||||
|
|
||||||
import sys
|
import sys
|
||||||
|
|
||||||
|
import datetime
|
||||||
import eventlet
|
import eventlet
|
||||||
eventlet.monkey_patch()
|
eventlet.monkey_patch()
|
||||||
|
|
||||||
import netaddr
|
import netaddr
|
||||||
from oslo.config import cfg
|
from oslo.config import cfg
|
||||||
|
import Queue
|
||||||
|
|
||||||
from neutron.agent.common import config
|
from neutron.agent.common import config
|
||||||
from neutron.agent.linux import external_process
|
from neutron.agent.linux import external_process
|
||||||
@ -37,12 +39,12 @@ from neutron import context
|
|||||||
from neutron import manager
|
from neutron import manager
|
||||||
from neutron.openstack.common import excutils
|
from neutron.openstack.common import excutils
|
||||||
from neutron.openstack.common import importutils
|
from neutron.openstack.common import importutils
|
||||||
from neutron.openstack.common import lockutils
|
|
||||||
from neutron.openstack.common import log as logging
|
from neutron.openstack.common import log as logging
|
||||||
from neutron.openstack.common import loopingcall
|
from neutron.openstack.common import loopingcall
|
||||||
from neutron.openstack.common import periodic_task
|
from neutron.openstack.common import periodic_task
|
||||||
from neutron.openstack.common import processutils
|
from neutron.openstack.common import processutils
|
||||||
from neutron.openstack.common import service
|
from neutron.openstack.common import service
|
||||||
|
from neutron.openstack.common import timeutils
|
||||||
from neutron import service as neutron_service
|
from neutron import service as neutron_service
|
||||||
from neutron.services.firewall.agents.l3reference import firewall_l3_agent
|
from neutron.services.firewall.agents.l3reference import firewall_l3_agent
|
||||||
|
|
||||||
@ -52,6 +54,10 @@ INTERNAL_DEV_PREFIX = 'qr-'
|
|||||||
EXTERNAL_DEV_PREFIX = 'qg-'
|
EXTERNAL_DEV_PREFIX = 'qg-'
|
||||||
RPC_LOOP_INTERVAL = 1
|
RPC_LOOP_INTERVAL = 1
|
||||||
FLOATING_IP_CIDR_SUFFIX = '/32'
|
FLOATING_IP_CIDR_SUFFIX = '/32'
|
||||||
|
# Lower value is higher priority
|
||||||
|
PRIORITY_RPC = 0
|
||||||
|
PRIORITY_SYNC_ROUTERS_TASK = 1
|
||||||
|
DELETE_ROUTER = 1
|
||||||
|
|
||||||
|
|
||||||
class L3PluginApi(n_rpc.RpcProxy):
|
class L3PluginApi(n_rpc.RpcProxy):
|
||||||
@ -146,6 +152,144 @@ class RouterInfo(object):
|
|||||||
self._snat_action = None
|
self._snat_action = None
|
||||||
|
|
||||||
|
|
||||||
|
class RouterUpdate(object):
|
||||||
|
"""Encapsulates a router update
|
||||||
|
|
||||||
|
An instance of this object carries the information necessary to prioritize
|
||||||
|
and process a request to update a router.
|
||||||
|
"""
|
||||||
|
def __init__(self, router_id, priority,
|
||||||
|
action=None, router=None, timestamp=None):
|
||||||
|
self.priority = priority
|
||||||
|
self.timestamp = timestamp
|
||||||
|
if not timestamp:
|
||||||
|
self.timestamp = timeutils.utcnow()
|
||||||
|
self.id = router_id
|
||||||
|
self.action = action
|
||||||
|
self.router = router
|
||||||
|
|
||||||
|
def __lt__(self, other):
|
||||||
|
"""Implements priority among updates
|
||||||
|
|
||||||
|
Lower numerical priority always gets precedence. When comparing two
|
||||||
|
updates of the same priority then the one with the earlier timestamp
|
||||||
|
gets procedence. In the unlikely event that the timestamps are also
|
||||||
|
equal it falls back to a simple comparison of ids meaning the
|
||||||
|
precedence is essentially random.
|
||||||
|
"""
|
||||||
|
if self.priority != other.priority:
|
||||||
|
return self.priority < other.priority
|
||||||
|
if self.timestamp != other.timestamp:
|
||||||
|
return self.timestamp < other.timestamp
|
||||||
|
return self.id < other.id
|
||||||
|
|
||||||
|
|
||||||
|
class ExclusiveRouterProcessor(object):
|
||||||
|
"""Manager for access to a router for processing
|
||||||
|
|
||||||
|
This class controls access to a router in a non-blocking way. The first
|
||||||
|
instance to be created for a given router_id is granted exclusive access to
|
||||||
|
the router.
|
||||||
|
|
||||||
|
Other instances may be created for the same router_id while the first
|
||||||
|
instance has exclusive access. If that happens then it doesn't block and
|
||||||
|
wait for access. Instead, it signals to the master instance that an update
|
||||||
|
came in with the timestamp.
|
||||||
|
|
||||||
|
This way, a thread will not block to wait for access to a router. Instead
|
||||||
|
it effectively signals to the thread that is working on the router that
|
||||||
|
something has changed since it started working on it. That thread will
|
||||||
|
simply finish its current iteration and then repeat.
|
||||||
|
|
||||||
|
This class keeps track of the last time that a router data was fetched and
|
||||||
|
processed. The timestamp that it keeps must be before when the data used
|
||||||
|
to process the router last was fetched from the database. But, as close as
|
||||||
|
possible. The timestamp should not be recorded, however, until the router
|
||||||
|
has been processed using the fetch data.
|
||||||
|
"""
|
||||||
|
_masters = {}
|
||||||
|
_router_timestamps = {}
|
||||||
|
|
||||||
|
def __init__(self, router_id):
|
||||||
|
self._router_id = router_id
|
||||||
|
|
||||||
|
if router_id not in self._masters:
|
||||||
|
self._masters[router_id] = self
|
||||||
|
self._queue = []
|
||||||
|
|
||||||
|
self._master = self._masters[router_id]
|
||||||
|
|
||||||
|
def _i_am_master(self):
|
||||||
|
return self == self._master
|
||||||
|
|
||||||
|
def __enter__(self):
|
||||||
|
return self
|
||||||
|
|
||||||
|
def __exit__(self, type, value, traceback):
|
||||||
|
if self._i_am_master():
|
||||||
|
del self._masters[self._router_id]
|
||||||
|
|
||||||
|
def _get_router_data_timestamp(self):
|
||||||
|
return self._router_timestamps.get(self._router_id,
|
||||||
|
datetime.datetime.min)
|
||||||
|
|
||||||
|
def fetched_and_processed(self, timestamp):
|
||||||
|
"""Records the data timestamp after it is used to update the router"""
|
||||||
|
new_timestamp = max(timestamp, self._get_router_data_timestamp())
|
||||||
|
self._router_timestamps[self._router_id] = new_timestamp
|
||||||
|
|
||||||
|
def queue_update(self, update):
|
||||||
|
"""Queues an update from a worker
|
||||||
|
|
||||||
|
This is the queue used to keep new updates that come in while a router
|
||||||
|
is being processed. These updates have already bubbled to the front of
|
||||||
|
the RouterProcessingQueue.
|
||||||
|
"""
|
||||||
|
self._master._queue.append(update)
|
||||||
|
|
||||||
|
def updates(self):
|
||||||
|
"""Processes the router until updates stop coming
|
||||||
|
|
||||||
|
Only the master instance will process the router. However, updates may
|
||||||
|
come in from other workers while it is in progress. This method loops
|
||||||
|
until they stop coming.
|
||||||
|
"""
|
||||||
|
if self._i_am_master():
|
||||||
|
while self._queue:
|
||||||
|
# Remove the update from the queue even if it is old.
|
||||||
|
update = self._queue.pop(0)
|
||||||
|
# Process the update only if it is fresh.
|
||||||
|
if self._get_router_data_timestamp() < update.timestamp:
|
||||||
|
yield update
|
||||||
|
|
||||||
|
|
||||||
|
class RouterProcessingQueue(object):
|
||||||
|
"""Manager of the queue of routers to process."""
|
||||||
|
def __init__(self):
|
||||||
|
self._queue = Queue.PriorityQueue()
|
||||||
|
|
||||||
|
def add(self, update):
|
||||||
|
self._queue.put(update)
|
||||||
|
|
||||||
|
def each_update_to_next_router(self):
|
||||||
|
"""Grabs the next router from the queue and processes
|
||||||
|
|
||||||
|
This method uses a for loop to process the router repeatedly until
|
||||||
|
updates stop bubbling to the front of the queue.
|
||||||
|
"""
|
||||||
|
next_update = self._queue.get()
|
||||||
|
|
||||||
|
with ExclusiveRouterProcessor(next_update.id) as rp:
|
||||||
|
# Queue the update whether this worker is the master or not.
|
||||||
|
rp.queue_update(next_update)
|
||||||
|
|
||||||
|
# Here, if the current worker is not the master, the call to
|
||||||
|
# rp.updates() will not yield and so this will essentially be a
|
||||||
|
# noop.
|
||||||
|
for update in rp.updates():
|
||||||
|
yield (rp, update)
|
||||||
|
|
||||||
|
|
||||||
class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback, manager.Manager):
|
class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback, manager.Manager):
|
||||||
"""Manager for L3NatAgent
|
"""Manager for L3NatAgent
|
||||||
|
|
||||||
@ -221,9 +365,7 @@ class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback, manager.Manager):
|
|||||||
|
|
||||||
self._clean_stale_namespaces = self.conf.use_namespaces
|
self._clean_stale_namespaces = self.conf.use_namespaces
|
||||||
|
|
||||||
self.rpc_loop = loopingcall.FixedIntervalLoopingCall(
|
self._queue = RouterProcessingQueue()
|
||||||
self._rpc_loop)
|
|
||||||
self.rpc_loop.start(interval=RPC_LOOP_INTERVAL)
|
|
||||||
super(L3NATAgent, self).__init__(conf=self.conf)
|
super(L3NATAgent, self).__init__(conf=self.conf)
|
||||||
|
|
||||||
self.target_ex_net_id = None
|
self.target_ex_net_id = None
|
||||||
@ -244,10 +386,8 @@ class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback, manager.Manager):
|
|||||||
LOG.error(msg)
|
LOG.error(msg)
|
||||||
raise SystemExit(1)
|
raise SystemExit(1)
|
||||||
|
|
||||||
def _cleanup_namespaces(self, routers):
|
def _list_namespaces(self):
|
||||||
"""Destroy stale router namespaces on host when L3 agent restarts
|
"""Get a set of all router namespaces on host
|
||||||
|
|
||||||
This routine is called when self._clean_stale_namespaces is True.
|
|
||||||
|
|
||||||
The argument routers is the list of routers that are recorded in
|
The argument routers is the list of routers that are recorded in
|
||||||
the database as being hosted on this node.
|
the database as being hosted on this node.
|
||||||
@ -256,15 +396,24 @@ class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback, manager.Manager):
|
|||||||
root_ip = ip_lib.IPWrapper(self.root_helper)
|
root_ip = ip_lib.IPWrapper(self.root_helper)
|
||||||
|
|
||||||
host_namespaces = root_ip.get_namespaces(self.root_helper)
|
host_namespaces = root_ip.get_namespaces(self.root_helper)
|
||||||
router_namespaces = set(ns for ns in host_namespaces
|
return set(ns for ns in host_namespaces
|
||||||
if ns.startswith(NS_PREFIX))
|
if ns.startswith(NS_PREFIX))
|
||||||
ns_to_ignore = set(NS_PREFIX + r['id'] for r in routers)
|
|
||||||
ns_to_destroy = router_namespaces - ns_to_ignore
|
|
||||||
except RuntimeError:
|
except RuntimeError:
|
||||||
LOG.exception(_('RuntimeError in obtaining router list '
|
LOG.exception(_('RuntimeError in obtaining router list '
|
||||||
'for namespace cleanup.'))
|
'for namespace cleanup.'))
|
||||||
else:
|
return set()
|
||||||
self._destroy_stale_router_namespaces(ns_to_destroy)
|
|
||||||
|
def _cleanup_namespaces(self, router_namespaces, router_ids):
|
||||||
|
"""Destroy stale router namespaces on host when L3 agent restarts
|
||||||
|
|
||||||
|
This routine is called when self._clean_stale_namespaces is True.
|
||||||
|
|
||||||
|
The argument router_namespaces is the list of all routers namespaces
|
||||||
|
The argument router_ids is the list of ids for known routers.
|
||||||
|
"""
|
||||||
|
ns_to_ignore = set(NS_PREFIX + id for id in router_ids)
|
||||||
|
ns_to_destroy = router_namespaces - ns_to_ignore
|
||||||
|
self._destroy_stale_router_namespaces(ns_to_destroy)
|
||||||
|
|
||||||
def _destroy_stale_router_namespaces(self, router_namespaces):
|
def _destroy_stale_router_namespaces(self, router_namespaces):
|
||||||
"""Destroys the stale router namespaces
|
"""Destroys the stale router namespaces
|
||||||
@ -735,7 +884,8 @@ class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback, manager.Manager):
|
|||||||
def router_deleted(self, context, router_id):
|
def router_deleted(self, context, router_id):
|
||||||
"""Deal with router deletion RPC message."""
|
"""Deal with router deletion RPC message."""
|
||||||
LOG.debug(_('Got router deleted notification for %s'), router_id)
|
LOG.debug(_('Got router deleted notification for %s'), router_id)
|
||||||
self.removed_routers.add(router_id)
|
update = RouterUpdate(router_id, PRIORITY_RPC, action=DELETE_ROUTER)
|
||||||
|
self._queue.add(update)
|
||||||
|
|
||||||
def routers_updated(self, context, routers):
|
def routers_updated(self, context, routers):
|
||||||
"""Deal with routers modification and creation RPC message."""
|
"""Deal with routers modification and creation RPC message."""
|
||||||
@ -744,11 +894,15 @@ class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback, manager.Manager):
|
|||||||
# This is needed for backward compatibility
|
# This is needed for backward compatibility
|
||||||
if isinstance(routers[0], dict):
|
if isinstance(routers[0], dict):
|
||||||
routers = [router['id'] for router in routers]
|
routers = [router['id'] for router in routers]
|
||||||
self.updated_routers.update(routers)
|
for id in routers:
|
||||||
|
update = RouterUpdate(id, PRIORITY_RPC)
|
||||||
|
self._queue.add(update)
|
||||||
|
|
||||||
def router_removed_from_agent(self, context, payload):
|
def router_removed_from_agent(self, context, payload):
|
||||||
LOG.debug(_('Got router removed from agent :%r'), payload)
|
LOG.debug(_('Got router removed from agent :%r'), payload)
|
||||||
self.removed_routers.add(payload['router_id'])
|
router_id = payload['router_id']
|
||||||
|
update = RouterUpdate(router_id, PRIORITY_RPC, action=DELETE_ROUTER)
|
||||||
|
self._queue.add(update)
|
||||||
|
|
||||||
def router_added_to_agent(self, context, payload):
|
def router_added_to_agent(self, context, payload):
|
||||||
LOG.debug(_('Got router added to agent :%r'), payload)
|
LOG.debug(_('Got router added to agent :%r'), payload)
|
||||||
@ -800,36 +954,37 @@ class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback, manager.Manager):
|
|||||||
pool.spawn_n(self._router_removed, router_id)
|
pool.spawn_n(self._router_removed, router_id)
|
||||||
pool.waitall()
|
pool.waitall()
|
||||||
|
|
||||||
@lockutils.synchronized('l3-agent', 'neutron-')
|
def _process_router_update(self):
|
||||||
def _rpc_loop(self):
|
for rp, update in self._queue.each_update_to_next_router():
|
||||||
# _rpc_loop and _sync_routers_task will not be
|
LOG.debug("Starting router update for %s", update.id)
|
||||||
# executed in the same time because of lock.
|
router = update.router
|
||||||
# so we can clear the value of updated_routers
|
if update.action != DELETE_ROUTER and not router:
|
||||||
# and removed_routers, but they can be updated by
|
try:
|
||||||
# updated_routers and removed_routers rpc call
|
update.timestamp = timeutils.utcnow()
|
||||||
try:
|
routers = self.plugin_rpc.get_routers(self.context,
|
||||||
LOG.debug(_("Starting RPC loop for %d updated routers"),
|
[update.id])
|
||||||
len(self.updated_routers))
|
except Exception:
|
||||||
if self.updated_routers:
|
msg = _("Failed to fetch router information for '%s'")
|
||||||
# We're capturing and clearing the list, and will
|
LOG.exception(msg, update.id)
|
||||||
# process the "captured" updates in this loop,
|
self.fullsync = True
|
||||||
# and any updates that happen due to a context switch
|
continue
|
||||||
# will be picked up on the next pass.
|
|
||||||
updated_routers = set(self.updated_routers)
|
|
||||||
self.updated_routers.clear()
|
|
||||||
router_ids = list(updated_routers)
|
|
||||||
routers = self.plugin_rpc.get_routers(
|
|
||||||
self.context, router_ids)
|
|
||||||
# routers with admin_state_up=false will not be in the fetched
|
|
||||||
fetched = set([r['id'] for r in routers])
|
|
||||||
self.removed_routers.update(updated_routers - fetched)
|
|
||||||
|
|
||||||
self._process_routers(routers)
|
if routers:
|
||||||
self._process_router_delete()
|
router = routers[0]
|
||||||
LOG.debug(_("RPC loop successfully completed"))
|
|
||||||
except Exception:
|
if not router:
|
||||||
LOG.exception(_("Failed synchronizing routers"))
|
self._router_removed(update.id)
|
||||||
self.fullsync = True
|
continue
|
||||||
|
|
||||||
|
self._process_routers([router])
|
||||||
|
LOG.debug("Finished a router update for %s", update.id)
|
||||||
|
rp.fetched_and_processed(update.timestamp)
|
||||||
|
|
||||||
|
def _process_routers_loop(self):
|
||||||
|
LOG.debug("Starting _process_routers_loop")
|
||||||
|
pool = eventlet.GreenPool(size=8)
|
||||||
|
while True:
|
||||||
|
pool.spawn_n(self._process_router_update)
|
||||||
|
|
||||||
def _process_router_delete(self):
|
def _process_router_delete(self):
|
||||||
current_removed_routers = list(self.removed_routers)
|
current_removed_routers = list(self.removed_routers)
|
||||||
@ -842,7 +997,6 @@ class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback, manager.Manager):
|
|||||||
return [self.conf.router_id]
|
return [self.conf.router_id]
|
||||||
|
|
||||||
@periodic_task.periodic_task
|
@periodic_task.periodic_task
|
||||||
@lockutils.synchronized('l3-agent', 'neutron-')
|
|
||||||
def periodic_sync_routers_task(self, context):
|
def periodic_sync_routers_task(self, context):
|
||||||
self._sync_routers_task(context)
|
self._sync_routers_task(context)
|
||||||
|
|
||||||
@ -853,15 +1007,29 @@ class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback, manager.Manager):
|
|||||||
self.fullsync)
|
self.fullsync)
|
||||||
if not self.fullsync:
|
if not self.fullsync:
|
||||||
return
|
return
|
||||||
|
|
||||||
|
# Capture a picture of namespaces *before* fetching the full list from
|
||||||
|
# the database. This is important to correctly identify stale ones.
|
||||||
|
namespaces = set()
|
||||||
|
if self._clean_stale_namespaces:
|
||||||
|
namespaces = self._list_namespaces()
|
||||||
|
prev_router_ids = set(self.router_info)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
router_ids = self._router_ids()
|
router_ids = self._router_ids()
|
||||||
self.updated_routers.clear()
|
self.updated_routers.clear()
|
||||||
self.removed_routers.clear()
|
self.removed_routers.clear()
|
||||||
|
timestamp = timeutils.utcnow()
|
||||||
routers = self.plugin_rpc.get_routers(
|
routers = self.plugin_rpc.get_routers(
|
||||||
context, router_ids)
|
context, router_ids)
|
||||||
|
|
||||||
LOG.debug(_('Processing :%r'), routers)
|
LOG.debug(_('Processing :%r'), routers)
|
||||||
self._process_routers(routers, all_routers=True)
|
for r in routers:
|
||||||
|
update = RouterUpdate(r['id'],
|
||||||
|
PRIORITY_SYNC_ROUTERS_TASK,
|
||||||
|
router=r,
|
||||||
|
timestamp=timestamp)
|
||||||
|
self._queue.add(update)
|
||||||
self.fullsync = False
|
self.fullsync = False
|
||||||
LOG.debug(_("_sync_routers_task successfully completed"))
|
LOG.debug(_("_sync_routers_task successfully completed"))
|
||||||
except n_rpc.RPCException:
|
except n_rpc.RPCException:
|
||||||
@ -872,10 +1040,25 @@ class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback, manager.Manager):
|
|||||||
self.fullsync = True
|
self.fullsync = True
|
||||||
else:
|
else:
|
||||||
# Resync is not necessary for the cleanup of stale namespaces
|
# Resync is not necessary for the cleanup of stale namespaces
|
||||||
|
curr_router_ids = set([r['id'] for r in routers])
|
||||||
|
|
||||||
|
# Two kinds of stale routers: Routers for which info is cached in
|
||||||
|
# self.router_info and the others. First, handle the former.
|
||||||
|
for router_id in prev_router_ids - curr_router_ids:
|
||||||
|
update = RouterUpdate(router_id,
|
||||||
|
PRIORITY_SYNC_ROUTERS_TASK,
|
||||||
|
timestamp=timestamp,
|
||||||
|
action=DELETE_ROUTER)
|
||||||
|
self._queue.add(update)
|
||||||
|
|
||||||
|
# Next, one effort to clean out namespaces for which we don't have
|
||||||
|
# a record. (i.e. _clean_stale_namespaces=False after one pass)
|
||||||
if self._clean_stale_namespaces:
|
if self._clean_stale_namespaces:
|
||||||
self._cleanup_namespaces(routers)
|
ids_to_keep = curr_router_ids | prev_router_ids
|
||||||
|
self._cleanup_namespaces(namespaces, ids_to_keep)
|
||||||
|
|
||||||
def after_start(self):
|
def after_start(self):
|
||||||
|
eventlet.spawn_n(self._process_routers_loop)
|
||||||
LOG.info(_("L3 agent started"))
|
LOG.info(_("L3 agent started"))
|
||||||
|
|
||||||
def _update_routing_table(self, ri, operation, route):
|
def _update_routing_table(self, ri, operation, route):
|
||||||
|
@ -15,6 +15,7 @@
|
|||||||
|
|
||||||
import contextlib
|
import contextlib
|
||||||
import copy
|
import copy
|
||||||
|
import datetime
|
||||||
|
|
||||||
import mock
|
import mock
|
||||||
import netaddr
|
import netaddr
|
||||||
@ -35,6 +36,85 @@ from neutron.tests import base
|
|||||||
_uuid = uuidutils.generate_uuid
|
_uuid = uuidutils.generate_uuid
|
||||||
HOSTNAME = 'myhost'
|
HOSTNAME = 'myhost'
|
||||||
FAKE_ID = _uuid()
|
FAKE_ID = _uuid()
|
||||||
|
FAKE_ID_2 = _uuid()
|
||||||
|
|
||||||
|
|
||||||
|
class TestExclusiveRouterProcessor(base.BaseTestCase):
|
||||||
|
def setUp(self):
|
||||||
|
super(TestExclusiveRouterProcessor, self).setUp()
|
||||||
|
|
||||||
|
def test_i_am_master(self):
|
||||||
|
master = l3_agent.ExclusiveRouterProcessor(FAKE_ID)
|
||||||
|
not_master = l3_agent.ExclusiveRouterProcessor(FAKE_ID)
|
||||||
|
master_2 = l3_agent.ExclusiveRouterProcessor(FAKE_ID_2)
|
||||||
|
not_master_2 = l3_agent.ExclusiveRouterProcessor(FAKE_ID_2)
|
||||||
|
|
||||||
|
self.assertTrue(master._i_am_master())
|
||||||
|
self.assertFalse(not_master._i_am_master())
|
||||||
|
self.assertTrue(master_2._i_am_master())
|
||||||
|
self.assertFalse(not_master_2._i_am_master())
|
||||||
|
|
||||||
|
master.__exit__(None, None, None)
|
||||||
|
master_2.__exit__(None, None, None)
|
||||||
|
|
||||||
|
def test_master(self):
|
||||||
|
master = l3_agent.ExclusiveRouterProcessor(FAKE_ID)
|
||||||
|
not_master = l3_agent.ExclusiveRouterProcessor(FAKE_ID)
|
||||||
|
master_2 = l3_agent.ExclusiveRouterProcessor(FAKE_ID_2)
|
||||||
|
not_master_2 = l3_agent.ExclusiveRouterProcessor(FAKE_ID_2)
|
||||||
|
|
||||||
|
self.assertEqual(master._master, master)
|
||||||
|
self.assertEqual(not_master._master, master)
|
||||||
|
self.assertEqual(master_2._master, master_2)
|
||||||
|
self.assertEqual(not_master_2._master, master_2)
|
||||||
|
|
||||||
|
master.__exit__(None, None, None)
|
||||||
|
master_2.__exit__(None, None, None)
|
||||||
|
|
||||||
|
def test__enter__(self):
|
||||||
|
self.assertFalse(FAKE_ID in l3_agent.ExclusiveRouterProcessor._masters)
|
||||||
|
master = l3_agent.ExclusiveRouterProcessor(FAKE_ID)
|
||||||
|
master.__enter__()
|
||||||
|
self.assertTrue(FAKE_ID in l3_agent.ExclusiveRouterProcessor._masters)
|
||||||
|
master.__exit__(None, None, None)
|
||||||
|
|
||||||
|
def test__exit__(self):
|
||||||
|
master = l3_agent.ExclusiveRouterProcessor(FAKE_ID)
|
||||||
|
not_master = l3_agent.ExclusiveRouterProcessor(FAKE_ID)
|
||||||
|
master.__enter__()
|
||||||
|
self.assertTrue(FAKE_ID in l3_agent.ExclusiveRouterProcessor._masters)
|
||||||
|
not_master.__enter__()
|
||||||
|
not_master.__exit__(None, None, None)
|
||||||
|
self.assertTrue(FAKE_ID in l3_agent.ExclusiveRouterProcessor._masters)
|
||||||
|
master.__exit__(None, None, None)
|
||||||
|
self.assertFalse(FAKE_ID in l3_agent.ExclusiveRouterProcessor._masters)
|
||||||
|
|
||||||
|
def test_data_fetched_since(self):
|
||||||
|
master = l3_agent.ExclusiveRouterProcessor(FAKE_ID)
|
||||||
|
self.assertEqual(master._get_router_data_timestamp(),
|
||||||
|
datetime.datetime.min)
|
||||||
|
|
||||||
|
ts1 = datetime.datetime.utcnow() - datetime.timedelta(seconds=10)
|
||||||
|
ts2 = datetime.datetime.utcnow()
|
||||||
|
|
||||||
|
master.fetched_and_processed(ts2)
|
||||||
|
self.assertEqual(master._get_router_data_timestamp(), ts2)
|
||||||
|
master.fetched_and_processed(ts1)
|
||||||
|
self.assertEqual(master._get_router_data_timestamp(), ts2)
|
||||||
|
|
||||||
|
master.__exit__(None, None, None)
|
||||||
|
|
||||||
|
def test_updates(self):
|
||||||
|
master = l3_agent.ExclusiveRouterProcessor(FAKE_ID)
|
||||||
|
not_master = l3_agent.ExclusiveRouterProcessor(FAKE_ID)
|
||||||
|
|
||||||
|
master.queue_update(l3_agent.RouterUpdate(FAKE_ID, 0))
|
||||||
|
not_master.queue_update(l3_agent.RouterUpdate(FAKE_ID, 0))
|
||||||
|
|
||||||
|
for update in not_master.updates():
|
||||||
|
raise Exception("Only the master should process a router")
|
||||||
|
|
||||||
|
self.assertEqual(2, len([i for i in master.updates()]))
|
||||||
|
|
||||||
|
|
||||||
class TestBasicRouterOperations(base.BaseTestCase):
|
class TestBasicRouterOperations(base.BaseTestCase):
|
||||||
@ -100,12 +180,10 @@ class TestBasicRouterOperations(base.BaseTestCase):
|
|||||||
|
|
||||||
def test__sync_routers_task_call_clean_stale_namespaces(self):
|
def test__sync_routers_task_call_clean_stale_namespaces(self):
|
||||||
agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
|
agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
|
||||||
self.plugin_api.get_routers.return_value = mock.ANY
|
self.plugin_api.get_routers.return_value = []
|
||||||
with mock.patch.object(agent, '_cleanup_namespaces') as f:
|
with mock.patch.object(agent, '_cleanup_namespaces') as f:
|
||||||
with mock.patch.object(agent, '_process_routers') as g:
|
agent._sync_routers_task(agent.context)
|
||||||
agent._sync_routers_task(agent.context)
|
|
||||||
self.assertTrue(f.called)
|
self.assertTrue(f.called)
|
||||||
g.assert_called_with(mock.ANY, all_routers=True)
|
|
||||||
|
|
||||||
def test_router_info_create(self):
|
def test_router_info_create(self):
|
||||||
id = _uuid()
|
id = _uuid()
|
||||||
@ -1024,27 +1102,27 @@ class TestBasicRouterOperations(base.BaseTestCase):
|
|||||||
|
|
||||||
def test_router_deleted(self):
|
def test_router_deleted(self):
|
||||||
agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
|
agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
|
||||||
|
agent._queue = mock.Mock()
|
||||||
agent.router_deleted(None, FAKE_ID)
|
agent.router_deleted(None, FAKE_ID)
|
||||||
# verify that will set fullsync
|
agent._queue.add.assert_called_once()
|
||||||
self.assertIn(FAKE_ID, agent.removed_routers)
|
|
||||||
|
|
||||||
def test_routers_updated(self):
|
def test_routers_updated(self):
|
||||||
agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
|
agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
|
||||||
|
agent._queue = mock.Mock()
|
||||||
agent.routers_updated(None, [FAKE_ID])
|
agent.routers_updated(None, [FAKE_ID])
|
||||||
# verify that will set fullsync
|
agent._queue.add.assert_called_once()
|
||||||
self.assertIn(FAKE_ID, agent.updated_routers)
|
|
||||||
|
|
||||||
def test_removed_from_agent(self):
|
def test_removed_from_agent(self):
|
||||||
agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
|
agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
|
||||||
|
agent._queue = mock.Mock()
|
||||||
agent.router_removed_from_agent(None, {'router_id': FAKE_ID})
|
agent.router_removed_from_agent(None, {'router_id': FAKE_ID})
|
||||||
# verify that will set fullsync
|
agent._queue.add.assert_called_once()
|
||||||
self.assertIn(FAKE_ID, agent.removed_routers)
|
|
||||||
|
|
||||||
def test_added_to_agent(self):
|
def test_added_to_agent(self):
|
||||||
agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
|
agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
|
||||||
|
agent._queue = mock.Mock()
|
||||||
agent.router_added_to_agent(None, [FAKE_ID])
|
agent.router_added_to_agent(None, [FAKE_ID])
|
||||||
# verify that will set fullsync
|
agent._queue.add.assert_called_once()
|
||||||
self.assertIn(FAKE_ID, agent.updated_routers)
|
|
||||||
|
|
||||||
def test_process_router_delete(self):
|
def test_process_router_delete(self):
|
||||||
agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
|
agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
|
||||||
@ -1277,7 +1355,8 @@ class TestBasicRouterOperations(base.BaseTestCase):
|
|||||||
pm.reset_mock()
|
pm.reset_mock()
|
||||||
|
|
||||||
agent._destroy_router_namespace = mock.MagicMock()
|
agent._destroy_router_namespace = mock.MagicMock()
|
||||||
agent._cleanup_namespaces(router_list)
|
ns_list = agent._list_namespaces()
|
||||||
|
agent._cleanup_namespaces(ns_list, [r['id'] for r in router_list])
|
||||||
|
|
||||||
self.assertEqual(pm.disable.call_count, len(stale_namespace_list))
|
self.assertEqual(pm.disable.call_count, len(stale_namespace_list))
|
||||||
self.assertEqual(agent._destroy_router_namespace.call_count,
|
self.assertEqual(agent._destroy_router_namespace.call_count,
|
||||||
|
Loading…
Reference in New Issue
Block a user