NSX|V3+P migration: Support migration of Octavia objects

Change-Id: I7008ebd9ff36990909c79bc96a3a339085dd4bee
This commit is contained in:
Adit Sarfaty 2019-07-10 15:00:42 +03:00
parent 2c342f2e22
commit 223719031b
3 changed files with 296 additions and 5 deletions

View File

@ -37,6 +37,7 @@ class ApiReplayCli(object):
dest_os_auth_url=args.dest_os_auth_url,
dest_plugin=args.dest_plugin,
use_old_keystone=args.use_old_keystone,
max_retry=args.max_retry,
logfile=args.logfile)
def _setup_argparse(self):
@ -119,6 +120,11 @@ class ApiReplayCli(object):
default=DEFAULT_LOGFILE,
help="Output logfile.")
parser.add_argument(
"--max-retry",
default=10,
help="Maximum number of retrying different operations.")
# NOTE: this will return an error message if any of the
# require options are missing.
return parser.parse_args()

View File

@ -11,6 +11,7 @@
# under the License.
import logging
import time
import six
@ -18,6 +19,7 @@ from keystoneauth1 import identity
from keystoneauth1 import session
from neutronclient.common import exceptions as n_exc
from neutronclient.v2_0 import client
from octaviaclient.api.v2 import octavia
from oslo_utils import excutils
from vmware_nsx.api_replay import utils
@ -39,7 +41,7 @@ class ApiReplayClient(utils.PrepareObjectForMigration):
dest_os_username, dest_os_user_domain_id,
dest_os_tenant_name, dest_os_tenant_domain_id,
dest_os_password, dest_os_auth_url, dest_plugin,
use_old_keystone, logfile):
use_old_keystone, logfile, max_retry):
if logfile:
f_handler = logging.FileHandler(logfile)
@ -48,6 +50,8 @@ class ApiReplayClient(utils.PrepareObjectForMigration):
f_handler.setFormatter(f_formatter)
LOG.addHandler(f_handler)
self.max_retry = max_retry
# connect to both clients
if use_old_keystone:
# Since we are not sure what keystone version will be used on the
@ -57,6 +61,7 @@ class ApiReplayClient(utils.PrepareObjectForMigration):
tenant_name=source_os_tenant_name,
password=source_os_password,
auth_url=source_os_auth_url)
self.source_octavia = None
else:
self.source_neutron = self.connect_to_client(
username=source_os_username,
@ -65,6 +70,13 @@ class ApiReplayClient(utils.PrepareObjectForMigration):
tenant_domain_id=source_os_tenant_domain_id,
password=source_os_password,
auth_url=source_os_auth_url)
self.source_octavia = self.connect_to_octavia(
username=source_os_username,
user_domain_id=source_os_user_domain_id,
tenant_name=source_os_tenant_name,
tenant_domain_id=source_os_tenant_domain_id,
password=source_os_password,
auth_url=source_os_auth_url)
if use_old_keystone_on_dest:
self.dest_neutron = client.Client(
@ -72,6 +84,7 @@ class ApiReplayClient(utils.PrepareObjectForMigration):
tenant_name=dest_os_tenant_name,
password=dest_os_password,
auth_url=dest_os_auth_url)
self.dest_octavia = None
else:
self.dest_neutron = self.connect_to_client(
username=dest_os_username,
@ -80,6 +93,14 @@ class ApiReplayClient(utils.PrepareObjectForMigration):
tenant_domain_id=dest_os_tenant_domain_id,
password=dest_os_password,
auth_url=dest_os_auth_url)
self.dest_octavia = self.connect_to_octavia(
username=dest_os_username,
user_domain_id=dest_os_user_domain_id,
tenant_name=dest_os_tenant_name,
tenant_domain_id=dest_os_tenant_domain_id,
password=dest_os_password,
auth_url=dest_os_auth_url)
self.dest_plugin = dest_plugin
LOG.info("Starting NSX migration to %s.", self.dest_plugin)
@ -91,9 +112,11 @@ class ApiReplayClient(utils.PrepareObjectForMigration):
self.migrate_floatingips()
self.migrate_routers_routes(routers_routes)
self.migrate_fwaas()
if self.source_octavia and self.dest_octavia:
self.migrate_octavia()
LOG.info("NSX migration is Done.")
def connect_to_client(self, username, user_domain_id,
def _get_session(self, username, user_domain_id,
tenant_name, tenant_domain_id,
password, auth_url):
auth = identity.Password(username=username,
@ -102,10 +125,31 @@ class ApiReplayClient(utils.PrepareObjectForMigration):
project_name=tenant_name,
project_domain_id=tenant_domain_id,
auth_url=auth_url)
sess = session.Session(auth=auth)
return session.Session(auth=auth)
def connect_to_client(self, username, user_domain_id,
tenant_name, tenant_domain_id,
password, auth_url):
sess = self._get_session(username, user_domain_id,
tenant_name, tenant_domain_id,
password, auth_url)
neutron = client.Client(session=sess)
return neutron
def connect_to_octavia(self, username, user_domain_id,
tenant_name, tenant_domain_id,
password, auth_url):
sess = self._get_session(username, user_domain_id,
tenant_name, tenant_domain_id,
password, auth_url)
endpoint = sess.get_endpoint(service_type='load-balancer')
client = octavia.OctaviaAPI(
session=sess,
service_type='load-balancer',
endpoint=endpoint,
)
return client
def find_subnet_by_id(self, subnet_id, subnets):
for subnet in subnets:
if subnet['id'] == subnet_id:
@ -696,3 +740,203 @@ class ApiReplayClient(utils.PrepareObjectForMigration):
self.dest_neutron.create_fwaas_firewall_group)
LOG.info("FWaaS V2 migration done")
def _wait_for_lb_up(self, lb_id):
retry_num = 0
while retry_num < self.max_retry:
lb = self.dest_octavia.load_balancer_show(lb_id)
if not lb['provisioning_status'].startswith('PENDING'):
if lb['provisioning_status'] == 'ACTIVE':
return True
# No point in trying any more
return False
retry_num = retry_num + 1
time.sleep(1)
return False
def _migrate_octavia_lb(self, lb, orig_map):
# Create the loadbalancer:
body = self.prepare_lb_loadbalancer(lb)
try:
new_lb = self.dest_octavia.load_balancer_create(
json={'loadbalancer': body})['loadbalancer']
except Exception as e:
LOG.error("Failed to create loadbalancer (%(lb)s): %(e)s",
{'lb': lb, 'e': e})
return
new_lb_id = new_lb['id']
if not self._wait_for_lb_up(new_lb_id):
LOG.error("New loadbalancer %s does not become active", new_lb_id)
return
listeners_map = {}
for listener_dict in lb.get('listeners', []):
listener_id = listener_dict['id']
listener = orig_map['listeners'][listener_id]
body = self.prepare_lb_listener(listener)
# Update loadbalancer in listener
body['loadbalancer_id'] = new_lb_id
try:
new_listener = self.dest_octavia.listener_create(
json={'listener': body})['listener']
except Exception as e:
LOG.error("Failed to create listener (%(list)s): %(e)s",
{'list': listener, 'e': e})
return
if not self._wait_for_lb_up(new_lb_id):
LOG.error("New loadbalancer %s does not become active after "
"listener creation", new_lb_id)
return
# map old-id to new
listeners_map[listener_id] = new_listener['id']
pools_map = {}
for pool_dict in lb.get('pools', []):
pool_id = pool_dict['id']
pool = orig_map['pools'][pool_id]
body = self.prepare_lb_pool(pool)
# Update loadbalancer and listeners in pool
body['loadbalancer_id'] = new_lb_id
if pool.get('listeners'):
body['listener_id'] = listeners_map[pool['listeners'][0]['id']]
try:
new_pool = self.dest_octavia.pool_create(
json={'pool': body})['pool']
except Exception as e:
LOG.error("Failed to create pool (%(pool)s): %(e)s",
{'pool': pool, 'e': e})
return
if not self._wait_for_lb_up(new_lb_id):
LOG.error("New loadbalancer %s does not become active after "
"pool creation", new_lb_id)
return
# map old-id to new
pools_map[pool_id] = new_pool['id']
# Add members to this pool
source_members = self.source_octavia.member_list(pool_id)[
'members']
for member in source_members:
body = self.prepare_lb_member(member)
try:
self.dest_octavia.member_create(
new_pool['id'], json={'member': body})['member']
except Exception as e:
LOG.error("Failed to create member (%(member)s): %(e)s",
{'member': member, 'e': e})
return
if not self._wait_for_lb_up(new_lb_id):
LOG.error("New loadbalancer %s does not become active "
"after member creation", new_lb_id)
return
# Add pool health monitor
if pool.get('healthmonitor_id'):
hm_id = pool['healthmonitor_id']
hm = orig_map['hms'][hm_id]
body = self.prepare_lb_hm(hm)
# Update pool id in hm
body['pool_id'] = new_pool['id']
try:
self.dest_octavia.health_monitor_create(
json={'healthmonitor': body})['healthmonitor']
except Exception as e:
LOG.error("Failed to create healthmonitor (%(hm)s): %(e)s",
{'hm': hm, 'e': e})
return
if not self._wait_for_lb_up(new_lb_id):
LOG.error("New loadbalancer %s does not become active "
"after health monitor creation", new_lb_id)
return
# Add listeners L7 policies
for listener_id in listeners_map:
listener = orig_map['listeners'][listener_id]
for l7pol_dict in listener.get('l7policies', []):
l7pol = orig_map['l7pols'][l7pol_dict['id']]
body = self.prepare_lb_l7policy(l7pol)
# Update pool id in l7 policy
body['listener_id'] = listeners_map[listener_id]
# update redirect_pool_id
if l7pol.get('redirect_pool_id'):
body['redirect_pool_id'] = pools_map[
l7pol['redirect_pool_id']]
try:
new_pol = self.dest_octavia.l7policy_create(
json={'l7policy': body})['l7policy']
except Exception as e:
LOG.error("Failed to create l7policy (%(l7pol)s): "
"%(e)s", {'l7pol': l7pol, 'e': e})
return
if not self._wait_for_lb_up(new_lb_id):
LOG.error("New loadbalancer %s does not become active "
"after L7 policy creation", new_lb_id)
return
# Add the rules of this policy
source_l7rules = self.source_octavia.l7rule_list(
l7pol['id'])['rules']
for rule in source_l7rules:
body = self.prepare_lb_l7rule(rule)
try:
self.dest_octavia.l7rule_create(
new_pol['id'], json={'rule': body})['rule']
except Exception as e:
LOG.error("Failed to create l7rule (%(rule)s): "
"%(e)s", {'rule': rule, 'e': e})
return
if not self._wait_for_lb_up(new_lb_id):
LOG.error("New loadbalancer %s does not become "
"active after L7 rule creation",
new_lb_id)
return
def _map_orig_objects_of_type(self, source_objects):
result = {}
for obj in source_objects:
result[obj['id']] = obj
return result
def _map_orig_lb_objects(self, source_listeners, source_pools,
source_hms, source_l7pols):
result = {
'listeners': self._map_orig_objects_of_type(source_listeners),
'pools': self._map_orig_objects_of_type(source_pools),
'hms': self._map_orig_objects_of_type(source_hms),
'l7pols': self._map_orig_objects_of_type(source_l7pols),
}
return result
def migrate_octavia(self):
"""Migrates Octavia objects from source to dest neutron.
Right now the Octavia objects are created with new IDS, and
do not keep their original IDs
"""
# TODO(asarfaty): Keep original ids
try:
source_loadbalancers = self.source_octavia.\
load_balancer_list()['loadbalancers']
source_listeners = self.source_octavia.listener_list()['listeners']
source_pools = self.source_octavia.pool_list()['pools']
source_hms = self.source_octavia.\
health_monitor_list()['healthmonitors']
source_l7pols = self.source_octavia.l7policy_list()['l7policies']
except Exception as e:
# Octavia might be disabled in the source
LOG.info("Octavia was not found on the source server: %s", e)
return
try:
self.dest_octavia.load_balancer_list()
except Exception as e:
# Octavia might be disabled in the destination
LOG.warning("Skipping Octavia migration. Octavia was not found "
"on the destination server: %s", e)
return
orig_map = self._map_orig_lb_objects(source_listeners, source_pools,
source_hms, source_l7pols)
total_num = len(source_loadbalancers)
LOG.info("Migrating %d loadbalancer(s)", total_num)
for lb in source_loadbalancers:
self._migrate_octavia_lb(lb, orig_map)

