Merge "Introduce periodic state synchronization with backend"
This commit is contained in:
commit
8805af5bc6
@ -101,3 +101,36 @@
|
||||
# the RPC agents to operate. When 'agentless' is chosen, the config option metadata_mode
|
||||
# becomes ineffective. The mode 'agentless' is not supported for NVP 3.2 or below.
|
||||
# agent_mode = agent
|
||||
|
||||
[nvp_sync]
|
||||
# Interval in seconds between runs of the status synchronization task.
|
||||
# The plugin will aim at resynchronizing operational status for all
|
||||
# resources in this interval, and it should be therefore large enough
|
||||
# to ensure the task is feasible. Otherwise the plugin will be
|
||||
# constantly synchronizing resource status, ie: a new task is started
|
||||
# as soon as the previous is completed.
|
||||
# If this value is set to 0, the state synchronization thread for this
|
||||
# Neutron instance will be disabled.
|
||||
# state_sync_interval = 120
|
||||
|
||||
# Random additional delay between two runs of the state synchronization task.
|
||||
# An additional wait time between 0 and max_random_sync_delay seconds
|
||||
# will be added on top of state_sync_interval.
|
||||
# max_random_sync_delay = 0
|
||||
|
||||
# Minimum delay, in seconds, between two status synchronization requests for NVP.
|
||||
# Depending on chunk size, controller load, and other factors, state
|
||||
# synchronization requests might be pretty heavy. This means the
|
||||
# controller might take time to respond, and its load might be quite
|
||||
# increased by them. This parameter allows to specify a minimum
|
||||
# interval between two subsequent requests.
|
||||
# The value for this parameter must never exceed state_sync_interval.
|
||||
# If this does, an error will be raised at startup.
|
||||
# min_sync_req_delay = 10
|
||||
|
||||
# Minimum number of resources to be retrieved from NVP in a single status
|
||||
# synchronization request.
|
||||
# The actual size of the chunk will increase if the number of resources is such
|
||||
# that using the minimum chunk size will cause the interval between two
|
||||
# requests to be less than min_sync_req_delay
|
||||
# min_chunk_size = 500
|
||||
|
@ -20,7 +20,6 @@
|
||||
# @author: Aaron Rosen, Nicira Networks, Inc.
|
||||
|
||||
|
||||
import hashlib
|
||||
import logging
|
||||
import os
|
||||
|
||||
@ -56,6 +55,7 @@ from neutron.openstack.common import excutils
|
||||
from neutron.plugins.nicira.common import config
|
||||
from neutron.plugins.nicira.common import exceptions as nvp_exc
|
||||
from neutron.plugins.nicira.common import securitygroups as nvp_sec
|
||||
from neutron.plugins.nicira.common import sync
|
||||
from neutron.plugins.nicira.dbexts import distributedrouter as dist_rtr
|
||||
from neutron.plugins.nicira.dbexts import maclearning as mac_db
|
||||
from neutron.plugins.nicira.dbexts import nicira_db
|
||||
@ -180,6 +180,7 @@ class NvpPluginV2(agentschedulers_db.DhcpAgentSchedulerDbMixin,
|
||||
if not cfg.CONF.api_extensions_path:
|
||||
cfg.CONF.set_override('api_extensions_path', NVP_EXT_PATH)
|
||||
self.nvp_opts = cfg.CONF.NVP
|
||||
self.nvp_sync_opts = cfg.CONF.NVP_SYNC
|
||||
self.cluster = create_nvp_cluster(cfg.CONF,
|
||||
self.nvp_opts.concurrent_connections,
|
||||
self.nvp_opts.nvp_gen_timeout)
|
||||
@ -196,6 +197,13 @@ class NvpPluginV2(agentschedulers_db.DhcpAgentSchedulerDbMixin,
|
||||
# Set this flag to false as the default gateway has not
|
||||
# been yet updated from the config file
|
||||
self._is_default_net_gw_in_sync = False
|
||||
# Create a synchronizer instance for backend sync
|
||||
self._synchronizer = sync.NvpSynchronizer(
|
||||
self, self.cluster,
|
||||
self.nvp_sync_opts.state_sync_interval,
|
||||
self.nvp_sync_opts.min_sync_req_delay,
|
||||
self.nvp_sync_opts.min_chunk_size,
|
||||
self.nvp_sync_opts.max_random_sync_delay)
|
||||
|
||||
def _ensure_default_network_gateway(self):
|
||||
if self._is_default_net_gw_in_sync:
|
||||
@ -1049,39 +1057,12 @@ class NvpPluginV2(agentschedulers_db.DhcpAgentSchedulerDbMixin,
|
||||
with context.session.begin(subtransactions=True):
|
||||
# goto to the plugin DB and fetch the network
|
||||
network = self._get_network(context, id)
|
||||
# if the network is external, do not go to NVP
|
||||
if not network.external:
|
||||
# verify the fabric status of the corresponding
|
||||
# logical switch(es) in nvp
|
||||
try:
|
||||
lswitches = nvplib.get_lswitches(self.cluster, id)
|
||||
nvp_net_status = constants.NET_STATUS_ACTIVE
|
||||
neutron_status = network.status
|
||||
for lswitch in lswitches:
|
||||
relations = lswitch.get('_relations')
|
||||
if relations:
|
||||
lswitch_status = relations.get(
|
||||
'LogicalSwitchStatus')
|
||||
# FIXME(salvatore-orlando): Being unable to fetch
|
||||
# logical switch status should be an exception.
|
||||
if (lswitch_status and
|
||||
not lswitch_status.get('fabric_status',
|
||||
None)):
|
||||
nvp_net_status = constants.NET_STATUS_DOWN
|
||||
break
|
||||
LOG.debug(_("Current network status:%(nvp_net_status)s; "
|
||||
"Status in Neutron DB:%(neutron_status)s"),
|
||||
{'nvp_net_status': nvp_net_status,
|
||||
'neutron_status': neutron_status})
|
||||
if nvp_net_status != network.status:
|
||||
# update the network status
|
||||
network.status = nvp_net_status
|
||||
except q_exc.NotFound:
|
||||
network.status = constants.NET_STATUS_ERROR
|
||||
except Exception:
|
||||
err_msg = _("Unable to get logical switches")
|
||||
LOG.exception(err_msg)
|
||||
raise nvp_exc.NvpPluginException(err_msg=err_msg)
|
||||
if fields and 'status' in fields:
|
||||
# External networks are not backed by nvp lswitches
|
||||
if not network.external:
|
||||
# Perform explicit state synchronization
|
||||
self._synchronizer.synchronize_network(
|
||||
context, network)
|
||||
# Don't do field selection here otherwise we won't be able
|
||||
# to add provider networks fields
|
||||
net_result = self._make_network_dict(network)
|
||||
@ -1090,85 +1071,13 @@ class NvpPluginV2(agentschedulers_db.DhcpAgentSchedulerDbMixin,
|
||||
return self._fields(net_result, fields)
|
||||
|
||||
def get_networks(self, context, filters=None, fields=None):
|
||||
nvp_lswitches = {}
|
||||
filters = filters or {}
|
||||
with context.session.begin(subtransactions=True):
|
||||
neutron_lswitches = (
|
||||
super(NvpPluginV2, self).get_networks(context, filters))
|
||||
for net in neutron_lswitches:
|
||||
networks = super(NvpPluginV2, self).get_networks(context, filters)
|
||||
for net in networks:
|
||||
self._extend_network_dict_provider(context, net)
|
||||
self._extend_network_qos_queue(context, net)
|
||||
|
||||
tenant_ids = filters and filters.get('tenant_id') or None
|
||||
filter_fmt = "&tag=%s&tag_scope=os_tid"
|
||||
if context.is_admin and not tenant_ids:
|
||||
tenant_filter = ""
|
||||
else:
|
||||
tenant_ids = tenant_ids or [context.tenant_id]
|
||||
tenant_filter = ''.join(filter_fmt % tid for tid in tenant_ids)
|
||||
lswitch_filters = "uuid,display_name,fabric_status,tags"
|
||||
lswitch_url_path_1 = (
|
||||
"/ws.v1/lswitch?fields=%s&relations=LogicalSwitchStatus%s"
|
||||
% (lswitch_filters, tenant_filter))
|
||||
lswitch_url_path_2 = nvplib._build_uri_path(
|
||||
nvplib.LSWITCH_RESOURCE,
|
||||
fields=lswitch_filters,
|
||||
relations='LogicalSwitchStatus',
|
||||
filters={'tag': 'true', 'tag_scope': 'shared'})
|
||||
try:
|
||||
res = nvplib.get_all_query_pages(lswitch_url_path_1, self.cluster)
|
||||
nvp_lswitches.update(dict((ls['uuid'], ls) for ls in res))
|
||||
# Issue a second query for fetching shared networks.
|
||||
# We cannot unfortunately use just a single query because tags
|
||||
# cannot be or-ed
|
||||
res_shared = nvplib.get_all_query_pages(lswitch_url_path_2,
|
||||
self.cluster)
|
||||
nvp_lswitches.update(dict((ls['uuid'], ls) for ls in res_shared))
|
||||
except Exception:
|
||||
err_msg = _("Unable to get logical switches")
|
||||
LOG.exception(err_msg)
|
||||
raise nvp_exc.NvpPluginException(err_msg=err_msg)
|
||||
|
||||
if filters.get('id'):
|
||||
nvp_lswitches = dict(
|
||||
(uuid, ls) for (uuid, ls) in nvp_lswitches.iteritems()
|
||||
if uuid in set(filters['id']))
|
||||
for neutron_lswitch in neutron_lswitches:
|
||||
# Skip external networks as they do not exist in NVP
|
||||
if neutron_lswitch[l3.EXTERNAL]:
|
||||
continue
|
||||
elif neutron_lswitch['id'] not in nvp_lswitches:
|
||||
LOG.warning(_("Logical Switch %s found in neutron database "
|
||||
"but not in NVP."), neutron_lswitch["id"])
|
||||
neutron_lswitch["status"] = constants.NET_STATUS_ERROR
|
||||
else:
|
||||
# TODO(salvatore-orlando): be careful about "extended"
|
||||
# logical switches
|
||||
ls = nvp_lswitches.pop(neutron_lswitch['id'])
|
||||
if (ls["_relations"]["LogicalSwitchStatus"]["fabric_status"]):
|
||||
neutron_lswitch["status"] = constants.NET_STATUS_ACTIVE
|
||||
else:
|
||||
neutron_lswitch["status"] = constants.NET_STATUS_DOWN
|
||||
|
||||
# do not make the case in which switches are found in NVP
|
||||
# but not in Neutron catastrophic.
|
||||
if nvp_lswitches:
|
||||
LOG.warning(_("Found %s logical switches not bound "
|
||||
"to Neutron networks. Neutron and NVP are "
|
||||
"potentially out of sync"), len(nvp_lswitches))
|
||||
|
||||
LOG.debug(_("get_networks() completed for tenant %s"),
|
||||
context.tenant_id)
|
||||
|
||||
if fields:
|
||||
ret_fields = []
|
||||
for neutron_lswitch in neutron_lswitches:
|
||||
row = {}
|
||||
for field in fields:
|
||||
row[field] = neutron_lswitch[field]
|
||||
ret_fields.append(row)
|
||||
return ret_fields
|
||||
return neutron_lswitches
|
||||
return [self._fields(network, fields) for network in networks]
|
||||
|
||||
def update_network(self, context, id, network):
|
||||
pnet._raise_if_updates_provider_attributes(network['network'])
|
||||
@ -1194,105 +1103,10 @@ class NvpPluginV2(agentschedulers_db.DhcpAgentSchedulerDbMixin,
|
||||
def get_ports(self, context, filters=None, fields=None):
|
||||
filters = filters or {}
|
||||
with context.session.begin(subtransactions=True):
|
||||
neutron_lports = super(NvpPluginV2, self).get_ports(
|
||||
context, filters)
|
||||
if (filters.get('network_id') and len(filters.get('network_id')) and
|
||||
self._network_is_external(context, filters['network_id'][0])):
|
||||
# Do not perform check on NVP platform
|
||||
return neutron_lports
|
||||
|
||||
vm_filter = ""
|
||||
tenant_filter = ""
|
||||
# This is used when calling delete_network. Neutron checks to see if
|
||||
# the network has any ports.
|
||||
if filters.get("network_id"):
|
||||
# FIXME (Aaron) If we get more than one network_id this won't work
|
||||
lswitch = filters["network_id"][0]
|
||||
else:
|
||||
lswitch = "*"
|
||||
|
||||
if filters.get("device_id"):
|
||||
for vm_id in filters.get("device_id"):
|
||||
vm_filter = ("%stag_scope=vm_id&tag=%s&" % (vm_filter,
|
||||
hashlib.sha1(vm_id).hexdigest()))
|
||||
else:
|
||||
vm_id = ""
|
||||
|
||||
if filters.get("tenant_id"):
|
||||
for tenant in filters.get("tenant_id"):
|
||||
tenant_filter = ("%stag_scope=os_tid&tag=%s&" %
|
||||
(tenant_filter, tenant))
|
||||
|
||||
nvp_lports = {}
|
||||
|
||||
lport_fields_str = ("tags,admin_status_enabled,display_name,"
|
||||
"fabric_status_up")
|
||||
try:
|
||||
lport_query_path = (
|
||||
"/ws.v1/lswitch/%s/lport?fields=%s&%s%stag_scope=q_port_id"
|
||||
"&relations=LogicalPortStatus" %
|
||||
(lswitch, lport_fields_str, vm_filter, tenant_filter))
|
||||
|
||||
try:
|
||||
ports = nvplib.get_all_query_pages(lport_query_path,
|
||||
self.cluster)
|
||||
except q_exc.NotFound:
|
||||
LOG.warn(_("Lswitch %s not found in NVP"), lswitch)
|
||||
ports = None
|
||||
|
||||
if ports:
|
||||
for port in ports:
|
||||
for tag in port["tags"]:
|
||||
if tag["scope"] == "q_port_id":
|
||||
nvp_lports[tag["tag"]] = port
|
||||
except Exception:
|
||||
err_msg = _("Unable to get ports")
|
||||
LOG.exception(err_msg)
|
||||
raise nvp_exc.NvpPluginException(err_msg=err_msg)
|
||||
|
||||
lports = []
|
||||
for neutron_lport in neutron_lports:
|
||||
# if a neutron port is not found in NVP, this migth be because
|
||||
# such port is not mapped to a logical switch - ie: floating ip
|
||||
if neutron_lport['device_owner'] in (l3_db.DEVICE_OWNER_FLOATINGIP,
|
||||
l3_db.DEVICE_OWNER_ROUTER_GW):
|
||||
lports.append(neutron_lport)
|
||||
continue
|
||||
try:
|
||||
neutron_lport["admin_state_up"] = (
|
||||
nvp_lports[neutron_lport["id"]]["admin_status_enabled"])
|
||||
|
||||
if (nvp_lports[neutron_lport["id"]]
|
||||
["_relations"]
|
||||
["LogicalPortStatus"]
|
||||
["fabric_status_up"]):
|
||||
neutron_lport["status"] = constants.PORT_STATUS_ACTIVE
|
||||
else:
|
||||
neutron_lport["status"] = constants.PORT_STATUS_DOWN
|
||||
|
||||
del nvp_lports[neutron_lport["id"]]
|
||||
except KeyError:
|
||||
neutron_lport["status"] = constants.PORT_STATUS_ERROR
|
||||
LOG.debug(_("Neutron logical port %s was not found on NVP"),
|
||||
neutron_lport['id'])
|
||||
|
||||
lports.append(neutron_lport)
|
||||
# do not make the case in which ports are found in NVP
|
||||
# but not in Neutron catastrophic.
|
||||
if nvp_lports:
|
||||
LOG.warning(_("Found %s logical ports not bound "
|
||||
"to Neutron ports. Neutron and NVP are "
|
||||
"potentially out of sync"), len(nvp_lports))
|
||||
|
||||
if fields:
|
||||
ret_fields = []
|
||||
for lport in lports:
|
||||
row = {}
|
||||
for field in fields:
|
||||
row[field] = lport[field]
|
||||
ret_fields.append(row)
|
||||
return ret_fields
|
||||
return lports
|
||||
ports = super(NvpPluginV2, self).get_ports(context, filters)
|
||||
for port in ports:
|
||||
self._extend_port_qos_queue(context, port)
|
||||
return [self._fields(port, fields) for port in ports]
|
||||
|
||||
def create_port(self, context, port):
|
||||
# If PORTSECURITY is not the default value ATTR_NOT_SPECIFIED
|
||||
@ -1504,43 +1318,26 @@ class NvpPluginV2(agentschedulers_db.DhcpAgentSchedulerDbMixin,
|
||||
|
||||
def get_port(self, context, id, fields=None):
|
||||
with context.session.begin(subtransactions=True):
|
||||
neutron_db_port = super(NvpPluginV2, self).get_port(context,
|
||||
id, fields)
|
||||
self._extend_port_qos_queue(context, neutron_db_port)
|
||||
|
||||
if self._network_is_external(context,
|
||||
neutron_db_port['network_id']):
|
||||
return neutron_db_port
|
||||
nvp_id = self._nvp_get_port_id(context, self.cluster,
|
||||
neutron_db_port)
|
||||
# If there's no nvp IP do not bother going to NVP and put
|
||||
# the port in error state
|
||||
if nvp_id:
|
||||
# Find the NVP port corresponding to neutron port_id
|
||||
# Do not query by nvp id as the port might be on
|
||||
# an extended switch and we do not store the extended
|
||||
# switch uuid
|
||||
results = nvplib.query_lswitch_lports(
|
||||
self.cluster, '*',
|
||||
relations='LogicalPortStatus',
|
||||
filters={'tag': id, 'tag_scope': 'q_port_id'})
|
||||
if results:
|
||||
port = results[0]
|
||||
port_status = port["_relations"]["LogicalPortStatus"]
|
||||
neutron_db_port["admin_state_up"] = (
|
||||
port["admin_status_enabled"])
|
||||
if port_status["fabric_status_up"]:
|
||||
neutron_db_port["status"] = (
|
||||
constants.PORT_STATUS_ACTIVE)
|
||||
else:
|
||||
neutron_db_port["status"] = (
|
||||
constants.PORT_STATUS_DOWN)
|
||||
else:
|
||||
neutron_db_port["status"] = (
|
||||
constants.PORT_STATUS_ERROR)
|
||||
if fields and 'status' in fields:
|
||||
# Perform explicit state synchronization
|
||||
db_port = self._get_port(context, id)
|
||||
self._synchronizer.synchronize_port(
|
||||
context, db_port)
|
||||
port = self._make_port_dict(db_port, fields)
|
||||
else:
|
||||
neutron_db_port["status"] = constants.PORT_STATUS_ERROR
|
||||
return neutron_db_port
|
||||
port = super(NvpPluginV2, self).get_port(context, id, fields)
|
||||
self._extend_port_qos_queue(context, port)
|
||||
return port
|
||||
|
||||
def get_router(self, context, id, fields=None):
|
||||
if fields and 'status' in fields:
|
||||
db_router = self._get_router(context, id)
|
||||
# Perform explicit state synchronization
|
||||
self._synchronizer.synchronize_router(
|
||||
context, db_router)
|
||||
return self._make_router_dict(db_router, fields)
|
||||
else:
|
||||
return super(NvpPluginV2, self).get_router(context, id, fields)
|
||||
|
||||
def create_router(self, context, router):
|
||||
# NOTE(salvatore-orlando): We completely override this method in
|
||||
@ -1713,77 +1510,6 @@ class NvpPluginV2(agentschedulers_db.DhcpAgentSchedulerDbMixin,
|
||||
err_msg=(_("Unable to delete logical router '%s'"
|
||||
"on NVP Platform") % router_id))
|
||||
|
||||
def get_router(self, context, id, fields=None):
|
||||
router = self._get_router(context, id)
|
||||
try:
|
||||
lrouter = nvplib.get_lrouter(self.cluster, id)
|
||||
relations = lrouter.get('_relations')
|
||||
if relations:
|
||||
lrouter_status = relations.get('LogicalRouterStatus')
|
||||
# FIXME(salvatore-orlando): Being unable to fetch the
|
||||
# logical router status should be an exception.
|
||||
if lrouter_status:
|
||||
router_op_status = (lrouter_status.get('fabric_status')
|
||||
and constants.NET_STATUS_ACTIVE or
|
||||
constants.NET_STATUS_DOWN)
|
||||
except q_exc.NotFound:
|
||||
lrouter = {}
|
||||
router_op_status = constants.NET_STATUS_ERROR
|
||||
if router_op_status != router.status:
|
||||
LOG.debug(_("Current router status:%(router_status)s;"
|
||||
"Status in Neutron DB:%(db_router_status)s"),
|
||||
{'router_status': router_op_status,
|
||||
'db_router_status': router.status})
|
||||
# update the router status
|
||||
with context.session.begin(subtransactions=True):
|
||||
router.status = router_op_status
|
||||
return self._make_router_dict(router, fields)
|
||||
|
||||
def get_routers(self, context, filters=None, fields=None):
|
||||
router_query = self._apply_filters_to_query(
|
||||
self._model_query(context, l3_db.Router),
|
||||
l3_db.Router, filters)
|
||||
routers = router_query.all()
|
||||
# Query routers on NVP for updating operational status
|
||||
if context.is_admin and not filters.get("tenant_id"):
|
||||
tenant_id = None
|
||||
elif 'tenant_id' in filters:
|
||||
tenant_id = filters.get('tenant_id')[0]
|
||||
del filters['tenant_id']
|
||||
else:
|
||||
tenant_id = context.tenant_id
|
||||
try:
|
||||
nvp_lrouters = nvplib.get_lrouters(self.cluster,
|
||||
tenant_id,
|
||||
fields)
|
||||
except NvpApiClient.NvpApiException:
|
||||
err_msg = _("Unable to get logical routers from NVP controller")
|
||||
LOG.exception(err_msg)
|
||||
raise nvp_exc.NvpPluginException(err_msg=err_msg)
|
||||
|
||||
nvp_lrouters_dict = {}
|
||||
for nvp_lrouter in nvp_lrouters:
|
||||
nvp_lrouters_dict[nvp_lrouter['uuid']] = nvp_lrouter
|
||||
for router in routers:
|
||||
nvp_lrouter = nvp_lrouters_dict.get(router['id'])
|
||||
if nvp_lrouter:
|
||||
if (nvp_lrouter["_relations"]["LogicalRouterStatus"]
|
||||
["fabric_status"]):
|
||||
router.status = constants.NET_STATUS_ACTIVE
|
||||
else:
|
||||
router.status = constants.NET_STATUS_DOWN
|
||||
nvp_lrouters.remove(nvp_lrouter)
|
||||
else:
|
||||
router.status = constants.NET_STATUS_ERROR
|
||||
|
||||
# do not make the case in which routers are found in NVP
|
||||
# but not in Neutron catastrophic.
|
||||
if nvp_lrouters:
|
||||
LOG.warning(_("Found %s logical routers not bound "
|
||||
"to Neutron routers. Neutron and NVP are "
|
||||
"potentially out of sync"), len(nvp_lrouters))
|
||||
return [self._make_router_dict(router, fields) for router in routers]
|
||||
|
||||
def add_router_interface(self, context, router_id, interface_info):
|
||||
# When adding interface by port_id we need to create the
|
||||
# peer port on the nvp logical router in this routine
|
||||
|
@ -53,7 +53,24 @@ nvp_opts = [
|
||||
help=_("The default network tranport type to use (stt, gre, "
|
||||
"bridge, ipsec_gre, or ipsec_stt)")),
|
||||
cfg.StrOpt('agent_mode', default=AgentModes.AGENT,
|
||||
help=_("The mode used to implement DHCP/metadata services.")),
|
||||
help=_("The mode used to implement DHCP/metadata services."))
|
||||
]
|
||||
|
||||
sync_opts = [
|
||||
cfg.IntOpt('state_sync_interval', default=120,
|
||||
help=_("Interval in seconds between runs of the state "
|
||||
"synchronization task. Set it to 0 to disable it")),
|
||||
cfg.IntOpt('max_random_sync_delay', default=0,
|
||||
help=_("Maximum value for the additional random "
|
||||
"delay in seconds between runs of the state "
|
||||
"synchronization task")),
|
||||
cfg.IntOpt('min_sync_req_delay', default=10,
|
||||
help=_('Minimum delay, in seconds, between two state '
|
||||
'synchronization queries to NVP. It must not '
|
||||
'exceed state_sync_interval')),
|
||||
cfg.IntOpt('min_chunk_size', default=500,
|
||||
help=_('Minimum number of resources to be retrieved from NVP '
|
||||
'during state synchronization'))
|
||||
]
|
||||
|
||||
connection_opts = [
|
||||
@ -107,6 +124,8 @@ cluster_opts = [
|
||||
cfg.CONF.register_opts(connection_opts)
|
||||
cfg.CONF.register_opts(cluster_opts)
|
||||
cfg.CONF.register_opts(nvp_opts, "NVP")
|
||||
cfg.CONF.register_opts(sync_opts, "NVP_SYNC")
|
||||
|
||||
# NOTE(armando-migliaccio): keep the following code until we support
|
||||
# NVP configuration files in older format (Grizzly or older).
|
||||
# ### BEGIN
|
||||
|
596
neutron/plugins/nicira/common/sync.py
Normal file
596
neutron/plugins/nicira/common/sync.py
Normal file
@ -0,0 +1,596 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright 2013 Nicira, Inc.
|
||||
# All Rights Reserved
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import random
|
||||
|
||||
from neutron.common import constants
|
||||
from neutron.common import exceptions
|
||||
from neutron import context
|
||||
from neutron.db import l3_db
|
||||
from neutron.db import models_v2
|
||||
from neutron.openstack.common import jsonutils
|
||||
from neutron.openstack.common import log
|
||||
from neutron.openstack.common import loopingcall
|
||||
from neutron.openstack.common import timeutils
|
||||
from neutron.plugins.nicira.common import exceptions as nvp_exc
|
||||
from neutron.plugins.nicira import NvpApiClient
|
||||
from neutron.plugins.nicira import nvplib
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
class NvpCache(object):
|
||||
"""A simple Cache for NVP resources.
|
||||
|
||||
Associates resource id with resource hash to rapidly identify
|
||||
updated resources.
|
||||
Each entry in the cache also stores the following information:
|
||||
- changed: the resource in the cache has been altered following
|
||||
an update or a delete
|
||||
- hit: the resource has been visited during an update (and possibly
|
||||
left unchanged)
|
||||
- data: current resource data
|
||||
- data_bk: backup of resource data prior to its removal
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
# Maps a uuid to the dict containing it
|
||||
self._uuid_dict_mappings = {}
|
||||
# Dicts for NVP cached resources
|
||||
self._lswitches = {}
|
||||
self._lswitchports = {}
|
||||
self._lrouters = {}
|
||||
|
||||
def __getitem__(self, key):
|
||||
# uuids are unique across the various types of resources
|
||||
# TODO(salv-orlando): Avoid lookups over all dictionaries
|
||||
# when retrieving items
|
||||
# Fetch lswitches, lports, or lrouters
|
||||
resources = self._uuid_dict_mappings[key]
|
||||
return resources[key]
|
||||
|
||||
def _update_resources(self, resources, new_resources):
|
||||
# Clear the 'changed' attribute for all items
|
||||
for uuid, item in resources.items():
|
||||
if item.pop('changed', None) and not item.get('data'):
|
||||
# The item is not anymore in NVP, so delete it
|
||||
del resources[uuid]
|
||||
del self._uuid_dict_mappings[uuid]
|
||||
|
||||
def do_hash(item):
|
||||
return hash(jsonutils.dumps(item))
|
||||
|
||||
# Parse new data and identify new, deleted, and updated resources
|
||||
for item in new_resources:
|
||||
item_id = item['uuid']
|
||||
if resources.get(item_id):
|
||||
new_hash = do_hash(item)
|
||||
if new_hash != resources[item_id]['hash']:
|
||||
resources[item_id]['hash'] = new_hash
|
||||
resources[item_id]['changed'] = True
|
||||
resources[item_id]['data_bk'] = (
|
||||
resources[item_id]['data'])
|
||||
resources[item_id]['data'] = item
|
||||
# Mark the item as hit in any case
|
||||
resources[item_id]['hit'] = True
|
||||
else:
|
||||
resources[item_id] = {'hash': do_hash(item)}
|
||||
resources[item_id]['hit'] = True
|
||||
resources[item_id]['changed'] = True
|
||||
resources[item_id]['data'] = item
|
||||
# add a uuid to dict mapping for easy retrieval
|
||||
# with __getitem__
|
||||
self._uuid_dict_mappings[item_id] = resources
|
||||
|
||||
def _delete_resources(self, resources):
|
||||
# Mark for removal all the elements which have not been visited.
|
||||
# And clear the 'hit' attribute.
|
||||
for to_delete in [k for (k, v) in resources.iteritems()
|
||||
if not v.pop('hit', False)]:
|
||||
resources[to_delete]['changed'] = True
|
||||
resources[to_delete]['data_bk'] = (
|
||||
resources[to_delete].pop('data', None))
|
||||
|
||||
def _get_resource_ids(self, resources, changed_only):
|
||||
if changed_only:
|
||||
return [k for (k, v) in resources.iteritems()
|
||||
if v.get('changed')]
|
||||
return resources.keys()
|
||||
|
||||
def get_lswitches(self, changed_only=False):
|
||||
return self._get_resource_ids(self._lswitches, changed_only)
|
||||
|
||||
def get_lrouters(self, changed_only=False):
|
||||
return self._get_resource_ids(self._lrouters, changed_only)
|
||||
|
||||
def get_lswitchports(self, changed_only=False):
|
||||
return self._get_resource_ids(self._lswitchports, changed_only)
|
||||
|
||||
def update_lswitch(self, lswitch):
|
||||
self._update_resources(self._lswitches, [lswitch])
|
||||
|
||||
def update_lrouter(self, lrouter):
|
||||
self._update_resources(self._lrouters, [lrouter])
|
||||
|
||||
def update_lswitchport(self, lswitchport):
|
||||
self._update_resources(self._lswitchports, [lswitchport])
|
||||
|
||||
def process_updates(self, lswitches=None,
|
||||
lrouters=None, lswitchports=None):
|
||||
self._update_resources(self._lswitches, lswitches)
|
||||
self._update_resources(self._lrouters, lrouters)
|
||||
self._update_resources(self._lswitchports, lswitchports)
|
||||
return (self._get_resource_ids(self._lswitches, changed_only=True),
|
||||
self._get_resource_ids(self._lrouters, changed_only=True),
|
||||
self._get_resource_ids(self._lswitchports, changed_only=True))
|
||||
|
||||
def process_deletes(self):
|
||||
self._delete_resources(self._lswitches)
|
||||
self._delete_resources(self._lrouters)
|
||||
self._delete_resources(self._lswitchports)
|
||||
return (self._get_resource_ids(self._lswitches, changed_only=True),
|
||||
self._get_resource_ids(self._lrouters, changed_only=True),
|
||||
self._get_resource_ids(self._lswitchports, changed_only=True))
|
||||
|
||||
|
||||
class SyncParameters():
|
||||
"""Defines attributes used by the synchronization procedure.
|
||||
|
||||
chunk_size: Actual chunk size
|
||||
extra_chunk_size: Additional data to fetch because of chunk size
|
||||
adjustment
|
||||
current_chunk: Counter of the current data chunk being synchronized
|
||||
Page cursors: markers for the next resource to fetch.
|
||||
'start' means page cursor unset for fetching 1st page
|
||||
init_sync_performed: True if the initial synchronization concluded
|
||||
"""
|
||||
|
||||
def __init__(self, min_chunk_size):
|
||||
self.chunk_size = min_chunk_size
|
||||
self.extra_chunk_size = 0
|
||||
self.current_chunk = 0
|
||||
self.ls_cursor = 'start'
|
||||
self.lr_cursor = 'start'
|
||||
self.lp_cursor = 'start'
|
||||
self.init_sync_performed = False
|
||||
self.total_size = 0
|
||||
|
||||
|
||||
def _start_loopingcall(min_chunk_size, state_sync_interval, func):
|
||||
"""Start a loopingcall for the synchronization task."""
|
||||
# Start a looping call to synchronize operational status
|
||||
# for neutron resources
|
||||
if not state_sync_interval:
|
||||
# do not start the looping call if specified
|
||||
# sync interval is 0
|
||||
return
|
||||
state_synchronizer = loopingcall.DynamicLoopingCall(
|
||||
func, sp=SyncParameters(min_chunk_size))
|
||||
state_synchronizer.start(
|
||||
periodic_interval_max=state_sync_interval)
|
||||
|
||||
|
||||
class NvpSynchronizer():
|
||||
|
||||
LS_URI = nvplib._build_uri_path(
|
||||
nvplib.LSWITCH_RESOURCE, fields='uuid,tags,fabric_status',
|
||||
relations='LogicalSwitchStatus')
|
||||
LR_URI = nvplib._build_uri_path(
|
||||
nvplib.LROUTER_RESOURCE, fields='uuid,tags,fabric_status',
|
||||
relations='LogicalRouterStatus')
|
||||
LP_URI = nvplib._build_uri_path(
|
||||
nvplib.LSWITCHPORT_RESOURCE,
|
||||
parent_resource_id='*',
|
||||
fields='uuid,tags,fabric_status,link_status_up',
|
||||
relations='LogicalPortStatus')
|
||||
|
||||
def __init__(self, plugin, cluster, state_sync_interval,
|
||||
req_delay, min_chunk_size, max_rand_delay=0):
|
||||
random.seed()
|
||||
self._nvp_cache = NvpCache()
|
||||
# Store parameters as instance members
|
||||
# NOTE(salv-orlando): apologies if it looks java-ish
|
||||
self._plugin = plugin
|
||||
self._cluster = cluster
|
||||
self._req_delay = req_delay
|
||||
self._sync_interval = state_sync_interval
|
||||
self._max_rand_delay = max_rand_delay
|
||||
# Validate parameters
|
||||
if self._sync_interval < self._req_delay:
|
||||
err_msg = (_("Minimum request delay:%(req_delay)s must not "
|
||||
"exceed synchronization interval:%(sync_interval)s") %
|
||||
{'req_delay': self._req_delay,
|
||||
'sync_interval': self._sync_interval})
|
||||
LOG.error(err_msg)
|
||||
raise nvp_exc.NvpPluginException(err_msg=err_msg)
|
||||
# Backoff time in case of failures while fetching sync data
|
||||
self._sync_backoff = 1
|
||||
_start_loopingcall(min_chunk_size, state_sync_interval,
|
||||
self._synchronize_state)
|
||||
|
||||
def _get_tag_dict(self, tags):
|
||||
return dict((tag.get('scope'), tag['tag']) for tag in tags)
|
||||
|
||||
def _update_neutron_object(self, context, neutron_data, status):
|
||||
if status == neutron_data['status']:
|
||||
# do nothing
|
||||
return
|
||||
with context.session.begin(subtransactions=True):
|
||||
LOG.debug(_("Updating status for neutron resource %(q_id)s to: "
|
||||
"%(status)s"), {'q_id': neutron_data['id'],
|
||||
'status': status})
|
||||
neutron_data['status'] = status
|
||||
context.session.add(neutron_data)
|
||||
|
||||
def synchronize_network(self, context, neutron_network_data,
|
||||
lswitches=None):
|
||||
"""Synchronize a Neutron network with its NVP counterpart.
|
||||
|
||||
This routine synchronizes a set of switches when a Neutron
|
||||
network is mapped to multiple lswitches.
|
||||
"""
|
||||
if not lswitches:
|
||||
# Try to get logical switches from nvp
|
||||
try:
|
||||
lswitches = nvplib.get_lswitches(
|
||||
self._cluster, neutron_network_data['id'])
|
||||
except exceptions.NetworkNotFound:
|
||||
# TODO(salv-orlando): We should be catching
|
||||
# NvpApiClient.ResourceNotFound here
|
||||
# The logical switch was not found
|
||||
LOG.warning(_("Logical switch for neutron network %s not "
|
||||
"found on NVP."), neutron_network_data['id'])
|
||||
lswitches = []
|
||||
else:
|
||||
for lswitch in lswitches:
|
||||
self._nvp_cache.update_lswitch(lswitch)
|
||||
# By default assume things go wrong
|
||||
status = constants.NET_STATUS_ERROR
|
||||
# In most cases lswitches will contain a single element
|
||||
for ls in lswitches:
|
||||
if not ls:
|
||||
# Logical switch was deleted
|
||||
break
|
||||
ls_status = ls['_relations']['LogicalSwitchStatus']
|
||||
if not ls_status['fabric_status']:
|
||||
status = constants.NET_STATUS_DOWN
|
||||
break
|
||||
else:
|
||||
# No switch was down or missing. Set status to ACTIVE unless
|
||||
# there were no switches in the first place!
|
||||
if lswitches:
|
||||
status = constants.NET_STATUS_ACTIVE
|
||||
# Update db object
|
||||
self._update_neutron_object(context, neutron_network_data, status)
|
||||
|
||||
def _synchronize_lswitches(self, ctx, ls_uuids, scan_missing=False):
|
||||
if not ls_uuids and not scan_missing:
|
||||
return
|
||||
neutron_net_ids = set()
|
||||
neutron_nvp_mappings = {}
|
||||
# TODO(salvatore-orlando): Deal with the case the tag
|
||||
# has been tampered with
|
||||
for ls_uuid in ls_uuids:
|
||||
# If the lswitch has been deleted, get backup copy of data
|
||||
lswitch = (self._nvp_cache[ls_uuid].get('data') or
|
||||
self._nvp_cache[ls_uuid].get('data_bk'))
|
||||
tags = self._get_tag_dict(lswitch['tags'])
|
||||
neutron_id = tags.get('neutron_net_id', ls_uuid)
|
||||
neutron_net_ids.add(neutron_id)
|
||||
neutron_nvp_mappings[neutron_id] = (
|
||||
neutron_nvp_mappings.get(neutron_id, []) +
|
||||
[self._nvp_cache[ls_uuid]])
|
||||
with ctx.session.begin(subtransactions=True):
|
||||
# Fetch neutron networks from database
|
||||
filters = {'router:external': [False]}
|
||||
if not scan_missing:
|
||||
filters['id'] = neutron_net_ids
|
||||
# TODO(salv-orlando): Filter out external networks
|
||||
for network in self._plugin._get_collection_query(
|
||||
ctx, models_v2.Network, filters=filters):
|
||||
lswitches = neutron_nvp_mappings.get(network['id'], [])
|
||||
lswitches = [lswitch.get('data') for lswitch in lswitches]
|
||||
self.synchronize_network(ctx, network, lswitches)
|
||||
|
||||
def synchronize_router(self, context, neutron_router_data,
|
||||
lrouter=None):
|
||||
"""Synchronize a neutron router with its NVP counterpart."""
|
||||
if not lrouter:
|
||||
# Try to get router from nvp
|
||||
try:
|
||||
# This query will return the logical router status too
|
||||
lrouter = nvplib.get_lrouter(
|
||||
self._cluster, neutron_router_data['id'])
|
||||
except exceptions.NotFound:
|
||||
# NOTE(salv-orlando): We should be catching
|
||||
# NvpApiClient.ResourceNotFound here
|
||||
# The logical router was not found
|
||||
LOG.warning(_("Logical router for neutron router %s not "
|
||||
"found on NVP."), neutron_router_data['id'])
|
||||
lrouter = None
|
||||
else:
|
||||
# Update the cache
|
||||
self._nvp_cache.update_lrouter(lrouter)
|
||||
|
||||
# Note(salv-orlando): It might worth adding a check to verify neutron
|
||||
# resource tag in nvp entity matches a Neutron id.
|
||||
# By default assume things go wrong
|
||||
status = constants.NET_STATUS_ERROR
|
||||
if lrouter:
|
||||
lr_status = (lrouter['_relations']
|
||||
['LogicalRouterStatus']
|
||||
['fabric_status'])
|
||||
status = (lr_status and
|
||||
constants.NET_STATUS_ACTIVE
|
||||
or constants.NET_STATUS_DOWN)
|
||||
# Update db object
|
||||
self._update_neutron_object(context, neutron_router_data, status)
|
||||
|
||||
def _synchronize_lrouters(self, ctx, lr_uuids, scan_missing=False):
|
||||
if not lr_uuids and not scan_missing:
|
||||
return
|
||||
neutron_router_mappings = (
|
||||
dict((lr_uuid, self._nvp_cache[lr_uuid]) for lr_uuid in lr_uuids))
|
||||
with ctx.session.begin(subtransactions=True):
|
||||
# Fetch neutron routers from database
|
||||
filters = ({} if scan_missing else
|
||||
{'id': neutron_router_mappings.keys()})
|
||||
for router in self._plugin._get_collection_query(
|
||||
ctx, l3_db.Router, filters=filters):
|
||||
lrouter = neutron_router_mappings.get(router['id'])
|
||||
self.synchronize_router(
|
||||
ctx, router, lrouter and lrouter.get('data'))
|
||||
|
||||
def synchronize_port(self, context, neutron_port_data,
|
||||
lswitchport=None, ext_networks=None):
|
||||
"""Synchronize a Neutron port with its NVP counterpart."""
|
||||
# Skip synchronization for ports on external networks
|
||||
if not ext_networks:
|
||||
ext_networks = [net['id'] for net in context.session.query(
|
||||
models_v2.Network).join(
|
||||
l3_db.ExternalNetwork,
|
||||
(models_v2.Network.id ==
|
||||
l3_db.ExternalNetwork.network_id))]
|
||||
if neutron_port_data['network_id'] in ext_networks:
|
||||
with context.session.begin(subtransactions=True):
|
||||
neutron_port_data['status'] = constants.PORT_STATUS_ACTIVE
|
||||
return
|
||||
|
||||
if not lswitchport:
|
||||
# Try to get port from nvp
|
||||
try:
|
||||
lp_uuid = self._plugin._nvp_get_port_id(
|
||||
context, self._cluster, neutron_port_data)
|
||||
if lp_uuid:
|
||||
lswitchport = nvplib.get_port(
|
||||
self._cluster, neutron_port_data['network_id'],
|
||||
lp_uuid, relations='LogicalPortStatus')
|
||||
except exceptions.PortNotFoundOnNetwork:
|
||||
# NOTE(salv-orlando): We should be catching
|
||||
# NvpApiClient.ResourceNotFound here
|
||||
# The logical switch port was not found
|
||||
LOG.warning(_("Logical switch port for neutron port %s "
|
||||
"not found on NVP."), neutron_port_data['id'])
|
||||
lswitchport = None
|
||||
else:
|
||||
# Update the cache
|
||||
self._nvp_cache.update_lswitchport(lswitchport)
|
||||
|
||||
# Note(salv-orlando): It might worth adding a check to verify neutron
|
||||
# resource tag in nvp entity matches Neutron id.
|
||||
# By default assume things go wrong
|
||||
status = constants.PORT_STATUS_ERROR
|
||||
if lswitchport:
|
||||
lp_status = (lswitchport['_relations']
|
||||
['LogicalPortStatus']
|
||||
['link_status_up'])
|
||||
status = (lp_status and
|
||||
constants.PORT_STATUS_ACTIVE
|
||||
or constants.PORT_STATUS_DOWN)
|
||||
# Update db object
|
||||
self._update_neutron_object(context, neutron_port_data, status)
|
||||
|
||||
def _synchronize_lswitchports(self, ctx, lp_uuids, scan_missing=False):
|
||||
if not lp_uuids and not scan_missing:
|
||||
return
|
||||
# Find Neutron port id by tag - the tag is already
|
||||
# loaded in memory, no reason for doing a db query
|
||||
# TODO(salvatore-orlando): Deal with the case the tag
|
||||
# has been tampered with
|
||||
neutron_port_mappings = {}
|
||||
for lp_uuid in lp_uuids:
|
||||
lport = (self._nvp_cache[lp_uuid].get('data') or
|
||||
self._nvp_cache[lp_uuid].get('data_bk'))
|
||||
tags = self._get_tag_dict(lport['tags'])
|
||||
neutron_port_id = tags.get('q_port_id')
|
||||
if neutron_port_id:
|
||||
neutron_port_mappings[neutron_port_id] = (
|
||||
self._nvp_cache[lp_uuid])
|
||||
with ctx.session.begin(subtransactions=True):
|
||||
# Fetch neutron ports from database
|
||||
# At the first sync we need to fetch all ports
|
||||
filters = ({} if scan_missing else
|
||||
{'id': neutron_port_mappings.keys()})
|
||||
# TODO(salv-orlando): Work out a solution for avoiding
|
||||
# this query
|
||||
ext_nets = [net['id'] for net in ctx.session.query(
|
||||
models_v2.Network).join(
|
||||
l3_db.ExternalNetwork,
|
||||
(models_v2.Network.id ==
|
||||
l3_db.ExternalNetwork.network_id))]
|
||||
for port in self._plugin._get_collection_query(
|
||||
ctx, models_v2.Port, filters=filters):
|
||||
lswitchport = neutron_port_mappings.get(port['id'])
|
||||
self.synchronize_port(
|
||||
ctx, port, lswitchport and lswitchport.get('data'),
|
||||
ext_networks=ext_nets)
|
||||
|
||||
def _get_chunk_size(self, sp):
|
||||
# NOTE(salv-orlando): Try to use __future__ for this routine only?
|
||||
ratio = ((float(sp.total_size) / float(sp.chunk_size)) /
|
||||
(float(self._sync_interval) / float(self._req_delay)))
|
||||
new_size = max(1.0, ratio) * float(sp.chunk_size)
|
||||
return int(new_size) + (new_size - int(new_size) > 0)
|
||||
|
||||
def _fetch_data(self, uri, cursor, page_size):
|
||||
# If not cursor there is nothing to retrieve
|
||||
if cursor:
|
||||
if cursor == 'start':
|
||||
cursor = None
|
||||
# Chunk size tuning might, in some conditions, make it larger
|
||||
# than 5,000, which is the maximum page size allowed by the NVP
|
||||
# API. In this case the request should be split in multiple
|
||||
# requests. This is not ideal, and therefore a log warning will
|
||||
# be emitted.
|
||||
requests = range(0, page_size / (nvplib.MAX_PAGE_SIZE + 1) + 1)
|
||||
if len(requests) > 1:
|
||||
LOG.warn(_("Requested page size is %(cur_chunk_size)d."
|
||||
"It might be necessary to do %(num_requests)d "
|
||||
"round-trips to NVP for fetching data. Please "
|
||||
"tune sync parameters to ensure chunk size "
|
||||
"is less than %(max_page_size)d"),
|
||||
{'cur_chunk_size': page_size,
|
||||
'num_requests': len(requests),
|
||||
'max_page_size': nvplib.MAX_PAGE_SIZE})
|
||||
results = []
|
||||
actual_size = 0
|
||||
for _req in requests:
|
||||
req_results, cursor, req_size = nvplib.get_single_query_page(
|
||||
uri, self._cluster, cursor,
|
||||
min(page_size, nvplib.MAX_PAGE_SIZE))
|
||||
results.extend(req_results)
|
||||
actual_size = actual_size + req_size
|
||||
# If no cursor is returned break the cycle as there is no
|
||||
# actual need to perform multiple requests (all fetched)
|
||||
# This happens when the overall size of resources exceeds
|
||||
# the maximum page size, but the number for each single
|
||||
# resource type is below this threshold
|
||||
if not cursor:
|
||||
break
|
||||
# reset cursor before returning if we queried just to
|
||||
# know the number of entities
|
||||
return results, cursor if page_size else 'start', actual_size
|
||||
return [], cursor, None
|
||||
|
||||
def _fetch_nvp_data_chunk(self, sp):
|
||||
base_chunk_size = sp.chunk_size
|
||||
chunk_size = base_chunk_size + sp.extra_chunk_size
|
||||
LOG.info(_("Fetching up to %s resources "
|
||||
"from NVP backend"), chunk_size)
|
||||
fetched = ls_count = lr_count = lp_count = 0
|
||||
lswitches = lrouters = lswitchports = []
|
||||
if sp.ls_cursor or sp.ls_cursor == 'start':
|
||||
(lswitches, sp.ls_cursor, ls_count) = self._fetch_data(
|
||||
self.LS_URI, sp.ls_cursor, chunk_size)
|
||||
fetched = len(lswitches)
|
||||
if fetched < chunk_size and sp.lr_cursor or sp.lr_cursor == 'start':
|
||||
(lrouters, sp.lr_cursor, lr_count) = self._fetch_data(
|
||||
self.LR_URI, sp.lr_cursor, max(chunk_size - fetched, 0))
|
||||
fetched += len(lrouters)
|
||||
if fetched < chunk_size and sp.lp_cursor or sp.lp_cursor == 'start':
|
||||
(lswitchports, sp.lp_cursor, lp_count) = self._fetch_data(
|
||||
self.LP_URI, sp.lp_cursor, max(chunk_size - fetched, 0))
|
||||
fetched += len(lswitchports)
|
||||
if sp.current_chunk == 0:
|
||||
# No cursors were provided. Then it must be possible to
|
||||
# calculate the total amount of data to fetch
|
||||
sp.total_size = ls_count + lr_count + lp_count
|
||||
LOG.debug(_("Total data size: %d"), sp.total_size)
|
||||
sp.chunk_size = self._get_chunk_size(sp)
|
||||
# Calculate chunk size adjustment
|
||||
sp.extra_chunk_size = sp.chunk_size - base_chunk_size
|
||||
LOG.debug(_("Fetched %(num_lswitches)d logical switches, "
|
||||
"%(num_lswitchports)d logical switch ports,"
|
||||
"%(num_lrouters)d logical routers"),
|
||||
{'num_lswitches': len(lswitches),
|
||||
'num_lswitchports': len(lswitchports),
|
||||
'num_lrouters': len(lrouters)})
|
||||
return (lswitches, lrouters, lswitchports)
|
||||
|
||||
def _synchronize_state(self, sp):
|
||||
# If the plugin has been destroyed, stop the LoopingCall
|
||||
if not self._plugin:
|
||||
raise loopingcall.LoopingCallDone
|
||||
start = timeutils.utcnow()
|
||||
# Reset page cursor variables if necessary
|
||||
if sp.current_chunk == 0:
|
||||
sp.ls_cursor = sp.lr_cursor = sp.lp_cursor = 'start'
|
||||
LOG.info(_("Running state synchronization task. Chunk: %s"),
|
||||
sp.current_chunk)
|
||||
# Fetch chunk_size data from NVP
|
||||
try:
|
||||
(lswitches, lrouters, lswitchports) = (
|
||||
self._fetch_nvp_data_chunk(sp))
|
||||
except (NvpApiClient.RequestTimeout, NvpApiClient.NvpApiException):
|
||||
sleep_interval = self._sync_backoff
|
||||
# Cap max back off to 64 seconds
|
||||
self._sync_backoff = min(self._sync_backoff * 2, 64)
|
||||
LOG.exception(_("An error occured while communicating with "
|
||||
"NVP backend. Will retry synchronization "
|
||||
"in %d seconds"), sleep_interval)
|
||||
return sleep_interval
|
||||
LOG.debug(_("Time elapsed querying NVP: %s"),
|
||||
timeutils.utcnow() - start)
|
||||
if sp.total_size:
|
||||
num_chunks = ((sp.total_size / sp.chunk_size) +
|
||||
(sp.total_size % sp.chunk_size != 0))
|
||||
else:
|
||||
num_chunks = 1
|
||||
LOG.debug(_("Number of chunks: %d"), num_chunks)
|
||||
# Find objects which have changed on NVP side and need
|
||||
# to be synchronized
|
||||
(ls_uuids, lr_uuids, lp_uuids) = self._nvp_cache.process_updates(
|
||||
lswitches, lrouters, lswitchports)
|
||||
# Process removed objects only at the last chunk
|
||||
scan_missing = (sp.current_chunk == num_chunks - 1 and
|
||||
not sp.init_sync_performed)
|
||||
if sp.current_chunk == num_chunks - 1:
|
||||
self._nvp_cache.process_deletes()
|
||||
ls_uuids = self._nvp_cache.get_lswitches(
|
||||
changed_only=not scan_missing)
|
||||
lr_uuids = self._nvp_cache.get_lrouters(
|
||||
changed_only=not scan_missing)
|
||||
lp_uuids = self._nvp_cache.get_lswitchports(
|
||||
changed_only=not scan_missing)
|
||||
LOG.debug(_("Time elapsed hashing data: %s"),
|
||||
timeutils.utcnow() - start)
|
||||
# Get an admin context
|
||||
ctx = context.get_admin_context()
|
||||
# Synchronize with database
|
||||
with ctx.session.begin(subtransactions=True):
|
||||
self._synchronize_lswitches(ctx, ls_uuids,
|
||||
scan_missing=scan_missing)
|
||||
self._synchronize_lrouters(ctx, lr_uuids,
|
||||
scan_missing=scan_missing)
|
||||
self._synchronize_lswitchports(ctx, lp_uuids,
|
||||
scan_missing=scan_missing)
|
||||
# Increase chunk counter
|
||||
LOG.info(_("Synchronization for chunk %(chunk_num)d of "
|
||||
"%(total_chunks)d performed"),
|
||||
{'chunk_num': sp.current_chunk + 1,
|
||||
'total_chunks': num_chunks})
|
||||
sp.current_chunk = (sp.current_chunk + 1) % num_chunks
|
||||
added_delay = 0
|
||||
if sp.current_chunk == 0:
|
||||
# Ensure init_sync_performed is True
|
||||
if not sp.init_sync_performed:
|
||||
sp.init_sync_performed = True
|
||||
# Add additional random delay
|
||||
added_delay = random.randint(0, self._max_rand_delay)
|
||||
LOG.debug(_("Time elapsed at end of sync: %s"),
|
||||
timeutils.utcnow() - start)
|
||||
return self._sync_interval / num_chunks + added_delay
|
@ -66,7 +66,10 @@ SNAT_KEYS = ["to_src_port_min", "to_src_port_max", "to_src_ip_min",
|
||||
"to_src_ip_max"]
|
||||
|
||||
DNAT_KEYS = ["to_dst_port", "to_dst_ip_min", "to_dst_ip_max"]
|
||||
|
||||
# Maximum page size for a single request
|
||||
# NOTE(salv-orlando): This might become a version-dependent map should the
|
||||
# limit be raised in future versions
|
||||
MAX_PAGE_SIZE = 5000
|
||||
|
||||
# TODO(bgh): it would be more efficient to use a bitmap
|
||||
taken_context_ids = []
|
||||
@ -157,21 +160,34 @@ def get_cluster_version(cluster):
|
||||
return version
|
||||
|
||||
|
||||
def get_single_query_page(path, cluster, page_cursor=None,
|
||||
page_length=1000, neutron_only=True):
|
||||
params = []
|
||||
if page_cursor:
|
||||
params.append("_page_cursor=%s" % page_cursor)
|
||||
params.append("_page_length=%s" % page_length)
|
||||
# NOTE(salv-orlando): On the NVP backend the 'Quantum' tag is still
|
||||
# used for marking Neutron entities in order to preserve compatibility
|
||||
if neutron_only:
|
||||
params.append("tag_scope=quantum")
|
||||
query_params = "&".join(params)
|
||||
path = "%s%s%s" % (path, "&" if (path.find("?") != -1) else "?",
|
||||
query_params)
|
||||
body = do_request(HTTP_GET, path, cluster=cluster)
|
||||
# Result_count won't be returned if _page_cursor is supplied
|
||||
return body['results'], body.get('page_cursor'), body.get('result_count')
|
||||
|
||||
|
||||
def get_all_query_pages(path, c):
|
||||
need_more_results = True
|
||||
result_list = []
|
||||
page_cursor = None
|
||||
query_marker = "&" if (path.find("?") != -1) else "?"
|
||||
while need_more_results:
|
||||
page_cursor_str = (
|
||||
"_page_cursor=%s" % page_cursor if page_cursor else "")
|
||||
body = do_request(HTTP_GET,
|
||||
"%s%s%s" % (path, query_marker, page_cursor_str),
|
||||
cluster=c)
|
||||
page_cursor = body.get('page_cursor')
|
||||
results, page_cursor = get_single_query_page(
|
||||
path, c, page_cursor)[:2]
|
||||
if not page_cursor:
|
||||
need_more_results = False
|
||||
result_list.extend(body['results'])
|
||||
result_list.extend(results)
|
||||
return result_list
|
||||
|
||||
|
||||
|
@ -19,7 +19,7 @@
|
||||
"lport_admin_up_count": %(lport_count)d,
|
||||
"_schema": "/ws.v1/schema/LogicalRouterStatus",
|
||||
"lport_count": %(lport_count)d,
|
||||
"fabric_status": true,
|
||||
"fabric_status": %(status)s,
|
||||
"type": "LogicalRouterStatus",
|
||||
"lport_link_up_count": %(lport_count)d
|
||||
}
|
||||
|
@ -2,7 +2,7 @@
|
||||
"_href": "/ws.v1/lswitch/%(uuid)s",
|
||||
"_schema": "/ws.v1/schema/LogicalSwitchConfig",
|
||||
"_relations": {"LogicalSwitchStatus":
|
||||
{"fabric_status": true,
|
||||
{"fabric_status": %(status)s,
|
||||
"type": "LogicalSwitchStatus",
|
||||
"lport_count": %(lport_count)d,
|
||||
"_href": "/ws.v1/lswitch/%(uuid)s/status",
|
||||
|
@ -3,7 +3,8 @@
|
||||
{"LogicalPortStatus":
|
||||
{"type": "LogicalSwitchPortStatus",
|
||||
"admin_status_enabled": true,
|
||||
"fabric_status_up": false,
|
||||
"fabric_status_up": %(status)s,
|
||||
"link_status_up": %(status)s,
|
||||
"_href": "/ws.v1/lswitch/%(ls_uuid)s/lport/%(uuid)s/status",
|
||||
"_schema": "/ws.v1/schema/LogicalSwitchPortStatus"},
|
||||
"LogicalSwitchConfig":
|
||||
|
@ -106,17 +106,6 @@ class FakeClient:
|
||||
LROUTER_LPORT_RESOURCE: ['LogicalPortAttachment'],
|
||||
}
|
||||
|
||||
_fake_lswitch_dict = {}
|
||||
_fake_lrouter_dict = {}
|
||||
_fake_lswitch_lport_dict = {}
|
||||
_fake_lrouter_lport_dict = {}
|
||||
_fake_lrouter_nat_dict = {}
|
||||
_fake_lswitch_lportstatus_dict = {}
|
||||
_fake_lrouter_lportstatus_dict = {}
|
||||
_fake_securityprofile_dict = {}
|
||||
_fake_lqueue_dict = {}
|
||||
_fake_gatewayservice_dict = {}
|
||||
|
||||
_validators = {
|
||||
LSWITCH_RESOURCE: _validate_resource,
|
||||
LSWITCH_LPORT_RESOURCE: _validate_resource,
|
||||
@ -128,6 +117,16 @@ class FakeClient:
|
||||
|
||||
def __init__(self, fake_files_path):
|
||||
self.fake_files_path = fake_files_path
|
||||
self._fake_lswitch_dict = {}
|
||||
self._fake_lrouter_dict = {}
|
||||
self._fake_lswitch_lport_dict = {}
|
||||
self._fake_lrouter_lport_dict = {}
|
||||
self._fake_lrouter_nat_dict = {}
|
||||
self._fake_lswitch_lportstatus_dict = {}
|
||||
self._fake_lrouter_lportstatus_dict = {}
|
||||
self._fake_securityprofile_dict = {}
|
||||
self._fake_lqueue_dict = {}
|
||||
self._fake_gatewayservice_dict = {}
|
||||
|
||||
def _get_tag(self, resource, scope):
|
||||
tags = [tag['tag'] for tag in resource['tags']
|
||||
@ -136,7 +135,7 @@ class FakeClient:
|
||||
|
||||
def _get_filters(self, querystring):
|
||||
if not querystring:
|
||||
return (None, None)
|
||||
return (None, None, None)
|
||||
params = urlparse.parse_qs(querystring)
|
||||
tag_filter = None
|
||||
attr_filter = None
|
||||
@ -145,7 +144,15 @@ class FakeClient:
|
||||
'tag': params['tag'][0]}
|
||||
elif 'uuid' in params:
|
||||
attr_filter = {'uuid': params['uuid'][0]}
|
||||
return (tag_filter, attr_filter)
|
||||
# Handle page_length
|
||||
# TODO(salv-orlando): Handle page cursor too
|
||||
page_len = params.get('_page_length')
|
||||
if page_len:
|
||||
page_len = int(page_len[0])
|
||||
else:
|
||||
# Explicitly set it to None (avoid 0 or empty list)
|
||||
page_len = None
|
||||
return (tag_filter, attr_filter, page_len)
|
||||
|
||||
def _add_lswitch(self, body):
|
||||
fake_lswitch = json.loads(body)
|
||||
@ -157,6 +164,8 @@ class FakeClient:
|
||||
fake_lswitch['zone_uuid'] = zone_uuid
|
||||
fake_lswitch['tenant_id'] = self._get_tag(fake_lswitch, 'os_tid')
|
||||
fake_lswitch['lport_count'] = 0
|
||||
# set status value
|
||||
fake_lswitch['status'] = 'true'
|
||||
return fake_lswitch
|
||||
|
||||
def _build_lrouter(self, body, uuid=None):
|
||||
@ -183,6 +192,8 @@ class FakeClient:
|
||||
uuidutils.generate_uuid())
|
||||
self._fake_lrouter_dict[fake_lrouter['uuid']] = fake_lrouter
|
||||
fake_lrouter['lport_count'] = 0
|
||||
# set status value
|
||||
fake_lrouter['status'] = 'true'
|
||||
return fake_lrouter
|
||||
|
||||
def _add_lqueue(self, body):
|
||||
@ -213,6 +224,8 @@ class FakeClient:
|
||||
fake_lport_status['ls_uuid'] = fake_lswitch['uuid']
|
||||
fake_lport_status['ls_name'] = fake_lswitch['display_name']
|
||||
fake_lport_status['ls_zone_uuid'] = fake_lswitch['zone_uuid']
|
||||
# set status value
|
||||
fake_lport['status'] = 'true'
|
||||
self._fake_lswitch_lportstatus_dict[new_uuid] = fake_lport_status
|
||||
return fake_lport
|
||||
|
||||
@ -356,7 +369,7 @@ class FakeClient:
|
||||
|
||||
def _list(self, resource_type, response_file,
|
||||
parent_uuid=None, query=None, relations=None):
|
||||
(tag_filter, attr_filter) = self._get_filters(query)
|
||||
(tag_filter, attr_filter, page_len) = self._get_filters(query)
|
||||
with open("%s/%s" % (self.fake_files_path, response_file)) as f:
|
||||
response_template = f.read()
|
||||
res_dict = getattr(self, '_fake_%s_dict' % resource_type)
|
||||
@ -425,8 +438,20 @@ class FakeClient:
|
||||
if (parent_func(res_uuid) and
|
||||
_tag_match(res_uuid) and
|
||||
_attr_match(res_uuid))]
|
||||
return json.dumps({'results': items,
|
||||
'result_count': len(items)})
|
||||
# Rather inefficient, but hey this is just a mock!
|
||||
next_cursor = None
|
||||
total_items = len(items)
|
||||
if page_len:
|
||||
try:
|
||||
next_cursor = items[page_len]['uuid']
|
||||
except IndexError:
|
||||
next_cursor = None
|
||||
items = items[:page_len]
|
||||
response_dict = {'results': items,
|
||||
'result_count': total_items}
|
||||
if next_cursor:
|
||||
response_dict['page_cursor'] = next_cursor
|
||||
return json.dumps(response_dict)
|
||||
|
||||
def _show(self, resource_type, response_file,
|
||||
uuid1, uuid2=None, relations=None):
|
||||
|
@ -16,6 +16,7 @@
|
||||
import mock
|
||||
|
||||
from neutron.common.test_lib import test_config
|
||||
from neutron.plugins.nicira.common import sync
|
||||
from neutron.tests.unit.nicira import fake_nvpapiclient
|
||||
from neutron.tests.unit.nicira import get_fake_conf
|
||||
from neutron.tests.unit.nicira import NVPAPI_NAME
|
||||
@ -35,6 +36,9 @@ class NVPDhcpAgentNotifierTestCase(test_base.OvsDhcpAgentNotifierTestCase):
|
||||
self.fc = fake_nvpapiclient.FakeClient(STUBS_PATH)
|
||||
self.mock_nvpapi = mock.patch(NVPAPI_NAME, autospec=True)
|
||||
instance = self.mock_nvpapi.start()
|
||||
# Avoid runs of the synchronizer looping call
|
||||
patch_sync = mock.patch.object(sync, '_start_loopingcall')
|
||||
patch_sync.start()
|
||||
|
||||
def _fake_request(*args, **kwargs):
|
||||
return self.fc.fake_request(*args, **kwargs)
|
||||
@ -44,6 +48,7 @@ class NVPDhcpAgentNotifierTestCase(test_base.OvsDhcpAgentNotifierTestCase):
|
||||
instance.return_value.request.side_effect = _fake_request
|
||||
super(NVPDhcpAgentNotifierTestCase, self).setUp()
|
||||
self.addCleanup(self.fc.reset_all)
|
||||
self.addCleanup(patch_sync.stop)
|
||||
self.addCleanup(self.mock_nvpapi.stop)
|
||||
|
||||
def _notification_mocks(self, hosts, mock_dhcp, net, subnet, port):
|
||||
|
@ -24,6 +24,7 @@ from neutron.api.v2 import attributes
|
||||
from neutron.common.test_lib import test_config
|
||||
from neutron import context
|
||||
from neutron.extensions import agent
|
||||
from neutron.plugins.nicira.common import sync
|
||||
from neutron.plugins.nicira.NvpApiClient import NVPVersion
|
||||
from neutron.tests.unit.nicira import fake_nvpapiclient
|
||||
from neutron.tests.unit.nicira import get_fake_conf
|
||||
@ -70,6 +71,9 @@ class MacLearningDBTestCase(test_db_plugin.NeutronDbPluginV2TestCase):
|
||||
self.fc = fake_nvpapiclient.FakeClient(STUBS_PATH)
|
||||
self.mock_nvpapi = mock.patch(NVPAPI_NAME, autospec=True)
|
||||
instance = self.mock_nvpapi.start()
|
||||
# Avoid runs of the synchronizer looping call
|
||||
patch_sync = mock.patch.object(sync, '_start_loopingcall')
|
||||
patch_sync.start()
|
||||
|
||||
def _fake_request(*args, **kwargs):
|
||||
return self.fc.fake_request(*args, **kwargs)
|
||||
@ -80,6 +84,7 @@ class MacLearningDBTestCase(test_db_plugin.NeutronDbPluginV2TestCase):
|
||||
cfg.CONF.set_override('metadata_mode', None, 'NVP')
|
||||
self.addCleanup(self.fc.reset_all)
|
||||
self.addCleanup(self.mock_nvpapi.stop)
|
||||
self.addCleanup(patch_sync.stop)
|
||||
self.addCleanup(self.restore_resource_attribute_map)
|
||||
self.addCleanup(cfg.CONF.reset)
|
||||
super(MacLearningDBTestCase, self).setUp()
|
||||
|
@ -34,6 +34,7 @@ from neutron.extensions import securitygroup as secgrp
|
||||
from neutron import manager
|
||||
from neutron.openstack.common import uuidutils
|
||||
from neutron.plugins.nicira.common import exceptions as nvp_exc
|
||||
from neutron.plugins.nicira.common import sync
|
||||
from neutron.plugins.nicira.dbexts import nicira_db
|
||||
from neutron.plugins.nicira.dbexts import nicira_qos_db as qos_db
|
||||
from neutron.plugins.nicira.extensions import distributedrouter as dist_router
|
||||
@ -92,6 +93,9 @@ class NiciraPluginV2TestCase(test_plugin.NeutronDbPluginV2TestCase):
|
||||
self.fc = fake_nvpapiclient.FakeClient(STUBS_PATH)
|
||||
self.mock_nvpapi = mock.patch(NVPAPI_NAME, autospec=True)
|
||||
self.mock_instance = self.mock_nvpapi.start()
|
||||
# Avoid runs of the synchronizer looping call
|
||||
patch_sync = mock.patch.object(sync, '_start_loopingcall')
|
||||
patch_sync.start()
|
||||
|
||||
def _fake_request(*args, **kwargs):
|
||||
return self.fc.fake_request(*args, **kwargs)
|
||||
@ -106,6 +110,7 @@ class NiciraPluginV2TestCase(test_plugin.NeutronDbPluginV2TestCase):
|
||||
cfg.CONF.set_override('metadata_mode', None, 'NVP')
|
||||
self.addCleanup(self.fc.reset_all)
|
||||
self.addCleanup(self.mock_nvpapi.stop)
|
||||
self.addCleanup(patch_sync.stop)
|
||||
|
||||
|
||||
class TestNiciraBasicGet(test_plugin.TestBasicGet, NiciraPluginV2TestCase):
|
||||
@ -325,6 +330,9 @@ class NiciraPortSecurityTestCase(psec.PortSecurityDBTestCase):
|
||||
self.mock_nvpapi = mock.patch(NVPAPI_NAME, autospec=True)
|
||||
instance = self.mock_nvpapi.start()
|
||||
instance.return_value.login.return_value = "the_cookie"
|
||||
# Avoid runs of the synchronizer looping call
|
||||
patch_sync = mock.patch.object(sync, '_start_loopingcall')
|
||||
patch_sync.start()
|
||||
|
||||
def _fake_request(*args, **kwargs):
|
||||
return self.fc.fake_request(*args, **kwargs)
|
||||
@ -333,6 +341,7 @@ class NiciraPortSecurityTestCase(psec.PortSecurityDBTestCase):
|
||||
super(NiciraPortSecurityTestCase, self).setUp(PLUGIN_NAME)
|
||||
self.addCleanup(self.fc.reset_all)
|
||||
self.addCleanup(self.mock_nvpapi.stop)
|
||||
self.addCleanup(patch_sync.stop)
|
||||
|
||||
|
||||
class TestNiciraPortSecurity(NiciraPortSecurityTestCase,
|
||||
@ -349,17 +358,18 @@ class NiciraSecurityGroupsTestCase(ext_sg.SecurityGroupDBTestCase):
|
||||
self.mock_nvpapi = mock.patch(NVPAPI_NAME, autospec=True)
|
||||
instance = self.mock_nvpapi.start()
|
||||
instance.return_value.login.return_value = "the_cookie"
|
||||
# Avoid runs of the synchronizer looping call
|
||||
patch_sync = mock.patch.object(sync, '_start_loopingcall')
|
||||
patch_sync.start()
|
||||
|
||||
def _fake_request(*args, **kwargs):
|
||||
return fc.fake_request(*args, **kwargs)
|
||||
|
||||
instance.return_value.request.side_effect = _fake_request
|
||||
self.addCleanup(self.mock_nvpapi.stop)
|
||||
self.addCleanup(patch_sync.stop)
|
||||
super(NiciraSecurityGroupsTestCase, self).setUp(PLUGIN_NAME)
|
||||
|
||||
def tearDown(self):
|
||||
super(NiciraSecurityGroupsTestCase, self).tearDown()
|
||||
self.mock_nvpapi.stop()
|
||||
|
||||
|
||||
class TestNiciraSecurityGroup(ext_sg.TestSecurityGroups,
|
||||
NiciraSecurityGroupsTestCase):
|
||||
@ -1125,20 +1135,12 @@ class NiciraNeutronNVPOutOfSync(NiciraPluginV2TestCase,
|
||||
res = req.get_response(self.api)
|
||||
self.assertEqual(res.status_int, 204)
|
||||
|
||||
def test_list_networks_not_in_nvp(self):
|
||||
res = self._create_network('json', 'net1', True)
|
||||
self.deserialize('json', res)
|
||||
self.fc._fake_lswitch_dict.clear()
|
||||
req = self.new_list_request('networks')
|
||||
nets = self.deserialize('json', req.get_response(self.api))
|
||||
self.assertEqual(nets['networks'][0]['status'],
|
||||
constants.NET_STATUS_ERROR)
|
||||
|
||||
def test_show_network_not_in_nvp(self):
|
||||
res = self._create_network('json', 'net1', True)
|
||||
net = self.deserialize('json', res)
|
||||
self.fc._fake_lswitch_dict.clear()
|
||||
req = self.new_show_request('networks', net['network']['id'])
|
||||
req = self.new_show_request('networks', net['network']['id'],
|
||||
fields=['id', 'status'])
|
||||
net = self.deserialize('json', req.get_response(self.api))
|
||||
self.assertEqual(net['network']['status'],
|
||||
constants.NET_STATUS_ERROR)
|
||||
@ -1153,17 +1155,6 @@ class NiciraNeutronNVPOutOfSync(NiciraPluginV2TestCase,
|
||||
res = req.get_response(self.api)
|
||||
self.assertEqual(res.status_int, 204)
|
||||
|
||||
def test_list_port_not_in_nvp(self):
|
||||
res = self._create_network('json', 'net1', True)
|
||||
net1 = self.deserialize('json', res)
|
||||
res = self._create_port('json', net1['network']['id'])
|
||||
self.deserialize('json', res)
|
||||
self.fc._fake_lswitch_lport_dict.clear()
|
||||
req = self.new_list_request('ports')
|
||||
nets = self.deserialize('json', req.get_response(self.api))
|
||||
self.assertEqual(nets['ports'][0]['status'],
|
||||
constants.PORT_STATUS_ERROR)
|
||||
|
||||
def test_show_port_not_in_nvp(self):
|
||||
res = self._create_network('json', 'net1', True)
|
||||
net1 = self.deserialize('json', res)
|
||||
@ -1171,7 +1162,8 @@ class NiciraNeutronNVPOutOfSync(NiciraPluginV2TestCase,
|
||||
port = self.deserialize('json', res)
|
||||
self.fc._fake_lswitch_lport_dict.clear()
|
||||
self.fc._fake_lswitch_lportstatus_dict.clear()
|
||||
req = self.new_show_request('ports', port['port']['id'])
|
||||
req = self.new_show_request('ports', port['port']['id'],
|
||||
fields=['id', 'status'])
|
||||
net = self.deserialize('json', req.get_response(self.api))
|
||||
self.assertEqual(net['port']['status'],
|
||||
constants.PORT_STATUS_ERROR)
|
||||
@ -1218,20 +1210,12 @@ class NiciraNeutronNVPOutOfSync(NiciraPluginV2TestCase,
|
||||
res = req.get_response(self.ext_api)
|
||||
self.assertEqual(res.status_int, 204)
|
||||
|
||||
def test_list_routers_not_in_nvp(self):
|
||||
res = self._create_router('json', 'tenant')
|
||||
self.deserialize('json', res)
|
||||
self.fc._fake_lrouter_dict.clear()
|
||||
req = self.new_list_request('routers')
|
||||
routers = self.deserialize('json', req.get_response(self.ext_api))
|
||||
self.assertEqual(routers['routers'][0]['status'],
|
||||
constants.NET_STATUS_ERROR)
|
||||
|
||||
def test_show_router_not_in_nvp(self):
|
||||
res = self._create_router('json', 'tenant')
|
||||
router = self.deserialize('json', res)
|
||||
self.fc._fake_lrouter_dict.clear()
|
||||
req = self.new_show_request('routers', router['router']['id'])
|
||||
req = self.new_show_request('routers', router['router']['id'],
|
||||
fields=['id', 'status'])
|
||||
router = self.deserialize('json', req.get_response(self.ext_api))
|
||||
self.assertEqual(router['router']['status'],
|
||||
constants.NET_STATUS_ERROR)
|
||||
|
587
neutron/tests/unit/nicira/test_nvp_sync.py
Normal file
587
neutron/tests/unit/nicira/test_nvp_sync.py
Normal file
@ -0,0 +1,587 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright 2013 Nicira Networks, Inc.
|
||||
# All Rights Reserved
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
#
|
||||
|
||||
import contextlib
|
||||
import time
|
||||
|
||||
import mock
|
||||
from oslo.config import cfg
|
||||
|
||||
from neutron.api.v2 import attributes as attr
|
||||
from neutron.common import config
|
||||
from neutron.common import constants
|
||||
from neutron import context
|
||||
from neutron.openstack.common import jsonutils as json
|
||||
from neutron.plugins.nicira.common import sync
|
||||
from neutron.plugins.nicira import NeutronPlugin
|
||||
from neutron.plugins.nicira import nvp_cluster
|
||||
from neutron.plugins.nicira import NvpApiClient
|
||||
from neutron.plugins.nicira import nvplib
|
||||
from neutron.tests import base
|
||||
from neutron.tests.unit.nicira import fake_nvpapiclient
|
||||
from neutron.tests.unit.nicira import get_fake_conf
|
||||
from neutron.tests.unit.nicira import NVPAPI_NAME
|
||||
from neutron.tests.unit.nicira import STUBS_PATH
|
||||
from neutron.tests.unit import test_api_v2
|
||||
|
||||
from neutron.openstack.common import log
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
_uuid = test_api_v2._uuid
|
||||
LSWITCHES = [{'uuid': _uuid(), 'name': 'ls-1'},
|
||||
{'uuid': _uuid(), 'name': 'ls-2'}]
|
||||
LSWITCHPORTS = [{'uuid': _uuid(), 'name': 'lp-1'},
|
||||
{'uuid': _uuid(), 'name': 'lp-2'}]
|
||||
LROUTERS = [{'uuid': _uuid(), 'name': 'lr-1'},
|
||||
{'uuid': _uuid(), 'name': 'lr-2'}]
|
||||
|
||||
|
||||
class NvpCacheTestCase(base.BaseTestCase):
|
||||
"""Test suite providing coverage for the NvpCache class."""
|
||||
|
||||
def setUp(self):
|
||||
self.nvp_cache = sync.NvpCache()
|
||||
for lswitch in LSWITCHES:
|
||||
self.nvp_cache._uuid_dict_mappings[lswitch['uuid']] = (
|
||||
self.nvp_cache._lswitches)
|
||||
self.nvp_cache._lswitches[lswitch['uuid']] = (
|
||||
{'data': lswitch,
|
||||
'hash': hash(json.dumps(lswitch))})
|
||||
for lswitchport in LSWITCHPORTS:
|
||||
self.nvp_cache._uuid_dict_mappings[lswitchport['uuid']] = (
|
||||
self.nvp_cache._lswitchports)
|
||||
self.nvp_cache._lswitchports[lswitchport['uuid']] = (
|
||||
{'data': lswitchport,
|
||||
'hash': hash(json.dumps(lswitchport))})
|
||||
for lrouter in LROUTERS:
|
||||
self.nvp_cache._uuid_dict_mappings[lrouter['uuid']] = (
|
||||
self.nvp_cache._lrouters)
|
||||
self.nvp_cache._lrouters[lrouter['uuid']] = (
|
||||
{'data': lrouter,
|
||||
'hash': hash(json.dumps(lrouter))})
|
||||
super(NvpCacheTestCase, self).setUp()
|
||||
|
||||
def test_get_lswitches(self):
|
||||
ls_uuids = self.nvp_cache.get_lswitches()
|
||||
self.assertEqual(set(ls_uuids),
|
||||
set([ls['uuid'] for ls in LSWITCHES]))
|
||||
|
||||
def test_get_lswitchports(self):
|
||||
lp_uuids = self.nvp_cache.get_lswitchports()
|
||||
self.assertEqual(set(lp_uuids),
|
||||
set([lp['uuid'] for lp in LSWITCHPORTS]))
|
||||
|
||||
def test_get_lrouters(self):
|
||||
lr_uuids = self.nvp_cache.get_lrouters()
|
||||
self.assertEqual(set(lr_uuids),
|
||||
set([lr['uuid'] for lr in LROUTERS]))
|
||||
|
||||
def test_get_lswitches_changed_only(self):
|
||||
ls_uuids = self.nvp_cache.get_lswitches(changed_only=True)
|
||||
self.assertEqual(0, len(ls_uuids))
|
||||
|
||||
def test_get_lswitchports_changed_only(self):
|
||||
lp_uuids = self.nvp_cache.get_lswitchports(changed_only=True)
|
||||
self.assertEqual(0, len(lp_uuids))
|
||||
|
||||
def test_get_lrouters_changed_only(self):
|
||||
lr_uuids = self.nvp_cache.get_lrouters(changed_only=True)
|
||||
self.assertEqual(0, len(lr_uuids))
|
||||
|
||||
def _verify_update(self, new_resource, changed=True, hit=True):
|
||||
cached_resource = self.nvp_cache[new_resource['uuid']]
|
||||
self.assertEqual(new_resource, cached_resource['data'])
|
||||
self.assertEqual(hit, cached_resource.get('hit', False))
|
||||
self.assertEqual(changed,
|
||||
cached_resource.get('changed', False))
|
||||
|
||||
def test_update_lswitch_new_item(self):
|
||||
new_switch_uuid = _uuid()
|
||||
new_switch = {'uuid': new_switch_uuid, 'name': 'new_switch'}
|
||||
self.nvp_cache.update_lswitch(new_switch)
|
||||
self.assertIn(new_switch_uuid, self.nvp_cache._lswitches.keys())
|
||||
self._verify_update(new_switch)
|
||||
|
||||
def test_update_lswitch_existing_item(self):
|
||||
switch = LSWITCHES[0]
|
||||
switch['name'] = 'new_name'
|
||||
self.nvp_cache.update_lswitch(switch)
|
||||
self.assertIn(switch['uuid'], self.nvp_cache._lswitches.keys())
|
||||
self._verify_update(switch)
|
||||
|
||||
def test_update_lswitchport_new_item(self):
|
||||
new_switchport_uuid = _uuid()
|
||||
new_switchport = {'uuid': new_switchport_uuid,
|
||||
'name': 'new_switchport'}
|
||||
self.nvp_cache.update_lswitchport(new_switchport)
|
||||
self.assertIn(new_switchport_uuid,
|
||||
self.nvp_cache._lswitchports.keys())
|
||||
self._verify_update(new_switchport)
|
||||
|
||||
def test_update_lswitchport_existing_item(self):
|
||||
switchport = LSWITCHPORTS[0]
|
||||
switchport['name'] = 'new_name'
|
||||
self.nvp_cache.update_lswitchport(switchport)
|
||||
self.assertIn(switchport['uuid'],
|
||||
self.nvp_cache._lswitchports.keys())
|
||||
self._verify_update(switchport)
|
||||
|
||||
def test_update_lrouter_new_item(self):
|
||||
new_router_uuid = _uuid()
|
||||
new_router = {'uuid': new_router_uuid,
|
||||
'name': 'new_router'}
|
||||
self.nvp_cache.update_lrouter(new_router)
|
||||
self.assertIn(new_router_uuid,
|
||||
self.nvp_cache._lrouters.keys())
|
||||
self._verify_update(new_router)
|
||||
|
||||
def test_update_lrouter_existing_item(self):
|
||||
router = LROUTERS[0]
|
||||
router['name'] = 'new_name'
|
||||
self.nvp_cache.update_lrouter(router)
|
||||
self.assertIn(router['uuid'],
|
||||
self.nvp_cache._lrouters.keys())
|
||||
self._verify_update(router)
|
||||
|
||||
def test_process_updates_initial(self):
|
||||
# Clear cache content to simulate first-time filling
|
||||
self.nvp_cache._lswitches.clear()
|
||||
self.nvp_cache._lswitchports.clear()
|
||||
self.nvp_cache._lrouters.clear()
|
||||
self.nvp_cache.process_updates(LSWITCHES, LROUTERS, LSWITCHPORTS)
|
||||
for resource in LSWITCHES + LROUTERS + LSWITCHPORTS:
|
||||
self._verify_update(resource)
|
||||
|
||||
def test_process_updates_no_change(self):
|
||||
self.nvp_cache.process_updates(LSWITCHES, LROUTERS, LSWITCHPORTS)
|
||||
for resource in LSWITCHES + LROUTERS + LSWITCHPORTS:
|
||||
self._verify_update(resource, changed=False)
|
||||
|
||||
def test_process_updates_with_changes(self):
|
||||
LSWITCHES[0]['name'] = 'altered'
|
||||
self.nvp_cache.process_updates(LSWITCHES, LROUTERS, LSWITCHPORTS)
|
||||
for resource in LSWITCHES + LROUTERS + LSWITCHPORTS:
|
||||
changed = (True if resource['uuid'] == LSWITCHES[0]['uuid']
|
||||
else False)
|
||||
self._verify_update(resource, changed=changed)
|
||||
|
||||
def _test_process_updates_with_removals(self):
|
||||
lswitches = LSWITCHES[:]
|
||||
lswitch = lswitches.pop()
|
||||
self.nvp_cache.process_updates(lswitches, LROUTERS, LSWITCHPORTS)
|
||||
for resource in LSWITCHES + LROUTERS + LSWITCHPORTS:
|
||||
hit = (False if resource['uuid'] == lswitch['uuid']
|
||||
else True)
|
||||
self._verify_update(resource, changed=False, hit=hit)
|
||||
return (lswitch, lswitches)
|
||||
|
||||
def test_process_updates_with_removals(self):
|
||||
self._test_process_updates_with_removals()
|
||||
|
||||
def test_process_updates_cleanup_after_delete(self):
|
||||
deleted_lswitch, lswitches = self._test_process_updates_with_removals()
|
||||
self.nvp_cache.process_deletes()
|
||||
self.nvp_cache.process_updates(lswitches, LROUTERS, LSWITCHPORTS)
|
||||
self.assertNotIn(deleted_lswitch['uuid'], self.nvp_cache._lswitches)
|
||||
|
||||
def _verify_delete(self, resource, deleted=True, hit=True):
|
||||
cached_resource = self.nvp_cache[resource['uuid']]
|
||||
data_field = 'data_bk' if deleted else 'data'
|
||||
self.assertEqual(resource, cached_resource[data_field])
|
||||
self.assertEqual(hit, cached_resource.get('hit', False))
|
||||
self.assertEqual(deleted,
|
||||
cached_resource.get('changed', False))
|
||||
|
||||
def _set_hit(self, resources, uuid_to_delete=None):
|
||||
for resource in resources:
|
||||
if resource['data']['uuid'] != uuid_to_delete:
|
||||
resource['hit'] = True
|
||||
|
||||
def test_process_deletes_no_change(self):
|
||||
# Mark all resources as hit
|
||||
self._set_hit(self.nvp_cache._lswitches.values())
|
||||
self._set_hit(self.nvp_cache._lswitchports.values())
|
||||
self._set_hit(self.nvp_cache._lrouters.values())
|
||||
self.nvp_cache.process_deletes()
|
||||
for resource in LSWITCHES + LROUTERS + LSWITCHPORTS:
|
||||
self._verify_delete(resource, hit=False, deleted=False)
|
||||
|
||||
def test_process_deletes_with_removals(self):
|
||||
# Mark all resources but one as hit
|
||||
uuid_to_delete = LSWITCHPORTS[0]['uuid']
|
||||
self._set_hit(self.nvp_cache._lswitches.values(),
|
||||
uuid_to_delete)
|
||||
self._set_hit(self.nvp_cache._lswitchports.values(),
|
||||
uuid_to_delete)
|
||||
self._set_hit(self.nvp_cache._lrouters.values(),
|
||||
uuid_to_delete)
|
||||
self.nvp_cache.process_deletes()
|
||||
for resource in LSWITCHES + LROUTERS + LSWITCHPORTS:
|
||||
deleted = resource['uuid'] == uuid_to_delete
|
||||
self._verify_delete(resource, hit=False, deleted=deleted)
|
||||
|
||||
|
||||
class SyncLoopingCallTestCase(base.BaseTestCase):
|
||||
|
||||
def test_looping_calls(self):
|
||||
# Avoid runs of the synchronization process - just start
|
||||
# the looping call
|
||||
with mock.patch.object(
|
||||
sync.NvpSynchronizer, '_synchronize_state',
|
||||
return_value=0.01):
|
||||
synchronizer = sync.NvpSynchronizer(None, None,
|
||||
100, 0, 0)
|
||||
time.sleep(0.04999)
|
||||
self.assertEqual(
|
||||
5, synchronizer._synchronize_state.call_count)
|
||||
|
||||
|
||||
class NvpSyncTestCase(base.BaseTestCase):
|
||||
|
||||
def setUp(self):
|
||||
# mock nvp api client
|
||||
self.fc = fake_nvpapiclient.FakeClient(STUBS_PATH)
|
||||
mock_nvpapi = mock.patch(NVPAPI_NAME, autospec=True)
|
||||
# Avoid runs of the synchronizer looping call
|
||||
# These unit tests will excplicitly invoke synchronization
|
||||
patch_sync = mock.patch.object(sync, '_start_loopingcall')
|
||||
self.mock_nvpapi = mock_nvpapi.start()
|
||||
patch_sync.start()
|
||||
self.mock_nvpapi.return_value.login.return_value = "the_cookie"
|
||||
# Emulate tests against NVP 3.x
|
||||
self.mock_nvpapi.return_value.get_nvp_version.return_value = (
|
||||
NvpApiClient.NVPVersion("3.1"))
|
||||
|
||||
def _fake_request(*args, **kwargs):
|
||||
return self.fc.fake_request(*args, **kwargs)
|
||||
|
||||
self.mock_nvpapi.return_value.request.side_effect = _fake_request
|
||||
self.fake_cluster = nvp_cluster.NVPCluster(
|
||||
name='fake-cluster', nvp_controllers=['1.1.1.1:999'],
|
||||
default_tz_uuid=_uuid(), nvp_user='foo', nvp_password='bar')
|
||||
self.fake_cluster.api_client = NvpApiClient.NVPApiHelper(
|
||||
('1.1.1.1', '999', True),
|
||||
self.fake_cluster.nvp_user, self.fake_cluster.nvp_password,
|
||||
self.fake_cluster.req_timeout, self.fake_cluster.http_timeout,
|
||||
self.fake_cluster.retries, self.fake_cluster.redirects)
|
||||
# Instantiate Neutron plugin
|
||||
# and setup needed config variables
|
||||
args = ['--config-file', get_fake_conf('neutron.conf.test'),
|
||||
'--config-file', get_fake_conf('nvp.ini.test')]
|
||||
config.parse(args=args)
|
||||
self._plugin = NeutronPlugin.NvpPluginV2()
|
||||
super(NvpSyncTestCase, self).setUp()
|
||||
self.addCleanup(self.fc.reset_all)
|
||||
self.addCleanup(patch_sync.stop)
|
||||
self.addCleanup(mock_nvpapi.stop)
|
||||
|
||||
def tearDown(self):
|
||||
cfg.CONF.reset()
|
||||
super(NvpSyncTestCase, self).tearDown()
|
||||
|
||||
@contextlib.contextmanager
|
||||
def _populate_data(self, ctx, net_size=2, port_size=2, router_size=2):
|
||||
|
||||
def network(idx):
|
||||
return {'network': {'name': 'net-%s' % idx,
|
||||
'admin_state_up': True,
|
||||
'shared': False,
|
||||
'port_security_enabled': True,
|
||||
'tenant_id': 'foo'}}
|
||||
|
||||
def subnet(idx, net_id):
|
||||
return {'subnet':
|
||||
{'cidr': '10.10.%s.0/24' % idx,
|
||||
'name': 'sub-%s' % idx,
|
||||
'gateway_ip': attr.ATTR_NOT_SPECIFIED,
|
||||
'allocation_pools': attr.ATTR_NOT_SPECIFIED,
|
||||
'ip_version': 4,
|
||||
'dns_nameservers': attr.ATTR_NOT_SPECIFIED,
|
||||
'host_routes': attr.ATTR_NOT_SPECIFIED,
|
||||
'enable_dhcp': True,
|
||||
'network_id': net_id,
|
||||
'tenant_id': 'foo'}}
|
||||
|
||||
def port(idx, net_id):
|
||||
return {'port': {'network_id': net_id,
|
||||
'name': 'port-%s' % idx,
|
||||
'admin_state_up': True,
|
||||
'device_id': 'miao',
|
||||
'device_owner': 'bau',
|
||||
'fixed_ips': attr.ATTR_NOT_SPECIFIED,
|
||||
'mac_address': attr.ATTR_NOT_SPECIFIED,
|
||||
'tenant_id': 'foo'}}
|
||||
|
||||
def router(idx):
|
||||
# Use random uuids as names
|
||||
return {'router': {'name': 'rtr-%s' % idx,
|
||||
'admin_state_up': True,
|
||||
'tenant_id': 'foo'}}
|
||||
|
||||
networks = []
|
||||
ports = []
|
||||
routers = []
|
||||
for i in range(0, net_size):
|
||||
net = self._plugin.create_network(ctx, network(i))
|
||||
networks.append(net)
|
||||
self._plugin.create_subnet(ctx, subnet(i, net['id']))
|
||||
for j in range(0, port_size):
|
||||
ports.append(self._plugin.create_port(
|
||||
ctx, port("%s-%s" % (i, j), net['id'])))
|
||||
for i in range(0, router_size):
|
||||
routers.append(self._plugin.create_router(ctx, router(i)))
|
||||
# Do not return anything as the user does need the actual
|
||||
# data created
|
||||
try:
|
||||
yield
|
||||
finally:
|
||||
# Remove everything
|
||||
for router in routers:
|
||||
self._plugin.delete_router(ctx, router['id'])
|
||||
for port in ports:
|
||||
self._plugin.delete_port(ctx, port['id'])
|
||||
# This will remove networks and subnets
|
||||
for network in networks:
|
||||
self._plugin.delete_network(ctx, network['id'])
|
||||
|
||||
def _get_tag_dict(self, tags):
|
||||
return dict((tag['scope'], tag['tag']) for tag in tags)
|
||||
|
||||
def _test_sync(self, exp_net_status,
|
||||
exp_port_status, exp_router_status,
|
||||
action_callback=None, sp=None):
|
||||
neutron_net_id = ls_uuid = self.fc._fake_lswitch_dict.keys()[0]
|
||||
lp_uuid = self.fc._fake_lswitch_lport_dict.keys()[0]
|
||||
neutron_port_id = self._get_tag_dict(
|
||||
self.fc._fake_lswitch_lport_dict[lp_uuid]['tags'])['q_port_id']
|
||||
neutron_rtr_id = lr_uuid = self.fc._fake_lrouter_dict.keys()[0]
|
||||
if action_callback:
|
||||
action_callback(ls_uuid, lp_uuid, lr_uuid)
|
||||
# Make chunk big enough to read everything
|
||||
if not sp:
|
||||
sp = sync.SyncParameters(100)
|
||||
self._plugin._synchronizer._synchronize_state(sp)
|
||||
# Verify element is in expected status
|
||||
# TODO(salv-orlando): Verify status for all elements
|
||||
ctx = context.get_admin_context()
|
||||
neutron_net = self._plugin.get_network(ctx, neutron_net_id)
|
||||
neutron_port = self._plugin.get_port(ctx, neutron_port_id)
|
||||
neutron_rtr = self._plugin.get_router(ctx, neutron_rtr_id)
|
||||
self.assertEqual(exp_net_status, neutron_net['status'])
|
||||
self.assertEqual(exp_port_status, neutron_port['status'])
|
||||
self.assertEqual(exp_router_status, neutron_rtr['status'])
|
||||
|
||||
def _action_callback_status_down(self, ls_uuid, lp_uuid, lr_uuid):
|
||||
self.fc._fake_lswitch_dict[ls_uuid]['status'] = 'false'
|
||||
self.fc._fake_lswitch_lport_dict[lp_uuid]['status'] = 'false'
|
||||
self.fc._fake_lrouter_dict[lr_uuid]['status'] = 'false'
|
||||
|
||||
def test_initial_sync(self):
|
||||
ctx = context.get_admin_context()
|
||||
with self._populate_data(ctx):
|
||||
self._test_sync(
|
||||
constants.NET_STATUS_ACTIVE,
|
||||
constants.PORT_STATUS_ACTIVE,
|
||||
constants.NET_STATUS_ACTIVE)
|
||||
|
||||
def test_initial_sync_with_resources_down(self):
|
||||
ctx = context.get_admin_context()
|
||||
with self._populate_data(ctx):
|
||||
self._test_sync(
|
||||
constants.NET_STATUS_DOWN, constants.PORT_STATUS_DOWN,
|
||||
constants.NET_STATUS_DOWN, self._action_callback_status_down)
|
||||
|
||||
def test_resync_with_resources_down(self):
|
||||
ctx = context.get_admin_context()
|
||||
with self._populate_data(ctx):
|
||||
sp = sync.SyncParameters(100)
|
||||
self._plugin._synchronizer._synchronize_state(sp)
|
||||
self._test_sync(
|
||||
constants.NET_STATUS_DOWN, constants.PORT_STATUS_DOWN,
|
||||
constants.NET_STATUS_DOWN, self._action_callback_status_down)
|
||||
|
||||
def _action_callback_del_resource(self, ls_uuid, lp_uuid, lr_uuid):
|
||||
del self.fc._fake_lswitch_dict[ls_uuid]
|
||||
del self.fc._fake_lswitch_lport_dict[lp_uuid]
|
||||
del self.fc._fake_lrouter_dict[lr_uuid]
|
||||
|
||||
def test_initial_sync_with_resources_removed(self):
|
||||
ctx = context.get_admin_context()
|
||||
with self._populate_data(ctx):
|
||||
self._test_sync(
|
||||
constants.NET_STATUS_ERROR, constants.PORT_STATUS_ERROR,
|
||||
constants.NET_STATUS_ERROR, self._action_callback_del_resource)
|
||||
|
||||
def test_resync_with_resources_removed(self):
|
||||
ctx = context.get_admin_context()
|
||||
with self._populate_data(ctx):
|
||||
sp = sync.SyncParameters(100)
|
||||
self._plugin._synchronizer._synchronize_state(sp)
|
||||
self._test_sync(
|
||||
constants.NET_STATUS_ERROR, constants.PORT_STATUS_ERROR,
|
||||
constants.NET_STATUS_ERROR, self._action_callback_del_resource)
|
||||
|
||||
def _test_sync_with_chunk_larger_maxpagesize(
|
||||
self, net_size, port_size, router_size, chunk_size, exp_calls):
|
||||
ctx = context.get_admin_context()
|
||||
real_func = nvplib.get_single_query_page
|
||||
sp = sync.SyncParameters(chunk_size)
|
||||
with self._populate_data(ctx, net_size=net_size,
|
||||
port_size=port_size,
|
||||
router_size=router_size):
|
||||
with mock.patch.object(nvplib, 'MAX_PAGE_SIZE', 15):
|
||||
# The following mock is just for counting calls,
|
||||
# but we will still run the actual function
|
||||
with mock.patch.object(
|
||||
nvplib, 'get_single_query_page',
|
||||
side_effect=real_func) as mock_get_page:
|
||||
self._test_sync(
|
||||
constants.NET_STATUS_ACTIVE,
|
||||
constants.PORT_STATUS_ACTIVE,
|
||||
constants.NET_STATUS_ACTIVE,
|
||||
sp=sp)
|
||||
# As each resource type does not exceed the maximum page size,
|
||||
# the method should be called once for each resource type
|
||||
self.assertEqual(exp_calls, mock_get_page.call_count)
|
||||
|
||||
def test_sync_chunk_larger_maxpagesize_no_multiple_requests(self):
|
||||
# total resource size = 20
|
||||
# total size for each resource does not exceed max page size (15)
|
||||
self._test_sync_with_chunk_larger_maxpagesize(
|
||||
net_size=5, port_size=2, router_size=5,
|
||||
chunk_size=20, exp_calls=3)
|
||||
|
||||
def test_sync_chunk_larger_maxpagesize_triggers_multiple_requests(self):
|
||||
# total resource size = 48
|
||||
# total size for each resource does exceed max page size (15)
|
||||
self._test_sync_with_chunk_larger_maxpagesize(
|
||||
net_size=16, port_size=1, router_size=16,
|
||||
chunk_size=48, exp_calls=6)
|
||||
|
||||
def test_sync_multi_chunk(self):
|
||||
# The fake NVP API client cannot be used for this test
|
||||
ctx = context.get_admin_context()
|
||||
# Generate 4 networks, 1 port per network, and 4 routers
|
||||
with self._populate_data(ctx, net_size=4, port_size=1, router_size=4):
|
||||
fake_lswitches = json.loads(
|
||||
self.fc.handle_get('/ws.v1/lswitch'))['results']
|
||||
fake_lrouters = json.loads(
|
||||
self.fc.handle_get('/ws.v1/lrouter'))['results']
|
||||
fake_lswitchports = json.loads(
|
||||
self.fc.handle_get('/ws.v1/lswitch/*/lport'))['results']
|
||||
return_values = [
|
||||
# Chunk 0 - lswitches
|
||||
(fake_lswitches, None, 4),
|
||||
# Chunk 0 - lrouters
|
||||
(fake_lrouters[:2], 'xxx', 4),
|
||||
# Chunk 0 - lports (size only)
|
||||
([], 'start', 4),
|
||||
# Chunk 1 - lrouters (2 more) (lswitches are skipped)
|
||||
(fake_lrouters[2:], None, None),
|
||||
# Chunk 1 - lports
|
||||
(fake_lswitchports, None, 4)]
|
||||
|
||||
def fake_fetch_data(*args, **kwargs):
|
||||
return return_values.pop(0)
|
||||
|
||||
# 2 Chunks, with 6 resources each.
|
||||
# 1st chunk lswitches and lrouters
|
||||
# 2nd chunk lrouters and lports
|
||||
# Mock _fetch_data
|
||||
with mock.patch.object(
|
||||
self._plugin._synchronizer, '_fetch_data',
|
||||
side_effect=fake_fetch_data):
|
||||
sp = sync.SyncParameters(6)
|
||||
|
||||
def do_chunk(chunk_idx, ls_cursor, lr_cursor, lp_cursor):
|
||||
self._plugin._synchronizer._synchronize_state(sp)
|
||||
self.assertEqual(chunk_idx, sp.current_chunk)
|
||||
self.assertEqual(ls_cursor, sp.ls_cursor)
|
||||
self.assertEqual(lr_cursor, sp.lr_cursor)
|
||||
self.assertEqual(lp_cursor, sp.lp_cursor)
|
||||
|
||||
# check 1st chunk
|
||||
do_chunk(1, None, 'xxx', 'start')
|
||||
# check 2nd chunk
|
||||
do_chunk(0, None, None, None)
|
||||
# Chunk size should have stayed the same
|
||||
self.assertEqual(sp.chunk_size, 6)
|
||||
|
||||
def test_synchronize_network(self):
|
||||
ctx = context.get_admin_context()
|
||||
with self._populate_data(ctx):
|
||||
# Put a network down to verify synchronization
|
||||
q_net_id = ls_uuid = self.fc._fake_lswitch_dict.keys()[0]
|
||||
self.fc._fake_lswitch_dict[ls_uuid]['status'] = 'false'
|
||||
q_net_data = self._plugin._get_network(ctx, q_net_id)
|
||||
self._plugin._synchronizer.synchronize_network(ctx, q_net_data)
|
||||
# Reload from db
|
||||
q_nets = self._plugin.get_networks(ctx)
|
||||
for q_net in q_nets:
|
||||
if q_net['id'] == q_net_id:
|
||||
exp_status = constants.NET_STATUS_DOWN
|
||||
else:
|
||||
exp_status = constants.NET_STATUS_ACTIVE
|
||||
self.assertEqual(exp_status, q_net['status'])
|
||||
|
||||
def test_synchronize_port(self):
|
||||
ctx = context.get_admin_context()
|
||||
with self._populate_data(ctx):
|
||||
# Put a network down to verify synchronization
|
||||
lp_uuid = self.fc._fake_lswitch_lport_dict.keys()[0]
|
||||
lport = self.fc._fake_lswitch_lport_dict[lp_uuid]
|
||||
q_port_id = self._get_tag_dict(lport['tags'])['q_port_id']
|
||||
lport['status'] = 'false'
|
||||
q_port_data = self._plugin._get_port(ctx, q_port_id)
|
||||
self._plugin._synchronizer.synchronize_port(ctx, q_port_data)
|
||||
# Reload from db
|
||||
q_ports = self._plugin.get_ports(ctx)
|
||||
for q_port in q_ports:
|
||||
if q_port['id'] == q_port_id:
|
||||
exp_status = constants.PORT_STATUS_DOWN
|
||||
else:
|
||||
exp_status = constants.PORT_STATUS_ACTIVE
|
||||
self.assertEqual(exp_status, q_port['status'])
|
||||
|
||||
def test_synchronize_router(self):
|
||||
ctx = context.get_admin_context()
|
||||
with self._populate_data(ctx):
|
||||
# Put a network down to verify synchronization
|
||||
q_rtr_id = lr_uuid = self.fc._fake_lrouter_dict.keys()[0]
|
||||
self.fc._fake_lrouter_dict[lr_uuid]['status'] = 'false'
|
||||
q_rtr_data = self._plugin._get_router(ctx, q_rtr_id)
|
||||
self._plugin._synchronizer.synchronize_router(ctx, q_rtr_data)
|
||||
# Reload from db
|
||||
q_routers = self._plugin.get_routers(ctx)
|
||||
for q_rtr in q_routers:
|
||||
if q_rtr['id'] == q_rtr_id:
|
||||
exp_status = constants.NET_STATUS_DOWN
|
||||
else:
|
||||
exp_status = constants.NET_STATUS_ACTIVE
|
||||
self.assertEqual(exp_status, q_rtr['status'])
|
||||
|
||||
def test_sync_nvp_failure_backoff(self):
|
||||
self.mock_nvpapi.return_value.request.side_effect = (
|
||||
NvpApiClient.RequestTimeout)
|
||||
# chunk size won't matter here
|
||||
sp = sync.SyncParameters(999)
|
||||
for i in range(0, 10):
|
||||
self.assertEqual(
|
||||
min(64, 2 ** i),
|
||||
self._plugin._synchronizer._synchronize_state(sp))
|
@ -16,6 +16,7 @@
|
||||
import fixtures
|
||||
import testtools
|
||||
|
||||
import mock
|
||||
from oslo.config import cfg
|
||||
|
||||
from neutron.common import config as q_config
|
||||
@ -23,6 +24,7 @@ from neutron.manager import NeutronManager
|
||||
from neutron.openstack.common import uuidutils
|
||||
from neutron.plugins.nicira.common import config # noqa
|
||||
from neutron.plugins.nicira.common import exceptions
|
||||
from neutron.plugins.nicira.common import sync
|
||||
from neutron.plugins.nicira import nvp_cluster
|
||||
from neutron.tests.unit.nicira import get_fake_conf
|
||||
from neutron.tests.unit.nicira import PLUGIN_NAME
|
||||
@ -81,6 +83,10 @@ class ConfigurationTest(testtools.TestCase):
|
||||
self.useFixture(fixtures.MonkeyPatch(
|
||||
'neutron.manager.NeutronManager._instance',
|
||||
None))
|
||||
# Avoid runs of the synchronizer looping call
|
||||
patch_sync = mock.patch.object(sync, '_start_loopingcall')
|
||||
patch_sync.start()
|
||||
self.addCleanup(patch_sync.stop)
|
||||
|
||||
def _assert_required_options(self, cluster):
|
||||
self.assertEqual(cluster.nvp_controllers, ['fake_1:443', 'fake_2:443'])
|
||||
@ -175,6 +181,10 @@ class OldConfigurationTest(testtools.TestCase):
|
||||
self.useFixture(fixtures.MonkeyPatch(
|
||||
'neutron.manager.NeutronManager._instance',
|
||||
None))
|
||||
# Avoid runs of the synchronizer looping call
|
||||
patch_sync = mock.patch.object(sync, '_start_loopingcall')
|
||||
patch_sync.start()
|
||||
self.addCleanup(patch_sync.stop)
|
||||
|
||||
def _assert_required_options(self, cluster):
|
||||
self.assertEqual(cluster.nvp_controllers, ['fake_1:443', 'fake_2:443'])
|
||||
|
@ -202,10 +202,14 @@ class NeutronDbPluginV2TestCase(testlib_api.WebTestCase):
|
||||
'GET', resource, None, fmt, params=params, subresource=subresource
|
||||
)
|
||||
|
||||
def new_show_request(self, resource, id, fmt=None, subresource=None):
|
||||
return self._req(
|
||||
'GET', resource, None, fmt, id=id, subresource=subresource
|
||||
)
|
||||
def new_show_request(self, resource, id, fmt=None,
|
||||
subresource=None, fields=None):
|
||||
if fields:
|
||||
params = "&".join(["fields=%s" % x for x in fields])
|
||||
else:
|
||||
params = None
|
||||
return self._req('GET', resource, None, fmt, id=id,
|
||||
params=params, subresource=subresource)
|
||||
|
||||
def new_delete_request(self, resource, id, fmt=None, subresource=None,
|
||||
sub_id=None):
|
||||
|
Loading…
x
Reference in New Issue
Block a user