From ed3aae353a06d41aa315d52e737fd9de76d26944 Mon Sep 17 00:00:00 2001 From: asarfaty Date: Tue, 17 Dec 2019 11:10:12 +0200 Subject: [PATCH] Migrate octavia resources The nsx-migration tool will migrate octavia resources to the new plugin driver without changing the octavia DB, by calling the driver directly using RPC. In addition, since octavia keeps subnet-ids, the logic of the nsx-migration was modified so that subnet-ids will not change. Change-Id: I7b729cb28da362a9fdca21068cb6006d7f743286 --- vmware_nsx/api_replay/cli.py | 42 ++- vmware_nsx/api_replay/client.py | 329 +++++++++--------- vmware_nsx/api_replay/utils.py | 39 ++- vmware_nsx/extensions/api_replay.py | 3 + vmware_nsx/plugins/nsx_p/plugin.py | 4 +- vmware_nsx/plugins/nsx_v3/plugin.py | 1 + .../lbaas/nsx_p/implementation/member_mgr.py | 7 +- .../services/lbaas/octavia/constants.py | 2 + .../lbaas/octavia/octavia_listener.py | 54 ++- 9 files changed, 294 insertions(+), 187 deletions(-) diff --git a/vmware_nsx/api_replay/cli.py b/vmware_nsx/api_replay/cli.py index 1d983f4cec..17f07e4072 100644 --- a/vmware_nsx/api_replay/cli.py +++ b/vmware_nsx/api_replay/cli.py @@ -37,8 +37,15 @@ class ApiReplayCli(object): dest_os_auth_url=args.dest_os_auth_url, dest_plugin=args.dest_plugin, use_old_keystone=args.use_old_keystone, - max_retry=args.max_retry, - logfile=args.logfile) + octavia_os_tenant_name=args.octavia_os_project_name, + octavia_os_tenant_domain_id=args.octavia_os_project_domain_id, + octavia_os_username=args.octavia_os_username, + octavia_os_user_domain_id=args.octavia_os_user_domain_id, + octavia_os_password=args.octavia_os_password, + octavia_os_auth_url=args.octavia_os_auth_url, + neutron_conf=args.neutron_conf, + logfile=args.logfile, + max_retry=args.max_retry) def _setup_argparse(self): parser = argparse.ArgumentParser() @@ -115,11 +122,42 @@ class ApiReplayCli(object): action='store_true', help="Use old keystone client for source authentication.") + # Arguments required to connect to the octavia client (read only) + parser.add_argument( + "--octavia-os-username", + help="The octavia os-username to use to " + "gather loadbalancers resources with.") + parser.add_argument( + "--octavia-os-user-domain-id", + default=DEFAULT_DOMAIN_ID, + help="The octavia os-user-domain-id to use to " + "gather loadbalancers resources with.") + parser.add_argument( + "--octavia-os-project-name", + help="The octavia os-project-name to use to " + "gather loadbalancers resource with.") + parser.add_argument( + "--octavia-os-project-domain-id", + default=DEFAULT_DOMAIN_ID, + help="The octavia os-project-domain-id to use to " + "gather loadbalancers resource with.") + parser.add_argument( + "--octavia-os-password", + help="The password for this octavia user.") + parser.add_argument( + "--octavia-os-auth-url", + help="They keystone api endpoint for this octavia user.") + parser.add_argument( "--logfile", default=DEFAULT_LOGFILE, help="Output logfile.") + parser.add_argument( + "--neutron_conf", + default='/etc/neutron/neutron.conf', + help="neutron config file path.") + parser.add_argument( "--max-retry", default=10, diff --git a/vmware_nsx/api_replay/client.py b/vmware_nsx/api_replay/client.py index 0a4af0af21..409a1f3a38 100644 --- a/vmware_nsx/api_replay/client.py +++ b/vmware_nsx/api_replay/client.py @@ -10,8 +10,9 @@ # License for the specific language governing permissions and limitations # under the License. +import copy import logging -import time +import socket import six @@ -20,13 +21,20 @@ from keystoneauth1 import session from neutronclient.common import exceptions as n_exc from neutronclient.v2_0 import client from octaviaclient.api.v2 import octavia +from oslo_config import cfg +import oslo_messaging as messaging +from oslo_messaging.rpc import dispatcher from oslo_utils import excutils +from neutron.common import config as neutron_config +from octavia_lib.api.drivers import driver_lib + from vmware_nsx.api_replay import utils from vmware_nsx.common import nsxv_constants +from vmware_nsx.services.lbaas.octavia import constants as d_const -logging.basicConfig(level=logging.INFO) LOG = logging.getLogger(__name__) +LOG.setLevel(logging.INFO) # For internal testing only use_old_keystone_on_dest = False @@ -41,7 +49,15 @@ class ApiReplayClient(utils.PrepareObjectForMigration): dest_os_username, dest_os_user_domain_id, dest_os_tenant_name, dest_os_tenant_domain_id, dest_os_password, dest_os_auth_url, dest_plugin, - use_old_keystone, logfile, max_retry): + use_old_keystone, + octavia_os_username, octavia_os_user_domain_id, + octavia_os_tenant_name, octavia_os_tenant_domain_id, + octavia_os_password, octavia_os_auth_url, + neutron_conf, logfile, max_retry): + + # Init config and logging + if neutron_conf: + neutron_config.init(args=['--config-file', neutron_conf]) if logfile: f_handler = logging.FileHandler(logfile) @@ -54,6 +70,7 @@ class ApiReplayClient(utils.PrepareObjectForMigration): # connect to both clients if use_old_keystone: + LOG.info("Using old keystone for source neutron") # Since we are not sure what keystone version will be used on the # source setup, we add an option to use the v2 client self.source_neutron = client.Client( @@ -61,7 +78,6 @@ class ApiReplayClient(utils.PrepareObjectForMigration): tenant_name=source_os_tenant_name, password=source_os_password, auth_url=source_os_auth_url) - self.source_octavia = None else: self.source_neutron = self.connect_to_client( username=source_os_username, @@ -70,21 +86,14 @@ class ApiReplayClient(utils.PrepareObjectForMigration): tenant_domain_id=source_os_tenant_domain_id, password=source_os_password, auth_url=source_os_auth_url) - self.source_octavia = self.connect_to_octavia( - username=source_os_username, - user_domain_id=source_os_user_domain_id, - tenant_name=source_os_tenant_name, - tenant_domain_id=source_os_tenant_domain_id, - password=source_os_password, - auth_url=source_os_auth_url) if use_old_keystone_on_dest: + LOG.info("Using old keystone for destination neutron") self.dest_neutron = client.Client( username=dest_os_username, tenant_name=dest_os_tenant_name, password=dest_os_password, auth_url=dest_os_auth_url) - self.dest_octavia = None else: self.dest_neutron = self.connect_to_client( username=dest_os_username, @@ -93,13 +102,17 @@ class ApiReplayClient(utils.PrepareObjectForMigration): tenant_domain_id=dest_os_tenant_domain_id, password=dest_os_password, auth_url=dest_os_auth_url) - self.dest_octavia = self.connect_to_octavia( - username=dest_os_username, - user_domain_id=dest_os_user_domain_id, - tenant_name=dest_os_tenant_name, - tenant_domain_id=dest_os_tenant_domain_id, - password=dest_os_password, - auth_url=dest_os_auth_url) + + if octavia_os_auth_url: + self.octavia = self.connect_to_octavia( + username=octavia_os_username, + user_domain_id=octavia_os_user_domain_id, + tenant_name=octavia_os_tenant_name, + tenant_domain_id=octavia_os_tenant_domain_id, + password=octavia_os_password, + auth_url=octavia_os_auth_url) + else: + self.octavia = None self.dest_plugin = dest_plugin @@ -112,7 +125,7 @@ class ApiReplayClient(utils.PrepareObjectForMigration): self.migrate_floatingips() self.migrate_routers_routes(routers_routes) self.migrate_fwaas() - if self.source_octavia and self.dest_octavia: + if self.octavia: self.migrate_octavia() LOG.info("NSX migration is Done.") @@ -430,6 +443,7 @@ class ApiReplayClient(utils.PrepareObjectForMigration): source_networks = self.source_neutron.list_networks()['networks'] dest_networks = self.dest_neutron.list_networks()['networks'] dest_ports = self.dest_neutron.list_ports()['ports'] + dest_subnets = self.dest_neutron.list_subnets()['subnets'] remove_qos = False if not self.dest_qos_support: @@ -485,6 +499,12 @@ class ApiReplayClient(utils.PrepareObjectForMigration): dhcp_subnets = [] count_dhcp_subnet = 0 for subnet_id in network['subnets']: + + # only create subnet if the dest server doesn't have it + if self.have_id(subnet_id, dest_subnets): + LOG.info("Skip network %s: Already exists on the " + "destination", network['id']) + continue subnet = self.find_subnet_by_id(subnet_id, source_subnets) body = self.prepare_subnet(subnet) @@ -528,10 +548,6 @@ class ApiReplayClient(utils.PrepareObjectForMigration): except n_exc.BadRequest as e: LOG.error("Failed to create subnet: %(subnet)s: %(e)s", {'subnet': subnet, 'e': e}) - # NOTE(arosen): this occurs here if you run the script - # multiple times as we don't currently - # preserve the subnet_id. Also, 409 would be a better - # response code for this in neutron :( # create the ports on the network ports = self.get_ports_on_network(network['id'], source_ports) @@ -741,93 +757,75 @@ class ApiReplayClient(utils.PrepareObjectForMigration): LOG.info("FWaaS V2 migration done") - def _wait_for_lb_up(self, lb_id): - retry_num = 0 - while retry_num < self.max_retry: - lb = self.dest_octavia.load_balancer_show(lb_id) - if not lb['provisioning_status'].startswith('PENDING'): - if lb['provisioning_status'] == 'ACTIVE': - return True - # No point in trying any more - return False - retry_num = retry_num + 1 - time.sleep(1) - return False + def _delete_octavia_lb(self, body): + kw = {'loadbalancer': body} + self.octavia_rpc_client.call({}, 'loadbalancer_delete_cascade', **kw) def _migrate_octavia_lb(self, lb, orig_map): + # Creating all loadbalancers resources on the new nsx driver + # using RPC calls to the plugin listener. + # Create the loadbalancer: - body = self.prepare_lb_loadbalancer(lb) - try: - new_lb = self.dest_octavia.load_balancer_create( - json={'loadbalancer': body})['loadbalancer'] - except Exception as e: - LOG.error("Failed to create loadbalancer (%(lb)s): %(e)s", - {'lb': lb, 'e': e}) - return - new_lb_id = new_lb['id'] - if not self._wait_for_lb_up(new_lb_id): - LOG.error("New loadbalancer %s does not become active", new_lb_id) + lb_body = self.prepare_lb_loadbalancer(lb) + kw = {'loadbalancer': lb_body} + if not self.octavia_rpc_client.call({}, 'loadbalancer_create', **kw): + LOG.error("Failed to create loadbalancer (%s)", lb_body) + self._delete_octavia_lb(lb_body) return + lb_id = lb['id'] + lb_body_for_deletion = copy.deepcopy(lb_body) + lb_body_for_deletion['listeners'] = [] + lb_body_for_deletion['pools'] = [] + listeners_map = {} for listener_dict in lb.get('listeners', []): listener_id = listener_dict['id'] listener = orig_map['listeners'][listener_id] - body = self.prepare_lb_listener(listener) - # Update loadbalancer in listener - body['loadbalancer_id'] = new_lb_id - try: - new_listener = self.dest_octavia.listener_create( - json={'listener': body})['listener'] - except Exception as e: - LOG.error("Failed to create listener (%(list)s): %(e)s", - {'list': listener, 'e': e}) + body = self.prepare_lb_listener(listener, lb_body) + body['loadbalancer'] = lb_body + body['loadbalancer_id'] = lb_id + kw = {'listener': body, 'cert': None} + if not self.octavia_rpc_client.call({}, 'listener_create', **kw): + LOG.error("Failed to create loadbalancer %(lb)s listener " + "(%(list)s)", {'list': listener, 'lb': lb_id}) + self._delete_octavia_lb(lb_body_for_deletion) return - if not self._wait_for_lb_up(new_lb_id): - LOG.error("New loadbalancer %s does not become active after " - "listener creation", new_lb_id) - return - # map old-id to new - listeners_map[listener_id] = new_listener['id'] + listeners_map[listener_id] = body + lb_body_for_deletion['listeners'].append(body) - pools_map = {} for pool_dict in lb.get('pools', []): pool_id = pool_dict['id'] pool = orig_map['pools'][pool_id] - body = self.prepare_lb_pool(pool) - # Update loadbalancer and listeners in pool - body['loadbalancer_id'] = new_lb_id + pool_body = self.prepare_lb_pool(pool, lb_body) + # Update listeners in pool if pool.get('listeners'): - body['listener_id'] = listeners_map[pool['listeners'][0]['id']] - try: - new_pool = self.dest_octavia.pool_create( - json={'pool': body})['pool'] - except Exception as e: - LOG.error("Failed to create pool (%(pool)s): %(e)s", - {'pool': pool, 'e': e}) + listener_id = pool['listeners'][0]['id'] + pool_body['listener_id'] = listener_id + pool_body['listener'] = listeners_map.get(listener_id) + kw = {'pool': pool_body} + if not self.octavia_rpc_client.call({}, 'pool_create', **kw): + LOG.error("Failed to create loadbalancer %(lb)s pool " + "(%(pool)s)", {'pool': pool, 'lb': lb_id}) + self._delete_octavia_lb(lb_body_for_deletion) return - if not self._wait_for_lb_up(new_lb_id): - LOG.error("New loadbalancer %s does not become active after " - "pool creation", new_lb_id) - return - # map old-id to new - pools_map[pool_id] = new_pool['id'] + lb_body_for_deletion['pools'].append(pool) # Add members to this pool - source_members = self.source_octavia.member_list(pool_id)[ - 'members'] - for member in source_members: - body = self.prepare_lb_member(member) - try: - self.dest_octavia.member_create( - new_pool['id'], json={'member': body})['member'] - except Exception as e: - LOG.error("Failed to create member (%(member)s): %(e)s", - {'member': member, 'e': e}) - return - if not self._wait_for_lb_up(new_lb_id): - LOG.error("New loadbalancer %s does not become active " - "after member creation", new_lb_id) + pool_members = self.octavia.member_list(pool_id)['members'] + for member in pool_members: + body = self.prepare_lb_member(member, lb_body) + if not member['subnet_id']: + # Add the loadbalancer subnet + body['subnet_id'] = lb_body['vip_subnet_id'] + + body['pool'] = pool_body + kw = {'member': body} + if not self.octavia_rpc_client.call({}, 'member_create', **kw): + LOG.error("Failed to create pool %(pool)s member " + "(%(member)s)", + {'member': member, 'pool': pool_id}) + self._delete_octavia_lb(lb_body_for_deletion) return # Add pool health monitor @@ -835,61 +833,41 @@ class ApiReplayClient(utils.PrepareObjectForMigration): hm_id = pool['healthmonitor_id'] hm = orig_map['hms'][hm_id] body = self.prepare_lb_hm(hm) + body['pool'] = pool_body # Update pool id in hm - body['pool_id'] = new_pool['id'] - try: - self.dest_octavia.health_monitor_create( - json={'healthmonitor': body})['healthmonitor'] - except Exception as e: - LOG.error("Failed to create healthmonitor (%(hm)s): %(e)s", - {'hm': hm, 'e': e}) - return - if not self._wait_for_lb_up(new_lb_id): - LOG.error("New loadbalancer %s does not become active " - "after health monitor creation", new_lb_id) + kw = {'healthmonitor': body} + if not self.octavia_rpc_client.call( + {}, 'healthmonitor_create', **kw): + LOG.error("Failed to create pool %(pool)s healthmonitor " + "(%(hm)s)", {'hm': hm, 'pool': pool_id}) + self._delete_octavia_lb(lb_body_for_deletion) return + lb_body_for_deletion['pools'][-1]['healthmonitor'] = body # Add listeners L7 policies - for listener_id in listeners_map: + for listener_id in listeners_map.keys(): listener = orig_map['listeners'][listener_id] for l7pol_dict in listener.get('l7policies', []): - l7pol = orig_map['l7pols'][l7pol_dict['id']] - body = self.prepare_lb_l7policy(l7pol) - # Update pool id in l7 policy - body['listener_id'] = listeners_map[listener_id] - # update redirect_pool_id - if l7pol.get('redirect_pool_id'): - body['redirect_pool_id'] = pools_map[ - l7pol['redirect_pool_id']] - try: - new_pol = self.dest_octavia.l7policy_create( - json={'l7policy': body})['l7policy'] - except Exception as e: - LOG.error("Failed to create l7policy (%(l7pol)s): " - "%(e)s", {'l7pol': l7pol, 'e': e}) - return - if not self._wait_for_lb_up(new_lb_id): - LOG.error("New loadbalancer %s does not become active " - "after L7 policy creation", new_lb_id) - return + l7_pol_id = l7pol_dict['id'] + l7pol = orig_map['l7pols'][l7_pol_id] + pol_body = self.prepare_lb_l7policy(l7pol) # Add the rules of this policy - source_l7rules = self.source_octavia.l7rule_list( - l7pol['id'])['rules'] + source_l7rules = self.octavia.l7rule_list( + l7_pol_id)['rules'] for rule in source_l7rules: - body = self.prepare_lb_l7rule(rule) - try: - self.dest_octavia.l7rule_create( - new_pol['id'], json={'rule': body})['rule'] - except Exception as e: - LOG.error("Failed to create l7rule (%(rule)s): " - "%(e)s", {'rule': rule, 'e': e}) - return - if not self._wait_for_lb_up(new_lb_id): - LOG.error("New loadbalancer %s does not become " - "active after L7 rule creation", - new_lb_id) - return + rule_body = self.prepare_lb_l7rule(rule) + pol_body['rules'].append(rule_body) + + kw = {'l7policy': pol_body} + if not self.octavia_rpc_client.call( + {}, 'l7policy_create', **kw): + LOG.error("Failed to create l7policy (%(l7pol)s)", + {'l7pol': l7pol}) + self._delete_octavia_lb(lb_body_for_deletion) + return + + LOG.info("Created loadbalancer %s", lb_id) def _map_orig_objects_of_type(self, source_objects): result = {} @@ -908,35 +886,58 @@ class ApiReplayClient(utils.PrepareObjectForMigration): return result def migrate_octavia(self): - """Migrates Octavia objects from source to dest neutron. - - Right now the Octavia objects are created with new IDS, and - do not keep their original IDs + """Migrates Octavia NSX objects to the new neutron driver. + The Octavia proccess & DB will remain unchanged. + Using RPC connection to connect directly with the new plugin driver. """ - # TODO(asarfaty): Keep original ids + # Read all existing octavia resources try: - source_loadbalancers = self.source_octavia.\ - load_balancer_list()['loadbalancers'] - source_listeners = self.source_octavia.listener_list()['listeners'] - source_pools = self.source_octavia.pool_list()['pools'] - source_hms = self.source_octavia.\ - health_monitor_list()['healthmonitors'] - source_l7pols = self.source_octavia.l7policy_list()['l7policies'] + loadbalancers = self.octavia.load_balancer_list()['loadbalancers'] + listeners = self.octavia.listener_list()['listeners'] + pools = self.octavia.pool_list()['pools'] + hms = self.octavia.health_monitor_list()['healthmonitors'] + l7pols = self.octavia.l7policy_list()['l7policies'] except Exception as e: # Octavia might be disabled in the source - LOG.info("Octavia was not found on the source server: %s", e) + LOG.info("Octavia was not found on the server: %s", e) return - try: - self.dest_octavia.load_balancer_list() - except Exception as e: - # Octavia might be disabled in the destination - LOG.warning("Skipping Octavia migration. Octavia was not found " - "on the destination server: %s", e) - return - orig_map = self._map_orig_lb_objects(source_listeners, source_pools, - source_hms, source_l7pols) - total_num = len(source_loadbalancers) + # Init the RPC connection for sending messages to the octavia driver + topic = d_const.OCTAVIA_TO_DRIVER_MIGRATION_TOPIC + transport = messaging.get_rpc_transport(cfg.CONF) + target = messaging.Target(topic=topic, exchange="common", + namespace='control', fanout=False, + version='1.0') + self.octavia_rpc_client = messaging.RPCClient(transport, target) + + # Initialize RPC listener for getting status updates from the driver + # so that the rsource status will not change in the octavia DB + topic = d_const.DRIVER_TO_OCTAVIA_MIGRATION_TOPIC + server = socket.gethostname() + target = messaging.Target(topic=topic, server=server, + exchange="common", fanout=False) + + class MigrationOctaviaDriverEndpoint(driver_lib.DriverLibrary): + target = messaging.Target(namespace="control", version='1.0') + + def update_loadbalancer_status(self, **kw): + # Do nothing + pass + + endpoints = [MigrationOctaviaDriverEndpoint] + access_policy = dispatcher.DefaultRPCAccessPolicy + self.octavia_rpc_server = messaging.get_rpc_server( + transport, target, endpoints, executor='threading', + access_policy=access_policy) + self.octavia_rpc_server.start() + + orig_map = self._map_orig_lb_objects(listeners, pools, + hms, l7pols) + total_num = len(loadbalancers) LOG.info("Migrating %d loadbalancer(s)", total_num) - for lb in source_loadbalancers: - self._migrate_octavia_lb(lb, orig_map) + for lb in loadbalancers: + if lb['provisioning_status'] == 'ACTIVE': + self._migrate_octavia_lb(lb, orig_map) + else: + LOG.info("Skipping %s loadbalancer %s", + lb['provisioning_status'], lb['id']) diff --git a/vmware_nsx/api_replay/utils.py b/vmware_nsx/api_replay/utils.py index 01cc888326..ad1ab6312e 100644 --- a/vmware_nsx/api_replay/utils.py +++ b/vmware_nsx/api_replay/utils.py @@ -73,7 +73,6 @@ class PrepareObjectForMigration(object): drop_subnet_fields = basic_ignore_fields + [ 'advanced_service_providers', - 'id', 'service_types'] drop_port_fields = basic_ignore_fields + [ @@ -103,14 +102,14 @@ class PrepareObjectForMigration(object): drop_fwaas_group_fields = ['status'] lb_ignore_fields = ['created_at', 'updated_at', 'operating_status', - 'provisioning_status', 'id'] + 'provisioning_status'] drop_lb_loadbalancer_fields = lb_ignore_fields + [ - 'listeners', 'pools', # Those objects will be created laster - 'vip_subnet_id', # vip_port_id will be used + 'listeners', 'pools', # Those objects will be created later 'flavor_id', # not supported by the driver + 'vip_qos_policy_id', # not supported by the driver ] drop_lb_listener_fields = lb_ignore_fields + [ - 'loadbalancers', 'l7policies', 'default_pool_id'] + 'l7policies', 'default_pool_id'] drop_lb_pool_fields = lb_ignore_fields + [ 'loadbalancers', 'healthmonitor_id', 'listeners', 'members'] drop_lb_member_fields = lb_ignore_fields @@ -192,6 +191,7 @@ class PrepareObjectForMigration(object): # external networks needs some special care if body.get('router:external'): fields_reset = False + # TODO(asarfaty): map external network neutron ids to Policy tier0 for field in ['provider:network_type', 'provider:segmentation_id', 'provider:physical_network']: if field in body: @@ -264,10 +264,6 @@ class PrepareObjectForMigration(object): if 'device_owner' not in body: body['device_owner'] = "" - if body.get('device_owner') == 'Octavia': - # remove device id & owner. Octavia will re-set it. - body['device_id'] = "" - body['device_owner'] = "" return body def prepare_floatingip(self, fip, direct_call=False): @@ -297,20 +293,31 @@ class PrepareObjectForMigration(object): def prepare_lb_loadbalancer(self, lb_obj): return self.drop_fields(lb_obj, self.drop_lb_loadbalancer_fields) - def prepare_lb_listener(self, lb_obj): - return self.drop_fields(lb_obj, self.drop_lb_listener_fields) + def prepare_lb_listener(self, listener_obj, lb_body): + body = self.drop_fields(listener_obj, self.drop_lb_listener_fields) + body['loadbalancer'] = lb_body + body['loadbalancer_id'] = lb_body['id'] + return body - def prepare_lb_pool(self, lb_obj): - return self.drop_fields(lb_obj, self.drop_lb_pool_fields) + def prepare_lb_pool(self, pool_obj, lb_body): + body = self.drop_fields(pool_obj, self.drop_lb_pool_fields) + body['loadbalancer'] = lb_body + body['loadbalancer_id'] = lb_body['id'] + return body - def prepare_lb_member(self, lb_obj): - return self.drop_fields(lb_obj, self.drop_lb_member_fields) + def prepare_lb_member(self, mem_obj, lb_body): + body = self.drop_fields(mem_obj, self.drop_lb_member_fields) + body['loadbalancer'] = lb_body + body['loadbalancer_id'] = lb_body['id'] + return body def prepare_lb_hm(self, lb_obj): return self.drop_fields(lb_obj, self.drop_lb_hm_fields) def prepare_lb_l7policy(self, lb_obj): - return self.drop_fields(lb_obj, self.drop_lb_l7policy_fields) + body = self.drop_fields(lb_obj, self.drop_lb_l7policy_fields) + body['rules'] = [] + return body def prepare_lb_l7rule(self, lb_obj): return self.drop_fields(lb_obj, self.drop_lb_l7rule_fields) diff --git a/vmware_nsx/extensions/api_replay.py b/vmware_nsx/extensions/api_replay.py index 75cb4b6ade..ad3c38a392 100644 --- a/vmware_nsx/extensions/api_replay.py +++ b/vmware_nsx/extensions/api_replay.py @@ -37,6 +37,9 @@ RESOURCE_ATTRIBUTE_MAP = { 'networks': { 'id': ID_WITH_POST, }, + 'subnets': { + 'id': ID_WITH_POST, + }, 'security_groups': { 'id': ID_WITH_POST, 'name': {'allow_post': True, 'allow_put': True, diff --git a/vmware_nsx/plugins/nsx_p/plugin.py b/vmware_nsx/plugins/nsx_p/plugin.py index 9e820d922c..f835a165af 100644 --- a/vmware_nsx/plugins/nsx_p/plugin.py +++ b/vmware_nsx/plugins/nsx_p/plugin.py @@ -837,7 +837,8 @@ class NsxPolicyPlugin(nsx_plugin_common.NsxPluginV3Base): if network_id in NET_NEUTRON_2_NSX_ID_CACHE: nsx_id = NET_NEUTRON_2_NSX_ID_CACHE[network_id] del NET_NEUTRON_2_NSX_ID_CACHE[network_id] - del NET_NSX_2_NEUTRON_ID_CACHE[nsx_id] + if nsx_id in NET_NSX_2_NEUTRON_ID_CACHE: + del NET_NSX_2_NEUTRON_ID_CACHE[nsx_id] def update_network(self, context, network_id, network): original_net = super(NsxPolicyPlugin, self).get_network( @@ -944,6 +945,7 @@ class NsxPolicyPlugin(nsx_plugin_common.NsxPluginV3Base): self.nsxpolicy.tier1.update(router_id, ipv6_ndra_profile_id=profile_id) + @nsx_plugin_common.api_replay_mode_wrapper def create_subnet(self, context, subnet): return self._create_subnet(context, subnet) diff --git a/vmware_nsx/plugins/nsx_v3/plugin.py b/vmware_nsx/plugins/nsx_v3/plugin.py index ca9d7c39e9..27f2e8ceb6 100644 --- a/vmware_nsx/plugins/nsx_v3/plugin.py +++ b/vmware_nsx/plugins/nsx_v3/plugin.py @@ -1220,6 +1220,7 @@ class NsxV3Plugin(nsx_plugin_common.NsxPluginV3Base, LOG.warning("Failed to update network %(id)s dhcp server on " "the NSX: %(e)s", {'id': network['id'], 'e': e}) + @nsx_plugin_common.api_replay_mode_wrapper def create_subnet(self, context, subnet): return self._create_subnet(context, subnet) diff --git a/vmware_nsx/services/lbaas/nsx_p/implementation/member_mgr.py b/vmware_nsx/services/lbaas/nsx_p/implementation/member_mgr.py index 3d58e40c72..cc1405948b 100644 --- a/vmware_nsx/services/lbaas/nsx_p/implementation/member_mgr.py +++ b/vmware_nsx/services/lbaas/nsx_p/implementation/member_mgr.py @@ -103,8 +103,11 @@ class EdgeMemberManagerFromDict(base_mgr.NsxpLoadbalancerBaseManager): def create(self, context, member, completor): pool_client = self.core_plugin.nsxpolicy.load_balancer.lb_pool self._validate_member_lb_connectivity(context, member, completor) - network = lb_utils.get_network_from_subnet( - context, self.core_plugin, member['subnet_id']) + if member.get('subnet_id'): + network = lb_utils.get_network_from_subnet( + context, self.core_plugin, member['subnet_id']) + else: + network = None if network and network.get('router:external'): fixed_ip = self._get_info_from_fip(context, member['address']) else: diff --git a/vmware_nsx/services/lbaas/octavia/constants.py b/vmware_nsx/services/lbaas/octavia/constants.py index 0d95ea0e1e..3c2fec7ae2 100644 --- a/vmware_nsx/services/lbaas/octavia/constants.py +++ b/vmware_nsx/services/lbaas/octavia/constants.py @@ -14,7 +14,9 @@ # under the License. OCTAVIA_TO_DRIVER_TOPIC = 'vmware_nsx__lb_listener' +OCTAVIA_TO_DRIVER_MIGRATION_TOPIC = 'vmware_nsx__lb_listener_migration' DRIVER_TO_OCTAVIA_TOPIC = 'vmware_nsx__driver_listener' +DRIVER_TO_OCTAVIA_MIGRATION_TOPIC = 'vmware_nsx__driver_listener_migration' LOADBALANCER = 'loadbalancer' LISTENER = 'listener' diff --git a/vmware_nsx/services/lbaas/octavia/octavia_listener.py b/vmware_nsx/services/lbaas/octavia/octavia_listener.py index ba4d552bb0..a526959cb7 100644 --- a/vmware_nsx/services/lbaas/octavia/octavia_listener.py +++ b/vmware_nsx/services/lbaas/octavia/octavia_listener.py @@ -44,7 +44,10 @@ class NSXOctaviaListener(object): loadbalancer, member, pool) def _init_rpc_messaging(self): - topic = constants.DRIVER_TO_OCTAVIA_TOPIC + if cfg.CONF.api_replay_mode: + topic = constants.DRIVER_TO_OCTAVIA_MIGRATION_TOPIC + else: + topic = constants.DRIVER_TO_OCTAVIA_TOPIC transport = messaging.get_rpc_transport(cfg.CONF) target = messaging.Target(topic=topic, exchange="common", namespace='control', fanout=False, @@ -54,7 +57,10 @@ class NSXOctaviaListener(object): def _init_rpc_listener(self, healthmonitor, l7policy, l7rule, listener, loadbalancer, member, pool): # Initialize RPC listener - topic = constants.OCTAVIA_TO_DRIVER_TOPIC + if cfg.CONF.api_replay_mode: + topic = constants.OCTAVIA_TO_DRIVER_MIGRATION_TOPIC + else: + topic = constants.OCTAVIA_TO_DRIVER_TOPIC server = socket.gethostname() transport = messaging.get_rpc_transport(cfg.CONF) target = messaging.Target(topic=topic, server=server, @@ -299,6 +305,8 @@ class NSXOctaviaListenerEndpoint(object): except Exception as e: LOG.error('NSX driver loadbalancer_create failed %s', e) completor(success=False) + return False + return True @log_helpers.log_method_call def loadbalancer_delete_cascade(self, ctxt, loadbalancer): @@ -336,6 +344,8 @@ class NSXOctaviaListenerEndpoint(object): except Exception as e: LOG.error('NSX driver loadbalancer_delete_cascade failed %s', e) completor(success=False) + return False + return True @log_helpers.log_method_call def loadbalancer_delete(self, ctxt, loadbalancer, cascade=False): @@ -350,6 +360,8 @@ class NSXOctaviaListenerEndpoint(object): except Exception as e: LOG.error('NSX driver loadbalancer_delete failed %s', e) completor(success=False) + return False + return True @log_helpers.log_method_call def loadbalancer_update(self, ctxt, old_loadbalancer, new_loadbalancer): @@ -362,6 +374,8 @@ class NSXOctaviaListenerEndpoint(object): except Exception as e: LOG.error('NSX driver loadbalancer_update failed %s', e) completor(success=False) + return False + return True # Listener @log_helpers.log_method_call @@ -375,6 +389,8 @@ class NSXOctaviaListenerEndpoint(object): except Exception as e: LOG.error('NSX driver listener_create failed %s', e) completor(success=False) + return False + return True @log_helpers.log_method_call def listener_delete(self, ctxt, listener): @@ -386,6 +402,8 @@ class NSXOctaviaListenerEndpoint(object): except Exception as e: LOG.error('NSX driver listener_delete failed %s', e) completor(success=False) + return False + return True @log_helpers.log_method_call def listener_update(self, ctxt, old_listener, new_listener, cert): @@ -398,6 +416,8 @@ class NSXOctaviaListenerEndpoint(object): except Exception as e: LOG.error('NSX driver listener_update failed %s', e) completor(success=False) + return False + return True # Pool @log_helpers.log_method_call @@ -410,6 +430,8 @@ class NSXOctaviaListenerEndpoint(object): except Exception as e: LOG.error('NSX driver pool_create failed %s', e) completor(success=False) + return False + return True @log_helpers.log_method_call def pool_delete(self, ctxt, pool): @@ -421,6 +443,8 @@ class NSXOctaviaListenerEndpoint(object): except Exception as e: LOG.error('NSX driver pool_delete failed %s', e) completor(success=False) + return False + return True @log_helpers.log_method_call def pool_update(self, ctxt, old_pool, new_pool): @@ -432,6 +456,8 @@ class NSXOctaviaListenerEndpoint(object): except Exception as e: LOG.error('NSX driver pool_update failed %s', e) completor(success=False) + return False + return True # Member @log_helpers.log_method_call @@ -444,6 +470,8 @@ class NSXOctaviaListenerEndpoint(object): except Exception as e: LOG.error('NSX driver member_create failed %s', e) completor(success=False) + return False + return True @log_helpers.log_method_call def member_delete(self, ctxt, member): @@ -455,6 +483,8 @@ class NSXOctaviaListenerEndpoint(object): except Exception as e: LOG.error('NSX driver member_delete failed %s', e) completor(success=False) + return False + return True @log_helpers.log_method_call def member_update(self, ctxt, old_member, new_member): @@ -466,6 +496,8 @@ class NSXOctaviaListenerEndpoint(object): except Exception as e: LOG.error('NSX driver member_update failed %s', e) completor(success=False) + return False + return True # Health Monitor @log_helpers.log_method_call @@ -478,6 +510,8 @@ class NSXOctaviaListenerEndpoint(object): except Exception as e: LOG.error('NSX driver healthmonitor_create failed %s', e) completor(success=False) + return False + return True @log_helpers.log_method_call def healthmonitor_delete(self, ctxt, healthmonitor): @@ -489,6 +523,8 @@ class NSXOctaviaListenerEndpoint(object): except Exception as e: LOG.error('NSX driver healthmonitor_delete failed %s', e) completor(success=False) + return False + return True @log_helpers.log_method_call def healthmonitor_update(self, ctxt, old_healthmonitor, new_healthmonitor): @@ -501,6 +537,8 @@ class NSXOctaviaListenerEndpoint(object): except Exception as e: LOG.error('NSX driver healthmonitor_update failed %s', e) completor(success=False) + return False + return True # L7 Policy @log_helpers.log_method_call @@ -513,6 +551,8 @@ class NSXOctaviaListenerEndpoint(object): except Exception as e: LOG.error('NSX driver l7policy_create failed %s', e) completor(success=False) + return False + return True @log_helpers.log_method_call def l7policy_delete(self, ctxt, l7policy): @@ -524,6 +564,8 @@ class NSXOctaviaListenerEndpoint(object): except Exception as e: LOG.error('NSX driver l7policy_delete failed %s', e) completor(success=False) + return False + return True @log_helpers.log_method_call def l7policy_update(self, ctxt, old_l7policy, new_l7policy): @@ -535,6 +577,8 @@ class NSXOctaviaListenerEndpoint(object): except Exception as e: LOG.error('NSX driver l7policy_update failed %s', e) completor(success=False) + return False + return True # L7 Rule @log_helpers.log_method_call @@ -546,6 +590,8 @@ class NSXOctaviaListenerEndpoint(object): except Exception as e: LOG.error('NSX driver l7rule_create failed %s', e) completor(success=False) + return False + return True @log_helpers.log_method_call def l7rule_delete(self, ctxt, l7rule): @@ -557,6 +603,8 @@ class NSXOctaviaListenerEndpoint(object): except Exception as e: LOG.error('NSX driver l7rule_delete failed %s', e) completor(success=False) + return False + return True @log_helpers.log_method_call def l7rule_update(self, ctxt, old_l7rule, new_l7rule): @@ -567,6 +615,8 @@ class NSXOctaviaListenerEndpoint(object): except Exception as e: LOG.error('NSX driver l7rule_update failed %s', e) completor(success=False) + return False + return True class NSXOctaviaStatisticsCollector(object):