5c2f54cc15
This new flag will prevent updates on single resources, which can happen asynchronously to the main synchronization thread, from processing the 'changed' flag for items in the cache and delete items not present anymore in NSX. This will avoid failures seen in CI runs where the sync thread failed because of a KeyError because some items where concurrently removed from the NSX cache. Change-Id: I6a11b8dabb1e9481dc807b5fbba17c9027dad5f2 Closes-Bug: #1329650
675 lines
30 KiB
Python
675 lines
30 KiB
Python
# Copyright 2013 VMware, Inc.
|
|
# All Rights Reserved
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
import random
|
|
|
|
from neutron.common import constants
|
|
from neutron.common import exceptions
|
|
from neutron import context
|
|
from neutron.db import external_net_db
|
|
from neutron.db import l3_db
|
|
from neutron.db import models_v2
|
|
from neutron.extensions import l3
|
|
from neutron.openstack.common import jsonutils
|
|
from neutron.openstack.common import log
|
|
from neutron.openstack.common import loopingcall
|
|
from neutron.openstack.common import timeutils
|
|
from neutron.plugins.vmware.api_client import exception as api_exc
|
|
from neutron.plugins.vmware.common import exceptions as nsx_exc
|
|
from neutron.plugins.vmware.common import nsx_utils
|
|
from neutron.plugins.vmware import nsxlib
|
|
from neutron.plugins.vmware.nsxlib import router as routerlib
|
|
from neutron.plugins.vmware.nsxlib import switch as switchlib
|
|
|
|
# Maximum page size for a single request
|
|
# NOTE(salv-orlando): This might become a version-dependent map should the
|
|
# limit be raised in future versions
|
|
MAX_PAGE_SIZE = 5000
|
|
|
|
LOG = log.getLogger(__name__)
|
|
|
|
|
|
class NsxCache(object):
|
|
"""A simple Cache for NSX resources.
|
|
|
|
Associates resource id with resource hash to rapidly identify
|
|
updated resources.
|
|
Each entry in the cache also stores the following information:
|
|
- changed: the resource in the cache has been altered following
|
|
an update or a delete
|
|
- hit: the resource has been visited during an update (and possibly
|
|
left unchanged)
|
|
- data: current resource data
|
|
- data_bk: backup of resource data prior to its removal
|
|
"""
|
|
|
|
def __init__(self):
|
|
# Maps a uuid to the dict containing it
|
|
self._uuid_dict_mappings = {}
|
|
# Dicts for NSX cached resources
|
|
self._lswitches = {}
|
|
self._lswitchports = {}
|
|
self._lrouters = {}
|
|
|
|
def __getitem__(self, key):
|
|
# uuids are unique across the various types of resources
|
|
# TODO(salv-orlando): Avoid lookups over all dictionaries
|
|
# when retrieving items
|
|
# Fetch lswitches, lports, or lrouters
|
|
resources = self._uuid_dict_mappings[key]
|
|
return resources[key]
|
|
|
|
def _clear_changed_flag_and_remove_from_cache(self, resources):
|
|
# Clear the 'changed' attribute for all items
|
|
for uuid, item in resources.items():
|
|
if item.pop('changed', None) and not item.get('data'):
|
|
# The item is not anymore in NSX, so delete it
|
|
del resources[uuid]
|
|
del self._uuid_dict_mappings[uuid]
|
|
LOG.debug("Removed item %s from NSX object cache", uuid)
|
|
|
|
def _update_resources(self, resources, new_resources, clear_changed=True):
|
|
if clear_changed:
|
|
self._clear_changed_flag_and_remove_from_cache(resources)
|
|
|
|
def do_hash(item):
|
|
return hash(jsonutils.dumps(item))
|
|
|
|
# Parse new data and identify new, deleted, and updated resources
|
|
for item in new_resources:
|
|
item_id = item['uuid']
|
|
if resources.get(item_id):
|
|
new_hash = do_hash(item)
|
|
if new_hash != resources[item_id]['hash']:
|
|
resources[item_id]['hash'] = new_hash
|
|
resources[item_id]['changed'] = True
|
|
resources[item_id]['data_bk'] = (
|
|
resources[item_id]['data'])
|
|
resources[item_id]['data'] = item
|
|
# Mark the item as hit in any case
|
|
resources[item_id]['hit'] = True
|
|
LOG.debug("Updating item %s in NSX object cache", item_id)
|
|
else:
|
|
resources[item_id] = {'hash': do_hash(item)}
|
|
resources[item_id]['hit'] = True
|
|
resources[item_id]['changed'] = True
|
|
resources[item_id]['data'] = item
|
|
# add a uuid to dict mapping for easy retrieval
|
|
# with __getitem__
|
|
self._uuid_dict_mappings[item_id] = resources
|
|
LOG.debug("Added item %s to NSX object cache", item_id)
|
|
|
|
def _delete_resources(self, resources):
|
|
# Mark for removal all the elements which have not been visited.
|
|
# And clear the 'hit' attribute.
|
|
for to_delete in [k for (k, v) in resources.iteritems()
|
|
if not v.pop('hit', False)]:
|
|
resources[to_delete]['changed'] = True
|
|
resources[to_delete]['data_bk'] = (
|
|
resources[to_delete].pop('data', None))
|
|
|
|
def _get_resource_ids(self, resources, changed_only):
|
|
if changed_only:
|
|
return [k for (k, v) in resources.iteritems()
|
|
if v.get('changed')]
|
|
return resources.keys()
|
|
|
|
def get_lswitches(self, changed_only=False):
|
|
return self._get_resource_ids(self._lswitches, changed_only)
|
|
|
|
def get_lrouters(self, changed_only=False):
|
|
return self._get_resource_ids(self._lrouters, changed_only)
|
|
|
|
def get_lswitchports(self, changed_only=False):
|
|
return self._get_resource_ids(self._lswitchports, changed_only)
|
|
|
|
def update_lswitch(self, lswitch):
|
|
self._update_resources(self._lswitches, [lswitch], clear_changed=False)
|
|
|
|
def update_lrouter(self, lrouter):
|
|
self._update_resources(self._lrouters, [lrouter], clear_changed=False)
|
|
|
|
def update_lswitchport(self, lswitchport):
|
|
self._update_resources(self._lswitchports, [lswitchport],
|
|
clear_changed=False)
|
|
|
|
def process_updates(self, lswitches=None,
|
|
lrouters=None, lswitchports=None):
|
|
self._update_resources(self._lswitches, lswitches)
|
|
self._update_resources(self._lrouters, lrouters)
|
|
self._update_resources(self._lswitchports, lswitchports)
|
|
return (self._get_resource_ids(self._lswitches, changed_only=True),
|
|
self._get_resource_ids(self._lrouters, changed_only=True),
|
|
self._get_resource_ids(self._lswitchports, changed_only=True))
|
|
|
|
def process_deletes(self):
|
|
self._delete_resources(self._lswitches)
|
|
self._delete_resources(self._lrouters)
|
|
self._delete_resources(self._lswitchports)
|
|
return (self._get_resource_ids(self._lswitches, changed_only=True),
|
|
self._get_resource_ids(self._lrouters, changed_only=True),
|
|
self._get_resource_ids(self._lswitchports, changed_only=True))
|
|
|
|
|
|
class SyncParameters():
|
|
"""Defines attributes used by the synchronization procedure.
|
|
|
|
chunk_size: Actual chunk size
|
|
extra_chunk_size: Additional data to fetch because of chunk size
|
|
adjustment
|
|
current_chunk: Counter of the current data chunk being synchronized
|
|
Page cursors: markers for the next resource to fetch.
|
|
'start' means page cursor unset for fetching 1st page
|
|
init_sync_performed: True if the initial synchronization concluded
|
|
"""
|
|
|
|
def __init__(self, min_chunk_size):
|
|
self.chunk_size = min_chunk_size
|
|
self.extra_chunk_size = 0
|
|
self.current_chunk = 0
|
|
self.ls_cursor = 'start'
|
|
self.lr_cursor = 'start'
|
|
self.lp_cursor = 'start'
|
|
self.init_sync_performed = False
|
|
self.total_size = 0
|
|
|
|
|
|
def _start_loopingcall(min_chunk_size, state_sync_interval, func):
|
|
"""Start a loopingcall for the synchronization task."""
|
|
# Start a looping call to synchronize operational status
|
|
# for neutron resources
|
|
if not state_sync_interval:
|
|
# do not start the looping call if specified
|
|
# sync interval is 0
|
|
return
|
|
state_synchronizer = loopingcall.DynamicLoopingCall(
|
|
func, sp=SyncParameters(min_chunk_size))
|
|
state_synchronizer.start(
|
|
periodic_interval_max=state_sync_interval)
|
|
return state_synchronizer
|
|
|
|
|
|
class NsxSynchronizer():
|
|
|
|
LS_URI = nsxlib._build_uri_path(
|
|
switchlib.LSWITCH_RESOURCE, fields='uuid,tags,fabric_status',
|
|
relations='LogicalSwitchStatus')
|
|
LR_URI = nsxlib._build_uri_path(
|
|
routerlib.LROUTER_RESOURCE, fields='uuid,tags,fabric_status',
|
|
relations='LogicalRouterStatus')
|
|
LP_URI = nsxlib._build_uri_path(
|
|
switchlib.LSWITCHPORT_RESOURCE,
|
|
parent_resource_id='*',
|
|
fields='uuid,tags,fabric_status_up',
|
|
relations='LogicalPortStatus')
|
|
|
|
def __init__(self, plugin, cluster, state_sync_interval,
|
|
req_delay, min_chunk_size, max_rand_delay=0):
|
|
random.seed()
|
|
self._nsx_cache = NsxCache()
|
|
# Store parameters as instance members
|
|
# NOTE(salv-orlando): apologies if it looks java-ish
|
|
self._plugin = plugin
|
|
self._cluster = cluster
|
|
self._req_delay = req_delay
|
|
self._sync_interval = state_sync_interval
|
|
self._max_rand_delay = max_rand_delay
|
|
# Validate parameters
|
|
if self._sync_interval < self._req_delay:
|
|
err_msg = (_("Minimum request delay:%(req_delay)s must not "
|
|
"exceed synchronization interval:%(sync_interval)s") %
|
|
{'req_delay': self._req_delay,
|
|
'sync_interval': self._sync_interval})
|
|
LOG.error(err_msg)
|
|
raise nsx_exc.NsxPluginException(err_msg=err_msg)
|
|
# Backoff time in case of failures while fetching sync data
|
|
self._sync_backoff = 1
|
|
# Store the looping call in an instance variable to allow unit tests
|
|
# for controlling its lifecycle
|
|
self._sync_looping_call = _start_loopingcall(
|
|
min_chunk_size, state_sync_interval, self._synchronize_state)
|
|
|
|
def _get_tag_dict(self, tags):
|
|
return dict((tag.get('scope'), tag['tag']) for tag in tags)
|
|
|
|
def synchronize_network(self, context, neutron_network_data,
|
|
lswitches=None):
|
|
"""Synchronize a Neutron network with its NSX counterpart.
|
|
|
|
This routine synchronizes a set of switches when a Neutron
|
|
network is mapped to multiple lswitches.
|
|
"""
|
|
if not lswitches:
|
|
# Try to get logical switches from nsx
|
|
try:
|
|
lswitches = nsx_utils.fetch_nsx_switches(
|
|
context.session, self._cluster,
|
|
neutron_network_data['id'])
|
|
except exceptions.NetworkNotFound:
|
|
# TODO(salv-orlando): We should be catching
|
|
# api_exc.ResourceNotFound here
|
|
# The logical switch was not found
|
|
LOG.warning(_("Logical switch for neutron network %s not "
|
|
"found on NSX."), neutron_network_data['id'])
|
|
lswitches = []
|
|
else:
|
|
for lswitch in lswitches:
|
|
self._nsx_cache.update_lswitch(lswitch)
|
|
# By default assume things go wrong
|
|
status = constants.NET_STATUS_ERROR
|
|
# In most cases lswitches will contain a single element
|
|
for ls in lswitches:
|
|
if not ls:
|
|
# Logical switch was deleted
|
|
break
|
|
ls_status = ls['_relations']['LogicalSwitchStatus']
|
|
if not ls_status['fabric_status']:
|
|
status = constants.NET_STATUS_DOWN
|
|
break
|
|
else:
|
|
# No switch was down or missing. Set status to ACTIVE unless
|
|
# there were no switches in the first place!
|
|
if lswitches:
|
|
status = constants.NET_STATUS_ACTIVE
|
|
# Update db object
|
|
if status == neutron_network_data['status']:
|
|
# do nothing
|
|
return
|
|
|
|
with context.session.begin(subtransactions=True):
|
|
try:
|
|
network = self._plugin._get_network(context,
|
|
neutron_network_data['id'])
|
|
except exceptions.NetworkNotFound:
|
|
pass
|
|
else:
|
|
network.status = status
|
|
LOG.debug(_("Updating status for neutron resource %(q_id)s to:"
|
|
" %(status)s"),
|
|
{'q_id': neutron_network_data['id'],
|
|
'status': status})
|
|
|
|
def _synchronize_lswitches(self, ctx, ls_uuids, scan_missing=False):
|
|
if not ls_uuids and not scan_missing:
|
|
return
|
|
neutron_net_ids = set()
|
|
neutron_nsx_mappings = {}
|
|
# TODO(salvatore-orlando): Deal with the case the tag
|
|
# has been tampered with
|
|
for ls_uuid in ls_uuids:
|
|
# If the lswitch has been deleted, get backup copy of data
|
|
lswitch = (self._nsx_cache[ls_uuid].get('data') or
|
|
self._nsx_cache[ls_uuid].get('data_bk'))
|
|
tags = self._get_tag_dict(lswitch['tags'])
|
|
neutron_id = tags.get('quantum_net_id')
|
|
neutron_net_ids.add(neutron_id)
|
|
neutron_nsx_mappings[neutron_id] = (
|
|
neutron_nsx_mappings.get(neutron_id, []) +
|
|
[self._nsx_cache[ls_uuid]])
|
|
# Fetch neutron networks from database
|
|
filters = {'router:external': [False]}
|
|
if not scan_missing:
|
|
filters['id'] = neutron_net_ids
|
|
|
|
networks = self._plugin._get_collection(
|
|
ctx, models_v2.Network, self._plugin._make_network_dict,
|
|
filters=filters)
|
|
|
|
for network in networks:
|
|
lswitches = neutron_nsx_mappings.get(network['id'], [])
|
|
lswitches = [lswitch.get('data') for lswitch in lswitches]
|
|
self.synchronize_network(ctx, network, lswitches)
|
|
|
|
def synchronize_router(self, context, neutron_router_data,
|
|
lrouter=None):
|
|
"""Synchronize a neutron router with its NSX counterpart."""
|
|
if not lrouter:
|
|
# Try to get router from nsx
|
|
try:
|
|
# This query will return the logical router status too
|
|
nsx_router_id = nsx_utils.get_nsx_router_id(
|
|
context.session, self._cluster, neutron_router_data['id'])
|
|
if nsx_router_id:
|
|
lrouter = routerlib.get_lrouter(
|
|
self._cluster, nsx_router_id)
|
|
except exceptions.NotFound:
|
|
# NOTE(salv-orlando): We should be catching
|
|
# api_exc.ResourceNotFound here
|
|
# The logical router was not found
|
|
LOG.warning(_("Logical router for neutron router %s not "
|
|
"found on NSX."), neutron_router_data['id'])
|
|
if lrouter:
|
|
# Update the cache
|
|
self._nsx_cache.update_lrouter(lrouter)
|
|
|
|
# Note(salv-orlando): It might worth adding a check to verify neutron
|
|
# resource tag in nsx entity matches a Neutron id.
|
|
# By default assume things go wrong
|
|
status = constants.NET_STATUS_ERROR
|
|
if lrouter:
|
|
lr_status = (lrouter['_relations']
|
|
['LogicalRouterStatus']
|
|
['fabric_status'])
|
|
status = (lr_status and
|
|
constants.NET_STATUS_ACTIVE
|
|
or constants.NET_STATUS_DOWN)
|
|
# Update db object
|
|
if status == neutron_router_data['status']:
|
|
# do nothing
|
|
return
|
|
|
|
with context.session.begin(subtransactions=True):
|
|
try:
|
|
router = self._plugin._get_router(context,
|
|
neutron_router_data['id'])
|
|
except l3.RouterNotFound:
|
|
pass
|
|
else:
|
|
router.status = status
|
|
LOG.debug(_("Updating status for neutron resource %(q_id)s to:"
|
|
" %(status)s"),
|
|
{'q_id': neutron_router_data['id'],
|
|
'status': status})
|
|
|
|
def _synchronize_lrouters(self, ctx, lr_uuids, scan_missing=False):
|
|
if not lr_uuids and not scan_missing:
|
|
return
|
|
# TODO(salvatore-orlando): Deal with the case the tag
|
|
# has been tampered with
|
|
neutron_router_mappings = {}
|
|
for lr_uuid in lr_uuids:
|
|
lrouter = (self._nsx_cache[lr_uuid].get('data') or
|
|
self._nsx_cache[lr_uuid].get('data_bk'))
|
|
tags = self._get_tag_dict(lrouter['tags'])
|
|
neutron_router_id = tags.get('q_router_id')
|
|
if neutron_router_id:
|
|
neutron_router_mappings[neutron_router_id] = (
|
|
self._nsx_cache[lr_uuid])
|
|
else:
|
|
LOG.warn(_("Unable to find Neutron router id for "
|
|
"NSX logical router: %s"), lr_uuid)
|
|
# Fetch neutron routers from database
|
|
filters = ({} if scan_missing else
|
|
{'id': neutron_router_mappings.keys()})
|
|
routers = self._plugin._get_collection(
|
|
ctx, l3_db.Router, self._plugin._make_router_dict,
|
|
filters=filters)
|
|
for router in routers:
|
|
lrouter = neutron_router_mappings.get(router['id'])
|
|
self.synchronize_router(
|
|
ctx, router, lrouter and lrouter.get('data'))
|
|
|
|
def synchronize_port(self, context, neutron_port_data,
|
|
lswitchport=None, ext_networks=None):
|
|
"""Synchronize a Neutron port with its NSX counterpart."""
|
|
# Skip synchronization for ports on external networks
|
|
if not ext_networks:
|
|
ext_networks = [net['id'] for net in context.session.query(
|
|
models_v2.Network).join(
|
|
external_net_db.ExternalNetwork,
|
|
(models_v2.Network.id ==
|
|
external_net_db.ExternalNetwork.network_id))]
|
|
if neutron_port_data['network_id'] in ext_networks:
|
|
with context.session.begin(subtransactions=True):
|
|
neutron_port_data['status'] = constants.PORT_STATUS_ACTIVE
|
|
return
|
|
|
|
if not lswitchport:
|
|
# Try to get port from nsx
|
|
try:
|
|
ls_uuid, lp_uuid = nsx_utils.get_nsx_switch_and_port_id(
|
|
context.session, self._cluster, neutron_port_data['id'])
|
|
if lp_uuid:
|
|
lswitchport = switchlib.get_port(
|
|
self._cluster, ls_uuid, lp_uuid,
|
|
relations='LogicalPortStatus')
|
|
except (exceptions.PortNotFoundOnNetwork):
|
|
# NOTE(salv-orlando): We should be catching
|
|
# api_exc.ResourceNotFound here instead
|
|
# of PortNotFoundOnNetwork when the id exists but
|
|
# the logical switch port was not found
|
|
LOG.warning(_("Logical switch port for neutron port %s "
|
|
"not found on NSX."), neutron_port_data['id'])
|
|
lswitchport = None
|
|
else:
|
|
# If lswitchport is not None, update the cache.
|
|
# It could be none if the port was deleted from the backend
|
|
if lswitchport:
|
|
self._nsx_cache.update_lswitchport(lswitchport)
|
|
# Note(salv-orlando): It might worth adding a check to verify neutron
|
|
# resource tag in nsx entity matches Neutron id.
|
|
# By default assume things go wrong
|
|
status = constants.PORT_STATUS_ERROR
|
|
if lswitchport:
|
|
lp_status = (lswitchport['_relations']
|
|
['LogicalPortStatus']
|
|
['fabric_status_up'])
|
|
status = (lp_status and
|
|
constants.PORT_STATUS_ACTIVE
|
|
or constants.PORT_STATUS_DOWN)
|
|
|
|
# Update db object
|
|
if status == neutron_port_data['status']:
|
|
# do nothing
|
|
return
|
|
|
|
with context.session.begin(subtransactions=True):
|
|
try:
|
|
port = self._plugin._get_port(context,
|
|
neutron_port_data['id'])
|
|
except exceptions.PortNotFound:
|
|
pass
|
|
else:
|
|
port.status = status
|
|
LOG.debug(_("Updating status for neutron resource %(q_id)s to:"
|
|
" %(status)s"),
|
|
{'q_id': neutron_port_data['id'],
|
|
'status': status})
|
|
|
|
def _synchronize_lswitchports(self, ctx, lp_uuids, scan_missing=False):
|
|
if not lp_uuids and not scan_missing:
|
|
return
|
|
# Find Neutron port id by tag - the tag is already
|
|
# loaded in memory, no reason for doing a db query
|
|
# TODO(salvatore-orlando): Deal with the case the tag
|
|
# has been tampered with
|
|
neutron_port_mappings = {}
|
|
for lp_uuid in lp_uuids:
|
|
lport = (self._nsx_cache[lp_uuid].get('data') or
|
|
self._nsx_cache[lp_uuid].get('data_bk'))
|
|
tags = self._get_tag_dict(lport['tags'])
|
|
neutron_port_id = tags.get('q_port_id')
|
|
if neutron_port_id:
|
|
neutron_port_mappings[neutron_port_id] = (
|
|
self._nsx_cache[lp_uuid])
|
|
# Fetch neutron ports from database
|
|
# At the first sync we need to fetch all ports
|
|
filters = ({} if scan_missing else
|
|
{'id': neutron_port_mappings.keys()})
|
|
# TODO(salv-orlando): Work out a solution for avoiding
|
|
# this query
|
|
ext_nets = [net['id'] for net in ctx.session.query(
|
|
models_v2.Network).join(
|
|
external_net_db.ExternalNetwork,
|
|
(models_v2.Network.id ==
|
|
external_net_db.ExternalNetwork.network_id))]
|
|
ports = self._plugin._get_collection(
|
|
ctx, models_v2.Port, self._plugin._make_port_dict,
|
|
filters=filters)
|
|
for port in ports:
|
|
lswitchport = neutron_port_mappings.get(port['id'])
|
|
self.synchronize_port(
|
|
ctx, port, lswitchport and lswitchport.get('data'),
|
|
ext_networks=ext_nets)
|
|
|
|
def _get_chunk_size(self, sp):
|
|
# NOTE(salv-orlando): Try to use __future__ for this routine only?
|
|
ratio = ((float(sp.total_size) / float(sp.chunk_size)) /
|
|
(float(self._sync_interval) / float(self._req_delay)))
|
|
new_size = max(1.0, ratio) * float(sp.chunk_size)
|
|
return int(new_size) + (new_size - int(new_size) > 0)
|
|
|
|
def _fetch_data(self, uri, cursor, page_size):
|
|
# If not cursor there is nothing to retrieve
|
|
if cursor:
|
|
if cursor == 'start':
|
|
cursor = None
|
|
# Chunk size tuning might, in some conditions, make it larger
|
|
# than 5,000, which is the maximum page size allowed by the NSX
|
|
# API. In this case the request should be split in multiple
|
|
# requests. This is not ideal, and therefore a log warning will
|
|
# be emitted.
|
|
num_requests = page_size / (MAX_PAGE_SIZE + 1) + 1
|
|
if num_requests > 1:
|
|
LOG.warn(_("Requested page size is %(cur_chunk_size)d."
|
|
"It might be necessary to do %(num_requests)d "
|
|
"round-trips to NSX for fetching data. Please "
|
|
"tune sync parameters to ensure chunk size "
|
|
"is less than %(max_page_size)d"),
|
|
{'cur_chunk_size': page_size,
|
|
'num_requests': num_requests,
|
|
'max_page_size': MAX_PAGE_SIZE})
|
|
# Only the first request might return the total size,
|
|
# subsequent requests will definetely not
|
|
results, cursor, total_size = nsxlib.get_single_query_page(
|
|
uri, self._cluster, cursor,
|
|
min(page_size, MAX_PAGE_SIZE))
|
|
for _req in range(num_requests - 1):
|
|
# If no cursor is returned break the cycle as there is no
|
|
# actual need to perform multiple requests (all fetched)
|
|
# This happens when the overall size of resources exceeds
|
|
# the maximum page size, but the number for each single
|
|
# resource type is below this threshold
|
|
if not cursor:
|
|
break
|
|
req_results, cursor = nsxlib.get_single_query_page(
|
|
uri, self._cluster, cursor,
|
|
min(page_size, MAX_PAGE_SIZE))[:2]
|
|
results.extend(req_results)
|
|
# reset cursor before returning if we queried just to
|
|
# know the number of entities
|
|
return results, cursor if page_size else 'start', total_size
|
|
return [], cursor, None
|
|
|
|
def _fetch_nsx_data_chunk(self, sp):
|
|
base_chunk_size = sp.chunk_size
|
|
chunk_size = base_chunk_size + sp.extra_chunk_size
|
|
LOG.info(_("Fetching up to %s resources "
|
|
"from NSX backend"), chunk_size)
|
|
fetched = ls_count = lr_count = lp_count = 0
|
|
lswitches = lrouters = lswitchports = []
|
|
if sp.ls_cursor or sp.ls_cursor == 'start':
|
|
(lswitches, sp.ls_cursor, ls_count) = self._fetch_data(
|
|
self.LS_URI, sp.ls_cursor, chunk_size)
|
|
fetched = len(lswitches)
|
|
if fetched < chunk_size and sp.lr_cursor or sp.lr_cursor == 'start':
|
|
(lrouters, sp.lr_cursor, lr_count) = self._fetch_data(
|
|
self.LR_URI, sp.lr_cursor, max(chunk_size - fetched, 0))
|
|
fetched += len(lrouters)
|
|
if fetched < chunk_size and sp.lp_cursor or sp.lp_cursor == 'start':
|
|
(lswitchports, sp.lp_cursor, lp_count) = self._fetch_data(
|
|
self.LP_URI, sp.lp_cursor, max(chunk_size - fetched, 0))
|
|
fetched += len(lswitchports)
|
|
if sp.current_chunk == 0:
|
|
# No cursors were provided. Then it must be possible to
|
|
# calculate the total amount of data to fetch
|
|
sp.total_size = ls_count + lr_count + lp_count
|
|
LOG.debug(_("Total data size: %d"), sp.total_size)
|
|
sp.chunk_size = self._get_chunk_size(sp)
|
|
# Calculate chunk size adjustment
|
|
sp.extra_chunk_size = sp.chunk_size - base_chunk_size
|
|
LOG.debug(_("Fetched %(num_lswitches)d logical switches, "
|
|
"%(num_lswitchports)d logical switch ports,"
|
|
"%(num_lrouters)d logical routers"),
|
|
{'num_lswitches': len(lswitches),
|
|
'num_lswitchports': len(lswitchports),
|
|
'num_lrouters': len(lrouters)})
|
|
return (lswitches, lrouters, lswitchports)
|
|
|
|
def _synchronize_state(self, sp):
|
|
# If the plugin has been destroyed, stop the LoopingCall
|
|
if not self._plugin:
|
|
raise loopingcall.LoopingCallDone
|
|
start = timeutils.utcnow()
|
|
# Reset page cursor variables if necessary
|
|
if sp.current_chunk == 0:
|
|
sp.ls_cursor = sp.lr_cursor = sp.lp_cursor = 'start'
|
|
LOG.info(_("Running state synchronization task. Chunk: %s"),
|
|
sp.current_chunk)
|
|
# Fetch chunk_size data from NSX
|
|
try:
|
|
(lswitches, lrouters, lswitchports) = (
|
|
self._fetch_nsx_data_chunk(sp))
|
|
except (api_exc.RequestTimeout, api_exc.NsxApiException):
|
|
sleep_interval = self._sync_backoff
|
|
# Cap max back off to 64 seconds
|
|
self._sync_backoff = min(self._sync_backoff * 2, 64)
|
|
LOG.exception(_("An error occurred while communicating with "
|
|
"NSX backend. Will retry synchronization "
|
|
"in %d seconds"), sleep_interval)
|
|
return sleep_interval
|
|
LOG.debug(_("Time elapsed querying NSX: %s"),
|
|
timeutils.utcnow() - start)
|
|
if sp.total_size:
|
|
num_chunks = ((sp.total_size / sp.chunk_size) +
|
|
(sp.total_size % sp.chunk_size != 0))
|
|
else:
|
|
num_chunks = 1
|
|
LOG.debug(_("Number of chunks: %d"), num_chunks)
|
|
# Find objects which have changed on NSX side and need
|
|
# to be synchronized
|
|
LOG.debug("Processing NSX cache for updated objects")
|
|
(ls_uuids, lr_uuids, lp_uuids) = self._nsx_cache.process_updates(
|
|
lswitches, lrouters, lswitchports)
|
|
# Process removed objects only at the last chunk
|
|
scan_missing = (sp.current_chunk == num_chunks - 1 and
|
|
not sp.init_sync_performed)
|
|
if sp.current_chunk == num_chunks - 1:
|
|
LOG.debug("Processing NSX cache for deleted objects")
|
|
self._nsx_cache.process_deletes()
|
|
ls_uuids = self._nsx_cache.get_lswitches(
|
|
changed_only=not scan_missing)
|
|
lr_uuids = self._nsx_cache.get_lrouters(
|
|
changed_only=not scan_missing)
|
|
lp_uuids = self._nsx_cache.get_lswitchports(
|
|
changed_only=not scan_missing)
|
|
LOG.debug(_("Time elapsed hashing data: %s"),
|
|
timeutils.utcnow() - start)
|
|
# Get an admin context
|
|
ctx = context.get_admin_context()
|
|
# Synchronize with database
|
|
self._synchronize_lswitches(ctx, ls_uuids,
|
|
scan_missing=scan_missing)
|
|
self._synchronize_lrouters(ctx, lr_uuids,
|
|
scan_missing=scan_missing)
|
|
self._synchronize_lswitchports(ctx, lp_uuids,
|
|
scan_missing=scan_missing)
|
|
# Increase chunk counter
|
|
LOG.info(_("Synchronization for chunk %(chunk_num)d of "
|
|
"%(total_chunks)d performed"),
|
|
{'chunk_num': sp.current_chunk + 1,
|
|
'total_chunks': num_chunks})
|
|
sp.current_chunk = (sp.current_chunk + 1) % num_chunks
|
|
added_delay = 0
|
|
if sp.current_chunk == 0:
|
|
# Ensure init_sync_performed is True
|
|
if not sp.init_sync_performed:
|
|
sp.init_sync_performed = True
|
|
# Add additional random delay
|
|
added_delay = random.randint(0, self._max_rand_delay)
|
|
LOG.debug(_("Time elapsed at end of sync: %s"),
|
|
timeutils.utcnow() - start)
|
|
return self._sync_interval / num_chunks + added_delay
|