Migrate octavia resources

The nsx-migration tool will migrate octavia resources to the new
plugin driver without changing the octavia DB, by calling the driver directly
using RPC.
In addition, since octavia keeps subnet-ids, the logic of the nsx-migration
was modified so that subnet-ids will not change.

Change-Id: I7b729cb28da362a9fdca21068cb6006d7f743286
This commit is contained in:
asarfaty 2019-12-17 11:10:12 +02:00
parent 50b93766ef
commit ed3aae353a
9 changed files with 294 additions and 187 deletions

View File

@ -37,8 +37,15 @@ class ApiReplayCli(object):
dest_os_auth_url=args.dest_os_auth_url,
dest_plugin=args.dest_plugin,
use_old_keystone=args.use_old_keystone,
max_retry=args.max_retry,
logfile=args.logfile)
octavia_os_tenant_name=args.octavia_os_project_name,
octavia_os_tenant_domain_id=args.octavia_os_project_domain_id,
octavia_os_username=args.octavia_os_username,
octavia_os_user_domain_id=args.octavia_os_user_domain_id,
octavia_os_password=args.octavia_os_password,
octavia_os_auth_url=args.octavia_os_auth_url,
neutron_conf=args.neutron_conf,
logfile=args.logfile,
max_retry=args.max_retry)
def _setup_argparse(self):
parser = argparse.ArgumentParser()
@ -115,11 +122,42 @@ class ApiReplayCli(object):
action='store_true',
help="Use old keystone client for source authentication.")
# Arguments required to connect to the octavia client (read only)
parser.add_argument(
"--octavia-os-username",
help="The octavia os-username to use to "
"gather loadbalancers resources with.")
parser.add_argument(
"--octavia-os-user-domain-id",
default=DEFAULT_DOMAIN_ID,
help="The octavia os-user-domain-id to use to "
"gather loadbalancers resources with.")
parser.add_argument(
"--octavia-os-project-name",
help="The octavia os-project-name to use to "
"gather loadbalancers resource with.")
parser.add_argument(
"--octavia-os-project-domain-id",
default=DEFAULT_DOMAIN_ID,
help="The octavia os-project-domain-id to use to "
"gather loadbalancers resource with.")
parser.add_argument(
"--octavia-os-password",
help="The password for this octavia user.")
parser.add_argument(
"--octavia-os-auth-url",
help="They keystone api endpoint for this octavia user.")
parser.add_argument(
"--logfile",
default=DEFAULT_LOGFILE,
help="Output logfile.")
parser.add_argument(
"--neutron_conf",
default='/etc/neutron/neutron.conf',
help="neutron config file path.")
parser.add_argument(
"--max-retry",
default=10,

View File

@ -10,8 +10,9 @@
# License for the specific language governing permissions and limitations
# under the License.
import copy
import logging
import time
import socket
import six
@ -20,13 +21,20 @@ from keystoneauth1 import session
from neutronclient.common import exceptions as n_exc
from neutronclient.v2_0 import client
from octaviaclient.api.v2 import octavia
from oslo_config import cfg
import oslo_messaging as messaging
from oslo_messaging.rpc import dispatcher
from oslo_utils import excutils
from neutron.common import config as neutron_config
from octavia_lib.api.drivers import driver_lib
from vmware_nsx.api_replay import utils
from vmware_nsx.common import nsxv_constants
from vmware_nsx.services.lbaas.octavia import constants as d_const
logging.basicConfig(level=logging.INFO)
LOG = logging.getLogger(__name__)
LOG.setLevel(logging.INFO)
# For internal testing only
use_old_keystone_on_dest = False
@ -41,7 +49,15 @@ class ApiReplayClient(utils.PrepareObjectForMigration):
dest_os_username, dest_os_user_domain_id,
dest_os_tenant_name, dest_os_tenant_domain_id,
dest_os_password, dest_os_auth_url, dest_plugin,
use_old_keystone, logfile, max_retry):
use_old_keystone,
octavia_os_username, octavia_os_user_domain_id,
octavia_os_tenant_name, octavia_os_tenant_domain_id,
octavia_os_password, octavia_os_auth_url,
neutron_conf, logfile, max_retry):
# Init config and logging
if neutron_conf:
neutron_config.init(args=['--config-file', neutron_conf])
if logfile:
f_handler = logging.FileHandler(logfile)
@ -54,6 +70,7 @@ class ApiReplayClient(utils.PrepareObjectForMigration):
# connect to both clients
if use_old_keystone:
LOG.info("Using old keystone for source neutron")
# Since we are not sure what keystone version will be used on the
# source setup, we add an option to use the v2 client
self.source_neutron = client.Client(
@ -61,7 +78,6 @@ class ApiReplayClient(utils.PrepareObjectForMigration):
tenant_name=source_os_tenant_name,
password=source_os_password,
auth_url=source_os_auth_url)
self.source_octavia = None
else:
self.source_neutron = self.connect_to_client(
username=source_os_username,
@ -70,21 +86,14 @@ class ApiReplayClient(utils.PrepareObjectForMigration):
tenant_domain_id=source_os_tenant_domain_id,
password=source_os_password,
auth_url=source_os_auth_url)
self.source_octavia = self.connect_to_octavia(
username=source_os_username,
user_domain_id=source_os_user_domain_id,
tenant_name=source_os_tenant_name,
tenant_domain_id=source_os_tenant_domain_id,
password=source_os_password,
auth_url=source_os_auth_url)
if use_old_keystone_on_dest:
LOG.info("Using old keystone for destination neutron")
self.dest_neutron = client.Client(
username=dest_os_username,
tenant_name=dest_os_tenant_name,
password=dest_os_password,
auth_url=dest_os_auth_url)
self.dest_octavia = None
else:
self.dest_neutron = self.connect_to_client(
username=dest_os_username,
@ -93,13 +102,17 @@ class ApiReplayClient(utils.PrepareObjectForMigration):
tenant_domain_id=dest_os_tenant_domain_id,
password=dest_os_password,
auth_url=dest_os_auth_url)
self.dest_octavia = self.connect_to_octavia(
username=dest_os_username,
user_domain_id=dest_os_user_domain_id,
tenant_name=dest_os_tenant_name,
tenant_domain_id=dest_os_tenant_domain_id,
password=dest_os_password,
auth_url=dest_os_auth_url)
if octavia_os_auth_url:
self.octavia = self.connect_to_octavia(
username=octavia_os_username,
user_domain_id=octavia_os_user_domain_id,
tenant_name=octavia_os_tenant_name,
tenant_domain_id=octavia_os_tenant_domain_id,
password=octavia_os_password,
auth_url=octavia_os_auth_url)
else:
self.octavia = None
self.dest_plugin = dest_plugin
@ -112,7 +125,7 @@ class ApiReplayClient(utils.PrepareObjectForMigration):
self.migrate_floatingips()
self.migrate_routers_routes(routers_routes)
self.migrate_fwaas()
if self.source_octavia and self.dest_octavia:
if self.octavia:
self.migrate_octavia()
LOG.info("NSX migration is Done.")
@ -430,6 +443,7 @@ class ApiReplayClient(utils.PrepareObjectForMigration):
source_networks = self.source_neutron.list_networks()['networks']
dest_networks = self.dest_neutron.list_networks()['networks']
dest_ports = self.dest_neutron.list_ports()['ports']
dest_subnets = self.dest_neutron.list_subnets()['subnets']
remove_qos = False
if not self.dest_qos_support:
@ -485,6 +499,12 @@ class ApiReplayClient(utils.PrepareObjectForMigration):
dhcp_subnets = []
count_dhcp_subnet = 0
for subnet_id in network['subnets']:
# only create subnet if the dest server doesn't have it
if self.have_id(subnet_id, dest_subnets):
LOG.info("Skip network %s: Already exists on the "
"destination", network['id'])
continue
subnet = self.find_subnet_by_id(subnet_id, source_subnets)
body = self.prepare_subnet(subnet)
@ -528,10 +548,6 @@ class ApiReplayClient(utils.PrepareObjectForMigration):
except n_exc.BadRequest as e:
LOG.error("Failed to create subnet: %(subnet)s: %(e)s",
{'subnet': subnet, 'e': e})
# NOTE(arosen): this occurs here if you run the script
# multiple times as we don't currently
# preserve the subnet_id. Also, 409 would be a better
# response code for this in neutron :(
# create the ports on the network
ports = self.get_ports_on_network(network['id'], source_ports)
@ -741,93 +757,75 @@ class ApiReplayClient(utils.PrepareObjectForMigration):
LOG.info("FWaaS V2 migration done")
def _wait_for_lb_up(self, lb_id):
retry_num = 0
while retry_num < self.max_retry:
lb = self.dest_octavia.load_balancer_show(lb_id)
if not lb['provisioning_status'].startswith('PENDING'):
if lb['provisioning_status'] == 'ACTIVE':
return True
# No point in trying any more
return False
retry_num = retry_num + 1
time.sleep(1)
return False
def _delete_octavia_lb(self, body):
kw = {'loadbalancer': body}
self.octavia_rpc_client.call({}, 'loadbalancer_delete_cascade', **kw)
def _migrate_octavia_lb(self, lb, orig_map):
# Creating all loadbalancers resources on the new nsx driver
# using RPC calls to the plugin listener.
# Create the loadbalancer:
body = self.prepare_lb_loadbalancer(lb)
try:
new_lb = self.dest_octavia.load_balancer_create(
json={'loadbalancer': body})['loadbalancer']
except Exception as e:
LOG.error("Failed to create loadbalancer (%(lb)s): %(e)s",
{'lb': lb, 'e': e})
return
new_lb_id = new_lb['id']
if not self._wait_for_lb_up(new_lb_id):
LOG.error("New loadbalancer %s does not become active", new_lb_id)
lb_body = self.prepare_lb_loadbalancer(lb)
kw = {'loadbalancer': lb_body}
if not self.octavia_rpc_client.call({}, 'loadbalancer_create', **kw):
LOG.error("Failed to create loadbalancer (%s)", lb_body)
self._delete_octavia_lb(lb_body)
return
lb_id = lb['id']
lb_body_for_deletion = copy.deepcopy(lb_body)
lb_body_for_deletion['listeners'] = []
lb_body_for_deletion['pools'] = []
listeners_map = {}
for listener_dict in lb.get('listeners', []):
listener_id = listener_dict['id']
listener = orig_map['listeners'][listener_id]
body = self.prepare_lb_listener(listener)
# Update loadbalancer in listener
body['loadbalancer_id'] = new_lb_id
try:
new_listener = self.dest_octavia.listener_create(
json={'listener': body})['listener']
except Exception as e:
LOG.error("Failed to create listener (%(list)s): %(e)s",
{'list': listener, 'e': e})
body = self.prepare_lb_listener(listener, lb_body)
body['loadbalancer'] = lb_body
body['loadbalancer_id'] = lb_id
kw = {'listener': body, 'cert': None}
if not self.octavia_rpc_client.call({}, 'listener_create', **kw):
LOG.error("Failed to create loadbalancer %(lb)s listener "
"(%(list)s)", {'list': listener, 'lb': lb_id})
self._delete_octavia_lb(lb_body_for_deletion)
return
if not self._wait_for_lb_up(new_lb_id):
LOG.error("New loadbalancer %s does not become active after "
"listener creation", new_lb_id)
return
# map old-id to new
listeners_map[listener_id] = new_listener['id']
listeners_map[listener_id] = body
lb_body_for_deletion['listeners'].append(body)
pools_map = {}
for pool_dict in lb.get('pools', []):
pool_id = pool_dict['id']
pool = orig_map['pools'][pool_id]
body = self.prepare_lb_pool(pool)
# Update loadbalancer and listeners in pool
body['loadbalancer_id'] = new_lb_id
pool_body = self.prepare_lb_pool(pool, lb_body)
# Update listeners in pool
if pool.get('listeners'):
body['listener_id'] = listeners_map[pool['listeners'][0]['id']]
try:
new_pool = self.dest_octavia.pool_create(
json={'pool': body})['pool']
except Exception as e:
LOG.error("Failed to create pool (%(pool)s): %(e)s",
{'pool': pool, 'e': e})
listener_id = pool['listeners'][0]['id']
pool_body['listener_id'] = listener_id
pool_body['listener'] = listeners_map.get(listener_id)
kw = {'pool': pool_body}
if not self.octavia_rpc_client.call({}, 'pool_create', **kw):
LOG.error("Failed to create loadbalancer %(lb)s pool "
"(%(pool)s)", {'pool': pool, 'lb': lb_id})
self._delete_octavia_lb(lb_body_for_deletion)
return
if not self._wait_for_lb_up(new_lb_id):
LOG.error("New loadbalancer %s does not become active after "
"pool creation", new_lb_id)
return
# map old-id to new
pools_map[pool_id] = new_pool['id']
lb_body_for_deletion['pools'].append(pool)
# Add members to this pool
source_members = self.source_octavia.member_list(pool_id)[
'members']
for member in source_members:
body = self.prepare_lb_member(member)
try:
self.dest_octavia.member_create(
new_pool['id'], json={'member': body})['member']
except Exception as e:
LOG.error("Failed to create member (%(member)s): %(e)s",
{'member': member, 'e': e})
return
if not self._wait_for_lb_up(new_lb_id):
LOG.error("New loadbalancer %s does not become active "
"after member creation", new_lb_id)
pool_members = self.octavia.member_list(pool_id)['members']
for member in pool_members:
body = self.prepare_lb_member(member, lb_body)
if not member['subnet_id']:
# Add the loadbalancer subnet
body['subnet_id'] = lb_body['vip_subnet_id']
body['pool'] = pool_body
kw = {'member': body}
if not self.octavia_rpc_client.call({}, 'member_create', **kw):
LOG.error("Failed to create pool %(pool)s member "
"(%(member)s)",
{'member': member, 'pool': pool_id})
self._delete_octavia_lb(lb_body_for_deletion)
return
# Add pool health monitor
@ -835,62 +833,42 @@ class ApiReplayClient(utils.PrepareObjectForMigration):
hm_id = pool['healthmonitor_id']
hm = orig_map['hms'][hm_id]
body = self.prepare_lb_hm(hm)
body['pool'] = pool_body
# Update pool id in hm
body['pool_id'] = new_pool['id']
try:
self.dest_octavia.health_monitor_create(
json={'healthmonitor': body})['healthmonitor']
except Exception as e:
LOG.error("Failed to create healthmonitor (%(hm)s): %(e)s",
{'hm': hm, 'e': e})
return
if not self._wait_for_lb_up(new_lb_id):
LOG.error("New loadbalancer %s does not become active "
"after health monitor creation", new_lb_id)
kw = {'healthmonitor': body}
if not self.octavia_rpc_client.call(
{}, 'healthmonitor_create', **kw):
LOG.error("Failed to create pool %(pool)s healthmonitor "
"(%(hm)s)", {'hm': hm, 'pool': pool_id})
self._delete_octavia_lb(lb_body_for_deletion)
return
lb_body_for_deletion['pools'][-1]['healthmonitor'] = body
# Add listeners L7 policies
for listener_id in listeners_map:
for listener_id in listeners_map.keys():
listener = orig_map['listeners'][listener_id]
for l7pol_dict in listener.get('l7policies', []):
l7pol = orig_map['l7pols'][l7pol_dict['id']]
body = self.prepare_lb_l7policy(l7pol)
# Update pool id in l7 policy
body['listener_id'] = listeners_map[listener_id]
# update redirect_pool_id
if l7pol.get('redirect_pool_id'):
body['redirect_pool_id'] = pools_map[
l7pol['redirect_pool_id']]
try:
new_pol = self.dest_octavia.l7policy_create(
json={'l7policy': body})['l7policy']
except Exception as e:
LOG.error("Failed to create l7policy (%(l7pol)s): "
"%(e)s", {'l7pol': l7pol, 'e': e})
return
if not self._wait_for_lb_up(new_lb_id):
LOG.error("New loadbalancer %s does not become active "
"after L7 policy creation", new_lb_id)
return
l7_pol_id = l7pol_dict['id']
l7pol = orig_map['l7pols'][l7_pol_id]
pol_body = self.prepare_lb_l7policy(l7pol)
# Add the rules of this policy
source_l7rules = self.source_octavia.l7rule_list(
l7pol['id'])['rules']
source_l7rules = self.octavia.l7rule_list(
l7_pol_id)['rules']
for rule in source_l7rules:
body = self.prepare_lb_l7rule(rule)
try:
self.dest_octavia.l7rule_create(
new_pol['id'], json={'rule': body})['rule']
except Exception as e:
LOG.error("Failed to create l7rule (%(rule)s): "
"%(e)s", {'rule': rule, 'e': e})
return
if not self._wait_for_lb_up(new_lb_id):
LOG.error("New loadbalancer %s does not become "
"active after L7 rule creation",
new_lb_id)
rule_body = self.prepare_lb_l7rule(rule)
pol_body['rules'].append(rule_body)
kw = {'l7policy': pol_body}
if not self.octavia_rpc_client.call(
{}, 'l7policy_create', **kw):
LOG.error("Failed to create l7policy (%(l7pol)s)",
{'l7pol': l7pol})
self._delete_octavia_lb(lb_body_for_deletion)
return
LOG.info("Created loadbalancer %s", lb_id)
def _map_orig_objects_of_type(self, source_objects):
result = {}
for obj in source_objects:
@ -908,35 +886,58 @@ class ApiReplayClient(utils.PrepareObjectForMigration):
return result
def migrate_octavia(self):
"""Migrates Octavia objects from source to dest neutron.
Right now the Octavia objects are created with new IDS, and
do not keep their original IDs
"""Migrates Octavia NSX objects to the new neutron driver.
The Octavia proccess & DB will remain unchanged.
Using RPC connection to connect directly with the new plugin driver.
"""
# TODO(asarfaty): Keep original ids
# Read all existing octavia resources
try:
source_loadbalancers = self.source_octavia.\
load_balancer_list()['loadbalancers']
source_listeners = self.source_octavia.listener_list()['listeners']
source_pools = self.source_octavia.pool_list()['pools']
source_hms = self.source_octavia.\
health_monitor_list()['healthmonitors']
source_l7pols = self.source_octavia.l7policy_list()['l7policies']
loadbalancers = self.octavia.load_balancer_list()['loadbalancers']
listeners = self.octavia.listener_list()['listeners']
pools = self.octavia.pool_list()['pools']
hms = self.octavia.health_monitor_list()['healthmonitors']
l7pols = self.octavia.l7policy_list()['l7policies']
except Exception as e:
# Octavia might be disabled in the source
LOG.info("Octavia was not found on the source server: %s", e)
LOG.info("Octavia was not found on the server: %s", e)
return
try:
self.dest_octavia.load_balancer_list()
except Exception as e:
# Octavia might be disabled in the destination
LOG.warning("Skipping Octavia migration. Octavia was not found "
"on the destination server: %s", e)
return
orig_map = self._map_orig_lb_objects(source_listeners, source_pools,
source_hms, source_l7pols)
total_num = len(source_loadbalancers)
# Init the RPC connection for sending messages to the octavia driver
topic = d_const.OCTAVIA_TO_DRIVER_MIGRATION_TOPIC
transport = messaging.get_rpc_transport(cfg.CONF)
target = messaging.Target(topic=topic, exchange="common",
namespace='control', fanout=False,
version='1.0')
self.octavia_rpc_client = messaging.RPCClient(transport, target)
# Initialize RPC listener for getting status updates from the driver
# so that the rsource status will not change in the octavia DB
topic = d_const.DRIVER_TO_OCTAVIA_MIGRATION_TOPIC
server = socket.gethostname()
target = messaging.Target(topic=topic, server=server,
exchange="common", fanout=False)
class MigrationOctaviaDriverEndpoint(driver_lib.DriverLibrary):
target = messaging.Target(namespace="control", version='1.0')
def update_loadbalancer_status(self, **kw):
# Do nothing
pass
endpoints = [MigrationOctaviaDriverEndpoint]
access_policy = dispatcher.DefaultRPCAccessPolicy
self.octavia_rpc_server = messaging.get_rpc_server(
transport, target, endpoints, executor='threading',
access_policy=access_policy)
self.octavia_rpc_server.start()
orig_map = self._map_orig_lb_objects(listeners, pools,
hms, l7pols)
total_num = len(loadbalancers)
LOG.info("Migrating %d loadbalancer(s)", total_num)
for lb in source_loadbalancers:
for lb in loadbalancers:
if lb['provisioning_status'] == 'ACTIVE':
self._migrate_octavia_lb(lb, orig_map)
else:
LOG.info("Skipping %s loadbalancer %s",
lb['provisioning_status'], lb['id'])

View File

@ -73,7 +73,6 @@ class PrepareObjectForMigration(object):
drop_subnet_fields = basic_ignore_fields + [
'advanced_service_providers',
'id',
'service_types']
drop_port_fields = basic_ignore_fields + [
@ -103,14 +102,14 @@ class PrepareObjectForMigration(object):
drop_fwaas_group_fields = ['status']
lb_ignore_fields = ['created_at', 'updated_at', 'operating_status',
'provisioning_status', 'id']
'provisioning_status']
drop_lb_loadbalancer_fields = lb_ignore_fields + [
'listeners', 'pools', # Those objects will be created laster
'vip_subnet_id', # vip_port_id will be used
'listeners', 'pools', # Those objects will be created later
'flavor_id', # not supported by the driver
'vip_qos_policy_id', # not supported by the driver
]
drop_lb_listener_fields = lb_ignore_fields + [
'loadbalancers', 'l7policies', 'default_pool_id']
'l7policies', 'default_pool_id']
drop_lb_pool_fields = lb_ignore_fields + [
'loadbalancers', 'healthmonitor_id', 'listeners', 'members']
drop_lb_member_fields = lb_ignore_fields
@ -192,6 +191,7 @@ class PrepareObjectForMigration(object):
# external networks needs some special care
if body.get('router:external'):
fields_reset = False
# TODO(asarfaty): map external network neutron ids to Policy tier0
for field in ['provider:network_type', 'provider:segmentation_id',
'provider:physical_network']:
if field in body:
@ -264,10 +264,6 @@ class PrepareObjectForMigration(object):
if 'device_owner' not in body:
body['device_owner'] = ""
if body.get('device_owner') == 'Octavia':
# remove device id & owner. Octavia will re-set it.
body['device_id'] = ""
body['device_owner'] = ""
return body
def prepare_floatingip(self, fip, direct_call=False):
@ -297,20 +293,31 @@ class PrepareObjectForMigration(object):
def prepare_lb_loadbalancer(self, lb_obj):
return self.drop_fields(lb_obj, self.drop_lb_loadbalancer_fields)
def prepare_lb_listener(self, lb_obj):
return self.drop_fields(lb_obj, self.drop_lb_listener_fields)
def prepare_lb_listener(self, listener_obj, lb_body):
body = self.drop_fields(listener_obj, self.drop_lb_listener_fields)
body['loadbalancer'] = lb_body
body['loadbalancer_id'] = lb_body['id']
return body
def prepare_lb_pool(self, lb_obj):
return self.drop_fields(lb_obj, self.drop_lb_pool_fields)
def prepare_lb_pool(self, pool_obj, lb_body):
body = self.drop_fields(pool_obj, self.drop_lb_pool_fields)
body['loadbalancer'] = lb_body
body['loadbalancer_id'] = lb_body['id']
return body
def prepare_lb_member(self, lb_obj):
return self.drop_fields(lb_obj, self.drop_lb_member_fields)
def prepare_lb_member(self, mem_obj, lb_body):
body = self.drop_fields(mem_obj, self.drop_lb_member_fields)
body['loadbalancer'] = lb_body
body['loadbalancer_id'] = lb_body['id']
return body
def prepare_lb_hm(self, lb_obj):
return self.drop_fields(lb_obj, self.drop_lb_hm_fields)
def prepare_lb_l7policy(self, lb_obj):
return self.drop_fields(lb_obj, self.drop_lb_l7policy_fields)
body = self.drop_fields(lb_obj, self.drop_lb_l7policy_fields)
body['rules'] = []
return body
def prepare_lb_l7rule(self, lb_obj):
return self.drop_fields(lb_obj, self.drop_lb_l7rule_fields)

View File

@ -37,6 +37,9 @@ RESOURCE_ATTRIBUTE_MAP = {
'networks': {
'id': ID_WITH_POST,
},
'subnets': {
'id': ID_WITH_POST,
},
'security_groups': {
'id': ID_WITH_POST,
'name': {'allow_post': True, 'allow_put': True,

View File

@ -837,6 +837,7 @@ class NsxPolicyPlugin(nsx_plugin_common.NsxPluginV3Base):
if network_id in NET_NEUTRON_2_NSX_ID_CACHE:
nsx_id = NET_NEUTRON_2_NSX_ID_CACHE[network_id]
del NET_NEUTRON_2_NSX_ID_CACHE[network_id]
if nsx_id in NET_NSX_2_NEUTRON_ID_CACHE:
del NET_NSX_2_NEUTRON_ID_CACHE[nsx_id]
def update_network(self, context, network_id, network):
@ -944,6 +945,7 @@ class NsxPolicyPlugin(nsx_plugin_common.NsxPluginV3Base):
self.nsxpolicy.tier1.update(router_id,
ipv6_ndra_profile_id=profile_id)
@nsx_plugin_common.api_replay_mode_wrapper
def create_subnet(self, context, subnet):
return self._create_subnet(context, subnet)

View File

@ -1220,6 +1220,7 @@ class NsxV3Plugin(nsx_plugin_common.NsxPluginV3Base,
LOG.warning("Failed to update network %(id)s dhcp server on "
"the NSX: %(e)s", {'id': network['id'], 'e': e})
@nsx_plugin_common.api_replay_mode_wrapper
def create_subnet(self, context, subnet):
return self._create_subnet(context, subnet)

View File

@ -103,8 +103,11 @@ class EdgeMemberManagerFromDict(base_mgr.NsxpLoadbalancerBaseManager):
def create(self, context, member, completor):
pool_client = self.core_plugin.nsxpolicy.load_balancer.lb_pool
self._validate_member_lb_connectivity(context, member, completor)
if member.get('subnet_id'):
network = lb_utils.get_network_from_subnet(
context, self.core_plugin, member['subnet_id'])
else:
network = None
if network and network.get('router:external'):
fixed_ip = self._get_info_from_fip(context, member['address'])
else:

View File

@ -14,7 +14,9 @@
# under the License.
OCTAVIA_TO_DRIVER_TOPIC = 'vmware_nsx__lb_listener'
OCTAVIA_TO_DRIVER_MIGRATION_TOPIC = 'vmware_nsx__lb_listener_migration'
DRIVER_TO_OCTAVIA_TOPIC = 'vmware_nsx__driver_listener'
DRIVER_TO_OCTAVIA_MIGRATION_TOPIC = 'vmware_nsx__driver_listener_migration'
LOADBALANCER = 'loadbalancer'
LISTENER = 'listener'

View File

@ -44,6 +44,9 @@ class NSXOctaviaListener(object):
loadbalancer, member, pool)
def _init_rpc_messaging(self):
if cfg.CONF.api_replay_mode:
topic = constants.DRIVER_TO_OCTAVIA_MIGRATION_TOPIC
else:
topic = constants.DRIVER_TO_OCTAVIA_TOPIC
transport = messaging.get_rpc_transport(cfg.CONF)
target = messaging.Target(topic=topic, exchange="common",
@ -54,6 +57,9 @@ class NSXOctaviaListener(object):
def _init_rpc_listener(self, healthmonitor, l7policy, l7rule, listener,
loadbalancer, member, pool):
# Initialize RPC listener
if cfg.CONF.api_replay_mode:
topic = constants.OCTAVIA_TO_DRIVER_MIGRATION_TOPIC
else:
topic = constants.OCTAVIA_TO_DRIVER_TOPIC
server = socket.gethostname()
transport = messaging.get_rpc_transport(cfg.CONF)
@ -299,6 +305,8 @@ class NSXOctaviaListenerEndpoint(object):
except Exception as e:
LOG.error('NSX driver loadbalancer_create failed %s', e)
completor(success=False)
return False
return True
@log_helpers.log_method_call
def loadbalancer_delete_cascade(self, ctxt, loadbalancer):
@ -336,6 +344,8 @@ class NSXOctaviaListenerEndpoint(object):
except Exception as e:
LOG.error('NSX driver loadbalancer_delete_cascade failed %s', e)
completor(success=False)
return False
return True
@log_helpers.log_method_call
def loadbalancer_delete(self, ctxt, loadbalancer, cascade=False):
@ -350,6 +360,8 @@ class NSXOctaviaListenerEndpoint(object):
except Exception as e:
LOG.error('NSX driver loadbalancer_delete failed %s', e)
completor(success=False)
return False
return True
@log_helpers.log_method_call
def loadbalancer_update(self, ctxt, old_loadbalancer, new_loadbalancer):
@ -362,6 +374,8 @@ class NSXOctaviaListenerEndpoint(object):
except Exception as e:
LOG.error('NSX driver loadbalancer_update failed %s', e)
completor(success=False)
return False
return True
# Listener
@log_helpers.log_method_call
@ -375,6 +389,8 @@ class NSXOctaviaListenerEndpoint(object):
except Exception as e:
LOG.error('NSX driver listener_create failed %s', e)
completor(success=False)
return False
return True
@log_helpers.log_method_call
def listener_delete(self, ctxt, listener):
@ -386,6 +402,8 @@ class NSXOctaviaListenerEndpoint(object):
except Exception as e:
LOG.error('NSX driver listener_delete failed %s', e)
completor(success=False)
return False
return True
@log_helpers.log_method_call
def listener_update(self, ctxt, old_listener, new_listener, cert):
@ -398,6 +416,8 @@ class NSXOctaviaListenerEndpoint(object):
except Exception as e:
LOG.error('NSX driver listener_update failed %s', e)
completor(success=False)
return False
return True
# Pool
@log_helpers.log_method_call
@ -410,6 +430,8 @@ class NSXOctaviaListenerEndpoint(object):
except Exception as e:
LOG.error('NSX driver pool_create failed %s', e)
completor(success=False)
return False
return True
@log_helpers.log_method_call
def pool_delete(self, ctxt, pool):
@ -421,6 +443,8 @@ class NSXOctaviaListenerEndpoint(object):
except Exception as e:
LOG.error('NSX driver pool_delete failed %s', e)
completor(success=False)
return False
return True
@log_helpers.log_method_call
def pool_update(self, ctxt, old_pool, new_pool):
@ -432,6 +456,8 @@ class NSXOctaviaListenerEndpoint(object):
except Exception as e:
LOG.error('NSX driver pool_update failed %s', e)
completor(success=False)
return False
return True
# Member
@log_helpers.log_method_call
@ -444,6 +470,8 @@ class NSXOctaviaListenerEndpoint(object):
except Exception as e:
LOG.error('NSX driver member_create failed %s', e)
completor(success=False)
return False
return True
@log_helpers.log_method_call
def member_delete(self, ctxt, member):
@ -455,6 +483,8 @@ class NSXOctaviaListenerEndpoint(object):
except Exception as e:
LOG.error('NSX driver member_delete failed %s', e)
completor(success=False)
return False
return True
@log_helpers.log_method_call
def member_update(self, ctxt, old_member, new_member):
@ -466,6 +496,8 @@ class NSXOctaviaListenerEndpoint(object):
except Exception as e:
LOG.error('NSX driver member_update failed %s', e)
completor(success=False)
return False
return True
# Health Monitor
@log_helpers.log_method_call
@ -478,6 +510,8 @@ class NSXOctaviaListenerEndpoint(object):
except Exception as e:
LOG.error('NSX driver healthmonitor_create failed %s', e)
completor(success=False)
return False
return True
@log_helpers.log_method_call
def healthmonitor_delete(self, ctxt, healthmonitor):
@ -489,6 +523,8 @@ class NSXOctaviaListenerEndpoint(object):
except Exception as e:
LOG.error('NSX driver healthmonitor_delete failed %s', e)
completor(success=False)
return False
return True
@log_helpers.log_method_call
def healthmonitor_update(self, ctxt, old_healthmonitor, new_healthmonitor):
@ -501,6 +537,8 @@ class NSXOctaviaListenerEndpoint(object):
except Exception as e:
LOG.error('NSX driver healthmonitor_update failed %s', e)
completor(success=False)
return False
return True
# L7 Policy
@log_helpers.log_method_call
@ -513,6 +551,8 @@ class NSXOctaviaListenerEndpoint(object):
except Exception as e:
LOG.error('NSX driver l7policy_create failed %s', e)
completor(success=False)
return False
return True
@log_helpers.log_method_call
def l7policy_delete(self, ctxt, l7policy):
@ -524,6 +564,8 @@ class NSXOctaviaListenerEndpoint(object):
except Exception as e:
LOG.error('NSX driver l7policy_delete failed %s', e)
completor(success=False)
return False
return True
@log_helpers.log_method_call
def l7policy_update(self, ctxt, old_l7policy, new_l7policy):
@ -535,6 +577,8 @@ class NSXOctaviaListenerEndpoint(object):
except Exception as e:
LOG.error('NSX driver l7policy_update failed %s', e)
completor(success=False)
return False
return True
# L7 Rule
@log_helpers.log_method_call
@ -546,6 +590,8 @@ class NSXOctaviaListenerEndpoint(object):
except Exception as e:
LOG.error('NSX driver l7rule_create failed %s', e)
completor(success=False)
return False
return True
@log_helpers.log_method_call
def l7rule_delete(self, ctxt, l7rule):
@ -557,6 +603,8 @@ class NSXOctaviaListenerEndpoint(object):
except Exception as e:
LOG.error('NSX driver l7rule_delete failed %s', e)
completor(success=False)
return False
return True
@log_helpers.log_method_call
def l7rule_update(self, ctxt, old_l7rule, new_l7rule):
@ -567,6 +615,8 @@ class NSXOctaviaListenerEndpoint(object):
except Exception as e:
LOG.error('NSX driver l7rule_update failed %s', e)
completor(success=False)
return False
return True
class NSXOctaviaStatisticsCollector(object):