diff --git a/etc/neutron/plugins/nicira/nvp.ini b/etc/neutron/plugins/nicira/nvp.ini index a4d59737f6..a5e0059b9d 100644 --- a/etc/neutron/plugins/nicira/nvp.ini +++ b/etc/neutron/plugins/nicira/nvp.ini @@ -101,3 +101,36 @@ # the RPC agents to operate. When 'agentless' is chosen, the config option metadata_mode # becomes ineffective. The mode 'agentless' is not supported for NVP 3.2 or below. # agent_mode = agent + +[nvp_sync] +# Interval in seconds between runs of the status synchronization task. +# The plugin will aim at resynchronizing operational status for all +# resources in this interval, and it should be therefore large enough +# to ensure the task is feasible. Otherwise the plugin will be +# constantly synchronizing resource status, ie: a new task is started +# as soon as the previous is completed. +# If this value is set to 0, the state synchronization thread for this +# Neutron instance will be disabled. +# state_sync_interval = 120 + +# Random additional delay between two runs of the state synchronization task. +# An additional wait time between 0 and max_random_sync_delay seconds +# will be added on top of state_sync_interval. +# max_random_sync_delay = 0 + +# Minimum delay, in seconds, between two status synchronization requests for NVP. +# Depending on chunk size, controller load, and other factors, state +# synchronization requests might be pretty heavy. This means the +# controller might take time to respond, and its load might be quite +# increased by them. This parameter allows to specify a minimum +# interval between two subsequent requests. +# The value for this parameter must never exceed state_sync_interval. +# If this does, an error will be raised at startup. +# min_sync_req_delay = 10 + +# Minimum number of resources to be retrieved from NVP in a single status +# synchronization request. +# The actual size of the chunk will increase if the number of resources is such +# that using the minimum chunk size will cause the interval between two +# requests to be less than min_sync_req_delay +# min_chunk_size = 500 diff --git a/neutron/plugins/nicira/NeutronPlugin.py b/neutron/plugins/nicira/NeutronPlugin.py index 5b2b19814d..f3c1d1eae1 100644 --- a/neutron/plugins/nicira/NeutronPlugin.py +++ b/neutron/plugins/nicira/NeutronPlugin.py @@ -20,7 +20,6 @@ # @author: Aaron Rosen, Nicira Networks, Inc. -import hashlib import logging import os @@ -56,6 +55,7 @@ from neutron.openstack.common import excutils from neutron.plugins.nicira.common import config from neutron.plugins.nicira.common import exceptions as nvp_exc from neutron.plugins.nicira.common import securitygroups as nvp_sec +from neutron.plugins.nicira.common import sync from neutron.plugins.nicira.dbexts import distributedrouter as dist_rtr from neutron.plugins.nicira.dbexts import maclearning as mac_db from neutron.plugins.nicira.dbexts import nicira_db @@ -180,6 +180,7 @@ class NvpPluginV2(agentschedulers_db.DhcpAgentSchedulerDbMixin, if not cfg.CONF.api_extensions_path: cfg.CONF.set_override('api_extensions_path', NVP_EXT_PATH) self.nvp_opts = cfg.CONF.NVP + self.nvp_sync_opts = cfg.CONF.NVP_SYNC self.cluster = create_nvp_cluster(cfg.CONF, self.nvp_opts.concurrent_connections, self.nvp_opts.nvp_gen_timeout) @@ -196,6 +197,13 @@ class NvpPluginV2(agentschedulers_db.DhcpAgentSchedulerDbMixin, # Set this flag to false as the default gateway has not # been yet updated from the config file self._is_default_net_gw_in_sync = False + # Create a synchronizer instance for backend sync + self._synchronizer = sync.NvpSynchronizer( + self, self.cluster, + self.nvp_sync_opts.state_sync_interval, + self.nvp_sync_opts.min_sync_req_delay, + self.nvp_sync_opts.min_chunk_size, + self.nvp_sync_opts.max_random_sync_delay) def _ensure_default_network_gateway(self): if self._is_default_net_gw_in_sync: @@ -1049,39 +1057,12 @@ class NvpPluginV2(agentschedulers_db.DhcpAgentSchedulerDbMixin, with context.session.begin(subtransactions=True): # goto to the plugin DB and fetch the network network = self._get_network(context, id) - # if the network is external, do not go to NVP - if not network.external: - # verify the fabric status of the corresponding - # logical switch(es) in nvp - try: - lswitches = nvplib.get_lswitches(self.cluster, id) - nvp_net_status = constants.NET_STATUS_ACTIVE - neutron_status = network.status - for lswitch in lswitches: - relations = lswitch.get('_relations') - if relations: - lswitch_status = relations.get( - 'LogicalSwitchStatus') - # FIXME(salvatore-orlando): Being unable to fetch - # logical switch status should be an exception. - if (lswitch_status and - not lswitch_status.get('fabric_status', - None)): - nvp_net_status = constants.NET_STATUS_DOWN - break - LOG.debug(_("Current network status:%(nvp_net_status)s; " - "Status in Neutron DB:%(neutron_status)s"), - {'nvp_net_status': nvp_net_status, - 'neutron_status': neutron_status}) - if nvp_net_status != network.status: - # update the network status - network.status = nvp_net_status - except q_exc.NotFound: - network.status = constants.NET_STATUS_ERROR - except Exception: - err_msg = _("Unable to get logical switches") - LOG.exception(err_msg) - raise nvp_exc.NvpPluginException(err_msg=err_msg) + if fields and 'status' in fields: + # External networks are not backed by nvp lswitches + if not network.external: + # Perform explicit state synchronization + self._synchronizer.synchronize_network( + context, network) # Don't do field selection here otherwise we won't be able # to add provider networks fields net_result = self._make_network_dict(network) @@ -1090,85 +1071,13 @@ class NvpPluginV2(agentschedulers_db.DhcpAgentSchedulerDbMixin, return self._fields(net_result, fields) def get_networks(self, context, filters=None, fields=None): - nvp_lswitches = {} filters = filters or {} with context.session.begin(subtransactions=True): - neutron_lswitches = ( - super(NvpPluginV2, self).get_networks(context, filters)) - for net in neutron_lswitches: + networks = super(NvpPluginV2, self).get_networks(context, filters) + for net in networks: self._extend_network_dict_provider(context, net) self._extend_network_qos_queue(context, net) - - tenant_ids = filters and filters.get('tenant_id') or None - filter_fmt = "&tag=%s&tag_scope=os_tid" - if context.is_admin and not tenant_ids: - tenant_filter = "" - else: - tenant_ids = tenant_ids or [context.tenant_id] - tenant_filter = ''.join(filter_fmt % tid for tid in tenant_ids) - lswitch_filters = "uuid,display_name,fabric_status,tags" - lswitch_url_path_1 = ( - "/ws.v1/lswitch?fields=%s&relations=LogicalSwitchStatus%s" - % (lswitch_filters, tenant_filter)) - lswitch_url_path_2 = nvplib._build_uri_path( - nvplib.LSWITCH_RESOURCE, - fields=lswitch_filters, - relations='LogicalSwitchStatus', - filters={'tag': 'true', 'tag_scope': 'shared'}) - try: - res = nvplib.get_all_query_pages(lswitch_url_path_1, self.cluster) - nvp_lswitches.update(dict((ls['uuid'], ls) for ls in res)) - # Issue a second query for fetching shared networks. - # We cannot unfortunately use just a single query because tags - # cannot be or-ed - res_shared = nvplib.get_all_query_pages(lswitch_url_path_2, - self.cluster) - nvp_lswitches.update(dict((ls['uuid'], ls) for ls in res_shared)) - except Exception: - err_msg = _("Unable to get logical switches") - LOG.exception(err_msg) - raise nvp_exc.NvpPluginException(err_msg=err_msg) - - if filters.get('id'): - nvp_lswitches = dict( - (uuid, ls) for (uuid, ls) in nvp_lswitches.iteritems() - if uuid in set(filters['id'])) - for neutron_lswitch in neutron_lswitches: - # Skip external networks as they do not exist in NVP - if neutron_lswitch[l3.EXTERNAL]: - continue - elif neutron_lswitch['id'] not in nvp_lswitches: - LOG.warning(_("Logical Switch %s found in neutron database " - "but not in NVP."), neutron_lswitch["id"]) - neutron_lswitch["status"] = constants.NET_STATUS_ERROR - else: - # TODO(salvatore-orlando): be careful about "extended" - # logical switches - ls = nvp_lswitches.pop(neutron_lswitch['id']) - if (ls["_relations"]["LogicalSwitchStatus"]["fabric_status"]): - neutron_lswitch["status"] = constants.NET_STATUS_ACTIVE - else: - neutron_lswitch["status"] = constants.NET_STATUS_DOWN - - # do not make the case in which switches are found in NVP - # but not in Neutron catastrophic. - if nvp_lswitches: - LOG.warning(_("Found %s logical switches not bound " - "to Neutron networks. Neutron and NVP are " - "potentially out of sync"), len(nvp_lswitches)) - - LOG.debug(_("get_networks() completed for tenant %s"), - context.tenant_id) - - if fields: - ret_fields = [] - for neutron_lswitch in neutron_lswitches: - row = {} - for field in fields: - row[field] = neutron_lswitch[field] - ret_fields.append(row) - return ret_fields - return neutron_lswitches + return [self._fields(network, fields) for network in networks] def update_network(self, context, id, network): pnet._raise_if_updates_provider_attributes(network['network']) @@ -1194,105 +1103,10 @@ class NvpPluginV2(agentschedulers_db.DhcpAgentSchedulerDbMixin, def get_ports(self, context, filters=None, fields=None): filters = filters or {} with context.session.begin(subtransactions=True): - neutron_lports = super(NvpPluginV2, self).get_ports( - context, filters) - if (filters.get('network_id') and len(filters.get('network_id')) and - self._network_is_external(context, filters['network_id'][0])): - # Do not perform check on NVP platform - return neutron_lports - - vm_filter = "" - tenant_filter = "" - # This is used when calling delete_network. Neutron checks to see if - # the network has any ports. - if filters.get("network_id"): - # FIXME (Aaron) If we get more than one network_id this won't work - lswitch = filters["network_id"][0] - else: - lswitch = "*" - - if filters.get("device_id"): - for vm_id in filters.get("device_id"): - vm_filter = ("%stag_scope=vm_id&tag=%s&" % (vm_filter, - hashlib.sha1(vm_id).hexdigest())) - else: - vm_id = "" - - if filters.get("tenant_id"): - for tenant in filters.get("tenant_id"): - tenant_filter = ("%stag_scope=os_tid&tag=%s&" % - (tenant_filter, tenant)) - - nvp_lports = {} - - lport_fields_str = ("tags,admin_status_enabled,display_name," - "fabric_status_up") - try: - lport_query_path = ( - "/ws.v1/lswitch/%s/lport?fields=%s&%s%stag_scope=q_port_id" - "&relations=LogicalPortStatus" % - (lswitch, lport_fields_str, vm_filter, tenant_filter)) - - try: - ports = nvplib.get_all_query_pages(lport_query_path, - self.cluster) - except q_exc.NotFound: - LOG.warn(_("Lswitch %s not found in NVP"), lswitch) - ports = None - - if ports: - for port in ports: - for tag in port["tags"]: - if tag["scope"] == "q_port_id": - nvp_lports[tag["tag"]] = port - except Exception: - err_msg = _("Unable to get ports") - LOG.exception(err_msg) - raise nvp_exc.NvpPluginException(err_msg=err_msg) - - lports = [] - for neutron_lport in neutron_lports: - # if a neutron port is not found in NVP, this migth be because - # such port is not mapped to a logical switch - ie: floating ip - if neutron_lport['device_owner'] in (l3_db.DEVICE_OWNER_FLOATINGIP, - l3_db.DEVICE_OWNER_ROUTER_GW): - lports.append(neutron_lport) - continue - try: - neutron_lport["admin_state_up"] = ( - nvp_lports[neutron_lport["id"]]["admin_status_enabled"]) - - if (nvp_lports[neutron_lport["id"]] - ["_relations"] - ["LogicalPortStatus"] - ["fabric_status_up"]): - neutron_lport["status"] = constants.PORT_STATUS_ACTIVE - else: - neutron_lport["status"] = constants.PORT_STATUS_DOWN - - del nvp_lports[neutron_lport["id"]] - except KeyError: - neutron_lport["status"] = constants.PORT_STATUS_ERROR - LOG.debug(_("Neutron logical port %s was not found on NVP"), - neutron_lport['id']) - - lports.append(neutron_lport) - # do not make the case in which ports are found in NVP - # but not in Neutron catastrophic. - if nvp_lports: - LOG.warning(_("Found %s logical ports not bound " - "to Neutron ports. Neutron and NVP are " - "potentially out of sync"), len(nvp_lports)) - - if fields: - ret_fields = [] - for lport in lports: - row = {} - for field in fields: - row[field] = lport[field] - ret_fields.append(row) - return ret_fields - return lports + ports = super(NvpPluginV2, self).get_ports(context, filters) + for port in ports: + self._extend_port_qos_queue(context, port) + return [self._fields(port, fields) for port in ports] def create_port(self, context, port): # If PORTSECURITY is not the default value ATTR_NOT_SPECIFIED @@ -1504,43 +1318,26 @@ class NvpPluginV2(agentschedulers_db.DhcpAgentSchedulerDbMixin, def get_port(self, context, id, fields=None): with context.session.begin(subtransactions=True): - neutron_db_port = super(NvpPluginV2, self).get_port(context, - id, fields) - self._extend_port_qos_queue(context, neutron_db_port) - - if self._network_is_external(context, - neutron_db_port['network_id']): - return neutron_db_port - nvp_id = self._nvp_get_port_id(context, self.cluster, - neutron_db_port) - # If there's no nvp IP do not bother going to NVP and put - # the port in error state - if nvp_id: - # Find the NVP port corresponding to neutron port_id - # Do not query by nvp id as the port might be on - # an extended switch and we do not store the extended - # switch uuid - results = nvplib.query_lswitch_lports( - self.cluster, '*', - relations='LogicalPortStatus', - filters={'tag': id, 'tag_scope': 'q_port_id'}) - if results: - port = results[0] - port_status = port["_relations"]["LogicalPortStatus"] - neutron_db_port["admin_state_up"] = ( - port["admin_status_enabled"]) - if port_status["fabric_status_up"]: - neutron_db_port["status"] = ( - constants.PORT_STATUS_ACTIVE) - else: - neutron_db_port["status"] = ( - constants.PORT_STATUS_DOWN) - else: - neutron_db_port["status"] = ( - constants.PORT_STATUS_ERROR) + if fields and 'status' in fields: + # Perform explicit state synchronization + db_port = self._get_port(context, id) + self._synchronizer.synchronize_port( + context, db_port) + port = self._make_port_dict(db_port, fields) else: - neutron_db_port["status"] = constants.PORT_STATUS_ERROR - return neutron_db_port + port = super(NvpPluginV2, self).get_port(context, id, fields) + self._extend_port_qos_queue(context, port) + return port + + def get_router(self, context, id, fields=None): + if fields and 'status' in fields: + db_router = self._get_router(context, id) + # Perform explicit state synchronization + self._synchronizer.synchronize_router( + context, db_router) + return self._make_router_dict(db_router, fields) + else: + return super(NvpPluginV2, self).get_router(context, id, fields) def create_router(self, context, router): # NOTE(salvatore-orlando): We completely override this method in @@ -1713,77 +1510,6 @@ class NvpPluginV2(agentschedulers_db.DhcpAgentSchedulerDbMixin, err_msg=(_("Unable to delete logical router '%s'" "on NVP Platform") % router_id)) - def get_router(self, context, id, fields=None): - router = self._get_router(context, id) - try: - lrouter = nvplib.get_lrouter(self.cluster, id) - relations = lrouter.get('_relations') - if relations: - lrouter_status = relations.get('LogicalRouterStatus') - # FIXME(salvatore-orlando): Being unable to fetch the - # logical router status should be an exception. - if lrouter_status: - router_op_status = (lrouter_status.get('fabric_status') - and constants.NET_STATUS_ACTIVE or - constants.NET_STATUS_DOWN) - except q_exc.NotFound: - lrouter = {} - router_op_status = constants.NET_STATUS_ERROR - if router_op_status != router.status: - LOG.debug(_("Current router status:%(router_status)s;" - "Status in Neutron DB:%(db_router_status)s"), - {'router_status': router_op_status, - 'db_router_status': router.status}) - # update the router status - with context.session.begin(subtransactions=True): - router.status = router_op_status - return self._make_router_dict(router, fields) - - def get_routers(self, context, filters=None, fields=None): - router_query = self._apply_filters_to_query( - self._model_query(context, l3_db.Router), - l3_db.Router, filters) - routers = router_query.all() - # Query routers on NVP for updating operational status - if context.is_admin and not filters.get("tenant_id"): - tenant_id = None - elif 'tenant_id' in filters: - tenant_id = filters.get('tenant_id')[0] - del filters['tenant_id'] - else: - tenant_id = context.tenant_id - try: - nvp_lrouters = nvplib.get_lrouters(self.cluster, - tenant_id, - fields) - except NvpApiClient.NvpApiException: - err_msg = _("Unable to get logical routers from NVP controller") - LOG.exception(err_msg) - raise nvp_exc.NvpPluginException(err_msg=err_msg) - - nvp_lrouters_dict = {} - for nvp_lrouter in nvp_lrouters: - nvp_lrouters_dict[nvp_lrouter['uuid']] = nvp_lrouter - for router in routers: - nvp_lrouter = nvp_lrouters_dict.get(router['id']) - if nvp_lrouter: - if (nvp_lrouter["_relations"]["LogicalRouterStatus"] - ["fabric_status"]): - router.status = constants.NET_STATUS_ACTIVE - else: - router.status = constants.NET_STATUS_DOWN - nvp_lrouters.remove(nvp_lrouter) - else: - router.status = constants.NET_STATUS_ERROR - - # do not make the case in which routers are found in NVP - # but not in Neutron catastrophic. - if nvp_lrouters: - LOG.warning(_("Found %s logical routers not bound " - "to Neutron routers. Neutron and NVP are " - "potentially out of sync"), len(nvp_lrouters)) - return [self._make_router_dict(router, fields) for router in routers] - def add_router_interface(self, context, router_id, interface_info): # When adding interface by port_id we need to create the # peer port on the nvp logical router in this routine diff --git a/neutron/plugins/nicira/common/config.py b/neutron/plugins/nicira/common/config.py index 8f0c08d1c7..88c9ea3488 100644 --- a/neutron/plugins/nicira/common/config.py +++ b/neutron/plugins/nicira/common/config.py @@ -53,7 +53,24 @@ nvp_opts = [ help=_("The default network tranport type to use (stt, gre, " "bridge, ipsec_gre, or ipsec_stt)")), cfg.StrOpt('agent_mode', default=AgentModes.AGENT, - help=_("The mode used to implement DHCP/metadata services.")), + help=_("The mode used to implement DHCP/metadata services.")) +] + +sync_opts = [ + cfg.IntOpt('state_sync_interval', default=120, + help=_("Interval in seconds between runs of the state " + "synchronization task. Set it to 0 to disable it")), + cfg.IntOpt('max_random_sync_delay', default=0, + help=_("Maximum value for the additional random " + "delay in seconds between runs of the state " + "synchronization task")), + cfg.IntOpt('min_sync_req_delay', default=10, + help=_('Minimum delay, in seconds, between two state ' + 'synchronization queries to NVP. It must not ' + 'exceed state_sync_interval')), + cfg.IntOpt('min_chunk_size', default=500, + help=_('Minimum number of resources to be retrieved from NVP ' + 'during state synchronization')) ] connection_opts = [ @@ -107,6 +124,8 @@ cluster_opts = [ cfg.CONF.register_opts(connection_opts) cfg.CONF.register_opts(cluster_opts) cfg.CONF.register_opts(nvp_opts, "NVP") +cfg.CONF.register_opts(sync_opts, "NVP_SYNC") + # NOTE(armando-migliaccio): keep the following code until we support # NVP configuration files in older format (Grizzly or older). # ### BEGIN diff --git a/neutron/plugins/nicira/common/sync.py b/neutron/plugins/nicira/common/sync.py new file mode 100644 index 0000000000..a321d69411 --- /dev/null +++ b/neutron/plugins/nicira/common/sync.py @@ -0,0 +1,596 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2013 Nicira, Inc. +# All Rights Reserved +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import random + +from neutron.common import constants +from neutron.common import exceptions +from neutron import context +from neutron.db import l3_db +from neutron.db import models_v2 +from neutron.openstack.common import jsonutils +from neutron.openstack.common import log +from neutron.openstack.common import loopingcall +from neutron.openstack.common import timeutils +from neutron.plugins.nicira.common import exceptions as nvp_exc +from neutron.plugins.nicira import NvpApiClient +from neutron.plugins.nicira import nvplib + +LOG = log.getLogger(__name__) + + +class NvpCache(object): + """A simple Cache for NVP resources. + + Associates resource id with resource hash to rapidly identify + updated resources. + Each entry in the cache also stores the following information: + - changed: the resource in the cache has been altered following + an update or a delete + - hit: the resource has been visited during an update (and possibly + left unchanged) + - data: current resource data + - data_bk: backup of resource data prior to its removal + """ + + def __init__(self): + # Maps a uuid to the dict containing it + self._uuid_dict_mappings = {} + # Dicts for NVP cached resources + self._lswitches = {} + self._lswitchports = {} + self._lrouters = {} + + def __getitem__(self, key): + # uuids are unique across the various types of resources + # TODO(salv-orlando): Avoid lookups over all dictionaries + # when retrieving items + # Fetch lswitches, lports, or lrouters + resources = self._uuid_dict_mappings[key] + return resources[key] + + def _update_resources(self, resources, new_resources): + # Clear the 'changed' attribute for all items + for uuid, item in resources.items(): + if item.pop('changed', None) and not item.get('data'): + # The item is not anymore in NVP, so delete it + del resources[uuid] + del self._uuid_dict_mappings[uuid] + + def do_hash(item): + return hash(jsonutils.dumps(item)) + + # Parse new data and identify new, deleted, and updated resources + for item in new_resources: + item_id = item['uuid'] + if resources.get(item_id): + new_hash = do_hash(item) + if new_hash != resources[item_id]['hash']: + resources[item_id]['hash'] = new_hash + resources[item_id]['changed'] = True + resources[item_id]['data_bk'] = ( + resources[item_id]['data']) + resources[item_id]['data'] = item + # Mark the item as hit in any case + resources[item_id]['hit'] = True + else: + resources[item_id] = {'hash': do_hash(item)} + resources[item_id]['hit'] = True + resources[item_id]['changed'] = True + resources[item_id]['data'] = item + # add a uuid to dict mapping for easy retrieval + # with __getitem__ + self._uuid_dict_mappings[item_id] = resources + + def _delete_resources(self, resources): + # Mark for removal all the elements which have not been visited. + # And clear the 'hit' attribute. + for to_delete in [k for (k, v) in resources.iteritems() + if not v.pop('hit', False)]: + resources[to_delete]['changed'] = True + resources[to_delete]['data_bk'] = ( + resources[to_delete].pop('data', None)) + + def _get_resource_ids(self, resources, changed_only): + if changed_only: + return [k for (k, v) in resources.iteritems() + if v.get('changed')] + return resources.keys() + + def get_lswitches(self, changed_only=False): + return self._get_resource_ids(self._lswitches, changed_only) + + def get_lrouters(self, changed_only=False): + return self._get_resource_ids(self._lrouters, changed_only) + + def get_lswitchports(self, changed_only=False): + return self._get_resource_ids(self._lswitchports, changed_only) + + def update_lswitch(self, lswitch): + self._update_resources(self._lswitches, [lswitch]) + + def update_lrouter(self, lrouter): + self._update_resources(self._lrouters, [lrouter]) + + def update_lswitchport(self, lswitchport): + self._update_resources(self._lswitchports, [lswitchport]) + + def process_updates(self, lswitches=None, + lrouters=None, lswitchports=None): + self._update_resources(self._lswitches, lswitches) + self._update_resources(self._lrouters, lrouters) + self._update_resources(self._lswitchports, lswitchports) + return (self._get_resource_ids(self._lswitches, changed_only=True), + self._get_resource_ids(self._lrouters, changed_only=True), + self._get_resource_ids(self._lswitchports, changed_only=True)) + + def process_deletes(self): + self._delete_resources(self._lswitches) + self._delete_resources(self._lrouters) + self._delete_resources(self._lswitchports) + return (self._get_resource_ids(self._lswitches, changed_only=True), + self._get_resource_ids(self._lrouters, changed_only=True), + self._get_resource_ids(self._lswitchports, changed_only=True)) + + +class SyncParameters(): + """Defines attributes used by the synchronization procedure. + + chunk_size: Actual chunk size + extra_chunk_size: Additional data to fetch because of chunk size + adjustment + current_chunk: Counter of the current data chunk being synchronized + Page cursors: markers for the next resource to fetch. + 'start' means page cursor unset for fetching 1st page + init_sync_performed: True if the initial synchronization concluded + """ + + def __init__(self, min_chunk_size): + self.chunk_size = min_chunk_size + self.extra_chunk_size = 0 + self.current_chunk = 0 + self.ls_cursor = 'start' + self.lr_cursor = 'start' + self.lp_cursor = 'start' + self.init_sync_performed = False + self.total_size = 0 + + +def _start_loopingcall(min_chunk_size, state_sync_interval, func): + """Start a loopingcall for the synchronization task.""" + # Start a looping call to synchronize operational status + # for neutron resources + if not state_sync_interval: + # do not start the looping call if specified + # sync interval is 0 + return + state_synchronizer = loopingcall.DynamicLoopingCall( + func, sp=SyncParameters(min_chunk_size)) + state_synchronizer.start( + periodic_interval_max=state_sync_interval) + + +class NvpSynchronizer(): + + LS_URI = nvplib._build_uri_path( + nvplib.LSWITCH_RESOURCE, fields='uuid,tags,fabric_status', + relations='LogicalSwitchStatus') + LR_URI = nvplib._build_uri_path( + nvplib.LROUTER_RESOURCE, fields='uuid,tags,fabric_status', + relations='LogicalRouterStatus') + LP_URI = nvplib._build_uri_path( + nvplib.LSWITCHPORT_RESOURCE, + parent_resource_id='*', + fields='uuid,tags,fabric_status,link_status_up', + relations='LogicalPortStatus') + + def __init__(self, plugin, cluster, state_sync_interval, + req_delay, min_chunk_size, max_rand_delay=0): + random.seed() + self._nvp_cache = NvpCache() + # Store parameters as instance members + # NOTE(salv-orlando): apologies if it looks java-ish + self._plugin = plugin + self._cluster = cluster + self._req_delay = req_delay + self._sync_interval = state_sync_interval + self._max_rand_delay = max_rand_delay + # Validate parameters + if self._sync_interval < self._req_delay: + err_msg = (_("Minimum request delay:%(req_delay)s must not " + "exceed synchronization interval:%(sync_interval)s") % + {'req_delay': self._req_delay, + 'sync_interval': self._sync_interval}) + LOG.error(err_msg) + raise nvp_exc.NvpPluginException(err_msg=err_msg) + # Backoff time in case of failures while fetching sync data + self._sync_backoff = 1 + _start_loopingcall(min_chunk_size, state_sync_interval, + self._synchronize_state) + + def _get_tag_dict(self, tags): + return dict((tag.get('scope'), tag['tag']) for tag in tags) + + def _update_neutron_object(self, context, neutron_data, status): + if status == neutron_data['status']: + # do nothing + return + with context.session.begin(subtransactions=True): + LOG.debug(_("Updating status for neutron resource %(q_id)s to: " + "%(status)s"), {'q_id': neutron_data['id'], + 'status': status}) + neutron_data['status'] = status + context.session.add(neutron_data) + + def synchronize_network(self, context, neutron_network_data, + lswitches=None): + """Synchronize a Neutron network with its NVP counterpart. + + This routine synchronizes a set of switches when a Neutron + network is mapped to multiple lswitches. + """ + if not lswitches: + # Try to get logical switches from nvp + try: + lswitches = nvplib.get_lswitches( + self._cluster, neutron_network_data['id']) + except exceptions.NetworkNotFound: + # TODO(salv-orlando): We should be catching + # NvpApiClient.ResourceNotFound here + # The logical switch was not found + LOG.warning(_("Logical switch for neutron network %s not " + "found on NVP."), neutron_network_data['id']) + lswitches = [] + else: + for lswitch in lswitches: + self._nvp_cache.update_lswitch(lswitch) + # By default assume things go wrong + status = constants.NET_STATUS_ERROR + # In most cases lswitches will contain a single element + for ls in lswitches: + if not ls: + # Logical switch was deleted + break + ls_status = ls['_relations']['LogicalSwitchStatus'] + if not ls_status['fabric_status']: + status = constants.NET_STATUS_DOWN + break + else: + # No switch was down or missing. Set status to ACTIVE unless + # there were no switches in the first place! + if lswitches: + status = constants.NET_STATUS_ACTIVE + # Update db object + self._update_neutron_object(context, neutron_network_data, status) + + def _synchronize_lswitches(self, ctx, ls_uuids, scan_missing=False): + if not ls_uuids and not scan_missing: + return + neutron_net_ids = set() + neutron_nvp_mappings = {} + # TODO(salvatore-orlando): Deal with the case the tag + # has been tampered with + for ls_uuid in ls_uuids: + # If the lswitch has been deleted, get backup copy of data + lswitch = (self._nvp_cache[ls_uuid].get('data') or + self._nvp_cache[ls_uuid].get('data_bk')) + tags = self._get_tag_dict(lswitch['tags']) + neutron_id = tags.get('neutron_net_id', ls_uuid) + neutron_net_ids.add(neutron_id) + neutron_nvp_mappings[neutron_id] = ( + neutron_nvp_mappings.get(neutron_id, []) + + [self._nvp_cache[ls_uuid]]) + with ctx.session.begin(subtransactions=True): + # Fetch neutron networks from database + filters = {'router:external': [False]} + if not scan_missing: + filters['id'] = neutron_net_ids + # TODO(salv-orlando): Filter out external networks + for network in self._plugin._get_collection_query( + ctx, models_v2.Network, filters=filters): + lswitches = neutron_nvp_mappings.get(network['id'], []) + lswitches = [lswitch.get('data') for lswitch in lswitches] + self.synchronize_network(ctx, network, lswitches) + + def synchronize_router(self, context, neutron_router_data, + lrouter=None): + """Synchronize a neutron router with its NVP counterpart.""" + if not lrouter: + # Try to get router from nvp + try: + # This query will return the logical router status too + lrouter = nvplib.get_lrouter( + self._cluster, neutron_router_data['id']) + except exceptions.NotFound: + # NOTE(salv-orlando): We should be catching + # NvpApiClient.ResourceNotFound here + # The logical router was not found + LOG.warning(_("Logical router for neutron router %s not " + "found on NVP."), neutron_router_data['id']) + lrouter = None + else: + # Update the cache + self._nvp_cache.update_lrouter(lrouter) + + # Note(salv-orlando): It might worth adding a check to verify neutron + # resource tag in nvp entity matches a Neutron id. + # By default assume things go wrong + status = constants.NET_STATUS_ERROR + if lrouter: + lr_status = (lrouter['_relations'] + ['LogicalRouterStatus'] + ['fabric_status']) + status = (lr_status and + constants.NET_STATUS_ACTIVE + or constants.NET_STATUS_DOWN) + # Update db object + self._update_neutron_object(context, neutron_router_data, status) + + def _synchronize_lrouters(self, ctx, lr_uuids, scan_missing=False): + if not lr_uuids and not scan_missing: + return + neutron_router_mappings = ( + dict((lr_uuid, self._nvp_cache[lr_uuid]) for lr_uuid in lr_uuids)) + with ctx.session.begin(subtransactions=True): + # Fetch neutron routers from database + filters = ({} if scan_missing else + {'id': neutron_router_mappings.keys()}) + for router in self._plugin._get_collection_query( + ctx, l3_db.Router, filters=filters): + lrouter = neutron_router_mappings.get(router['id']) + self.synchronize_router( + ctx, router, lrouter and lrouter.get('data')) + + def synchronize_port(self, context, neutron_port_data, + lswitchport=None, ext_networks=None): + """Synchronize a Neutron port with its NVP counterpart.""" + # Skip synchronization for ports on external networks + if not ext_networks: + ext_networks = [net['id'] for net in context.session.query( + models_v2.Network).join( + l3_db.ExternalNetwork, + (models_v2.Network.id == + l3_db.ExternalNetwork.network_id))] + if neutron_port_data['network_id'] in ext_networks: + with context.session.begin(subtransactions=True): + neutron_port_data['status'] = constants.PORT_STATUS_ACTIVE + return + + if not lswitchport: + # Try to get port from nvp + try: + lp_uuid = self._plugin._nvp_get_port_id( + context, self._cluster, neutron_port_data) + if lp_uuid: + lswitchport = nvplib.get_port( + self._cluster, neutron_port_data['network_id'], + lp_uuid, relations='LogicalPortStatus') + except exceptions.PortNotFoundOnNetwork: + # NOTE(salv-orlando): We should be catching + # NvpApiClient.ResourceNotFound here + # The logical switch port was not found + LOG.warning(_("Logical switch port for neutron port %s " + "not found on NVP."), neutron_port_data['id']) + lswitchport = None + else: + # Update the cache + self._nvp_cache.update_lswitchport(lswitchport) + + # Note(salv-orlando): It might worth adding a check to verify neutron + # resource tag in nvp entity matches Neutron id. + # By default assume things go wrong + status = constants.PORT_STATUS_ERROR + if lswitchport: + lp_status = (lswitchport['_relations'] + ['LogicalPortStatus'] + ['link_status_up']) + status = (lp_status and + constants.PORT_STATUS_ACTIVE + or constants.PORT_STATUS_DOWN) + # Update db object + self._update_neutron_object(context, neutron_port_data, status) + + def _synchronize_lswitchports(self, ctx, lp_uuids, scan_missing=False): + if not lp_uuids and not scan_missing: + return + # Find Neutron port id by tag - the tag is already + # loaded in memory, no reason for doing a db query + # TODO(salvatore-orlando): Deal with the case the tag + # has been tampered with + neutron_port_mappings = {} + for lp_uuid in lp_uuids: + lport = (self._nvp_cache[lp_uuid].get('data') or + self._nvp_cache[lp_uuid].get('data_bk')) + tags = self._get_tag_dict(lport['tags']) + neutron_port_id = tags.get('q_port_id') + if neutron_port_id: + neutron_port_mappings[neutron_port_id] = ( + self._nvp_cache[lp_uuid]) + with ctx.session.begin(subtransactions=True): + # Fetch neutron ports from database + # At the first sync we need to fetch all ports + filters = ({} if scan_missing else + {'id': neutron_port_mappings.keys()}) + # TODO(salv-orlando): Work out a solution for avoiding + # this query + ext_nets = [net['id'] for net in ctx.session.query( + models_v2.Network).join( + l3_db.ExternalNetwork, + (models_v2.Network.id == + l3_db.ExternalNetwork.network_id))] + for port in self._plugin._get_collection_query( + ctx, models_v2.Port, filters=filters): + lswitchport = neutron_port_mappings.get(port['id']) + self.synchronize_port( + ctx, port, lswitchport and lswitchport.get('data'), + ext_networks=ext_nets) + + def _get_chunk_size(self, sp): + # NOTE(salv-orlando): Try to use __future__ for this routine only? + ratio = ((float(sp.total_size) / float(sp.chunk_size)) / + (float(self._sync_interval) / float(self._req_delay))) + new_size = max(1.0, ratio) * float(sp.chunk_size) + return int(new_size) + (new_size - int(new_size) > 0) + + def _fetch_data(self, uri, cursor, page_size): + # If not cursor there is nothing to retrieve + if cursor: + if cursor == 'start': + cursor = None + # Chunk size tuning might, in some conditions, make it larger + # than 5,000, which is the maximum page size allowed by the NVP + # API. In this case the request should be split in multiple + # requests. This is not ideal, and therefore a log warning will + # be emitted. + requests = range(0, page_size / (nvplib.MAX_PAGE_SIZE + 1) + 1) + if len(requests) > 1: + LOG.warn(_("Requested page size is %(cur_chunk_size)d." + "It might be necessary to do %(num_requests)d " + "round-trips to NVP for fetching data. Please " + "tune sync parameters to ensure chunk size " + "is less than %(max_page_size)d"), + {'cur_chunk_size': page_size, + 'num_requests': len(requests), + 'max_page_size': nvplib.MAX_PAGE_SIZE}) + results = [] + actual_size = 0 + for _req in requests: + req_results, cursor, req_size = nvplib.get_single_query_page( + uri, self._cluster, cursor, + min(page_size, nvplib.MAX_PAGE_SIZE)) + results.extend(req_results) + actual_size = actual_size + req_size + # If no cursor is returned break the cycle as there is no + # actual need to perform multiple requests (all fetched) + # This happens when the overall size of resources exceeds + # the maximum page size, but the number for each single + # resource type is below this threshold + if not cursor: + break + # reset cursor before returning if we queried just to + # know the number of entities + return results, cursor if page_size else 'start', actual_size + return [], cursor, None + + def _fetch_nvp_data_chunk(self, sp): + base_chunk_size = sp.chunk_size + chunk_size = base_chunk_size + sp.extra_chunk_size + LOG.info(_("Fetching up to %s resources " + "from NVP backend"), chunk_size) + fetched = ls_count = lr_count = lp_count = 0 + lswitches = lrouters = lswitchports = [] + if sp.ls_cursor or sp.ls_cursor == 'start': + (lswitches, sp.ls_cursor, ls_count) = self._fetch_data( + self.LS_URI, sp.ls_cursor, chunk_size) + fetched = len(lswitches) + if fetched < chunk_size and sp.lr_cursor or sp.lr_cursor == 'start': + (lrouters, sp.lr_cursor, lr_count) = self._fetch_data( + self.LR_URI, sp.lr_cursor, max(chunk_size - fetched, 0)) + fetched += len(lrouters) + if fetched < chunk_size and sp.lp_cursor or sp.lp_cursor == 'start': + (lswitchports, sp.lp_cursor, lp_count) = self._fetch_data( + self.LP_URI, sp.lp_cursor, max(chunk_size - fetched, 0)) + fetched += len(lswitchports) + if sp.current_chunk == 0: + # No cursors were provided. Then it must be possible to + # calculate the total amount of data to fetch + sp.total_size = ls_count + lr_count + lp_count + LOG.debug(_("Total data size: %d"), sp.total_size) + sp.chunk_size = self._get_chunk_size(sp) + # Calculate chunk size adjustment + sp.extra_chunk_size = sp.chunk_size - base_chunk_size + LOG.debug(_("Fetched %(num_lswitches)d logical switches, " + "%(num_lswitchports)d logical switch ports," + "%(num_lrouters)d logical routers"), + {'num_lswitches': len(lswitches), + 'num_lswitchports': len(lswitchports), + 'num_lrouters': len(lrouters)}) + return (lswitches, lrouters, lswitchports) + + def _synchronize_state(self, sp): + # If the plugin has been destroyed, stop the LoopingCall + if not self._plugin: + raise loopingcall.LoopingCallDone + start = timeutils.utcnow() + # Reset page cursor variables if necessary + if sp.current_chunk == 0: + sp.ls_cursor = sp.lr_cursor = sp.lp_cursor = 'start' + LOG.info(_("Running state synchronization task. Chunk: %s"), + sp.current_chunk) + # Fetch chunk_size data from NVP + try: + (lswitches, lrouters, lswitchports) = ( + self._fetch_nvp_data_chunk(sp)) + except (NvpApiClient.RequestTimeout, NvpApiClient.NvpApiException): + sleep_interval = self._sync_backoff + # Cap max back off to 64 seconds + self._sync_backoff = min(self._sync_backoff * 2, 64) + LOG.exception(_("An error occured while communicating with " + "NVP backend. Will retry synchronization " + "in %d seconds"), sleep_interval) + return sleep_interval + LOG.debug(_("Time elapsed querying NVP: %s"), + timeutils.utcnow() - start) + if sp.total_size: + num_chunks = ((sp.total_size / sp.chunk_size) + + (sp.total_size % sp.chunk_size != 0)) + else: + num_chunks = 1 + LOG.debug(_("Number of chunks: %d"), num_chunks) + # Find objects which have changed on NVP side and need + # to be synchronized + (ls_uuids, lr_uuids, lp_uuids) = self._nvp_cache.process_updates( + lswitches, lrouters, lswitchports) + # Process removed objects only at the last chunk + scan_missing = (sp.current_chunk == num_chunks - 1 and + not sp.init_sync_performed) + if sp.current_chunk == num_chunks - 1: + self._nvp_cache.process_deletes() + ls_uuids = self._nvp_cache.get_lswitches( + changed_only=not scan_missing) + lr_uuids = self._nvp_cache.get_lrouters( + changed_only=not scan_missing) + lp_uuids = self._nvp_cache.get_lswitchports( + changed_only=not scan_missing) + LOG.debug(_("Time elapsed hashing data: %s"), + timeutils.utcnow() - start) + # Get an admin context + ctx = context.get_admin_context() + # Synchronize with database + with ctx.session.begin(subtransactions=True): + self._synchronize_lswitches(ctx, ls_uuids, + scan_missing=scan_missing) + self._synchronize_lrouters(ctx, lr_uuids, + scan_missing=scan_missing) + self._synchronize_lswitchports(ctx, lp_uuids, + scan_missing=scan_missing) + # Increase chunk counter + LOG.info(_("Synchronization for chunk %(chunk_num)d of " + "%(total_chunks)d performed"), + {'chunk_num': sp.current_chunk + 1, + 'total_chunks': num_chunks}) + sp.current_chunk = (sp.current_chunk + 1) % num_chunks + added_delay = 0 + if sp.current_chunk == 0: + # Ensure init_sync_performed is True + if not sp.init_sync_performed: + sp.init_sync_performed = True + # Add additional random delay + added_delay = random.randint(0, self._max_rand_delay) + LOG.debug(_("Time elapsed at end of sync: %s"), + timeutils.utcnow() - start) + return self._sync_interval / num_chunks + added_delay diff --git a/neutron/plugins/nicira/nvplib.py b/neutron/plugins/nicira/nvplib.py index 2b0e6e5b48..646a214909 100644 --- a/neutron/plugins/nicira/nvplib.py +++ b/neutron/plugins/nicira/nvplib.py @@ -66,7 +66,10 @@ SNAT_KEYS = ["to_src_port_min", "to_src_port_max", "to_src_ip_min", "to_src_ip_max"] DNAT_KEYS = ["to_dst_port", "to_dst_ip_min", "to_dst_ip_max"] - +# Maximum page size for a single request +# NOTE(salv-orlando): This might become a version-dependent map should the +# limit be raised in future versions +MAX_PAGE_SIZE = 5000 # TODO(bgh): it would be more efficient to use a bitmap taken_context_ids = [] @@ -157,21 +160,34 @@ def get_cluster_version(cluster): return version +def get_single_query_page(path, cluster, page_cursor=None, + page_length=1000, neutron_only=True): + params = [] + if page_cursor: + params.append("_page_cursor=%s" % page_cursor) + params.append("_page_length=%s" % page_length) + # NOTE(salv-orlando): On the NVP backend the 'Quantum' tag is still + # used for marking Neutron entities in order to preserve compatibility + if neutron_only: + params.append("tag_scope=quantum") + query_params = "&".join(params) + path = "%s%s%s" % (path, "&" if (path.find("?") != -1) else "?", + query_params) + body = do_request(HTTP_GET, path, cluster=cluster) + # Result_count won't be returned if _page_cursor is supplied + return body['results'], body.get('page_cursor'), body.get('result_count') + + def get_all_query_pages(path, c): need_more_results = True result_list = [] page_cursor = None - query_marker = "&" if (path.find("?") != -1) else "?" while need_more_results: - page_cursor_str = ( - "_page_cursor=%s" % page_cursor if page_cursor else "") - body = do_request(HTTP_GET, - "%s%s%s" % (path, query_marker, page_cursor_str), - cluster=c) - page_cursor = body.get('page_cursor') + results, page_cursor = get_single_query_page( + path, c, page_cursor)[:2] if not page_cursor: need_more_results = False - result_list.extend(body['results']) + result_list.extend(results) return result_list diff --git a/neutron/tests/unit/nicira/etc/fake_get_lrouter.json b/neutron/tests/unit/nicira/etc/fake_get_lrouter.json index 9bda5b4764..9425ad654d 100644 --- a/neutron/tests/unit/nicira/etc/fake_get_lrouter.json +++ b/neutron/tests/unit/nicira/etc/fake_get_lrouter.json @@ -19,7 +19,7 @@ "lport_admin_up_count": %(lport_count)d, "_schema": "/ws.v1/schema/LogicalRouterStatus", "lport_count": %(lport_count)d, - "fabric_status": true, + "fabric_status": %(status)s, "type": "LogicalRouterStatus", "lport_link_up_count": %(lport_count)d } diff --git a/neutron/tests/unit/nicira/etc/fake_get_lswitch.json b/neutron/tests/unit/nicira/etc/fake_get_lswitch.json index 58b132b307..a55d508c71 100644 --- a/neutron/tests/unit/nicira/etc/fake_get_lswitch.json +++ b/neutron/tests/unit/nicira/etc/fake_get_lswitch.json @@ -2,7 +2,7 @@ "_href": "/ws.v1/lswitch/%(uuid)s", "_schema": "/ws.v1/schema/LogicalSwitchConfig", "_relations": {"LogicalSwitchStatus": - {"fabric_status": true, + {"fabric_status": %(status)s, "type": "LogicalSwitchStatus", "lport_count": %(lport_count)d, "_href": "/ws.v1/lswitch/%(uuid)s/status", diff --git a/neutron/tests/unit/nicira/etc/fake_get_lswitch_lport.json b/neutron/tests/unit/nicira/etc/fake_get_lswitch_lport.json index cfa7aed463..3e5cb90c20 100644 --- a/neutron/tests/unit/nicira/etc/fake_get_lswitch_lport.json +++ b/neutron/tests/unit/nicira/etc/fake_get_lswitch_lport.json @@ -3,7 +3,8 @@ {"LogicalPortStatus": {"type": "LogicalSwitchPortStatus", "admin_status_enabled": true, - "fabric_status_up": false, + "fabric_status_up": %(status)s, + "link_status_up": %(status)s, "_href": "/ws.v1/lswitch/%(ls_uuid)s/lport/%(uuid)s/status", "_schema": "/ws.v1/schema/LogicalSwitchPortStatus"}, "LogicalSwitchConfig": diff --git a/neutron/tests/unit/nicira/fake_nvpapiclient.py b/neutron/tests/unit/nicira/fake_nvpapiclient.py index 1133b0e7e0..e8b6f5b410 100644 --- a/neutron/tests/unit/nicira/fake_nvpapiclient.py +++ b/neutron/tests/unit/nicira/fake_nvpapiclient.py @@ -106,17 +106,6 @@ class FakeClient: LROUTER_LPORT_RESOURCE: ['LogicalPortAttachment'], } - _fake_lswitch_dict = {} - _fake_lrouter_dict = {} - _fake_lswitch_lport_dict = {} - _fake_lrouter_lport_dict = {} - _fake_lrouter_nat_dict = {} - _fake_lswitch_lportstatus_dict = {} - _fake_lrouter_lportstatus_dict = {} - _fake_securityprofile_dict = {} - _fake_lqueue_dict = {} - _fake_gatewayservice_dict = {} - _validators = { LSWITCH_RESOURCE: _validate_resource, LSWITCH_LPORT_RESOURCE: _validate_resource, @@ -128,6 +117,16 @@ class FakeClient: def __init__(self, fake_files_path): self.fake_files_path = fake_files_path + self._fake_lswitch_dict = {} + self._fake_lrouter_dict = {} + self._fake_lswitch_lport_dict = {} + self._fake_lrouter_lport_dict = {} + self._fake_lrouter_nat_dict = {} + self._fake_lswitch_lportstatus_dict = {} + self._fake_lrouter_lportstatus_dict = {} + self._fake_securityprofile_dict = {} + self._fake_lqueue_dict = {} + self._fake_gatewayservice_dict = {} def _get_tag(self, resource, scope): tags = [tag['tag'] for tag in resource['tags'] @@ -136,7 +135,7 @@ class FakeClient: def _get_filters(self, querystring): if not querystring: - return (None, None) + return (None, None, None) params = urlparse.parse_qs(querystring) tag_filter = None attr_filter = None @@ -145,7 +144,15 @@ class FakeClient: 'tag': params['tag'][0]} elif 'uuid' in params: attr_filter = {'uuid': params['uuid'][0]} - return (tag_filter, attr_filter) + # Handle page_length + # TODO(salv-orlando): Handle page cursor too + page_len = params.get('_page_length') + if page_len: + page_len = int(page_len[0]) + else: + # Explicitly set it to None (avoid 0 or empty list) + page_len = None + return (tag_filter, attr_filter, page_len) def _add_lswitch(self, body): fake_lswitch = json.loads(body) @@ -157,6 +164,8 @@ class FakeClient: fake_lswitch['zone_uuid'] = zone_uuid fake_lswitch['tenant_id'] = self._get_tag(fake_lswitch, 'os_tid') fake_lswitch['lport_count'] = 0 + # set status value + fake_lswitch['status'] = 'true' return fake_lswitch def _build_lrouter(self, body, uuid=None): @@ -183,6 +192,8 @@ class FakeClient: uuidutils.generate_uuid()) self._fake_lrouter_dict[fake_lrouter['uuid']] = fake_lrouter fake_lrouter['lport_count'] = 0 + # set status value + fake_lrouter['status'] = 'true' return fake_lrouter def _add_lqueue(self, body): @@ -213,6 +224,8 @@ class FakeClient: fake_lport_status['ls_uuid'] = fake_lswitch['uuid'] fake_lport_status['ls_name'] = fake_lswitch['display_name'] fake_lport_status['ls_zone_uuid'] = fake_lswitch['zone_uuid'] + # set status value + fake_lport['status'] = 'true' self._fake_lswitch_lportstatus_dict[new_uuid] = fake_lport_status return fake_lport @@ -356,7 +369,7 @@ class FakeClient: def _list(self, resource_type, response_file, parent_uuid=None, query=None, relations=None): - (tag_filter, attr_filter) = self._get_filters(query) + (tag_filter, attr_filter, page_len) = self._get_filters(query) with open("%s/%s" % (self.fake_files_path, response_file)) as f: response_template = f.read() res_dict = getattr(self, '_fake_%s_dict' % resource_type) @@ -425,8 +438,20 @@ class FakeClient: if (parent_func(res_uuid) and _tag_match(res_uuid) and _attr_match(res_uuid))] - return json.dumps({'results': items, - 'result_count': len(items)}) + # Rather inefficient, but hey this is just a mock! + next_cursor = None + total_items = len(items) + if page_len: + try: + next_cursor = items[page_len]['uuid'] + except IndexError: + next_cursor = None + items = items[:page_len] + response_dict = {'results': items, + 'result_count': total_items} + if next_cursor: + response_dict['page_cursor'] = next_cursor + return json.dumps(response_dict) def _show(self, resource_type, response_file, uuid1, uuid2=None, relations=None): diff --git a/neutron/tests/unit/nicira/test_agent_scheduler.py b/neutron/tests/unit/nicira/test_agent_scheduler.py index aad015405e..7bd2af700b 100644 --- a/neutron/tests/unit/nicira/test_agent_scheduler.py +++ b/neutron/tests/unit/nicira/test_agent_scheduler.py @@ -16,6 +16,7 @@ import mock from neutron.common.test_lib import test_config +from neutron.plugins.nicira.common import sync from neutron.tests.unit.nicira import fake_nvpapiclient from neutron.tests.unit.nicira import get_fake_conf from neutron.tests.unit.nicira import NVPAPI_NAME @@ -35,6 +36,9 @@ class NVPDhcpAgentNotifierTestCase(test_base.OvsDhcpAgentNotifierTestCase): self.fc = fake_nvpapiclient.FakeClient(STUBS_PATH) self.mock_nvpapi = mock.patch(NVPAPI_NAME, autospec=True) instance = self.mock_nvpapi.start() + # Avoid runs of the synchronizer looping call + patch_sync = mock.patch.object(sync, '_start_loopingcall') + patch_sync.start() def _fake_request(*args, **kwargs): return self.fc.fake_request(*args, **kwargs) @@ -44,6 +48,7 @@ class NVPDhcpAgentNotifierTestCase(test_base.OvsDhcpAgentNotifierTestCase): instance.return_value.request.side_effect = _fake_request super(NVPDhcpAgentNotifierTestCase, self).setUp() self.addCleanup(self.fc.reset_all) + self.addCleanup(patch_sync.stop) self.addCleanup(self.mock_nvpapi.stop) def _notification_mocks(self, hosts, mock_dhcp, net, subnet, port): diff --git a/neutron/tests/unit/nicira/test_maclearning.py b/neutron/tests/unit/nicira/test_maclearning.py index 9534ccae0b..2df2e74191 100644 --- a/neutron/tests/unit/nicira/test_maclearning.py +++ b/neutron/tests/unit/nicira/test_maclearning.py @@ -24,6 +24,7 @@ from neutron.api.v2 import attributes from neutron.common.test_lib import test_config from neutron import context from neutron.extensions import agent +from neutron.plugins.nicira.common import sync from neutron.plugins.nicira.NvpApiClient import NVPVersion from neutron.tests.unit.nicira import fake_nvpapiclient from neutron.tests.unit.nicira import get_fake_conf @@ -70,6 +71,9 @@ class MacLearningDBTestCase(test_db_plugin.NeutronDbPluginV2TestCase): self.fc = fake_nvpapiclient.FakeClient(STUBS_PATH) self.mock_nvpapi = mock.patch(NVPAPI_NAME, autospec=True) instance = self.mock_nvpapi.start() + # Avoid runs of the synchronizer looping call + patch_sync = mock.patch.object(sync, '_start_loopingcall') + patch_sync.start() def _fake_request(*args, **kwargs): return self.fc.fake_request(*args, **kwargs) @@ -80,6 +84,7 @@ class MacLearningDBTestCase(test_db_plugin.NeutronDbPluginV2TestCase): cfg.CONF.set_override('metadata_mode', None, 'NVP') self.addCleanup(self.fc.reset_all) self.addCleanup(self.mock_nvpapi.stop) + self.addCleanup(patch_sync.stop) self.addCleanup(self.restore_resource_attribute_map) self.addCleanup(cfg.CONF.reset) super(MacLearningDBTestCase, self).setUp() diff --git a/neutron/tests/unit/nicira/test_nicira_plugin.py b/neutron/tests/unit/nicira/test_nicira_plugin.py index f2dc40a6ed..d51d527c0a 100644 --- a/neutron/tests/unit/nicira/test_nicira_plugin.py +++ b/neutron/tests/unit/nicira/test_nicira_plugin.py @@ -34,6 +34,7 @@ from neutron.extensions import securitygroup as secgrp from neutron import manager from neutron.openstack.common import uuidutils from neutron.plugins.nicira.common import exceptions as nvp_exc +from neutron.plugins.nicira.common import sync from neutron.plugins.nicira.dbexts import nicira_db from neutron.plugins.nicira.dbexts import nicira_qos_db as qos_db from neutron.plugins.nicira.extensions import distributedrouter as dist_router @@ -92,6 +93,9 @@ class NiciraPluginV2TestCase(test_plugin.NeutronDbPluginV2TestCase): self.fc = fake_nvpapiclient.FakeClient(STUBS_PATH) self.mock_nvpapi = mock.patch(NVPAPI_NAME, autospec=True) self.mock_instance = self.mock_nvpapi.start() + # Avoid runs of the synchronizer looping call + patch_sync = mock.patch.object(sync, '_start_loopingcall') + patch_sync.start() def _fake_request(*args, **kwargs): return self.fc.fake_request(*args, **kwargs) @@ -106,6 +110,7 @@ class NiciraPluginV2TestCase(test_plugin.NeutronDbPluginV2TestCase): cfg.CONF.set_override('metadata_mode', None, 'NVP') self.addCleanup(self.fc.reset_all) self.addCleanup(self.mock_nvpapi.stop) + self.addCleanup(patch_sync.stop) class TestNiciraBasicGet(test_plugin.TestBasicGet, NiciraPluginV2TestCase): @@ -325,6 +330,9 @@ class NiciraPortSecurityTestCase(psec.PortSecurityDBTestCase): self.mock_nvpapi = mock.patch(NVPAPI_NAME, autospec=True) instance = self.mock_nvpapi.start() instance.return_value.login.return_value = "the_cookie" + # Avoid runs of the synchronizer looping call + patch_sync = mock.patch.object(sync, '_start_loopingcall') + patch_sync.start() def _fake_request(*args, **kwargs): return self.fc.fake_request(*args, **kwargs) @@ -333,6 +341,7 @@ class NiciraPortSecurityTestCase(psec.PortSecurityDBTestCase): super(NiciraPortSecurityTestCase, self).setUp(PLUGIN_NAME) self.addCleanup(self.fc.reset_all) self.addCleanup(self.mock_nvpapi.stop) + self.addCleanup(patch_sync.stop) class TestNiciraPortSecurity(NiciraPortSecurityTestCase, @@ -349,17 +358,18 @@ class NiciraSecurityGroupsTestCase(ext_sg.SecurityGroupDBTestCase): self.mock_nvpapi = mock.patch(NVPAPI_NAME, autospec=True) instance = self.mock_nvpapi.start() instance.return_value.login.return_value = "the_cookie" + # Avoid runs of the synchronizer looping call + patch_sync = mock.patch.object(sync, '_start_loopingcall') + patch_sync.start() def _fake_request(*args, **kwargs): return fc.fake_request(*args, **kwargs) instance.return_value.request.side_effect = _fake_request + self.addCleanup(self.mock_nvpapi.stop) + self.addCleanup(patch_sync.stop) super(NiciraSecurityGroupsTestCase, self).setUp(PLUGIN_NAME) - def tearDown(self): - super(NiciraSecurityGroupsTestCase, self).tearDown() - self.mock_nvpapi.stop() - class TestNiciraSecurityGroup(ext_sg.TestSecurityGroups, NiciraSecurityGroupsTestCase): @@ -1125,20 +1135,12 @@ class NiciraNeutronNVPOutOfSync(NiciraPluginV2TestCase, res = req.get_response(self.api) self.assertEqual(res.status_int, 204) - def test_list_networks_not_in_nvp(self): - res = self._create_network('json', 'net1', True) - self.deserialize('json', res) - self.fc._fake_lswitch_dict.clear() - req = self.new_list_request('networks') - nets = self.deserialize('json', req.get_response(self.api)) - self.assertEqual(nets['networks'][0]['status'], - constants.NET_STATUS_ERROR) - def test_show_network_not_in_nvp(self): res = self._create_network('json', 'net1', True) net = self.deserialize('json', res) self.fc._fake_lswitch_dict.clear() - req = self.new_show_request('networks', net['network']['id']) + req = self.new_show_request('networks', net['network']['id'], + fields=['id', 'status']) net = self.deserialize('json', req.get_response(self.api)) self.assertEqual(net['network']['status'], constants.NET_STATUS_ERROR) @@ -1153,17 +1155,6 @@ class NiciraNeutronNVPOutOfSync(NiciraPluginV2TestCase, res = req.get_response(self.api) self.assertEqual(res.status_int, 204) - def test_list_port_not_in_nvp(self): - res = self._create_network('json', 'net1', True) - net1 = self.deserialize('json', res) - res = self._create_port('json', net1['network']['id']) - self.deserialize('json', res) - self.fc._fake_lswitch_lport_dict.clear() - req = self.new_list_request('ports') - nets = self.deserialize('json', req.get_response(self.api)) - self.assertEqual(nets['ports'][0]['status'], - constants.PORT_STATUS_ERROR) - def test_show_port_not_in_nvp(self): res = self._create_network('json', 'net1', True) net1 = self.deserialize('json', res) @@ -1171,7 +1162,8 @@ class NiciraNeutronNVPOutOfSync(NiciraPluginV2TestCase, port = self.deserialize('json', res) self.fc._fake_lswitch_lport_dict.clear() self.fc._fake_lswitch_lportstatus_dict.clear() - req = self.new_show_request('ports', port['port']['id']) + req = self.new_show_request('ports', port['port']['id'], + fields=['id', 'status']) net = self.deserialize('json', req.get_response(self.api)) self.assertEqual(net['port']['status'], constants.PORT_STATUS_ERROR) @@ -1218,20 +1210,12 @@ class NiciraNeutronNVPOutOfSync(NiciraPluginV2TestCase, res = req.get_response(self.ext_api) self.assertEqual(res.status_int, 204) - def test_list_routers_not_in_nvp(self): - res = self._create_router('json', 'tenant') - self.deserialize('json', res) - self.fc._fake_lrouter_dict.clear() - req = self.new_list_request('routers') - routers = self.deserialize('json', req.get_response(self.ext_api)) - self.assertEqual(routers['routers'][0]['status'], - constants.NET_STATUS_ERROR) - def test_show_router_not_in_nvp(self): res = self._create_router('json', 'tenant') router = self.deserialize('json', res) self.fc._fake_lrouter_dict.clear() - req = self.new_show_request('routers', router['router']['id']) + req = self.new_show_request('routers', router['router']['id'], + fields=['id', 'status']) router = self.deserialize('json', req.get_response(self.ext_api)) self.assertEqual(router['router']['status'], constants.NET_STATUS_ERROR) diff --git a/neutron/tests/unit/nicira/test_nvp_sync.py b/neutron/tests/unit/nicira/test_nvp_sync.py new file mode 100644 index 0000000000..9b2b28fa0b --- /dev/null +++ b/neutron/tests/unit/nicira/test_nvp_sync.py @@ -0,0 +1,587 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2013 Nicira Networks, Inc. +# All Rights Reserved +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +import contextlib +import time + +import mock +from oslo.config import cfg + +from neutron.api.v2 import attributes as attr +from neutron.common import config +from neutron.common import constants +from neutron import context +from neutron.openstack.common import jsonutils as json +from neutron.plugins.nicira.common import sync +from neutron.plugins.nicira import NeutronPlugin +from neutron.plugins.nicira import nvp_cluster +from neutron.plugins.nicira import NvpApiClient +from neutron.plugins.nicira import nvplib +from neutron.tests import base +from neutron.tests.unit.nicira import fake_nvpapiclient +from neutron.tests.unit.nicira import get_fake_conf +from neutron.tests.unit.nicira import NVPAPI_NAME +from neutron.tests.unit.nicira import STUBS_PATH +from neutron.tests.unit import test_api_v2 + +from neutron.openstack.common import log + +LOG = log.getLogger(__name__) + +_uuid = test_api_v2._uuid +LSWITCHES = [{'uuid': _uuid(), 'name': 'ls-1'}, + {'uuid': _uuid(), 'name': 'ls-2'}] +LSWITCHPORTS = [{'uuid': _uuid(), 'name': 'lp-1'}, + {'uuid': _uuid(), 'name': 'lp-2'}] +LROUTERS = [{'uuid': _uuid(), 'name': 'lr-1'}, + {'uuid': _uuid(), 'name': 'lr-2'}] + + +class NvpCacheTestCase(base.BaseTestCase): + """Test suite providing coverage for the NvpCache class.""" + + def setUp(self): + self.nvp_cache = sync.NvpCache() + for lswitch in LSWITCHES: + self.nvp_cache._uuid_dict_mappings[lswitch['uuid']] = ( + self.nvp_cache._lswitches) + self.nvp_cache._lswitches[lswitch['uuid']] = ( + {'data': lswitch, + 'hash': hash(json.dumps(lswitch))}) + for lswitchport in LSWITCHPORTS: + self.nvp_cache._uuid_dict_mappings[lswitchport['uuid']] = ( + self.nvp_cache._lswitchports) + self.nvp_cache._lswitchports[lswitchport['uuid']] = ( + {'data': lswitchport, + 'hash': hash(json.dumps(lswitchport))}) + for lrouter in LROUTERS: + self.nvp_cache._uuid_dict_mappings[lrouter['uuid']] = ( + self.nvp_cache._lrouters) + self.nvp_cache._lrouters[lrouter['uuid']] = ( + {'data': lrouter, + 'hash': hash(json.dumps(lrouter))}) + super(NvpCacheTestCase, self).setUp() + + def test_get_lswitches(self): + ls_uuids = self.nvp_cache.get_lswitches() + self.assertEqual(set(ls_uuids), + set([ls['uuid'] for ls in LSWITCHES])) + + def test_get_lswitchports(self): + lp_uuids = self.nvp_cache.get_lswitchports() + self.assertEqual(set(lp_uuids), + set([lp['uuid'] for lp in LSWITCHPORTS])) + + def test_get_lrouters(self): + lr_uuids = self.nvp_cache.get_lrouters() + self.assertEqual(set(lr_uuids), + set([lr['uuid'] for lr in LROUTERS])) + + def test_get_lswitches_changed_only(self): + ls_uuids = self.nvp_cache.get_lswitches(changed_only=True) + self.assertEqual(0, len(ls_uuids)) + + def test_get_lswitchports_changed_only(self): + lp_uuids = self.nvp_cache.get_lswitchports(changed_only=True) + self.assertEqual(0, len(lp_uuids)) + + def test_get_lrouters_changed_only(self): + lr_uuids = self.nvp_cache.get_lrouters(changed_only=True) + self.assertEqual(0, len(lr_uuids)) + + def _verify_update(self, new_resource, changed=True, hit=True): + cached_resource = self.nvp_cache[new_resource['uuid']] + self.assertEqual(new_resource, cached_resource['data']) + self.assertEqual(hit, cached_resource.get('hit', False)) + self.assertEqual(changed, + cached_resource.get('changed', False)) + + def test_update_lswitch_new_item(self): + new_switch_uuid = _uuid() + new_switch = {'uuid': new_switch_uuid, 'name': 'new_switch'} + self.nvp_cache.update_lswitch(new_switch) + self.assertIn(new_switch_uuid, self.nvp_cache._lswitches.keys()) + self._verify_update(new_switch) + + def test_update_lswitch_existing_item(self): + switch = LSWITCHES[0] + switch['name'] = 'new_name' + self.nvp_cache.update_lswitch(switch) + self.assertIn(switch['uuid'], self.nvp_cache._lswitches.keys()) + self._verify_update(switch) + + def test_update_lswitchport_new_item(self): + new_switchport_uuid = _uuid() + new_switchport = {'uuid': new_switchport_uuid, + 'name': 'new_switchport'} + self.nvp_cache.update_lswitchport(new_switchport) + self.assertIn(new_switchport_uuid, + self.nvp_cache._lswitchports.keys()) + self._verify_update(new_switchport) + + def test_update_lswitchport_existing_item(self): + switchport = LSWITCHPORTS[0] + switchport['name'] = 'new_name' + self.nvp_cache.update_lswitchport(switchport) + self.assertIn(switchport['uuid'], + self.nvp_cache._lswitchports.keys()) + self._verify_update(switchport) + + def test_update_lrouter_new_item(self): + new_router_uuid = _uuid() + new_router = {'uuid': new_router_uuid, + 'name': 'new_router'} + self.nvp_cache.update_lrouter(new_router) + self.assertIn(new_router_uuid, + self.nvp_cache._lrouters.keys()) + self._verify_update(new_router) + + def test_update_lrouter_existing_item(self): + router = LROUTERS[0] + router['name'] = 'new_name' + self.nvp_cache.update_lrouter(router) + self.assertIn(router['uuid'], + self.nvp_cache._lrouters.keys()) + self._verify_update(router) + + def test_process_updates_initial(self): + # Clear cache content to simulate first-time filling + self.nvp_cache._lswitches.clear() + self.nvp_cache._lswitchports.clear() + self.nvp_cache._lrouters.clear() + self.nvp_cache.process_updates(LSWITCHES, LROUTERS, LSWITCHPORTS) + for resource in LSWITCHES + LROUTERS + LSWITCHPORTS: + self._verify_update(resource) + + def test_process_updates_no_change(self): + self.nvp_cache.process_updates(LSWITCHES, LROUTERS, LSWITCHPORTS) + for resource in LSWITCHES + LROUTERS + LSWITCHPORTS: + self._verify_update(resource, changed=False) + + def test_process_updates_with_changes(self): + LSWITCHES[0]['name'] = 'altered' + self.nvp_cache.process_updates(LSWITCHES, LROUTERS, LSWITCHPORTS) + for resource in LSWITCHES + LROUTERS + LSWITCHPORTS: + changed = (True if resource['uuid'] == LSWITCHES[0]['uuid'] + else False) + self._verify_update(resource, changed=changed) + + def _test_process_updates_with_removals(self): + lswitches = LSWITCHES[:] + lswitch = lswitches.pop() + self.nvp_cache.process_updates(lswitches, LROUTERS, LSWITCHPORTS) + for resource in LSWITCHES + LROUTERS + LSWITCHPORTS: + hit = (False if resource['uuid'] == lswitch['uuid'] + else True) + self._verify_update(resource, changed=False, hit=hit) + return (lswitch, lswitches) + + def test_process_updates_with_removals(self): + self._test_process_updates_with_removals() + + def test_process_updates_cleanup_after_delete(self): + deleted_lswitch, lswitches = self._test_process_updates_with_removals() + self.nvp_cache.process_deletes() + self.nvp_cache.process_updates(lswitches, LROUTERS, LSWITCHPORTS) + self.assertNotIn(deleted_lswitch['uuid'], self.nvp_cache._lswitches) + + def _verify_delete(self, resource, deleted=True, hit=True): + cached_resource = self.nvp_cache[resource['uuid']] + data_field = 'data_bk' if deleted else 'data' + self.assertEqual(resource, cached_resource[data_field]) + self.assertEqual(hit, cached_resource.get('hit', False)) + self.assertEqual(deleted, + cached_resource.get('changed', False)) + + def _set_hit(self, resources, uuid_to_delete=None): + for resource in resources: + if resource['data']['uuid'] != uuid_to_delete: + resource['hit'] = True + + def test_process_deletes_no_change(self): + # Mark all resources as hit + self._set_hit(self.nvp_cache._lswitches.values()) + self._set_hit(self.nvp_cache._lswitchports.values()) + self._set_hit(self.nvp_cache._lrouters.values()) + self.nvp_cache.process_deletes() + for resource in LSWITCHES + LROUTERS + LSWITCHPORTS: + self._verify_delete(resource, hit=False, deleted=False) + + def test_process_deletes_with_removals(self): + # Mark all resources but one as hit + uuid_to_delete = LSWITCHPORTS[0]['uuid'] + self._set_hit(self.nvp_cache._lswitches.values(), + uuid_to_delete) + self._set_hit(self.nvp_cache._lswitchports.values(), + uuid_to_delete) + self._set_hit(self.nvp_cache._lrouters.values(), + uuid_to_delete) + self.nvp_cache.process_deletes() + for resource in LSWITCHES + LROUTERS + LSWITCHPORTS: + deleted = resource['uuid'] == uuid_to_delete + self._verify_delete(resource, hit=False, deleted=deleted) + + +class SyncLoopingCallTestCase(base.BaseTestCase): + + def test_looping_calls(self): + # Avoid runs of the synchronization process - just start + # the looping call + with mock.patch.object( + sync.NvpSynchronizer, '_synchronize_state', + return_value=0.01): + synchronizer = sync.NvpSynchronizer(None, None, + 100, 0, 0) + time.sleep(0.04999) + self.assertEqual( + 5, synchronizer._synchronize_state.call_count) + + +class NvpSyncTestCase(base.BaseTestCase): + + def setUp(self): + # mock nvp api client + self.fc = fake_nvpapiclient.FakeClient(STUBS_PATH) + mock_nvpapi = mock.patch(NVPAPI_NAME, autospec=True) + # Avoid runs of the synchronizer looping call + # These unit tests will excplicitly invoke synchronization + patch_sync = mock.patch.object(sync, '_start_loopingcall') + self.mock_nvpapi = mock_nvpapi.start() + patch_sync.start() + self.mock_nvpapi.return_value.login.return_value = "the_cookie" + # Emulate tests against NVP 3.x + self.mock_nvpapi.return_value.get_nvp_version.return_value = ( + NvpApiClient.NVPVersion("3.1")) + + def _fake_request(*args, **kwargs): + return self.fc.fake_request(*args, **kwargs) + + self.mock_nvpapi.return_value.request.side_effect = _fake_request + self.fake_cluster = nvp_cluster.NVPCluster( + name='fake-cluster', nvp_controllers=['1.1.1.1:999'], + default_tz_uuid=_uuid(), nvp_user='foo', nvp_password='bar') + self.fake_cluster.api_client = NvpApiClient.NVPApiHelper( + ('1.1.1.1', '999', True), + self.fake_cluster.nvp_user, self.fake_cluster.nvp_password, + self.fake_cluster.req_timeout, self.fake_cluster.http_timeout, + self.fake_cluster.retries, self.fake_cluster.redirects) + # Instantiate Neutron plugin + # and setup needed config variables + args = ['--config-file', get_fake_conf('neutron.conf.test'), + '--config-file', get_fake_conf('nvp.ini.test')] + config.parse(args=args) + self._plugin = NeutronPlugin.NvpPluginV2() + super(NvpSyncTestCase, self).setUp() + self.addCleanup(self.fc.reset_all) + self.addCleanup(patch_sync.stop) + self.addCleanup(mock_nvpapi.stop) + + def tearDown(self): + cfg.CONF.reset() + super(NvpSyncTestCase, self).tearDown() + + @contextlib.contextmanager + def _populate_data(self, ctx, net_size=2, port_size=2, router_size=2): + + def network(idx): + return {'network': {'name': 'net-%s' % idx, + 'admin_state_up': True, + 'shared': False, + 'port_security_enabled': True, + 'tenant_id': 'foo'}} + + def subnet(idx, net_id): + return {'subnet': + {'cidr': '10.10.%s.0/24' % idx, + 'name': 'sub-%s' % idx, + 'gateway_ip': attr.ATTR_NOT_SPECIFIED, + 'allocation_pools': attr.ATTR_NOT_SPECIFIED, + 'ip_version': 4, + 'dns_nameservers': attr.ATTR_NOT_SPECIFIED, + 'host_routes': attr.ATTR_NOT_SPECIFIED, + 'enable_dhcp': True, + 'network_id': net_id, + 'tenant_id': 'foo'}} + + def port(idx, net_id): + return {'port': {'network_id': net_id, + 'name': 'port-%s' % idx, + 'admin_state_up': True, + 'device_id': 'miao', + 'device_owner': 'bau', + 'fixed_ips': attr.ATTR_NOT_SPECIFIED, + 'mac_address': attr.ATTR_NOT_SPECIFIED, + 'tenant_id': 'foo'}} + + def router(idx): + # Use random uuids as names + return {'router': {'name': 'rtr-%s' % idx, + 'admin_state_up': True, + 'tenant_id': 'foo'}} + + networks = [] + ports = [] + routers = [] + for i in range(0, net_size): + net = self._plugin.create_network(ctx, network(i)) + networks.append(net) + self._plugin.create_subnet(ctx, subnet(i, net['id'])) + for j in range(0, port_size): + ports.append(self._plugin.create_port( + ctx, port("%s-%s" % (i, j), net['id']))) + for i in range(0, router_size): + routers.append(self._plugin.create_router(ctx, router(i))) + # Do not return anything as the user does need the actual + # data created + try: + yield + finally: + # Remove everything + for router in routers: + self._plugin.delete_router(ctx, router['id']) + for port in ports: + self._plugin.delete_port(ctx, port['id']) + # This will remove networks and subnets + for network in networks: + self._plugin.delete_network(ctx, network['id']) + + def _get_tag_dict(self, tags): + return dict((tag['scope'], tag['tag']) for tag in tags) + + def _test_sync(self, exp_net_status, + exp_port_status, exp_router_status, + action_callback=None, sp=None): + neutron_net_id = ls_uuid = self.fc._fake_lswitch_dict.keys()[0] + lp_uuid = self.fc._fake_lswitch_lport_dict.keys()[0] + neutron_port_id = self._get_tag_dict( + self.fc._fake_lswitch_lport_dict[lp_uuid]['tags'])['q_port_id'] + neutron_rtr_id = lr_uuid = self.fc._fake_lrouter_dict.keys()[0] + if action_callback: + action_callback(ls_uuid, lp_uuid, lr_uuid) + # Make chunk big enough to read everything + if not sp: + sp = sync.SyncParameters(100) + self._plugin._synchronizer._synchronize_state(sp) + # Verify element is in expected status + # TODO(salv-orlando): Verify status for all elements + ctx = context.get_admin_context() + neutron_net = self._plugin.get_network(ctx, neutron_net_id) + neutron_port = self._plugin.get_port(ctx, neutron_port_id) + neutron_rtr = self._plugin.get_router(ctx, neutron_rtr_id) + self.assertEqual(exp_net_status, neutron_net['status']) + self.assertEqual(exp_port_status, neutron_port['status']) + self.assertEqual(exp_router_status, neutron_rtr['status']) + + def _action_callback_status_down(self, ls_uuid, lp_uuid, lr_uuid): + self.fc._fake_lswitch_dict[ls_uuid]['status'] = 'false' + self.fc._fake_lswitch_lport_dict[lp_uuid]['status'] = 'false' + self.fc._fake_lrouter_dict[lr_uuid]['status'] = 'false' + + def test_initial_sync(self): + ctx = context.get_admin_context() + with self._populate_data(ctx): + self._test_sync( + constants.NET_STATUS_ACTIVE, + constants.PORT_STATUS_ACTIVE, + constants.NET_STATUS_ACTIVE) + + def test_initial_sync_with_resources_down(self): + ctx = context.get_admin_context() + with self._populate_data(ctx): + self._test_sync( + constants.NET_STATUS_DOWN, constants.PORT_STATUS_DOWN, + constants.NET_STATUS_DOWN, self._action_callback_status_down) + + def test_resync_with_resources_down(self): + ctx = context.get_admin_context() + with self._populate_data(ctx): + sp = sync.SyncParameters(100) + self._plugin._synchronizer._synchronize_state(sp) + self._test_sync( + constants.NET_STATUS_DOWN, constants.PORT_STATUS_DOWN, + constants.NET_STATUS_DOWN, self._action_callback_status_down) + + def _action_callback_del_resource(self, ls_uuid, lp_uuid, lr_uuid): + del self.fc._fake_lswitch_dict[ls_uuid] + del self.fc._fake_lswitch_lport_dict[lp_uuid] + del self.fc._fake_lrouter_dict[lr_uuid] + + def test_initial_sync_with_resources_removed(self): + ctx = context.get_admin_context() + with self._populate_data(ctx): + self._test_sync( + constants.NET_STATUS_ERROR, constants.PORT_STATUS_ERROR, + constants.NET_STATUS_ERROR, self._action_callback_del_resource) + + def test_resync_with_resources_removed(self): + ctx = context.get_admin_context() + with self._populate_data(ctx): + sp = sync.SyncParameters(100) + self._plugin._synchronizer._synchronize_state(sp) + self._test_sync( + constants.NET_STATUS_ERROR, constants.PORT_STATUS_ERROR, + constants.NET_STATUS_ERROR, self._action_callback_del_resource) + + def _test_sync_with_chunk_larger_maxpagesize( + self, net_size, port_size, router_size, chunk_size, exp_calls): + ctx = context.get_admin_context() + real_func = nvplib.get_single_query_page + sp = sync.SyncParameters(chunk_size) + with self._populate_data(ctx, net_size=net_size, + port_size=port_size, + router_size=router_size): + with mock.patch.object(nvplib, 'MAX_PAGE_SIZE', 15): + # The following mock is just for counting calls, + # but we will still run the actual function + with mock.patch.object( + nvplib, 'get_single_query_page', + side_effect=real_func) as mock_get_page: + self._test_sync( + constants.NET_STATUS_ACTIVE, + constants.PORT_STATUS_ACTIVE, + constants.NET_STATUS_ACTIVE, + sp=sp) + # As each resource type does not exceed the maximum page size, + # the method should be called once for each resource type + self.assertEqual(exp_calls, mock_get_page.call_count) + + def test_sync_chunk_larger_maxpagesize_no_multiple_requests(self): + # total resource size = 20 + # total size for each resource does not exceed max page size (15) + self._test_sync_with_chunk_larger_maxpagesize( + net_size=5, port_size=2, router_size=5, + chunk_size=20, exp_calls=3) + + def test_sync_chunk_larger_maxpagesize_triggers_multiple_requests(self): + # total resource size = 48 + # total size for each resource does exceed max page size (15) + self._test_sync_with_chunk_larger_maxpagesize( + net_size=16, port_size=1, router_size=16, + chunk_size=48, exp_calls=6) + + def test_sync_multi_chunk(self): + # The fake NVP API client cannot be used for this test + ctx = context.get_admin_context() + # Generate 4 networks, 1 port per network, and 4 routers + with self._populate_data(ctx, net_size=4, port_size=1, router_size=4): + fake_lswitches = json.loads( + self.fc.handle_get('/ws.v1/lswitch'))['results'] + fake_lrouters = json.loads( + self.fc.handle_get('/ws.v1/lrouter'))['results'] + fake_lswitchports = json.loads( + self.fc.handle_get('/ws.v1/lswitch/*/lport'))['results'] + return_values = [ + # Chunk 0 - lswitches + (fake_lswitches, None, 4), + # Chunk 0 - lrouters + (fake_lrouters[:2], 'xxx', 4), + # Chunk 0 - lports (size only) + ([], 'start', 4), + # Chunk 1 - lrouters (2 more) (lswitches are skipped) + (fake_lrouters[2:], None, None), + # Chunk 1 - lports + (fake_lswitchports, None, 4)] + + def fake_fetch_data(*args, **kwargs): + return return_values.pop(0) + + # 2 Chunks, with 6 resources each. + # 1st chunk lswitches and lrouters + # 2nd chunk lrouters and lports + # Mock _fetch_data + with mock.patch.object( + self._plugin._synchronizer, '_fetch_data', + side_effect=fake_fetch_data): + sp = sync.SyncParameters(6) + + def do_chunk(chunk_idx, ls_cursor, lr_cursor, lp_cursor): + self._plugin._synchronizer._synchronize_state(sp) + self.assertEqual(chunk_idx, sp.current_chunk) + self.assertEqual(ls_cursor, sp.ls_cursor) + self.assertEqual(lr_cursor, sp.lr_cursor) + self.assertEqual(lp_cursor, sp.lp_cursor) + + # check 1st chunk + do_chunk(1, None, 'xxx', 'start') + # check 2nd chunk + do_chunk(0, None, None, None) + # Chunk size should have stayed the same + self.assertEqual(sp.chunk_size, 6) + + def test_synchronize_network(self): + ctx = context.get_admin_context() + with self._populate_data(ctx): + # Put a network down to verify synchronization + q_net_id = ls_uuid = self.fc._fake_lswitch_dict.keys()[0] + self.fc._fake_lswitch_dict[ls_uuid]['status'] = 'false' + q_net_data = self._plugin._get_network(ctx, q_net_id) + self._plugin._synchronizer.synchronize_network(ctx, q_net_data) + # Reload from db + q_nets = self._plugin.get_networks(ctx) + for q_net in q_nets: + if q_net['id'] == q_net_id: + exp_status = constants.NET_STATUS_DOWN + else: + exp_status = constants.NET_STATUS_ACTIVE + self.assertEqual(exp_status, q_net['status']) + + def test_synchronize_port(self): + ctx = context.get_admin_context() + with self._populate_data(ctx): + # Put a network down to verify synchronization + lp_uuid = self.fc._fake_lswitch_lport_dict.keys()[0] + lport = self.fc._fake_lswitch_lport_dict[lp_uuid] + q_port_id = self._get_tag_dict(lport['tags'])['q_port_id'] + lport['status'] = 'false' + q_port_data = self._plugin._get_port(ctx, q_port_id) + self._plugin._synchronizer.synchronize_port(ctx, q_port_data) + # Reload from db + q_ports = self._plugin.get_ports(ctx) + for q_port in q_ports: + if q_port['id'] == q_port_id: + exp_status = constants.PORT_STATUS_DOWN + else: + exp_status = constants.PORT_STATUS_ACTIVE + self.assertEqual(exp_status, q_port['status']) + + def test_synchronize_router(self): + ctx = context.get_admin_context() + with self._populate_data(ctx): + # Put a network down to verify synchronization + q_rtr_id = lr_uuid = self.fc._fake_lrouter_dict.keys()[0] + self.fc._fake_lrouter_dict[lr_uuid]['status'] = 'false' + q_rtr_data = self._plugin._get_router(ctx, q_rtr_id) + self._plugin._synchronizer.synchronize_router(ctx, q_rtr_data) + # Reload from db + q_routers = self._plugin.get_routers(ctx) + for q_rtr in q_routers: + if q_rtr['id'] == q_rtr_id: + exp_status = constants.NET_STATUS_DOWN + else: + exp_status = constants.NET_STATUS_ACTIVE + self.assertEqual(exp_status, q_rtr['status']) + + def test_sync_nvp_failure_backoff(self): + self.mock_nvpapi.return_value.request.side_effect = ( + NvpApiClient.RequestTimeout) + # chunk size won't matter here + sp = sync.SyncParameters(999) + for i in range(0, 10): + self.assertEqual( + min(64, 2 ** i), + self._plugin._synchronizer._synchronize_state(sp)) diff --git a/neutron/tests/unit/nicira/test_nvpopts.py b/neutron/tests/unit/nicira/test_nvpopts.py index 3d5b2587e9..891f11fbd6 100644 --- a/neutron/tests/unit/nicira/test_nvpopts.py +++ b/neutron/tests/unit/nicira/test_nvpopts.py @@ -16,6 +16,7 @@ import fixtures import testtools +import mock from oslo.config import cfg from neutron.common import config as q_config @@ -23,6 +24,7 @@ from neutron.manager import NeutronManager from neutron.openstack.common import uuidutils from neutron.plugins.nicira.common import config # noqa from neutron.plugins.nicira.common import exceptions +from neutron.plugins.nicira.common import sync from neutron.plugins.nicira import nvp_cluster from neutron.tests.unit.nicira import get_fake_conf from neutron.tests.unit.nicira import PLUGIN_NAME @@ -81,6 +83,10 @@ class ConfigurationTest(testtools.TestCase): self.useFixture(fixtures.MonkeyPatch( 'neutron.manager.NeutronManager._instance', None)) + # Avoid runs of the synchronizer looping call + patch_sync = mock.patch.object(sync, '_start_loopingcall') + patch_sync.start() + self.addCleanup(patch_sync.stop) def _assert_required_options(self, cluster): self.assertEqual(cluster.nvp_controllers, ['fake_1:443', 'fake_2:443']) @@ -175,6 +181,10 @@ class OldConfigurationTest(testtools.TestCase): self.useFixture(fixtures.MonkeyPatch( 'neutron.manager.NeutronManager._instance', None)) + # Avoid runs of the synchronizer looping call + patch_sync = mock.patch.object(sync, '_start_loopingcall') + patch_sync.start() + self.addCleanup(patch_sync.stop) def _assert_required_options(self, cluster): self.assertEqual(cluster.nvp_controllers, ['fake_1:443', 'fake_2:443']) diff --git a/neutron/tests/unit/test_db_plugin.py b/neutron/tests/unit/test_db_plugin.py index e628192f45..b89743ccd9 100644 --- a/neutron/tests/unit/test_db_plugin.py +++ b/neutron/tests/unit/test_db_plugin.py @@ -202,10 +202,14 @@ class NeutronDbPluginV2TestCase(testlib_api.WebTestCase): 'GET', resource, None, fmt, params=params, subresource=subresource ) - def new_show_request(self, resource, id, fmt=None, subresource=None): - return self._req( - 'GET', resource, None, fmt, id=id, subresource=subresource - ) + def new_show_request(self, resource, id, fmt=None, + subresource=None, fields=None): + if fields: + params = "&".join(["fields=%s" % x for x in fields]) + else: + params = None + return self._req('GET', resource, None, fmt, id=id, + params=params, subresource=subresource) def new_delete_request(self, resource, id, fmt=None, subresource=None, sub_id=None):