Ensure that the status of a port is properly synchronized even when the port itself no longer exists in the backend, so that no exception is raised because of missing results in the NVP query. This patch also adjusts a comment and improves exception handling in _nvp_get_port_id. Bug 1229331. Change-Id: Ibaa581006c994a543c1795dd3f1d50086583b953
603 lines
27 KiB
603 lines
27 KiB
# Copyright 2013 Nicira, Inc.
# All Rights Reserved
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import random
from neutron.common import constants
from neutron.common import exceptions
from neutron import context
from neutron.db import external_net_db
from neutron.db import l3_db
from neutron.db import models_v2
from neutron.openstack.common import jsonutils
from neutron.openstack.common import log
from neutron.openstack.common import loopingcall
from neutron.openstack.common import timeutils
from neutron.plugins.nicira.common import exceptions as nvp_exc
from neutron.plugins.nicira import NvpApiClient
from neutron.plugins.nicira import nvplib
LOG = log.getLogger(__name__)
class NvpCache(object):
    """A simple Cache for NVP resources.

    Associates resource id with resource hash to rapidly identify
    updated resources.
    Each entry in the cache also stores the following information:
    - changed: the resource in the cache has been altered following
      an update or a delete
    - hit: the resource has been visited during an update (and possibly
      left unchanged)
    - data: current resource data
    - data_bk: backup of resource data prior to its removal
    """

    def __init__(self):
        # Maps a uuid to the dict containing it
        self._uuid_dict_mappings = {}
        # Dicts for NVP cached resources
        self._lswitches = {}
        self._lswitchports = {}
        self._lrouters = {}

    def __getitem__(self, key):
        """Return the cache entry for the resource with the given uuid.

        :raises: KeyError if the uuid is not in the cache
        """
        # uuids are unique across the various types of resources
        # TODO(salv-orlando): Avoid lookups over all dictionaries
        # when retrieving items
        # Fetch lswitches, lports, or lrouters
        resources = self._uuid_dict_mappings[key]
        return resources[key]

    def _update_resources(self, resources, new_resources):
        """Merge freshly fetched resources into one of the cache dicts.

        :param resources: cache dict (uuid -> entry), updated in place
        :param new_resources: iterable of resource dicts with a 'uuid'
                              key; None is treated as an empty collection
        """
        # Clear the 'changed' attribute for all items. A snapshot is
        # iterated because entries are removed while looping.
        for uuid, item in list(resources.items()):
            if item.pop('changed', None) and not item.get('data'):
                # The item is not anymore in NVP, so delete it
                del resources[uuid]
                self._uuid_dict_mappings.pop(uuid, None)

        def do_hash(item):
            return hash(jsonutils.dumps(item))

        # Parse new data and identify new, deleted, and updated resources
        for item in new_resources or []:
            item_id = item['uuid']
            entry = resources.get(item_id)
            if entry:
                new_hash = do_hash(item)
                if new_hash != entry['hash']:
                    entry['hash'] = new_hash
                    entry['changed'] = True
                    # Keep the previous data as backup copy
                    entry['data_bk'] = entry.get('data')
                    entry['data'] = item
                # Mark the item as hit in any case
                entry['hit'] = True
            else:
                resources[item_id] = {'hash': do_hash(item),
                                      'hit': True,
                                      'changed': True,
                                      'data': item}
                # add a uuid to dict mapping for easy retrieval
                # with __getitem__
                self._uuid_dict_mappings[item_id] = resources

    def _delete_resources(self, resources):
        """Flag as removed every entry not visited since the last call."""
        # Mark for removal all the elements which have not been visited.
        # And clear the 'hit' attribute.
        for to_delete in [k for (k, v) in resources.items()
                          if not v.pop('hit', False)]:
            resources[to_delete]['changed'] = True
            resources[to_delete]['data_bk'] = (
                resources[to_delete].pop('data', None))

    def _get_resource_ids(self, resources, changed_only):
        """Return a list with the uuids of all, or only changed, entries."""
        if changed_only:
            return [k for (k, v) in resources.items()
                    if v.get('changed')]
        return list(resources.keys())

    def get_lswitches(self, changed_only=False):
        return self._get_resource_ids(self._lswitches, changed_only)

    def get_lrouters(self, changed_only=False):
        return self._get_resource_ids(self._lrouters, changed_only)

    def get_lswitchports(self, changed_only=False):
        return self._get_resource_ids(self._lswitchports, changed_only)

    def update_lswitch(self, lswitch):
        self._update_resources(self._lswitches, [lswitch])

    def update_lrouter(self, lrouter):
        self._update_resources(self._lrouters, [lrouter])

    def update_lswitchport(self, lswitchport):
        self._update_resources(self._lswitchports, [lswitchport])

    def process_updates(self, lswitches=None,
                        lrouters=None, lswitchports=None):
        """Load fetched resources into the cache.

        :returns: tuple of uuid lists (lswitches, lrouters, lswitchports)
                  for the entries currently flagged as changed
        """
        self._update_resources(self._lswitches, lswitches)
        self._update_resources(self._lrouters, lrouters)
        self._update_resources(self._lswitchports, lswitchports)
        return (self._get_resource_ids(self._lswitches, changed_only=True),
                self._get_resource_ids(self._lrouters, changed_only=True),
                self._get_resource_ids(self._lswitchports, changed_only=True))

    def process_deletes(self):
        """Flag entries which were not visited by the latest updates.

        :returns: tuple of uuid lists (lswitches, lrouters, lswitchports)
                  for the entries currently flagged as changed
        """
        # Without this step no entry is ever marked as removed
        self._delete_resources(self._lswitches)
        self._delete_resources(self._lrouters)
        self._delete_resources(self._lswitchports)
        return (self._get_resource_ids(self._lswitches, changed_only=True),
                self._get_resource_ids(self._lrouters, changed_only=True),
                self._get_resource_ids(self._lswitchports, changed_only=True))
