From 223719031b2ee398e30f6b03bfa6118c3c0e4516 Mon Sep 17 00:00:00 2001 From: Adit Sarfaty Date: Wed, 10 Jul 2019 15:00:42 +0300 Subject: [PATCH] NSX|V3+P migration: Support migration of Octavia objects Change-Id: I7008ebd9ff36990909c79bc96a3a339085dd4bee --- vmware_nsx/api_replay/cli.py | 6 + vmware_nsx/api_replay/client.py | 254 +++++++++++++++++++++++++++++++- vmware_nsx/api_replay/utils.py | 41 ++++++ 3 files changed, 296 insertions(+), 5 deletions(-) diff --git a/vmware_nsx/api_replay/cli.py b/vmware_nsx/api_replay/cli.py index ae03b90a28..1d983f4cec 100644 --- a/vmware_nsx/api_replay/cli.py +++ b/vmware_nsx/api_replay/cli.py @@ -37,6 +37,7 @@ class ApiReplayCli(object): dest_os_auth_url=args.dest_os_auth_url, dest_plugin=args.dest_plugin, use_old_keystone=args.use_old_keystone, + max_retry=args.max_retry, logfile=args.logfile) def _setup_argparse(self): @@ -119,6 +120,11 @@ class ApiReplayCli(object): default=DEFAULT_LOGFILE, help="Output logfile.") + parser.add_argument( + "--max-retry", + default=10, + help="Maximum number of retrying different operations.") + # NOTE: this will return an error message if any of the # require options are missing. return parser.parse_args() diff --git a/vmware_nsx/api_replay/client.py b/vmware_nsx/api_replay/client.py index 0b395dfde5..0a4af0af21 100644 --- a/vmware_nsx/api_replay/client.py +++ b/vmware_nsx/api_replay/client.py @@ -11,6 +11,7 @@ # under the License. import logging +import time import six @@ -18,6 +19,7 @@ from keystoneauth1 import identity from keystoneauth1 import session from neutronclient.common import exceptions as n_exc from neutronclient.v2_0 import client +from octaviaclient.api.v2 import octavia from oslo_utils import excutils from vmware_nsx.api_replay import utils @@ -39,7 +41,7 @@ class ApiReplayClient(utils.PrepareObjectForMigration): dest_os_username, dest_os_user_domain_id, dest_os_tenant_name, dest_os_tenant_domain_id, dest_os_password, dest_os_auth_url, dest_plugin, - use_old_keystone, logfile): + use_old_keystone, logfile, max_retry): if logfile: f_handler = logging.FileHandler(logfile) @@ -48,6 +50,8 @@ class ApiReplayClient(utils.PrepareObjectForMigration): f_handler.setFormatter(f_formatter) LOG.addHandler(f_handler) + self.max_retry = max_retry + # connect to both clients if use_old_keystone: # Since we are not sure what keystone version will be used on the @@ -57,6 +61,7 @@ class ApiReplayClient(utils.PrepareObjectForMigration): tenant_name=source_os_tenant_name, password=source_os_password, auth_url=source_os_auth_url) + self.source_octavia = None else: self.source_neutron = self.connect_to_client( username=source_os_username, @@ -65,6 +70,13 @@ class ApiReplayClient(utils.PrepareObjectForMigration): tenant_domain_id=source_os_tenant_domain_id, password=source_os_password, auth_url=source_os_auth_url) + self.source_octavia = self.connect_to_octavia( + username=source_os_username, + user_domain_id=source_os_user_domain_id, + tenant_name=source_os_tenant_name, + tenant_domain_id=source_os_tenant_domain_id, + password=source_os_password, + auth_url=source_os_auth_url) if use_old_keystone_on_dest: self.dest_neutron = client.Client( @@ -72,6 +84,7 @@ class ApiReplayClient(utils.PrepareObjectForMigration): tenant_name=dest_os_tenant_name, password=dest_os_password, auth_url=dest_os_auth_url) + self.dest_octavia = None else: self.dest_neutron = self.connect_to_client( username=dest_os_username, @@ -80,6 +93,14 @@ class ApiReplayClient(utils.PrepareObjectForMigration): tenant_domain_id=dest_os_tenant_domain_id, password=dest_os_password, auth_url=dest_os_auth_url) + self.dest_octavia = self.connect_to_octavia( + username=dest_os_username, + user_domain_id=dest_os_user_domain_id, + tenant_name=dest_os_tenant_name, + tenant_domain_id=dest_os_tenant_domain_id, + password=dest_os_password, + auth_url=dest_os_auth_url) + self.dest_plugin = dest_plugin LOG.info("Starting NSX migration to %s.", self.dest_plugin) @@ -91,21 +112,44 @@ class ApiReplayClient(utils.PrepareObjectForMigration): self.migrate_floatingips() self.migrate_routers_routes(routers_routes) self.migrate_fwaas() + if self.source_octavia and self.dest_octavia: + self.migrate_octavia() LOG.info("NSX migration is Done.") - def connect_to_client(self, username, user_domain_id, - tenant_name, tenant_domain_id, - password, auth_url): + def _get_session(self, username, user_domain_id, + tenant_name, tenant_domain_id, + password, auth_url): auth = identity.Password(username=username, user_domain_id=user_domain_id, password=password, project_name=tenant_name, project_domain_id=tenant_domain_id, auth_url=auth_url) - sess = session.Session(auth=auth) + return session.Session(auth=auth) + + def connect_to_client(self, username, user_domain_id, + tenant_name, tenant_domain_id, + password, auth_url): + sess = self._get_session(username, user_domain_id, + tenant_name, tenant_domain_id, + password, auth_url) neutron = client.Client(session=sess) return neutron + def connect_to_octavia(self, username, user_domain_id, + tenant_name, tenant_domain_id, + password, auth_url): + sess = self._get_session(username, user_domain_id, + tenant_name, tenant_domain_id, + password, auth_url) + endpoint = sess.get_endpoint(service_type='load-balancer') + client = octavia.OctaviaAPI( + session=sess, + service_type='load-balancer', + endpoint=endpoint, + ) + return client + def find_subnet_by_id(self, subnet_id, subnets): for subnet in subnets: if subnet['id'] == subnet_id: @@ -696,3 +740,203 @@ class ApiReplayClient(utils.PrepareObjectForMigration): self.dest_neutron.create_fwaas_firewall_group) LOG.info("FWaaS V2 migration done") + + def _wait_for_lb_up(self, lb_id): + retry_num = 0 + while retry_num < self.max_retry: + lb = self.dest_octavia.load_balancer_show(lb_id) + if not lb['provisioning_status'].startswith('PENDING'): + if lb['provisioning_status'] == 'ACTIVE': + return True + # No point in trying any more + return False + retry_num = retry_num + 1 + time.sleep(1) + return False + + def _migrate_octavia_lb(self, lb, orig_map): + # Create the loadbalancer: + body = self.prepare_lb_loadbalancer(lb) + try: + new_lb = self.dest_octavia.load_balancer_create( + json={'loadbalancer': body})['loadbalancer'] + except Exception as e: + LOG.error("Failed to create loadbalancer (%(lb)s): %(e)s", + {'lb': lb, 'e': e}) + return + new_lb_id = new_lb['id'] + if not self._wait_for_lb_up(new_lb_id): + LOG.error("New loadbalancer %s does not become active", new_lb_id) + return + + listeners_map = {} + for listener_dict in lb.get('listeners', []): + listener_id = listener_dict['id'] + listener = orig_map['listeners'][listener_id] + body = self.prepare_lb_listener(listener) + # Update loadbalancer in listener + body['loadbalancer_id'] = new_lb_id + try: + new_listener = self.dest_octavia.listener_create( + json={'listener': body})['listener'] + except Exception as e: + LOG.error("Failed to create listener (%(list)s): %(e)s", + {'list': listener, 'e': e}) + return + if not self._wait_for_lb_up(new_lb_id): + LOG.error("New loadbalancer %s does not become active after " + "listener creation", new_lb_id) + return + # map old-id to new + listeners_map[listener_id] = new_listener['id'] + + pools_map = {} + for pool_dict in lb.get('pools', []): + pool_id = pool_dict['id'] + pool = orig_map['pools'][pool_id] + body = self.prepare_lb_pool(pool) + # Update loadbalancer and listeners in pool + body['loadbalancer_id'] = new_lb_id + if pool.get('listeners'): + body['listener_id'] = listeners_map[pool['listeners'][0]['id']] + try: + new_pool = self.dest_octavia.pool_create( + json={'pool': body})['pool'] + except Exception as e: + LOG.error("Failed to create pool (%(pool)s): %(e)s", + {'pool': pool, 'e': e}) + return + if not self._wait_for_lb_up(new_lb_id): + LOG.error("New loadbalancer %s does not become active after " + "pool creation", new_lb_id) + return + # map old-id to new + pools_map[pool_id] = new_pool['id'] + + # Add members to this pool + source_members = self.source_octavia.member_list(pool_id)[ + 'members'] + for member in source_members: + body = self.prepare_lb_member(member) + try: + self.dest_octavia.member_create( + new_pool['id'], json={'member': body})['member'] + except Exception as e: + LOG.error("Failed to create member (%(member)s): %(e)s", + {'member': member, 'e': e}) + return + if not self._wait_for_lb_up(new_lb_id): + LOG.error("New loadbalancer %s does not become active " + "after member creation", new_lb_id) + return + + # Add pool health monitor + if pool.get('healthmonitor_id'): + hm_id = pool['healthmonitor_id'] + hm = orig_map['hms'][hm_id] + body = self.prepare_lb_hm(hm) + # Update pool id in hm + body['pool_id'] = new_pool['id'] + try: + self.dest_octavia.health_monitor_create( + json={'healthmonitor': body})['healthmonitor'] + except Exception as e: + LOG.error("Failed to create healthmonitor (%(hm)s): %(e)s", + {'hm': hm, 'e': e}) + return + if not self._wait_for_lb_up(new_lb_id): + LOG.error("New loadbalancer %s does not become active " + "after health monitor creation", new_lb_id) + return + + # Add listeners L7 policies + for listener_id in listeners_map: + listener = orig_map['listeners'][listener_id] + for l7pol_dict in listener.get('l7policies', []): + l7pol = orig_map['l7pols'][l7pol_dict['id']] + body = self.prepare_lb_l7policy(l7pol) + # Update pool id in l7 policy + body['listener_id'] = listeners_map[listener_id] + # update redirect_pool_id + if l7pol.get('redirect_pool_id'): + body['redirect_pool_id'] = pools_map[ + l7pol['redirect_pool_id']] + try: + new_pol = self.dest_octavia.l7policy_create( + json={'l7policy': body})['l7policy'] + except Exception as e: + LOG.error("Failed to create l7policy (%(l7pol)s): " + "%(e)s", {'l7pol': l7pol, 'e': e}) + return + if not self._wait_for_lb_up(new_lb_id): + LOG.error("New loadbalancer %s does not become active " + "after L7 policy creation", new_lb_id) + return + + # Add the rules of this policy + source_l7rules = self.source_octavia.l7rule_list( + l7pol['id'])['rules'] + for rule in source_l7rules: + body = self.prepare_lb_l7rule(rule) + try: + self.dest_octavia.l7rule_create( + new_pol['id'], json={'rule': body})['rule'] + except Exception as e: + LOG.error("Failed to create l7rule (%(rule)s): " + "%(e)s", {'rule': rule, 'e': e}) + return + if not self._wait_for_lb_up(new_lb_id): + LOG.error("New loadbalancer %s does not become " + "active after L7 rule creation", + new_lb_id) + return + + def _map_orig_objects_of_type(self, source_objects): + result = {} + for obj in source_objects: + result[obj['id']] = obj + return result + + def _map_orig_lb_objects(self, source_listeners, source_pools, + source_hms, source_l7pols): + result = { + 'listeners': self._map_orig_objects_of_type(source_listeners), + 'pools': self._map_orig_objects_of_type(source_pools), + 'hms': self._map_orig_objects_of_type(source_hms), + 'l7pols': self._map_orig_objects_of_type(source_l7pols), + } + return result + + def migrate_octavia(self): + """Migrates Octavia objects from source to dest neutron. + + Right now the Octavia objects are created with new IDS, and + do not keep their original IDs + """ + # TODO(asarfaty): Keep original ids + try: + source_loadbalancers = self.source_octavia.\ + load_balancer_list()['loadbalancers'] + source_listeners = self.source_octavia.listener_list()['listeners'] + source_pools = self.source_octavia.pool_list()['pools'] + source_hms = self.source_octavia.\ + health_monitor_list()['healthmonitors'] + source_l7pols = self.source_octavia.l7policy_list()['l7policies'] + except Exception as e: + # Octavia might be disabled in the source + LOG.info("Octavia was not found on the source server: %s", e) + return + + try: + self.dest_octavia.load_balancer_list() + except Exception as e: + # Octavia might be disabled in the destination + LOG.warning("Skipping Octavia migration. Octavia was not found " + "on the destination server: %s", e) + return + orig_map = self._map_orig_lb_objects(source_listeners, source_pools, + source_hms, source_l7pols) + total_num = len(source_loadbalancers) + LOG.info("Migrating %d loadbalancer(s)", total_num) + for lb in source_loadbalancers: + self._migrate_octavia_lb(lb, orig_map) diff --git a/vmware_nsx/api_replay/utils.py b/vmware_nsx/api_replay/utils.py index e5848dc091..01cc888326 100644 --- a/vmware_nsx/api_replay/utils.py +++ b/vmware_nsx/api_replay/utils.py @@ -102,6 +102,22 @@ class PrepareObjectForMigration(object): drop_fwaas_policy_fields = [] drop_fwaas_group_fields = ['status'] + lb_ignore_fields = ['created_at', 'updated_at', 'operating_status', + 'provisioning_status', 'id'] + drop_lb_loadbalancer_fields = lb_ignore_fields + [ + 'listeners', 'pools', # Those objects will be created laster + 'vip_subnet_id', # vip_port_id will be used + 'flavor_id', # not supported by the driver + ] + drop_lb_listener_fields = lb_ignore_fields + [ + 'loadbalancers', 'l7policies', 'default_pool_id'] + drop_lb_pool_fields = lb_ignore_fields + [ + 'loadbalancers', 'healthmonitor_id', 'listeners', 'members'] + drop_lb_member_fields = lb_ignore_fields + drop_lb_hm_fields = lb_ignore_fields + ['pools'] + drop_lb_l7policy_fields = lb_ignore_fields + ['rules'] + drop_lb_l7rule_fields = lb_ignore_fields + def drop_fields(self, item, drop_fields): body = {} for k, v in item.items(): @@ -248,6 +264,10 @@ class PrepareObjectForMigration(object): if 'device_owner' not in body: body['device_owner'] = "" + if body.get('device_owner') == 'Octavia': + # remove device id & owner. Octavia will re-set it. + body['device_id'] = "" + body['device_owner'] = "" return body def prepare_floatingip(self, fip, direct_call=False): @@ -273,3 +293,24 @@ class PrepareObjectForMigration(object): def prepare_fwaas_group(self, group): self.fix_description(group) return self.drop_fields(group, self.drop_fwaas_group_fields) + + def prepare_lb_loadbalancer(self, lb_obj): + return self.drop_fields(lb_obj, self.drop_lb_loadbalancer_fields) + + def prepare_lb_listener(self, lb_obj): + return self.drop_fields(lb_obj, self.drop_lb_listener_fields) + + def prepare_lb_pool(self, lb_obj): + return self.drop_fields(lb_obj, self.drop_lb_pool_fields) + + def prepare_lb_member(self, lb_obj): + return self.drop_fields(lb_obj, self.drop_lb_member_fields) + + def prepare_lb_hm(self, lb_obj): + return self.drop_fields(lb_obj, self.drop_lb_hm_fields) + + def prepare_lb_l7policy(self, lb_obj): + return self.drop_fields(lb_obj, self.drop_lb_l7policy_fields) + + def prepare_lb_l7rule(self, lb_obj): + return self.drop_fields(lb_obj, self.drop_lb_l7rule_fields)