View File

@ -102,6 +102,22 @@ class PrepareObjectForMigration(object):
drop_fwaas_policy_fields = []
drop_fwaas_group_fields = ['status']
lb_ignore_fields = ['created_at', 'updated_at', 'operating_status',
'provisioning_status', 'id']
drop_lb_loadbalancer_fields = lb_ignore_fields + [
'listeners', 'pools', # Those objects will be created laster
'vip_subnet_id', # vip_port_id will be used
'flavor_id', # not supported by the driver
]
drop_lb_listener_fields = lb_ignore_fields + [
'loadbalancers', 'l7policies', 'default_pool_id']
drop_lb_pool_fields = lb_ignore_fields + [
'loadbalancers', 'healthmonitor_id', 'listeners', 'members']
drop_lb_member_fields = lb_ignore_fields
drop_lb_hm_fields = lb_ignore_fields + ['pools']
drop_lb_l7policy_fields = lb_ignore_fields + ['rules']
drop_lb_l7rule_fields = lb_ignore_fields
def drop_fields(self, item, drop_fields):
body = {}
for k, v in item.items():
@ -248,6 +264,10 @@ class PrepareObjectForMigration(object):
if 'device_owner' not in body:
body['device_owner'] = ""
if body.get('device_owner') == 'Octavia':
# remove device id & owner. Octavia will re-set it.
body['device_id'] = ""
body['device_owner'] = ""
return body
def prepare_floatingip(self, fip, direct_call=False):
@ -273,3 +293,24 @@ class PrepareObjectForMigration(object):
def prepare_fwaas_group(self, group):
self.fix_description(group)
return self.drop_fields(group, self.drop_fwaas_group_fields)
def prepare_lb_loadbalancer(self, lb_obj):
return self.drop_fields(lb_obj, self.drop_lb_loadbalancer_fields)
def prepare_lb_listener(self, lb_obj):
return self.drop_fields(lb_obj, self.drop_lb_listener_fields)
def prepare_lb_pool(self, lb_obj):
return self.drop_fields(lb_obj, self.drop_lb_pool_fields)
def prepare_lb_member(self, lb_obj):
return self.drop_fields(lb_obj, self.drop_lb_member_fields)
def prepare_lb_hm(self, lb_obj):
return self.drop_fields(lb_obj, self.drop_lb_hm_fields)
def prepare_lb_l7policy(self, lb_obj):
return self.drop_fields(lb_obj, self.drop_lb_l7policy_fields)
def prepare_lb_l7rule(self, lb_obj):
return self.drop_fields(lb_obj, self.drop_lb_l7rule_fields)