class SyncParameters(object):
    """Defines attributes used by the synchronization procedure.

    chunk_size: Actual chunk size
    extra_chunk_size: Additional data to fetch because of chunk size
                      adjustment
    current_chunk: Counter of the current data chunk being synchronized
    Page cursors: markers for the next resource to fetch.
                 'start' means page cursor unset for fetching 1st page
    init_sync_performed: True if the initial synchronization concluded
    total_size: Overall number of resources to synchronize; 0 until it
                has been computed
    """

    def __init__(self, min_chunk_size):
        self.chunk_size = min_chunk_size
        self.extra_chunk_size = 0
        self.current_chunk = 0
        # One cursor per resource type: lswitches, lrouters, lswitchports
        self.ls_cursor = 'start'
        self.lr_cursor = 'start'
        self.lp_cursor = 'start'
        self.init_sync_performed = False
        self.total_size = 0
def _start_loopingcall(min_chunk_size, state_sync_interval, func):
"""Start a loopingcall for the synchronization task."""
# Start a looping call to synchronize operational status
# for neutron resources
if not state_sync_interval:
# do not start the looping call if specified
# sync interval is 0
state_synchronizer = loopingcall.DynamicLoopingCall(
func, sp=SyncParameters(min_chunk_size))
return state_synchronizer
# Aligns the status of Neutron networks, routers and ports with the
# state of the corresponding NVP logical entities.
class NvpSynchronizer():
# Query URIs for logical switches (LS), logical routers (LR) and
# logical switch ports (LP). The LS and LR queries request the uuid,
# tags and fabric_status fields.
# NOTE(review): the trailing arguments of each _build_uri_path call are
# not visible here - confirm them against the complete file.
LS_URI = nvplib._build_uri_path(
nvplib.LSWITCH_RESOURCE, fields='uuid,tags,fabric_status',
LR_URI = nvplib._build_uri_path(
nvplib.LROUTER_RESOURCE, fields='uuid,tags,fabric_status',
LP_URI = nvplib._build_uri_path(
def __init__(self, plugin, cluster, state_sync_interval,
             req_delay, min_chunk_size, max_rand_delay=0):
    """Set up the resource cache and launch the synchronization task.

    :param plugin: plugin instance used for database queries
    :param cluster: NVP cluster the requests are sent to
    :param state_sync_interval: interval between synchronization runs
    :param req_delay: minimum delay between two requests
    :param min_chunk_size: initial size of a synchronization chunk
    :param max_rand_delay: upper bound for the random extra delay
    :raises: NvpPluginException if req_delay exceeds the sync interval
    """
    self._nvp_cache = NvpCache()
    # Keep the constructor arguments around as instance members
    # NOTE(salv-orlando): apologies if it looks java-ish
    self._plugin, self._cluster = plugin, cluster
    self._req_delay, self._sync_interval = req_delay, state_sync_interval
    self._max_rand_delay = max_rand_delay
    # The request delay cannot be longer than the sync interval
    if self._req_delay > self._sync_interval:
        details = {'req_delay': self._req_delay,
                   'sync_interval': self._sync_interval}
        err_msg = (_("Minimum request delay:%(req_delay)s must not "
                     "exceed synchronization interval:%(sync_interval)s") %
                   details)
        raise nvp_exc.NvpPluginException(err_msg=err_msg)
    # Backoff time in case of failures while fetching sync data
    self._sync_backoff = 1
    # The looping call is kept in an instance variable so that unit
    # tests can control its lifecycle
    self._sync_looping_call = _start_loopingcall(
        min_chunk_size, state_sync_interval, self._synchronize_state)
def _get_tag_dict(self, tags):
return dict((tag.get('scope'), tag['tag']) for tag in tags)
def _update_neutron_object(self, context, neutron_data, status):
if status == neutron_data['status']:
# do nothing
with context.session.begin(subtransactions=True):
LOG.debug(_("Updating status for neutron resource %(q_id)s to: "
"%(status)s"), {'q_id': neutron_data['id'],
'status': status})
neutron_data['status'] = status
def synchronize_network(self, context, neutron_network_data,
                        lswitches=None):
    """Synchronize a Neutron network with its NVP counterpart.

    This routine synchronizes a set of switches when a Neutron
    network is mapped to multiple lswitches.

    :param context: context used for the database update
    :param neutron_network_data: network record; 'id' and 'status'
                                 are used
    :param lswitches: logical switches data for the network; fetched
                      from NVP when empty or not specified
    """
    if not lswitches:
        # Try to get logical switches from nvp
        try:
            lswitches = nvplib.get_lswitches(
                self._cluster, neutron_network_data['id'])
        except exceptions.NetworkNotFound:
            # TODO(salv-orlando): We should be catching
            # NvpApiClient.ResourceNotFound here
            # The logical switch was not found
            LOG.warning(_("Logical switch for neutron network %s not "
                          "found on NVP."), neutron_network_data['id'])
            lswitches = []
        else:
            # Keep the cache aligned with the data just fetched
            for lswitch in lswitches:
                self._nvp_cache.update_lswitch(lswitch)
    # By default assume things go wrong
    status = constants.NET_STATUS_ERROR
    # In most cases lswitches will contain a single element
    for ls in lswitches:
        if not ls:
            # Logical switch was deleted
            break
        ls_status = ls['_relations']['LogicalSwitchStatus']
        if not ls_status['fabric_status']:
            status = constants.NET_STATUS_DOWN
            break
    else:
        # No switch was down or missing. Set status to ACTIVE unless
        # there were no switches in the first place!
        if lswitches:
            status = constants.NET_STATUS_ACTIVE
    # Update db object
    self._update_neutron_object(context, neutron_network_data, status)
def _synchronize_lswitches(self, ctx, ls_uuids, scan_missing=False):
if not ls_uuids and not scan_missing:
neutron_net_ids = set()
neutron_nvp_mappings = {}
# TODO(salvatore-orlando): Deal with the case the tag
# has been tampered with
for ls_uuid in ls_uuids:
# If the lswitch has been deleted, get backup copy of data
lswitch = (self._nvp_cache[ls_uuid].get('data') or
tags = self._get_tag_dict(lswitch['tags'])
neutron_id = tags.get('neutron_net_id', ls_uuid)
neutron_nvp_mappings[neutron_id] = (
neutron_nvp_mappings.get(neutron_id, []) +
with ctx.session.begin(subtransactions=True):
# Fetch neutron networks from database
filters = {'router:external': [False]}
if not scan_missing:
filters['id'] = neutron_net_ids
# TODO(salv-orlando): Filter out external networks
for network in self._plugin._get_collection_query(
ctx, models_v2.Network, filters=filters):
lswitches = neutron_nvp_mappings.get(network['id'], [])
lswitches = [lswitch.get('data') for lswitch in lswitches]
self.synchronize_network(ctx, network, lswitches)
def synchronize_router(self, context, neutron_router_data,
# NOTE(review): the end of the signature is not visible here; the body
# reads a name `lrouter`, presumably an optional parameter - confirm.
"""Synchronize a neutron router with its NVP counterpart."""
# Look the router up on the backend when no data was supplied
if not lrouter:
# Try to get router from nvp
# This query will return the logical router status too
lrouter = nvplib.get_lrouter(
self._cluster, neutron_router_data['id'])
# NOTE(review): the matching `try:` line is not visible here - confirm.
except exceptions.NotFound:
# NOTE(salv-orlando): We should be catching
# NvpApiClient.ResourceNotFound here
# The logical router was not found
LOG.warning(_("Logical router for neutron router %s not "
"found on NVP."), neutron_router_data['id'])
lrouter = None
# Update the cache
# NOTE(review): no cache update statement follows the comment above in
# the visible code - confirm against the complete file.
# Note(salv-orlando): It might worth adding a check to verify neutron
# resource tag in nvp entity matches a Neutron id.
# By default assume things go wrong
status = constants.NET_STATUS_ERROR
if lrouter:
# NOTE(review): the keys read from '_relations' and the status chosen
# when lr_status is true are not visible here - confirm.
lr_status = (lrouter['_relations']
status = (lr_status and
or constants.NET_STATUS_DOWN)
# Update db object
self._update_neutron_object(context, neutron_router_data, status)
def _synchronize_lrouters(self, ctx, lr_uuids, scan_missing=False):
if not lr_uuids and not scan_missing:
neutron_router_mappings = (
dict((lr_uuid, self._nvp_cache[lr_uuid]) for lr_uuid in lr_uuids))
with ctx.session.begin(subtransactions=True):
# Fetch neutron routers from database
filters = ({} if scan_missing else
{'id': neutron_router_mappings.keys()})
for router in self._plugin._get_collection_query(
ctx, l3_db.Router, filters=filters):
lrouter = neutron_router_mappings.get(router['id'])
ctx, router, lrouter and lrouter.get('data'))
def synchronize_port(self, context, neutron_port_data,
lswitchport=None, ext_networks=None):
"""Synchronize a Neutron port with its NVP counterpart."""
# Parameters, as used below:
# - neutron_port_data: port record; 'id', 'network_id' and 'status'
#   are used
# - lswitchport: logical port data; looked up on the backend if unset
# - ext_networks: ids of external networks; queried from the database
#   if unset
# Skip synchronization for ports on external networks
if not ext_networks:
ext_networks = [net['id'] for net in context.session.query(
# NOTE(review): part of this query expression is not visible here -
# confirm the queried entities and the closing brackets.
(models_v2.Network.id ==
if neutron_port_data['network_id'] in ext_networks:
with context.session.begin(subtransactions=True):
neutron_port_data['status'] = constants.PORT_STATUS_ACTIVE
# NOTE(review): per the comment above, processing presumably stops here
# for ports on external networks; no return is visible - confirm.
if not lswitchport:
# Try to get port from nvp
# NOTE(review): the `try:` matching the except clause below is not
# visible here - confirm.
lp_uuid = self._plugin._nvp_get_port_id(
context, self._cluster, neutron_port_data)
if lp_uuid:
lswitchport = nvplib.get_port(
self._cluster, neutron_port_data['network_id'],
lp_uuid, relations='LogicalPortStatus')
except (exceptions.PortNotFoundOnNetwork):
# NOTE(salv-orlando): We should be catching
# NvpApiClient.ResourceNotFound here instead
# of PortNotFoundOnNetwork when the id exists but
# the logical switch port was not found
LOG.warning(_("Logical switch port for neutron port %s "
"not found on NVP."), neutron_port_data['id'])
lswitchport = None
# If lswitchport is not None, update the cache.
# It could be none if the port was deleted from the backend
if lswitchport:
# NOTE(review): the cache update announced above is not visible here -
# confirm.
# Note(salv-orlando): It might worth adding a check to verify neutron
# resource tag in nvp entity matches Neutron id.
# By default assume things go wrong
status = constants.PORT_STATUS_ERROR
if lswitchport:
# NOTE(review): the keys read from '_relations' and the status chosen
# when lp_status is true are not visible here - confirm.
lp_status = (lswitchport['_relations']
status = (lp_status and
or constants.PORT_STATUS_DOWN)
# Update db object
self._update_neutron_object(context, neutron_port_data, status)
def _synchronize_lswitchports(self, ctx, lp_uuids, scan_missing=False):
# Synchronizes the neutron ports mapped to the given logical switch
# ports; with scan_missing no id filter is applied to the port query.
if not lp_uuids and not scan_missing:
# NOTE(review): this guard has no visible body; presumably it returns
# early - confirm.
# Find Neutron port id by tag - the tag is already
# loaded in memory, no reason for doing a db query
# TODO(salvatore-orlando): Deal with the case the tag
# has been tampered with
neutron_port_mappings = {}
for lp_uuid in lp_uuids:
# NOTE(review): the fallback used when 'data' is unset is not visible
# here - confirm.
lport = (self._nvp_cache[lp_uuid].get('data') or
tags = self._get_tag_dict(lport['tags'])
neutron_port_id = tags.get('q_port_id')
if neutron_port_id:
# NOTE(review): the value stored in the mapping is not visible here;
# the loop below calls .get('data') on it - confirm.
neutron_port_mappings[neutron_port_id] = (
with ctx.session.begin(subtransactions=True):
# Fetch neutron ports from database
# At the first sync we need to fetch all ports
filters = ({} if scan_missing else
{'id': neutron_port_mappings.keys()})
# TODO(salv-orlando): Work out a solution for avoiding
# this query
ext_nets = [net['id'] for net in ctx.session.query(
# NOTE(review): part of this query expression is not visible here -
# confirm the queried entities and the closing brackets.
(models_v2.Network.id ==
for port in self._plugin._get_collection_query(
ctx, models_v2.Port, filters=filters):
lswitchport = neutron_port_mappings.get(port['id'])
# NOTE(review): the head of the call receiving the arguments below is
# not visible here - confirm.
ctx, port, lswitchport and lswitchport.get('data'),
def _get_chunk_size(self, sp):
# NOTE(salv-orlando): Try to use __future__ for this routine only?
ratio = ((float(sp.total_size) / float(sp.chunk_size)) /
(float(self._sync_interval) / float(self._req_delay)))
new_size = max(1.0, ratio) * float(sp.chunk_size)
return int(new_size) + (new_size - int(new_size) > 0)
def _fetch_data(self, uri, cursor, page_size):
# If not cursor there is nothing to retrieve
if cursor:
if cursor == 'start':
cursor = None
# Chunk size tuning might, in some conditions, make it larger
# than 5,000, which is the maximum page size allowed by the NVP
# API. In this case the request should be split in multiple
# requests. This is not ideal, and therefore a log warning will
# be emitted.
num_requests = page_size / (nvplib.MAX_PAGE_SIZE + 1) + 1
if num_requests > 1:
LOG.warn(_("Requested page size is %(cur_chunk_size)d."
"It might be necessary to do %(num_requests)d "
"round-trips to NVP for fetching data. Please "
"tune sync parameters to ensure chunk size "
"is less than %(max_page_size)d"),
{'cur_chunk_size': page_size,
'num_requests': num_requests,
'max_page_size': nvplib.MAX_PAGE_SIZE})
# Only the first request might return the total size,
# subsequent requests will definetely not
results, cursor, total_size = nvplib.get_single_query_page(
uri, self._cluster, cursor,
min(page_size, nvplib.MAX_PAGE_SIZE))
for _req in range(0, num_requests - 1):
# If no cursor is returned break the cycle as there is no
# actual need to perform multiple requests (all fetched)
# This happens when the overall size of resources exceeds
# the maximum page size, but the number for each single
# resource type is below this threshold
if not cursor:
req_results, cursor = nvplib.get_single_query_page(
uri, self._cluster, cursor,
min(page_size, nvplib.MAX_PAGE_SIZE))[:2]
# reset cursor before returning if we queried just to
# know the number of entities
return results, cursor if page_size else 'start', total_size
return [], cursor, None
def _fetch_nvp_data_chunk(self, sp):
    """Fetch one chunk of switches, routers and switch ports from NVP.

    Cursors in sp are advanced as data is fetched. While processing the
    first chunk the total size, the chunk size and the extra chunk size
    stored in sp are recomputed as well.

    :param sp: synchronization parameters
    :returns: tuple (lswitches, lrouters, lswitchports)
    """
    base_chunk_size = sp.chunk_size
    chunk_size = base_chunk_size + sp.extra_chunk_size
    LOG.info(_("Fetching up to %s resources "
               "from NVP backend"), chunk_size)
    fetched = 0
    ls_count = lr_count = lp_count = 0
    lswitches, lrouters, lswitchports = [], [], []
    # 'start' is a true value, hence a plain truth test is enough here
    if sp.ls_cursor:
        lswitches, sp.ls_cursor, ls_count = self._fetch_data(
            self.LS_URI, sp.ls_cursor, chunk_size)
        fetched = len(lswitches)
    # A cursor still set to 'start' is always queried, possibly with a
    # page size of 0, so that the resource count can be collected
    if (fetched < chunk_size and sp.lr_cursor) or sp.lr_cursor == 'start':
        remaining = max(chunk_size - fetched, 0)
        lrouters, sp.lr_cursor, lr_count = self._fetch_data(
            self.LR_URI, sp.lr_cursor, remaining)
    fetched += len(lrouters)
    if (fetched < chunk_size and sp.lp_cursor) or sp.lp_cursor == 'start':
        remaining = max(chunk_size - fetched, 0)
        lswitchports, sp.lp_cursor, lp_count = self._fetch_data(
            self.LP_URI, sp.lp_cursor, remaining)
    fetched += len(lswitchports)
    if sp.current_chunk == 0:
        # No cursors were provided. Then it must be possible to
        # calculate the total amount of data to fetch
        sp.total_size = ls_count + lr_count + lp_count
    LOG.debug(_("Total data size: %d"), sp.total_size)
    sp.chunk_size = self._get_chunk_size(sp)
    # Calculate chunk size adjustment
    sp.extra_chunk_size = sp.chunk_size - base_chunk_size
    counters = {'num_lswitches': len(lswitches),
                'num_lswitchports': len(lswitchports),
                'num_lrouters': len(lrouters)}
    LOG.debug(_("Fetched %(num_lswitches)d logical switches, "
                "%(num_lswitchports)d logical switch ports,"
                "%(num_lrouters)d logical routers"),
              counters)
    return (lswitches, lrouters, lswitchports)
def _synchronize_state(self, sp):
    """Run the synchronization task for the current data chunk.

    :param sp: synchronization parameters (cursors, chunk counters and
               sizes); updated in place
    :returns: number of seconds to wait before the next run
    :raises: loopingcall.LoopingCallDone if the plugin reference is unset
    """
    # If the plugin has been destroyed, stop the LoopingCall
    if not self._plugin:
        raise loopingcall.LoopingCallDone
    start = timeutils.utcnow()
    # Reset page cursor variables if necessary
    if sp.current_chunk == 0:
        sp.ls_cursor = sp.lr_cursor = sp.lp_cursor = 'start'
    LOG.info(_("Running state synchronization task. Chunk: %s"),
             sp.current_chunk)
    # Fetch chunk_size data from NVP
    try:
        (lswitches, lrouters, lswitchports) = (
            self._fetch_nvp_data_chunk(sp))
    except (NvpApiClient.RequestTimeout, NvpApiClient.NvpApiException):
        sleep_interval = self._sync_backoff
        # Cap max back off to 64 seconds
        self._sync_backoff = min(self._sync_backoff * 2, 64)
        LOG.exception(_("An error occured while communicating with "
                        "NVP backend. Will retry synchronization "
                        "in %d seconds"), sleep_interval)
        return sleep_interval
    # The backend answered: the next failure restarts from the
    # shortest back off
    self._sync_backoff = 1
    LOG.debug(_("Time elapsed querying NVP: %s"),
              timeutils.utcnow() - start)
    if sp.total_size:
        # Ceiling of total_size / chunk_size, with integer arithmetic
        num_chunks = ((sp.total_size // sp.chunk_size) +
                      (sp.total_size % sp.chunk_size != 0))
    else:
        num_chunks = 1
    LOG.debug(_("Number of chunks: %d"), num_chunks)
    # Find objects which have changed on NVP side and need
    # to be synchronized
    (ls_uuids, lr_uuids, lp_uuids) = self._nvp_cache.process_updates(
        lswitches, lrouters, lswitchports)
    # Process removed objects only at the last chunk
    scan_missing = (sp.current_chunk == num_chunks - 1 and
                    not sp.init_sync_performed)
    if sp.current_chunk == num_chunks - 1:
        # Flag the cached resources which were not found in this pass
        self._nvp_cache.process_deletes()
        ls_uuids = self._nvp_cache.get_lswitches(
            changed_only=not scan_missing)
        lr_uuids = self._nvp_cache.get_lrouters(
            changed_only=not scan_missing)
        lp_uuids = self._nvp_cache.get_lswitchports(
            changed_only=not scan_missing)
    LOG.debug(_("Time elapsed hashing data: %s"),
              timeutils.utcnow() - start)
    # Get an admin context
    ctx = context.get_admin_context()
    # Synchronize with database
    with ctx.session.begin(subtransactions=True):
        self._synchronize_lswitches(ctx, ls_uuids,
                                    scan_missing=scan_missing)
        self._synchronize_lrouters(ctx, lr_uuids,
                                   scan_missing=scan_missing)
        self._synchronize_lswitchports(ctx, lp_uuids,
                                       scan_missing=scan_missing)
    # Increase chunk counter
    LOG.info(_("Synchronization for chunk %(chunk_num)d of "
               "%(total_chunks)d performed"),
             {'chunk_num': sp.current_chunk + 1,
              'total_chunks': num_chunks})
    sp.current_chunk = (sp.current_chunk + 1) % num_chunks
    added_delay = 0
    if sp.current_chunk == 0:
        # Ensure init_sync_performed is True
        if not sp.init_sync_performed:
            sp.init_sync_performed = True
        # Add additional random delay
        added_delay = random.randint(0, self._max_rand_delay)
    LOG.debug(_("Time elapsed at end of sync: %s"),
              timeutils.utcnow() - start)
    return self._sync_interval / num_chunks + added_delay