979b32e2b5
Ensure that the status for a port is properly synchronized even when the port itself does not exist anymore in the backend, avoiding raising exceptions due to missing results in the NVP query. This patch also adjusts a comment and improves exception handling in _nvp_get_port_id Bug 1229331 Change-Id: Ibaa581006c994a543c1795dd3f1d50086583b953
603 lines
27 KiB
Python
603 lines
27 KiB
Python
# Copyright 2013 Nicira, Inc.
|
|
# All Rights Reserved
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
import random
|
|
|
|
from neutron.common import constants
|
|
from neutron.common import exceptions
|
|
from neutron import context
|
|
from neutron.db import external_net_db
|
|
from neutron.db import l3_db
|
|
from neutron.db import models_v2
|
|
from neutron.openstack.common import jsonutils
|
|
from neutron.openstack.common import log
|
|
from neutron.openstack.common import loopingcall
|
|
from neutron.openstack.common import timeutils
|
|
from neutron.plugins.nicira.common import exceptions as nvp_exc
|
|
from neutron.plugins.nicira import NvpApiClient
|
|
from neutron.plugins.nicira import nvplib
|
|
|
|
LOG = log.getLogger(__name__)
|
|
|
|
|
|
class NvpCache(object):
|
|
"""A simple Cache for NVP resources.
|
|
|
|
Associates resource id with resource hash to rapidly identify
|
|
updated resources.
|
|
Each entry in the cache also stores the following information:
|
|
- changed: the resource in the cache has been altered following
|
|
an update or a delete
|
|
- hit: the resource has been visited during an update (and possibly
|
|
left unchanged)
|
|
- data: current resource data
|
|
- data_bk: backup of resource data prior to its removal
|
|
"""
|
|
|
|
def __init__(self):
|
|
# Maps a uuid to the dict containing it
|
|
self._uuid_dict_mappings = {}
|
|
# Dicts for NVP cached resources
|
|
self._lswitches = {}
|
|
self._lswitchports = {}
|
|
self._lrouters = {}
|
|
|
|
def __getitem__(self, key):
|
|
# uuids are unique across the various types of resources
|
|
# TODO(salv-orlando): Avoid lookups over all dictionaries
|
|
# when retrieving items
|
|
# Fetch lswitches, lports, or lrouters
|
|
resources = self._uuid_dict_mappings[key]
|
|
return resources[key]
|
|
|
|
def _update_resources(self, resources, new_resources):
|
|
# Clear the 'changed' attribute for all items
|
|
for uuid, item in resources.items():
|
|
if item.pop('changed', None) and not item.get('data'):
|
|
# The item is not anymore in NVP, so delete it
|
|
del resources[uuid]
|
|
del self._uuid_dict_mappings[uuid]
|
|
|
|
def do_hash(item):
|
|
return hash(jsonutils.dumps(item))
|
|
|
|
# Parse new data and identify new, deleted, and updated resources
|
|
for item in new_resources:
|
|
item_id = item['uuid']
|
|
if resources.get(item_id):
|
|
new_hash = do_hash(item)
|
|
if new_hash != resources[item_id]['hash']:
|
|
resources[item_id]['hash'] = new_hash
|
|
resources[item_id]['changed'] = True
|
|
resources[item_id]['data_bk'] = (
|
|
resources[item_id]['data'])
|
|
resources[item_id]['data'] = item
|
|
# Mark the item as hit in any case
|
|
resources[item_id]['hit'] = True
|
|
else:
|
|
resources[item_id] = {'hash': do_hash(item)}
|
|
resources[item_id]['hit'] = True
|
|
resources[item_id]['changed'] = True
|
|
resources[item_id]['data'] = item
|
|
# add a uuid to dict mapping for easy retrieval
|
|
# with __getitem__
|
|
self._uuid_dict_mappings[item_id] = resources
|
|
|
|
def _delete_resources(self, resources):
|
|
# Mark for removal all the elements which have not been visited.
|
|
# And clear the 'hit' attribute.
|
|
for to_delete in [k for (k, v) in resources.iteritems()
|
|
if not v.pop('hit', False)]:
|
|
resources[to_delete]['changed'] = True
|
|
resources[to_delete]['data_bk'] = (
|
|
resources[to_delete].pop('data', None))
|
|
|
|
def _get_resource_ids(self, resources, changed_only):
|
|
if changed_only:
|
|
return [k for (k, v) in resources.iteritems()
|
|
if v.get('changed')]
|
|
return resources.keys()
|
|
|
|
def get_lswitches(self, changed_only=False):
|
|
return self._get_resource_ids(self._lswitches, changed_only)
|
|
|
|
def get_lrouters(self, changed_only=False):
|
|
return self._get_resource_ids(self._lrouters, changed_only)
|
|
|
|
def get_lswitchports(self, changed_only=False):
|
|
return self._get_resource_ids(self._lswitchports, changed_only)
|
|
|
|
def update_lswitch(self, lswitch):
|
|
self._update_resources(self._lswitches, [lswitch])
|
|
|
|
def update_lrouter(self, lrouter):
|
|
self._update_resources(self._lrouters, [lrouter])
|
|
|
|
def update_lswitchport(self, lswitchport):
|
|
self._update_resources(self._lswitchports, [lswitchport])
|
|
|
|
def process_updates(self, lswitches=None,
|
|
lrouters=None, lswitchports=None):
|
|
self._update_resources(self._lswitches, lswitches)
|
|
self._update_resources(self._lrouters, lrouters)
|
|
self._update_resources(self._lswitchports, lswitchports)
|
|
return (self._get_resource_ids(self._lswitches, changed_only=True),
|
|
self._get_resource_ids(self._lrouters, changed_only=True),
|
|
self._get_resource_ids(self._lswitchports, changed_only=True))
|
|
|
|
def process_deletes(self):
|
|
self._delete_resources(self._lswitches)
|
|
self._delete_resources(self._lrouters)
|
|
self._delete_resources(self._lswitchports)
|
|
return (self._get_resource_ids(self._lswitches, changed_only=True),
|
|
self._get_resource_ids(self._lrouters, changed_only=True),
|
|
self._get_resource_ids(self._lswitchports, changed_only=True))
|
|
|
|
|
|
class SyncParameters():
|
|
"""Defines attributes used by the synchronization procedure.
|
|
|
|
chunk_size: Actual chunk size
|
|
extra_chunk_size: Additional data to fetch because of chunk size
|
|
adjustment
|
|
current_chunk: Counter of the current data chunk being synchronized
|
|
Page cursors: markers for the next resource to fetch.
|
|
'start' means page cursor unset for fetching 1st page
|
|
init_sync_performed: True if the initial synchronization concluded
|
|
"""
|
|
|
|
def __init__(self, min_chunk_size):
|
|
self.chunk_size = min_chunk_size
|
|
self.extra_chunk_size = 0
|
|
self.current_chunk = 0
|
|
self.ls_cursor = 'start'
|
|
self.lr_cursor = 'start'
|
|
self.lp_cursor = 'start'
|
|
self.init_sync_performed = False
|
|
self.total_size = 0
|
|
|
|
|
|
def _start_loopingcall(min_chunk_size, state_sync_interval, func):
|
|
"""Start a loopingcall for the synchronization task."""
|
|
# Start a looping call to synchronize operational status
|
|
# for neutron resources
|
|
if not state_sync_interval:
|
|
# do not start the looping call if specified
|
|
# sync interval is 0
|
|
return
|
|
state_synchronizer = loopingcall.DynamicLoopingCall(
|
|
func, sp=SyncParameters(min_chunk_size))
|
|
state_synchronizer.start(
|
|
periodic_interval_max=state_sync_interval)
|
|
return state_synchronizer
|
|
|
|
|
|
class NvpSynchronizer():
|
|
|
|
LS_URI = nvplib._build_uri_path(
|
|
nvplib.LSWITCH_RESOURCE, fields='uuid,tags,fabric_status',
|
|
relations='LogicalSwitchStatus')
|
|
LR_URI = nvplib._build_uri_path(
|
|
nvplib.LROUTER_RESOURCE, fields='uuid,tags,fabric_status',
|
|
relations='LogicalRouterStatus')
|
|
LP_URI = nvplib._build_uri_path(
|
|
nvplib.LSWITCHPORT_RESOURCE,
|
|
parent_resource_id='*',
|
|
fields='uuid,tags,fabric_status,link_status_up',
|
|
relations='LogicalPortStatus')
|
|
|
|
def __init__(self, plugin, cluster, state_sync_interval,
|
|
req_delay, min_chunk_size, max_rand_delay=0):
|
|
random.seed()
|
|
self._nvp_cache = NvpCache()
|
|
# Store parameters as instance members
|
|
# NOTE(salv-orlando): apologies if it looks java-ish
|
|
self._plugin = plugin
|
|
self._cluster = cluster
|
|
self._req_delay = req_delay
|
|
self._sync_interval = state_sync_interval
|
|
self._max_rand_delay = max_rand_delay
|
|
# Validate parameters
|
|
if self._sync_interval < self._req_delay:
|
|
err_msg = (_("Minimum request delay:%(req_delay)s must not "
|
|
"exceed synchronization interval:%(sync_interval)s") %
|
|
{'req_delay': self._req_delay,
|
|
'sync_interval': self._sync_interval})
|
|
LOG.error(err_msg)
|
|
raise nvp_exc.NvpPluginException(err_msg=err_msg)
|
|
# Backoff time in case of failures while fetching sync data
|
|
self._sync_backoff = 1
|
|
# Store the looping call in an instance variable to allow unit tests
|
|
# for controlling its lifecycle
|
|
self._sync_looping_call = _start_loopingcall(
|
|
min_chunk_size, state_sync_interval, self._synchronize_state)
|
|
|
|
def _get_tag_dict(self, tags):
|
|
return dict((tag.get('scope'), tag['tag']) for tag in tags)
|
|
|
|
def _update_neutron_object(self, context, neutron_data, status):
|
|
if status == neutron_data['status']:
|
|
# do nothing
|
|
return
|
|
with context.session.begin(subtransactions=True):
|
|
LOG.debug(_("Updating status for neutron resource %(q_id)s to: "
|
|
"%(status)s"), {'q_id': neutron_data['id'],
|
|
'status': status})
|
|
neutron_data['status'] = status
|
|
context.session.add(neutron_data)
|
|
|
|
def synchronize_network(self, context, neutron_network_data,
|
|
lswitches=None):
|
|
"""Synchronize a Neutron network with its NVP counterpart.
|
|
|
|
This routine synchronizes a set of switches when a Neutron
|
|
network is mapped to multiple lswitches.
|
|
"""
|
|
if not lswitches:
|
|
# Try to get logical switches from nvp
|
|
try:
|
|
lswitches = nvplib.get_lswitches(
|
|
self._cluster, neutron_network_data['id'])
|
|
except exceptions.NetworkNotFound:
|
|
# TODO(salv-orlando): We should be catching
|
|
# NvpApiClient.ResourceNotFound here
|
|
# The logical switch was not found
|
|
LOG.warning(_("Logical switch for neutron network %s not "
|
|
"found on NVP."), neutron_network_data['id'])
|
|
lswitches = []
|
|
else:
|
|
for lswitch in lswitches:
|
|
self._nvp_cache.update_lswitch(lswitch)
|
|
# By default assume things go wrong
|
|
status = constants.NET_STATUS_ERROR
|
|
# In most cases lswitches will contain a single element
|
|
for ls in lswitches:
|
|
if not ls:
|
|
# Logical switch was deleted
|
|
break
|
|
ls_status = ls['_relations']['LogicalSwitchStatus']
|
|
if not ls_status['fabric_status']:
|
|
status = constants.NET_STATUS_DOWN
|
|
break
|
|
else:
|
|
# No switch was down or missing. Set status to ACTIVE unless
|
|
# there were no switches in the first place!
|
|
if lswitches:
|
|
status = constants.NET_STATUS_ACTIVE
|
|
# Update db object
|
|
self._update_neutron_object(context, neutron_network_data, status)
|
|
|
|
def _synchronize_lswitches(self, ctx, ls_uuids, scan_missing=False):
|
|
if not ls_uuids and not scan_missing:
|
|
return
|
|
neutron_net_ids = set()
|
|
neutron_nvp_mappings = {}
|
|
# TODO(salvatore-orlando): Deal with the case the tag
|
|
# has been tampered with
|
|
for ls_uuid in ls_uuids:
|
|
# If the lswitch has been deleted, get backup copy of data
|
|
lswitch = (self._nvp_cache[ls_uuid].get('data') or
|
|
self._nvp_cache[ls_uuid].get('data_bk'))
|
|
tags = self._get_tag_dict(lswitch['tags'])
|
|
neutron_id = tags.get('neutron_net_id', ls_uuid)
|
|
neutron_net_ids.add(neutron_id)
|
|
neutron_nvp_mappings[neutron_id] = (
|
|
neutron_nvp_mappings.get(neutron_id, []) +
|
|
[self._nvp_cache[ls_uuid]])
|
|
with ctx.session.begin(subtransactions=True):
|
|
# Fetch neutron networks from database
|
|
filters = {'router:external': [False]}
|
|
if not scan_missing:
|
|
filters['id'] = neutron_net_ids
|
|
# TODO(salv-orlando): Filter out external networks
|
|
for network in self._plugin._get_collection_query(
|
|
ctx, models_v2.Network, filters=filters):
|
|
lswitches = neutron_nvp_mappings.get(network['id'], [])
|
|
lswitches = [lswitch.get('data') for lswitch in lswitches]
|
|
self.synchronize_network(ctx, network, lswitches)
|
|
|
|
def synchronize_router(self, context, neutron_router_data,
|
|
lrouter=None):
|
|
"""Synchronize a neutron router with its NVP counterpart."""
|
|
if not lrouter:
|
|
# Try to get router from nvp
|
|
try:
|
|
# This query will return the logical router status too
|
|
lrouter = nvplib.get_lrouter(
|
|
self._cluster, neutron_router_data['id'])
|
|
except exceptions.NotFound:
|
|
# NOTE(salv-orlando): We should be catching
|
|
# NvpApiClient.ResourceNotFound here
|
|
# The logical router was not found
|
|
LOG.warning(_("Logical router for neutron router %s not "
|
|
"found on NVP."), neutron_router_data['id'])
|
|
lrouter = None
|
|
else:
|
|
# Update the cache
|
|
self._nvp_cache.update_lrouter(lrouter)
|
|
|
|
# Note(salv-orlando): It might worth adding a check to verify neutron
|
|
# resource tag in nvp entity matches a Neutron id.
|
|
# By default assume things go wrong
|
|
status = constants.NET_STATUS_ERROR
|
|
if lrouter:
|
|
lr_status = (lrouter['_relations']
|
|
['LogicalRouterStatus']
|
|
['fabric_status'])
|
|
status = (lr_status and
|
|
constants.NET_STATUS_ACTIVE
|
|
or constants.NET_STATUS_DOWN)
|
|
# Update db object
|
|
self._update_neutron_object(context, neutron_router_data, status)
|
|
|
|
def _synchronize_lrouters(self, ctx, lr_uuids, scan_missing=False):
|
|
if not lr_uuids and not scan_missing:
|
|
return
|
|
neutron_router_mappings = (
|
|
dict((lr_uuid, self._nvp_cache[lr_uuid]) for lr_uuid in lr_uuids))
|
|
with ctx.session.begin(subtransactions=True):
|
|
# Fetch neutron routers from database
|
|
filters = ({} if scan_missing else
|
|
{'id': neutron_router_mappings.keys()})
|
|
for router in self._plugin._get_collection_query(
|
|
ctx, l3_db.Router, filters=filters):
|
|
lrouter = neutron_router_mappings.get(router['id'])
|
|
self.synchronize_router(
|
|
ctx, router, lrouter and lrouter.get('data'))
|
|
|
|
def synchronize_port(self, context, neutron_port_data,
|
|
lswitchport=None, ext_networks=None):
|
|
"""Synchronize a Neutron port with its NVP counterpart."""
|
|
# Skip synchronization for ports on external networks
|
|
if not ext_networks:
|
|
ext_networks = [net['id'] for net in context.session.query(
|
|
models_v2.Network).join(
|
|
external_net_db.ExternalNetwork,
|
|
(models_v2.Network.id ==
|
|
external_net_db.ExternalNetwork.network_id))]
|
|
if neutron_port_data['network_id'] in ext_networks:
|
|
with context.session.begin(subtransactions=True):
|
|
neutron_port_data['status'] = constants.PORT_STATUS_ACTIVE
|
|
return
|
|
|
|
if not lswitchport:
|
|
# Try to get port from nvp
|
|
try:
|
|
lp_uuid = self._plugin._nvp_get_port_id(
|
|
context, self._cluster, neutron_port_data)
|
|
if lp_uuid:
|
|
lswitchport = nvplib.get_port(
|
|
self._cluster, neutron_port_data['network_id'],
|
|
lp_uuid, relations='LogicalPortStatus')
|
|
except (exceptions.PortNotFoundOnNetwork):
|
|
# NOTE(salv-orlando): We should be catching
|
|
# NvpApiClient.ResourceNotFound here instead
|
|
# of PortNotFoundOnNetwork when the id exists but
|
|
# the logical switch port was not found
|
|
LOG.warning(_("Logical switch port for neutron port %s "
|
|
"not found on NVP."), neutron_port_data['id'])
|
|
lswitchport = None
|
|
else:
|
|
# If lswitchport is not None, update the cache.
|
|
# It could be none if the port was deleted from the backend
|
|
if lswitchport:
|
|
self._nvp_cache.update_lswitchport(lswitchport)
|
|
# Note(salv-orlando): It might worth adding a check to verify neutron
|
|
# resource tag in nvp entity matches Neutron id.
|
|
# By default assume things go wrong
|
|
status = constants.PORT_STATUS_ERROR
|
|
if lswitchport:
|
|
lp_status = (lswitchport['_relations']
|
|
['LogicalPortStatus']
|
|
['link_status_up'])
|
|
status = (lp_status and
|
|
constants.PORT_STATUS_ACTIVE
|
|
or constants.PORT_STATUS_DOWN)
|
|
# Update db object
|
|
self._update_neutron_object(context, neutron_port_data, status)
|
|
|
|
def _synchronize_lswitchports(self, ctx, lp_uuids, scan_missing=False):
|
|
if not lp_uuids and not scan_missing:
|
|
return
|
|
# Find Neutron port id by tag - the tag is already
|
|
# loaded in memory, no reason for doing a db query
|
|
# TODO(salvatore-orlando): Deal with the case the tag
|
|
# has been tampered with
|
|
neutron_port_mappings = {}
|
|
for lp_uuid in lp_uuids:
|
|
lport = (self._nvp_cache[lp_uuid].get('data') or
|
|
self._nvp_cache[lp_uuid].get('data_bk'))
|
|
tags = self._get_tag_dict(lport['tags'])
|
|
neutron_port_id = tags.get('q_port_id')
|
|
if neutron_port_id:
|
|
neutron_port_mappings[neutron_port_id] = (
|
|
self._nvp_cache[lp_uuid])
|
|
with ctx.session.begin(subtransactions=True):
|
|
# Fetch neutron ports from database
|
|
# At the first sync we need to fetch all ports
|
|
filters = ({} if scan_missing else
|
|
{'id': neutron_port_mappings.keys()})
|
|
# TODO(salv-orlando): Work out a solution for avoiding
|
|
# this query
|
|
ext_nets = [net['id'] for net in ctx.session.query(
|
|
models_v2.Network).join(
|
|
external_net_db.ExternalNetwork,
|
|
(models_v2.Network.id ==
|
|
external_net_db.ExternalNetwork.network_id))]
|
|
for port in self._plugin._get_collection_query(
|
|
ctx, models_v2.Port, filters=filters):
|
|
lswitchport = neutron_port_mappings.get(port['id'])
|
|
self.synchronize_port(
|
|
ctx, port, lswitchport and lswitchport.get('data'),
|
|
ext_networks=ext_nets)
|
|
|
|
def _get_chunk_size(self, sp):
|
|
# NOTE(salv-orlando): Try to use __future__ for this routine only?
|
|
ratio = ((float(sp.total_size) / float(sp.chunk_size)) /
|
|
(float(self._sync_interval) / float(self._req_delay)))
|
|
new_size = max(1.0, ratio) * float(sp.chunk_size)
|
|
return int(new_size) + (new_size - int(new_size) > 0)
|
|
|
|
def _fetch_data(self, uri, cursor, page_size):
|
|
# If not cursor there is nothing to retrieve
|
|
if cursor:
|
|
if cursor == 'start':
|
|
cursor = None
|
|
# Chunk size tuning might, in some conditions, make it larger
|
|
# than 5,000, which is the maximum page size allowed by the NVP
|
|
# API. In this case the request should be split in multiple
|
|
# requests. This is not ideal, and therefore a log warning will
|
|
# be emitted.
|
|
num_requests = page_size / (nvplib.MAX_PAGE_SIZE + 1) + 1
|
|
if num_requests > 1:
|
|
LOG.warn(_("Requested page size is %(cur_chunk_size)d."
|
|
"It might be necessary to do %(num_requests)d "
|
|
"round-trips to NVP for fetching data. Please "
|
|
"tune sync parameters to ensure chunk size "
|
|
"is less than %(max_page_size)d"),
|
|
{'cur_chunk_size': page_size,
|
|
'num_requests': num_requests,
|
|
'max_page_size': nvplib.MAX_PAGE_SIZE})
|
|
# Only the first request might return the total size,
|
|
# subsequent requests will definetely not
|
|
results, cursor, total_size = nvplib.get_single_query_page(
|
|
uri, self._cluster, cursor,
|
|
min(page_size, nvplib.MAX_PAGE_SIZE))
|
|
for _req in range(0, num_requests - 1):
|
|
# If no cursor is returned break the cycle as there is no
|
|
# actual need to perform multiple requests (all fetched)
|
|
# This happens when the overall size of resources exceeds
|
|
# the maximum page size, but the number for each single
|
|
# resource type is below this threshold
|
|
if not cursor:
|
|
break
|
|
req_results, cursor = nvplib.get_single_query_page(
|
|
uri, self._cluster, cursor,
|
|
min(page_size, nvplib.MAX_PAGE_SIZE))[:2]
|
|
results.extend(req_results)
|
|
# reset cursor before returning if we queried just to
|
|
# know the number of entities
|
|
return results, cursor if page_size else 'start', total_size
|
|
return [], cursor, None
|
|
|
|
def _fetch_nvp_data_chunk(self, sp):
|
|
base_chunk_size = sp.chunk_size
|
|
chunk_size = base_chunk_size + sp.extra_chunk_size
|
|
LOG.info(_("Fetching up to %s resources "
|
|
"from NVP backend"), chunk_size)
|
|
fetched = ls_count = lr_count = lp_count = 0
|
|
lswitches = lrouters = lswitchports = []
|
|
if sp.ls_cursor or sp.ls_cursor == 'start':
|
|
(lswitches, sp.ls_cursor, ls_count) = self._fetch_data(
|
|
self.LS_URI, sp.ls_cursor, chunk_size)
|
|
fetched = len(lswitches)
|
|
if fetched < chunk_size and sp.lr_cursor or sp.lr_cursor == 'start':
|
|
(lrouters, sp.lr_cursor, lr_count) = self._fetch_data(
|
|
self.LR_URI, sp.lr_cursor, max(chunk_size - fetched, 0))
|
|
fetched += len(lrouters)
|
|
if fetched < chunk_size and sp.lp_cursor or sp.lp_cursor == 'start':
|
|
(lswitchports, sp.lp_cursor, lp_count) = self._fetch_data(
|
|
self.LP_URI, sp.lp_cursor, max(chunk_size - fetched, 0))
|
|
fetched += len(lswitchports)
|
|
if sp.current_chunk == 0:
|
|
# No cursors were provided. Then it must be possible to
|
|
# calculate the total amount of data to fetch
|
|
sp.total_size = ls_count + lr_count + lp_count
|
|
LOG.debug(_("Total data size: %d"), sp.total_size)
|
|
sp.chunk_size = self._get_chunk_size(sp)
|
|
# Calculate chunk size adjustment
|
|
sp.extra_chunk_size = sp.chunk_size - base_chunk_size
|
|
LOG.debug(_("Fetched %(num_lswitches)d logical switches, "
|
|
"%(num_lswitchports)d logical switch ports,"
|
|
"%(num_lrouters)d logical routers"),
|
|
{'num_lswitches': len(lswitches),
|
|
'num_lswitchports': len(lswitchports),
|
|
'num_lrouters': len(lrouters)})
|
|
return (lswitches, lrouters, lswitchports)
|
|
|
|
def _synchronize_state(self, sp):
|
|
# If the plugin has been destroyed, stop the LoopingCall
|
|
if not self._plugin:
|
|
raise loopingcall.LoopingCallDone
|
|
start = timeutils.utcnow()
|
|
# Reset page cursor variables if necessary
|
|
if sp.current_chunk == 0:
|
|
sp.ls_cursor = sp.lr_cursor = sp.lp_cursor = 'start'
|
|
LOG.info(_("Running state synchronization task. Chunk: %s"),
|
|
sp.current_chunk)
|
|
# Fetch chunk_size data from NVP
|
|
try:
|
|
(lswitches, lrouters, lswitchports) = (
|
|
self._fetch_nvp_data_chunk(sp))
|
|
except (NvpApiClient.RequestTimeout, NvpApiClient.NvpApiException):
|
|
sleep_interval = self._sync_backoff
|
|
# Cap max back off to 64 seconds
|
|
self._sync_backoff = min(self._sync_backoff * 2, 64)
|
|
LOG.exception(_("An error occured while communicating with "
|
|
"NVP backend. Will retry synchronization "
|
|
"in %d seconds"), sleep_interval)
|
|
return sleep_interval
|
|
LOG.debug(_("Time elapsed querying NVP: %s"),
|
|
timeutils.utcnow() - start)
|
|
if sp.total_size:
|
|
num_chunks = ((sp.total_size / sp.chunk_size) +
|
|
(sp.total_size % sp.chunk_size != 0))
|
|
else:
|
|
num_chunks = 1
|
|
LOG.debug(_("Number of chunks: %d"), num_chunks)
|
|
# Find objects which have changed on NVP side and need
|
|
# to be synchronized
|
|
(ls_uuids, lr_uuids, lp_uuids) = self._nvp_cache.process_updates(
|
|
lswitches, lrouters, lswitchports)
|
|
# Process removed objects only at the last chunk
|
|
scan_missing = (sp.current_chunk == num_chunks - 1 and
|
|
not sp.init_sync_performed)
|
|
if sp.current_chunk == num_chunks - 1:
|
|
self._nvp_cache.process_deletes()
|
|
ls_uuids = self._nvp_cache.get_lswitches(
|
|
changed_only=not scan_missing)
|
|
lr_uuids = self._nvp_cache.get_lrouters(
|
|
changed_only=not scan_missing)
|
|
lp_uuids = self._nvp_cache.get_lswitchports(
|
|
changed_only=not scan_missing)
|
|
LOG.debug(_("Time elapsed hashing data: %s"),
|
|
timeutils.utcnow() - start)
|
|
# Get an admin context
|
|
ctx = context.get_admin_context()
|
|
# Synchronize with database
|
|
with ctx.session.begin(subtransactions=True):
|
|
self._synchronize_lswitches(ctx, ls_uuids,
|
|
scan_missing=scan_missing)
|
|
self._synchronize_lrouters(ctx, lr_uuids,
|
|
scan_missing=scan_missing)
|
|
self._synchronize_lswitchports(ctx, lp_uuids,
|
|
scan_missing=scan_missing)
|
|
# Increase chunk counter
|
|
LOG.info(_("Synchronization for chunk %(chunk_num)d of "
|
|
"%(total_chunks)d performed"),
|
|
{'chunk_num': sp.current_chunk + 1,
|
|
'total_chunks': num_chunks})
|
|
sp.current_chunk = (sp.current_chunk + 1) % num_chunks
|
|
added_delay = 0
|
|
if sp.current_chunk == 0:
|
|
# Ensure init_sync_performed is True
|
|
if not sp.init_sync_performed:
|
|
sp.init_sync_performed = True
|
|
# Add additional random delay
|
|
added_delay = random.randint(0, self._max_rand_delay)
|
|
LOG.debug(_("Time elapsed at end of sync: %s"),
|
|
timeutils.utcnow() - start)
|
|
return self._sync_interval / num_chunks + added_delay
|